diff --git a/cmd/ks-apiserver/app/options/options.go b/cmd/ks-apiserver/app/options/options.go index a5cebd046..36b116350 100644 --- a/cmd/ks-apiserver/app/options/options.go +++ b/cmd/ks-apiserver/app/options/options.go @@ -87,7 +87,6 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) { s.AuthorizationOptions.AddFlags(fss.FlagSet("authorization"), s.AuthorizationOptions) s.DevopsOptions.AddFlags(fss.FlagSet("devops"), s.DevopsOptions) s.SonarQubeOptions.AddFlags(fss.FlagSet("sonarqube"), s.SonarQubeOptions) - s.RedisOptions.AddFlags(fss.FlagSet("redis"), s.RedisOptions) s.S3Options.AddFlags(fss.FlagSet("s3"), s.S3Options) s.OpenPitrixOptions.AddFlags(fss.FlagSet("openpitrix"), s.OpenPitrixOptions) s.NetworkOptions.AddFlags(fss.FlagSet("network"), s.NetworkOptions) @@ -176,21 +175,23 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS apiServer.SonarClient = sonarqube.NewSonar(sonarClient.SonarQube()) } - var cacheClient cache.Interface - if s.RedisOptions != nil && len(s.RedisOptions.Host) != 0 { - if s.RedisOptions.Host == fakeInterface && s.DebugMode { - apiServer.CacheClient = cache.NewSimpleCache() - } else { - cacheClient, err = cache.NewRedisClient(s.RedisOptions, stopCh) - if err != nil { - return nil, fmt.Errorf("failed to connect to redis service, please check redis status, error: %v", err) - } - apiServer.CacheClient = cacheClient + // If debug mode is on or CacheOptions is nil, will create a fake cache. + if s.CacheOptions.Type != "" { + if s.DebugMode { + s.CacheOptions.Type = cache.DefaultCacheType } + cacheClient, err := cache.New(s.CacheOptions, stopCh) + if err != nil { + return nil, fmt.Errorf("failed to create cache, error: %v", err) + } + apiServer.CacheClient = cacheClient } else { - klog.Warning("ks-apiserver starts without redis provided, it will use in memory cache. " + - "This may cause inconsistencies when running ks-apiserver with multiple replicas.") - apiServer.CacheClient = cache.NewSimpleCache() + s.CacheOptions = &cache.Options{Type: cache.DefaultCacheType} + // fake cache has no error to return + cacheClient, _ := cache.New(s.CacheOptions, stopCh) + apiServer.CacheClient = cacheClient + klog.Warning("ks-apiserver starts without cache provided, it will use in memory cache. " + + "This may cause inconsistencies when running ks-apiserver with multiple replicas, and memory leak risk") } if s.EventsOptions.Host != "" { diff --git a/pkg/apiserver/config/config.go b/pkg/apiserver/config/config.go index fbc878fd9..aee9996e8 100644 --- a/pkg/apiserver/config/config.go +++ b/pkg/apiserver/config/config.go @@ -160,7 +160,7 @@ type Config struct { ServiceMeshOptions *servicemesh.Options `json:"servicemesh,omitempty" yaml:"servicemesh,omitempty" mapstructure:"servicemesh"` NetworkOptions *network.Options `json:"network,omitempty" yaml:"network,omitempty" mapstructure:"network"` LdapOptions *ldap.Options `json:"-,omitempty" yaml:"ldap,omitempty" mapstructure:"ldap"` - RedisOptions *cache.Options `json:"redis,omitempty" yaml:"redis,omitempty" mapstructure:"redis"` + CacheOptions *cache.Options `json:"cache,omitempty" yaml:"cache,omitempty" mapstructure:"cache"` S3Options *s3.Options `json:"s3,omitempty" yaml:"s3,omitempty" mapstructure:"s3"` OpenPitrixOptions *openpitrix.Options `json:"openpitrix,omitempty" yaml:"openpitrix,omitempty" mapstructure:"openpitrix"` MonitoringOptions *prometheus.Options `json:"monitoring,omitempty" yaml:"monitoring,omitempty" mapstructure:"monitoring"` @@ -189,7 +189,7 @@ func New() *Config { ServiceMeshOptions: servicemesh.NewServiceMeshOptions(), NetworkOptions: network.NewNetworkOptions(), LdapOptions: ldap.NewOptions(), - RedisOptions: cache.NewRedisOptions(), + CacheOptions: cache.NewCacheOptions(), S3Options: s3.NewS3Options(), OpenPitrixOptions: openpitrix.NewOptions(), MonitoringOptions: prometheus.NewPrometheusOptions(), @@ -292,8 +292,8 @@ func (conf *Config) ToMap() map[string]bool { // Remove invalid options before serializing to json or yaml func (conf *Config) stripEmptyOptions() { - if conf.RedisOptions != nil && conf.RedisOptions.Host == "" { - conf.RedisOptions = nil + if conf.CacheOptions != nil && conf.CacheOptions.Type == "" { + conf.CacheOptions = nil } if conf.DevopsOptions != nil && conf.DevopsOptions.Host == "" { diff --git a/pkg/apiserver/config/config_test.go b/pkg/apiserver/config/config_test.go index 55de258e6..6bb5e380c 100644 --- a/pkg/apiserver/config/config_test.go +++ b/pkg/apiserver/config/config_test.go @@ -88,11 +88,9 @@ func newTestConfig() (*Config, error) { MaxCap: 100, PoolName: "ldap", }, - RedisOptions: &cache.Options{ - Host: "localhost", - Port: 6379, - Password: "KUBESPHERE_REDIS_PASSWORD", - DB: 0, + CacheOptions: &cache.Options{ + Type: "redis", + Options: map[string]interface{}{}, }, S3Options: &s3.Options{ Endpoint: "http://minio.openpitrix-system.svc", @@ -236,9 +234,6 @@ func TestGet(t *testing.T) { saveTestConfig(t, conf) defer cleanTestConfig(t) - conf.RedisOptions.Password = "P@88w0rd" - os.Setenv("KUBESPHERE_REDIS_PASSWORD", "P@88w0rd") - conf2, err := TryLoadFromDisk() if err != nil { t.Fatal(err) @@ -251,7 +246,7 @@ func TestGet(t *testing.T) { func TestStripEmptyOptions(t *testing.T) { var config Config - config.RedisOptions = &cache.Options{Host: ""} + config.CacheOptions = &cache.Options{Type: ""} config.DevopsOptions = &jenkins.Options{Host: ""} config.MonitoringOptions = &prometheus.Options{Endpoint: ""} config.SonarQubeOptions = &sonarqube.Options{Host: ""} @@ -284,7 +279,7 @@ func TestStripEmptyOptions(t *testing.T) { config.stripEmptyOptions() - if config.RedisOptions != nil || + if config.CacheOptions != nil || config.DevopsOptions != nil || config.MonitoringOptions != nil || config.SonarQubeOptions != nil || diff --git a/pkg/simple/client/cache/cache.go b/pkg/simple/client/cache/cache.go index c48232ec1..932c01c2f 100644 --- a/pkg/simple/client/cache/cache.go +++ b/pkg/simple/client/cache/cache.go @@ -16,7 +16,17 @@ limitations under the License. package cache -import "time" +import ( + "encoding/json" + "fmt" + "time" + + "k8s.io/klog" +) + +var ( + cacheFactories = make(map[string]CacheFactory) +) var NeverExpire = time.Duration(0) @@ -39,3 +49,32 @@ type Interface interface { // Expires updates object's expiration time, return err if key doesn't exist Expire(key string, duration time.Duration) error } + +// DynamicOptions the options of the cache. For redis, options key can be "host", "port", "db", "password". +// For InMemoryCache, options key can be "cleanupperiod" +type DynamicOptions map[string]interface{} + +func (o DynamicOptions) MarshalJSON() ([]byte, error) { + + data, err := json.Marshal(o) + return data, err +} + +func RegisterCacheFactory(factory CacheFactory) { + cacheFactories[factory.Type()] = factory +} + +func New(option *Options, stopCh <-chan struct{}) (Interface, error) { + if cacheFactories[option.Type] == nil { + err := fmt.Errorf("cache with type %s is not supported", option.Type) + klog.Error(err) + return nil, err + } + + cache, err := cacheFactories[option.Type].Create(option.Options, stopCh) + if err != nil { + klog.Errorf("failed to create cache, error: %v", err) + return nil, err + } + return cache, nil +} diff --git a/pkg/simple/client/cache/factory.go b/pkg/simple/client/cache/factory.go new file mode 100644 index 000000000..45b45c326 --- /dev/null +++ b/pkg/simple/client/cache/factory.go @@ -0,0 +1,8 @@ +package cache + +type CacheFactory interface { + // Type unique type of the cache + Type() string + // Create relevant caches by type + Create(options DynamicOptions, stopCh <-chan struct{}) (Interface, error) +} diff --git a/pkg/simple/client/cache/inmemory_cache.go b/pkg/simple/client/cache/inmemory_cache.go new file mode 100644 index 000000000..95397cb2e --- /dev/null +++ b/pkg/simple/client/cache/inmemory_cache.go @@ -0,0 +1,200 @@ +/* +Copyright 2019 The KubeSphere Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "regexp" + "strings" + "time" + + "github.com/mitchellh/mapstructure" + "k8s.io/apimachinery/pkg/util/wait" + + "kubesphere.io/kubesphere/pkg/server/errors" +) + +var ErrNoSuchKey = errors.New("no such key") + +const ( + typeInMemoryCache = "InMemoryCache" + DefaultCacheType = typeInMemoryCache + + defaultCleanupPeriod = 2 * time.Hour +) + +type simpleObject struct { + value string + neverExpire bool + expiredAt time.Time +} + +func (so *simpleObject) IsExpired() bool { + if so.neverExpire { + return false + } + if time.Now().After(so.expiredAt) { + return true + } + return false +} + +// InMemoryCacheOptions used to create inMemoryCache in memory. +// CleanupPeriod specifies cleans up expired token every period. +// Note the SimpleCache cannot be used in multi-replicas apiserver, +// which will lead to data inconsistency. +type InMemoryCacheOptions struct { + CleanupPeriod time.Duration `json:"cleanupPeriod" yaml:"cleanupPeriod" mapstructure:"cleanupperiod"` +} + +// imMemoryCache implements cache.Interface use memory objects, it should be used only for testing +type inMemoryCache struct { + store map[string]simpleObject +} + +func NewInMemoryCache(options *InMemoryCacheOptions, stopCh <-chan struct{}) (Interface, error) { + var cleanupPeriod time.Duration + cache := &inMemoryCache{ + store: make(map[string]simpleObject), + } + + if options == nil || options.CleanupPeriod == 0 { + cleanupPeriod = defaultCleanupPeriod + } else { + cleanupPeriod = options.CleanupPeriod + } + go wait.Until(cache.cleanInvalidToken, cleanupPeriod, stopCh) + + return cache, nil +} + +func (s *inMemoryCache) cleanInvalidToken() { + for k, v := range s.store { + if v.IsExpired() { + delete(s.store, k) + } + } +} + +func (s *inMemoryCache) Keys(pattern string) ([]string, error) { + // There is a little difference between go regexp and redis key pattern + // In redis, * means any character, while in go . means match everything. + pattern = strings.Replace(pattern, "*", ".", -1) + + re, err := regexp.Compile(pattern) + if err != nil { + return nil, err + } + var keys []string + for k := range s.store { + if re.MatchString(k) { + keys = append(keys, k) + } + } + + return keys, nil +} + +func (s *inMemoryCache) Set(key string, value string, duration time.Duration) error { + sobject := simpleObject{ + value: value, + neverExpire: false, + expiredAt: time.Now().Add(duration), + } + + if duration == NeverExpire { + sobject.neverExpire = true + } + + s.store[key] = sobject + return nil +} + +func (s *inMemoryCache) Del(keys ...string) error { + for _, key := range keys { + delete(s.store, key) + } + return nil +} + +func (s *inMemoryCache) Get(key string) (string, error) { + if sobject, ok := s.store[key]; ok { + if sobject.neverExpire || time.Now().Before(sobject.expiredAt) { + return sobject.value, nil + } + } + + return "", ErrNoSuchKey +} + +func (s *inMemoryCache) Exists(keys ...string) (bool, error) { + for _, key := range keys { + if _, ok := s.store[key]; !ok { + return false, nil + } + } + + return true, nil +} + +func (s *inMemoryCache) Expire(key string, duration time.Duration) error { + value, err := s.Get(key) + if err != nil { + return err + } + + sobject := simpleObject{ + value: value, + neverExpire: false, + expiredAt: time.Now().Add(duration), + } + + if duration == NeverExpire { + sobject.neverExpire = true + } + + s.store[key] = sobject + return nil +} + +type inMemoryCacheFactory struct { +} + +func (sf *inMemoryCacheFactory) Type() string { + return typeInMemoryCache +} + +func (sf *inMemoryCacheFactory) Create(options DynamicOptions, stopCh <-chan struct{}) (Interface, error) { + var sOptions InMemoryCacheOptions + + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &sOptions, + }) + if err != nil { + return nil, err + } + if err := decoder.Decode(options); err != nil { + return nil, err + } + + return NewInMemoryCache(&sOptions, stopCh) +} + +func init() { + RegisterCacheFactory(&inMemoryCacheFactory{}) +} diff --git a/pkg/simple/client/cache/simple_cache_test.go b/pkg/simple/client/cache/inmemory_cache_test.go similarity index 98% rename from pkg/simple/client/cache/simple_cache_test.go rename to pkg/simple/client/cache/inmemory_cache_test.go index c444364e6..0a268a674 100644 --- a/pkg/simple/client/cache/simple_cache_test.go +++ b/pkg/simple/client/cache/inmemory_cache_test.go @@ -102,7 +102,7 @@ func TestDeleteAndExpireCache(t *testing.T) { } for _, testCase := range testCases { - cacheClient := NewSimpleCache() + cacheClient, _ := NewInMemoryCache(nil, nil) t.Run(testCase.description, func(t *testing.T) { err := load(cacheClient, dataSet) diff --git a/pkg/simple/client/cache/options.go b/pkg/simple/client/cache/options.go index eb6c6f942..08fd64021 100644 --- a/pkg/simple/client/cache/options.go +++ b/pkg/simple/client/cache/options.go @@ -18,25 +18,19 @@ package cache import ( "fmt" - - "github.com/spf13/pflag" ) type Options struct { - Host string `json:"host" yaml:"host"` - Port int `json:"port" yaml:"port"` - Password string `json:"password" yaml:"password"` - DB int `json:"db" yaml:"db"` + Type string `json:"type"` + Options DynamicOptions `json:"options"` } -// NewRedisOptions returns options points to nowhere, +// NewCacheOptions returns options points to nowhere, // because redis is not required for some components -func NewRedisOptions() *Options { +func NewCacheOptions() *Options { return &Options{ - Host: "", - Port: 0, - Password: "", - DB: 0, + Type: "", + Options: map[string]interface{}{}, } } @@ -44,20 +38,9 @@ func NewRedisOptions() *Options { func (r *Options) Validate() []error { errors := make([]error, 0) - if r.Port == 0 { - errors = append(errors, fmt.Errorf("invalid service port number")) + if r.Type == "" { + errors = append(errors, fmt.Errorf("invalid cache type")) } return errors } - -// AddFlags add option flags to command line flags, -// if redis-host left empty, the following options will be ignored. -func (r *Options) AddFlags(fs *pflag.FlagSet, s *Options) { - fs.StringVar(&r.Host, "redis-host", s.Host, "Redis connection URL. If left blank, means redis is unnecessary, "+ - "redis will be disabled.") - - fs.IntVar(&r.Port, "redis-port", s.Port, "") - fs.StringVar(&r.Password, "redis-password", s.Password, "") - fs.IntVar(&r.DB, "redis-db", s.DB, "") -} diff --git a/pkg/simple/client/cache/redis.go b/pkg/simple/client/cache/redis.go index 77877afd5..aa247d0b8 100644 --- a/pkg/simple/client/cache/redis.go +++ b/pkg/simple/client/cache/redis.go @@ -17,19 +17,31 @@ limitations under the License. package cache import ( + "errors" "fmt" "time" "github.com/go-redis/redis" + "github.com/mitchellh/mapstructure" "k8s.io/klog" ) -type Client struct { +const typeRedis = "redis" + +type redisClient struct { client *redis.Client } -func NewRedisClient(option *Options, stopCh <-chan struct{}) (Interface, error) { - var r Client +// redisOptions used to create a redis client. +type redisOptions struct { + Host string `json:"host" yaml:"host" mapstructure:"host"` + Port int `json:"port" yaml:"port" mapstructure:"port"` + Password string `json:"password" yaml:"password" mapstructure:"password"` + DB int `json:"db" yaml:"db" mapstructure:"db"` +} + +func NewRedisClient(option *redisOptions, stopCh <-chan struct{}) (Interface, error) { + var r redisClient redisOptions := &redis.Options{ Addr: fmt.Sprintf("%s:%d", option.Host, option.Port), @@ -61,23 +73,23 @@ func NewRedisClient(option *Options, stopCh <-chan struct{}) (Interface, error) return &r, nil } -func (r *Client) Get(key string) (string, error) { +func (r *redisClient) Get(key string) (string, error) { return r.client.Get(key).Result() } -func (r *Client) Keys(pattern string) ([]string, error) { +func (r *redisClient) Keys(pattern string) ([]string, error) { return r.client.Keys(pattern).Result() } -func (r *Client) Set(key string, value string, duration time.Duration) error { +func (r *redisClient) Set(key string, value string, duration time.Duration) error { return r.client.Set(key, value, duration).Err() } -func (r *Client) Del(keys ...string) error { +func (r *redisClient) Del(keys ...string) error { return r.client.Del(keys...).Err() } -func (r *Client) Exists(keys ...string) (bool, error) { +func (r *redisClient) Exists(keys ...string) (bool, error) { existedKeys, err := r.client.Exists(keys...).Result() if err != nil { return false, err @@ -86,6 +98,34 @@ func (r *Client) Exists(keys ...string) (bool, error) { return len(keys) == int(existedKeys), nil } -func (r *Client) Expire(key string, duration time.Duration) error { +func (r *redisClient) Expire(key string, duration time.Duration) error { return r.client.Expire(key, duration).Err() } + +type redisFactory struct{} + +func (rf *redisFactory) Type() string { + return typeRedis +} + +func (rf *redisFactory) Create(options DynamicOptions, stopCh <-chan struct{}) (Interface, error) { + var rOptions redisOptions + if err := mapstructure.Decode(options, &rOptions); err != nil { + return nil, err + } + if rOptions.Port == 0 { + return nil, errors.New("invalid service port number") + } + if len(rOptions.Host) == 0 { + return nil, errors.New("invalid service host") + } + client, err := NewRedisClient(&rOptions, stopCh) + if err != nil { + return nil, err + } + return client, nil +} + +func init() { + RegisterCacheFactory(&redisFactory{}) +} diff --git a/pkg/simple/client/cache/simple_cache.go b/pkg/simple/client/cache/simple_cache.go deleted file mode 100644 index 8b9922fb3..000000000 --- a/pkg/simple/client/cache/simple_cache.go +++ /dev/null @@ -1,123 +0,0 @@ -/* -Copyright 2019 The KubeSphere Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "regexp" - "strings" - "time" - - "kubesphere.io/kubesphere/pkg/server/errors" -) - -var ErrNoSuchKey = errors.New("no such key") - -type simpleObject struct { - value string - neverExpire bool - expiredAt time.Time -} - -// SimpleCache implements cache.Interface use memory objects, it should be used only for testing -type simpleCache struct { - store map[string]simpleObject -} - -func NewSimpleCache() Interface { - return &simpleCache{store: make(map[string]simpleObject)} -} - -func (s *simpleCache) Keys(pattern string) ([]string, error) { - // There is a little difference between go regexp and redis key pattern - // In redis, * means any character, while in go . means match everything. - pattern = strings.Replace(pattern, "*", ".", -1) - - re, err := regexp.Compile(pattern) - if err != nil { - return nil, err - } - var keys []string - for k := range s.store { - if re.MatchString(k) { - keys = append(keys, k) - } - } - - return keys, nil -} - -func (s *simpleCache) Set(key string, value string, duration time.Duration) error { - sobject := simpleObject{ - value: value, - neverExpire: false, - expiredAt: time.Now().Add(duration), - } - - if duration == NeverExpire { - sobject.neverExpire = true - } - - s.store[key] = sobject - return nil -} - -func (s *simpleCache) Del(keys ...string) error { - for _, key := range keys { - delete(s.store, key) - } - return nil -} - -func (s *simpleCache) Get(key string) (string, error) { - if sobject, ok := s.store[key]; ok { - if sobject.neverExpire || time.Now().Before(sobject.expiredAt) { - return sobject.value, nil - } - } - - return "", ErrNoSuchKey -} - -func (s *simpleCache) Exists(keys ...string) (bool, error) { - for _, key := range keys { - if _, ok := s.store[key]; !ok { - return false, nil - } - } - - return true, nil -} - -func (s *simpleCache) Expire(key string, duration time.Duration) error { - value, err := s.Get(key) - if err != nil { - return err - } - - sobject := simpleObject{ - value: value, - neverExpire: false, - expiredAt: time.Now().Add(duration), - } - - if duration == NeverExpire { - sobject.neverExpire = true - } - - s.store[key] = sobject - return nil -} diff --git a/pkg/simple/client/kiali/client_test.go b/pkg/simple/client/kiali/client_test.go index 2130b5132..75e9e065a 100644 --- a/pkg/simple/client/kiali/client_test.go +++ b/pkg/simple/client/kiali/client_test.go @@ -38,6 +38,11 @@ func TestClient_Get(t *testing.T) { type args struct { url string } + + inMemoryCache, err := cache.NewInMemoryCache(nil, nil) + if err != nil { + t.Fatal(err) + } token, _ := json.Marshal( &TokenResponse{ Username: "test", @@ -92,7 +97,7 @@ func TestClient_Get(t *testing.T) { name: "Token", fields: fields{ Strategy: AuthStrategyToken, - cache: cache.NewSimpleCache(), + cache: inMemoryCache, client: &MockClient{ TokenResult: token, RequestResult: "fake",