Merge remote-tracking branch 'upstream/master'
# Conflicts: # cmd/ks-apiserver/app/server.go
This commit is contained in:
87
cmd/controller-manager/app/controllers.go
Normal file
87
cmd/controller-manager/app/controllers.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"kubesphere.io/kubesphere/pkg/controller/destinationrule"
|
||||
"kubesphere.io/kubesphere/pkg/controller/virtualservice"
|
||||
"kubesphere.io/kubesphere/pkg/simple/controller/namespace"
|
||||
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||
"time"
|
||||
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
|
||||
|
||||
istioclientset "github.com/knative/pkg/client/clientset/versioned"
|
||||
istioinformers "github.com/knative/pkg/client/informers/externalversions"
|
||||
servicemeshclientset "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
|
||||
servicemeshinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions"
|
||||
)
|
||||
|
||||
const defaultResync = 600 * time.Second
|
||||
|
||||
var log = logf.Log.WithName("controller-manager")
|
||||
|
||||
func AddControllers(mgr manager.Manager, cfg *rest.Config, stopCh <-chan struct{}) error {
|
||||
|
||||
kubeClient, err := kubernetes.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
log.Error(err, "building kubernetes client failed")
|
||||
}
|
||||
|
||||
istioclient, err := istioclientset.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
log.Error(err, "create istio client failed")
|
||||
return err
|
||||
}
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
|
||||
istioInformer := istioinformers.NewSharedInformerFactory(istioclient, defaultResync)
|
||||
|
||||
servicemeshclient, err := servicemeshclientset.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
log.Error(err, "create servicemesh client failed")
|
||||
return err
|
||||
}
|
||||
|
||||
servicemeshinformer := servicemeshinformers.NewSharedInformerFactory(servicemeshclient, defaultResync)
|
||||
|
||||
vsController := virtualservice.NewVirtualServiceController(informerFactory.Core().V1().Services(),
|
||||
istioInformer.Networking().V1alpha3().VirtualServices(),
|
||||
istioInformer.Networking().V1alpha3().DestinationRules(),
|
||||
servicemeshinformer.Servicemesh().V1alpha2().Strategies(),
|
||||
kubeClient,
|
||||
istioclient)
|
||||
|
||||
drController := destinationrule.NewDestinationRuleController(informerFactory.Apps().V1().Deployments(),
|
||||
istioInformer.Networking().V1alpha3().DestinationRules(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
servicemeshinformer.Servicemesh().V1alpha2().ServicePolicies(),
|
||||
kubeClient,
|
||||
istioclient)
|
||||
|
||||
nsController := namespace.NewNamespaceController(kubeClient,
|
||||
informerFactory.Core().V1().Namespaces(),
|
||||
informerFactory.Rbac().V1().Roles(),
|
||||
)
|
||||
|
||||
servicemeshinformer.Start(stopCh)
|
||||
istioInformer.Start(stopCh)
|
||||
informerFactory.Start(stopCh)
|
||||
|
||||
controllers := map[string]manager.Runnable{
|
||||
"virtualservice-controller": vsController,
|
||||
"destinationrule-controller": drController,
|
||||
"namespace-controller": nsController,
|
||||
}
|
||||
|
||||
for name, ctrl := range controllers {
|
||||
err = mgr.Add(ctrl)
|
||||
if err != nil {
|
||||
log.Error(err, "add controller to manager failed", "name", name)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
38
cmd/controller-manager/app/helper.go
Normal file
38
cmd/controller-manager/app/helper.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// WaitForAPIServer waits for the API Server's /healthz endpoint to report "ok" with timeout.
|
||||
func WaitForAPIServer(client clientset.Interface, timeout time.Duration) error {
|
||||
var lastErr error
|
||||
|
||||
err := wait.PollImmediate(time.Second, timeout, func() (bool, error) {
|
||||
healthStatus := 0
|
||||
result := client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
|
||||
if result.Error() != nil {
|
||||
lastErr = fmt.Errorf("failed to get apiserver /healthz status: %v", result.Error())
|
||||
return false, nil
|
||||
}
|
||||
if healthStatus != http.StatusOK {
|
||||
content, _ := result.Raw()
|
||||
lastErr = fmt.Errorf("APIServer isn't healthy: %v", string(content))
|
||||
klog.Warningf("APIServer isn't healthy yet: %v. Waiting a little while.", string(content))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v: %v", err, lastErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -20,39 +20,42 @@ package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"kubesphere.io/kubesphere/cmd/controller-manager/app"
|
||||
"kubesphere.io/kubesphere/pkg/apis"
|
||||
"kubesphere.io/kubesphere/pkg/controller"
|
||||
"os"
|
||||
"sigs.k8s.io/application/pkg/apis/app/v1beta1"
|
||||
|
||||
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
|
||||
"kubesphere.io/kubesphere/pkg/simple/controller/namespace"
|
||||
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
|
||||
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
|
||||
|
||||
"kubesphere.io/kubesphere/pkg/apis"
|
||||
"kubesphere.io/kubesphere/pkg/controller"
|
||||
"kubesphere.io/kubesphere/pkg/webhook"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var metricsAddr string
|
||||
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
|
||||
flag.Parse()
|
||||
logf.SetLogger(logf.ZapLogger(false))
|
||||
log := logf.Log.WithName("entrypoint")
|
||||
var (
|
||||
masterURL string
|
||||
kubeconfig string
|
||||
metricsAddr string
|
||||
)
|
||||
|
||||
// Get a config to talk to the apiserver
|
||||
log.Info("setting up client for manager")
|
||||
cfg, err := k8s.Config()
|
||||
func init() {
|
||||
flag.StringVar(&masterURL, "master-url", "", "only need if out of cluster")
|
||||
flag.StringVar(&kubeconfig, "kubeconfig", "", "only need if out of cluster")
|
||||
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
logf.SetLogger(logf.ZapLogger(false))
|
||||
log := logf.Log.WithName("controller-manager")
|
||||
|
||||
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to set up client config")
|
||||
log.Error(err, "failed to build kubeconfig")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Create a new Cmd to provide shared dependencies and start components
|
||||
stopCh := signals.SetupSignalHandler()
|
||||
|
||||
log.Info("setting up manager")
|
||||
mgr, err := manager.New(cfg, manager.Options{})
|
||||
if err != nil {
|
||||
@@ -60,53 +63,27 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
log.Info("Registering Components.")
|
||||
|
||||
// Setup Scheme for all resources
|
||||
log.Info("setting up scheme")
|
||||
if err := apis.AddToScheme(mgr.GetScheme()); err != nil {
|
||||
log.Error(err, "unable add APIs to scheme")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
log.Info("Print all known types")
|
||||
for k, v := range mgr.GetScheme().AllKnownTypes() {
|
||||
if k.Group == v1beta1.SchemeGroupVersion.Group {
|
||||
log.Info(k.String() + " / " + v.String())
|
||||
}
|
||||
}
|
||||
|
||||
// Setup all Controllers
|
||||
log.Info("Setting up controller")
|
||||
log.Info("Setting up controllers")
|
||||
if err := controller.AddToManager(mgr); err != nil {
|
||||
log.Error(err, "unable to register controllers to the manager")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
log.Info("setting up webhooks")
|
||||
if err := webhook.AddToManager(mgr); err != nil {
|
||||
log.Error(err, "unable to register webhooks to the manager")
|
||||
if err := app.AddControllers(mgr, cfg, stopCh); err != nil {
|
||||
log.Error(err, "unable to register controllers to the manager")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
err = mgr.Add(manager.RunnableFunc(func(s <-chan struct{}) error {
|
||||
informerFactory := informers.SharedInformerFactory()
|
||||
informerFactory.Start(s)
|
||||
namespace.NewNamespaceController(k8s.Client(),
|
||||
informerFactory.Core().V1().Namespaces(),
|
||||
informerFactory.Rbac().V1().Roles()).Start(s)
|
||||
return nil
|
||||
}))
|
||||
|
||||
if err != nil {
|
||||
log.Error(err, "error Adding controllers to the Manager")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Start the Cmd
|
||||
log.Info("Starting the Cmd.")
|
||||
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
|
||||
if err := mgr.Start(stopCh); err != nil {
|
||||
log.Error(err, "unable to run the manager")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"kubesphere.io/kubesphere/cmd/ks-apiserver/app"
|
||||
"log"
|
||||
// Install apis
|
||||
_ "kubesphere.io/kubesphere/pkg/apis/logging/install"
|
||||
_ "kubesphere.io/kubesphere/pkg/apis/metrics/install"
|
||||
_ "kubesphere.io/kubesphere/pkg/apis/monitoring/install"
|
||||
_ "kubesphere.io/kubesphere/pkg/apis/operations/install"
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
goflag "flag"
|
||||
"fmt"
|
||||
"github.com/golang/glog"
|
||||
"github.com/json-iterator/go"
|
||||
kconfig "github.com/kiali/kiali/config"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
@@ -28,12 +29,14 @@ import (
|
||||
"kubesphere.io/kubesphere/pkg/apiserver/runtime"
|
||||
"kubesphere.io/kubesphere/pkg/filter"
|
||||
"kubesphere.io/kubesphere/pkg/informers"
|
||||
"kubesphere.io/kubesphere/pkg/models/applications"
|
||||
logging "kubesphere.io/kubesphere/pkg/models/log"
|
||||
"kubesphere.io/kubesphere/pkg/signals"
|
||||
"log"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
|
||||
func NewAPIServerCommand() *cobra.Command {
|
||||
s := options.NewServerRunOptions()
|
||||
|
||||
@@ -60,9 +63,6 @@ func Run(s *options.ServerRunOptions) error {
|
||||
log.Printf("FLAG: --%s=%q", flag.Name, flag.Value)
|
||||
})
|
||||
|
||||
applications.OpenPitrixServer = s.OpenPitrixServer
|
||||
applications.OpenPitrixProxyToken = s.OpenPitrixProxyToken
|
||||
|
||||
var err error
|
||||
|
||||
waitForResourceSync()
|
||||
@@ -78,6 +78,7 @@ func Run(s *options.ServerRunOptions) error {
|
||||
}
|
||||
}
|
||||
|
||||
initializeESClientConfig()
|
||||
initializeKialiConfig(s)
|
||||
|
||||
if s.GenericServerRunOptions.InsecurePort != 0 {
|
||||
@@ -110,6 +111,24 @@ func initializeKialiConfig(s *options.ServerRunOptions) {
|
||||
kconfig.Set(config)
|
||||
}
|
||||
|
||||
func initializeESClientConfig() {
|
||||
|
||||
// List all outputs
|
||||
outputs,err := logging.GetFluentbitOutputFromConfigMap()
|
||||
if err != nil {
|
||||
glog.Errorln(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Iterate the outputs to get elasticsearch configs
|
||||
for _, output := range outputs {
|
||||
if configs := logging.ParseEsOutputParams(output.Parameters); configs != nil {
|
||||
configs.WriteESConfigs()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitForResourceSync() {
|
||||
stopChan := signals.SetupSignalHandler()
|
||||
|
||||
@@ -134,12 +153,19 @@ func waitForResourceSync() {
|
||||
informerFactory.Apps().V1().StatefulSets().Lister()
|
||||
informerFactory.Apps().V1().Deployments().Lister()
|
||||
informerFactory.Apps().V1().DaemonSets().Lister()
|
||||
informerFactory.Apps().V1().ReplicaSets().Lister()
|
||||
|
||||
informerFactory.Batch().V1().Jobs().Lister()
|
||||
informerFactory.Batch().V1beta1().CronJobs().Lister()
|
||||
|
||||
informerFactory.Start(stopChan)
|
||||
informerFactory.WaitForCacheSync(stopChan)
|
||||
|
||||
s2iInformerFactory := informers.S2iSharedInformerFactory()
|
||||
s2iInformerFactory.Devops().V1alpha1().S2iBuilderTemplates().Lister()
|
||||
s2iInformerFactory.Devops().V1alpha1().S2iRuns().Lister()
|
||||
s2iInformerFactory.Devops().V1alpha1().S2iBuilders().Lister()
|
||||
|
||||
s2iInformerFactory.Start(stopChan)
|
||||
s2iInformerFactory.WaitForCacheSync(stopChan)
|
||||
log.Println("resources sync success")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user