forked from kubewharf/katalyst-core
413 lines
14 KiB
Go
413 lines
14 KiB
Go
/*
|
|
Copyright 2022 The Katalyst Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package vpa
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
coreListers "k8s.io/client-go/listers/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
"k8s.io/klog/v2"
|
|
|
|
apis "github.com/kubewharf/katalyst-api/pkg/apis/autoscaling/v1alpha1"
|
|
autoscalelister "github.com/kubewharf/katalyst-api/pkg/client/listers/autoscaling/v1alpha1"
|
|
workloadlister "github.com/kubewharf/katalyst-api/pkg/client/listers/workload/v1alpha1"
|
|
katalystbase "github.com/kubewharf/katalyst-core/cmd/base"
|
|
"github.com/kubewharf/katalyst-core/pkg/client/control"
|
|
"github.com/kubewharf/katalyst-core/pkg/config/controller"
|
|
"github.com/kubewharf/katalyst-core/pkg/config/generic"
|
|
"github.com/kubewharf/katalyst-core/pkg/consts"
|
|
"github.com/kubewharf/katalyst-core/pkg/controller/vpa/algorithm"
|
|
"github.com/kubewharf/katalyst-core/pkg/controller/vpa/algorithm/recommenders"
|
|
"github.com/kubewharf/katalyst-core/pkg/metrics"
|
|
katalystutil "github.com/kubewharf/katalyst-core/pkg/util"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/general"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/native"
|
|
)
|
|
|
|
// Names shared by the resource recommendation controller.
const (
	// resourceRecommendControllerName identifies this controller in its log
	// lines and cache-sync error messages.
	resourceRecommendControllerName = "resourceRecommend"

	// metricNameRecommendControlVPASyncCosts records how long one vpa sync
	// took, measured in microseconds.
	metricNameRecommendControlVPASyncCosts = "res_rec_vpa_sync_costs"
)
|
|
|
|
// rs stores all the in-tree recommendation algorithm implementations
|
|
var rs = []algorithm.ResourceRecommender{
|
|
recommenders.NewCPURecommender(),
|
|
}
|
|
|
|
func init() {
|
|
for _, r := range rs {
|
|
algorithm.RegisterRecommender(r)
|
|
}
|
|
}
|
|
|
|
// ResourceRecommendController is responsible to use in-tree algorithm implementations
|
|
// to export those recommended results to vpa-rec according to vpa config.
|
|
//
|
|
// although we use informer index mechanism to speed up the looking
|
|
// efficiency, we can't assume that all function callers MUST use an
|
|
// indexed informer to look up objects.
|
|
type ResourceRecommendController struct {
|
|
ctx context.Context
|
|
conf *controller.VPAConfig
|
|
|
|
vpaUpdater control.VPAUpdater
|
|
vpaRecUpdater control.VPARecommendationUpdater
|
|
|
|
spdIndexer cache.Indexer
|
|
vpaRecIndexer cache.Indexer
|
|
podIndexer cache.Indexer
|
|
|
|
podLister coreListers.PodLister
|
|
spdLister workloadlister.ServiceProfileDescriptorLister
|
|
vpaLister autoscalelister.KatalystVerticalPodAutoscalerLister
|
|
vpaRecLister autoscalelister.VerticalPodAutoscalerRecommendationLister
|
|
workloadLister map[schema.GroupVersionKind]cache.GenericLister
|
|
|
|
syncedFunc []cache.InformerSynced
|
|
vpaQueue workqueue.RateLimitingInterface
|
|
|
|
metricsEmitter metrics.MetricEmitter
|
|
|
|
vpaSyncWorkers int
|
|
}
|
|
|
|
func NewResourceRecommendController(ctx context.Context, controlCtx *katalystbase.GenericContext,
|
|
genericConf *generic.GenericConfiguration, _ *controller.GenericControllerConfiguration,
|
|
config *controller.VPAConfig) (*ResourceRecommendController, error) {
|
|
if controlCtx == nil {
|
|
return nil, fmt.Errorf("controlCtx is invalid")
|
|
}
|
|
|
|
podInformer := controlCtx.KubeInformerFactory.Core().V1().Pods()
|
|
spdInformer := controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors()
|
|
vpaInformer := controlCtx.InternalInformerFactory.Autoscaling().V1alpha1().KatalystVerticalPodAutoscalers()
|
|
vpaRecInformer := controlCtx.InternalInformerFactory.Autoscaling().V1alpha1().VerticalPodAutoscalerRecommendations()
|
|
|
|
genericClient := controlCtx.Client
|
|
recController := &ResourceRecommendController{
|
|
ctx: ctx,
|
|
conf: config,
|
|
vpaUpdater: &control.DummyVPAUpdater{},
|
|
vpaRecUpdater: &control.DummyVPARecommendationUpdater{},
|
|
spdIndexer: spdInformer.Informer().GetIndexer(),
|
|
vpaRecIndexer: vpaRecInformer.Informer().GetIndexer(),
|
|
podIndexer: podInformer.Informer().GetIndexer(),
|
|
podLister: podInformer.Lister(),
|
|
spdLister: spdInformer.Lister(),
|
|
vpaLister: vpaInformer.Lister(),
|
|
vpaRecLister: vpaRecInformer.Lister(),
|
|
workloadLister: make(map[schema.GroupVersionKind]cache.GenericLister),
|
|
vpaQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "vpa"),
|
|
syncedFunc: []cache.InformerSynced{
|
|
podInformer.Informer().HasSynced,
|
|
spdInformer.Informer().HasSynced,
|
|
vpaInformer.Informer().HasSynced,
|
|
vpaRecInformer.Informer().HasSynced,
|
|
},
|
|
vpaSyncWorkers: config.VPASyncWorkers,
|
|
}
|
|
|
|
for _, wf := range controlCtx.DynamicResourcesManager.GetDynamicInformers() {
|
|
recController.workloadLister[wf.GVK] = wf.Informer.Lister()
|
|
recController.syncedFunc = append(recController.syncedFunc, wf.Informer.Informer().HasSynced)
|
|
}
|
|
|
|
klog.Infof("vpa resync period %v", config.VPAReSyncPeriod)
|
|
|
|
vpaInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: recController.addVPA,
|
|
UpdateFunc: recController.updateVPA,
|
|
}, config.VPAReSyncPeriod)
|
|
|
|
// build index: workload ---> spd
|
|
if _, ok := spdInformer.Informer().GetIndexer().GetIndexers()[consts.TargetReferenceIndex]; !ok {
|
|
err := spdInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
|
|
consts.TargetReferenceIndex: katalystutil.SPDTargetReferenceIndex,
|
|
})
|
|
if err != nil {
|
|
klog.Errorf("failed to add spd target reference index: %v", err)
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// build index: vpa ---> vpaRec
|
|
if _, ok := vpaRecInformer.Informer().GetIndexer().GetIndexers()[consts.OwnerReferenceIndex]; !ok {
|
|
err := vpaRecInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
|
|
consts.OwnerReferenceIndex: native.ObjectOwnerReferenceIndex,
|
|
})
|
|
if err != nil {
|
|
klog.Errorf("[vpa-rec] failed to add owner vpa index: %v", err)
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// build index: workload ---> pod
|
|
for _, key := range config.VPAPodLabelIndexerKeys {
|
|
indexer := native.PodLabelIndexer(key)
|
|
if _, ok := recController.podIndexer.GetIndexers()[key]; !ok {
|
|
err := recController.podIndexer.AddIndexers(cache.Indexers{
|
|
key: indexer.IndexFunc,
|
|
})
|
|
if err != nil {
|
|
klog.Errorf("[vpa-rec] failed to add label index for pod: %v", err)
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
recController.metricsEmitter = controlCtx.EmitterPool.GetDefaultMetricsEmitter()
|
|
if recController.metricsEmitter == nil {
|
|
recController.metricsEmitter = metrics.DummyMetrics{}
|
|
}
|
|
|
|
if !genericConf.DryRun {
|
|
recController.vpaUpdater = control.NewRealVPAUpdater(genericClient.InternalClient)
|
|
recController.vpaRecUpdater = control.NewRealVPARecommendationUpdater(genericClient.InternalClient)
|
|
}
|
|
|
|
return recController, nil
|
|
}
|
|
|
|
func (rrc *ResourceRecommendController) Run() {
|
|
defer utilruntime.HandleCrash()
|
|
defer rrc.vpaQueue.ShutDown()
|
|
|
|
defer klog.Infof("[resource-rec] shutting down %s controller", resourceRecommendControllerName)
|
|
|
|
if !cache.WaitForCacheSync(rrc.ctx.Done(), rrc.syncedFunc...) {
|
|
utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s controller", resourceRecommendControllerName))
|
|
return
|
|
}
|
|
klog.Infof("[resource-rec] caches are synced for %s controller", resourceRecommendControllerName)
|
|
klog.Infof("[resource-rec] start %d workers for %s controller", rrc.vpaSyncWorkers, resourceRecommendControllerName)
|
|
|
|
for i := 0; i < rrc.vpaSyncWorkers; i++ {
|
|
go wait.Until(rrc.vpaWorker, time.Second, rrc.ctx.Done())
|
|
}
|
|
|
|
<-rrc.ctx.Done()
|
|
}
|
|
|
|
func (rrc *ResourceRecommendController) addVPA(obj interface{}) {
|
|
v, ok := obj.(*apis.KatalystVerticalPodAutoscaler)
|
|
if !ok {
|
|
klog.Errorf("[resource-rec] cannot convert obj to *apis.VerticalPodAutoscaler: %v", obj)
|
|
return
|
|
}
|
|
|
|
klog.V(4).Infof("[resource-rec] notice addition of vpa %s", v.Name)
|
|
rrc.enqueueVPA(v)
|
|
}
|
|
|
|
func (rrc *ResourceRecommendController) updateVPA(_, cur interface{}) {
|
|
v, ok := cur.(*apis.KatalystVerticalPodAutoscaler)
|
|
if !ok {
|
|
klog.Errorf("[resource-rec] cannot convert curObj to *apis.VerticalPodAutoscaler: %v", cur)
|
|
return
|
|
}
|
|
|
|
klog.V(4).Infof("[resource-rec] notice update of vpa %s", v.Name)
|
|
rrc.enqueueVPA(v)
|
|
}
|
|
|
|
func (rrc *ResourceRecommendController) enqueueVPA(vpa *apis.KatalystVerticalPodAutoscaler) {
|
|
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(vpa)
|
|
if err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", vpa, err))
|
|
return
|
|
}
|
|
rrc.vpaQueue.Add(key)
|
|
}
|
|
|
|
func (rrc *ResourceRecommendController) vpaWorker() {
|
|
for rrc.processNextVPA() {
|
|
}
|
|
}
|
|
|
|
func (rrc *ResourceRecommendController) processNextVPA() bool {
|
|
key, quit := rrc.vpaQueue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
defer rrc.vpaQueue.Done(key)
|
|
|
|
err := rrc.syncVPA(key.(string))
|
|
if err == nil {
|
|
rrc.vpaQueue.Forget(key)
|
|
return true
|
|
}
|
|
|
|
utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
|
|
rrc.vpaQueue.AddRateLimited(key)
|
|
|
|
return true
|
|
}
|
|
|
|
// syncVPA is mainly responsible to calculate resource recommendation for each vpa (with
|
|
// recommender setting as in-tree algorithms); since we will re-sync periodicallly, we
|
|
// won't return error in this function.
|
|
func (rrc *ResourceRecommendController) syncVPA(key string) error {
|
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
if err != nil {
|
|
klog.Errorf("[resource-rec] failed to split namespace and name from key %s", key)
|
|
return err
|
|
}
|
|
|
|
begin := time.Now()
|
|
defer func() {
|
|
costs := time.Since(begin).Microseconds()
|
|
klog.Infof("[resource-rec] syncing vpa [%v/%v] costs %v us", namespace, name, costs)
|
|
_ = rrc.metricsEmitter.StoreInt64(metricNameRecommendControlVPASyncCosts, costs, metrics.MetricTypeNameRaw,
|
|
metrics.MetricTag{Key: "vpa_namespace", Val: namespace},
|
|
metrics.MetricTag{Key: "vpa_name", Val: name},
|
|
)
|
|
}()
|
|
|
|
vpa, err := rrc.vpaLister.KatalystVerticalPodAutoscalers(namespace).Get(name)
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
klog.Warningf("[resource-rec] vpa %s/%s is not found", namespace, name)
|
|
return nil
|
|
}
|
|
|
|
klog.Errorf("[resource-rec] vpa %s/%s get error: %v", namespace, name, err)
|
|
return nil
|
|
}
|
|
klog.V(4).Infof("[resource-rec] syncing vpa %s", vpa.Name)
|
|
|
|
gvk := schema.FromAPIVersionAndKind(vpa.Spec.TargetRef.APIVersion, vpa.Spec.TargetRef.Kind)
|
|
workloadLister, ok := rrc.workloadLister[gvk]
|
|
if !ok {
|
|
klog.Errorf("[resource-rec] vpa %s/%s without workload lister", namespace, name)
|
|
return nil
|
|
}
|
|
|
|
recommender := vpa.Spec.ResourcePolicy.AlgorithmPolicy.Recommender
|
|
r, ok := algorithm.GetRecommender()[recommender]
|
|
if !ok {
|
|
klog.V(8).ErrorS(nil, fmt.Sprintf("[resource-rec] recommender %v not supported", recommender))
|
|
return nil
|
|
}
|
|
|
|
pods, err := katalystutil.GetPodListForVPA(vpa, rrc.podIndexer, rrc.conf.VPAPodLabelIndexerKeys, workloadLister, rrc.podLister)
|
|
if err != nil {
|
|
klog.Errorf("[resource-rec] get pods for vpa %s/%s error: %v", namespace, name, err)
|
|
return nil
|
|
}
|
|
|
|
spd, err := katalystutil.GetSPDForVPA(vpa, rrc.spdIndexer, workloadLister, rrc.spdLister)
|
|
if err != nil {
|
|
klog.Warningf("[resource-rec] get spd for vpa %s/%s error: %v", namespace, name, err)
|
|
return nil
|
|
}
|
|
|
|
podResources, containerResources, err := r.GetRecommendedPodResources(spd, pods)
|
|
if err != nil {
|
|
klog.Errorf("[resource-rec] calculate resources for vpa %s/%s error: %v", namespace, name, err)
|
|
return nil
|
|
}
|
|
|
|
vpaRec, err := rrc.getOrCreateVpaRec(vpa)
|
|
if err != nil {
|
|
klog.Errorf("[resource-rec] get vpaRec for vpa %s/%s error: %v", namespace, name, err)
|
|
return nil
|
|
}
|
|
|
|
vpaRecNew := vpaRec.DeepCopy()
|
|
vpaRecNew.Spec.PodRecommendations = podResources
|
|
vpaRecNew.Spec.ContainerRecommendations = containerResources
|
|
err = rrc.vpaRecUpdater.PatchVPARecommendation(rrc.ctx, vpaRec, vpaRecNew)
|
|
if err != nil {
|
|
klog.Errorf("[resource-rec] get vpaRec for vpa %s/%s error: %v", namespace, name, err)
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// cleanVPARec is mainly responsible to clean all vpaRec CR that should not exist
|
|
func (rrc *ResourceRecommendController) cleanVPARec() {
|
|
recList, err := rrc.vpaRecLister.List(labels.Everything())
|
|
if err != nil {
|
|
klog.Errorf("[resource-rec] failed to list all vpaRec: %v", err)
|
|
}
|
|
|
|
for _, vpaRec := range recList {
|
|
needDelete := false
|
|
vpa, err := katalystutil.GetVPAForVPARec(vpaRec, rrc.vpaLister)
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
needDelete = true
|
|
} else {
|
|
klog.Errorf("[resource-rec] get vpa for vpaRec %s error: %v", vpaRec.Name, err)
|
|
}
|
|
} else {
|
|
// delete vpa-rec if the recommender field has already be erased
|
|
recommender := vpa.Spec.ResourcePolicy.AlgorithmPolicy.Recommender
|
|
if recommender == "" {
|
|
needDelete = true
|
|
}
|
|
}
|
|
|
|
if needDelete {
|
|
klog.Warningf("[resource-rec] delete un-wanted vpaRec %v", vpaRec.Name)
|
|
if err := rrc.vpaRecUpdater.DeleteVPARecommendation(rrc.ctx, vpaRec, metav1.DeleteOptions{}); err != nil {
|
|
klog.Warningf("[resource-rec] delete un-wanted vpaRec %v err: %v", vpaRec.Name, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// getOrCreateVpaRec is used to main the in-tree vpaRec objects if it dosn't exist
|
|
func (rrc *ResourceRecommendController) getOrCreateVpaRec(vpa *apis.KatalystVerticalPodAutoscaler) (*apis.VerticalPodAutoscalerRecommendation, error) {
|
|
vpaRec, err := katalystutil.GetVPARecForVPA(vpa, rrc.vpaRecIndexer, rrc.vpaRecLister)
|
|
if err != nil {
|
|
if !apierrors.IsNotFound(err) {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
return vpaRec, nil
|
|
}
|
|
|
|
klog.Errorf("[resource-rec] create vpaRec for vpa %s/%s", vpa.Namespace, vpa.Name)
|
|
ownerRef := metav1.OwnerReference{
|
|
Name: vpa.GetName(),
|
|
Kind: vpa.GroupVersionKind().Kind,
|
|
APIVersion: vpa.GroupVersionKind().GroupVersion().String(),
|
|
UID: vpa.UID,
|
|
}
|
|
vpaRec = &apis.VerticalPodAutoscalerRecommendation{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
OwnerReferences: []metav1.OwnerReference{ownerRef},
|
|
Namespace: vpa.GetNamespace(),
|
|
Name: vpa.GetName(),
|
|
Labels: general.DeepCopyMap(vpa.GetLabels()),
|
|
},
|
|
Spec: apis.VerticalPodAutoscalerRecommendationSpec{},
|
|
}
|
|
return rrc.vpaRecUpdater.CreateVPARecommendation(rrc.ctx, vpaRec, metav1.CreateOptions{})
|
|
}
|