ResourceGetter v1beta1 (#5416)

* add resource getter & reader

Signed-off-by: Wenhao Zhou <wenhaozhou@yunify.com>

* add resource v1beta1 handler

* delete gvrToGvk map instead of using the dynamicRESTMapper for getting gvk, and rename the ResourceLister to ResourceGetter

* add unregisteredMiddleware filter

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>

* add secret contains benchmark & add fieldSelector to resourcev1beta1

Signed-off-by: Wenhao Zhou <wenhaozhou@yunify.com>

* delete crds models

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>

* delete parameterExtractor and instead of requestInfo

Signed-off-by: Wenhao Zhou <wenhaozhou@yunify.com>

* add benchmark test

* move fieldSelector to DefaultObjectMetaFilter

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>

* move fieldSelector to DefaultObjectMetaFilter

* change registeredGv type to set

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>

* update filter chains

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>

* fix fieldSelector cannot work

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>

* fix: list known type do not need served label

Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>

---------

Signed-off-by: Wenhao Zhou <wenhaozhou@yunify.com>
Signed-off-by: wenhaozhou <wenhaozhou@yunify.com>
This commit is contained in:
Wenhao Zhou
2023-02-08 15:00:15 +08:00
committed by GitHub
parent 1c49fcd57e
commit 23df7b051b
14 changed files with 566 additions and 804 deletions

View File

@@ -0,0 +1,119 @@
package secret
import (
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/rand"
"kubesphere.io/kubesphere/pkg/apiserver/query"
"kubesphere.io/kubesphere/pkg/models/resources/v1alpha3"
)
var testSecret = &v1.Secret{
TypeMeta: metav1.TypeMeta{
Kind: "Secret",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "prometheus-k8s",
Namespace: "kube-system",
ResourceVersion: "1234567",
Labels: map[string]string{
"modifiedAt": "1670227209",
"name": "snapshot-controller",
"owner": "helm",
"status": "superseded",
"version": "2",
},
},
Data: map[string][]byte{
"testdata": []byte("thisisatestsecret"),
},
Type: "helm.sh/release.v1",
}
func BenchmarkContains(b *testing.B) {
for i := 0; i < b.N; i++ {
if contains(testSecret, "metadata.labels.status!=superseded") {
b.Error("test failed")
}
}
}
func BenchmarkDefaultListWith1000(b *testing.B) {
s := &secretSearcher{}
q := query.New()
q.Filters[query.ParameterFieldSelector] = "metadata.resourceVersion=1234567"
expectedListCount := rand.Intn(20)
list := prepareList(testSecret, 1000, expectedListCount)
for i := 0; i < b.N; i++ {
list := v1alpha3.DefaultList(list, q, s.compare, s.filter)
if list.TotalItems != expectedListCount {
b.Error("test failed")
}
}
}
func BenchmarkDefaultListWith5000(b *testing.B) {
s := &secretSearcher{}
q := query.New()
q.Filters[query.ParameterFieldSelector] = "metadata.resourceVersion=1234567"
expectedListCount := rand.Intn(20)
list := prepareList(testSecret, 5000, expectedListCount)
for i := 0; i < b.N; i++ {
list := v1alpha3.DefaultList(list, q, s.compare, s.filter)
if list.TotalItems != expectedListCount {
b.Error("test failed")
}
}
}
func BenchmarkDefaultListWith10000(b *testing.B) {
s := &secretSearcher{}
q := query.New()
q.Filters[query.ParameterFieldSelector] = "metadata.resourceVersion=1234567"
expectedListCount := rand.Intn(20)
list := prepareList(testSecret, 100000, expectedListCount)
for i := 0; i < b.N; i++ {
list := v1alpha3.DefaultList(list, q, s.compare, s.filter)
if list.TotalItems != expectedListCount {
b.Error("test failed")
}
}
}
func BenchmarkDefaultListWith50000(b *testing.B) {
s := &secretSearcher{}
q := query.New()
q.Filters[query.ParameterFieldSelector] = "metadata.resourceVersion=1234567"
expectedListCount := rand.Intn(20)
for i := 0; i < b.N; i++ {
list := v1alpha3.DefaultList(prepareList(testSecret, 50000, expectedListCount), q, s.compare, s.filter)
if list.TotalItems != expectedListCount {
b.Error("test failed")
}
}
}
func prepareList(testSecret *v1.Secret, listLen, expected int) []runtime.Object {
secretList := make([]runtime.Object, listLen)
for i := 0; i < listLen; i++ {
secret := testSecret.DeepCopy()
secret.Name = rand.String(20)
secret.ObjectMeta.ResourceVersion = rand.String(10)
secretList[i] = secret
}
for i := 0; i < expected; i++ {
secretList[rand.Intn(listLen-1)] = testSecret
}
return secretList
}

View File

@@ -0,0 +1,66 @@
package v1beta1
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"kubesphere.io/kubesphere/pkg/apiserver/query"
)
type resourceCache struct {
cache cache.Cache
}
func NewResourceCache(cache cache.Cache) Interface {
return &resourceCache{cache: cache}
}
func (u *resourceCache) Get(name, namespace string, object client.Object) error {
return u.cache.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: name}, object)
}
func (u *resourceCache) List(namespace string, query *query.Query, list client.ObjectList) error {
listOpt := &client.ListOptions{
LabelSelector: query.Selector(),
Namespace: namespace,
}
err := u.cache.List(context.Background(), list, listOpt)
if err != nil {
return err
}
extractList, err := meta.ExtractList(list)
if err != nil {
return err
}
filtered := DefaultList(extractList, query, compare, filter)
if err := meta.SetList(list, filtered); err != nil {
return err
}
return nil
}
func compare(left, right runtime.Object, field query.Field) bool {
l, err := meta.Accessor(left)
if err != nil {
return false
}
r, err := meta.Accessor(right)
if err != nil {
return false
}
return DefaultObjectMetaCompare(l, r, field)
}
func filter(object runtime.Object, filter query.Filter) bool {
o, err := meta.Accessor(object)
if err != nil {
return false
}
return DefaultObjectMetaFilter(o, filter)
}

