refactor: openpitrix module

Signed-off-by: hongming <talonwan@yunify.com>
This commit is contained in:
hongming
2019-09-25 14:07:15 +08:00
parent d0dc66cf28
commit 1b5681c12b
314 changed files with 72092 additions and 25762 deletions

132
vendor/openpitrix.io/openpitrix/pkg/manager/checker.go generated vendored Normal file
View File

@@ -0,0 +1,132 @@
// Copyright 2018 The OpenPitrix Authors. All rights reserved.
// Use of this source code is governed by a Apache license
// that can be found in the LICENSE file.
package manager
import (
"context"
"github.com/fatih/structs"
"github.com/golang/protobuf/ptypes/wrappers"
"openpitrix.io/openpitrix/pkg/gerr"
"openpitrix.io/openpitrix/pkg/util/stringutil"
)
type checker struct {
ctx context.Context
req Request
required []string
stringChosen map[string][]string
}
func NewChecker(ctx context.Context, req Request) *checker {
return &checker{
ctx: ctx,
req: req,
required: []string{},
stringChosen: make(map[string][]string),
}
}
func (c *checker) Required(params ...string) *checker {
c.required = append(c.required, params...)
return c
}
func (c *checker) checkRequired(param string, value interface{}) error {
if len(c.required) > 0 && stringutil.StringIn(param, c.required) {
switch v := value.(type) {
case string:
if v == "" {
return gerr.New(c.ctx, gerr.InvalidArgument, gerr.ErrorMissingParameter, param)
}
case *wrappers.StringValue:
if v == nil || v.GetValue() == "" {
return gerr.New(c.ctx, gerr.InvalidArgument, gerr.ErrorMissingParameter, param)
}
case *wrappers.BytesValue:
if v == nil || len(v.GetValue()) == 0 {
return gerr.New(c.ctx, gerr.InvalidArgument, gerr.ErrorMissingParameter, param)
}
case []byte:
if len(v) == 0 {
return gerr.New(c.ctx, gerr.InvalidArgument, gerr.ErrorMissingParameter, param)
}
case []string:
var values []string
for _, v := range v {
if v != "" {
values = append(values, v)
}
}
if len(values) == 0 {
return gerr.New(c.ctx, gerr.InvalidArgument, gerr.ErrorMissingParameter, param)
}
}
}
return nil
}
func (c *checker) StringChosen(param string, chosen []string) *checker {
if exist, ok := c.stringChosen[param]; ok {
c.stringChosen[param] = append(exist, chosen...)
} else {
c.stringChosen[param] = chosen
}
return c
}
func (c *checker) checkStringChosen(param string, value interface{}) error {
if len(c.stringChosen) > 0 {
if chosen, ok := c.stringChosen[param]; ok {
switch v := value.(type) {
case string:
if !stringutil.StringIn(v, chosen) {
return gerr.New(c.ctx, gerr.InvalidArgument, gerr.ErrorUnsupportedParameterValue, param, v)
}
case *wrappers.StringValue:
if v != nil {
if !stringutil.StringIn(v.GetValue(), chosen) {
return gerr.New(c.ctx, gerr.InvalidArgument, gerr.ErrorUnsupportedParameterValue, param, v.GetValue())
}
}
case []string:
for _, s := range v {
if !stringutil.StringIn(s, chosen) {
return gerr.New(c.ctx, gerr.InvalidArgument, gerr.ErrorUnsupportedParameterValue, param, s)
}
}
}
}
}
return nil
}
func (c *checker) chainChecker(param string, value interface{}, checks ...func(string, interface{}) error) error {
var err error
for _, c := range checks {
err = c(param, value)
if err != nil {
return err
}
}
return nil
}
func (c *checker) Exec() error {
for _, field := range structs.Fields(c.req) {
param := getFieldName(field)
value := field.Value()
err := c.chainChecker(param, value,
c.checkRequired,
c.checkStringChosen,
)
if err != nil {
return err
}
}
return nil
}

247
vendor/openpitrix.io/openpitrix/pkg/manager/common.go generated vendored Normal file
View File

