Fix crash caused by resource discovery failure

This commit is contained in:
hongming
2022-04-28 18:55:47 +08:00
parent ef5fcbd9ce
commit 7603c74ebb
3 changed files with 186 additions and 188 deletions

View File

@@ -26,6 +26,9 @@ import (
"sync"
"time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"github.com/emicklei/go-restful"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -254,7 +257,7 @@ func (s *APIServer) installKubeSphereAPIs(stopCh <-chan struct{}) {
urlruntime.Must(alertingv1.AddToContainer(s.container, s.Config.AlertingOptions.Endpoint))
urlruntime.Must(alertingv2alpha1.AddToContainer(s.container, s.InformerFactory,
s.KubernetesClient.Prometheus(), s.AlertingClient, s.Config.AlertingOptions))
urlruntime.Must(version.AddToContainer(s.container, s.KubernetesClient.Discovery()))
urlruntime.Must(version.AddToContainer(s.container, s.KubernetesClient.Kubernetes().Discovery()))
urlruntime.Must(kubeedgev1alpha1.AddToContainer(s.container, s.Config.KubeEdgeOptions.Endpoint))
urlruntime.Must(edgeruntimev1alpha1.AddToContainer(s.container, s.Config.EdgeRuntimeOptions.Endpoint))
urlruntime.Must(notificationkapisv2beta1.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
@@ -367,211 +370,215 @@ func (s *APIServer) waitForResourceSync(ctx context.Context) error {
klog.V(0).Info("Start cache objects")
stopCh := ctx.Done()
discoveryClient := s.KubernetesClient.Kubernetes().Discovery()
_, apiResourcesList, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
return err
}
isResourceExists := func(resource schema.GroupVersionResource) bool {
for _, apiResource := range apiResourcesList {
if apiResource.GroupVersion == resource.GroupVersion().String() {
for _, rsc := range apiResource.APIResources {
if rsc.Name == resource.Resource {
return true
}
}
isResourceExists := func(apiResources []v1.APIResource, resource schema.GroupVersionResource) bool {
for _, apiResource := range apiResources {
if apiResource.Name == resource.Resource {
return true
}
}
return false
}
// resources we have to create informer first
k8sGVRs := []schema.GroupVersionResource{
{Group: "", Version: "v1", Resource: "namespaces"},
{Group: "", Version: "v1", Resource: "nodes"},
{Group: "", Version: "v1", Resource: "resourcequotas"},
{Group: "", Version: "v1", Resource: "pods"},
{Group: "", Version: "v1", Resource: "services"},
{Group: "", Version: "v1", Resource: "persistentvolumeclaims"},
{Group: "", Version: "v1", Resource: "persistentvolumes"},
{Group: "", Version: "v1", Resource: "secrets"},
{Group: "", Version: "v1", Resource: "configmaps"},
{Group: "", Version: "v1", Resource: "serviceaccounts"},
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "roles"},
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "rolebindings"},
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"},
{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterrolebindings"},
{Group: "apps", Version: "v1", Resource: "deployments"},
{Group: "apps", Version: "v1", Resource: "daemonsets"},
{Group: "apps", Version: "v1", Resource: "replicasets"},
{Group: "apps", Version: "v1", Resource: "statefulsets"},
{Group: "apps", Version: "v1", Resource: "controllerrevisions"},
{Group: "storage.k8s.io", Version: "v1", Resource: "storageclasses"},
{Group: "batch", Version: "v1", Resource: "jobs"},
{Group: "batch", Version: "v1beta1", Resource: "cronjobs"},
{Group: "networking.k8s.io", Version: "v1", Resource: "ingresses"},
{Group: "autoscaling", Version: "v2beta2", Resource: "horizontalpodautoscalers"},
{Group: "networking.k8s.io", Version: "v1", Resource: "networkpolicies"},
}
for _, gvr := range k8sGVRs {
if !isResourceExists(gvr) {
klog.Warningf("resource %s not exists in the cluster", gvr)
} else {
_, err := s.InformerFactory.KubernetesSharedInformerFactory().ForResource(gvr)
if err != nil {
klog.Errorf("cannot create informer for %s", gvr)
type informerForResourceFunc func(resource schema.GroupVersionResource) (interface{}, error)
waitForResourceSync := func(sharedInformerFactory informers.GenericInformerFactory, informerForResourceFunc informerForResourceFunc, GVRs map[schema.GroupVersion][]string) error {
for groupVersion, resourceNames := range GVRs {
var apiResourceList *v1.APIResourceList
var err error
err = retry.OnError(retry.DefaultRetry, func(err error) bool {
return err != nil
}, func() error {
apiResourceList, err = s.KubernetesClient.Kubernetes().Discovery().ServerResourcesForGroupVersion(groupVersion.String())
return err
})
if err != nil {
return fmt.Errorf("failed to fetch group version resources: %s", err)
}
for _, resourceName := range resourceNames {
groupVersionResource := groupVersion.WithResource(resourceName)
if !isResourceExists(apiResourceList.APIResources, groupVersionResource) {
klog.Warningf("resource %s not exists in the cluster", groupVersionResource)
} else {
// reflect.ValueOf(sharedInformerFactory).MethodByName("ForResource").Call([]reflect.Value{reflect.ValueOf(groupVersionResource)})
if _, err = informerForResourceFunc(groupVersionResource); err != nil {
return fmt.Errorf("failed to create informer for %s: %s", groupVersionResource, err)
}
}
}
}
sharedInformerFactory.Start(stopCh)
sharedInformerFactory.WaitForCacheSync(stopCh)
return nil
}
s.InformerFactory.KubernetesSharedInformerFactory().Start(stopCh)
s.InformerFactory.KubernetesSharedInformerFactory().WaitForCacheSync(stopCh)
ksInformerFactory := s.InformerFactory.KubeSphereSharedInformerFactory()
ksGVRs := []schema.GroupVersionResource{
{Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"},
{Group: "tenant.kubesphere.io", Version: "v1alpha2", Resource: "workspacetemplates"},
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "users"},
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "globalroles"},
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "globalrolebindings"},
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "groups"},
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "groupbindings"},
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "workspaceroles"},
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "workspacerolebindings"},
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "loginrecords"},
{Group: "cluster.kubesphere.io", Version: "v1alpha1", Resource: "clusters"},
{Group: "network.kubesphere.io", Version: "v1alpha1", Resource: "ippools"},
{Group: "notification.kubesphere.io", Version: "v2beta1", Resource: notificationv2beta1.ResourcesPluralConfig},
{Group: "notification.kubesphere.io", Version: "v2beta1", Resource: notificationv2beta1.ResourcesPluralReceiver},
// resources we have to create informer first
k8sGVRs := map[schema.GroupVersion][]string{
{Group: "", Version: "v1"}: {
"namespaces",
"nodes",
"resourcequotas",
"pods",
"services",
"persistentvolumeclaims",
"persistentvolumes",
"secrets",
"configmaps",
"serviceaccounts",
},
{Group: "rbac.authorization.k8s.io", Version: "v1"}: {
"roles",
"rolebindings",
"clusterroles",
"clusterrolebindings",
},
{Group: "apps", Version: "v1"}: {
"deployments",
"daemonsets",
"replicasets",
"statefulsets",
"controllerrevisions",
},
{Group: "storage.k8s.io", Version: "v1"}: {
"storageclasses",
},
{Group: "batch", Version: "v1"}: {
"jobs",
"cronjobs",
},
{Group: "networking.k8s.io", Version: "v1"}: {
"ingresses",
"networkpolicies",
},
{Group: "autoscaling", Version: "v2beta2"}: {
"horizontalpodautoscalers",
},
}
devopsGVRs := []schema.GroupVersionResource{
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibinaries"},
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibuildertemplates"},
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2iruns"},
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibuilders"},
{Group: "devops.kubesphere.io", Version: "v1alpha3", Resource: "devopsprojects"},
{Group: "devops.kubesphere.io", Version: "v1alpha3", Resource: "pipelines"},
if err := waitForResourceSync(s.InformerFactory.KubernetesSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.KubernetesSharedInformerFactory().ForResource(resource)
}, k8sGVRs); err != nil {
return err
}
servicemeshGVRs := []schema.GroupVersionResource{
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "strategies"},
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "servicepolicies"},
}
// federated resources are only cached in multi-cluster setup
federatedResourceGVRs := []schema.GroupVersionResource{
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRole),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRoleBindingBinding),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedNamespace),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedService),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedDeployment),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedSecret),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedConfigmap),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedStatefulSet),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedIngress),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedResourceQuota),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedPersistentVolumeClaim),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedApplication),
ksGVRs := map[schema.GroupVersion][]string{
{Group: "tenant.kubesphere.io", Version: "v1alpha1"}: {
"workspaces",
},
{Group: "tenant.kubesphere.io", Version: "v1alpha2"}: {
"workspacetemplates",
},
{Group: "iam.kubesphere.io", Version: "v1alpha2"}: {
"users",
"globalroles",
"globalrolebindings",
"groups",
"groupbindings",
"workspaceroles",
"workspacerolebindings",
"loginrecords",
},
{Group: "cluster.kubesphere.io", Version: "v1alpha1"}: {
"clusters",
},
{Group: "network.kubesphere.io", Version: "v1alpha1"}: {
"ippools",
},
{Group: "notification.kubesphere.io", Version: "v2beta1"}: {
notificationv2beta1.ResourcesPluralConfig,
notificationv2beta1.ResourcesPluralReceiver,
},
}
// skip caching devops resources if devops not enabled
if s.DevopsClient != nil {
ksGVRs = append(ksGVRs, devopsGVRs...)
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpha1"}] = []string{
"s2ibinaries",
"s2ibuildertemplates",
"s2iruns",
"s2ibuilders",
}
ksGVRs[schema.GroupVersion{Group: "devops.kubesphere.io", Version: "v1alpha3"}] = []string{
"devopsprojects",
"pipelines",
}
}
// skip caching servicemesh resources if servicemesh not enabled
if s.KubernetesClient.Istio() != nil {
ksGVRs = append(ksGVRs, servicemeshGVRs...)
ksGVRs[schema.GroupVersion{Group: "servicemesh.kubesphere.io", Version: "v1alpha2"}] = []string{
"strategies",
"servicepolicies",
}
}
// federated resources are only cached in multi-cluster setup
if s.Config.MultiClusterOptions.Enable {
ksGVRs = append(ksGVRs, federatedResourceGVRs...)
}
for _, gvr := range ksGVRs {
if !isResourceExists(gvr) {
klog.Warningf("resource %s not exists in the cluster", gvr)
} else {
_, err = ksInformerFactory.ForResource(gvr)
if err != nil {
return err
}
ksGVRs[typesv1beta1.SchemeGroupVersion] = []string{
typesv1beta1.ResourcePluralFederatedClusterRole,
typesv1beta1.ResourcePluralFederatedClusterRoleBindingBinding,
typesv1beta1.ResourcePluralFederatedNamespace,
typesv1beta1.ResourcePluralFederatedService,
typesv1beta1.ResourcePluralFederatedDeployment,
typesv1beta1.ResourcePluralFederatedSecret,
typesv1beta1.ResourcePluralFederatedConfigmap,
typesv1beta1.ResourcePluralFederatedStatefulSet,
typesv1beta1.ResourcePluralFederatedIngress,
typesv1beta1.ResourcePluralFederatedResourceQuota,
typesv1beta1.ResourcePluralFederatedPersistentVolumeClaim,
typesv1beta1.ResourcePluralFederatedApplication,
}
}
ksInformerFactory.Start(stopCh)
ksInformerFactory.WaitForCacheSync(stopCh)
snapshotInformerFactory := s.InformerFactory.SnapshotSharedInformerFactory()
snapshotGVRs := []schema.GroupVersionResource{
{Group: "snapshot.storage.k8s.io", Version: "v1", Resource: "volumesnapshotclasses"},
{Group: "snapshot.storage.k8s.io", Version: "v1", Resource: "volumesnapshots"},
{Group: "snapshot.storage.k8s.io", Version: "v1", Resource: "volumesnapshotcontents"},
}
for _, gvr := range snapshotGVRs {
if !isResourceExists(gvr) {
klog.Warningf("resource %s not exists in the cluster", gvr)
} else {
_, err = snapshotInformerFactory.ForResource(gvr)
if err != nil {
return err
}
}
}
snapshotInformerFactory.Start(stopCh)
snapshotInformerFactory.WaitForCacheSync(stopCh)
apiextensionsInformerFactory := s.InformerFactory.ApiExtensionSharedInformerFactory()
apiextensionsGVRs := []schema.GroupVersionResource{
{Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"},
if err := waitForResourceSync(s.InformerFactory.KubeSphereSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.KubeSphereSharedInformerFactory().ForResource(resource)
}, ksGVRs); err != nil {
return err
}
for _, gvr := range apiextensionsGVRs {
if !isResourceExists(gvr) {
klog.Warningf("resource %s not exists in the cluster", gvr)
} else {
_, err = apiextensionsInformerFactory.ForResource(gvr)
if err != nil {
return err
}
}
snapshotGVRs := map[schema.GroupVersion][]string{
{Group: "snapshot.storage.k8s.io", Version: "v1"}: {
"volumesnapshots",
"volumesnapshotcontents",
"volumesnapshotclasses",
},
}
if err := waitForResourceSync(s.InformerFactory.SnapshotSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.SnapshotSharedInformerFactory().ForResource(resource)
}, snapshotGVRs); err != nil {
return err
}
apiextensionsGVRs := map[schema.GroupVersion][]string{
{Group: "apiextensions.k8s.io", Version: "v1"}: {
"customresourcedefinitions",
},
}
if err := waitForResourceSync(s.InformerFactory.ApiExtensionSharedInformerFactory(), func(resource schema.GroupVersionResource) (interface{}, error) {
return s.InformerFactory.ApiExtensionSharedInformerFactory().ForResource(resource)
}, apiextensionsGVRs); err != nil {
return err
}
apiextensionsInformerFactory.Start(stopCh)
apiextensionsInformerFactory.WaitForCacheSync(stopCh)
if promFactory := s.InformerFactory.PrometheusSharedInformerFactory(); promFactory != nil {
prometheusGVRs := []schema.GroupVersionResource{
{Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheuses"},
{Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheusrules"},
{Group: "monitoring.coreos.com", Version: "v1", Resource: "thanosrulers"},
prometheusGVRs := map[schema.GroupVersion][]string{
{Group: "monitoring.coreos.com", Version: "v1"}: {
"prometheuses",
"prometheusrules",
"thanosrulers",
},
}
for _, gvr := range prometheusGVRs {
if isResourceExists(gvr) {
_, err = promFactory.ForResource(gvr)
if err != nil {
return err
}
} else {
klog.Warningf("resource %s not exists in the cluster", gvr)
}
if err := waitForResourceSync(promFactory, func(resource schema.GroupVersionResource) (interface{}, error) {
return promFactory.ForResource(resource)
}, prometheusGVRs); err != nil {
return err
}
promFactory.Start(stopCh)
promFactory.WaitForCacheSync(stopCh)
}
// controller runtime cache for resources
go s.RuntimeCache.Start(ctx)
s.RuntimeCache.WaitForCacheSync(ctx)
klog.V(0).Info("Finished caching objects")
return nil
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package informers
import (
"reflect"
"time"
snapshotclient "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
@@ -51,6 +52,11 @@ type InformerFactory interface {
Start(stopCh <-chan struct{})
}
type GenericInformerFactory interface {
Start(stopCh <-chan struct{})
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
type informerFactories struct {
informerFactory k8sinformers.SharedInformerFactory
ksInformerFactory ksinformers.SharedInformerFactory

View File

@@ -23,7 +23,6 @@ import (
promresourcesclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
istioclient "istio.io/client-go/pkg/clientset/versioned"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -37,7 +36,6 @@ type Client interface {
Istio() istioclient.Interface
Snapshot() snapshotclient.Interface
ApiExtensions() apiextensionsclient.Interface
Discovery() discovery.DiscoveryInterface
Prometheus() promresourcesclient.Interface
Master() string
Config() *rest.Config
@@ -47,9 +45,6 @@ type kubernetesClient struct {
// kubernetes client interface
k8s kubernetes.Interface
// discovery client
discoveryClient *discovery.DiscoveryClient
// generated clientset
ks kubesphere.Interface
@@ -77,15 +72,14 @@ func NewKubernetesClientOrDie(options *KubernetesOptions) Client {
config.Burst = options.Burst
k := &kubernetesClient{
k8s: kubernetes.NewForConfigOrDie(config),
discoveryClient: discovery.NewDiscoveryClientForConfigOrDie(config),
ks: kubesphere.NewForConfigOrDie(config),
istio: istioclient.NewForConfigOrDie(config),
snapshot: snapshotclient.NewForConfigOrDie(config),
apiextensions: apiextensionsclient.NewForConfigOrDie(config),
prometheus: promresourcesclient.NewForConfigOrDie(config),
master: config.Host,
config: config,
k8s: kubernetes.NewForConfigOrDie(config),
ks: kubesphere.NewForConfigOrDie(config),
istio: istioclient.NewForConfigOrDie(config),
snapshot: snapshotclient.NewForConfigOrDie(config),
apiextensions: apiextensionsclient.NewForConfigOrDie(config),
prometheus: promresourcesclient.NewForConfigOrDie(config),
master: config.Host,
config: config,
}
if options.Master != "" {
@@ -116,11 +110,6 @@ func NewKubernetesClient(options *KubernetesOptions) (Client, error) {
return nil, err
}
k.discoveryClient, err = discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}
k.ks, err = kubesphere.NewForConfig(config)
if err != nil {
return nil, err
@@ -157,10 +146,6 @@ func (k *kubernetesClient) Kubernetes() kubernetes.Interface {
return k.k8s
}
func (k *kubernetesClient) Discovery() discovery.DiscoveryInterface {
return k.discoveryClient
}
func (k *kubernetesClient) KubeSphere() kubesphere.Interface {
return k.ks
}