forked from kubewharf/katalyst-core
317 lines
10 KiB
Go
317 lines
10 KiB
Go
/*
|
|
Copyright 2022 The Katalyst Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package target
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/klog/v2"
|
|
|
|
configapis "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1"
|
|
configinformers "github.com/kubewharf/katalyst-api/pkg/client/informers/externalversions/config/v1alpha1"
|
|
kcclient "github.com/kubewharf/katalyst-core/pkg/client"
|
|
"github.com/kubewharf/katalyst-core/pkg/config/controller"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/native"
|
|
)
|
|
|
|
type KatalystCustomConfigTargetHandler struct {
|
|
mu sync.RWMutex
|
|
|
|
ctx context.Context
|
|
client *kcclient.GenericClientSet
|
|
kccConfig *controller.KCCConfig
|
|
|
|
// map gvr to kcc key set; actually, it's invalid to hold more than one kcc for
|
|
//one individual gvr, and should be alerted
|
|
gvrKatalystCustomConfigMap map[metav1.GroupVersionResource]sets.String
|
|
// katalystCustomConfigGVRMap map kcc key to gvr; since the gvc in kcc may be unexpected changed
|
|
// by cases, store in cache to make sure we can still find ite original mapping
|
|
katalystCustomConfigGVRMap map[string]metav1.GroupVersionResource
|
|
// map gvr to kcc target accessor
|
|
targetAccessorMap map[metav1.GroupVersionResource]KatalystCustomConfigTargetAccessor
|
|
// targetHandlerFuncMap stores those handler functions for all controllers
|
|
// that are interested in kcc-target changes
|
|
targetHandlerFuncMap map[string]KatalystCustomConfigTargetHandlerFunc
|
|
}
|
|
|
|
func NewKatalystCustomConfigTargetHandler(ctx context.Context, client *kcclient.GenericClientSet, kccConfig *controller.KCCConfig,
|
|
katalystCustomConfigInformer configinformers.KatalystCustomConfigInformer) *KatalystCustomConfigTargetHandler {
|
|
k := &KatalystCustomConfigTargetHandler{
|
|
ctx: ctx,
|
|
client: client,
|
|
kccConfig: kccConfig,
|
|
gvrKatalystCustomConfigMap: make(map[metav1.GroupVersionResource]sets.String),
|
|
katalystCustomConfigGVRMap: make(map[string]metav1.GroupVersionResource),
|
|
targetHandlerFuncMap: make(map[string]KatalystCustomConfigTargetHandlerFunc),
|
|
targetAccessorMap: make(map[metav1.GroupVersionResource]KatalystCustomConfigTargetAccessor),
|
|
}
|
|
|
|
katalystCustomConfigInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: k.addKatalystCustomConfigEventHandle,
|
|
UpdateFunc: k.updateKatalystCustomConfigEventHandle,
|
|
DeleteFunc: k.deleteKatalystCustomConfigEventHandle,
|
|
})
|
|
return k
|
|
}
|
|
|
|
func (k *KatalystCustomConfigTargetHandler) Run() {
|
|
defer k.shutDown()
|
|
<-k.ctx.Done()
|
|
}
|
|
|
|
// RegisterTargetHandler is used to register handler functions for the given gvr
|
|
func (k *KatalystCustomConfigTargetHandler) RegisterTargetHandler(name string, handlerFunc KatalystCustomConfigTargetHandlerFunc) {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
k.targetHandlerFuncMap[name] = handlerFunc
|
|
}
|
|
|
|
// GetKCCKeyListByGVR get kcc keyList by gvr.
|
|
func (k *KatalystCustomConfigTargetHandler) GetKCCKeyListByGVR(gvr metav1.GroupVersionResource) []string {
|
|
k.mu.RLock()
|
|
defer k.mu.RUnlock()
|
|
|
|
kccKeys, ok := k.gvrKatalystCustomConfigMap[gvr]
|
|
if ok {
|
|
return kccKeys.List()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (k *KatalystCustomConfigTargetHandler) GetTargetAccessorByGVR(gvr metav1.GroupVersionResource) (KatalystCustomConfigTargetAccessor, bool) {
|
|
k.mu.RLock()
|
|
defer k.mu.RUnlock()
|
|
|
|
accessor, ok := k.targetAccessorMap[gvr]
|
|
if ok {
|
|
return accessor, true
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
// RangeGVRTargetAccessor is used to walk through all accessors and perform the given function
|
|
func (k *KatalystCustomConfigTargetHandler) RangeGVRTargetAccessor(f func(gvr metav1.GroupVersionResource, accessor KatalystCustomConfigTargetAccessor) bool) {
|
|
k.mu.RLock()
|
|
defer k.mu.RUnlock()
|
|
|
|
for gvr, a := range k.targetAccessorMap {
|
|
ret := f(gvr, a)
|
|
if !ret {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (k *KatalystCustomConfigTargetHandler) addKatalystCustomConfigEventHandle(obj interface{}) {
|
|
kcc, ok := obj.(*configapis.KatalystCustomConfig)
|
|
if !ok {
|
|
klog.Errorf("cannot convert obj to *KatalystCustomConfig: %v", obj)
|
|
return
|
|
}
|
|
|
|
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(kcc)
|
|
if err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", kcc, err))
|
|
return
|
|
}
|
|
|
|
_, err = k.addOrUpdateGVRAndKCC(kcc.Spec.TargetType, key)
|
|
if err != nil {
|
|
klog.Errorf("cannot convert add or update gvr %s kcc %s: %v", kcc.Spec.TargetType, key, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
func (k *KatalystCustomConfigTargetHandler) updateKatalystCustomConfigEventHandle(old interface{}, new interface{}) {
|
|
oldKCC, ok := old.(*configapis.KatalystCustomConfig)
|
|
if !ok {
|
|
klog.Errorf("cannot convert obj to *KatalystCustomConfig: %v", new)
|
|
return
|
|
}
|
|
|
|
newKCC, ok := new.(*configapis.KatalystCustomConfig)
|
|
if !ok {
|
|
klog.Errorf("cannot convert obj to *KatalystCustomConfig: %v", new)
|
|
return
|
|
}
|
|
|
|
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newKCC)
|
|
if err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", newKCC, err))
|
|
return
|
|
}
|
|
|
|
_, err = k.addOrUpdateGVRAndKCC(newKCC.Spec.TargetType, key)
|
|
if err != nil {
|
|
klog.Errorf("cannot convert add or update gvr %s kcc %s: %v", newKCC.Spec.TargetType, key, err)
|
|
return
|
|
}
|
|
|
|
// if kcc has updated, it needs trigger all related kcc target to reconcile
|
|
if newKCC.GetGeneration() == newKCC.Status.ObservedGeneration &&
|
|
oldKCC.Status.ObservedGeneration != newKCC.Status.ObservedGeneration {
|
|
accessor, ok := k.GetTargetAccessorByGVR(newKCC.Spec.TargetType)
|
|
if ok {
|
|
kccTargets, err := accessor.List(labels.Everything())
|
|
if err != nil {
|
|
klog.Errorf("list gvr %s kcc target failed: %v", newKCC.Spec.TargetType, err)
|
|
return
|
|
}
|
|
|
|
for _, target := range kccTargets {
|
|
accessor.Enqueue("", target)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (k *KatalystCustomConfigTargetHandler) deleteKatalystCustomConfigEventHandle(obj interface{}) {
|
|
kcc, ok := obj.(*configapis.KatalystCustomConfig)
|
|
if !ok {
|
|
klog.Errorf("cannot convert obj to *KatalystCustomConfig: %v", obj)
|
|
return
|
|
}
|
|
|
|
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(kcc)
|
|
if err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", kcc, err))
|
|
return
|
|
}
|
|
|
|
k.deleteGVRAndKCCKey(kcc.Spec.TargetType, key)
|
|
|
|
// when some kcc of a gvr has been deleted, we need reconcile its kcc target immediately
|
|
accessor, ok := k.GetTargetAccessorByGVR(kcc.Spec.TargetType)
|
|
if ok {
|
|
kccTargets, err := accessor.List(labels.Everything())
|
|
if err != nil {
|
|
klog.Errorf("list gvr %s kcc target failed: %v", kcc.Spec.TargetType, err)
|
|
return
|
|
}
|
|
|
|
for _, target := range kccTargets {
|
|
accessor.Enqueue("", target)
|
|
}
|
|
}
|
|
}
|
|
|
|
// addOrUpdateGVRAndKCC add gvr and kcc key to cache and return current kcc keys which use this gvr.
|
|
func (k *KatalystCustomConfigTargetHandler) addOrUpdateGVRAndKCC(gvr metav1.GroupVersionResource, key string) (KatalystCustomConfigTargetAccessor, error) {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
old, ok := k.katalystCustomConfigGVRMap[key]
|
|
if ok && old != gvr {
|
|
k.deleteGVRAndKCCKeyWithoutLock(old, key)
|
|
}
|
|
|
|
return k.addGVRAndKCCKeyWithoutLock(gvr, key)
|
|
}
|
|
|
|
// deleteGVRAndKCCKey delete gvr and kcc key, return whether it is empty after delete that
|
|
func (k *KatalystCustomConfigTargetHandler) deleteGVRAndKCCKey(gvr metav1.GroupVersionResource, key string) {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
k.deleteGVRAndKCCKeyWithoutLock(gvr, key)
|
|
}
|
|
|
|
func (k *KatalystCustomConfigTargetHandler) deleteGVRAndKCCKeyWithoutLock(gvr metav1.GroupVersionResource, key string) {
|
|
kccKeys, ok := k.gvrKatalystCustomConfigMap[gvr]
|
|
if ok {
|
|
delete(kccKeys, key)
|
|
delete(k.katalystCustomConfigGVRMap, key)
|
|
}
|
|
|
|
if len(kccKeys) == 0 {
|
|
if accessor, ok := k.targetAccessorMap[gvr]; ok {
|
|
accessor.Stop()
|
|
}
|
|
delete(k.targetAccessorMap, gvr)
|
|
delete(k.gvrKatalystCustomConfigMap, gvr)
|
|
}
|
|
}
|
|
|
|
func (k *KatalystCustomConfigTargetHandler) addGVRAndKCCKeyWithoutLock(gvr metav1.GroupVersionResource, key string) (KatalystCustomConfigTargetAccessor, error) {
|
|
if err := k.checkGVRValid(gvr); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, ok := k.gvrKatalystCustomConfigMap[gvr]
|
|
if !ok {
|
|
_, ok := k.targetAccessorMap[gvr]
|
|
if !ok {
|
|
accessor, err := NewRealKatalystCustomConfigTargetAccessor(gvr,
|
|
k.client.DynamicClient, k.targetHandlerFuncMap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
accessor.Start()
|
|
k.targetAccessorMap[gvr] = accessor
|
|
} else {
|
|
klog.Fatalf("gvr of targetAccessorMap %s not exist", gvr.String())
|
|
}
|
|
k.gvrKatalystCustomConfigMap[gvr] = sets.NewString()
|
|
}
|
|
k.gvrKatalystCustomConfigMap[gvr].Insert(key)
|
|
k.katalystCustomConfigGVRMap[key] = gvr
|
|
return k.targetAccessorMap[gvr], nil
|
|
}
|
|
|
|
// checkGVRValid is used to check whether the given gvr is valid, skip to create corresponding
|
|
// target accessor otherwise
|
|
func (k *KatalystCustomConfigTargetHandler) checkGVRValid(gvr metav1.GroupVersionResource) error {
|
|
if !k.kccConfig.ValidAPIGroupSet.Has(gvr.Group) {
|
|
return fmt.Errorf("gvr %s is not in valid api group set", gvr.String())
|
|
}
|
|
|
|
schemaGVR := native.ToSchemaGVR(gvr.Group, gvr.Version, gvr.Resource)
|
|
resourceList, err := k.client.DiscoveryClient.ServerResourcesForGroupVersion(schemaGVR.GroupVersion().String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, resource := range resourceList.APIResources {
|
|
if resource.Name == gvr.Resource {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return apierrors.NewNotFound(schemaGVR.GroupResource(), schemaGVR.Resource)
|
|
}
|
|
|
|
func (k *KatalystCustomConfigTargetHandler) shutDown() {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
for _, accessor := range k.targetAccessorMap {
|
|
accessor.Stop()
|
|
}
|
|
|
|
// clear all maps
|
|
k.targetAccessorMap = make(map[metav1.GroupVersionResource]KatalystCustomConfigTargetAccessor)
|
|
k.gvrKatalystCustomConfigMap = make(map[metav1.GroupVersionResource]sets.String)
|
|
k.katalystCustomConfigGVRMap = make(map[string]metav1.GroupVersionResource)
|
|
}
|