Merge pull request #4838 from wansir/fix-4039

Fix typo
This commit is contained in:
KubeSphere CI Bot
2022-05-05 09:47:30 +08:00
committed by GitHub

View File

@@ -26,6 +26,9 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/discovery"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
@@ -366,50 +369,51 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
s.Server.Handler = handler
}
func isResourceExists(apiResources []v1.APIResource, resource schema.GroupVersionResource) bool {
for _, apiResource := range apiResources {
if apiResource.Name == resource.Resource {
return true
}
}
return false
}
type informerForResourceFunc func(resource schema.GroupVersionResource) (interface{}, error)
func waitForCacheSync(discoveryClient discovery.DiscoveryInterface, sharedInformerFactory informers.GenericInformerFactory, informerForResourceFunc informerForResourceFunc, GVRs map[schema.GroupVersion][]string, stopCh <-chan struct{}) error {
for groupVersion, resourceNames := range GVRs {
var apiResourceList *v1.APIResourceList
var err error
err = retry.OnError(retry.DefaultRetry, func(err error) bool {
return !errors.IsNotFound(err)
}, func() error {
apiResourceList, err = discoveryClient.ServerResourcesForGroupVersion(groupVersion.String())
return err
})
if err != nil {
return fmt.Errorf("failed to fetch group version resources %s: %s", groupVersion, err)
}
for _, resourceName := range resourceNames {
groupVersionResource := groupVersion.WithResource(resourceName)
if !isResourceExists(apiResourceList.APIResources, groupVersionResource) {
klog.Warningf("resource %s not exists in the cluster", groupVersionResource)
} else {
// reflect.ValueOf(sharedInformerFactory).MethodByName("ForResource").Call([]reflect.Value{reflect.ValueOf(groupVersionResource)})
if _, err = informerForResourceFunc(groupVersionResource); err != nil {
return fmt.Errorf("failed to create informer for %s: %s", groupVersionResource, err)
}
}
}
}
sharedInformerFactory.Start(stopCh)
sharedInformerFactory.WaitForCacheSync(stopCh)
return nil
}
func (s *APIServer) waitForResourceSync(ctx context.Context) error {
klog.V(0).Info("Start cache objects")
stopCh := ctx.Done()
isResourceExists := func(apiResources []v1.APIResource, resource schema.GroupVersionResource) bool {
for _, apiResource := range apiResources {
if apiResource.Name == resource.Resource {
return true
}
}
return false
}
type informerForResourceFunc func(resource schema.GroupVersionResource) (interface{}, error)
waitForResourceSync := func(sharedInformerFactory informers.GenericInformerFactory, informerForResourceFunc informerForResourceFunc, GVRs map[schema.GroupVersion][]string) error {
for groupVersion, resourceNames := range GVRs {
var apiResourceList *v1.APIResourceList
var err error
err = retry.OnError(retry.DefaultRetry, func(err error) bool {
return err != nil
}, func() error {
apiResourceList, err = s.KubernetesClient.Kubernetes().Discovery().ServerResourcesForGroupVersion(groupVersion.String())
return err
})
if err != nil {
return fmt.Errorf("failed to fetch group version resources: %s", err)
}
for _, resourceName := range resourceNames {
groupVersionResource := groupVersion.WithResource(resourceName)
if !isResourceExists(apiResourceList.APIResources, groupVersionResource) {
klog.Warningf("resource %s not exists in the cluster", groupVersionResource)
} else {
// reflect.ValueOf(sharedInformerFactory).MethodByName("ForResource").Call([]reflect.Value{reflect.ValueOf(groupVersionResource)})
if _, err = informerForResourceFunc(groupVersionResource); err != nil {
return fmt.Errorf("failed to create informer for %s: %s", groupVersionResource, err)
}
}
}
}
sharedInformerFactory.Start(stopCh)
sharedInformerFactory.WaitForCacheSync(stopCh)
return nil
}
// resources we have to create informer first
k8sGVRs := map[schema.GroupVersion][]string{
{Group: "", Version: "v1"}: {
@@ -442,6 +446,8 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
},
{Group: "batch", Version: "v1"}: {
"jobs",
},
{Group: "batch", Version: "v1beta1"}: {
"cronjobs",
},
{Group: "networking.k8s.io", Version: "v1"}: {
@@ -453,9 +459,12 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
},
}
if err := waitForResourceSync(s.InformerFactory.KubernetesSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.KubernetesSharedInformerFactory().ForResource(resource)
}, k8sGVRs); err != nil {
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
s.InformerFactory.KubernetesSharedInformerFactory(),
func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.KubernetesSharedInformerFactory().ForResource(resource)
},
k8sGVRs, stopCh); err != nil {
return err
}
@@ -490,13 +499,13 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
// skip caching devops resources if devops not enabled
if s.DevopsClient != nil {
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpah1"}] = []string{
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpha1"}] = []string{
"s2ibinaries",
"s2ibuildertemplates",
"s2iruns",
"s2ibuilders",
}
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpah3"}] = []string{
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpha3"}] = []string{
"devopsprojects",
"pipelines",
}
@@ -528,9 +537,12 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
}
}
if err := waitForResourceSync(s.InformerFactory.KubeSphereSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.KubeSphereSharedInformerFactory().ForResource(resource)
}, ksGVRs); err != nil {
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
s.InformerFactory.KubeSphereSharedInformerFactory(),
func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.KubeSphereSharedInformerFactory().ForResource(resource)
},
ksGVRs, stopCh); err != nil {
return err
}
@@ -542,9 +554,11 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
},
}
if err := waitForResourceSync(s.InformerFactory.SnapshotSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.SnapshotSharedInformerFactory().ForResource(resource)
}, snapshotGVRs); err != nil {
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
s.InformerFactory.SnapshotSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.SnapshotSharedInformerFactory().ForResource(resource)
},
snapshotGVRs, stopCh); err != nil {
return err
}
@@ -554,9 +568,11 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
},
}
if err := waitForResourceSync(s.InformerFactory.ApiExtensionSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.ApiExtensionSharedInformerFactory().ForResource(resource)
}, apiextensionsGVRs); err != nil {
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
s.InformerFactory.ApiExtensionSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.ApiExtensionSharedInformerFactory().ForResource(resource)
},
apiextensionsGVRs, stopCh); err != nil {
return err
}
@@ -568,9 +584,11 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
"thanosrulers",
},
}
if err := waitForResourceSync(promFactory, func(resource schema.GroupVersionResource) (interface{}, error) {
return promFactory.ForResource(resource)
}, prometheusGVRs); err != nil {
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
promFactory, func(resource schema.GroupVersionResource) (interface{}, error) {
return promFactory.ForResource(resource)
},
prometheusGVRs, stopCh); err != nil {
return err
}
}