View File

@@ -0,0 +1,226 @@
package v1beta1
import (
"encoding/json"
"fmt"
"sort"
"strings"
"github.com/oliveagle/jsonpath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"kubesphere.io/kubesphere/pkg/apiserver/query"
)
type Interface interface {
// Get retrieves a single object by its namespace and name
Get(namespace, name string, object client.Object) error
// List retrieves a collection of objects matches given query
List(namespace string, query *query.Query, object client.ObjectList) error
}
type ResourceGetter interface {
GetResource(schema.GroupVersionResource, string, string) (client.Object, error)
ListResources(schema.GroupVersionResource, string, *query.Query) (client.ObjectList, error)
}
// CompareFunc return true is left great than right
type CompareFunc func(runtime.Object, runtime.Object, query.Field) bool
type FilterFunc func(runtime.Object, query.Filter) bool
type TransformFunc func(runtime.Object) runtime.Object
func DefaultList(objects []runtime.Object, q *query.Query, compareFunc CompareFunc, filterFunc FilterFunc, transformFuncs ...TransformFunc) []runtime.Object {
// selected matched ones
var filtered []runtime.Object
if len(q.Filters) != 0 {
for _, object := range objects {
selected := true
for field, value := range q.Filters {
if !filterFunc(object, query.Filter{Field: field, Value: value}) {
selected = false
break
}
}
if selected {
for _, transform := range transformFuncs {
object = transform(object)
}
filtered = append(filtered, object)
}
}
}
// sort by sortBy field
sort.Slice(filtered, func(i, j int) bool {
if !q.Ascending {
return compareFunc(filtered[i], filtered[j], q.SortBy)
}
return !compareFunc(filtered[i], filtered[j], q.SortBy)
})
total := len(filtered)
if q.Pagination == nil {
q.Pagination = query.NoPagination
}
start, end := q.Pagination.GetValidPagination(total)
return filtered[start:end]
}
// DefaultObjectMetaCompare return true is left great than right
func DefaultObjectMetaCompare(left, right metav1.Object, sortBy query.Field) bool {
switch sortBy {
// ?sortBy=name
case query.FieldName:
return strings.Compare(left.GetName(), right.GetName()) > 0
// ?sortBy=creationTimestamp
default:
fallthrough
case query.FieldCreateTime:
fallthrough
case query.FieldCreationTimeStamp:
// compare by name if creation timestamp is equal
ltime := left.GetCreationTimestamp()
rtime := right.GetCreationTimestamp()
if ltime.Equal(&rtime) {
return strings.Compare(left.GetName(), right.GetName()) > 0
}
return left.GetCreationTimestamp().After(right.GetCreationTimestamp().Time)
}
}
// Default metadata filter
func DefaultObjectMetaFilter(item metav1.Object, filter query.Filter) bool {
switch filter.Field {
case query.FieldNames:
for _, name := range strings.Split(string(filter.Value), ",") {
if item.GetName() == name {
return true
}
}
return false
// /namespaces?page=1&limit=10&name=default
case query.FieldName:
return strings.Contains(item.GetName(), string(filter.Value))
// /namespaces?page=1&limit=10&uid=a8a8d6cf-f6a5-4fea-9c1b-e57610115706
case query.FieldUID:
return strings.Compare(string(item.GetUID()), string(filter.Value)) == 0
// /deployments?page=1&limit=10&namespace=kubesphere-system
case query.FieldNamespace:
return strings.Compare(item.GetNamespace(), string(filter.Value)) == 0
// /namespaces?page=1&limit=10&ownerReference=a8a8d6cf-f6a5-4fea-9c1b-e57610115706
case query.FieldOwnerReference:
for _, ownerReference := range item.GetOwnerReferences() {
if strings.Compare(string(ownerReference.UID), string(filter.Value)) == 0 {
return true
}
}
return false
// /namespaces?page=1&limit=10&ownerKind=Workspace
case query.FieldOwnerKind:
for _, ownerReference := range item.GetOwnerReferences() {
if strings.Compare(ownerReference.Kind, string(filter.Value)) == 0 {
return true
}
}
return false
// /namespaces?page=1&limit=10&annotation=openpitrix_runtime
case query.FieldAnnotation:
return labelMatch(item.GetAnnotations(), string(filter.Value))
// /namespaces?page=1&limit=10&label=kubesphere.io/workspace:system-workspace
case query.FieldLabel:
return labelMatch(item.GetLabels(), string(filter.Value))
case query.ParameterFieldSelector:
return contains(item.(runtime.Object), filter.Value)
default:
return false
}
}
func labelMatch(labels map[string]string, filter string) bool {
fields := strings.SplitN(filter, "=", 2)
var key, value string
var opposite bool
if len(fields) == 2 {
key = fields[0]
if strings.HasSuffix(key, "!") {
key = strings.TrimSuffix(key, "!")
opposite = true
}
value = fields[1]
} else {
key = fields[0]
value = "*"
}
for k, v := range labels {
if opposite {
if (k == key) && v != value {
return true
}
} else {
if (k == key) && (value == "*" || v == value) {
return true
}
}
}
return false
}
// implement a generic query filter to support multiple field selectors with "jsonpath.JsonPathLookup"
// https://github.com/oliveagle/jsonpath/blob/master/readme.md
func contains(object runtime.Object, queryValue query.Value) bool {
// call the ParseSelector function of "k8s.io/apimachinery/pkg/fields/selector.go" to validate and parse the selector
fieldSelector, err := fields.ParseSelector(string(queryValue))
if err != nil {
klog.V(4).Infof("failed parse selector error: %s", err)
return false
}
for _, requirement := range fieldSelector.Requirements() {
var negative bool
// supports '=', '==' and '!='.(e.g. ?fieldSelector=key1=value1,key2=value2)
// fields.ParseSelector(FieldSelector) has handled the case where the operator is '==' and converted it to '=',
// so case selection.DoubleEquals can be ignored here.
switch requirement.Operator {
case selection.NotEquals:
negative = true
case selection.Equals:
negative = false
}
key := requirement.Field
value := requirement.Value
var input map[string]interface{}
data, err := json.Marshal(object)
if err != nil {
klog.V(4).Infof("failed marshal to JSON string: %s", err)
return false
}
if err = json.Unmarshal(data, &input); err != nil {
klog.V(4).Infof("failed unmarshal to map object: %s", err)
return false
}
rawValue, err := jsonpath.JsonPathLookup(input, "$."+key)
if err != nil {
klog.V(4).Infof("failed to lookup jsonpath: %s", err)
return false
}
if (negative && fmt.Sprintf("%v", rawValue) != value) || (!negative && fmt.Sprintf("%v", rawValue) == value) {
continue
} else {
return false
}
}
return true
}

