From 4eb5401f766032093fabf746d061ab5054f8faad Mon Sep 17 00:00:00 2001 From: LiHui Date: Fri, 17 Sep 2021 16:36:25 +0800 Subject: [PATCH] calculate the category for the app Signed-off-by: LiHui --- pkg/models/openpitrix/categories.go | 15 +- pkg/models/openpitrix/category_test.go | 4 +- pkg/models/openpitrix/interface.go | 16 ++- .../openpitrix/helmrepoindex/repo_index.go | 6 +- pkg/utils/reposcache/repo_cahes.go | 133 ++++++++++++------ 5 files changed, 125 insertions(+), 49 deletions(-) diff --git a/pkg/models/openpitrix/categories.go b/pkg/models/openpitrix/categories.go index 9afddb966..0116b656b 100644 --- a/pkg/models/openpitrix/categories.go +++ b/pkg/models/openpitrix/categories.go @@ -17,6 +17,8 @@ import ( "context" "sort" + "kubesphere.io/kubesphere/pkg/utils/reposcache" + "kubesphere.io/kubesphere/pkg/apiserver/query" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -48,12 +50,14 @@ type CategoryInterface interface { type categoryOperator struct { ctgClient typed_v1alpha1.ApplicationV1alpha1Interface ctgLister listers_v1alpha1.HelmCategoryLister + repoCache reposcache.ReposCache } -func newCategoryOperator(ksFactory externalversions.SharedInformerFactory, ksClient versioned.Interface) CategoryInterface { +func newCategoryOperator(repoCache reposcache.ReposCache, ksFactory externalversions.SharedInformerFactory, ksClient versioned.Interface) CategoryInterface { c := &categoryOperator{ ctgClient: ksClient.ApplicationV1alpha1(), ctgLister: ksFactory.Application().V1alpha1().HelmCategories().Lister(), + repoCache: repoCache, } return c @@ -190,8 +194,15 @@ func (c *categoryOperator) ListCategories(conditions *params.Conditions, orderBy start, end := (&query.Pagination{Limit: limit, Offset: offset}).GetValidPagination(totalCount) ctgs = ctgs[start:end] items := make([]interface{}, 0, len(ctgs)) + + ctgCountsOfBuiltinRepo := c.repoCache.CopyCategoryCount() for i := range ctgs { - items = append(items, convertCategory(ctgs[i])) + convertedCtg := convertCategory(ctgs[i]) + // The statistic of category for app in etcd is stored in the crd. + // The statistic of category for the app in the built-in repo is stored in the memory. + // So we should calculate these two value then return. + *convertedCtg.AppTotal += ctgCountsOfBuiltinRepo[convertedCtg.CategoryID] + items = append(items, convertedCtg) } return &models.PageableResponse{Items: items, TotalCount: totalCount}, nil diff --git a/pkg/models/openpitrix/category_test.go b/pkg/models/openpitrix/category_test.go index 5c3c273f5..9169fcc8f 100644 --- a/pkg/models/openpitrix/category_test.go +++ b/pkg/models/openpitrix/category_test.go @@ -20,6 +20,8 @@ import ( "context" "testing" + "kubesphere.io/kubesphere/pkg/utils/reposcache" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fakek8s "k8s.io/client-go/kubernetes/fake" "k8s.io/klog" @@ -82,5 +84,5 @@ func prepareCategoryOperator() CategoryInterface { k8sClient = fakek8s.NewSimpleClientset() fakeInformerFactory = informers.NewInformerFactories(k8sClient, ksClient, nil, nil, nil, nil) - return newCategoryOperator(fakeInformerFactory.KubeSphereSharedInformerFactory(), ksClient) + return newCategoryOperator(reposcache.NewReposCache(), fakeInformerFactory.KubeSphereSharedInformerFactory(), ksClient) } diff --git a/pkg/models/openpitrix/interface.go b/pkg/models/openpitrix/interface.go index b0b4a143d..ba2229d79 100644 --- a/pkg/models/openpitrix/interface.go +++ b/pkg/models/openpitrix/interface.go @@ -56,7 +56,6 @@ func init() { } func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface) Interface { - once.Do(func() { klog.Infof("start helm repo informer") helmReposInformer = ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmRepos().Informer() @@ -75,6 +74,19 @@ func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient ve cachedReposData.DeleteRepo(r) }, }) + + ctgInformer := ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmCategories().Informer() + ctgInformer.AddIndexers(map[string]cache.IndexFunc{ + reposcache.CategoryIndexer: func(obj interface{}) ([]string, error) { + ctg, _ := obj.(*v1alpha1.HelmCategory) + return []string{ctg.Spec.Name}, nil + }, + }) + indexer := ctgInformer.GetIndexer() + + cachedReposData.SetCategoryIndexer(indexer) + + go ctgInformer.Run(wait.NeverStop) go helmReposInformer.Run(wait.NeverStop) }) @@ -83,6 +95,6 @@ func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient ve ApplicationInterface: newApplicationOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient, s3Client), RepoInterface: newRepoOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient), ReleaseInterface: newReleaseOperator(cachedReposData, ksInformers.KubernetesSharedInformerFactory(), ksInformers.KubeSphereSharedInformerFactory(), ksClient), - CategoryInterface: newCategoryOperator(ksInformers.KubeSphereSharedInformerFactory(), ksClient), + CategoryInterface: newCategoryOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient), } } diff --git a/pkg/simple/client/openpitrix/helmrepoindex/repo_index.go b/pkg/simple/client/openpitrix/helmrepoindex/repo_index.go index 0087dd94c..2a2a2c219 100644 --- a/pkg/simple/client/openpitrix/helmrepoindex/repo_index.go +++ b/pkg/simple/client/openpitrix/helmrepoindex/repo_index.go @@ -108,7 +108,7 @@ func MergeRepoIndex(repo *v1alpha1.HelmRepo, index *helmrepo.IndexFile, existsSa Created: time.Now(), } - // The app version will be added to the labels of the helm release. + // The app id will be added to the labels of the helm release. // But the apps in the repos which are created by the user may contain malformed text, so we generate a random name for them. // The apps in the system repo have been audited by the admin, so the name of the charts should not include malformed text. // Then we can add the name string to the labels of the k8s object. @@ -222,7 +222,7 @@ func (i *SavedIndex) GetApplicationVersion(appId, versionId string) *v1alpha1.He return nil } -// The app version name will be added to the labels of the helm release. +// The app version id will be added to the labels of the helm release. // But the apps in the repos which are created by the user may contain malformed text, so we generate a random name for them. // The apps in the system repo have been audited by the admin, so the name of the charts should not include malformed text. // Then we can add the name string to the labels of the k8s object. @@ -237,7 +237,7 @@ func generateAppVersionId(repo *v1alpha1.HelmRepo, chartName, version string) st // IsBuiltInRepo checks whether a repo is a built-in repo. // All the built-in repos are located in the workspace system-workspace and the name starts with 'built-in' -// to differentiate from the repos created by the user +// to differentiate from the repos created by the user. func IsBuiltInRepo(repoName string) bool { return strings.HasPrefix(repoName, v1alpha1.BuiltinRepoPrefix) } diff --git a/pkg/utils/reposcache/repo_cahes.go b/pkg/utils/reposcache/repo_cahes.go index a671bd527..81a9b8d68 100644 --- a/pkg/utils/reposcache/repo_cahes.go +++ b/pkg/utils/reposcache/repo_cahes.go @@ -27,6 +27,8 @@ import ( "strings" "sync" + "k8s.io/client-go/tools/cache" + "github.com/Masterminds/semver/v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -38,15 +40,20 @@ import ( "kubesphere.io/kubesphere/pkg/simple/client/openpitrix/helmrepoindex" ) +const ( + CategoryIndexer = "category_indexer" + CategoryAnnotationKey = "app.kubesphere.io/category" +) + var WorkDir string func NewReposCache() ReposCache { return &cachedRepos{ - chartsInRepo: map[workspace]map[string]int{}, - repos: map[string]*v1alpha1.HelmRepo{}, - apps: map[string]*v1alpha1.HelmApplication{}, - versions: map[string]*v1alpha1.HelmApplicationVersion{}, - repoCtgCounts: map[string]map[string]int{}, + chartsInRepo: map[workspace]map[string]int{}, + repos: map[string]*v1alpha1.HelmRepo{}, + apps: map[string]*v1alpha1.HelmApplication{}, + versions: map[string]*v1alpha1.HelmApplicationVersion{}, + builtinCategoryCounts: map[string]int{}, } } @@ -62,31 +69,40 @@ type ReposCache interface { ListAppVersionsByAppId(appId string) (ret []*v1alpha1.HelmApplicationVersion, exists bool) ListApplicationsInRepo(repoId string) (ret []*v1alpha1.HelmApplication, exists bool) ListApplicationsInBuiltinRepo(selector labels.Selector) (ret []*v1alpha1.HelmApplication, exists bool) + + SetCategoryIndexer(indexer cache.Indexer) + CopyCategoryCount() map[string]int } type workspace string type cachedRepos struct { sync.RWMutex - chartsInRepo map[workspace]map[string]int - repoCtgCounts map[string]map[string]int + chartsInRepo map[workspace]map[string]int + + // builtinCategoryCounts saves the count of every category in the built-in repo. + builtinCategoryCounts map[string]int repos map[string]*v1alpha1.HelmRepo apps map[string]*v1alpha1.HelmApplication versions map[string]*v1alpha1.HelmApplicationVersion + + // indexerOfHelmCtg is the indexer of HelmCategory, used to query the category id from category name. + indexerOfHelmCtg cache.Indexer } func (c *cachedRepos) deleteRepo(repo *v1alpha1.HelmRepo) { if len(repo.Status.Data) == 0 { return } + index, err := helmrepoindex.ByteArrayToSavedIndex([]byte(repo.Status.Data)) if err != nil { klog.Errorf("json unmarshal repo %s failed, error: %s", repo.Name, err) return } - klog.V(4).Infof("delete repo %s from cache", repo.Name) + klog.V(2).Infof("delete repo %s from cache", repo.Name) repoId := repo.GetHelmRepoId() ws := workspace(repo.GetWorkspace()) @@ -94,10 +110,19 @@ func (c *cachedRepos) deleteRepo(repo *v1alpha1.HelmRepo) { delete(c.chartsInRepo[ws], repoId) } - delete(c.repoCtgCounts, repoId) delete(c.repos, repoId) for _, app := range index.Applications { + if _, exists := c.apps[app.ApplicationId]; !exists { + continue + } + if helmrepoindex.IsBuiltInRepo(repo.Name) { + ctgId := c.apps[app.ApplicationId].Labels[constants.CategoryIdLabelKey] + if ctgId != "" { + c.builtinCategoryCounts[ctgId] -= 1 + } + } + delete(c.apps, app.ApplicationId) for _, ver := range app.Charts { delete(c.versions, ver.ApplicationVersionId) @@ -127,6 +152,40 @@ func (c *cachedRepos) DeleteRepo(repo *v1alpha1.HelmRepo) error { return nil } +// CopyCategoryCount copies the internal map to avoid `concurrent map iteration and map write`. +func (c *cachedRepos) CopyCategoryCount() map[string]int { + c.RLock() + defer c.RUnlock() + + ret := make(map[string]int, len(c.builtinCategoryCounts)) + for k, v := range c.builtinCategoryCounts { + ret[k] = v + } + + return ret +} + +func (c *cachedRepos) SetCategoryIndexer(indexer cache.Indexer) { + c.Lock() + c.indexerOfHelmCtg = indexer + c.Unlock() +} + +// translateCategoryNameToId translate a category-name to a category-id. +// The caller should hold the lock +func (c *cachedRepos) translateCategoryNameToId(ctgName string) string { + if c.indexerOfHelmCtg == nil || ctgName == "" { + return v1alpha1.UncategorizedId + } + + if items, err := c.indexerOfHelmCtg.ByIndex(CategoryIndexer, ctgName); len(items) == 0 || err != nil { + return v1alpha1.UncategorizedId + } else { + obj, _ := items[0].(*v1alpha1.HelmCategory) + return obj.Name + } +} + func (c *cachedRepos) GetApplication(appId string) (app *v1alpha1.HelmApplication, exists bool) { c.RLock() defer c.RUnlock() @@ -153,7 +212,7 @@ func (c *cachedRepos) AddRepo(repo *v1alpha1.HelmRepo) error { return c.addRepo(repo, false) } -//Add new Repo to cachedRepos +// Add a new Repo to cachedRepos func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error { if len(repo.Status.Data) == 0 { return nil @@ -173,9 +232,6 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error { repoId := repo.GetHelmRepoId() c.repos[repoId] = repo - if _, exists := c.repoCtgCounts[repoId]; !exists { - c.repoCtgCounts[repoId] = map[string]int{} - } var appName string chartsCount := 0 @@ -189,7 +245,7 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error { appLabels[constants.ChartRepoIdLabelKey] = repoId - HelmApp := v1alpha1.HelmApplication{ + helmApp := v1alpha1.HelmApplication{ ObjectMeta: metav1.ObjectMeta{ Name: appName, Annotations: map[string]string{ @@ -207,7 +263,7 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error { State: v1alpha1.StateActive, }, } - c.apps[app.ApplicationId] = &HelmApp + c.apps[app.ApplicationId] = &helmApp var ctg, appVerName string var chartData []byte @@ -215,11 +271,10 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error { var latestSemver *semver.Version // build all the versions of this app - for _, ver := range app.Charts { + for _, chartVersion := range app.Charts { chartsCount += 1 - ctg = ver.Annotations["category"] - appVerName = ver.ApplicationVersionId + appVerName = chartVersion.ApplicationVersionId version := &v1alpha1.HelmApplicationVersion{ ObjectMeta: metav1.ObjectMeta{ Name: appVerName, @@ -228,23 +283,23 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error { constants.ChartApplicationIdLabelKey: appName, constants.ChartRepoIdLabelKey: repo.GetHelmRepoId(), }, - CreationTimestamp: metav1.Time{Time: ver.Created}, + CreationTimestamp: metav1.Time{Time: chartVersion.Created}, }, Spec: v1alpha1.HelmApplicationVersionSpec{ Metadata: &v1alpha1.Metadata{ - Name: ver.Name, - AppVersion: ver.AppVersion, - Version: ver.Version, + Name: chartVersion.Name, + AppVersion: chartVersion.AppVersion, + Version: chartVersion.Version, }, - URLs: ver.URLs, - Digest: ver.Digest, + URLs: chartVersion.URLs, + Digest: chartVersion.Digest, Data: chartData, }, Status: v1alpha1.HelmApplicationVersionStatus{ State: v1alpha1.StateActive, }, } - c.versions[ver.ApplicationVersionId] = version + c.versions[chartVersion.ApplicationVersionId] = version // Find the latest version. currSemver, err := semver.NewVersion(version.GetSemver()) @@ -253,10 +308,14 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error { // the first valid semver latestSemver = currSemver latestVersionName = version.GetVersionName() + + // Use the category of the latest version as the category of the app. + ctg = chartVersion.Annotations[CategoryAnnotationKey] } else if latestSemver.LessThan(currSemver) { // find a newer valid semver latestSemver = currSemver latestVersionName = version.GetVersionName() + ctg = chartVersion.Annotations[CategoryAnnotationKey] } } else { // If the semver is invalid, just ignore it. @@ -264,25 +323,17 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error { } } - HelmApp.Status.LatestVersion = latestVersionName + helmApp.Status.LatestVersion = latestVersionName - //modify application category - ctgId := "" - if ctg != "" { - if c.apps[app.ApplicationId].Annotations == nil { - c.apps[app.ApplicationId].Annotations = map[string]string{constants.CategoryIdLabelKey: ctg} - } else { - c.apps[app.ApplicationId].Annotations[constants.CategoryIdLabelKey] = ctg + if helmrepoindex.IsBuiltInRepo(repo.Name) { + // Add category id to the apps in the built-in repo + ctgId := c.translateCategoryNameToId(ctg) + if helmApp.Labels == nil { + helmApp.Labels = map[string]string{} } - ctgId = ctg - } else { - ctgId = v1alpha1.UncategorizedId - } + helmApp.Labels[constants.CategoryIdLabelKey] = ctgId - if _, exists := c.repoCtgCounts[repoId][ctgId]; !exists { - c.repoCtgCounts[repoId][ctgId] = 1 - } else { - c.repoCtgCounts[repoId][ctgId] += 1 + c.builtinCategoryCounts[ctgId] += 1 } }