katalyst-core/pkg/config/generic/qos.go

373 lines
13 KiB
Go

/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"encoding/json"
"fmt"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)
// defaultQoSLevel willed b e use as default QoS Level if nothing in annotation
const defaultQoSLevel = apiconsts.PodAnnotationQoSLevelSharedCores
type qosValidationFunc func(map[string]string) (bool, error)
// validQosEnhancementKey contains all the enhancement that Katalyst supports
var validQosEnhancementKey = sets.NewString(
apiconsts.PodAnnotationCPUEnhancementKey,
apiconsts.PodAnnotationMemoryEnhancementKey,
apiconsts.PodAnnotationNetworkEnhancementKey,
)
// QoSConfiguration stores the qos configurations needed by core katalyst components.
// since we may have legacy QoS judgement ways, we should map those legacy configs
// into standard katalyst QoS Level.
type QoSConfiguration struct {
sync.RWMutex
// QoSClassAnnotationSelector is used as an expanded way to match legacy specified
// QoS annotations into standard katalyst QoS level
// - if no expended selector is configured
// --- only use the default key-value instead
// - if multiple expended selectors are defined
// --- returns true if anyone matches
// - we should also do validation
QoSClassAnnotationSelector map[string]map[string]string
// QoSEnhancementAnnotationKey is used as an expanded way to match legacy specified
// QoS annotations into standard katalyst QoS enhancement
QoSEnhancementAnnotationSelector map[string]string
// qosCheckFunc is used as a syntactic sugar to easily walk through
// all QoS Level validation functions
qosCheckFuncMap map[string]qosValidationFunc
// for different situation, there may be different default values for enhancement keys
// we use options to control those different values
// the key here is specific enhancement key such as "numa_binding", "numa_exclusive"
// the value is the default value of the key
EnhancementDefaultValues map[string]string
}
// NewQoSConfiguration creates a new qos configuration.
func NewQoSConfiguration() *QoSConfiguration {
c := &QoSConfiguration{
QoSClassAnnotationSelector: map[string]map[string]string{
apiconsts.PodAnnotationQoSLevelSharedCores: {
apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelSharedCores,
},
apiconsts.PodAnnotationQoSLevelDedicatedCores: {
apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelDedicatedCores,
},
apiconsts.PodAnnotationQoSLevelReclaimedCores: {
apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelReclaimedCores,
},
apiconsts.PodAnnotationQoSLevelSystemCores: {
apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelSystemCores,
},
},
QoSEnhancementAnnotationSelector: make(map[string]string),
EnhancementDefaultValues: make(map[string]string),
}
c.qosCheckFuncMap = map[string]qosValidationFunc{
apiconsts.PodAnnotationQoSLevelSharedCores: c.CheckSharedQoS,
apiconsts.PodAnnotationQoSLevelDedicatedCores: c.CheckDedicatedQoS,
apiconsts.PodAnnotationQoSLevelReclaimedCores: c.CheckReclaimedQoS,
apiconsts.PodAnnotationQoSLevelSystemCores: c.CheckSystemQoS,
}
return c
}
// FilterQoSAndEnhancement filter map that are related to katalyst QoS and katalyst Enhancement.
// for enhancements,we should unmarshal and store the unmarshal key-value.
// it works both for default katalyst QoS keys and expanded QoS keys.
func (c *QoSConfiguration) FilterQoSAndEnhancement(annotations map[string]string) (map[string]string, error) {
filteredAnnotations := c.FilterQoSMap(annotations)
c.RLock()
defer c.RUnlock()
validEnhancementKeyList := validQosEnhancementKey.List()
for enhancementExpandKey := range c.QoSEnhancementAnnotationSelector {
validEnhancementKeyList = append(validEnhancementKeyList, enhancementExpandKey)
}
for _, enhancementKey := range validEnhancementKeyList {
if annotations[enhancementKey] != "" {
enhancementKVs := make(map[string]string)
err := json.Unmarshal([]byte(annotations[enhancementKey]), &enhancementKVs)
if err != nil {
return nil, fmt.Errorf("unmarshal %s: %s failed with error: %v",
enhancementKey, annotations[enhancementKey], err)
}
for key, val := range enhancementKVs {
if filteredAnnotations[key] != "" {
klog.Warningf("[FilterQoSAndEnhancement] get %s:%s from %s, "+
"but the kv already exists: %s:%s", key, val, enhancementKey, key, filteredAnnotations[key])
}
filteredAnnotations[key] = val
}
}
}
for enhancementKey, defaultValue := range c.EnhancementDefaultValues {
if _, found := filteredAnnotations[enhancementKey]; !found {
klog.Infof("[FilterQoSAndEnhancement] enhancementKey: %s isn't declared, "+
"set its value to defaultValue: %s", enhancementKey, defaultValue)
filteredAnnotations[enhancementKey] = defaultValue
}
}
return filteredAnnotations, nil
}
// FilterQoSMap filter map that are related to katalyst QoS.
// it works both for default katalyst QoS keys and expanded QoS keys
func (c *QoSConfiguration) FilterQoSMap(annotations map[string]string) map[string]string {
c.RLock()
defer c.RUnlock()
filteredAnnotations := make(map[string]string)
for qos := range c.QoSClassAnnotationSelector {
for qosExpand := range c.QoSClassAnnotationSelector[qos] {
if val, ok := annotations[qosExpand]; ok {
filteredAnnotations[qosExpand] = val
}
}
}
return filteredAnnotations
}
func (c *QoSConfiguration) SetExpandQoSLevelSelector(qosLevel string, selectorMap map[string]string) {
if _, ok := c.qosCheckFuncMap[qosLevel]; !ok {
return
}
c.Lock()
defer c.Unlock()
c.QoSClassAnnotationSelector[qosLevel] = general.MergeMap(c.QoSClassAnnotationSelector[qosLevel], selectorMap)
}
func (c *QoSConfiguration) GetQoSLevelForPod(pod *v1.Pod) (string, error) {
if pod == nil {
return "", fmt.Errorf("nil pod")
}
return c.GetQoSLevel(pod.Annotations)
}
// GetQoSLevel returns the standard katalyst QoS Level for given annotations;
// - returns error if there is conflict in qos level annotations or can't get valid qos level.
// - returns defaultQoSLevel if nothing matches and isNotDefaultQoSLevel is false.
func (c *QoSConfiguration) GetQoSLevel(annotations map[string]string) (string, error) {
isNotDefaultQoSLevel := false
for qos := range c.QoSClassAnnotationSelector {
identified, matched, err := c.checkQosMatched(annotations, qos)
if err != nil {
klog.Errorf("check qos level %v for annotation failed: %v", qos, err)
return "", err
} else if identified {
if matched {
return qos, nil
} else if qos == defaultQoSLevel {
isNotDefaultQoSLevel = true
}
}
}
if isNotDefaultQoSLevel {
return "", fmt.Errorf("can't get valid qos level")
}
return defaultQoSLevel, nil
}
func (c *QoSConfiguration) SetExpandQoSEnhancementSelector(enhancementAdapter map[string]string) {
c.Lock()
defer c.Unlock()
for defaultKey, expandedKey := range enhancementAdapter {
if validQosEnhancementKey.Has(defaultKey) {
c.QoSEnhancementAnnotationSelector[expandedKey] = defaultKey
}
}
}
func (c *QoSConfiguration) GetQoSEnhancementsForPod(pod *v1.Pod) map[string]string {
if pod == nil {
return map[string]string{}
}
return c.GetQoSEnhancements(pod.Annotations)
}
// GetQoSEnhancements returns the standard katalyst QoS Enhancement Map for given annotations;
// - ignore conflict cases: default enhancement key always prior to expand enhancement key
func (c *QoSConfiguration) GetQoSEnhancements(annotations map[string]string) map[string]string {
res := make(map[string]string)
c.RLock()
defer c.RUnlock()
for k, v := range annotations {
if validQosEnhancementKey.Has(k) {
res[k] = v
} else if defaultK, ok := c.QoSEnhancementAnnotationSelector[k]; ok {
if _, exist := res[defaultK]; !exist {
res[defaultK] = v
}
}
}
return res
}
func (c *QoSConfiguration) CheckReclaimedQoSForPod(pod *v1.Pod) (bool, error) {
if pod == nil {
return false, nil
}
return c.CheckReclaimedQoS(pod.Annotations)
}
// CheckReclaimedQoS returns true if the annotation indicates for ReclaimedCores;
// - returns error if different QoS configurations conflict with each other.
func (c *QoSConfiguration) CheckReclaimedQoS(annotations map[string]string) (bool, error) {
if qosLevel, err := c.GetQoSLevel(annotations); err != nil {
return false, err
} else {
return qosLevel == apiconsts.PodAnnotationQoSLevelReclaimedCores, nil
}
}
func (c *QoSConfiguration) CheckSharedQoSForPod(pod *v1.Pod) (bool, error) {
if pod == nil {
return false, nil
}
return c.CheckSharedQoS(pod.Annotations)
}
// CheckSharedQoS returns true if the annotation indicates for SharedCores;
// - returns error if different QoS configurations conflict with each other.
func (c *QoSConfiguration) CheckSharedQoS(annotations map[string]string) (bool, error) {
if qosLevel, err := c.GetQoSLevel(annotations); err != nil {
return false, err
} else {
return qosLevel == apiconsts.PodAnnotationQoSLevelSharedCores, nil
}
}
func (c *QoSConfiguration) CheckDedicatedQoSForPod(pod *v1.Pod) (bool, error) {
if pod == nil {
return false, nil
}
return c.CheckDedicatedQoS(pod.Annotations)
}
// CheckDedicatedQoS returns true if the annotation indicates for DedicatedCores;
// - returns error if different QoS configurations conflict with each other.
func (c *QoSConfiguration) CheckDedicatedQoS(annotations map[string]string) (bool, error) {
if qosLevel, err := c.GetQoSLevel(annotations); err != nil {
return false, err
} else {
return qosLevel == apiconsts.PodAnnotationQoSLevelDedicatedCores, nil
}
}
func (c *QoSConfiguration) CheckSystemQoSForPod(pod *v1.Pod) (bool, error) {
if pod == nil {
return false, nil
}
return c.CheckSystemQoS(pod.Annotations)
}
// CheckSystemQoS returns true if the annotation indicates for SystemCores;
// - returns error if different QoS configurations conflict with each other.
func (c *QoSConfiguration) CheckSystemQoS(annotations map[string]string) (bool, error) {
if qosLevel, err := c.GetQoSLevel(annotations); err != nil {
return false, err
} else {
return qosLevel == apiconsts.PodAnnotationQoSLevelSystemCores, nil
}
}
// SetEnhancementDefaultValues set default values for enhancement keys
// because sometimes we need different default values for enhancement keys in different types of clusters
func (c *QoSConfiguration) SetEnhancementDefaultValues(enhancementDefaultValues map[string]string) {
c.Lock()
defer c.Unlock()
c.EnhancementDefaultValues = general.MergeMap(c.EnhancementDefaultValues, enhancementDefaultValues)
}
// checkQosMatched is a unified helper function to judge whether annotation
// matches with the given QoS Level;
// return
// - identified: returning true if the function identifies the qos level matches according to QoSClassAnnotationSelector, else false.
// - matched: returning true if annotations match with qosValue, else false.
// - error: return err != nil if different QoS configurations conflict with each other
func (c *QoSConfiguration) checkQosMatched(annotations map[string]string, qosValue string) (identified bool, matched bool, err error) {
c.RLock()
defer c.RUnlock()
valueNotEqualCnt, valueEqualCnt := 0, 0
for key, value := range c.QoSClassAnnotationSelector[qosValue] {
_, valueNotEqual, valueEqual := checkKeyValueMatched(annotations, key, value)
valueNotEqualCnt, valueEqualCnt = valueNotEqualCnt+valueNotEqual, valueEqualCnt+valueEqual
}
if valueEqualCnt > 0 {
// some key-value list match while others don't
if valueNotEqualCnt > 0 {
return false, false, fmt.Errorf("qos %v conflicts, matched count %v, mis matched count %v", qosValue, valueEqualCnt, valueNotEqualCnt)
}
// some key-value list match and some key may not exist
return true, true, nil
} else if valueNotEqualCnt == 0 {
return false, false, nil
}
return true, false, nil
}
// checkKeyValueMatched checks whether the given key-value pair exists in the map
// if the returns value equals 1, it represents
// - key not exists
// - key exists, but value not equals
// - key exists, and value not equal
// returns 0 otherwise
func checkKeyValueMatched(m map[string]string, key, value string) (keyNotExist int, valueNotEqual int, valueEqual int) {
v, ok := m[key]
if !ok {
keyNotExist = 1
} else if v != value {
valueNotEqual = 1
} else {
valueEqual = 1
}
return
}