forked from kubewharf/katalyst-core
336 lines
11 KiB
Go
336 lines
11 KiB
Go
/*
|
|
Copyright 2022 The Katalyst Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package util
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"github.com/pkg/errors"
|
|
core "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
corelisters "k8s.io/client-go/listers/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/klog/v2"
|
|
|
|
apis "github.com/kubewharf/katalyst-api/pkg/apis/autoscaling/v1alpha1"
|
|
apiworkload "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
|
|
workloadlister "github.com/kubewharf/katalyst-api/pkg/client/listers/workload/v1alpha1"
|
|
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
|
|
"github.com/kubewharf/katalyst-core/pkg/consts"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/general"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/native"
|
|
)
|
|
|
|
const (
|
|
spdConfigHashLength = 12
|
|
)
|
|
|
|
/*
|
|
helper functions to get spd-related objects with indexed informer
|
|
*/
|
|
|
|
// GetSPDForWorkload is used to get spd that should manage the given workload
|
|
func getSPDFroWorkloadWithIndex(workload *unstructured.Unstructured, spdIndexer cache.Indexer) (*apiworkload.ServiceProfileDescriptor, error) {
|
|
objs, err := spdIndexer.ByIndex(consts.TargetReferenceIndex, generateWorkloadReferenceKey(workload))
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "spd for workload %s/%s not exist", workload.GetNamespace(), workload.GetName())
|
|
} else if len(objs) > 1 || len(objs) == 0 {
|
|
return nil, fmt.Errorf("spd for workload %s/%s invalid", workload.GetNamespace(), workload.GetName())
|
|
}
|
|
|
|
spd, ok := objs[0].(*apiworkload.ServiceProfileDescriptor)
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid spd")
|
|
}
|
|
return spd, nil
|
|
}
|
|
|
|
/*
|
|
helper functions to build indexed informer for spd-related objects
|
|
*/
|
|
|
|
// SPDTargetReferenceIndex is used to construct informer index for target reference in SPD
|
|
func SPDTargetReferenceIndex(obj interface{}) ([]string, error) {
|
|
spd, ok := obj.(*apiworkload.ServiceProfileDescriptor)
|
|
if !ok || spd == nil {
|
|
return nil, fmt.Errorf("failed to reflect a obj to spd")
|
|
}
|
|
return objectTargetReferenceIndex(spd.Spec.TargetRef)
|
|
}
|
|
|
|
/*
|
|
helper functions to get spd-related objects
|
|
*/
|
|
|
|
// GetWorkloadForSPD is used to get workload that should be managed the given spd
|
|
func GetWorkloadForSPD(spd *apiworkload.ServiceProfileDescriptor, lister cache.GenericLister) (runtime.Object, error) {
|
|
return lister.ByNamespace(spd.Namespace).Get(spd.Spec.TargetRef.Name)
|
|
}
|
|
|
|
// GetSPDForWorkload is used to get spd that should manage the given workload
|
|
// the preference is annotation ---> indexer --> lister
|
|
func GetSPDForWorkload(workload *unstructured.Unstructured, spdIndexer cache.Indexer,
|
|
spdLister workloadlister.ServiceProfileDescriptorLister) (*apiworkload.ServiceProfileDescriptor, error) {
|
|
if !CheckWorkloadSPDEnabled(workload) {
|
|
return nil, fmt.Errorf("workload not enable spd")
|
|
}
|
|
|
|
if spdName, ok := workload.GetAnnotations()[apiconsts.WorkloadAnnotationSPDNameKey]; ok {
|
|
spd, err := spdLister.ServiceProfileDescriptors(workload.GetNamespace()).Get(spdName)
|
|
if err == nil && checkTargetRefMatch(spd.Spec.TargetRef, workload) {
|
|
return spd, nil
|
|
}
|
|
return nil, apierrors.NewNotFound(apis.Resource(apiworkload.ResourceNameServiceProfileDescriptors), "matched target refer spd")
|
|
}
|
|
|
|
if spdIndexer != nil {
|
|
if spd, err := getSPDFroWorkloadWithIndex(workload, spdIndexer); err == nil {
|
|
return spd, nil
|
|
}
|
|
}
|
|
|
|
spdList, err := spdLister.List(labels.Everything())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, spd := range spdList {
|
|
if checkTargetRefMatch(spd.Spec.TargetRef, workload) {
|
|
return spd, nil
|
|
}
|
|
}
|
|
return nil, apierrors.NewNotFound(apiworkload.Resource(apiworkload.ResourceNameServiceProfileDescriptors), "spd for workload")
|
|
}
|
|
|
|
// GetSPDForPod is used to get spd that should manage the given vpa,
|
|
// we'll try to find by annotation for pod, and then go through workload if not exist,
|
|
// and we will find it recursively since we don't know in which level the owner will be.
|
|
func GetSPDForPod(pod *core.Pod, spdIndexer cache.Indexer, workloadListerMap map[schema.GroupVersionKind]cache.GenericLister,
|
|
spdLister workloadlister.ServiceProfileDescriptorLister) (*apiworkload.ServiceProfileDescriptor, error) {
|
|
// different with vpa, we will store spd name in pod name, so we will check whether it's still valid
|
|
if spdName, ok := pod.GetAnnotations()[apiconsts.PodAnnotationSPDNameKey]; ok {
|
|
spd, err := spdLister.ServiceProfileDescriptors(pod.GetNamespace()).Get(spdName)
|
|
if err == nil && CheckSPDMatchWithPod(pod, spd, workloadListerMap) {
|
|
return spd, nil
|
|
}
|
|
}
|
|
|
|
for _, owner := range pod.GetOwnerReferences() {
|
|
gvk := schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind)
|
|
if _, ok := workloadListerMap[gvk]; ok {
|
|
if workloadObj, err := workloadListerMap[gvk].ByNamespace(pod.GetNamespace()).Get(owner.Name); err == nil {
|
|
var targetSPD *apiworkload.ServiceProfileDescriptor
|
|
native.VisitUnstructuredAncestors(workloadObj.(*unstructured.Unstructured),
|
|
workloadListerMap, func(owner *unstructured.Unstructured) bool {
|
|
spd, err := GetSPDForWorkload(owner, spdIndexer, spdLister)
|
|
if err != nil {
|
|
return true
|
|
}
|
|
|
|
targetSPD = spd
|
|
return false
|
|
})
|
|
if targetSPD != nil {
|
|
return targetSPD, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
spdList, err := spdLister.List(labels.Everything())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, spd := range spdList {
|
|
if CheckSPDMatchWithPod(pod, spd, workloadListerMap) {
|
|
return spd, nil
|
|
}
|
|
}
|
|
return nil, apierrors.NewNotFound(apiworkload.Resource(apiworkload.ResourceNameServiceProfileDescriptors), "spd for pod")
|
|
}
|
|
|
|
// GetPodListForSPD is used to get pods that should be managed by the given spd,
|
|
// we'll always get through workload
|
|
func GetPodListForSPD(spd *apiworkload.ServiceProfileDescriptor, podIndexer cache.Indexer, podLabelIndexKeyList []string,
|
|
workloadLister cache.GenericLister, podLister corelisters.PodLister) ([]*core.Pod, error) {
|
|
workloadObj, err := GetWorkloadForSPD(spd, workloadLister)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return native.GetPodListForWorkload(workloadObj, podIndexer, podLabelIndexKeyList, podLister)
|
|
}
|
|
|
|
/*
|
|
helper functions to do validation works
|
|
*/
|
|
|
|
// CheckWorkloadSPDEnabled checks if the given workload is enabled with service profiling.
|
|
func CheckWorkloadSPDEnabled(workload metav1.Object) bool {
|
|
return workload.GetAnnotations()[apiconsts.WorkloadAnnotationSPDEnableKey] == apiconsts.WorkloadAnnotationSPDEnabled
|
|
}
|
|
|
|
// CheckSPDMatchWithPod checks whether the given pod and spd matches with each other
|
|
func CheckSPDMatchWithPod(pod *core.Pod, spd *apiworkload.ServiceProfileDescriptor, workloadListerMap map[schema.GroupVersionKind]cache.GenericLister) bool {
|
|
gvk := schema.FromAPIVersionAndKind(spd.Spec.TargetRef.APIVersion, spd.Spec.TargetRef.Kind)
|
|
if _, ok := workloadListerMap[gvk]; !ok {
|
|
return false
|
|
}
|
|
|
|
workloadObj, err := GetWorkloadForSPD(spd, workloadListerMap[gvk])
|
|
if err != nil {
|
|
klog.Errorf("failed to get workload for spd %v: %v", spd.Name, err)
|
|
return false
|
|
}
|
|
|
|
if !CheckWorkloadSPDEnabled(workloadObj.(metav1.Object)) {
|
|
return false
|
|
}
|
|
|
|
selector, err := native.GetUnstructuredSelector(workloadObj.(*unstructured.Unstructured))
|
|
if err != nil || selector == nil {
|
|
klog.Errorf("failed to get workload selector %v: %v", workloadObj, err)
|
|
return false
|
|
}
|
|
|
|
return selector.Matches(labels.Set(pod.Labels))
|
|
}
|
|
|
|
/*
|
|
helper functions to update spd info in-place
|
|
*/
|
|
|
|
func InsertSPDBusinessIndicatorSpec(spec *apiworkload.ServiceProfileDescriptorSpec,
|
|
serviceBusinessIndicatorSpec *apiworkload.ServiceBusinessIndicatorSpec) {
|
|
if spec == nil || serviceBusinessIndicatorSpec == nil {
|
|
return
|
|
}
|
|
|
|
if spec.BusinessIndicator == nil {
|
|
spec.BusinessIndicator = []apiworkload.ServiceBusinessIndicatorSpec{}
|
|
}
|
|
|
|
for i := range spec.BusinessIndicator {
|
|
if spec.BusinessIndicator[i].Name == serviceBusinessIndicatorSpec.Name {
|
|
spec.BusinessIndicator[i].Indicators = serviceBusinessIndicatorSpec.Indicators
|
|
return
|
|
}
|
|
}
|
|
spec.BusinessIndicator = append(spec.BusinessIndicator, *serviceBusinessIndicatorSpec)
|
|
}
|
|
|
|
func InsertSPDSystemIndicatorSpec(spec *apiworkload.ServiceProfileDescriptorSpec,
|
|
serviceSystemIndicatorSpec *apiworkload.ServiceSystemIndicatorSpec) {
|
|
if spec == nil || serviceSystemIndicatorSpec == nil {
|
|
return
|
|
}
|
|
|
|
if spec.SystemIndicator == nil {
|
|
spec.SystemIndicator = []apiworkload.ServiceSystemIndicatorSpec{}
|
|
}
|
|
|
|
for i := range spec.SystemIndicator {
|
|
if spec.SystemIndicator[i].Name == serviceSystemIndicatorSpec.Name {
|
|
spec.SystemIndicator[i].Indicators = serviceSystemIndicatorSpec.Indicators
|
|
return
|
|
}
|
|
}
|
|
spec.SystemIndicator = append(spec.SystemIndicator, *serviceSystemIndicatorSpec)
|
|
}
|
|
|
|
func InsertSPDBusinessIndicatorStatus(status *apiworkload.ServiceProfileDescriptorStatus,
|
|
serviceBusinessIndicatorStatus *apiworkload.ServiceBusinessIndicatorStatus) {
|
|
if status == nil || serviceBusinessIndicatorStatus == nil {
|
|
return
|
|
}
|
|
|
|
if status.BusinessStatus == nil {
|
|
status.BusinessStatus = []apiworkload.ServiceBusinessIndicatorStatus{}
|
|
}
|
|
|
|
for i := range status.BusinessStatus {
|
|
if status.BusinessStatus[i].Name == serviceBusinessIndicatorStatus.Name {
|
|
status.BusinessStatus[i].Current = serviceBusinessIndicatorStatus.Current
|
|
return
|
|
}
|
|
}
|
|
status.BusinessStatus = append(status.BusinessStatus, *serviceBusinessIndicatorStatus)
|
|
}
|
|
|
|
/*
|
|
helper functions to get the spd hash and the pod's spd name
|
|
*/
|
|
|
|
// GetSPDHash get spd hash from spd annotation
|
|
func GetSPDHash(spd *apiworkload.ServiceProfileDescriptor) string {
|
|
if spd == nil || spd.Annotations == nil {
|
|
return ""
|
|
}
|
|
return spd.Annotations[consts.ServiceProfileDescriptorAnnotationKeyConfigHash]
|
|
}
|
|
|
|
// SetSPDHash set spd hash to spd annotation
|
|
func SetSPDHash(spd *apiworkload.ServiceProfileDescriptor, hash string) {
|
|
if spd == nil {
|
|
return
|
|
}
|
|
|
|
if spd.Annotations == nil {
|
|
spd.Annotations = map[string]string{}
|
|
}
|
|
|
|
spd.Annotations[consts.ServiceProfileDescriptorAnnotationKeyConfigHash] = hash
|
|
}
|
|
|
|
// CalculateSPDHash calculate current spd hash by its spec and status
|
|
func CalculateSPDHash(spd *apiworkload.ServiceProfileDescriptor) (string, error) {
|
|
if spd == nil {
|
|
return "", fmt.Errorf("spd is nil")
|
|
}
|
|
|
|
spdCopy := &apiworkload.ServiceProfileDescriptor{}
|
|
spdCopy.Spec = spd.Spec
|
|
spdCopy.Status = spd.Status
|
|
data, err := json.Marshal(spdCopy)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return general.GenerateHash(data, spdConfigHashLength), nil
|
|
}
|
|
|
|
// GetPodSPDName gets spd name from pod annotation
|
|
func GetPodSPDName(pod *core.Pod) (string, error) {
|
|
if pod == nil {
|
|
return "", fmt.Errorf("pod is nil")
|
|
}
|
|
|
|
spdName, ok := pod.GetAnnotations()[apiconsts.PodAnnotationSPDNameKey]
|
|
if !ok {
|
|
return "", fmt.Errorf("pod without spd annotation")
|
|
}
|
|
|
|
return spdName, nil
|
|
}
|