forked from kubewharf/katalyst-core
234 lines
8.0 KiB
Go
234 lines
8.0 KiB
Go
/*
|
|
Copyright 2022 The Katalyst Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package spd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/atomic"
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
|
|
|
configapis "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1"
|
|
workloadapis "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
|
|
"github.com/kubewharf/katalyst-core/pkg/client"
|
|
pkgconfig "github.com/kubewharf/katalyst-core/pkg/config"
|
|
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/cnc"
|
|
"github.com/kubewharf/katalyst-core/pkg/metrics"
|
|
"github.com/kubewharf/katalyst-core/pkg/util"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/general"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/native"
|
|
)
|
|
|
|
const (
|
|
defaultClearUnusedSPDPeriod = 10 * time.Minute
|
|
)
|
|
|
|
const (
|
|
metricsNameGetCNCTargetConfigFailed = "spd_manager_get_cnc_target_failed"
|
|
metricsNameUpdateCacheFailed = "spd_manager_update_cache_failed"
|
|
metricsNameCacheNotFound = "spd_manager_cache_not_found"
|
|
)
|
|
|
|
type GetPodSPDNameFunc func(pod *v1.Pod) (string, error)
|
|
|
|
type SPDFetcher interface {
|
|
// GetSPD get spd for given pod
|
|
GetSPD(ctx context.Context, pod *v1.Pod) (*workloadapis.ServiceProfileDescriptor, error)
|
|
|
|
// Run async loop to clear unused spd
|
|
Run(ctx context.Context)
|
|
}
|
|
|
|
type spdFetcher struct {
|
|
started *atomic.Bool
|
|
mux sync.Mutex
|
|
|
|
client *client.GenericClientSet
|
|
emitter metrics.MetricEmitter
|
|
cncFetcher cnc.CNCFetcher
|
|
checkpointManager checkpointmanager.CheckpointManager
|
|
getPodSPDNameFunc GetPodSPDNameFunc
|
|
|
|
ServiceProfileCacheTTL time.Duration
|
|
|
|
// spdCache is a cache of namespace/name to current target spd
|
|
spdCache *Cache
|
|
}
|
|
|
|
// NewSPDFetcher creates a spd manager to implement SPDFetcher
|
|
func NewSPDFetcher(clientSet *client.GenericClientSet, emitter metrics.MetricEmitter,
|
|
cncFetcher cnc.CNCFetcher, conf *pkgconfig.Configuration) (SPDFetcher, error) {
|
|
checkpointManager, err := checkpointmanager.NewCheckpointManager(conf.CheckpointManagerDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
|
|
}
|
|
|
|
m := &spdFetcher{
|
|
started: atomic.NewBool(false),
|
|
client: clientSet,
|
|
emitter: emitter,
|
|
checkpointManager: checkpointManager,
|
|
cncFetcher: cncFetcher,
|
|
ServiceProfileCacheTTL: conf.ServiceProfileCacheTTL,
|
|
}
|
|
|
|
m.getPodSPDNameFunc = util.GetPodSPDName
|
|
m.spdCache = NewSPDCache(checkpointManager, defaultClearUnusedSPDPeriod)
|
|
|
|
return m, nil
|
|
}
|
|
|
|
func (s *spdFetcher) GetSPD(ctx context.Context, pod *v1.Pod) (*workloadapis.ServiceProfileDescriptor, error) {
|
|
spdName, err := s.getPodSPDNameFunc(pod)
|
|
if err != nil {
|
|
general.Warningf("get spd for pod (%v/%v) err %v", pod.Namespace, pod.Name, err)
|
|
return nil, errors.NewNotFound(workloadapis.Resource(workloadapis.ResourceNameServiceProfileDescriptors), fmt.Sprintf("for pod(%v/%v)", pod.Namespace, pod.Name))
|
|
}
|
|
|
|
return s.getSPDByNamespaceName(ctx, pod.GetNamespace(), spdName)
|
|
}
|
|
|
|
// SetGetPodSPDNameFunc set get spd name function to override default getPodSPDNameFunc before started
|
|
func (s *spdFetcher) SetGetPodSPDNameFunc(f GetPodSPDNameFunc) {
|
|
if s.started.Load() {
|
|
klog.Warningf("spd manager has already started, not allowed to set implementations")
|
|
return
|
|
}
|
|
|
|
s.getPodSPDNameFunc = f
|
|
}
|
|
|
|
func (s *spdFetcher) Run(ctx context.Context) {
|
|
if s.started.Swap(true) {
|
|
return
|
|
}
|
|
|
|
s.spdCache.Run(ctx)
|
|
<-ctx.Done()
|
|
}
|
|
|
|
func (s *spdFetcher) getSPDByNamespaceName(ctx context.Context, namespace, name string) (*workloadapis.ServiceProfileDescriptor, error) {
|
|
key := native.GenerateNamespaceNameKey(namespace, name)
|
|
baseTag := []metrics.MetricTag{
|
|
{Key: "spdNamespace", Val: namespace},
|
|
{Key: "spdName", Val: name},
|
|
}
|
|
|
|
// first get spd origin spd from local cache
|
|
originSPD := s.spdCache.GetSPD(key)
|
|
|
|
// get spd current target config from cnc to limit rate of get remote spd by comparing local spd
|
|
// hash with cnc target config hash, if cnc target config not found it will get remote spd directly
|
|
targetConfig, err := s.getSPDTargetConfig(ctx, namespace, name)
|
|
if err != nil {
|
|
klog.Errorf("[spd-manager] get spd targetConfig config failed: %v, use local cache instead", err)
|
|
targetConfig = &configapis.TargetConfig{
|
|
ConfigNamespace: namespace,
|
|
ConfigName: name,
|
|
}
|
|
_ = s.emitter.StoreInt64(metricsNameGetCNCTargetConfigFailed, 1, metrics.MetricTypeNameCount, baseTag...)
|
|
}
|
|
|
|
// try to update spd cache from remote if cache spd hash is not equal to target config hash,
|
|
// the rate of getting remote spd will be limited by spd ServiceProfileCacheTTL
|
|
err = s.updateSPDCacheIfNeed(ctx, originSPD, targetConfig)
|
|
if err != nil {
|
|
klog.Errorf("[spd-manager] failed update spd cache from remote: %v, use local cache instead", err)
|
|
_ = s.emitter.StoreInt64(metricsNameUpdateCacheFailed, 1, metrics.MetricTypeNameCount, baseTag...)
|
|
}
|
|
|
|
// get current spd after cache updated
|
|
currentSPD := s.spdCache.GetSPD(key)
|
|
if currentSPD != nil {
|
|
return currentSPD, nil
|
|
}
|
|
|
|
_ = s.emitter.StoreInt64(metricsNameCacheNotFound, 1, metrics.MetricTypeNameCount, baseTag...)
|
|
|
|
return nil, errors.NewNotFound(workloadapis.Resource(workloadapis.ResourceNameServiceProfileDescriptors), name)
|
|
}
|
|
|
|
// getSPDTargetConfig get spd target config from cnc
|
|
func (s *spdFetcher) getSPDTargetConfig(ctx context.Context, namespace, name string) (*configapis.TargetConfig, error) {
|
|
currentCNC, err := s.cncFetcher.GetCNC(ctx)
|
|
if err != nil {
|
|
return &configapis.TargetConfig{}, err
|
|
}
|
|
|
|
for _, target := range currentCNC.Status.ServiceProfileConfigList {
|
|
if target.ConfigNamespace == namespace && target.ConfigName == name {
|
|
return &target, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("get target spd %s/%s not found", namespace, name)
|
|
}
|
|
|
|
// updateSPDCacheIfNeed checks if the previous spd has changed, and
|
|
// re-get from APIServer if the previous is out-of date.
|
|
func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *workloadapis.ServiceProfileDescriptor,
|
|
targetConfig *configapis.TargetConfig) error {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
if originSPD == nil && targetConfig == nil {
|
|
return nil
|
|
}
|
|
|
|
now := time.Now()
|
|
if originSPD == nil || util.GetSPDHash(originSPD) != targetConfig.Hash {
|
|
key := native.GenerateNamespaceNameKey(targetConfig.ConfigNamespace, targetConfig.ConfigName)
|
|
if lastFetchRemoteTime := s.spdCache.GetLastFetchRemoteTime(key); lastFetchRemoteTime.Add(s.ServiceProfileCacheTTL).After(time.Now()) {
|
|
return nil
|
|
} else {
|
|
// first update the timestamp of the last attempt to fetch the remote spd to
|
|
// avoid frequent requests to the api-server in some bad situations
|
|
s.spdCache.SetLastFetchRemoteTime(key, now)
|
|
}
|
|
|
|
klog.Infof("[spd-manager] spd %s targetConfig hash is changed from %s to %s", key, util.GetSPDHash(originSPD), targetConfig.Hash)
|
|
spd, err := s.client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(targetConfig.ConfigNamespace).
|
|
Get(ctx, targetConfig.ConfigName, metav1.GetOptions{ResourceVersion: "0"})
|
|
if err != nil && !errors.IsNotFound(err) {
|
|
return fmt.Errorf("get spd %s from remote failed: %v", key, err)
|
|
} else if err != nil {
|
|
err = s.spdCache.DeleteSPD(key)
|
|
if err != nil {
|
|
return fmt.Errorf("delete spd %s from cache failed: %v", key, err)
|
|
}
|
|
|
|
klog.Infof("[spd-manager] spd %s cache has been deleted", key)
|
|
return nil
|
|
}
|
|
|
|
err = s.spdCache.SetSPD(key, spd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
klog.Infof("[spd-manager] spd %s cache has been updated to %v", key, spd)
|
|
}
|
|
|
|
return nil
|
|
}
|