feature: reduce telemetry dependence. (#6276)

Co-authored-by: joyceliu <joyceliu@yunify.com>
This commit is contained in:
liujian
2024-11-26 11:31:35 +08:00
committed by GitHub
parent fa7c1340e3
commit d0483aa39a
30 changed files with 1958 additions and 113 deletions

View File

@@ -24,7 +24,6 @@ type Options struct {
KubernetesOptions *k8s.Options
AuthenticationOptions *authentication.Options
MultiClusterOptions *multicluster.Options
TelemetryOptions *TelemetryOptions
TerminalOptions *terminal.Options
ComposedAppOptions *composedapp.Options
HelmExecutorOptions *HelmExecutorOptions
@@ -81,16 +80,3 @@ func NewKubeSphereOptions() *KubeSphereOptions {
TLS: false,
}
}
// TelemetryOptions is the config data for telemetry.
type TelemetryOptions struct {
// KSCloudURL for kubesphere cloud
KSCloudURL string `json:"ksCloudURL,omitempty" yaml:"ksCloudURL,omitempty" mapstructure:"ksCloudURL"`
// collect period
Period *time.Duration `json:"period,omitempty" yaml:"period,omitempty" mapstructure:"period"`
}
func NewTelemetryOptions() *TelemetryOptions {
return &TelemetryOptions{}
}

View File

@@ -0,0 +1,48 @@
/*
* Please refer to the LICENSE file in the root directory of the project.
* https://github.com/kubesphere/kubesphere/blob/master/LICENSE
*/
package telemetry
import (
"fmt"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
)
const (
ConfigName = "io.kubesphere.config.platformconfig.telemetry"
ConfigDataKey = "configuration.yaml"
)
// PlatformOptions store in constants.PlatformConfigurationName by hot loading.
type TelemetryOptions struct {
// should enable the telemetry.
Enabled bool `json:"enabled,omitempty" yaml:"enabled,omitempty" mapstructure:"enabled"`
// KSCloudURL for kubesphere cloud
KSCloudURL string `json:"ksCloudURL,omitempty" yaml:"ksCloudURL,omitempty" mapstructure:"ksCloudURL"`
// collect period
// The schedule in telemetry clusterInfo format, see https://en.wikipedia.org/wiki/Cron.
Schedule string `json:"schedule,omitempty" yaml:"schedule,omitempty" mapstructure:"schedule"`
}
func NewTelemetryOptions() *TelemetryOptions {
return &TelemetryOptions{
Schedule: "0 1 * * *", // 1:00 each day
}
}
// LoadPlatformConfig from given ConfigMap.
func LoadTelemetryConfig(secret *corev1.Secret) (*TelemetryOptions, error) {
value, ok := secret.Data[ConfigDataKey]
if !ok {
return nil, fmt.Errorf("failed to get config %s from secret %s value", ConfigDataKey, ConfigName)
}
o := &TelemetryOptions{}
if err := yaml.Unmarshal([]byte(value), o); err != nil {
return nil, fmt.Errorf("failed to unmarshal value from configmap. err: %s", err)
}
return o, nil
}

View File

@@ -0,0 +1,94 @@
/*
* Please refer to the LICENSE file in the root directory of the project.
* https://github.com/kubesphere/kubesphere/blob/master/LICENSE
*/
package telemetry
import (
"context"
"os/exec"
"sync"
"time"
"github.com/robfig/cron/v3"
"k8s.io/klog/v2"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)
// runnable struct contains the dynamic scheduling logic.
type runnable struct {
client runtimeclient.Client
cron *cron.Cron
*TelemetryOptions
taskID cron.EntryID
// taskFunc func()
mu sync.Mutex
}
// newRunnable creates a new runnable instance with the initial schedule.
func NewRunnable(ctx context.Context, options *TelemetryOptions, client runtimeclient.Client) (*runnable, error) {
r := &runnable{
cron: cron.New(),
TelemetryOptions: options,
client: client,
}
// Initialize and start the task.
if err := r.startTask(); err != nil {
return nil, err
}
r.cron.Start()
go func() {
<-ctx.Done()
r.cron.Stop()
}()
return r, nil
}
// startTask adds the task to the cron scheduler using the current schedule.
func (r *runnable) startTask() error {
r.mu.Lock()
defer r.mu.Unlock()
// Add the task to the cron scheduler
id, err := r.cron.AddFunc(r.TelemetryOptions.Schedule, func() {
var args = []string{
"--url", r.TelemetryOptions.KSCloudURL,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
cmd := exec.CommandContext(ctx, "telemetry", args...)
if output, err := cmd.CombinedOutput(); err != nil {
klog.Errorf("failed to exec command for telemetry %v. output is %s", err, output)
}
})
if err != nil {
return err
}
r.taskID = id
return nil
}
// UpdateSchedule dynamically updates the task's schedule.
func (r *runnable) UpdateSchedule(newSchedule string) error {
r.mu.Lock()
defer r.mu.Unlock()
// If the schedule hasn't changed, don't update.
if newSchedule == r.TelemetryOptions.Schedule {
return nil
}
// Remove the current task from the cron scheduler.
r.cron.Remove(r.taskID)
// Update the schedule and re-add the task.
r.TelemetryOptions.Schedule = newSchedule
return r.startTask()
}
func (r *runnable) Close() {
r.cron.Stop()
}

View File

@@ -0,0 +1,130 @@
/*
* Please refer to the LICENSE file in the root directory of the project.
* https://github.com/kubesphere/kubesphere/blob/master/LICENSE
*/
package telemetry
import (
"context"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/builder"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"kubesphere.io/kubesphere/pkg/constants"
kscontroller "kubesphere.io/kubesphere/pkg/controller"
)
const (
ControllerName = "telemetry"
)
var _ kscontroller.Controller = &Reconciler{}
var _ reconcile.Reconciler = &Reconciler{}
func (r *Reconciler) Name() string {
return ControllerName
}
func (r *Reconciler) Enabled(clusterRole string) bool {
return strings.EqualFold(clusterRole, string(clusterv1alpha1.ClusterRoleHost))
}
func (r *Reconciler) Hidden() bool {
return true
}
type Reconciler struct {
*TelemetryOptions
runtimeclient.Client
telemetryRunnable *runnable
}
func (r *Reconciler) SetupWithManager(mgr *kscontroller.Manager) error {
r.Client = mgr.GetClient()
return builder.
ControllerManagedBy(mgr).
For(&corev1.Secret{}, builder.WithPredicates(predicate.NewPredicateFuncs(func(obj runtimeclient.Object) bool {
secret, ok := obj.(*corev1.Secret)
if !ok {
return false
}
return secret.Namespace == constants.KubeSphereNamespace &&
secret.Name == ConfigName &&
secret.Type == constants.SecretTypePlatformConfig
}))).
Complete(r)
}
func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: ConfigName,
Namespace: constants.KubeSphereNamespace,
},
}
if err := r.Client.Get(ctx, runtimeclient.ObjectKeyFromObject(secret), secret); err != nil {
if errors.IsNotFound(err) { // not found. telemetry is disabled.
if r.telemetryRunnable != nil {
r.telemetryRunnable.Close()
}
return reconcile.Result{}, nil
}
klog.V(9).ErrorS(err, "cannot get telemetry option secret")
return reconcile.Result{RequeueAfter: time.Second}, nil
}
// ignore delete resource
if secret.DeletionTimestamp != nil {
return reconcile.Result{}, nil
}
// get config from configmap
conf, err := LoadTelemetryConfig(secret)
if err != nil {
klog.V(9).ErrorS(err, "cannot log telemetry option secret")
return reconcile.Result{}, nil
}
// check value when telemetry is enabled.
if conf.Enabled &&
(conf.KSCloudURL == "" || conf.Schedule == "") {
klog.V(9).ErrorS(nil, "ksCloudURL and schedule should not be empty when telemetry enabled is true.")
return reconcile.Result{}, nil
}
// stop telemetryRunnable when telemetry is disabled.
if !conf.Enabled {
if r.telemetryRunnable != nil {
r.telemetryRunnable.Close()
}
return reconcile.Result{}, nil
}
if r.telemetryRunnable == nil {
r.TelemetryOptions = conf
r.telemetryRunnable, err = NewRunnable(ctx, r.TelemetryOptions, r.Client)
if err != nil {
klog.V(9).ErrorS(err, "failed to new telemetryRunnable")
}
return reconcile.Result{}, nil
}
if conf.Schedule != r.TelemetryOptions.Schedule {
r.TelemetryOptions = conf
if err := r.telemetryRunnable.UpdateSchedule(conf.Schedule); err != nil {
klog.V(9).ErrorS(err, "failed to update telemetryRunnable schedule")
}
return reconcile.Result{}, nil
}
return reconcile.Result{}, nil
}

