/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicpolicy

import (
    "context"
    "fmt"
    "sync"
    "time"

    "google.golang.org/grpc"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/wait"
    pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
    maputil "k8s.io/kubernetes/pkg/util/maps"
    "k8s.io/utils/clock"

    "github.com/kubewharf/katalyst-api/pkg/consts"
    "github.com/kubewharf/katalyst-api/pkg/plugins/skeleton"
    "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
    "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator"
    advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
    "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction"
    "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
    "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator"
    "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
    "github.com/kubewharf/katalyst-core/pkg/config"
    dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
    "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/crd"
    "github.com/kubewharf/katalyst-core/pkg/config/generic"
    "github.com/kubewharf/katalyst-core/pkg/metaserver"
    "github.com/kubewharf/katalyst-core/pkg/metrics"
    "github.com/kubewharf/katalyst-core/pkg/util/general"
    "github.com/kubewharf/katalyst-core/pkg/util/machine"
    "github.com/kubewharf/katalyst-core/pkg/util/native"
    "github.com/kubewharf/katalyst-core/pkg/util/process"
)

const CPUResourcePluginPolicyNameDynamic = "dynamic"

const cpuPluginStateFileName = "cpu_plugin_state"

const (
    reservedReclaimedCPUsSize = 4

    cpusetCheckPeriod = 10 * time.Second
    stateCheckPeriod  = 30 * time.Second
    maxResidualTime   = 5 * time.Minute
    syncCPUIdlePeriod = 30 * time.Second
)

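// transitionPeriod is the duration a newly-admitted shared-cores container stays
// in the ramp-up phase before GetResourcesAllocation clears its RampUp flag and
// adjusts its allocation entries.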
var (
    transitionPeriod = 30 * time.Second
)

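// readonlyState holds a read-only view of the plugin state; it is set during
// NewDynamicPolicy and exposed to other components via GetReadonlyState.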
var (
    readonlyStateLock sync.RWMutex
    readonlyState     state.ReadonlyState
)

// GetReadonlyState returns state.ReadonlyState to provide a way
// to obtain the running states of the plugin
func GetReadonlyState() (state.ReadonlyState, error) {
    readonlyStateLock.RLock()
    defer readonlyStateLock.RUnlock()

    if readonlyState == nil {
        return nil, fmt.Errorf("readonlyState isn't set")
    }
    return readonlyState, nil
}

// DynamicPolicy is the policy that's used by default;
// it will consider the dynamic running information to calculate
// and adjust resource requirements and configurations
type DynamicPolicy struct {
    sync.RWMutex
    name    string
    stopCh  chan struct{}
    started bool

    emitter     metrics.MetricEmitter
    metaServer  *metaserver.MetaServer
    machineInfo *machine.KatalystMachineInfo

    advisorClient    advisorapi.CPUAdvisorClient
    advisorConn      *grpc.ClientConn
    advisorValidator *validator.CPUAdvisorValidator
    advisorapi.UnimplementedCPUPluginServer

    state              state.State
    residualHitMap     map[string]int64
    allocationHandlers map[string]util.AllocationHandler
    hintHandlers       map[string]util.HintHandler

    cpuEvictionPlugin       *agent.PluginWrapper
    cpuEvictionPluginCancel context.CancelFunc

    // those are parsed from configurations
    // todo: if we want to use dynamic configuration, we'd better not use self-defined conf
    enableCPUSysAdvisor           bool
    reservedCPUs                  machine.CPUSet
    cpuAdvisorSocketAbsPath       string
    cpuPluginSocketAbsPath        string
    extraStateFileAbsPath         string
    enableCPUPressureEviction     bool
    enableCPUIdle                 bool
    enableSyncingCPUIdle          bool
    reclaimRelativeRootCgroupPath string
    qosConfig                     *generic.QoSConfiguration
    dynamicConfig                 *dynamicconfig.DynamicAgentConfiguration
    podDebugAnnoKeys              []string
}

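// NewDynamicPolicy constructs the dynamic CPU QRM plugin: it takes reserved CPUs,
// restores checkpointed state, wires up the cpu-pressure eviction plugin and the
// per-QoS-level allocation/hint handlers, and wraps the policy as a registrable
// agent component.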
func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
    _ interface{}, agentName string) (bool, agent.Component, error) {
    allCPUs := agentCtx.CPUDetails.CPUs().Clone()
    reservedCPUsNum := conf.ReservedCPUCores

    reservedCPUs, _, reserveErr := calculator.TakeHTByNUMABalance(agentCtx.KatalystMachineInfo, allCPUs, reservedCPUsNum)
    if reserveErr != nil {
        return false, agent.ComponentStub{}, fmt.Errorf("takeByNUMABalance for reservedCPUsNum: %d failed with error: %v",
            conf.ReservedCPUCores, reserveErr)
    }
    general.Infof("take reservedCPUs: %s by reservedCPUsNum: %d", reservedCPUs.String(), reservedCPUsNum)

    stateImpl, stateErr := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, cpuPluginStateFileName,
        CPUResourcePluginPolicyNameDynamic, agentCtx.CPUTopology, conf.SkipCPUStateCorruption)
    if stateErr != nil {
        return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr)
    }

    readonlyStateLock.Lock()
    readonlyState = stateImpl
    readonlyStateLock.Unlock()

    wrappedEmitter := agentCtx.EmitterPool.GetDefaultMetricsEmitter().WithTags(agentName, metrics.MetricTag{
        Key: util.QRMPluginPolicyTagName,
        Val: CPUResourcePluginPolicyNameDynamic,
    })

    cpuEvictionPlugin, err := cpueviction.NewCPUPressureEvictionPlugin(
        agentCtx.EmitterPool.GetDefaultMetricsEmitter(), agentCtx.MetaServer, conf, stateImpl)
    if err != nil {
        return false, agent.ComponentStub{}, err
    }

    // since the reservedCPUs won't influence stateImpl directly,
    // we don't modify stateImpl with reservedCPUs here.
    // for pods that have already been allocated reservedCPUs,
    // we won't touch them and will wait for them to be deleted in the next update.
    policyImplement := &DynamicPolicy{
        name:   fmt.Sprintf("%s_%s", agentName, CPUResourcePluginPolicyNameDynamic),
        stopCh: make(chan struct{}),

        machineInfo: agentCtx.KatalystMachineInfo,
        emitter:     wrappedEmitter,
        metaServer:  agentCtx.MetaServer,

        state:          stateImpl,
        residualHitMap: make(map[string]int64),

        advisorValidator: validator.NewCPUAdvisorValidator(stateImpl, agentCtx.KatalystMachineInfo),

        cpuEvictionPlugin: cpuEvictionPlugin,

        qosConfig:                     conf.QoSConfiguration,
        dynamicConfig:                 conf.DynamicAgentConfiguration,
        cpuAdvisorSocketAbsPath:       conf.CPUAdvisorSocketAbsPath,
        cpuPluginSocketAbsPath:        conf.CPUPluginSocketAbsPath,
        enableCPUSysAdvisor:           conf.CPUQRMPluginConfig.EnableSysAdvisor,
        reservedCPUs:                  reservedCPUs,
        extraStateFileAbsPath:         conf.ExtraStateFileAbsPath,
        enableCPUPressureEviction:     conf.EnableCPUPressureEviction,
        enableSyncingCPUIdle:          conf.CPUQRMPluginConfig.EnableSyncingCPUIdle,
        enableCPUIdle:                 conf.CPUQRMPluginConfig.EnableCPUIdle,
        reclaimRelativeRootCgroupPath: conf.ReclaimRelativeRootCgroupPath,
        podDebugAnnoKeys:              conf.PodDebugAnnoKeys,
    }

    // register allocation behaviors for pods with different QoS levels
    policyImplement.allocationHandlers = map[string]util.AllocationHandler{
        consts.PodAnnotationQoSLevelSharedCores:    policyImplement.sharedCoresAllocationHandler,
        consts.PodAnnotationQoSLevelDedicatedCores: policyImplement.dedicatedCoresAllocationHandler,
        consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresAllocationHandler,
    }

    // register hint providers for pods with different QoS levels
    policyImplement.hintHandlers = map[string]util.HintHandler{
        consts.PodAnnotationQoSLevelSharedCores:    policyImplement.sharedCoresHintHandler,
        consts.PodAnnotationQoSLevelDedicatedCores: policyImplement.dedicatedCoresHintHandler,
        consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresHintHandler,
    }

    state.GetContainerRequestedCores = policyImplement.getContainerRequestedCores

    if err := policyImplement.cleanPools(); err != nil {
        return false, agent.ComponentStub{}, fmt.Errorf("cleanPools failed with error: %v", err)
    }

    if err := policyImplement.initReservePool(); err != nil {
        return false, agent.ComponentStub{}, fmt.Errorf("dynamic policy initReservePool failed with error: %v", err)
    }

    if err := policyImplement.initReclaimPool(); err != nil {
        return false, agent.ComponentStub{}, fmt.Errorf("dynamic policy initReclaimPool failed with error: %v", err)
    }

    err = agentCtx.MetaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR)
    if err != nil {
        return false, nil, err
    }

    pluginWrapper, err := skeleton.NewRegistrationPluginWrapper(policyImplement, conf.QRMPluginSocketDirs, nil)
    if err != nil {
        return false, agent.ComponentStub{}, fmt.Errorf("dynamic policy new plugin wrapper failed with error: %v", err)
    }

    return true, &agent.PluginWrapper{GenericPlugin: pluginWrapper}, nil
}

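// Name returns the name of this policy component.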
func (p *DynamicPolicy) Name() string {
    return p.name
}

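// ResourceName returns the resource this plugin manages, i.e. cpu.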
func (p *DynamicPolicy) ResourceName() string {
    return string(v1.ResourceCPU)
}

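// Start launches the policy's background loops (heartbeat, residual-state cleaning,
// cpuset checking, optional cpu-idle syncing and cpu-pressure eviction), and, when
// sys-advisor is enabled, establishes the advisor connection and keeps syncing with it.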
func (p *DynamicPolicy) Start() (err error) {
    general.Infof("called")

    p.Lock()
    defer func() {
        if err == nil {
            p.started = true
        }
        p.Unlock()
    }()

    if p.started {
        general.Infof("is already started")
        return nil
    }
    p.stopCh = make(chan struct{})

    go wait.Until(func() {
        _ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw)
    }, time.Second*30, p.stopCh)
    go wait.Until(p.clearResidualState, stateCheckPeriod, p.stopCh)
    go wait.Until(p.checkCPUSet, cpusetCheckPeriod, p.stopCh)

    // start cpu-idle syncing if needed
    if p.enableSyncingCPUIdle {
        if p.reclaimRelativeRootCgroupPath == "" {
            return fmt.Errorf("syncing cpu idle is enabled, but reclaimed relative root cgroup path is not set in configuration")
        }
        go wait.Until(p.syncCPUIdle, syncCPUIdlePeriod, p.stopCh)
    }

    // start cpu-pressure eviction plugin if needed
    if p.enableCPUPressureEviction {
        var ctx context.Context
        ctx, p.cpuEvictionPluginCancel = context.WithCancel(context.Background())
        go p.cpuEvictionPlugin.Run(ctx)
    }

    // pre-check necessary dirs if sys-advisor is enabled
    if !p.enableCPUSysAdvisor {
        general.Infof("start dynamic policy cpu plugin without sys-advisor")
        return nil
    } else if p.cpuAdvisorSocketAbsPath == "" || p.cpuPluginSocketAbsPath == "" {
        return fmt.Errorf("invalid cpuAdvisorSocketAbsPath: %s or cpuPluginSocketAbsPath: %s",
            p.cpuAdvisorSocketAbsPath, p.cpuPluginSocketAbsPath)
    }

    general.Infof("start dynamic policy cpu plugin with sys-advisor")
    err = p.initAdvisorClientConn()
    if err != nil {
        general.Errorf("initAdvisorClientConn failed with error: %v", err)
        return
    }

    go wait.BackoffUntil(func() { p.serveForAdvisor(p.stopCh) }, wait.NewExponentialBackoffManager(
        800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)

    communicateWithCPUAdvisorServer := func() {
        general.Infof("waiting for cpu plugin checkpoint server serving confirmation")
        if conn, err := process.Dial(p.cpuPluginSocketAbsPath, 5*time.Second); err != nil {
            general.Errorf("dial check at socket: %s failed with err: %v", p.cpuPluginSocketAbsPath, err)
            return
        } else {
            _ = conn.Close()
        }
        general.Infof("cpu plugin checkpoint server serving confirmed")

        err = p.pushCPUAdvisor()
        if err != nil {
            general.Errorf("sync existing containers to cpu advisor failed with error: %v", err)
            return
        }
        general.Infof("sync existing containers to cpu advisor successfully")

        // call lw of CPUAdvisorServer and do allocation
        if err := p.lwCPUAdvisorServer(p.stopCh); err != nil {
            general.Errorf("lwCPUAdvisorServer failed with error: %v", err)
        } else {
            general.Infof("lwCPUAdvisorServer finished")
        }
    }

    go wait.BackoffUntil(communicateWithCPUAdvisorServer, wait.NewExponentialBackoffManager(800*time.Millisecond,
        30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)
    return nil
}

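// Stop closes the stop channel to terminate the background loops, cancels the
// cpu-pressure eviction plugin if it is running, and closes the advisor connection.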
func (p *DynamicPolicy) Stop() error {
    p.Lock()
    defer func() {
        p.started = false
        p.Unlock()
        general.Infof("stopped")
    }()

    if !p.started {
        general.Warningf("already stopped")
        return nil
    }
    close(p.stopCh)

    // cancel the eviction plugin before the advisor-connection early return,
    // so it is not skipped when sys-advisor is enabled
    if p.cpuEvictionPluginCancel != nil {
        p.cpuEvictionPluginCancel()
    }

    if p.advisorConn != nil {
        return p.advisorConn.Close()
    }
    return nil
}

// GetResourcesAllocation returns allocation results of corresponding resources
func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
    req *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) {
    if req == nil {
        return nil, fmt.Errorf("GetResourcesAllocation got nil req")
    }

    general.Infof("called")
    p.Lock()
    defer p.Unlock()

    podEntries := p.state.GetPodEntries()
    machineState := p.state.GetMachineState()

    // pooledCPUs is the total available cpu cores minus those that are reserved
    pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
        state.CheckDedicated, state.CheckDedicatedNUMABinding)
    pooledCPUsTopologyAwareAssignments, err := machine.GetNumaAwareAssignments(p.machineInfo.CPUTopology, pooledCPUs)
    if err != nil {
        return nil, fmt.Errorf("GetNumaAwareAssignments err: %v", err)
    }

    podResources := make(map[string]*pluginapi.ContainerResources)
    var allocationInfosJustFinishRampUp []*state.AllocationInfo
    for podUID, containerEntries := range podEntries {
        // pool entries are not returned to QRM, so skip them
        if containerEntries.IsPoolEntry() {
            continue
        }

        if podResources[podUID] == nil {
            podResources[podUID] = &pluginapi.ContainerResources{}
        }

        for containerName, allocationInfo := range containerEntries {
            if allocationInfo == nil {
                continue
            }
            allocationInfo = allocationInfo.Clone()

            initTs, tsErr := time.Parse(util.QRMTimeFormat, allocationInfo.InitTimestamp)
            if tsErr != nil {
                if state.CheckShared(allocationInfo) {
                    general.Errorf("pod: %s/%s, container: %s init timestamp parse failed with error: %v, re-ramp-up it",
                        allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, tsErr)

                    clonedPooledCPUs := pooledCPUs.Clone()
                    clonedPooledCPUsTopologyAwareAssignments := machine.DeepcopyCPUAssignment(pooledCPUsTopologyAwareAssignments)

                    allocationInfo.AllocationResult = clonedPooledCPUs
                    allocationInfo.OriginalAllocationResult = clonedPooledCPUs
                    allocationInfo.TopologyAwareAssignments = clonedPooledCPUsTopologyAwareAssignments
                    allocationInfo.OriginalTopologyAwareAssignments = clonedPooledCPUsTopologyAwareAssignments
                    // fill OwnerPoolName with empty string when ramping up
                    allocationInfo.OwnerPoolName = advisorapi.EmptyOwnerPoolName
                    allocationInfo.RampUp = true
                }

                allocationInfo.InitTimestamp = time.Now().Format(util.QRMTimeFormat)
                p.state.SetAllocationInfo(podUID, containerName, allocationInfo)
            } else if allocationInfo.RampUp && time.Now().After(initTs.Add(transitionPeriod)) {
                general.Infof("pod: %s/%s, container: %s ramp up finished", allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
                allocationInfo.RampUp = false
                p.state.SetAllocationInfo(podUID, containerName, allocationInfo)

                if state.CheckShared(allocationInfo) {
                    allocationInfosJustFinishRampUp = append(allocationInfosJustFinishRampUp, allocationInfo)
                }
            }

            if podResources[podUID].ContainerResources == nil {
                podResources[podUID].ContainerResources = make(map[string]*pluginapi.ResourceAllocation)
            }
            podResources[podUID].ContainerResources[containerName] = &pluginapi.ResourceAllocation{
                ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{
                    string(v1.ResourceCPU): {
                        OciPropertyName:   util.OCIPropertyNameCPUSetCPUs,
                        IsNodeResource:    false,
                        IsScalarResource:  true,
                        AllocatedQuantity: float64(allocationInfo.AllocationResult.Size()),
                        AllocationResult:  allocationInfo.AllocationResult.String(),
                    },
                },
            }
        }
    }

    if len(allocationInfosJustFinishRampUp) > 0 {
        if err = p.putAllocationsAndAdjustAllocationEntries(allocationInfosJustFinishRampUp); err != nil {
            // not influencing return response to kubelet when putAllocationsAndAdjustAllocationEntries failed
            general.Errorf("putAllocationsAndAdjustAllocationEntries failed with error: %v", err)
        }
    }

    return &pluginapi.GetResourcesAllocationResponse{
        PodResources: podResources,
    }, nil
}

// GetTopologyAwareResources returns allocation results of corresponding resources as machineInfo aware format
func (p *DynamicPolicy) GetTopologyAwareResources(_ context.Context,
    req *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error) {
    if req == nil {
        return nil, fmt.Errorf("GetTopologyAwareResources got nil req")
    }

    general.Infof("called")
    p.RLock()
    defer p.RUnlock()

    allocationInfo := p.state.GetAllocationInfo(req.PodUid, req.ContainerName)
    if allocationInfo == nil {
        return nil, fmt.Errorf("pod: %s, container: %s does not show up in cpu plugin state", req.PodUid, req.ContainerName)
    }

    resp := &pluginapi.GetTopologyAwareResourcesResponse{
        PodUid:       allocationInfo.PodUid,
        PodName:      allocationInfo.PodName,
        PodNamespace: allocationInfo.PodNamespace,
        ContainerTopologyAwareResources: &pluginapi.ContainerTopologyAwareResources{
            ContainerName: allocationInfo.ContainerName,
        },
    }

    if allocationInfo.CheckSideCar() {
        resp.ContainerTopologyAwareResources.AllocatedResources = map[string]*pluginapi.TopologyAwareResource{
            string(v1.ResourceCPU): {
                IsNodeResource:                    false,
                IsScalarResource:                  true,
                AggregatedQuantity:                0,
                OriginalAggregatedQuantity:        0,
                TopologyAwareQuantityList:         nil,
                OriginalTopologyAwareQuantityList: nil,
            },
        }
    } else {
        resp.ContainerTopologyAwareResources.AllocatedResources = map[string]*pluginapi.TopologyAwareResource{
            string(v1.ResourceCPU): {
                IsNodeResource:                    false,
                IsScalarResource:                  true,
                AggregatedQuantity:                float64(allocationInfo.AllocationResult.Size()),
                OriginalAggregatedQuantity:        float64(allocationInfo.OriginalAllocationResult.Size()),
                TopologyAwareQuantityList:         util.GetTopologyAwareQuantityFromAssignments(allocationInfo.TopologyAwareAssignments),
                OriginalTopologyAwareQuantityList: util.GetTopologyAwareQuantityFromAssignments(allocationInfo.OriginalTopologyAwareAssignments),
            },
        }
    }

    return resp, nil
}

// GetTopologyAwareAllocatableResources returns corresponding allocatable resources as machineInfo aware format
func (p *DynamicPolicy) GetTopologyAwareAllocatableResources(_ context.Context,
    _ *pluginapi.GetTopologyAwareAllocatableResourcesRequest) (*pluginapi.GetTopologyAwareAllocatableResourcesResponse, error) {
    general.Infof("is called")

    numaNodes := p.machineInfo.CPUDetails.NUMANodes().ToSliceInt()
    topologyAwareAllocatableQuantityList := make([]*pluginapi.TopologyAwareQuantity, 0, len(numaNodes))
    topologyAwareCapacityQuantityList := make([]*pluginapi.TopologyAwareQuantity, 0, len(numaNodes))

    for _, numaNode := range numaNodes {
        numaNodeCPUs := p.machineInfo.CPUDetails.CPUsInNUMANodes(numaNode).Clone()
        topologyAwareAllocatableQuantityList = append(topologyAwareAllocatableQuantityList, &pluginapi.TopologyAwareQuantity{
            ResourceValue: float64(numaNodeCPUs.Difference(p.reservedCPUs).Size()),
            Node:          uint64(numaNode),
        })
        topologyAwareCapacityQuantityList = append(topologyAwareCapacityQuantityList, &pluginapi.TopologyAwareQuantity{
            ResourceValue: float64(numaNodeCPUs.Size()),
            Node:          uint64(numaNode),
        })
    }

    return &pluginapi.GetTopologyAwareAllocatableResourcesResponse{
        AllocatableResources: map[string]*pluginapi.AllocatableTopologyAwareResource{
            string(v1.ResourceCPU): {
                IsNodeResource:                       false,
                IsScalarResource:                     true,
                AggregatedAllocatableQuantity:        float64(p.machineInfo.NumCPUs - p.reservedCPUs.Size()),
                TopologyAwareAllocatableQuantityList: topologyAwareAllocatableQuantityList,
                AggregatedCapacityQuantity:           float64(p.machineInfo.NumCPUs),
                TopologyAwareCapacityQuantityList:    topologyAwareCapacityQuantityList,
            },
        },
    }, nil
}

// GetTopologyHints returns hints of corresponding resources
func (p *DynamicPolicy) GetTopologyHints(ctx context.Context,
    req *pluginapi.ResourceRequest) (resp *pluginapi.ResourceHintsResponse, err error) {
    if req == nil {
        return nil, fmt.Errorf("GetTopologyHints got nil req")
    }

    // identify if the pod is a debug pod,
    // if so, apply specific strategy to it.
    // since GetKatalystQoSLevelFromResourceReq function will filter annotations,
    // we should do it before GetKatalystQoSLevelFromResourceReq.
    isDebugPod := util.IsDebugPod(req.Annotations, p.podDebugAnnoKeys)

    qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req)
    if err != nil {
        err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v",
            req.PodNamespace, req.PodName, req.ContainerName, err)
        general.Errorf("%s", err.Error())
        return nil, err
    }

    reqInt, err := util.GetQuantityFromResourceReq(req)
    if err != nil {
        return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err)
    }

    general.InfoS("called",
        "podNamespace", req.PodNamespace,
        "podName", req.PodName,
        "containerName", req.ContainerName,
        "podType", req.PodType,
        "podRole", req.PodRole,
        "containerType", req.ContainerType,
        "qosLevel", qosLevel,
        "numCPUs", reqInt,
        "isDebugPod", isDebugPod)

    if req.ContainerType == pluginapi.ContainerType_INIT || isDebugPod {
        general.Infof("there is no NUMA preference, return nil hint")
        return util.PackResourceHintsResponse(req, string(v1.ResourceCPU),
            map[string]*pluginapi.ListOfTopologyHints{
                string(v1.ResourceCPU): nil, // indicates that there is no numa preference
            })
    }

    p.RLock()
    defer func() {
        p.RUnlock()
        if err != nil {
            _ = p.emitter.StoreInt64(util.MetricNameGetTopologyHintsFailed, 1, metrics.MetricTypeNameRaw)
        }
    }()

    if p.hintHandlers[qosLevel] == nil {
        return nil, fmt.Errorf("katalyst QoS level: %s is not supported yet", qosLevel)
    }
    return p.hintHandlers[qosLevel](ctx, req)
}

// GetResourcePluginOptions returns options to be communicated with Resource Manager
func (p *DynamicPolicy) GetResourcePluginOptions(context.Context,
    *pluginapi.Empty) (*pluginapi.ResourcePluginOptions, error) {
    general.Infof("called")
    return &pluginapi.ResourcePluginOptions{
        PreStartRequired:      false,
        WithTopologyAlignment: true,
        NeedReconcile:         true,
    }, nil
}

// Allocate is called during pod admit so that the resource
// plugin can allocate corresponding resource for the container
// according to resource request
func (p *DynamicPolicy) Allocate(ctx context.Context,
    req *pluginapi.ResourceRequest) (resp *pluginapi.ResourceAllocationResponse, respErr error) {
    if req == nil {
        return nil, fmt.Errorf("allocate got nil req")
    }

    // identify if the pod is a debug pod,
    // if so, apply specific strategy to it.
    // since GetKatalystQoSLevelFromResourceReq function will filter annotations,
    // we should do it before GetKatalystQoSLevelFromResourceReq.
    isDebugPod := util.IsDebugPod(req.Annotations, p.podDebugAnnoKeys)

    qosLevel, err := util.GetKatalystQoSLevelFromResourceReq(p.qosConfig, req)
    if err != nil {
        err = fmt.Errorf("GetKatalystQoSLevelFromResourceReq for pod: %s/%s, container: %s failed with error: %v",
            req.PodNamespace, req.PodName, req.ContainerName, err)
        general.Errorf("%s", err.Error())
        return nil, err
    }

    reqInt, err := util.GetQuantityFromResourceReq(req)
    if err != nil {
        return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err)
    }

    general.InfoS("called",
        "podNamespace", req.PodNamespace,
        "podName", req.PodName,
        "containerName", req.ContainerName,
        "podType", req.PodType,
        "podRole", req.PodRole,
        "containerType", req.ContainerType,
        "qosLevel", qosLevel,
        "numCPUs", reqInt,
        "isDebugPod", isDebugPod)

    if req.ContainerType == pluginapi.ContainerType_INIT {
        return &pluginapi.ResourceAllocationResponse{
            PodUid:         req.PodUid,
            PodNamespace:   req.PodNamespace,
            PodName:        req.PodName,
            ContainerName:  req.ContainerName,
            ContainerType:  req.ContainerType,
            ContainerIndex: req.ContainerIndex,
            PodRole:        req.PodRole,
            PodType:        req.PodType,
            ResourceName:   string(v1.ResourceCPU),
            Labels:         general.DeepCopyMap(req.Labels),
            Annotations:    general.DeepCopyMap(req.Annotations),
        }, nil
    } else if isDebugPod {
        return &pluginapi.ResourceAllocationResponse{
            PodUid:         req.PodUid,
            PodNamespace:   req.PodNamespace,
            PodName:        req.PodName,
            ContainerName:  req.ContainerName,
            ContainerType:  req.ContainerType,
            ContainerIndex: req.ContainerIndex,
            PodRole:        req.PodRole,
            PodType:        req.PodType,
            ResourceName:   string(v1.ResourceCPU),
            AllocationResult: &pluginapi.ResourceAllocation{
                ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{
                    string(v1.ResourceCPU): {
                        // return ResourceAllocation with empty OciPropertyName, AllocatedQuantity, AllocationResult for containers in debug pod,
                        // it won't influence oci spec properties of the container
                        IsNodeResource:   false,
                        IsScalarResource: true,
                    },
                },
            },
            Labels:      general.DeepCopyMap(req.Labels),
            Annotations: general.DeepCopyMap(req.Annotations),
        }, nil
    }

    p.Lock()
    defer func() {
        // calls sys-advisor to inform the latest container
        if p.enableCPUSysAdvisor && respErr == nil && req.ContainerType != pluginapi.ContainerType_INIT {
            _, err := p.advisorClient.AddContainer(ctx, &advisorapi.AddContainerRequest{
                PodUid:          req.PodUid,
                PodNamespace:    req.PodNamespace,
                PodName:         req.PodName,
                ContainerName:   req.ContainerName,
                ContainerType:   req.ContainerType,
                ContainerIndex:  req.ContainerIndex,
                Labels:          maputil.CopySS(req.Labels),
                Annotations:     maputil.CopySS(req.Annotations),
                QosLevel:        qosLevel,
                RequestQuantity: uint64(reqInt),
            })

            if err != nil {
                resp = nil
                respErr = fmt.Errorf("add container to qos aware server failed with error: %v", err)
                _ = p.removeContainer(req.PodUid, req.ContainerName)
            }
        } else if respErr != nil {
            _ = p.removeContainer(req.PodUid, req.ContainerName)
            _ = p.emitter.StoreInt64(util.MetricNameAllocateFailed, 1, metrics.MetricTypeNameRaw)
        }

        p.Unlock()
        return
    }()

    allocationInfo := p.state.GetAllocationInfo(req.PodUid, req.ContainerName)
    if allocationInfo != nil && allocationInfo.OriginalAllocationResult.Size() >= reqInt {
        general.InfoS("already allocated and meet requirement",
            "podNamespace", req.PodNamespace,
            "podName", req.PodName,
            "containerName", req.ContainerName,
            "numCPUs", reqInt,
            "originalAllocationResult", allocationInfo.OriginalAllocationResult.String(),
            "currentResult", allocationInfo.AllocationResult.String())

        return &pluginapi.ResourceAllocationResponse{
            PodUid:         req.PodUid,
            PodNamespace:   req.PodNamespace,
            PodName:        req.PodName,
            ContainerName:  req.ContainerName,
            ContainerType:  req.ContainerType,
            ContainerIndex: req.ContainerIndex,
            PodRole:        req.PodRole,
            PodType:        req.PodType,
            ResourceName:   string(v1.ResourceCPU),
            AllocationResult: &pluginapi.ResourceAllocation{
                ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{
                    string(v1.ResourceCPU): {
                        OciPropertyName:   util.OCIPropertyNameCPUSetCPUs,
                        IsNodeResource:    false,
                        IsScalarResource:  true,
                        AllocatedQuantity: float64(allocationInfo.AllocationResult.Size()),
                        AllocationResult:  allocationInfo.AllocationResult.String(),
                    },
                },
            },
            Labels:      general.DeepCopyMap(req.Labels),
            Annotations: general.DeepCopyMap(req.Annotations),
        }, nil
    }

    if p.allocationHandlers[qosLevel] == nil {
        return nil, fmt.Errorf("katalyst QoS level: %s is not supported yet", qosLevel)
    }
    return p.allocationHandlers[qosLevel](ctx, req)
}

// PreStartContainer is called, if indicated by resource plugin during registration phase,
// before each container start. Resource plugin can run resource specific operations
// such as resetting the resource before making resources available to the container
func (p *DynamicPolicy) PreStartContainer(context.Context,
    *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
    return nil, nil
}

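// RemovePod removes all containers of the given pod from the plugin state
// (and from sys-advisor when it is enabled), then re-adjusts allocation entries.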
func (p *DynamicPolicy) RemovePod(ctx context.Context,
    req *pluginapi.RemovePodRequest) (resp *pluginapi.RemovePodResponse, err error) {
    if req == nil {
        return nil, fmt.Errorf("RemovePod got nil req")
    }
    general.InfoS("is called", "podUID", req.PodUid)

    p.Lock()
    defer func() {
        p.Unlock()
        if err != nil {
            _ = p.emitter.StoreInt64(util.MetricNameRemovePodFailed, 1, metrics.MetricTypeNameRaw)
        }
    }()

    if p.enableCPUSysAdvisor {
        _, err = p.advisorClient.RemovePod(ctx, &advisorapi.RemovePodRequest{PodUid: req.PodUid})
        if err != nil {
            return nil, fmt.Errorf("remove pod in QoS aware server failed with error: %v", err)
        }
    }

    err = p.removePod(req.PodUid)
    if err != nil {
        general.ErrorS(err, "remove pod failed with error", "podUID", req.PodUid)
        return nil, err
    }

    err = p.adjustAllocationEntries()
    if err != nil {
        general.ErrorS(err, "adjustAllocationEntries failed", "podUID", req.PodUid)
    }

    return &pluginapi.RemovePodResponse{}, nil
}

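// removePod deletes the pod's entries from the local state and regenerates
// the machine state from the remaining pod entries.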
func (p *DynamicPolicy) removePod(podUID string) error {
    podEntries := p.state.GetPodEntries()
    if len(podEntries[podUID]) == 0 {
        return nil
    }
    delete(podEntries, podUID)

    updatedMachineState, err := state.GenerateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
    if err != nil {
        return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
    }

    p.state.SetPodEntries(podEntries)
    p.state.SetMachineState(updatedMachineState)
    return nil
}

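// removeContainer deletes a single container's entry from the local state and
// regenerates the machine state from the remaining pod entries.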
func (p *DynamicPolicy) removeContainer(podUID, containerName string) error {
    podEntries := p.state.GetPodEntries()
    if podEntries[podUID][containerName] == nil {
        return nil
    }
    delete(podEntries[podUID], containerName)

    updatedMachineState, err := state.GenerateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
    if err != nil {
        return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
    }

    p.state.SetPodEntries(podEntries)
    p.state.SetMachineState(updatedMachineState)
    return nil
}

// initAdvisorClientConn initializes cpu-advisor related connections
func (p *DynamicPolicy) initAdvisorClientConn() (err error) {
    cpuAdvisorConn, err := process.Dial(p.cpuAdvisorSocketAbsPath, 5*time.Second)
    if err != nil {
        err = fmt.Errorf("get cpu advisor connection with socket: %s failed with error: %v", p.cpuAdvisorSocketAbsPath, err)
        return
    }

    p.advisorClient = advisorapi.NewCPUAdvisorClient(cpuAdvisorConn)
    p.advisorConn = cpuAdvisorConn
    return nil
}

// cleanPools is used to clean pools-related data in local state
func (p *DynamicPolicy) cleanPools() error {
    remainPools := make(map[string]bool)

    // walk through pod entries to put them into specified pool maps
    podEntries := p.state.GetPodEntries()
    for _, entries := range podEntries {
        if entries.IsPoolEntry() {
            continue
        }

        for _, allocationInfo := range entries {
            ownerPool := allocationInfo.GetOwnerPoolName()
            specifiedPool := allocationInfo.GetSpecifiedPoolName()
            if specifiedPool != advisorapi.EmptyOwnerPoolName || ownerPool != advisorapi.EmptyOwnerPoolName {
                remainPools[specifiedPool] = true
            } else if state.CheckReclaimed(allocationInfo) {
                remainPools[state.PoolNameReclaim] = true
            } else if state.CheckShared(allocationInfo) {
                remainPools[state.PoolNameShare] = true
            }
        }
    }

    // if pool exists in entries, but has no corresponding container, we need to delete it
    poolsToDelete := sets.NewString()
    for poolName, entries := range podEntries {
        if entries.IsPoolEntry() {
            if !remainPools[poolName] && !state.ResidentPools.Has(poolName) {
                poolsToDelete.Insert(poolName)
            }
        }
    }

    if poolsToDelete.Len() > 0 {
        general.Infof("pools to delete: %v", poolsToDelete.UnsortedList())
        for _, poolName := range poolsToDelete.UnsortedList() {
            delete(podEntries, poolName)
        }

        machineState, err := state.GenerateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
        if err != nil {
            return fmt.Errorf("calculate machineState by podEntries failed with error: %v", err)
        }

        p.state.SetPodEntries(podEntries)
        p.state.SetMachineState(machineState)
    } else {
        general.Infof("there is no pool to delete")
    }

    return nil
}

// initReservePool initializes reserve pool for system cores workload
func (p *DynamicPolicy) initReservePool() error {
    reserveAllocationInfo := p.state.GetAllocationInfo(state.PoolNameReserve, advisorapi.FakedContainerName)
    if reserveAllocationInfo != nil && !reserveAllocationInfo.AllocationResult.IsEmpty() {
        general.Infof("pool: %s allocation result transform from %s to %s",
            state.PoolNameReserve, reserveAllocationInfo.AllocationResult.String(), p.reservedCPUs)
    }

    general.Infof("initReservePool %s: %s", state.PoolNameReserve, p.reservedCPUs)
    topologyAwareAssignments, err := machine.GetNumaAwareAssignments(p.machineInfo.CPUTopology, p.reservedCPUs)
    if err != nil {
        return fmt.Errorf("unable to calculate topologyAwareAssignments for pool: %s, result cpuset: %s, error: %v",
            state.PoolNameReserve, p.reservedCPUs.String(), err)
    }

    curReserveAllocationInfo := &state.AllocationInfo{
        PodUid:                           state.PoolNameReserve,
        OwnerPoolName:                    state.PoolNameReserve,
        AllocationResult:                 p.reservedCPUs.Clone(),
        OriginalAllocationResult:         p.reservedCPUs.Clone(),
        TopologyAwareAssignments:         topologyAwareAssignments,
        OriginalTopologyAwareAssignments: machine.DeepcopyCPUAssignment(topologyAwareAssignments),
    }
    p.state.SetAllocationInfo(state.PoolNameReserve, advisorapi.FakedContainerName, curReserveAllocationInfo)

    return nil
}

// initReclaimPool initializes pools for reclaimed-cores.
// if this info already exists in state-file, just use it, otherwise calculate right away
func (p *DynamicPolicy) initReclaimPool() error {
    reclaimedAllocationInfo := p.state.GetAllocationInfo(state.PoolNameReclaim, advisorapi.FakedContainerName)
    if reclaimedAllocationInfo == nil {
        podEntries := p.state.GetPodEntries()
        noneResidentCPUs := podEntries.GetFilteredPoolsCPUSet(state.ResidentPools)

        machineState := p.state.GetMachineState()
        availableCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
            state.CheckDedicated, state.CheckDedicatedNUMABinding).Difference(noneResidentCPUs)

        var initReclaimedCPUSetSize int
        if availableCPUs.Size() >= reservedReclaimedCPUsSize {
            initReclaimedCPUSetSize = reservedReclaimedCPUsSize
        } else {
            initReclaimedCPUSetSize = availableCPUs.Size()
        }

        reclaimedCPUSet, _, err := calculator.TakeByNUMABalance(p.machineInfo, availableCPUs, initReclaimedCPUSetSize)
        if err != nil {
            return fmt.Errorf("takeByNUMABalance failed in initReclaimPool for %s and %s with error: %v",
                state.PoolNameShare, state.PoolNameReclaim, err)
        }

        // for residual pools, we must make them exist even if it causes overlap
        // todo: noneResidentCPUs is the same as reservedCPUs, why should we do this?
        allAvailableCPUs := p.machineInfo.CPUDetails.CPUs().Difference(p.reservedCPUs)
        if reclaimedCPUSet.IsEmpty() {
            reclaimedCPUSet, _, err = calculator.TakeByNUMABalance(p.machineInfo, allAvailableCPUs, reservedReclaimedCPUsSize)
            if err != nil {
                return fmt.Errorf("fallback takeByNUMABalance failed in initReclaimPool for %s with error: %v",
                    state.PoolNameReclaim, err)
            }
        }

        for poolName, cset := range map[string]machine.CPUSet{state.PoolNameReclaim: reclaimedCPUSet} {
            general.Infof("initReclaimPool %s: %s", poolName, cset.String())
            topologyAwareAssignments, err := machine.GetNumaAwareAssignments(p.machineInfo.CPUTopology, cset)
            if err != nil {
                return fmt.Errorf("unable to calculate topologyAwareAssignments for pool: %s, "+
                    "result cpuset: %s, error: %v", poolName, cset.String(), err)
            }

            curPoolAllocationInfo := &state.AllocationInfo{
                PodUid:                           poolName,
                OwnerPoolName:                    poolName,
                AllocationResult:                 cset.Clone(),
                OriginalAllocationResult:         cset.Clone(),
                TopologyAwareAssignments:         topologyAwareAssignments,
                OriginalTopologyAwareAssignments: machine.DeepcopyCPUAssignment(topologyAwareAssignments),
            }
            p.state.SetAllocationInfo(poolName, advisorapi.FakedContainerName, curPoolAllocationInfo)
        }
    } else {
        general.Infof("exist initial %s: %s", state.PoolNameReclaim, reclaimedAllocationInfo.AllocationResult.String())
    }

    return nil
}

// getContainerRequestedCores parses and returns the requested cores for the given container
func (p *DynamicPolicy) getContainerRequestedCores(allocationInfo *state.AllocationInfo) int {
    if allocationInfo == nil {
        general.Errorf("got nil allocationInfo")
        return 0
    }

    if allocationInfo.RequestQuantity == 0 {
        if p.metaServer == nil {
            general.Errorf("got nil metaServer")
            return 0
        }

        container, err := p.metaServer.GetContainerSpec(allocationInfo.PodUid, allocationInfo.ContainerName)
        if err != nil || container == nil {
            general.Errorf("get container failed with error: %v", err)
            return 0
        }

        cpuQuantity := native.GetCPUQuantity(container.Resources.Requests)
        allocationInfo.RequestQuantity = general.Max(int(cpuQuantity.Value()), 0)
        general.Infof("get cpu request quantity: %d for pod: %s/%s container: %s from podWatcher",
            allocationInfo.RequestQuantity, allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
    }
    return allocationInfo.RequestQuantity
}