forked from kubewharf/katalyst-core
415 lines
14 KiB
Go
415 lines
14 KiB
Go
/*
|
|
Copyright 2022 The Katalyst Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
// Package fetcher is a framework to collect resources from multiple plugins
|
|
// (both in-tree and out-of-tree implementations) and push contents to reporter
|
|
// manager to assemble and update through the APIServer.
|
|
package fetcher // import "github.com/kubewharf/katalyst-core/pkg/reportermanager/fetcher"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/status"
|
|
"k8s.io/apimachinery/pkg/util/errors"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/klog/v2"
|
|
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
|
|
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
|
cpmerrors "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
|
|
|
|
"github.com/kubewharf/katalyst-api/pkg/plugins/registration"
|
|
"github.com/kubewharf/katalyst-api/pkg/protocol/reporterplugin/v1alpha1"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/fetcher/checkpoint"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/fetcher/kubelet"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/fetcher/plugin"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/fetcher/system"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/reporter"
|
|
"github.com/kubewharf/katalyst-core/pkg/config"
|
|
"github.com/kubewharf/katalyst-core/pkg/metaserver"
|
|
"github.com/kubewharf/katalyst-core/pkg/metrics"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/general"
|
|
)
|
|
|
|
const (
	// reporterManagerCheckpoint is the checkpoint key under which the
	// responses of remote reporter plugins are persisted.
	reporterManagerCheckpoint = "reporter_manager_checkpoint"

	// healthzNameReporterFetcherReady is the name of the healthz rule
	// registered by ReporterPluginManager.Run.
	healthzNameReporterFetcherReady = "ReporterFetcherReady"
)
|
|
|
|
// ReporterPluginManager is used to manage in-tree or out-tree reporter plugin registrations and
|
|
// get report content from these plugins to aggregate them into the Reporter Manager
|
|
type ReporterPluginManager struct {
|
|
// callback is used for reporting in one time call.
|
|
callback plugin.ListAndWatchCallback
|
|
|
|
// map pluginName to its corresponding endpoint implementation
|
|
mutex sync.Mutex
|
|
innerEndpoints sets.String
|
|
endpoints map[string]plugin.Endpoint
|
|
|
|
checkpointManager checkpointmanager.CheckpointManager
|
|
|
|
reporter reporter.Manager
|
|
emitter metrics.MetricEmitter
|
|
|
|
// reconcilePeriod is the duration between calls to sync.
|
|
reconcilePeriod time.Duration
|
|
syncFunc func(ctx context.Context)
|
|
|
|
// healthzState records last time that the corresponding module is determined as healthy.
|
|
healthzState sync.Map
|
|
}
|
|
|
|
// innerReporterPluginsDisabledByDefault is the disabled-by-default set passed
// to general.IsNameEnabled when deciding which in-tree plugins to register;
// it is currently empty.
var innerReporterPluginsDisabledByDefault = sets.String{}
|
|
|
|
// NewReporterPluginManager creates a new reporter plugin manager.
|
|
func NewReporterPluginManager(reporterMgr reporter.Manager, emitter metrics.MetricEmitter,
|
|
metaServer *metaserver.MetaServer, conf *config.Configuration) (*ReporterPluginManager, error) {
|
|
manager := &ReporterPluginManager{
|
|
innerEndpoints: sets.NewString(),
|
|
endpoints: make(map[string]plugin.Endpoint),
|
|
reporter: reporterMgr,
|
|
emitter: emitter,
|
|
reconcilePeriod: conf.CollectInterval,
|
|
}
|
|
|
|
manager.syncFunc = manager.genericSync
|
|
manager.callback = manager.genericCallback
|
|
|
|
checkpointManager, err := checkpointmanager.NewCheckpointManager(conf.CheckpointManagerDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
|
|
}
|
|
manager.checkpointManager = checkpointManager
|
|
|
|
// load remote endpoints report response information from disk.
|
|
err = manager.readCheckpoint()
|
|
if err != nil {
|
|
_ = emitter.StoreInt64("reporter_plugin_checkpoint_read_failed", 1, metrics.MetricTypeNameCount)
|
|
klog.Warningf("continue after failing to read checkpoint file. response info from reporter plugin may NOT be up-to-date. Err: %v", err)
|
|
}
|
|
|
|
// register inner reporter plugins
|
|
err = manager.registerInnerReporterPlugins(emitter, metaServer, conf, manager.genericCallback, newReporterPluginInitializers())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get inner reporter plugin failed: %s", err)
|
|
}
|
|
|
|
return manager, nil
|
|
}
|
|
|
|
// newReporterPluginInitializers adds in-tree reporter plugins into init function list
|
|
func newReporterPluginInitializers() map[string]plugin.InitFunc {
|
|
innerReporterPluginInitializers := make(map[string]plugin.InitFunc)
|
|
innerReporterPluginInitializers[system.PluginName] = system.NewSystemReporterPlugin
|
|
innerReporterPluginInitializers[kubelet.PluginName] = kubelet.NewKubeletReporterPlugin
|
|
return innerReporterPluginInitializers
|
|
}
|
|
|
|
func (m *ReporterPluginManager) registerInnerReporterPlugins(emitter metrics.MetricEmitter,
|
|
metaServer *metaserver.MetaServer, conf *config.Configuration, callback plugin.ListAndWatchCallback,
|
|
innerReporterPluginInitializers map[string]plugin.InitFunc) error {
|
|
var (
|
|
errList []error
|
|
)
|
|
|
|
for pluginName, initFn := range innerReporterPluginInitializers {
|
|
if !general.IsNameEnabled(pluginName, innerReporterPluginsDisabledByDefault, conf.GenericReporterConfiguration.InnerPlugins) {
|
|
klog.Infof("reporter plugin %s is disabled", pluginName)
|
|
continue
|
|
}
|
|
|
|
curPlugin, err := initFn(emitter, metaServer, conf, callback)
|
|
if err != nil {
|
|
errList = append(errList, err)
|
|
continue
|
|
}
|
|
|
|
err = m.registerPlugin(pluginName, curPlugin)
|
|
if err != nil {
|
|
errList = append(errList, err)
|
|
continue
|
|
}
|
|
|
|
m.innerEndpoints.Insert(pluginName)
|
|
}
|
|
|
|
if len(errList) > 0 {
|
|
return errors.NewAggregate(errList)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetHandlerType get manage plugin type
|
|
func (m *ReporterPluginManager) GetHandlerType() string {
|
|
return registration.ReporterPlugin
|
|
}
|
|
|
|
// ValidatePlugin is to validate the plugin info is supported
|
|
func (m *ReporterPluginManager) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
|
|
klog.Infof("[reporter manager] get Plugin %s at Endpoint %s with versions %v", pluginName, endpoint, versions)
|
|
|
|
if !m.isVersionCompatibleWithPlugin(versions) {
|
|
return fmt.Errorf("reporter manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RegisterPlugin is to handle plugin register event
|
|
func (m *ReporterPluginManager) RegisterPlugin(pluginName, endpoint string, versions []string) error {
|
|
klog.Infof("[reporter manager] registering Plugin %s at Endpoint %s", pluginName, endpoint)
|
|
|
|
e, err := plugin.NewRemoteEndpoint(endpoint, pluginName, m.emitter, m.callback)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to dial device plugin with socketPath %s: %v", endpoint, err)
|
|
}
|
|
|
|
return m.registerPlugin(pluginName, e)
|
|
}
|
|
|
|
// DeRegisterPlugin is to handler plugin de-register event
|
|
func (m *ReporterPluginManager) DeRegisterPlugin(pluginName string) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
if e, ok := m.endpoints[pluginName]; ok {
|
|
e.Stop()
|
|
klog.Errorf("[reporter manager] reporter plugin %s has been deregistered", pluginName)
|
|
_ = m.emitter.StoreInt64("reporter_plugin_deregister", 1, metrics.MetricTypeNameCount,
|
|
metrics.ConvertMapToTags(map[string]string{
|
|
"plugin": pluginName,
|
|
})...)
|
|
}
|
|
}
|
|
|
|
// Run start the reporter plugin manager
|
|
func (m *ReporterPluginManager) Run(ctx context.Context) {
|
|
go wait.UntilWithContext(ctx, m.syncFunc, m.reconcilePeriod)
|
|
|
|
klog.Infof("reporter plugin manager started")
|
|
m.reporter.Run(ctx)
|
|
|
|
general.RegisterHealthzCheckRules(healthzNameReporterFetcherReady, m.healthz)
|
|
}
|
|
|
|
func (m *ReporterPluginManager) isVersionCompatibleWithPlugin(versions []string) bool {
|
|
// todo: currently this is fine as we only have a single supported version. When we do need to support
|
|
// multiple versions in the future, we may need to extend this function to return a supported version.
|
|
// E.g., say kubelet supports v1beta1 and v1beta2, and we get v1alpha1 and v1beta1 from a device plugin,
|
|
// this function should return v1beta1
|
|
for _, version := range versions {
|
|
for _, supportedVersion := range v1alpha1.SupportedVersions {
|
|
if version == supportedVersion {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (m *ReporterPluginManager) registerPlugin(pluginName string, e plugin.Endpoint) error {
|
|
m.registerEndpoint(pluginName, e)
|
|
|
|
success := make(chan bool)
|
|
|
|
go m.runEndpoint(pluginName, e, success)
|
|
|
|
select {
|
|
case pass := <-success:
|
|
if pass {
|
|
klog.Infof("plugin %s run success", pluginName)
|
|
return nil
|
|
}
|
|
return fmt.Errorf("failed to register plugin %s", pluginName)
|
|
}
|
|
}
|
|
|
|
func (m *ReporterPluginManager) registerEndpoint(pluginName string, e plugin.Endpoint) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
old, ok := m.endpoints[pluginName]
|
|
|
|
if ok && !old.IsStopped() {
|
|
klog.Infof("stop old endpoint: %s", pluginName)
|
|
old.Stop()
|
|
}
|
|
|
|
m.endpoints[pluginName] = e
|
|
klog.Infof("registered plugin name %s", pluginName)
|
|
}
|
|
|
|
func (m *ReporterPluginManager) runEndpoint(pluginName string, e plugin.Endpoint, success chan<- bool) {
|
|
e.Run(success)
|
|
e.Stop()
|
|
|
|
_ = m.emitter.StoreInt64("reporter_plugin_unhealthy", 1, metrics.MetricTypeNameCount,
|
|
metrics.ConvertMapToTags(map[string]string{
|
|
"plugin": pluginName,
|
|
})...)
|
|
klog.Infof("reporter plugin %s became unhealthy", pluginName)
|
|
}
|
|
|
|
// genericCallback is triggered by ListAndWatch of plugin implementations;
|
|
// the ListWatch function will store report content in Endpoint and send to manager,
|
|
// and the manager can read it from Endpoint cache to obtain content changes initiative
|
|
func (m *ReporterPluginManager) genericCallback(pluginName string, _ *v1alpha1.GetReportContentResponse) {
|
|
// get report content from each healthy Endpoint from cache, the lastly response
|
|
// from this plugin has been already stored to its Endpoint cache before this callback called
|
|
reportResponses := m.getReportContent(true)
|
|
|
|
err := m.pushContents(context.Background(), reportResponses)
|
|
if err != nil {
|
|
_ = m.emitter.StoreInt64("reporter_plugin_lw_push_failed", 1, metrics.MetricTypeNameCount, []metrics.MetricTag{
|
|
{Key: "plugin", Val: pluginName},
|
|
}...)
|
|
klog.Errorf("report plugin %s in callback failed with error: %v", pluginName, err)
|
|
}
|
|
}
|
|
|
|
func (m *ReporterPluginManager) pushContents(ctx context.Context, reportResponses map[string]*v1alpha1.GetReportContentResponse) error {
|
|
if err := m.writeCheckpoint(reportResponses); err != nil {
|
|
klog.Errorf("writing checkpoint encountered %v", err)
|
|
}
|
|
|
|
return m.reporter.PushContents(ctx, reportResponses)
|
|
}
|
|
|
|
// genericSync periodically calls the Get function to obtain content changes
|
|
func (m *ReporterPluginManager) genericSync(ctx context.Context) {
|
|
// clear unhealthy plugin periodically
|
|
m.clearUnhealthyPlugin()
|
|
|
|
// get report content from each healthy Endpoint directly
|
|
reportResponses := m.getReportContent(false)
|
|
|
|
err := m.pushContents(ctx, reportResponses)
|
|
if err != nil {
|
|
_ = m.emitter.StoreInt64("reporter_plugin_sync_push_failed", 1, metrics.MetricTypeNameCount)
|
|
klog.Errorf("report plugin failed with error: %v", err)
|
|
}
|
|
|
|
m.healthzSyncLoop()
|
|
}
|
|
|
|
// clearUnhealthyPlugin is to clear stopped plugins from cache which exceeded grace period
|
|
func (m *ReporterPluginManager) clearUnhealthyPlugin() {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
for pluginName, e := range m.endpoints {
|
|
if e.StopGracePeriodExpired() {
|
|
delete(m.endpoints, pluginName)
|
|
|
|
klog.Warningf("plugin %s has been clear", pluginName)
|
|
_ = m.emitter.StoreInt64("reporter_plugin_clear", 1, metrics.MetricTypeNameCount,
|
|
metrics.ConvertMapToTags(map[string]string{
|
|
"plugin": pluginName,
|
|
})...)
|
|
}
|
|
}
|
|
}
|
|
|
|
// getReportContent is to get reportContent from plugins. if cacheFirst is true,
|
|
// use plugin cache (when it is no nil), otherwise we call plugin directly.
|
|
func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1alpha1.GetReportContentResponse {
|
|
reportResponses := make(map[string]*v1alpha1.GetReportContentResponse)
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
// get report content from each Endpoint
|
|
for pluginName, e := range m.endpoints {
|
|
var (
|
|
resp *v1alpha1.GetReportContentResponse
|
|
err error
|
|
)
|
|
|
|
// if cacheFirst is false or cache response is nil, we will try to get report content directly from plugin
|
|
if cacheFirst {
|
|
cache := e.GetCache()
|
|
if cache != nil {
|
|
reportResponses[pluginName] = cache
|
|
continue
|
|
}
|
|
}
|
|
|
|
ctx := metadata.NewOutgoingContext(context.Background(), metadata.New(nil))
|
|
resp, err = e.GetReportContent(ctx)
|
|
if err != nil {
|
|
s, _ := status.FromError(err)
|
|
_ = m.emitter.StoreInt64("reporter_plugin_get_content_failed", 1, metrics.MetricTypeNameCount, []metrics.MetricTag{
|
|
{Key: "code", Val: s.Code().String()},
|
|
{Key: "plugin", Val: pluginName},
|
|
}...)
|
|
|
|
klog.Errorf("GetReportContentResponse from %s Endpoint failed with error: %v", pluginName, err)
|
|
// if it gets report content failed, uses cached response
|
|
resp = e.GetCache()
|
|
}
|
|
|
|
reportResponses[pluginName] = resp
|
|
}
|
|
|
|
return reportResponses
|
|
}
|
|
|
|
func (m *ReporterPluginManager) writeCheckpoint(reportResponses map[string]*v1alpha1.GetReportContentResponse) error {
|
|
remoteResponses := make(map[string]*v1alpha1.GetReportContentResponse, 0)
|
|
// only write remote endpoint response to checkpoint
|
|
for name, response := range reportResponses {
|
|
if m.innerEndpoints.Has(name) {
|
|
continue
|
|
}
|
|
remoteResponses[name] = response
|
|
}
|
|
data := checkpoint.New(remoteResponses)
|
|
err := m.checkpointManager.CreateCheckpoint(reporterManagerCheckpoint, data)
|
|
if err != nil {
|
|
_ = m.emitter.StoreInt64("reporter_plugin_checkpoint_write_failed", 1, metrics.MetricTypeNameCount)
|
|
return fmt.Errorf("failed to write checkpoint file %q: %v", reporterManagerCheckpoint, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *ReporterPluginManager) readCheckpoint() error {
|
|
reportResponses := make(map[string]*v1alpha1.GetReportContentResponse, 0)
|
|
cp := checkpoint.New(reportResponses)
|
|
err := m.checkpointManager.GetCheckpoint(reporterManagerCheckpoint, cp)
|
|
if err != nil {
|
|
if err == cpmerrors.ErrCheckpointNotFound {
|
|
klog.Warningf("failed to retrieve checkpoint for %q: %v", reporterManagerCheckpoint, err)
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
reportResponses = cp.GetData()
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
for name, response := range reportResponses {
|
|
// During start up, creates stopped remote endpoint so that the report content
|
|
// will stay zero till the corresponding device plugin re-registers.
|
|
m.endpoints[name] = plugin.NewStoppedRemoteEndpoint(name, response)
|
|
}
|
|
return nil
|
|
}
|