View File

@@ -1,79 +0,0 @@
/*
* Please refer to the LICENSE file in the root directory of the project.
* https://github.com/kubesphere/kubesphere/blob/master/LICENSE
*/
package telemetry
import (
"context"
"os/exec"
"strings"
"time"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
clusterv1alpha1 "kubesphere.io/api/cluster/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/manager"
kscontroller "kubesphere.io/kubesphere/pkg/controller"
"kubesphere.io/kubesphere/pkg/controller/options"
)
var _ manager.LeaderElectionRunnable = &Runnable{}
var _ manager.Runnable = &Runnable{}
var _ kscontroller.Controller = &Runnable{}
const (
runnableName = "telemetry"
// defaultPeriod for collect data
defaultPeriod = time.Hour * 24
)
type Runnable struct {
*options.TelemetryOptions
}
func (r *Runnable) Name() string {
return runnableName
}
func (r *Runnable) SetupWithManager(mgr *kscontroller.Manager) error {
if mgr.TelemetryOptions == nil || mgr.TelemetryOptions.KSCloudURL == "" {
klog.V(4).Infof("telemetry runnable is disabled")
return nil
}
r.TelemetryOptions = mgr.TelemetryOptions
if r.TelemetryOptions.Period == nil {
r.TelemetryOptions.Period = ptr.To[time.Duration](defaultPeriod)
}
return mgr.Add(r)
}
func (r *Runnable) Enabled(clusterRole string) bool {
return strings.EqualFold(clusterRole, string(clusterv1alpha1.ClusterRoleHost))
}
func (r *Runnable) Start(ctx context.Context) error {
t := time.NewTicker(*r.Period)
for {
select {
case <-t.C:
var args = []string{
"--url", r.KSCloudURL,
}
cmd := exec.CommandContext(ctx, "telemetry", args...)
if _, err := cmd.CombinedOutput(); err != nil {
klog.Errorf("failed to exec command for telemetry %v", err)
}
case <-ctx.Done():
t.Stop()
return nil
}
}
}
func (r *Runnable) NeedLeaderElection() bool {
return true
}