refactor kubectl websocket API

Signed-off-by: hongming <talonwan@yunify.com>
This commit is contained in:
hongming
2019-11-07 12:40:44 +08:00
parent 5cc4169244
commit 2c52a7064c
102 changed files with 853 additions and 16388 deletions

View File

@@ -42,14 +42,11 @@ func addWebService(c *restful.Container) error {
tags := []string{"Terminal"}
webservice.Route(webservice.GET("/namespaces/{namespace}/pods/{pod}").
To(terminal.CreateTerminalSession).
To(terminal.HandleTerminalSession).
Doc("create terminal session").
Metadata(restfulspec.KeyOpenAPITags, tags).
Writes(models.PodInfo{}))
path := runtime.ApiRootPath + "/" + GroupVersion.String() + "/sockjs"
c.Handle(path+"/", terminal.NewTerminalHandler(path))
c.Add(webservice)
return nil

View File

@@ -19,38 +19,32 @@ package terminal
import (
"github.com/emicklei/go-restful"
"gopkg.in/igm/sockjs-go.v2/sockjs"
"github.com/gorilla/websocket"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/models/terminal"
"kubesphere.io/kubesphere/pkg/server/errors"
"net/http"
)
// TerminalResponse is sent by handleExecShell. The Id is a random session id that binds the original REST request and the SockJS connection.
// Any clientapi in possession of this Id can hijack the terminal session.
type TerminalResponse struct {
Id string `json:"id"`
}
// CreateAttachHandler is called from main for /api/sockjs
func NewTerminalHandler(path string) http.Handler {
return sockjs.NewHandler(path, sockjs.DefaultOptions, terminal.HandleTerminalSession)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// Allow connections from any Origin
CheckOrigin: func(r *http.Request) bool { return true },
}
// Handles execute shell API call
func CreateTerminalSession(request *restful.Request, resp *restful.Response) {
func HandleTerminalSession(request *restful.Request, resp *restful.Response) {
namespace := request.PathParameter("namespace")
podName := request.PathParameter("pod")
containerName := request.QueryParameter("container")
shell := request.QueryParameter("shell")
sessionId, err := terminal.NewSession(shell, namespace, podName, containerName)
conn, err := upgrader.Upgrade(resp.ResponseWriter, request.Request, nil)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, errors.Wrap(err))
klog.Warning(err)
return
}
TerminalResponse := &TerminalResponse{Id: sessionId}
resp.WriteAsJson(TerminalResponse)
terminal.HandleSession(shell, namespace, podName, containerName, conn)
}

View File