@@ -0,0 +1,247 @@
// Copyright 2018 The OpenPitrix Authors. All rights reserved.
// Use of this source code is governed by a Apache license
// that can be found in the LICENSE file.
package manager
import (
"context"
"fmt"
"reflect"
"strings"
"time"
"github.com/fatih/structs"
"github.com/gocraft/dbr"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/golang/protobuf/ptypes/wrappers"
"openpitrix.io/openpitrix/pkg/constants"
"openpitrix.io/openpitrix/pkg/db"
"openpitrix.io/openpitrix/pkg/logger"
"openpitrix.io/openpitrix/pkg/util/ctxutil"
"openpitrix.io/openpitrix/pkg/util/pbutil"
"openpitrix.io/openpitrix/pkg/util/reflectutil"
"openpitrix.io/openpitrix/pkg/util/stringutil"
)
type Request interface {
Reset()
String() string
ProtoMessage()
Descriptor() ([]byte, []int)
}
type RequestWithSortKey interface {
Request
GetSortKey() *wrappers.StringValue
}
type RequestWithReverse interface {
RequestWithSortKey
GetReverse() *wrappers.BoolValue
}
type RequestWithOwner interface {
Request
GetOwner() []string
}
const (
TagName = "json"
SearchWordColumnName = "search_word"
)
func getSearchFilter(tableName string, value interface{}, exclude ...string) dbr.Builder {
if v, ok := value.(string); ok {
var ops []dbr.Builder
for _, column := range constants.SearchColumns[tableName] {
if stringutil.StringIn(column, exclude) {
continue
}
// if column suffix is _id, must exact match
if strings.HasSuffix(column, "_id") {
ops = append(ops, db.Eq(column, v))
} else {
ops = append(ops, db.Like(column, v))
}
}
if len(ops) == 0 {
return nil
}
return db.Or(ops...)
} else if value != nil {
logger.Warn(nil, "search_word [%+v] is not string", value)
}
return nil
}
func getReqValue(param interface{}) interface{} {
switch value := param.(type) {
case string:
if value == "" {
return nil
}
return value
case *wrappers.StringValue:
if value == nil {
return nil
}
return value.GetValue()
case *wrappers.Int32Value:
if value == nil {
return nil
}
return value.GetValue()
case []string:
var values []string
for _, v := range value {
if v != "" {
values = append(values, v)
}
}
if len(values) == 0 {
return nil
}
return values
}
return nil
}
func BuildFilterConditions(req Request, tableName string, exclude ...string) dbr.Builder {
return buildFilterConditions(false, req, tableName, exclude...)
}
func GetDisplayColumns(displayColumns []string, wholeColumns []string) []string {
if displayColumns == nil {
return wholeColumns
} else if len(displayColumns) == 0 {
return nil
} else {
var newDisplayColumns []string
for _, column := range displayColumns {
if stringutil.StringIn(column, wholeColumns) {
newDisplayColumns = append(newDisplayColumns, column)
}
}
return newDisplayColumns
}
}
func BuildFilterConditionsWithPrefix(req Request, tableName string, exclude ...string) dbr.Builder {
return buildFilterConditions(true, req, tableName, exclude...)
}
func getFieldName(field *structs.Field) string {
tag := field.Tag(TagName)
t := strings.Split(tag, ",")
if len(t) == 0 {
return "-"
}
return t[0]
}
func buildFilterConditions(withPrefix bool, req Request, tableName string, exclude ...string) dbr.Builder {
var conditions []dbr.Builder
for _, field := range structs.Fields(req) {
column := getFieldName(field)
param := field.Value()
indexedColumns, ok := constants.IndexedColumns[tableName]
if ok && stringutil.StringIn(column, indexedColumns) {
value := getReqValue(param)
if value != nil {
key := column
if withPrefix {
key = tableName + "." + key
}
conditions = append(conditions, db.Eq(key, value))
}
}
// TODO: search column
if column == SearchWordColumnName && stringutil.StringIn(tableName, constants.SearchWordColumnTable) {
value := getReqValue(param)
condition := getSearchFilter(tableName, value, exclude...)
if condition != nil {
conditions = append(conditions, condition)
}
}
}
if len(conditions) == 0 {
return nil
}
return db.And(conditions...)
}
func BuildUpdateAttributes(req Request, columns ...string) map[string]interface{} {
attributes := make(map[string]interface{})
for _, field := range structs.Fields(req) {
column := getFieldName(field)
f := field.Value()
v := reflect.ValueOf(f)
if !stringutil.StringIn(column, columns) {
continue
}
if !reflectutil.ValueIsNil(v) {
switch v := f.(type) {
case *wrappers.StringValue:
attributes[column] = v.GetValue()
case *wrappers.BoolValue:
attributes[column] = v.GetValue()
case *wrappers.Int32Value:
attributes[column] = v.GetValue()
case *wrappers.UInt32Value:
attributes[column] = v.GetValue()
case *timestamp.Timestamp:
attributes[column] = pbutil.GetTime(v)
case string, bool, int32, uint32, time.Time:
attributes[column] = v
default:
attributes[column] = v
}
}
}
return attributes
}
func AddQueryOrderDir(query *db.SelectQuery, req Request, defaultColumn string) *db.SelectQuery {
isAsc := false
if r, ok := req.(RequestWithReverse); ok {
reverse := r.GetReverse()
if reverse != nil {
isAsc = reverse.GetValue()
}
}
if r, ok := req.(RequestWithSortKey); ok {
s := r.GetSortKey()
if s != nil {
defaultColumn = s.GetValue()
}
}
query = query.OrderDir(defaultColumn, isAsc)
return query
}
func AddQueryJoinWithMap(query *db.SelectQuery, table, joinTable, primaryKey, keyField, valueField string, filterMap map[string][]string) *db.SelectQuery {
var whereCondition []dbr.Builder
for key, values := range filterMap {
aliasTableName := fmt.Sprintf("table_label_%d", query.JoinCount)
onCondition := fmt.Sprintf("%s.%s = %s.%s", aliasTableName, primaryKey, table, primaryKey)
query = query.Join(dbr.I(joinTable).As(aliasTableName), onCondition)
whereCondition = append(whereCondition, db.And(db.Eq(aliasTableName+"."+keyField, key), db.Eq(aliasTableName+"."+valueField, values)))
query.JoinCount++
}
if len(whereCondition) > 0 {
query = query.Where(db.And(whereCondition...))
}
return query
}
func BuildPermissionFilter(ctx context.Context) dbr.Builder {
s := ctxutil.GetSender(ctx)
if s == nil {
return nil
}
ops := []dbr.Builder{
db.Prefix(constants.ColumnOwnerPath, string(s.GetAccessPath())),
db.Eq(constants.ColumnOwner, s.UserId),
}
return db.Or(ops...)
}

