diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index bebd07a16..5566df5ff 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -26,6 +26,9 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/discovery" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/retry" @@ -366,50 +369,51 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) { s.Server.Handler = handler } +func isResourceExists(apiResources []v1.APIResource, resource schema.GroupVersionResource) bool { + for _, apiResource := range apiResources { + if apiResource.Name == resource.Resource { + return true + } + } + return false +} + +type informerForResourceFunc func(resource schema.GroupVersionResource) (interface{}, error) + +func waitForCacheSync(discoveryClient discovery.DiscoveryInterface, sharedInformerFactory informers.GenericInformerFactory, informerForResourceFunc informerForResourceFunc, GVRs map[schema.GroupVersion][]string, stopCh <-chan struct{}) error { + for groupVersion, resourceNames := range GVRs { + var apiResourceList *v1.APIResourceList + var err error + err = retry.OnError(retry.DefaultRetry, func(err error) bool { + return !errors.IsNotFound(err) + }, func() error { + apiResourceList, err = discoveryClient.ServerResourcesForGroupVersion(groupVersion.String()) + return err + }) + if err != nil { + return fmt.Errorf("failed to fetch group version resources %s: %s", groupVersion, err) + } + for _, resourceName := range resourceNames { + groupVersionResource := groupVersion.WithResource(resourceName) + if !isResourceExists(apiResourceList.APIResources, groupVersionResource) { + klog.Warningf("resource %s not exists in the cluster", groupVersionResource) + } else { + // reflect.ValueOf(sharedInformerFactory).MethodByName("ForResource").Call([]reflect.Value{reflect.ValueOf(groupVersionResource)}) + if _, err = informerForResourceFunc(groupVersionResource); err != nil { + return fmt.Errorf("failed to create informer for %s: %s", groupVersionResource, err) + } + } + } + } + sharedInformerFactory.Start(stopCh) + sharedInformerFactory.WaitForCacheSync(stopCh) + return nil +} + func (s *APIServer) waitForResourceSync(ctx context.Context) error { klog.V(0).Info("Start cache objects") stopCh := ctx.Done() - isResourceExists := func(apiResources []v1.APIResource, resource schema.GroupVersionResource) bool { - for _, apiResource := range apiResources { - if apiResource.Name == resource.Resource { - return true - } - } - return false - } - - type informerForResourceFunc func(resource schema.GroupVersionResource) (interface{}, error) - waitForResourceSync := func(sharedInformerFactory informers.GenericInformerFactory, informerForResourceFunc informerForResourceFunc, GVRs map[schema.GroupVersion][]string) error { - for groupVersion, resourceNames := range GVRs { - var apiResourceList *v1.APIResourceList - var err error - err = retry.OnError(retry.DefaultRetry, func(err error) bool { - return err != nil - }, func() error { - apiResourceList, err = s.KubernetesClient.Kubernetes().Discovery().ServerResourcesForGroupVersion(groupVersion.String()) - return err - }) - if err != nil { - return fmt.Errorf("failed to fetch group version resources: %s", err) - } - for _, resourceName := range resourceNames { - groupVersionResource := groupVersion.WithResource(resourceName) - if !isResourceExists(apiResourceList.APIResources, groupVersionResource) { - klog.Warningf("resource %s not exists in the cluster", groupVersionResource) - } else { - // reflect.ValueOf(sharedInformerFactory).MethodByName("ForResource").Call([]reflect.Value{reflect.ValueOf(groupVersionResource)}) - if _, err = informerForResourceFunc(groupVersionResource); err != nil { - return fmt.Errorf("failed to create informer for %s: %s", groupVersionResource, err) - } - } - } - } - sharedInformerFactory.Start(stopCh) - sharedInformerFactory.WaitForCacheSync(stopCh) - return nil - } - // resources we have to create informer first k8sGVRs := map[schema.GroupVersion][]string{ {Group: "", Version: "v1"}: { @@ -442,6 +446,8 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error { }, {Group: "batch", Version: "v1"}: { "jobs", + }, + {Group: "batch", Version: "v1beta1"}: { "cronjobs", }, {Group: "networking.k8s.io", Version: "v1"}: { @@ -453,9 +459,12 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error { }, } - if err := waitForResourceSync(s.InformerFactory.KubernetesSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { - return s.InformerFactory.KubernetesSharedInformerFactory().ForResource(resource) - }, k8sGVRs); err != nil { + if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(), + s.InformerFactory.KubernetesSharedInformerFactory(), + func(resource schema.GroupVersionResource) (interface{}, error) { + return s.InformerFactory.KubernetesSharedInformerFactory().ForResource(resource) + }, + k8sGVRs, stopCh); err != nil { return err } @@ -490,13 +499,13 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error { // skip caching devops resources if devops not enabled if s.DevopsClient != nil { - ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpah1"}] = []string{ + ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpha1"}] = []string{ "s2ibinaries", "s2ibuildertemplates", "s2iruns", "s2ibuilders", } - ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpah3"}] = []string{ + ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpha3"}] = []string{ "devopsprojects", "pipelines", } @@ -528,9 +537,12 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error { } } - if err := waitForResourceSync(s.InformerFactory.KubeSphereSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { - return s.InformerFactory.KubeSphereSharedInformerFactory().ForResource(resource) - }, ksGVRs); err != nil { + if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(), + s.InformerFactory.KubeSphereSharedInformerFactory(), + func(resource schema.GroupVersionResource) (interface{}, error) { + return s.InformerFactory.KubeSphereSharedInformerFactory().ForResource(resource) + }, + ksGVRs, stopCh); err != nil { return err } @@ -542,9 +554,11 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error { }, } - if err := waitForResourceSync(s.InformerFactory.SnapshotSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { - return s.InformerFactory.SnapshotSharedInformerFactory().ForResource(resource) - }, snapshotGVRs); err != nil { + if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(), + s.InformerFactory.SnapshotSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { + return s.InformerFactory.SnapshotSharedInformerFactory().ForResource(resource) + }, + snapshotGVRs, stopCh); err != nil { return err } @@ -554,9 +568,11 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error { }, } - if err := waitForResourceSync(s.InformerFactory.ApiExtensionSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { - return s.InformerFactory.ApiExtensionSharedInformerFactory().ForResource(resource) - }, apiextensionsGVRs); err != nil { + if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(), + s.InformerFactory.ApiExtensionSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { + return s.InformerFactory.ApiExtensionSharedInformerFactory().ForResource(resource) + }, + apiextensionsGVRs, stopCh); err != nil { return err } @@ -568,9 +584,11 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error { "thanosrulers", }, } - if err := waitForResourceSync(promFactory, func(resource schema.GroupVersionResource) (interface{}, error) { - return promFactory.ForResource(resource) - }, prometheusGVRs); err != nil { + if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(), + promFactory, func(resource schema.GroupVersionResource) (interface{}, error) { + return promFactory.ForResource(resource) + }, + prometheusGVRs, stopCh); err != nil { return err } }