@@ -19,17 +19,20 @@
package terminal
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"gopkg.in/igm/sockjs-go.v2/sockjs"
"github.com/gorilla/websocket"
"io"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/simple/client"
"time"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
)
// PtyHandler is what remotecommand expects from a pty
@@ -41,24 +44,21 @@ type PtyHandler interface {
// TerminalSession implements PtyHandler (using a SockJS connection)
type TerminalSession struct {
id string
bound chan error
sockJSSession sockjs.Session
sizeChan chan remotecommand.TerminalSize
conn *websocket.Conn
sizeChan chan remotecommand.TerminalSize
}
// TerminalMessage is the messaging protocol between ShellController and TerminalSession.
//
// OP DIRECTION FIELD(S) USED DESCRIPTION
// ---------------------------------------------------------------------
// bind fe->be SessionID Id sent back from TerminalResponse
// stdin fe->be Data Keystrokes/paste buffer
// resize fe->be Rows, Cols New terminal size
// stdout be->fe Data Output from the process
// toast be->fe Data OOB message to be shown to the user
type TerminalMessage struct {
Op, Data, SessionID string
Rows, Cols uint16
Op, Data string
Rows, Cols uint16
}
// TerminalSize handles pty->process resize events
@@ -73,13 +73,10 @@ func (t TerminalSession) Next() *remotecommand.TerminalSize {
// Read handles pty->process messages (stdin, resize)
// Called in a loop from remotecommand as long as the process is running
func (t TerminalSession) Read(p []byte) (int, error) {
m, err := t.sockJSSession.Recv()
if err != nil {
return 0, err
}
var msg TerminalMessage
if err := json.Unmarshal([]byte(m), &msg); err != nil {
err := t.conn.ReadJSON(&msg)
if err != nil {
return 0, err
}
@@ -104,8 +101,8 @@ func (t TerminalSession) Write(p []byte) (int, error) {
if err != nil {
return 0, err
}
if err = t.sockJSSession.Send(string(msg)); err != nil {
t.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err = t.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
return 0, err
}
return len(p), nil
@@ -121,8 +118,8 @@ func (t TerminalSession) Toast(p string) error {
if err != nil {
return err
}
if err = t.sockJSSession.Send(string(msg)); err != nil {
t.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err = t.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
return err
}
return nil
@@ -132,47 +129,12 @@ func (t TerminalSession) Toast(p string) error {
// Can happen if the process exits or if there is an error starting up the process
// For now the status code is unused and reason is shown to the user (unless "")
func (t TerminalSession) Close(status uint32, reason string) {
t.sockJSSession.Close(status, reason)
}
// terminalSessions stores a map of all TerminalSession objects
// FIXME: this structure needs locking
var terminalSessions = make(map[string]TerminalSession)
// handleTerminalSession is Called by net/http for any new /api/sockjs connections
func HandleTerminalSession(session sockjs.Session) {
klog.Infof("handleTerminalSession, ID:%s", session.ID())
var (
buf string
err error
msg TerminalMessage
terminalSession TerminalSession
ok bool
)
if buf, err = session.Recv(); err != nil {
klog.Errorf("handleTerminalSession: can't Recv: %v", err)
return
}
if err = json.Unmarshal([]byte(buf), &msg); err != nil {
klog.Errorf("handleTerminalSession: can't UnMarshal (%v): %s", err, buf)
return
}
if msg.Op != "bind" {
klog.Errorf("handleTerminalSession: expected 'bind' message, got: %s", buf)
return
}
if terminalSession, ok = terminalSessions[msg.SessionID]; !ok {
klog.Errorf("handleTerminalSession: can't find session '%s'", msg.SessionID)
return
}
terminalSession.sockJSSession = session
terminalSessions[msg.SessionID] = terminalSession
terminalSession.bound <- nil
data, _ := json.Marshal(struct {
Status uint32 `json:"status"`
Reason string `json:"reason"`
}{Status: status, Reason: reason})
t.conn.WriteMessage(websocket.TextMessage, data)
t.conn.Close()
}
// startProcess is called by handleAttach
@@ -216,22 +178,6 @@ func startProcess(namespace, podName, containerName string, cmd []string, ptyHan
return nil
}
// genTerminalSessionId generates a random session ID string. The format is not really interesting.
// This ID is used to identify the session when the client opens the SockJS connection.
// Not the same as the SockJS session id! We can't use that as that is generated
// on the client side and we don't have it yet at this point.
func genTerminalSessionId() (string, error) {
bytes := make([]byte, 16)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
id := make([]byte, hex.EncodedLen(len(bytes)))
hex.Encode(id, bytes)
klog.Infof("genTerminalSessionId, id:" + string(id))
return string(id), nil
}
// isValidShell checks if the shell is an allowed one
func isValidShell(validShells []string, shell string) bool {
for _, validShell := range validShells {
@@ -242,53 +188,31 @@ func isValidShell(validShells []string, shell string) bool {
return false
}
// WaitingForConnection is called from apihandler.handleAttach as a goroutine
// Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession
func WaitingForConnection(shell string, namespace, podName, containerName string, sessionId string) {
klog.Infof("WaitingForConnection, ID:%s", sessionId)
select {
case <-terminalSessions[sessionId].bound:
close(terminalSessions[sessionId].bound)
defer delete(terminalSessions, sessionId)
var err error
validShells := []string{"sh", "bash"}
func HandleSession(shell, namespace, podName, containerName string, conn *websocket.Conn) {
if isValidShell(validShells, shell) {
cmd := []string{shell}
err = startProcess(namespace, podName, containerName, cmd, terminalSessions[sessionId])
} else {
// No shell given or it was not valid: try some shells until one succeeds or all fail
// FIXME: if the first shell fails then the first keyboard event is lost
for _, testShell := range validShells {
cmd := []string{testShell}
if err = startProcess(namespace, podName, containerName, cmd, terminalSessions[sessionId]); err == nil {
break
}
var err error
validShells := []string{"sh", "bash"}
session := &TerminalSession{conn: conn}
if isValidShell(validShells, shell) {
cmd := []string{shell}
err = startProcess(namespace, podName, containerName, cmd, session)
} else {
// No shell given or it was not valid: try some shells until one succeeds or all fail
// FIXME: if the first shell fails then the first keyboard event is lost
for _, testShell := range validShells {
cmd := []string{testShell}
if err = startProcess(namespace, podName, containerName, cmd, session); err == nil {
break
}
}
if err != nil {
terminalSessions[sessionId].Close(2, err.Error())
return
}
terminalSessions[sessionId].Close(1, "Process exited")
}
}
func NewSession(shell, namespace, podName, containerName string) (string, error) {
sessionId, err := genTerminalSessionId()
if err != nil {
return "", err
session.Close(2, err.Error())
return
}
terminalSessions[sessionId] = TerminalSession{
id: sessionId,
bound: make(chan error),
sizeChan: make(chan remotecommand.TerminalSize),
}
go WaitingForConnection(shell, namespace, podName, containerName, sessionId)
return sessionId, nil
session.Close(1, "Process exited")
}