View File

@@ -0,0 +1,64 @@
// Copyright 2018 The OpenPitrix Authors. All rights reserved.
// Use of this source code is governed by a Apache license
// that can be found in the LICENSE file.
package manager
import (
"context"
"crypto/tls"
"fmt"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
var ClientOptions = []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}),
}
var clientCache sync.Map
func NewClient(host string, port int) (*grpc.ClientConn, error) {
endpoint := fmt.Sprintf("%s:%d", host, port)
if conn, ok := clientCache.Load(endpoint); ok {
return conn.(*grpc.ClientConn), nil
}
ctx := context.Background()
conn, err := grpc.DialContext(ctx, endpoint, ClientOptions...)
if err != nil {
return nil, err
}
clientCache.Store(endpoint, conn)
return conn, nil
}
func NewTLSClient(host string, port int, tlsConfig *tls.Config) (*grpc.ClientConn, error) {
endpoint := fmt.Sprintf("%s:%d", host, port)
if conn, ok := clientCache.Load(endpoint); ok {
return conn.(*grpc.ClientConn), nil
}
creds := credentials.NewTLS(tlsConfig)
tlsClientOptions := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}),
}
conn, err := grpc.Dial(endpoint, tlsClientOptions...)
if err != nil {
return nil, err
}
clientCache.Store(endpoint, conn)
return conn, nil
}

View File

