Fix typo
This commit is contained in:
@@ -26,6 +26,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/client-go/discovery"
|
||||
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/util/retry"
|
||||
|
||||
@@ -366,50 +369,51 @@ func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
|
||||
s.Server.Handler = handler
|
||||
}
|
||||
|
||||
func isResourceExists(apiResources []v1.APIResource, resource schema.GroupVersionResource) bool {
|
||||
for _, apiResource := range apiResources {
|
||||
if apiResource.Name == resource.Resource {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type informerForResourceFunc func(resource schema.GroupVersionResource) (interface{}, error)
|
||||
|
||||
func waitForCacheSync(discoveryClient discovery.DiscoveryInterface, sharedInformerFactory informers.GenericInformerFactory, informerForResourceFunc informerForResourceFunc, GVRs map[schema.GroupVersion][]string, stopCh <-chan struct{}) error {
|
||||
for groupVersion, resourceNames := range GVRs {
|
||||
var apiResourceList *v1.APIResourceList
|
||||
var err error
|
||||
err = retry.OnError(retry.DefaultRetry, func(err error) bool {
|
||||
return !errors.IsNotFound(err)
|
||||
}, func() error {
|
||||
apiResourceList, err = discoveryClient.ServerResourcesForGroupVersion(groupVersion.String())
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch group version resources %s: %s", groupVersion, err)
|
||||
}
|
||||
for _, resourceName := range resourceNames {
|
||||
groupVersionResource := groupVersion.WithResource(resourceName)
|
||||
if !isResourceExists(apiResourceList.APIResources, groupVersionResource) {
|
||||
klog.Warningf("resource %s not exists in the cluster", groupVersionResource)
|
||||
} else {
|
||||
// reflect.ValueOf(sharedInformerFactory).MethodByName("ForResource").Call([]reflect.Value{reflect.ValueOf(groupVersionResource)})
|
||||
if _, err = informerForResourceFunc(groupVersionResource); err != nil {
|
||||
return fmt.Errorf("failed to create informer for %s: %s", groupVersionResource, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
sharedInformerFactory.Start(stopCh)
|
||||
sharedInformerFactory.WaitForCacheSync(stopCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *APIServer) waitForResourceSync(ctx context.Context) error {
|
||||
klog.V(0).Info("Start cache objects")
|
||||
|
||||
stopCh := ctx.Done()
|
||||
isResourceExists := func(apiResources []v1.APIResource, resource schema.GroupVersionResource) bool {
|
||||
for _, apiResource := range apiResources {
|
||||
if apiResource.Name == resource.Resource {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type informerForResourceFunc func(resource schema.GroupVersionResource) (interface{}, error)
|
||||
waitForResourceSync := func(sharedInformerFactory informers.GenericInformerFactory, informerForResourceFunc informerForResourceFunc, GVRs map[schema.GroupVersion][]string) error {
|
||||
for groupVersion, resourceNames := range GVRs {
|
||||
var apiResourceList *v1.APIResourceList
|
||||
var err error
|
||||
err = retry.OnError(retry.DefaultRetry, func(err error) bool {
|
||||
return err != nil
|
||||
}, func() error {
|
||||
apiResourceList, err = s.KubernetesClient.Kubernetes().Discovery().ServerResourcesForGroupVersion(groupVersion.String())
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch group version resources: %s", err)
|
||||
}
|
||||
for _, resourceName := range resourceNames {
|
||||
groupVersionResource := groupVersion.WithResource(resourceName)
|
||||
if !isResourceExists(apiResourceList.APIResources, groupVersionResource) {
|
||||
klog.Warningf("resource %s not exists in the cluster", groupVersionResource)
|
||||
} else {
|
||||
// reflect.ValueOf(sharedInformerFactory).MethodByName("ForResource").Call([]reflect.Value{reflect.ValueOf(groupVersionResource)})
|
||||
if _, err = informerForResourceFunc(groupVersionResource); err != nil {
|
||||
return fmt.Errorf("failed to create informer for %s: %s", groupVersionResource, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
sharedInformerFactory.Start(stopCh)
|
||||
sharedInformerFactory.WaitForCacheSync(stopCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
// resources we have to create informer first
|
||||
k8sGVRs := map[schema.GroupVersion][]string{
|
||||
{Group: "", Version: "v1"}: {
|
||||
@@ -442,6 +446,8 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
|
||||
},
|
||||
{Group: "batch", Version: "v1"}: {
|
||||
"jobs",
|
||||
},
|
||||
{Group: "batch", Version: "v1beta1"}: {
|
||||
"cronjobs",
|
||||
},
|
||||
{Group: "networking.k8s.io", Version: "v1"}: {
|
||||
@@ -453,9 +459,12 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
|
||||
},
|
||||
}
|
||||
|
||||
if err := waitForResourceSync(s.InformerFactory.KubernetesSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return s.InformerFactory.KubernetesSharedInformerFactory().ForResource(resource)
|
||||
}, k8sGVRs); err != nil {
|
||||
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
|
||||
s.InformerFactory.KubernetesSharedInformerFactory(),
|
||||
func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return s.InformerFactory.KubernetesSharedInformerFactory().ForResource(resource)
|
||||
},
|
||||
k8sGVRs, stopCh); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -490,13 +499,13 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
|
||||
|
||||
// skip caching devops resources if devops not enabled
|
||||
if s.DevopsClient != nil {
|
||||
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpah1"}] = []string{
|
||||
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpha1"}] = []string{
|
||||
"s2ibinaries",
|
||||
"s2ibuildertemplates",
|
||||
"s2iruns",
|
||||
"s2ibuilders",
|
||||
}
|
||||
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpah3"}] = []string{
|
||||
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpha3"}] = []string{
|
||||
"devopsprojects",
|
||||
"pipelines",
|
||||
}
|
||||
@@ -528,9 +537,12 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
if err := waitForResourceSync(s.InformerFactory.KubeSphereSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return s.InformerFactory.KubeSphereSharedInformerFactory().ForResource(resource)
|
||||
}, ksGVRs); err != nil {
|
||||
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
|
||||
s.InformerFactory.KubeSphereSharedInformerFactory(),
|
||||
func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return s.InformerFactory.KubeSphereSharedInformerFactory().ForResource(resource)
|
||||
},
|
||||
ksGVRs, stopCh); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -542,9 +554,11 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
|
||||
},
|
||||
}
|
||||
|
||||
if err := waitForResourceSync(s.InformerFactory.SnapshotSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return s.InformerFactory.SnapshotSharedInformerFactory().ForResource(resource)
|
||||
}, snapshotGVRs); err != nil {
|
||||
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
|
||||
s.InformerFactory.SnapshotSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return s.InformerFactory.SnapshotSharedInformerFactory().ForResource(resource)
|
||||
},
|
||||
snapshotGVRs, stopCh); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -554,9 +568,11 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
|
||||
},
|
||||
}
|
||||
|
||||
if err := waitForResourceSync(s.InformerFactory.ApiExtensionSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return s.InformerFactory.ApiExtensionSharedInformerFactory().ForResource(resource)
|
||||
}, apiextensionsGVRs); err != nil {
|
||||
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
|
||||
s.InformerFactory.ApiExtensionSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return s.InformerFactory.ApiExtensionSharedInformerFactory().ForResource(resource)
|
||||
},
|
||||
apiextensionsGVRs, stopCh); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -568,9 +584,11 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
|
||||
"thanosrulers",
|
||||
},
|
||||
}
|
||||
if err := waitForResourceSync(promFactory, func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return promFactory.ForResource(resource)
|
||||
}, prometheusGVRs); err != nil {
|
||||
if err := waitForCacheSync(s.KubernetesClient.Kubernetes().Discovery(),
|
||||
promFactory, func(resource schema.GroupVersionResource) (interface{}, error) {
|
||||
return promFactory.ForResource(resource)
|
||||
},
|
||||
prometheusGVRs, stopCh); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user