diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index b326ad0b9..bebd07a16 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -26,6 +26,9 @@ import ( "sync" "time" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" + "github.com/emicklei/go-restful" extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -254,7 +257,7 @@ func (s *APIServer) installKubeSphereAPIs(stopCh <-chan struct{}) { urlruntime.Must(alertingv1.AddToContainer(s.container, s.Config.AlertingOptions.Endpoint)) urlruntime.Must(alertingv2alpha1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Prometheus(), s.AlertingClient, s.Config.AlertingOptions)) - urlruntime.Must(version.AddToContainer(s.container, s.KubernetesClient.Discovery())) + urlruntime.Must(version.AddToContainer(s.container, s.KubernetesClient.Kubernetes().Discovery())) urlruntime.Must(kubeedgev1alpha1.AddToContainer(s.container, s.Config.KubeEdgeOptions.Endpoint)) urlruntime.Must(edgeruntimev1alpha1.AddToContainer(s.container, s.Config.EdgeRuntimeOptions.Endpoint)) urlruntime.Must(notificationkapisv2beta1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(), @@ -367,211 +370,215 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error { klog.V(0).Info("Start cache objects") stopCh := ctx.Done() - - discoveryClient := s.KubernetesClient.Kubernetes().Discovery() - _, apiResourcesList, err := discoveryClient.ServerGroupsAndResources() - if err != nil { - return err - } - - isResourceExists := func(resource schema.GroupVersionResource) bool { - for _, apiResource := range apiResourcesList { - if apiResource.GroupVersion == resource.GroupVersion().String() { - for _, rsc := range apiResource.APIResources { - if rsc.Name == resource.Resource { - return true - } - } + isResourceExists := func(apiResources []v1.APIResource, resource schema.GroupVersionResource) bool { + for _, apiResource := range apiResources { + if apiResource.Name == resource.Resource { + return true } } return false } - // resources we have to create informer first - k8sGVRs := []schema.GroupVersionResource{ - {Group: "", Version: "v1", Resource: "namespaces"}, - {Group: "", Version: "v1", Resource: "nodes"}, - {Group: "", Version: "v1", Resource: "resourcequotas"}, - {Group: "", Version: "v1", Resource: "pods"}, - {Group: "", Version: "v1", Resource: "services"}, - {Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, - {Group: "", Version: "v1", Resource: "persistentvolumes"}, - {Group: "", Version: "v1", Resource: "secrets"}, - {Group: "", Version: "v1", Resource: "configmaps"}, - {Group: "", Version: "v1", Resource: "serviceaccounts"}, - - {Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "roles"}, - {Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "rolebindings"}, - {Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}, - {Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterrolebindings"}, - {Group: "apps", Version: "v1", Resource: "deployments"}, - {Group: "apps", Version: "v1", Resource: "daemonsets"}, - {Group: "apps", Version: "v1", Resource: "replicasets"}, - {Group: "apps", Version: "v1", Resource: "statefulsets"}, - {Group: "apps", Version: "v1", Resource: "controllerrevisions"}, - {Group: "storage.k8s.io", Version: "v1", Resource: "storageclasses"}, - {Group: "batch", Version: "v1", Resource: "jobs"}, - {Group: "batch", Version: "v1beta1", Resource: "cronjobs"}, - {Group: "networking.k8s.io", Version: "v1", Resource: "ingresses"}, - {Group: "autoscaling", Version: "v2beta2", Resource: "horizontalpodautoscalers"}, - {Group: "networking.k8s.io", Version: "v1", Resource: "networkpolicies"}, - } - - for _, gvr := range k8sGVRs { - if !isResourceExists(gvr) { - klog.Warningf("resource %s not exists in the cluster", gvr) - } else { - _, err := s.InformerFactory.KubernetesSharedInformerFactory().ForResource(gvr) - if err != nil { - klog.Errorf("cannot create informer for %s", gvr) + type informerForResourceFunc func(resource schema.GroupVersionResource) (interface{}, error) + waitForResourceSync := func(sharedInformerFactory informers.GenericInformerFactory, informerForResourceFunc informerForResourceFunc, GVRs map[schema.GroupVersion][]string) error { + for groupVersion, resourceNames := range GVRs { + var apiResourceList *v1.APIResourceList + var err error + err = retry.OnError(retry.DefaultRetry, func(err error) bool { + return err != nil + }, func() error { + apiResourceList, err = s.KubernetesClient.Kubernetes().Discovery().ServerResourcesForGroupVersion(groupVersion.String()) return err + }) + if err != nil { + return fmt.Errorf("failed to fetch group version resources: %s", err) + } + for _, resourceName := range resourceNames { + groupVersionResource := groupVersion.WithResource(resourceName) + if !isResourceExists(apiResourceList.APIResources, groupVersionResource) { + klog.Warningf("resource %s not exists in the cluster", groupVersionResource) + } else { + // reflect.ValueOf(sharedInformerFactory).MethodByName("ForResource").Call([]reflect.Value{reflect.ValueOf(groupVersionResource)}) + if _, err = informerForResourceFunc(groupVersionResource); err != nil { + return fmt.Errorf("failed to create informer for %s: %s", groupVersionResource, err) + } + } } } + sharedInformerFactory.Start(stopCh) + sharedInformerFactory.WaitForCacheSync(stopCh) + return nil } - s.InformerFactory.KubernetesSharedInformerFactory().Start(stopCh) - s.InformerFactory.KubernetesSharedInformerFactory().WaitForCacheSync(stopCh) - - ksInformerFactory := s.InformerFactory.KubeSphereSharedInformerFactory() - - ksGVRs := []schema.GroupVersionResource{ - {Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"}, - {Group: "tenant.kubesphere.io", Version: "v1alpha2", Resource: "workspacetemplates"}, - {Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "users"}, - {Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "globalroles"}, - {Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "globalrolebindings"}, - {Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "groups"}, - {Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "groupbindings"}, - {Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "workspaceroles"}, - {Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "workspacerolebindings"}, - {Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "loginrecords"}, - {Group: "cluster.kubesphere.io", Version: "v1alpha1", Resource: "clusters"}, - {Group: "network.kubesphere.io", Version: "v1alpha1", Resource: "ippools"}, - {Group: "notification.kubesphere.io", Version: "v2beta1", Resource: notificationv2beta1.ResourcesPluralConfig}, - {Group: "notification.kubesphere.io", Version: "v2beta1", Resource: notificationv2beta1.ResourcesPluralReceiver}, + // resources we have to create informer first + k8sGVRs := map[schema.GroupVersion][]string{ + {Group: "", Version: "v1"}: { + "namespaces", + "nodes", + "resourcequotas", + "pods", + "services", + "persistentvolumeclaims", + "persistentvolumes", + "secrets", + "configmaps", + "serviceaccounts", + }, + {Group: "rbac.authorization.k8s.io", Version: "v1"}: { + "roles", + "rolebindings", + "clusterroles", + "clusterrolebindings", + }, + {Group: "apps", Version: "v1"}: { + "deployments", + "daemonsets", + "replicasets", + "statefulsets", + "controllerrevisions", + }, + {Group: "storage.k8s.io", Version: "v1"}: { + "storageclasses", + }, + {Group: "batch", Version: "v1"}: { + "jobs", + "cronjobs", + }, + {Group: "networking.k8s.io", Version: "v1"}: { + "ingresses", + "networkpolicies", + }, + {Group: "autoscaling", Version: "v2beta2"}: { + "horizontalpodautoscalers", + }, } - devopsGVRs := []schema.GroupVersionResource{ - {Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibinaries"}, - {Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibuildertemplates"}, - {Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2iruns"}, - {Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibuilders"}, - {Group: "devops.kubesphere.io", Version: "v1alpha3", Resource: "devopsprojects"}, - {Group: "devops.kubesphere.io", Version: "v1alpha3", Resource: "pipelines"}, + if err := waitForResourceSync(s.InformerFactory.KubernetesSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { + return s.InformerFactory.KubernetesSharedInformerFactory().ForResource(resource) + }, k8sGVRs); err != nil { + return err } - servicemeshGVRs := []schema.GroupVersionResource{ - {Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "strategies"}, - {Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "servicepolicies"}, - } - - // federated resources on cached in multi cluster setup - federatedResourceGVRs := []schema.GroupVersionResource{ - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRole), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRoleBindingBinding), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedNamespace), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedService), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedDeployment), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedSecret), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedConfigmap), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedStatefulSet), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedIngress), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedResourceQuota), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedPersistentVolumeClaim), - typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedApplication), + ksGVRs := map[schema.GroupVersion][]string{ + {Group: "tenant.kubesphere.io", Version: "v1alpha1"}: { + "workspaces", + }, + {Group: "tenant.kubesphere.io", Version: "v1alpha2"}: { + "workspacetemplates", + }, + {Group: "iam.kubesphere.io", Version: "v1alpha2"}: { + "users", + "globalroles", + "globalrolebindings", + "groups", + "groupbindings", + "workspaceroles", + "workspacerolebindings", + "loginrecords", + }, + {Group: "cluster.kubesphere.io", Version: "v1alpha1"}: { + "clusters", + }, + {Group: "network.kubesphere.io", Version: "v1alpha1"}: { + "ippools", + }, + {Group: "notification.kubesphere.io", Version: "v2beta1"}: { + notificationv2beta1.ResourcesPluralConfig, + notificationv2beta1.ResourcesPluralReceiver, + }, } // skip caching devops resources if devops not enabled if s.DevopsClient != nil { - ksGVRs = append(ksGVRs, devopsGVRs...) + ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpah1"}] = []string{ + "s2ibinaries", + "s2ibuildertemplates", + "s2iruns", + "s2ibuilders", + } + ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpah3"}] = []string{ + "devopsprojects", + "pipelines", + } } // skip caching servicemesh resources if servicemesh not enabled if s.KubernetesClient.Istio() != nil { - ksGVRs = append(ksGVRs, servicemeshGVRs...) + ksGVRs[schema.GroupVersion{Group: "servicemesh.kubesphere.io", Version: "v1alpha2"}] = []string{ + "strategies", + "servicepolicies", + } } + // federated resources on cached in multi cluster setup if s.Config.MultiClusterOptions.Enable { - ksGVRs = append(ksGVRs, federatedResourceGVRs...) - } - - for _, gvr := range ksGVRs { - if !isResourceExists(gvr) { - klog.Warningf("resource %s not exists in the cluster", gvr) - } else { - _, err = ksInformerFactory.ForResource(gvr) - if err != nil { - return err - } + ksGVRs[typesv1beta1.SchemeGroupVersion] = []string{ + typesv1beta1.ResourcePluralFederatedClusterRole, + typesv1beta1.ResourcePluralFederatedClusterRoleBindingBinding, + typesv1beta1.ResourcePluralFederatedNamespace, + typesv1beta1.ResourcePluralFederatedService, + typesv1beta1.ResourcePluralFederatedDeployment, + typesv1beta1.ResourcePluralFederatedSecret, + typesv1beta1.ResourcePluralFederatedConfigmap, + typesv1beta1.ResourcePluralFederatedStatefulSet, + typesv1beta1.ResourcePluralFederatedIngress, + typesv1beta1.ResourcePluralFederatedResourceQuota, + typesv1beta1.ResourcePluralFederatedPersistentVolumeClaim, + typesv1beta1.ResourcePluralFederatedApplication, } } - ksInformerFactory.Start(stopCh) - ksInformerFactory.WaitForCacheSync(stopCh) - - snapshotInformerFactory := s.InformerFactory.SnapshotSharedInformerFactory() - snapshotGVRs := []schema.GroupVersionResource{ - {Group: "snapshot.storage.k8s.io", Version: "v1", Resource: "volumesnapshotclasses"}, - {Group: "snapshot.storage.k8s.io", Version: "v1", Resource: "volumesnapshots"}, - {Group: "snapshot.storage.k8s.io", Version: "v1", Resource: "volumesnapshotcontents"}, - } - for _, gvr := range snapshotGVRs { - if !isResourceExists(gvr) { - klog.Warningf("resource %s not exists in the cluster", gvr) - } else { - _, err = snapshotInformerFactory.ForResource(gvr) - if err != nil { - return err - } - } - } - snapshotInformerFactory.Start(stopCh) - snapshotInformerFactory.WaitForCacheSync(stopCh) - - apiextensionsInformerFactory := s.InformerFactory.ApiExtensionSharedInformerFactory() - apiextensionsGVRs := []schema.GroupVersionResource{ - {Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"}, + if err := waitForResourceSync(s.InformerFactory.KubeSphereSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { + return s.InformerFactory.KubeSphereSharedInformerFactory().ForResource(resource) + }, ksGVRs); err != nil { + return err } - for _, gvr := range apiextensionsGVRs { - if !isResourceExists(gvr) { - klog.Warningf("resource %s not exists in the cluster", gvr) - } else { - _, err = apiextensionsInformerFactory.ForResource(gvr) - if err != nil { - return err - } - } + snapshotGVRs := map[schema.GroupVersion][]string{ + {Group: "snapshot.storage.k8s.io", Version: "v1"}: { + "volumesnapshots", + "volumesnapshotcontents", + "volumesnapshotclasses", + }, + } + + if err := waitForResourceSync(s.InformerFactory.SnapshotSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { + return s.InformerFactory.SnapshotSharedInformerFactory().ForResource(resource) + }, snapshotGVRs); err != nil { + return err + } + + apiextensionsGVRs := map[schema.GroupVersion][]string{ + {Group: "apiextensions.k8s.io", Version: "v1"}: { + "customresourcedefinitions", + }, + } + + if err := waitForResourceSync(s.InformerFactory.ApiExtensionSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) { + return s.InformerFactory.ApiExtensionSharedInformerFactory().ForResource(resource) + }, apiextensionsGVRs); err != nil { + return err } - apiextensionsInformerFactory.Start(stopCh) - apiextensionsInformerFactory.WaitForCacheSync(stopCh) if promFactory := s.InformerFactory.PrometheusSharedInformerFactory(); promFactory != nil { - prometheusGVRs := []schema.GroupVersionResource{ - {Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheuses"}, - {Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheusrules"}, - {Group: "monitoring.coreos.com", Version: "v1", Resource: "thanosrulers"}, + prometheusGVRs := map[schema.GroupVersion][]string{ + {Group: "monitoring.coreos.com", Version: "v1"}: { + "prometheuses", + "prometheusrules", + "thanosrulers", + }, } - for _, gvr := range prometheusGVRs { - if isResourceExists(gvr) { - _, err = promFactory.ForResource(gvr) - if err != nil { - return err - } - } else { - klog.Warningf("resource %s not exists in the cluster", gvr) - } + if err := waitForResourceSync(promFactory, func(resource schema.GroupVersionResource) (interface{}, error) { + return promFactory.ForResource(resource) + }, prometheusGVRs); err != nil { + return err } - promFactory.Start(stopCh) - promFactory.WaitForCacheSync(stopCh) } - // controller runtime cache for resources go s.RuntimeCache.Start(ctx) s.RuntimeCache.WaitForCacheSync(ctx) klog.V(0).Info("Finished caching objects") - return nil } diff --git a/pkg/informers/informers.go b/pkg/informers/informers.go index 47149849d..98ba80e0d 100644 --- a/pkg/informers/informers.go +++ b/pkg/informers/informers.go @@ -17,6 +17,7 @@ limitations under the License. package informers import ( + "reflect" "time" snapshotclient "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned" @@ -51,6 +52,11 @@ type InformerFactory interface { Start(stopCh <-chan struct{}) } +type GenericInformerFactory interface { + Start(stopCh <-chan struct{}) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool +} + type informerFactories struct { informerFactory k8sinformers.SharedInformerFactory ksInformerFactory ksinformers.SharedInformerFactory diff --git a/pkg/simple/client/k8s/kubernetes.go b/pkg/simple/client/k8s/kubernetes.go index 178c5eca3..1ab32b0e8 100644 --- a/pkg/simple/client/k8s/kubernetes.go +++ b/pkg/simple/client/k8s/kubernetes.go @@ -23,7 +23,6 @@ import ( promresourcesclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned" istioclient "istio.io/client-go/pkg/clientset/versioned" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -37,7 +36,6 @@ type Client interface { Istio() istioclient.Interface Snapshot() snapshotclient.Interface ApiExtensions() apiextensionsclient.Interface - Discovery() discovery.DiscoveryInterface Prometheus() promresourcesclient.Interface Master() string Config() *rest.Config @@ -47,9 +45,6 @@ type kubernetesClient struct { // kubernetes client interface k8s kubernetes.Interface - // discovery client - discoveryClient *discovery.DiscoveryClient - // generated clientset ks kubesphere.Interface @@ -77,15 +72,14 @@ func NewKubernetesClientOrDie(options *KubernetesOptions) Client { config.Burst = options.Burst k := &kubernetesClient{ - k8s: kubernetes.NewForConfigOrDie(config), - discoveryClient: discovery.NewDiscoveryClientForConfigOrDie(config), - ks: kubesphere.NewForConfigOrDie(config), - istio: istioclient.NewForConfigOrDie(config), - snapshot: snapshotclient.NewForConfigOrDie(config), - apiextensions: apiextensionsclient.NewForConfigOrDie(config), - prometheus: promresourcesclient.NewForConfigOrDie(config), - master: config.Host, - config: config, + k8s: kubernetes.NewForConfigOrDie(config), + ks: kubesphere.NewForConfigOrDie(config), + istio: istioclient.NewForConfigOrDie(config), + snapshot: snapshotclient.NewForConfigOrDie(config), + apiextensions: apiextensionsclient.NewForConfigOrDie(config), + prometheus: promresourcesclient.NewForConfigOrDie(config), + master: config.Host, + config: config, } if options.Master != "" { @@ -116,11 +110,6 @@ func NewKubernetesClient(options *KubernetesOptions) (Client, error) { return nil, err } - k.discoveryClient, err = discovery.NewDiscoveryClientForConfig(config) - if err != nil { - return nil, err - } - k.ks, err = kubesphere.NewForConfig(config) if err != nil { return nil, err @@ -157,10 +146,6 @@ func (k *kubernetesClient) Kubernetes() kubernetes.Interface { return k.k8s } -func (k *kubernetesClient) Discovery() discovery.DiscoveryInterface { - return k.discoveryClient -} - func (k *kubernetesClient) KubeSphere() kubesphere.Interface { return k.ks }