katalyst-core/pkg/agent/sysadvisor/sysadvisor.go

163 lines
5.0 KiB
Go

/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sysadvisor
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
pkgplugin "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin"
metacacheplugin "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metacache"
metricemitter "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)
// initTimeout is the upper bound on how long a single plugin's Init may run
// before the agent stops waiting for it; such a plugin is never started.
const initTimeout time.Duration = 10 * time.Second
// init registers the built-in sysadvisor plugins (QoS-aware, meta-cache and
// custom metric emitter) with the pkgplugin registry. NewAdvisorAgent later
// reads the registry via pkgplugin.GetRegisteredAdvisorPlugins; whether a
// registered plugin is actually constructed depends on the configured
// SysAdvisorPlugins list checked in getAdvisorPlugins.
func init() {
	pkgplugin.RegisterAdvisorPlugin(types.AdvisorPluginNameQoSAware, qosaware.NewQoSAwarePlugin)
	pkgplugin.RegisterAdvisorPlugin(types.AdvisorPluginNameMetaCache, metacacheplugin.NewMetaCachePlugin)
	pkgplugin.RegisterAdvisorPlugin(types.AdvisorPluginNameMetricEmitter, metricemitter.NewCustomMetricEmitter)
}
// AdvisorAgent is the sysadvisor agent. It constructs the enabled advisor
// plugins, initializes each one with a per-plugin timeout (initTimeout), and
// runs only those whose initialization succeeded.
type AdvisorAgent struct {
	// These are the parameters handed to every sysadvisor plugin's
	// initializer when the plugin is constructed.
	config     *config.Configuration
	extraConf  interface{}
	metaServer *metaserver.MetaServer
	emitPool   metricspool.MetricsEmitterPool

	// plugins holds every enabled plugin instance created by getAdvisorPlugins.
	plugins []pkgplugin.SysAdvisorPlugin
	// pluginsToRun is the subset of plugins whose Init returned nil within
	// initTimeout; only these are started by Run.
	pluginsToRun []pkgplugin.SysAdvisorPlugin
	// wgInitPlugin tracks the per-plugin initialization goroutines in init.
	wgInitPlugin sync.WaitGroup
	// mutex guards pluginsToRun while init goroutines append to it concurrently.
	mutex sync.Mutex
}
// NewAdvisorAgent builds a sysadvisor agent. It constructs every registered
// plugin that is enabled by configuration, then synchronously initializes
// them (each bounded by initTimeout) so that Run later starts only the
// plugins that initialized successfully. A plugin construction failure is
// returned as an error and no agent is produced.
func NewAdvisorAgent(conf *config.Configuration, extraConf interface{}, metaServer *metaserver.MetaServer,
	emitPool metricspool.MetricsEmitterPool) (*AdvisorAgent, error) {
	a := new(AdvisorAgent)
	a.config = conf
	a.extraConf = extraConf
	a.metaServer = metaServer
	a.emitPool = emitPool
	a.plugins = make([]pkgplugin.SysAdvisorPlugin, 0)
	a.pluginsToRun = make([]pkgplugin.SysAdvisorPlugin, 0)

	registered := pkgplugin.GetRegisteredAdvisorPlugins()
	if err := a.getAdvisorPlugins(registered); err != nil {
		return nil, err
	}

	// Blocks until every plugin has either finished Init or timed out.
	a.init()
	return a, nil
}
// getAdvisorPlugins creates the MetaCache shared by all plugins and then calls
// the initializer of every plugin in initializers whose name is enabled by
// GenericSysAdvisorConfiguration.SysAdvisorPlugins, appending each resulting
// instance to m.plugins. Disabled plugins are logged and skipped.
//
// initializers is ranged over as a map, so the order of m.plugins is not
// deterministic. An error is returned (wrapping the cause) if the MetaCache
// cannot be created or if any enabled plugin's initializer fails; in the
// latter case m.plugins may already contain the plugins built before it.
func (m *AdvisorAgent) getAdvisorPlugins(initializers map[string]pkgplugin.AdvisorPluginInitFunc) error {
	metaCache, err := metacache.NewMetaCacheImp(m.config, m.metaServer.MetricsFetcher)
	if err != nil {
		return fmt.Errorf("new metacache failed: %w", err)
	}

	// Presumably the set of plugins disabled by default (empty here, so only
	// the configured list decides); built once instead of per iteration.
	disabledByDefault := sets.NewString()
	enabledList := m.config.GenericSysAdvisorConfiguration.SysAdvisorPlugins

	for pluginName, initFn := range initializers {
		if !general.IsNameEnabled(pluginName, disabledByDefault, enabledList) {
			klog.Warningf("[sysadvisor] %s plugin is disabled", pluginName)
			continue
		}
		klog.Infof("[sysadvisor] %s plugin is enabled", pluginName)

		curPlugin, err := initFn(m.config, m.extraConf, m.emitPool, m.metaServer, metaCache)
		if err != nil {
			return fmt.Errorf("failed to start sysadvisor plugin %v: %w", pluginName, err)
		}
		m.plugins = append(m.plugins, curPlugin)
	}
	return nil
}
// init initializes all plugins in m.plugins concurrently, giving each one at
// most initTimeout. A plugin whose Init returns nil within the limit is
// appended to m.pluginsToRun. A plugin whose Init fails or times out is logged
// and never started. A timed-out plugin is not killed: its Init call keeps
// running in the background and its eventual result is discarded. init blocks
// until every plugin has either finished Init or hit its timeout.
func (m *AdvisorAgent) init() {
	for _, plugin := range m.plugins {
		m.wgInitPlugin.Add(1)
		go func(plugin pkgplugin.SysAdvisorPlugin) {
			defer m.wgInitPlugin.Done()

			// Per-plugin deadline. Creating and cancelling it inside this
			// goroutine releases the timer as soon as this plugin settles,
			// instead of deferring every cancel until init returns.
			ctx, cancel := context.WithTimeout(context.Background(), initTimeout)
			defer cancel()

			// Buffered so the Init goroutine can always deliver its result and
			// exit, even after we have stopped waiting because of a timeout.
			done := make(chan error, 1)
			go func() {
				done <- plugin.Init()
			}()

			select {
			case err := <-done:
				if err != nil {
					klog.Errorf("[sysadvisor] initialize plugin %v with error: %v; do not start it", plugin.Name(), err)
					return
				}
				m.mutex.Lock()
				m.pluginsToRun = append(m.pluginsToRun, plugin)
				m.mutex.Unlock()
				klog.Infof("[sysadvisor] plugin %v initialized", plugin.Name())
			case <-ctx.Done():
				klog.Errorf("[sysadvisor] initialize plugin %v timeout, limit %v; ignore and do not start it", plugin.Name(), initTimeout)
			}
		}(plugin)
	}
	m.wgInitPlugin.Wait()
}
// Run starts every plugin in m.pluginsToRun in its own goroutine, passing ctx
// to each plugin's Run. A plugin may block in Run until ctx is cancelled or
// return immediately after spawning its own work; both are supported. Run
// returns only after every plugin's Run call has returned and ctx is done.
func (m *AdvisorAgent) Run(ctx context.Context) {
	var running sync.WaitGroup
	launch := func(p pkgplugin.SysAdvisorPlugin) {
		defer running.Done()
		klog.Infof("[sysadvisor] start plugin %v", p.Name())
		p.Run(ctx)
	}

	running.Add(len(m.pluginsToRun))
	for _, p := range m.pluginsToRun {
		go launch(p)
	}

	running.Wait()
	<-ctx.Done()
}