calculate the category for the app
Signed-off-by: LiHui <andrewli@kubesphere.io>
This commit is contained in:
@@ -17,6 +17,8 @@ import (
|
||||
"context"
|
||||
"sort"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/utils/reposcache"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/query"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
@@ -48,12 +50,14 @@ type CategoryInterface interface {
|
||||
type categoryOperator struct {
|
||||
ctgClient typed_v1alpha1.ApplicationV1alpha1Interface
|
||||
ctgLister listers_v1alpha1.HelmCategoryLister
|
||||
repoCache reposcache.ReposCache
|
||||
}
|
||||
|
||||
func newCategoryOperator(ksFactory externalversions.SharedInformerFactory, ksClient versioned.Interface) CategoryInterface {
|
||||
func newCategoryOperator(repoCache reposcache.ReposCache, ksFactory externalversions.SharedInformerFactory, ksClient versioned.Interface) CategoryInterface {
|
||||
c := &categoryOperator{
|
||||
ctgClient: ksClient.ApplicationV1alpha1(),
|
||||
ctgLister: ksFactory.Application().V1alpha1().HelmCategories().Lister(),
|
||||
repoCache: repoCache,
|
||||
}
|
||||
|
||||
return c
|
||||
@@ -190,8 +194,15 @@ func (c *categoryOperator) ListCategories(conditions *params.Conditions, orderBy
|
||||
start, end := (&query.Pagination{Limit: limit, Offset: offset}).GetValidPagination(totalCount)
|
||||
ctgs = ctgs[start:end]
|
||||
items := make([]interface{}, 0, len(ctgs))
|
||||
|
||||
ctgCountsOfBuiltinRepo := c.repoCache.CopyCategoryCount()
|
||||
for i := range ctgs {
|
||||
items = append(items, convertCategory(ctgs[i]))
|
||||
convertedCtg := convertCategory(ctgs[i])
|
||||
// The statistic of category for app in etcd is stored in the crd.
|
||||
// The statistic of category for the app in the built-in repo is stored in the memory.
|
||||
// So we should calculate these two value then return.
|
||||
*convertedCtg.AppTotal += ctgCountsOfBuiltinRepo[convertedCtg.CategoryID]
|
||||
items = append(items, convertedCtg)
|
||||
}
|
||||
|
||||
return &models.PageableResponse{Items: items, TotalCount: totalCount}, nil
|
||||
|
||||
@@ -20,6 +20,8 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/utils/reposcache"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
fakek8s "k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/klog"
|
||||
@@ -82,5 +84,5 @@ func prepareCategoryOperator() CategoryInterface {
|
||||
k8sClient = fakek8s.NewSimpleClientset()
|
||||
fakeInformerFactory = informers.NewInformerFactories(k8sClient, ksClient, nil, nil, nil, nil)
|
||||
|
||||
return newCategoryOperator(fakeInformerFactory.KubeSphereSharedInformerFactory(), ksClient)
|
||||
return newCategoryOperator(reposcache.NewReposCache(), fakeInformerFactory.KubeSphereSharedInformerFactory(), ksClient)
|
||||
}
|
||||
|
||||
@@ -56,7 +56,6 @@ func init() {
|
||||
}
|
||||
|
||||
func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface) Interface {
|
||||
|
||||
once.Do(func() {
|
||||
klog.Infof("start helm repo informer")
|
||||
helmReposInformer = ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmRepos().Informer()
|
||||
@@ -75,6 +74,19 @@ func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient ve
|
||||
cachedReposData.DeleteRepo(r)
|
||||
},
|
||||
})
|
||||
|
||||
ctgInformer := ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmCategories().Informer()
|
||||
ctgInformer.AddIndexers(map[string]cache.IndexFunc{
|
||||
reposcache.CategoryIndexer: func(obj interface{}) ([]string, error) {
|
||||
ctg, _ := obj.(*v1alpha1.HelmCategory)
|
||||
return []string{ctg.Spec.Name}, nil
|
||||
},
|
||||
})
|
||||
indexer := ctgInformer.GetIndexer()
|
||||
|
||||
cachedReposData.SetCategoryIndexer(indexer)
|
||||
|
||||
go ctgInformer.Run(wait.NeverStop)
|
||||
go helmReposInformer.Run(wait.NeverStop)
|
||||
})
|
||||
|
||||
@@ -83,6 +95,6 @@ func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient ve
|
||||
ApplicationInterface: newApplicationOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient, s3Client),
|
||||
RepoInterface: newRepoOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient),
|
||||
ReleaseInterface: newReleaseOperator(cachedReposData, ksInformers.KubernetesSharedInformerFactory(), ksInformers.KubeSphereSharedInformerFactory(), ksClient),
|
||||
CategoryInterface: newCategoryOperator(ksInformers.KubeSphereSharedInformerFactory(), ksClient),
|
||||
CategoryInterface: newCategoryOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,7 +108,7 @@ func MergeRepoIndex(repo *v1alpha1.HelmRepo, index *helmrepo.IndexFile, existsSa
|
||||
Created: time.Now(),
|
||||
}
|
||||
|
||||
// The app version will be added to the labels of the helm release.
|
||||
// The app id will be added to the labels of the helm release.
|
||||
// But the apps in the repos which are created by the user may contain malformed text, so we generate a random name for them.
|
||||
// The apps in the system repo have been audited by the admin, so the name of the charts should not include malformed text.
|
||||
// Then we can add the name string to the labels of the k8s object.
|
||||
@@ -222,7 +222,7 @@ func (i *SavedIndex) GetApplicationVersion(appId, versionId string) *v1alpha1.He
|
||||
return nil
|
||||
}
|
||||
|
||||
// The app version name will be added to the labels of the helm release.
|
||||
// The app version id will be added to the labels of the helm release.
|
||||
// But the apps in the repos which are created by the user may contain malformed text, so we generate a random name for them.
|
||||
// The apps in the system repo have been audited by the admin, so the name of the charts should not include malformed text.
|
||||
// Then we can add the name string to the labels of the k8s object.
|
||||
@@ -237,7 +237,7 @@ func generateAppVersionId(repo *v1alpha1.HelmRepo, chartName, version string) st
|
||||
|
||||
// IsBuiltInRepo checks whether a repo is a built-in repo.
|
||||
// All the built-in repos are located in the workspace system-workspace and the name starts with 'built-in'
|
||||
// to differentiate from the repos created by the user
|
||||
// to differentiate from the repos created by the user.
|
||||
func IsBuiltInRepo(repoName string) bool {
|
||||
return strings.HasPrefix(repoName, v1alpha1.BuiltinRepoPrefix)
|
||||
}
|
||||
|
||||
@@ -27,6 +27,8 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"github.com/Masterminds/semver/v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@@ -38,15 +40,20 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix/helmrepoindex"
|
||||
)
|
||||
|
||||
const (
|
||||
CategoryIndexer = "category_indexer"
|
||||
CategoryAnnotationKey = "app.kubesphere.io/category"
|
||||
)
|
||||
|
||||
var WorkDir string
|
||||
|
||||
func NewReposCache() ReposCache {
|
||||
return &cachedRepos{
|
||||
chartsInRepo: map[workspace]map[string]int{},
|
||||
repos: map[string]*v1alpha1.HelmRepo{},
|
||||
apps: map[string]*v1alpha1.HelmApplication{},
|
||||
versions: map[string]*v1alpha1.HelmApplicationVersion{},
|
||||
repoCtgCounts: map[string]map[string]int{},
|
||||
chartsInRepo: map[workspace]map[string]int{},
|
||||
repos: map[string]*v1alpha1.HelmRepo{},
|
||||
apps: map[string]*v1alpha1.HelmApplication{},
|
||||
versions: map[string]*v1alpha1.HelmApplicationVersion{},
|
||||
builtinCategoryCounts: map[string]int{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,31 +69,40 @@ type ReposCache interface {
|
||||
ListAppVersionsByAppId(appId string) (ret []*v1alpha1.HelmApplicationVersion, exists bool)
|
||||
ListApplicationsInRepo(repoId string) (ret []*v1alpha1.HelmApplication, exists bool)
|
||||
ListApplicationsInBuiltinRepo(selector labels.Selector) (ret []*v1alpha1.HelmApplication, exists bool)
|
||||
|
||||
SetCategoryIndexer(indexer cache.Indexer)
|
||||
CopyCategoryCount() map[string]int
|
||||
}
|
||||
|
||||
type workspace string
|
||||
type cachedRepos struct {
|
||||
sync.RWMutex
|
||||
|
||||
chartsInRepo map[workspace]map[string]int
|
||||
repoCtgCounts map[string]map[string]int
|
||||
chartsInRepo map[workspace]map[string]int
|
||||
|
||||
// builtinCategoryCounts saves the count of every category in the built-in repo.
|
||||
builtinCategoryCounts map[string]int
|
||||
|
||||
repos map[string]*v1alpha1.HelmRepo
|
||||
apps map[string]*v1alpha1.HelmApplication
|
||||
versions map[string]*v1alpha1.HelmApplicationVersion
|
||||
|
||||
// indexerOfHelmCtg is the indexer of HelmCategory, used to query the category id from category name.
|
||||
indexerOfHelmCtg cache.Indexer
|
||||
}
|
||||
|
||||
func (c *cachedRepos) deleteRepo(repo *v1alpha1.HelmRepo) {
|
||||
if len(repo.Status.Data) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
index, err := helmrepoindex.ByteArrayToSavedIndex([]byte(repo.Status.Data))
|
||||
if err != nil {
|
||||
klog.Errorf("json unmarshal repo %s failed, error: %s", repo.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(4).Infof("delete repo %s from cache", repo.Name)
|
||||
klog.V(2).Infof("delete repo %s from cache", repo.Name)
|
||||
|
||||
repoId := repo.GetHelmRepoId()
|
||||
ws := workspace(repo.GetWorkspace())
|
||||
@@ -94,10 +110,19 @@ func (c *cachedRepos) deleteRepo(repo *v1alpha1.HelmRepo) {
|
||||
delete(c.chartsInRepo[ws], repoId)
|
||||
}
|
||||
|
||||
delete(c.repoCtgCounts, repoId)
|
||||
delete(c.repos, repoId)
|
||||
|
||||
for _, app := range index.Applications {
|
||||
if _, exists := c.apps[app.ApplicationId]; !exists {
|
||||
continue
|
||||
}
|
||||
if helmrepoindex.IsBuiltInRepo(repo.Name) {
|
||||
ctgId := c.apps[app.ApplicationId].Labels[constants.CategoryIdLabelKey]
|
||||
if ctgId != "" {
|
||||
c.builtinCategoryCounts[ctgId] -= 1
|
||||
}
|
||||
}
|
||||
|
||||
delete(c.apps, app.ApplicationId)
|
||||
for _, ver := range app.Charts {
|
||||
delete(c.versions, ver.ApplicationVersionId)
|
||||
@@ -127,6 +152,40 @@ func (c *cachedRepos) DeleteRepo(repo *v1alpha1.HelmRepo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// CopyCategoryCount copies the internal map to avoid `concurrent map iteration and map write`.
|
||||
func (c *cachedRepos) CopyCategoryCount() map[string]int {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
ret := make(map[string]int, len(c.builtinCategoryCounts))
|
||||
for k, v := range c.builtinCategoryCounts {
|
||||
ret[k] = v
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (c *cachedRepos) SetCategoryIndexer(indexer cache.Indexer) {
|
||||
c.Lock()
|
||||
c.indexerOfHelmCtg = indexer
|
||||
c.Unlock()
|
||||
}
|
||||
|
||||
// translateCategoryNameToId translate a category-name to a category-id.
|
||||
// The caller should hold the lock
|
||||
func (c *cachedRepos) translateCategoryNameToId(ctgName string) string {
|
||||
if c.indexerOfHelmCtg == nil || ctgName == "" {
|
||||
return v1alpha1.UncategorizedId
|
||||
}
|
||||
|
||||
if items, err := c.indexerOfHelmCtg.ByIndex(CategoryIndexer, ctgName); len(items) == 0 || err != nil {
|
||||
return v1alpha1.UncategorizedId
|
||||
} else {
|
||||
obj, _ := items[0].(*v1alpha1.HelmCategory)
|
||||
return obj.Name
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cachedRepos) GetApplication(appId string) (app *v1alpha1.HelmApplication, exists bool) {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
@@ -153,7 +212,7 @@ func (c *cachedRepos) AddRepo(repo *v1alpha1.HelmRepo) error {
|
||||
return c.addRepo(repo, false)
|
||||
}
|
||||
|
||||
//Add new Repo to cachedRepos
|
||||
// Add a new Repo to cachedRepos
|
||||
func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
|
||||
if len(repo.Status.Data) == 0 {
|
||||
return nil
|
||||
@@ -173,9 +232,6 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
|
||||
|
||||
repoId := repo.GetHelmRepoId()
|
||||
c.repos[repoId] = repo
|
||||
if _, exists := c.repoCtgCounts[repoId]; !exists {
|
||||
c.repoCtgCounts[repoId] = map[string]int{}
|
||||
}
|
||||
var appName string
|
||||
|
||||
chartsCount := 0
|
||||
@@ -189,7 +245,7 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
|
||||
|
||||
appLabels[constants.ChartRepoIdLabelKey] = repoId
|
||||
|
||||
HelmApp := v1alpha1.HelmApplication{
|
||||
helmApp := v1alpha1.HelmApplication{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: appName,
|
||||
Annotations: map[string]string{
|
||||
@@ -207,7 +263,7 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
|
||||
State: v1alpha1.StateActive,
|
||||
},
|
||||
}
|
||||
c.apps[app.ApplicationId] = &HelmApp
|
||||
c.apps[app.ApplicationId] = &helmApp
|
||||
|
||||
var ctg, appVerName string
|
||||
var chartData []byte
|
||||
@@ -215,11 +271,10 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
|
||||
var latestSemver *semver.Version
|
||||
|
||||
// build all the versions of this app
|
||||
for _, ver := range app.Charts {
|
||||
for _, chartVersion := range app.Charts {
|
||||
chartsCount += 1
|
||||
ctg = ver.Annotations["category"]
|
||||
|
||||
appVerName = ver.ApplicationVersionId
|
||||
appVerName = chartVersion.ApplicationVersionId
|
||||
version := &v1alpha1.HelmApplicationVersion{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: appVerName,
|
||||
@@ -228,23 +283,23 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
|
||||
constants.ChartApplicationIdLabelKey: appName,
|
||||
constants.ChartRepoIdLabelKey: repo.GetHelmRepoId(),
|
||||
},
|
||||
CreationTimestamp: metav1.Time{Time: ver.Created},
|
||||
CreationTimestamp: metav1.Time{Time: chartVersion.Created},
|
||||
},
|
||||
Spec: v1alpha1.HelmApplicationVersionSpec{
|
||||
Metadata: &v1alpha1.Metadata{
|
||||
Name: ver.Name,
|
||||
AppVersion: ver.AppVersion,
|
||||
Version: ver.Version,
|
||||
Name: chartVersion.Name,
|
||||
AppVersion: chartVersion.AppVersion,
|
||||
Version: chartVersion.Version,
|
||||
},
|
||||
URLs: ver.URLs,
|
||||
Digest: ver.Digest,
|
||||
URLs: chartVersion.URLs,
|
||||
Digest: chartVersion.Digest,
|
||||
Data: chartData,
|
||||
},
|
||||
Status: v1alpha1.HelmApplicationVersionStatus{
|
||||
State: v1alpha1.StateActive,
|
||||
},
|
||||
}
|
||||
c.versions[ver.ApplicationVersionId] = version
|
||||
c.versions[chartVersion.ApplicationVersionId] = version
|
||||
|
||||
// Find the latest version.
|
||||
currSemver, err := semver.NewVersion(version.GetSemver())
|
||||
@@ -253,10 +308,14 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
|
||||
// the first valid semver
|
||||
latestSemver = currSemver
|
||||
latestVersionName = version.GetVersionName()
|
||||
|
||||
// Use the category of the latest version as the category of the app.
|
||||
ctg = chartVersion.Annotations[CategoryAnnotationKey]
|
||||
} else if latestSemver.LessThan(currSemver) {
|
||||
// find a newer valid semver
|
||||
latestSemver = currSemver
|
||||
latestVersionName = version.GetVersionName()
|
||||
ctg = chartVersion.Annotations[CategoryAnnotationKey]
|
||||
}
|
||||
} else {
|
||||
// If the semver is invalid, just ignore it.
|
||||
@@ -264,25 +323,17 @@ func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
HelmApp.Status.LatestVersion = latestVersionName
|
||||
helmApp.Status.LatestVersion = latestVersionName
|
||||
|
||||
//modify application category
|
||||
ctgId := ""
|
||||
if ctg != "" {
|
||||
if c.apps[app.ApplicationId].Annotations == nil {
|
||||
c.apps[app.ApplicationId].Annotations = map[string]string{constants.CategoryIdLabelKey: ctg}
|
||||
} else {
|
||||
c.apps[app.ApplicationId].Annotations[constants.CategoryIdLabelKey] = ctg
|
||||
if helmrepoindex.IsBuiltInRepo(repo.Name) {
|
||||
// Add category id to the apps in the built-in repo
|
||||
ctgId := c.translateCategoryNameToId(ctg)
|
||||
if helmApp.Labels == nil {
|
||||
helmApp.Labels = map[string]string{}
|
||||
}
|
||||
ctgId = ctg
|
||||
} else {
|
||||
ctgId = v1alpha1.UncategorizedId
|
||||
}
|
||||
helmApp.Labels[constants.CategoryIdLabelKey] = ctgId
|
||||
|
||||
if _, exists := c.repoCtgCounts[repoId][ctgId]; !exists {
|
||||
c.repoCtgCounts[repoId][ctgId] = 1
|
||||
} else {
|
||||
c.repoCtgCounts[repoId][ctgId] += 1
|
||||
c.builtinCategoryCounts[ctgId] += 1
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user