katalyst-core/pkg/agent/sysadvisor/metacache/metacache.go

483 lines
14 KiB
Go

/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metacache
import (
"fmt"
"reflect"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)
// [notice]
// to compatible with checkpoint checksum calculation,
// we should make guarantees below in checkpoint properties assignment
// 1. resource.Quantity use resource.MustParse("0") to initialize, not to use resource.Quantity{}
// 2. CPUSet use NewCPUSet(...) to initialize, not to use CPUSet{}
// 3. not use omitempty in map property and must make new map to do initialization
const (
stateFileName string = "sys_advisor_state"
storeStateWarningDuration = 2 * time.Second
)
// MetaReader provides a standard interface to refer to metadata type
type MetaReader interface {
// GetContainerEntries returns a ContainerEntry copy keyed by pod uid
GetContainerEntries(podUID string) (types.ContainerEntries, bool)
// GetContainerInfo returns a ContainerInfo copy keyed by pod uid and container name
GetContainerInfo(podUID string, containerName string) (*types.ContainerInfo, bool)
// GetContainerMetric returns the metric value of a container
GetContainerMetric(podUID string, containerName string, metricName string) (float64, error)
// RangeContainer applies a function to every podUID, containerName, containerInfo set
RangeContainer(f func(podUID string, containerName string, containerInfo *types.ContainerInfo) bool)
// GetPoolInfo returns a PoolInfo copy by pool name
GetPoolInfo(poolName string) (*types.PoolInfo, bool)
// GetPoolSize returns the size of pool as integer
GetPoolSize(poolName string) (int, bool)
// GetRegionInfo returns a RegionInfo copy by region name
GetRegionInfo(regionName string) (*types.RegionInfo, bool)
// RangeRegionInfo applies a function to every regionName, regionInfo set.
// If f returns false, range stops the iteration.
RangeRegionInfo(f func(regionName string, regionInfo *types.RegionInfo) bool)
}
// RawMetaWriter provides a standard interface to modify raw metadata (generated by other agents) in local cache
type RawMetaWriter interface {
// AddContainer adds a container keyed by pod uid and container name. For repeatedly added
// container, only mutable metadata will be updated, i.e. request quantity changed by vpa
AddContainer(podUID string, containerName string, containerInfo *types.ContainerInfo) error
// SetContainerInfo updates ContainerInfo keyed by pod uid and container name
SetContainerInfo(podUID string, containerName string, containerInfo *types.ContainerInfo) error
// RangeAndUpdateContainer applies a function to every podUID, containerName, containerInfo set.
// Not recommended to use if RangeContainer satisfies the requirement.
// If f returns false, range stops the iteration.
RangeAndUpdateContainer(f func(podUID string, containerName string, containerInfo *types.ContainerInfo) bool) error
// DeleteContainer deletes a ContainerInfo keyed by pod uid and container name
DeleteContainer(podUID string, containerName string) error
// RangeAndDeleteContainer applies a function to every podUID, containerName, containerInfo set.
// If f returns true, the containerInfo will be deleted.
RangeAndDeleteContainer(f func(containerInfo *types.ContainerInfo) bool) error
// RemovePod deletes a PodInfo keyed by pod uid. Repeatedly remove will be ignored.
RemovePod(podUID string) error
// SetPoolInfo stores a PoolInfo by pool name
SetPoolInfo(poolName string, poolInfo *types.PoolInfo) error
// DeletePool deletes a PoolInfo keyed by pool name
DeletePool(poolName string) error
// GCPoolEntries deletes GCPoolEntries not existing on node
GCPoolEntries(livingPoolNameSet sets.String) error
}
// AdvisorMetaWriter provides a standard interface to modify advised metadata (generated by sysadvisor)
type AdvisorMetaWriter interface {
// SetRegionEntries overwrites the whole region entries
SetRegionEntries(entries types.RegionEntries) error
// SetRegionInfo stores a RegionInfo by region name
SetRegionInfo(regionName string, regionInfo *types.RegionInfo) error
}
type MetaCache interface {
MetaReader
RawMetaWriter
AdvisorMetaWriter
}
// MetaCacheImp stores metadata and info of pod, node, pool, subnuma etc. as a cache,
// and synchronizes data to sysadvisor state file. It is thread-safe to read and write.
// Deep copy logic is performed during accessing metacache entries instead of directly
// return pointer of each struct to avoid mis-overwrite.
type MetaCacheImp struct {
podEntries types.PodEntries
podMutex sync.RWMutex
poolEntries types.PoolEntries
poolMutex sync.RWMutex
regionEntries types.RegionEntries
regionMutex sync.RWMutex
checkpointManager checkpointmanager.CheckpointManager
checkpointName string
metricsFetcher metric.MetricsFetcher
}
var _ MetaCache = &MetaCacheImp{}
// NewMetaCacheImp returns the single instance of MetaCacheImp
func NewMetaCacheImp(conf *config.Configuration, metricsFetcher metric.MetricsFetcher) (*MetaCacheImp, error) {
stateFileDir := conf.GenericSysAdvisorConfiguration.StateFileDirectory
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateFileDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
mc := &MetaCacheImp{
podEntries: make(types.PodEntries),
poolEntries: make(types.PoolEntries),
regionEntries: make(types.RegionEntries),
checkpointManager: checkpointManager,
checkpointName: stateFileName,
metricsFetcher: metricsFetcher,
}
// Restore from checkpoint before any function call to metacache api
if err := mc.restoreState(); err != nil {
return mc, err
}
return mc, nil
}
/*
standard implementation for metaReader
*/
func (mc *MetaCacheImp) GetContainerEntries(podUID string) (types.ContainerEntries, bool) {
mc.podMutex.RLock()
defer mc.podMutex.RUnlock()
v, ok := mc.podEntries[podUID]
return v.Clone(), ok
}
func (mc *MetaCacheImp) GetContainerInfo(podUID string, containerName string) (*types.ContainerInfo, bool) {
mc.podMutex.RLock()
defer mc.podMutex.RUnlock()
podInfo, ok := mc.podEntries[podUID]
if !ok {
return nil, false
}
containerInfo, ok := podInfo[containerName]
return containerInfo.Clone(), ok
}
// RangeContainer should deepcopy so that pod and container entries will not be overwritten.
func (mc *MetaCacheImp) RangeContainer(f func(podUID string, containerName string, containerInfo *types.ContainerInfo) bool) {
mc.podMutex.RLock()
defer mc.podMutex.RUnlock()
for podUID, podInfo := range mc.podEntries.Clone() {
for containerName, containerInfo := range podInfo {
if !f(podUID, containerName, containerInfo) {
break
}
}
}
}
func (mc *MetaCacheImp) GetContainerMetric(podUID string, containerName string, metricName string) (float64, error) {
return mc.metricsFetcher.GetContainerMetric(podUID, containerName, metricName)
}
func (mc *MetaCacheImp) GetPoolInfo(poolName string) (*types.PoolInfo, bool) {
mc.poolMutex.RLock()
defer mc.poolMutex.RUnlock()
poolInfo, ok := mc.poolEntries[poolName]
return poolInfo.Clone(), ok
}
func (mc *MetaCacheImp) GetPoolSize(poolName string) (int, bool) {
mc.poolMutex.RLock()
defer mc.poolMutex.RUnlock()
pi, ok := mc.poolEntries[poolName]
if !ok {
return 0, false
}
return machine.CountCPUAssignmentCPUs(pi.TopologyAwareAssignments), true
}
func (mc *MetaCacheImp) GetRegionInfo(regionName string) (*types.RegionInfo, bool) {
mc.regionMutex.RLock()
defer mc.regionMutex.RUnlock()
regionInfo, ok := mc.regionEntries[regionName]
return regionInfo.Clone(), ok
}
func (mc *MetaCacheImp) RangeRegionInfo(f func(regionName string, regionInfo *types.RegionInfo) bool) {
mc.regionMutex.RLock()
defer mc.regionMutex.RUnlock()
for regionName, regionInfo := range mc.regionEntries.Clone() {
if !f(regionName, regionInfo) {
break
}
}
}
/*
standard implementation for RawMetaWriter
*/
func (mc *MetaCacheImp) AddContainer(podUID string, containerName string, containerInfo *types.ContainerInfo) error {
mc.podMutex.Lock()
defer mc.podMutex.Unlock()
if podInfo, ok := mc.podEntries[podUID]; ok {
if ci, ok := podInfo[containerName]; ok {
ci.UpdateMeta(containerInfo)
return nil
}
}
if mc.setContainerInfo(podUID, containerName, containerInfo) {
return mc.storeState()
}
return nil
}
func (mc *MetaCacheImp) SetContainerInfo(podUID string, containerName string, containerInfo *types.ContainerInfo) error {
mc.podMutex.Lock()
defer mc.podMutex.Unlock()
if mc.setContainerInfo(podUID, containerName, containerInfo) {
return mc.storeState()
}
return nil
}
func (mc *MetaCacheImp) setContainerInfo(podUID string, containerName string, containerInfo *types.ContainerInfo) bool {
podInfo, ok := mc.podEntries[podUID]
if !ok {
mc.podEntries[podUID] = make(types.ContainerEntries)
podInfo = mc.podEntries[podUID]
}
if reflect.DeepEqual(podInfo[containerName], containerInfo) {
return false
} else {
podInfo[containerName] = containerInfo
return true
}
}
func (mc *MetaCacheImp) RangeAndUpdateContainer(f func(podUID string, containerName string, containerInfo *types.ContainerInfo) bool) error {
mc.podMutex.Lock()
defer mc.podMutex.Unlock()
oldPodEntries := mc.podEntries.Clone()
for podUID, podInfo := range mc.podEntries {
for containerName, containerInfo := range podInfo {
if !f(podUID, containerName, containerInfo) {
break
}
}
}
if !reflect.DeepEqual(oldPodEntries, mc.podEntries) {
return mc.storeState()
}
return nil
}
func (mc *MetaCacheImp) DeleteContainer(podUID string, containerName string) error {
mc.podMutex.Lock()
defer mc.podMutex.Unlock()
if mc.deleteContainer(podUID, containerName) {
return mc.storeState()
}
return nil
}
func (mc *MetaCacheImp) RangeAndDeleteContainer(f func(containerInfo *types.ContainerInfo) bool) error {
mc.podMutex.Lock()
defer mc.podMutex.Unlock()
needStoreState := false
for _, podInfo := range mc.podEntries {
for _, containerInfo := range podInfo {
if f(containerInfo) {
if mc.deleteContainer(containerInfo.PodUID, containerInfo.ContainerName) {
needStoreState = true
}
}
}
}
if needStoreState {
return mc.storeState()
}
return nil
}
func (mc *MetaCacheImp) deleteContainer(podUID string, containerName string) bool {
podInfo, ok := mc.podEntries[podUID]
if !ok {
return false
}
_, ok = podInfo[containerName]
if !ok {
return false
}
delete(podInfo, containerName)
if len(podInfo) <= 0 {
delete(mc.podEntries, podUID)
}
return true
}
func (mc *MetaCacheImp) RemovePod(podUID string) error {
mc.podMutex.Lock()
defer mc.podMutex.Unlock()
_, ok := mc.podEntries[podUID]
if !ok {
return nil
}
delete(mc.podEntries, podUID)
return mc.storeState()
}
/*
standard implementation for AdvisorMetaWriter
*/
func (mc *MetaCacheImp) SetPoolInfo(poolName string, poolInfo *types.PoolInfo) error {
mc.poolMutex.Lock()
defer mc.poolMutex.Unlock()
if reflect.DeepEqual(mc.poolEntries[poolName], poolInfo) {
return nil
}
mc.poolEntries[poolName] = poolInfo
return mc.storeState()
}
func (mc *MetaCacheImp) DeletePool(poolName string) error {
mc.poolMutex.Lock()
defer mc.poolMutex.Unlock()
if _, ok := mc.poolEntries[poolName]; !ok {
return nil
}
delete(mc.poolEntries, poolName)
return mc.storeState()
}
func (mc *MetaCacheImp) GCPoolEntries(livingPoolNameSet sets.String) error {
mc.poolMutex.Lock()
defer mc.poolMutex.Unlock()
needStoreState := false
for poolName := range mc.poolEntries {
if _, ok := livingPoolNameSet[poolName]; !ok {
delete(mc.poolEntries, poolName)
needStoreState = true
}
}
if needStoreState {
return mc.storeState()
}
return nil
}
func (mc *MetaCacheImp) SetRegionEntries(entries types.RegionEntries) error {
mc.regionMutex.Lock()
defer mc.regionMutex.Unlock()
oldRegionEntries := mc.regionEntries.Clone()
mc.regionEntries = entries.Clone()
if !reflect.DeepEqual(oldRegionEntries, mc.regionEntries) {
return mc.storeState()
}
return nil
}
func (mc *MetaCacheImp) SetRegionInfo(regionName string, regionInfo *types.RegionInfo) error {
mc.regionMutex.Lock()
defer mc.regionMutex.Unlock()
if reflect.DeepEqual(mc.regionEntries[regionName], regionInfo) {
return nil
} else {
mc.regionEntries[regionName] = regionInfo
return mc.storeState()
}
}
/*
other helper functions
*/
func (mc *MetaCacheImp) storeState() error {
checkpoint := NewMetaCacheCheckpoint()
checkpoint.PodEntries = mc.podEntries
checkpoint.PoolEntries = mc.poolEntries
checkpoint.RegionEntries = mc.regionEntries
begin := time.Now()
defer func() {
duration := time.Since(begin)
if duration > storeStateWarningDuration {
klog.ErrorS(fmt.Errorf("storeState took too long"), "storeState took longer than expected", "expected", storeStateWarningDuration, "actual", duration.Round(time.Millisecond))
}
}()
if err := mc.checkpointManager.CreateCheckpoint(mc.checkpointName, checkpoint); err != nil {
klog.Errorf("[metacache] store state failed: %v", err)
return err
}
klog.Infof("[metacache] store state succeeded")
return nil
}
func (mc *MetaCacheImp) restoreState() error {
checkpoint := NewMetaCacheCheckpoint()
if err := mc.checkpointManager.GetCheckpoint(mc.checkpointName, checkpoint); err != nil {
klog.Infof("[metacache] checkpoint %v err %v, create it", mc.checkpointName, err)
return mc.storeState()
}
mc.podEntries = checkpoint.PodEntries
mc.poolEntries = checkpoint.PoolEntries
mc.regionEntries = checkpoint.RegionEntries
klog.Infof("[metacache] restore state succeeded")
return nil
}