@@ -0,0 +1,197 @@
// Copyright 2018 The OpenPitrix Authors. All rights reserved.
// Use of this source code is governed by a Apache license
// that can be found in the LICENSE file.
package manager
import (
"context"
"fmt"
"net"
"runtime/debug"
"strings"
"time"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"openpitrix.io/openpitrix/pkg/config"
"openpitrix.io/openpitrix/pkg/db"
"openpitrix.io/openpitrix/pkg/gerr"
"openpitrix.io/openpitrix/pkg/logger"
"openpitrix.io/openpitrix/pkg/util/ctxutil"
"openpitrix.io/openpitrix/pkg/version"
)
type checkerT func(ctx context.Context, req interface{}) error
type builderT func(ctx context.Context, req interface{}) interface{}
var (
defaultChecker checkerT
defaultBuilder builderT
)
type GrpcServer struct {
ServiceName string
Port int
showErrorCause bool
checker checkerT
builder builderT
mysqlConfig config.MysqlConfig
}
type RegisterCallback func(*grpc.Server)
func NewGrpcServer(serviceName string, port int) *GrpcServer {
return &GrpcServer{
ServiceName: serviceName,
Port: port,
showErrorCause: false,
checker: defaultChecker,
builder: defaultBuilder,
}
}
func (g *GrpcServer) ShowErrorCause(b bool) *GrpcServer {
g.showErrorCause = b
return g
}
func (g *GrpcServer) WithChecker(c checkerT) *GrpcServer {
g.checker = c
return g
}
func (g *GrpcServer) WithBuilder(b builderT) *GrpcServer {
g.builder = b
return g
}
func (g *GrpcServer) WithMysqlConfig(cfg config.MysqlConfig) *GrpcServer {
g.mysqlConfig = cfg
return g
}
func (g *GrpcServer) Serve(callback RegisterCallback, opt ...grpc.ServerOption) {
version.PrintVersionInfo(func(s string, i ...interface{}) {
logger.Info(nil, s, i)
})
logger.Info(nil, "Service [%s] start listen at port [%d]", g.ServiceName, g.Port)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", g.Port))
if err != nil {
err = errors.WithStack(err)
logger.Critical(nil, "failed to listen: %+v", err)
}
builtinOptions := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
}),
grpc_middleware.WithUnaryServerChain(
grpc_validator.UnaryServerInterceptor(),
g.unaryServerLogInterceptor(),
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
ctx = db.NewContext(ctx, g.mysqlConfig)
if g.checker != nil {
err = g.checker(ctx, req)
if err != nil {
return
}
}
return handler(ctx, req)
},
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if g.builder != nil {
req = g.builder(ctx, req)
}
return handler(ctx, req)
},
grpc_recovery.UnaryServerInterceptor(
grpc_recovery.WithRecoveryHandler(func(p interface{}) error {
logger.Critical(nil, "GRPC server recovery with error: %+v", p)
logger.Critical(nil, string(debug.Stack()))
if e, ok := p.(error); ok {
return gerr.NewWithDetail(nil, gerr.Internal, e, gerr.ErrorInternalError)
}
return gerr.New(nil, gerr.Internal, gerr.ErrorInternalError)
}),
),
),
grpc_middleware.WithStreamServerChain(
grpc_recovery.StreamServerInterceptor(
grpc_recovery.WithRecoveryHandler(func(p interface{}) error {
logger.Critical(nil, "GRPC server recovery with error: %+v", p)
logger.Critical(nil, string(debug.Stack()))
if e, ok := p.(error); ok {
return gerr.NewWithDetail(nil, gerr.Internal, e, gerr.ErrorInternalError)
}
return gerr.New(nil, gerr.Internal, gerr.ErrorInternalError)
}),
),
),
}
grpcServer := grpc.NewServer(append(opt, builtinOptions...)...)
reflection.Register(grpcServer)
callback(grpcServer)
if err = grpcServer.Serve(lis); err != nil {
err = errors.WithStack(err)
logger.Critical(nil, "%+v", err)
}
}
var (
jsonPbMarshaller = &jsonpb.Marshaler{
OrigName: true,
}
)
func (g *GrpcServer) unaryServerLogInterceptor() grpc.UnaryServerInterceptor {
showErrorCause := g.showErrorCause
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
var err error
s := ctxutil.GetSender(ctx)
requestId := ctxutil.GetRequestId(ctx)
ctx = ctxutil.SetRequestId(ctx, requestId)
ctx = ctxutil.ContextWithSender(ctx, s)
method := strings.Split(info.FullMethod, "/")
action := method[len(method)-1]
if p, ok := req.(proto.Message); ok {
if content, err := jsonPbMarshaller.MarshalToString(p); err != nil {
logger.Error(ctx, "Failed to marshal proto message to string [%s] [%+v] [%+v]", action, s, err)
} else {
logger.Info(ctx, "Request received [%s] [%+v] [%s]", action, s, content)
}
}
start := time.Now()
resp, err := handler(ctx, req)
elapsed := time.Since(start)
logger.Info(ctx, "Handled request [%s] [%+v] exec_time is [%s]", action, s, elapsed)
if e, ok := status.FromError(err); ok {
if e.Code() != codes.OK {
logger.Debug(ctx, "Response is error: %s, %s", e.Code().String(), e.Message())
if !showErrorCause {
err = gerr.ClearErrorCause(err)
}
}
}
return resp, err
}
}