View File

@@ -0,0 +1,150 @@
package v1beta1
import (
"context"
"errors"
"strings"
"sync"
"sigs.k8s.io/controller-runtime/pkg/cache"
"kubesphere.io/kubesphere/pkg/apiserver/query"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)
var ErrResourceNotSupported = errors.New("resource is not supported")
var ErrResourceNotServed = errors.New("resource is not served")
const labelResourceServed = "kubesphere.io/resource-served"
// TODO If delete the crd at the cluster when ks is running, the client.cache doesn`t return err but empty result
func New(client client.Client, cache cache.Cache) ResourceGetter {
return &resourceGetter{
client: client,
cache: NewResourceCache(cache),
serveCRD: make(map[string]bool, 0),
}
}
type resourceGetter struct {
client client.Client
cache Interface
serveCRD map[string]bool
sync.RWMutex
}
func (h *resourceGetter) GetResource(gvr schema.GroupVersionResource, name, namespace string) (client.Object, error) {
var obj client.Object
gvk, err := h.getGVK(gvr)
if err != nil {
return nil, err
}
if h.client.Scheme().Recognizes(gvk) {
gvkObject, err := h.client.Scheme().New(gvk)
if err != nil {
return nil, err
}
obj = gvkObject.(client.Object)
} else {
serviced, err := h.isServed(gvr)
if err != nil {
return nil, err
}
if !serviced {
return nil, ErrResourceNotServed
}
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(gvk)
obj = u
}
if err := h.cache.Get(name, namespace, obj); err != nil {
return nil, err
}
return obj, nil
}
func (h *resourceGetter) ListResources(gvr schema.GroupVersionResource, namespace string, query *query.Query) (client.ObjectList, error) {
var obj client.ObjectList
gvk, err := h.getGVK(gvr)
if err != nil {
return nil, err
}
gvk = convertGVKToList(gvk)
if h.client.Scheme().Recognizes(gvk) {
gvkObject, err := h.client.Scheme().New(gvk)
if err != nil {
return nil, err
}
obj = gvkObject.(client.ObjectList)
} else {
serviced, err := h.isServed(gvr)
if err != nil {
return nil, err
}
if !serviced {
return nil, ErrResourceNotServed
}
u := &unstructured.UnstructuredList{}
u.SetGroupVersionKind(gvk)
obj = u
}
if err := h.cache.List(namespace, query, obj); err != nil {
return nil, err
}
return obj, nil
}
func convertGVKToList(gvk schema.GroupVersionKind) schema.GroupVersionKind {
if strings.HasSuffix(gvk.Kind, "List") {
return gvk
}
gvk.Kind = gvk.Kind + "List"
return gvk
}
func (h *resourceGetter) getGVK(gvr schema.GroupVersionResource) (schema.GroupVersionKind, error) {
var (
gvk schema.GroupVersionKind
err error
)
gvk, err = h.client.RESTMapper().KindFor(gvr)
if err != nil {
return gvk, err
}
return gvk, nil
}
func (h *resourceGetter) isServed(gvr schema.GroupVersionResource) (bool, error) {
resourceName := gvr.Resource + "." + gvr.Group
h.RWMutex.RLock()
isServed := h.serveCRD[resourceName]
h.RWMutex.RUnlock()
if isServed {
return true, nil
}
crd := &extv1.CustomResourceDefinition{}
err := h.client.Get(context.Background(), client.ObjectKey{Name: resourceName}, crd)
if err != nil {
return false, err
}
if crd.Labels[labelResourceServed] == "true" {
h.RWMutex.Lock()
h.serveCRD[resourceName] = true
h.RWMutex.Unlock()
return true, nil
}
return false, nil
}