add shell access to node

Signed-off-by: lynxcat <lynxcatdeng@gmail.com>
This commit is contained in:
lynxcat
2021-12-27 15:34:45 +08:00
parent e9a62896f7
commit 1342a9abe1
8 changed files with 263 additions and 10 deletions

View File

@@ -225,7 +225,7 @@ func (s *APIServer) installKubeSphereAPIs() {
s.KubernetesClient.Master()))
urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), rbacAuthorizer, s.KubernetesClient.Config()))
urlruntime.Must(terminalv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), rbacAuthorizer, s.KubernetesClient.Config(), s.Config.TerminalOptions))
urlruntime.Must(clusterkapisv1alpha1.AddToContainer(s.container,
s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubernetesSharedInformerFactory(),

View File

@@ -28,6 +28,7 @@ import (
networkv1alpha1 "kubesphere.io/api/network/v1alpha1"
"kubesphere.io/kubesphere/pkg/models/terminal"
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
"kubesphere.io/kubesphere/pkg/simple/client/cache"
@@ -109,6 +110,7 @@ type Config struct {
MeteringOptions *metering.Options `json:"metering,omitempty" yaml:"metering,omitempty" mapstructure:"metering"`
GatewayOptions *gateway.Options `json:"gateway,omitempty" yaml:"gateway,omitempty" mapstructure:"gateway"`
GPUOptions *gpu.Options `json:"gpu,omitempty" yaml:"gpu,omitempty" mapstructure:"gpu"`
TerminalOptions *terminal.Options `json:"terminal,omitempty" yaml:"terminal,omitempty" mapstructure:"terminal"`
}
// newConfig creates a default non-empty Config
@@ -136,6 +138,7 @@ func New() *Config {
MeteringOptions: metering.NewMeteringOptions(),
GatewayOptions: gateway.NewGatewayOptions(),
GPUOptions: gpu.NewGPUOptions(),
TerminalOptions: terminal.NewTerminalOptions(),
}
}

View File

@@ -19,6 +19,7 @@ package config
import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"
@@ -32,6 +33,7 @@ import (
networkv1alpha1 "kubesphere.io/api/network/v1alpha1"
"kubesphere.io/kubesphere/pkg/apiserver/authentication/oauth"
"kubesphere.io/kubesphere/pkg/models/terminal"
"kubesphere.io/kubesphere/pkg/simple/client/alerting"
"kubesphere.io/kubesphere/pkg/simple/client/auditing"
"kubesphere.io/kubesphere/pkg/simple/client/cache"
@@ -190,6 +192,10 @@ func newTestConfig() (*Config, error) {
GPUOptions: &gpu.Options{
Kinds: []gpu.GPUKind{},
},
TerminalOptions: &terminal.Options{
Image: "alpine:3.15",
Timeout: 600,
},
}
return conf, nil
}

View File

@@ -45,10 +45,10 @@ type terminalHandler struct {
authorizer authorizer.Authorizer
}
func newTerminalHandler(client kubernetes.Interface, authorizer authorizer.Authorizer, config *rest.Config) *terminalHandler {
func newTerminalHandler(client kubernetes.Interface, authorizer authorizer.Authorizer, config *rest.Config, options *terminal.Options) *terminalHandler {
return &terminalHandler{
authorizer: authorizer,
terminaler: terminal.NewTerminaler(client, config),
terminaler: terminal.NewTerminaler(client, config, options),
}
}
@@ -89,3 +89,38 @@ func (t *terminalHandler) handleTerminalSession(request *restful.Request, respon
t.terminaler.HandleSession(shell, namespace, podName, containerName, conn)
}
func (t *terminalHandler) handleShellAccessToNode(request *restful.Request, response *restful.Response) {
nodename := request.PathParameter("nodename")
user, _ := requestctx.UserFrom(request.Request.Context())
createPodsExec := authorizer.AttributesRecord{
User: user,
Verb: "create",
Resource: "pods",
Subresource: "exec",
Namespace: "kubesphere-controls-system",
ResourceRequest: true,
ResourceScope: requestctx.NamespaceScope,
}
decision, reason, err := t.authorizer.Authorize(createPodsExec)
if err != nil {
api.HandleInternalError(response, request, err)
return
}
if decision != authorizer.DecisionAllow {
api.HandleForbidden(response, request, errors.New(reason))
return
}
conn, err := upgrader.Upgrade(response.ResponseWriter, request.Request, nil)
if err != nil {
klog.Warning(err)
return
}
t.terminaler.HandleShellAccessToNode(nodename, conn)
}

View File

@@ -28,6 +28,7 @@ import (
"kubesphere.io/kubesphere/pkg/apiserver/runtime"
"kubesphere.io/kubesphere/pkg/constants"
"kubesphere.io/kubesphere/pkg/models"
"kubesphere.io/kubesphere/pkg/models/terminal"
)
const (
@@ -36,11 +37,11 @@ const (
var GroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha2"}
func AddToContainer(c *restful.Container, client kubernetes.Interface, authorizer authorizer.Authorizer, config *rest.Config) error {
func AddToContainer(c *restful.Container, client kubernetes.Interface, authorizer authorizer.Authorizer, config *rest.Config, options *terminal.Options) error {
webservice := runtime.NewWebService(GroupVersion)
handler := newTerminalHandler(client, authorizer, config)
handler := newTerminalHandler(client, authorizer, config, options)
webservice.Route(webservice.GET("/namespaces/{namespace}/pods/{pod}/exec").
To(handler.handleTerminalSession).
@@ -50,6 +51,14 @@ func AddToContainer(c *restful.Container, client kubernetes.Interface, authorize
Metadata(restfulspec.KeyOpenAPITags, []string{constants.TerminalTag}).
Writes(models.PodInfo{}))
//Add new Route to support shell access to the node
webservice.Route(webservice.GET("/nodes/{nodename}/exec").
To(handler.handleShellAccessToNode).
Param(webservice.PathParameter("nodename", "name of cluster node")).
Doc("create shell access to node session").
Metadata(restfulspec.KeyOpenAPITags, []string{constants.TerminalTag}).
Writes(models.PodInfo{}))
c.Add(webservice)
return nil

View File

@@ -0,0 +1,28 @@
package terminal
import "github.com/spf13/pflag"
type Options struct {
Image string `json:"image,omitempty" yaml:"image"`
Timeout int `json:"timeout,omitempty" yaml:"timeout"`
}
func NewTerminalOptions() *Options {
return &Options{
Image: "alpine:3.15",
Timeout: 600,
}
}
func (s *Options) Validate() []error {
var errs []error
return errs
}
func (s *Options) ApplyTo(options *Options) {
}
func (s *Options) AddFlags(fs *pflag.FlagSet, c *Options) {
}

View File

@@ -21,13 +21,18 @@ limitations under the License.
package terminal
import (
"context"
"encoding/json"
"fmt"
"io"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
@@ -53,6 +58,10 @@ type TerminalSession struct {
sizeChan chan remotecommand.TerminalSize
}
var (
NodeSessionCounter sync.Map
)
// TerminalMessage is the messaging protocol between ShellController and TerminalSession.
//
// OP DIRECTION FIELD(S) USED DESCRIPTION
@@ -140,15 +149,133 @@ func (t TerminalSession) Close(status uint32, reason string) {
type Interface interface {
HandleSession(shell, namespace, podName, containerName string, conn *websocket.Conn)
HandleShellAccessToNode(nodename string, conn *websocket.Conn)
}
type terminaler struct {
client kubernetes.Interface
config *rest.Config
client kubernetes.Interface
config *rest.Config
options *Options
}
func NewTerminaler(client kubernetes.Interface, config *rest.Config) Interface {
return &terminaler{client: client, config: config}
type NodeTerminaler struct {
Nodename string
Namespace string
PodName string
ContainerName string
Shell string
Privileged bool
Config *Options
client kubernetes.Interface
}
func NewTerminaler(client kubernetes.Interface, config *rest.Config, options *Options) Interface {
return &terminaler{client: client, config: config, options: options}
}
func NewNodeTerminaler(nodename string, options *Options, client kubernetes.Interface) (*NodeTerminaler, error) {
n := &NodeTerminaler{
Namespace: "kubesphere-controls-system",
ContainerName: "nsenter",
Nodename: nodename,
PodName: nodename + "-shell-access",
Shell: "sh",
Privileged: true,
Config: options,
client: client,
}
node, err := n.client.CoreV1().Nodes().Get(context.Background(), n.Nodename, metav1.GetOptions{})
if err != nil {
return n, fmt.Errorf("node cannot exist. nodename:%s, err: %v", n.Nodename, err)
}
flag := false
for _, condition := range node.Status.Conditions {
if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
flag = true
break
}
}
if !flag {
return n, fmt.Errorf("node status error. node: %s", n.Nodename)
}
idx := int64(0)
NodeSessionCounter.LoadOrStore(nodename, &idx)
return n, nil
}
func (n *NodeTerminaler) getNSEnterPod() (*v1.Pod, error) {
pod, err := n.client.CoreV1().Pods(n.Namespace).Get(context.Background(), n.PodName, metav1.GetOptions{})
if err != nil || (pod.Status.Phase != v1.PodRunning && pod.Status.Phase != v1.PodPending) {
//pod has timed out, but has not been cleaned up
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
err := n.client.CoreV1().Pods(n.Namespace).Delete(context.Background(), n.PodName, metav1.DeleteOptions{})
if err != nil {
return pod, err
}
}
var p = &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: n.PodName,
},
Spec: v1.PodSpec{
NodeName: n.Nodename,
HostPID: true,
HostNetwork: true,
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: n.ContainerName,
Image: n.Config.Image,
Command: []string{
"nsenter", "-m", "-u", "-i", "-n", "-p", "-t", "1",
},
Stdin: true,
TTY: true,
SecurityContext: &v1.SecurityContext{
Privileged: &n.Privileged,
},
},
},
},
}
if n.Config.Timeout == 0 {
p.Spec.Containers[0].Args = []string{"tail", "-f", "/dev/null"}
} else {
p.Spec.Containers[0].Args = []string{"sleep", strconv.Itoa(n.Config.Timeout)}
}
pod, err = n.client.CoreV1().Pods(n.Namespace).Create(context.Background(), p, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("create pod failed on %s node: %v", n.Nodename, err)
}
}
return pod, nil
}
func (n NodeTerminaler) CleanUpNSEnterPod() {
idx, _ := NodeSessionCounter.Load(n.Nodename)
atomic.AddInt64(idx.(*int64), -1)
if *(idx.(*int64)) == 0 {
err := n.client.CoreV1().Pods(n.Namespace).Delete(context.Background(), n.PodName, metav1.DeleteOptions{})
if err != nil {
klog.Warning(err)
}
}
}
// startProcess is called by handleAttach
@@ -224,3 +351,48 @@ func (t *terminaler) HandleSession(shell, namespace, podName, containerName stri
session.Close(1, "Process exited")
}
func (t *terminaler) HandleShellAccessToNode(nodename string, conn *websocket.Conn) {
succ, fail := make(chan bool), make(chan bool)
nodeTerminaler, err := NewNodeTerminaler(nodename, t.options, t.client)
if err != nil {
klog.Warning("node terminaler init error: ", err)
return
}
pod, err := nodeTerminaler.getNSEnterPod()
if err != nil {
klog.Warning("get nsenter pod error: ", err)
return
}
go nodeTerminaler.WatchPodStatusBeRunning(pod, succ, fail)
select {
case <-succ:
t.HandleSession(nodeTerminaler.Shell, nodeTerminaler.Namespace, nodeTerminaler.PodName, nodeTerminaler.ContainerName, conn)
defer nodeTerminaler.CleanUpNSEnterPod()
case <-fail:
klog.Warning("watching pod status error")
}
}
func (n *NodeTerminaler) WatchPodStatusBeRunning(pod *v1.Pod, succ chan bool, fail chan bool) {
var err error
for i := 0; i < 5; i++ {
if pod.Status.Phase == v1.PodRunning {
idx, _ := NodeSessionCounter.Load(n.Nodename)
atomic.AddInt64(idx.(*int64), 1)
close(succ)
return
}
time.Sleep(time.Second)
pod, err = n.client.CoreV1().Pods(pod.ObjectMeta.Namespace).Get(context.Background(), pod.ObjectMeta.Name, metav1.GetOptions{})
if err != nil {
klog.Warning(err)
close(fail)
return
}
}
close(fail)
}

View File

@@ -130,7 +130,7 @@ func generateSwaggerJson() []byte {
urlruntime.Must(resourcesv1alpha2.AddToContainer(container, clientsets.Kubernetes(), informerFactory, ""))
urlruntime.Must(resourcesv1alpha3.AddToContainer(container, informerFactory, nil))
urlruntime.Must(tenantv1alpha2.AddToContainer(container, informerFactory, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil))
urlruntime.Must(terminalv1alpha2.AddToContainer(container, clientsets.Kubernetes(), nil, nil))
urlruntime.Must(terminalv1alpha2.AddToContainer(container, clientsets.Kubernetes(), nil, nil, nil))
urlruntime.Must(metricsv1alpha2.AddToContainer(nil, container, clientsets.Kubernetes(), nil))
urlruntime.Must(networkv1alpha2.AddToContainer(container, ""))
alertingOptions := &alerting.Options{}