[release-3.3] add dynamic options for cache (#5325)

* add dynamic options for cache

* fixed bugs based on unit-test

* add doc for cache

* make cache implements be private

* Change simpleCache name to InMemoryCache

Signed-off-by: Wenhao Zhou <wenhaozhou@yunify.com>

* Remove fake cache and replacing to in memory cache with default parameter

Signed-off-by: Wenhao Zhou <wenhaozhou@yunify.com>

Signed-off-by: Wenhao Zhou <wenhaozhou@yunify.com>
Co-authored-by: Wenhao Zhou <wenhaozhou@yunify.com>
This commit is contained in:
KubeSphere CI Bot
2022-11-03 15:55:00 +08:00
committed by GitHub
parent 80b0301f79
commit c0419ddab5
11 changed files with 336 additions and 188 deletions

View File

@@ -87,7 +87,6 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
s.AuthorizationOptions.AddFlags(fss.FlagSet("authorization"), s.AuthorizationOptions)
s.DevopsOptions.AddFlags(fss.FlagSet("devops"), s.DevopsOptions)
s.SonarQubeOptions.AddFlags(fss.FlagSet("sonarqube"), s.SonarQubeOptions)
s.RedisOptions.AddFlags(fss.FlagSet("redis"), s.RedisOptions)
s.S3Options.AddFlags(fss.FlagSet("s3"), s.S3Options)
s.OpenPitrixOptions.AddFlags(fss.FlagSet("openpitrix"), s.OpenPitrixOptions)
s.NetworkOptions.AddFlags(fss.FlagSet("network"), s.NetworkOptions)
@@ -176,21 +175,23 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS
apiServer.SonarClient = sonarqube.NewSonar(sonarClient.SonarQube())
}
var cacheClient cache.Interface
if s.RedisOptions != nil && len(s.RedisOptions.Host) != 0 {
if s.RedisOptions.Host == fakeInterface && s.DebugMode {
apiServer.CacheClient = cache.NewSimpleCache()
} else {
cacheClient, err = cache.NewRedisClient(s.RedisOptions, stopCh)
if err != nil {
return nil, fmt.Errorf("failed to connect to redis service, please check redis status, error: %v", err)
}
apiServer.CacheClient = cacheClient
// If debug mode is on or CacheOptions is nil, will create a fake cache.
if s.CacheOptions.Type != "" {
if s.DebugMode {
s.CacheOptions.Type = cache.DefaultCacheType
}
cacheClient, err := cache.New(s.CacheOptions, stopCh)
if err != nil {
return nil, fmt.Errorf("failed to create cache, error: %v", err)
}
apiServer.CacheClient = cacheClient
} else {
klog.Warning("ks-apiserver starts without redis provided, it will use in memory cache. " +
"This may cause inconsistencies when running ks-apiserver with multiple replicas.")
apiServer.CacheClient = cache.NewSimpleCache()
s.CacheOptions = &cache.Options{Type: cache.DefaultCacheType}
// fake cache has no error to return
cacheClient, _ := cache.New(s.CacheOptions, stopCh)
apiServer.CacheClient = cacheClient
klog.Warning("ks-apiserver starts without cache provided, it will use in memory cache. " +
"This may cause inconsistencies when running ks-apiserver with multiple replicas, and memory leak risk")
}
if s.EventsOptions.Host != "" {

View File

@@ -160,7 +160,7 @@ type Config struct {
ServiceMeshOptions *servicemesh.Options `json:"servicemesh,omitempty" yaml:"servicemesh,omitempty" mapstructure:"servicemesh"`
NetworkOptions *network.Options `json:"network,omitempty" yaml:"network,omitempty" mapstructure:"network"`
LdapOptions *ldap.Options `json:"-,omitempty" yaml:"ldap,omitempty" mapstructure:"ldap"`
RedisOptions *cache.Options `json:"redis,omitempty" yaml:"redis,omitempty" mapstructure:"redis"`
CacheOptions *cache.Options `json:"cache,omitempty" yaml:"cache,omitempty" mapstructure:"cache"`
S3Options *s3.Options `json:"s3,omitempty" yaml:"s3,omitempty" mapstructure:"s3"`
OpenPitrixOptions *openpitrix.Options `json:"openpitrix,omitempty" yaml:"openpitrix,omitempty" mapstructure:"openpitrix"`
MonitoringOptions *prometheus.Options `json:"monitoring,omitempty" yaml:"monitoring,omitempty" mapstructure:"monitoring"`
@@ -189,7 +189,7 @@ func New() *Config {
ServiceMeshOptions: servicemesh.NewServiceMeshOptions(),
NetworkOptions: network.NewNetworkOptions(),
LdapOptions: ldap.NewOptions(),
RedisOptions: cache.NewRedisOptions(),
CacheOptions: cache.NewCacheOptions(),
S3Options: s3.NewS3Options(),
OpenPitrixOptions: openpitrix.NewOptions(),
MonitoringOptions: prometheus.NewPrometheusOptions(),
@@ -292,8 +292,8 @@ func (conf *Config) ToMap() map[string]bool {
// Remove invalid options before serializing to json or yaml
func (conf *Config) stripEmptyOptions() {
if conf.RedisOptions != nil && conf.RedisOptions.Host == "" {
conf.RedisOptions = nil
if conf.CacheOptions != nil && conf.CacheOptions.Type == "" {
conf.CacheOptions = nil
}
if conf.DevopsOptions != nil && conf.DevopsOptions.Host == "" {

View File

@@ -88,11 +88,9 @@ func newTestConfig() (*Config, error) {
MaxCap: 100,
PoolName: "ldap",
},
RedisOptions: &cache.Options{
Host: "localhost",
Port: 6379,
Password: "KUBESPHERE_REDIS_PASSWORD",
DB: 0,
CacheOptions: &cache.Options{
Type: "redis",
Options: map[string]interface{}{},
},
S3Options: &s3.Options{
Endpoint: "http://minio.openpitrix-system.svc",
@@ -236,9 +234,6 @@ func TestGet(t *testing.T) {
saveTestConfig(t, conf)
defer cleanTestConfig(t)
conf.RedisOptions.Password = "P@88w0rd"
os.Setenv("KUBESPHERE_REDIS_PASSWORD", "P@88w0rd")
conf2, err := TryLoadFromDisk()
if err != nil {
t.Fatal(err)
@@ -251,7 +246,7 @@ func TestGet(t *testing.T) {
func TestStripEmptyOptions(t *testing.T) {
var config Config
config.RedisOptions = &cache.Options{Host: ""}
config.CacheOptions = &cache.Options{Type: ""}
config.DevopsOptions = &jenkins.Options{Host: ""}
config.MonitoringOptions = &prometheus.Options{Endpoint: ""}
config.SonarQubeOptions = &sonarqube.Options{Host: ""}
@@ -284,7 +279,7 @@ func TestStripEmptyOptions(t *testing.T) {
config.stripEmptyOptions()
if config.RedisOptions != nil ||
if config.CacheOptions != nil ||
config.DevopsOptions != nil ||
config.MonitoringOptions != nil ||
config.SonarQubeOptions != nil ||

View File

@@ -16,7 +16,17 @@ limitations under the License.
package cache
import "time"
import (
"encoding/json"
"fmt"
"time"
"k8s.io/klog"
)
var (
cacheFactories = make(map[string]CacheFactory)
)
var NeverExpire = time.Duration(0)
@@ -39,3 +49,32 @@ type Interface interface {
// Expires updates object's expiration time, return err if key doesn't exist
Expire(key string, duration time.Duration) error
}
// DynamicOptions the options of the cache. For redis, options key can be "host", "port", "db", "password".
// For InMemoryCache, options key can be "cleanupperiod"
type DynamicOptions map[string]interface{}
func (o DynamicOptions) MarshalJSON() ([]byte, error) {
data, err := json.Marshal(o)
return data, err
}
func RegisterCacheFactory(factory CacheFactory) {
cacheFactories[factory.Type()] = factory
}
func New(option *Options, stopCh <-chan struct{}) (Interface, error) {
if cacheFactories[option.Type] == nil {
err := fmt.Errorf("cache with type %s is not supported", option.Type)
klog.Error(err)
return nil, err
}
cache, err := cacheFactories[option.Type].Create(option.Options, stopCh)
if err != nil {
klog.Errorf("failed to create cache, error: %v", err)
return nil, err
}
return cache, nil
}

8
pkg/simple/client/cache/factory.go vendored Normal file
View File

@@ -0,0 +1,8 @@
package cache
type CacheFactory interface {
// Type unique type of the cache
Type() string
// Create relevant caches by type
Create(options DynamicOptions, stopCh <-chan struct{}) (Interface, error)
}

View File

@@ -0,0 +1,200 @@
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"regexp"
"strings"
"time"
"github.com/mitchellh/mapstructure"
"k8s.io/apimachinery/pkg/util/wait"
"kubesphere.io/kubesphere/pkg/server/errors"
)
var ErrNoSuchKey = errors.New("no such key")
const (
typeInMemoryCache = "InMemoryCache"
DefaultCacheType = typeInMemoryCache
defaultCleanupPeriod = 2 * time.Hour
)
type simpleObject struct {
value string
neverExpire bool
expiredAt time.Time
}
func (so *simpleObject) IsExpired() bool {
if so.neverExpire {
return false
}
if time.Now().After(so.expiredAt) {
return true
}
return false
}
// InMemoryCacheOptions used to create inMemoryCache in memory.
// CleanupPeriod specifies cleans up expired token every period.
// Note the SimpleCache cannot be used in multi-replicas apiserver,
// which will lead to data inconsistency.
type InMemoryCacheOptions struct {
CleanupPeriod time.Duration `json:"cleanupPeriod" yaml:"cleanupPeriod" mapstructure:"cleanupperiod"`
}
// imMemoryCache implements cache.Interface use memory objects, it should be used only for testing
type inMemoryCache struct {
store map[string]simpleObject
}
func NewInMemoryCache(options *InMemoryCacheOptions, stopCh <-chan struct{}) (Interface, error) {
var cleanupPeriod time.Duration
cache := &inMemoryCache{
store: make(map[string]simpleObject),
}
if options == nil || options.CleanupPeriod == 0 {
cleanupPeriod = defaultCleanupPeriod
} else {
cleanupPeriod = options.CleanupPeriod
}
go wait.Until(cache.cleanInvalidToken, cleanupPeriod, stopCh)
return cache, nil
}
func (s *inMemoryCache) cleanInvalidToken() {
for k, v := range s.store {
if v.IsExpired() {
delete(s.store, k)
}
}
}
func (s *inMemoryCache) Keys(pattern string) ([]string, error) {
// There is a little difference between go regexp and redis key pattern
// In redis, * means any character, while in go . means match everything.
pattern = strings.Replace(pattern, "*", ".", -1)
re, err := regexp.Compile(pattern)
if err != nil {
return nil, err
}
var keys []string
for k := range s.store {
if re.MatchString(k) {
keys = append(keys, k)
}
}
return keys, nil
}
func (s *inMemoryCache) Set(key string, value string, duration time.Duration) error {
sobject := simpleObject{
value: value,
neverExpire: false,
expiredAt: time.Now().Add(duration),
}
if duration == NeverExpire {
sobject.neverExpire = true
}
s.store[key] = sobject
return nil
}
func (s *inMemoryCache) Del(keys ...string) error {
for _, key := range keys {
delete(s.store, key)
}
return nil
}
func (s *inMemoryCache) Get(key string) (string, error) {
if sobject, ok := s.store[key]; ok {
if sobject.neverExpire || time.Now().Before(sobject.expiredAt) {
return sobject.value, nil
}
}
return "", ErrNoSuchKey
}
func (s *inMemoryCache) Exists(keys ...string) (bool, error) {
for _, key := range keys {
if _, ok := s.store[key]; !ok {
return false, nil
}
}
return true, nil
}
func (s *inMemoryCache) Expire(key string, duration time.Duration) error {
value, err := s.Get(key)
if err != nil {
return err
}
sobject := simpleObject{
value: value,
neverExpire: false,
expiredAt: time.Now().Add(duration),
}
if duration == NeverExpire {
sobject.neverExpire = true
}
s.store[key] = sobject
return nil
}
type inMemoryCacheFactory struct {
}
func (sf *inMemoryCacheFactory) Type() string {
return typeInMemoryCache
}
func (sf *inMemoryCacheFactory) Create(options DynamicOptions, stopCh <-chan struct{}) (Interface, error) {
var sOptions InMemoryCacheOptions
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &sOptions,
})
if err != nil {
return nil, err
}
if err := decoder.Decode(options); err != nil {
return nil, err
}
return NewInMemoryCache(&sOptions, stopCh)
}
func init() {
RegisterCacheFactory(&inMemoryCacheFactory{})
}

View File

@@ -102,7 +102,7 @@ func TestDeleteAndExpireCache(t *testing.T) {
}
for _, testCase := range testCases {
cacheClient := NewSimpleCache()
cacheClient, _ := NewInMemoryCache(nil, nil)
t.Run(testCase.description, func(t *testing.T) {
err := load(cacheClient, dataSet)

View File

@@ -18,25 +18,19 @@ package cache
import (
"fmt"
"github.com/spf13/pflag"
)
type Options struct {
Host string `json:"host" yaml:"host"`
Port int `json:"port" yaml:"port"`
Password string `json:"password" yaml:"password"`
DB int `json:"db" yaml:"db"`
Type string `json:"type"`
Options DynamicOptions `json:"options"`
}
// NewRedisOptions returns options points to nowhere,
// NewCacheOptions returns options points to nowhere,
// because redis is not required for some components
func NewRedisOptions() *Options {
func NewCacheOptions() *Options {
return &Options{
Host: "",
Port: 0,
Password: "",
DB: 0,
Type: "",
Options: map[string]interface{}{},
}
}
@@ -44,20 +38,9 @@ func NewRedisOptions() *Options {
func (r *Options) Validate() []error {
errors := make([]error, 0)
if r.Port == 0 {
errors = append(errors, fmt.Errorf("invalid service port number"))
if r.Type == "" {
errors = append(errors, fmt.Errorf("invalid cache type"))
}
return errors
}
// AddFlags add option flags to command line flags,
// if redis-host left empty, the following options will be ignored.
func (r *Options) AddFlags(fs *pflag.FlagSet, s *Options) {
fs.StringVar(&r.Host, "redis-host", s.Host, "Redis connection URL. If left blank, means redis is unnecessary, "+
"redis will be disabled.")
fs.IntVar(&r.Port, "redis-port", s.Port, "")
fs.StringVar(&r.Password, "redis-password", s.Password, "")
fs.IntVar(&r.DB, "redis-db", s.DB, "")
}

View File

@@ -17,19 +17,31 @@ limitations under the License.
package cache
import (
"errors"
"fmt"
"time"
"github.com/go-redis/redis"
"github.com/mitchellh/mapstructure"
"k8s.io/klog"
)
type Client struct {
const typeRedis = "redis"
type redisClient struct {
client *redis.Client
}
func NewRedisClient(option *Options, stopCh <-chan struct{}) (Interface, error) {
var r Client
// redisOptions used to create a redis client.
type redisOptions struct {
Host string `json:"host" yaml:"host" mapstructure:"host"`
Port int `json:"port" yaml:"port" mapstructure:"port"`
Password string `json:"password" yaml:"password" mapstructure:"password"`
DB int `json:"db" yaml:"db" mapstructure:"db"`
}
func NewRedisClient(option *redisOptions, stopCh <-chan struct{}) (Interface, error) {
var r redisClient
redisOptions := &redis.Options{
Addr: fmt.Sprintf("%s:%d", option.Host, option.Port),
@@ -61,23 +73,23 @@ func NewRedisClient(option *Options, stopCh <-chan struct{}) (Interface, error)
return &r, nil
}
func (r *Client) Get(key string) (string, error) {
func (r *redisClient) Get(key string) (string, error) {
return r.client.Get(key).Result()
}
func (r *Client) Keys(pattern string) ([]string, error) {
func (r *redisClient) Keys(pattern string) ([]string, error) {
return r.client.Keys(pattern).Result()
}
func (r *Client) Set(key string, value string, duration time.Duration) error {
func (r *redisClient) Set(key string, value string, duration time.Duration) error {
return r.client.Set(key, value, duration).Err()
}
func (r *Client) Del(keys ...string) error {
func (r *redisClient) Del(keys ...string) error {
return r.client.Del(keys...).Err()
}
func (r *Client) Exists(keys ...string) (bool, error) {
func (r *redisClient) Exists(keys ...string) (bool, error) {
existedKeys, err := r.client.Exists(keys...).Result()
if err != nil {
return false, err
@@ -86,6 +98,34 @@ func (r *Client) Exists(keys ...string) (bool, error) {
return len(keys) == int(existedKeys), nil
}
func (r *Client) Expire(key string, duration time.Duration) error {
func (r *redisClient) Expire(key string, duration time.Duration) error {
return r.client.Expire(key, duration).Err()
}
type redisFactory struct{}
func (rf *redisFactory) Type() string {
return typeRedis
}
func (rf *redisFactory) Create(options DynamicOptions, stopCh <-chan struct{}) (Interface, error) {
var rOptions redisOptions
if err := mapstructure.Decode(options, &rOptions); err != nil {
return nil, err
}
if rOptions.Port == 0 {
return nil, errors.New("invalid service port number")
}
if len(rOptions.Host) == 0 {
return nil, errors.New("invalid service host")
}
client, err := NewRedisClient(&rOptions, stopCh)
if err != nil {
return nil, err
}
return client, nil
}
func init() {
RegisterCacheFactory(&redisFactory{})
}

View File

@@ -1,123 +0,0 @@
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"regexp"
"strings"
"time"
"kubesphere.io/kubesphere/pkg/server/errors"
)
var ErrNoSuchKey = errors.New("no such key")
type simpleObject struct {
value string
neverExpire bool
expiredAt time.Time
}
// SimpleCache implements cache.Interface use memory objects, it should be used only for testing
type simpleCache struct {
store map[string]simpleObject
}
func NewSimpleCache() Interface {
return &simpleCache{store: make(map[string]simpleObject)}
}
func (s *simpleCache) Keys(pattern string) ([]string, error) {
// There is a little difference between go regexp and redis key pattern
// In redis, * means any character, while in go . means match everything.
pattern = strings.Replace(pattern, "*", ".", -1)
re, err := regexp.Compile(pattern)
if err != nil {
return nil, err
}
var keys []string
for k := range s.store {
if re.MatchString(k) {
keys = append(keys, k)
}
}
return keys, nil
}
func (s *simpleCache) Set(key string, value string, duration time.Duration) error {
sobject := simpleObject{
value: value,
neverExpire: false,
expiredAt: time.Now().Add(duration),
}
if duration == NeverExpire {
sobject.neverExpire = true
}
s.store[key] = sobject
return nil
}
func (s *simpleCache) Del(keys ...string) error {
for _, key := range keys {
delete(s.store, key)
}
return nil
}
func (s *simpleCache) Get(key string) (string, error) {
if sobject, ok := s.store[key]; ok {
if sobject.neverExpire || time.Now().Before(sobject.expiredAt) {
return sobject.value, nil
}
}
return "", ErrNoSuchKey
}
func (s *simpleCache) Exists(keys ...string) (bool, error) {
for _, key := range keys {
if _, ok := s.store[key]; !ok {
return false, nil
}
}
return true, nil
}
func (s *simpleCache) Expire(key string, duration time.Duration) error {
value, err := s.Get(key)
if err != nil {
return err
}
sobject := simpleObject{
value: value,
neverExpire: false,
expiredAt: time.Now().Add(duration),
}
if duration == NeverExpire {
sobject.neverExpire = true
}
s.store[key] = sobject
return nil
}

View File

@@ -38,6 +38,11 @@ func TestClient_Get(t *testing.T) {
type args struct {
url string
}
inMemoryCache, err := cache.NewInMemoryCache(nil, nil)
if err != nil {
t.Fatal(err)
}
token, _ := json.Marshal(
&TokenResponse{
Username: "test",
@@ -92,7 +97,7 @@ func TestClient_Get(t *testing.T) {
name: "Token",
fields: fields{
Strategy: AuthStrategyToken,
cache: cache.NewSimpleCache(),
cache: inMemoryCache,
client: &MockClient{
TokenResult: token,
RequestResult: "fake",