// Forked from kubewharf/katalyst-core.
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package kubelet
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/atomic"
|
|
"k8s.io/klog/v2"
|
|
|
|
info "github.com/google/cadvisor/info/v1"
|
|
|
|
"github.com/kubewharf/katalyst-api/pkg/protocol/reporterplugin/v1alpha1"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/fetcher/kubelet/topology"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/fetcher/plugin"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/fetcher/util/kubelet/podresources"
|
|
"github.com/kubewharf/katalyst-core/pkg/config"
|
|
"github.com/kubewharf/katalyst-core/pkg/metaserver"
|
|
"github.com/kubewharf/katalyst-core/pkg/metrics"
|
|
"github.com/kubewharf/katalyst-core/pkg/util"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/process"
|
|
)
|
|
|
|
// PluginName is the name of the kubelet reporter plugin; it is what Name
// returns and what the plugin's log messages are tagged with.
const PluginName = "kubelet-reporter-plugin"
|
|
|
|
// kubeletPlugin implements the endpoint interface, and it's an in-tree reporter plugin
|
|
type kubeletPlugin struct {
|
|
mutex sync.RWMutex
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
// conf is used to indicate the file path and name for system data in the future
|
|
// currently, it's not used todo: implement this logic
|
|
conf *config.Configuration
|
|
|
|
topologyStatusAdapter topology.Adapter
|
|
|
|
// cb since kubeletPlugin needs to call updateContent whenever the topology changes,
|
|
// it needs a corresponding callback function
|
|
cb plugin.ListAndWatchCallback
|
|
|
|
// notifierCh channel sent by topology adapter to trigger ListAndWatch send to
|
|
// manager
|
|
notifierCh chan struct{}
|
|
|
|
latestReportContentResponse atomic.Value
|
|
|
|
*process.StopControl
|
|
emitter metrics.MetricEmitter
|
|
metaServer *metaserver.MetaServer
|
|
}
|
|
|
|
// NewKubeletReporterPlugin creates a kubelet reporter plugin
|
|
func NewKubeletReporterPlugin(emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer,
|
|
conf *config.Configuration, callback plugin.ListAndWatchCallback) (plugin.ReporterPlugin, error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
p := &kubeletPlugin{
|
|
emitter: emitter,
|
|
metaServer: metaServer,
|
|
conf: conf,
|
|
notifierCh: make(chan struct{}, 10),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
cb: callback,
|
|
StopControl: process.NewStopControl(time.Time{}),
|
|
}
|
|
|
|
topologyStatusAdapter, err := topology.NewPodResourcesServerTopologyAdapter(metaServer,
|
|
conf.PodResourcesServerEndpoints, conf.KubeletResourcePluginPaths, nil,
|
|
p.getNumaInfo, nil, podresources.GetV1Client)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
p.topologyStatusAdapter = topologyStatusAdapter
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (p *kubeletPlugin) Name() string {
|
|
return PluginName
|
|
}
|
|
|
|
func (p *kubeletPlugin) Run(success chan<- bool) {
|
|
err := p.topologyStatusAdapter.Run(p.ctx, p.topologyStatusChangeHandler)
|
|
if err != nil {
|
|
klog.Fatalf("run topology status adapter failed")
|
|
return
|
|
}
|
|
success <- true
|
|
|
|
for {
|
|
select {
|
|
case _, ok := <-p.notifierCh:
|
|
if !ok {
|
|
klog.Infof("plugin %s has been stopped", PluginName)
|
|
return
|
|
}
|
|
|
|
resp, err := p.getReportContent(p.ctx)
|
|
if err != nil {
|
|
klog.Errorf("plugin %s failed to get report content with error %v", PluginName, err)
|
|
continue
|
|
}
|
|
|
|
p.ListAndWatchReportContentCallback(PluginName, resp)
|
|
case <-p.ctx.Done():
|
|
klog.Infof("plugin %s has been stopped", PluginName)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *kubeletPlugin) GetReportContent(ctx context.Context) (*v1alpha1.GetReportContentResponse, error) {
|
|
return p.getReportContent(ctx)
|
|
}
|
|
|
|
func (p *kubeletPlugin) ListAndWatchReportContentCallback(pluginName string, response *v1alpha1.GetReportContentResponse) {
|
|
p.setCache(response)
|
|
|
|
p.cb(pluginName, response)
|
|
}
|
|
|
|
func (p *kubeletPlugin) GetCache() *v1alpha1.GetReportContentResponse {
|
|
resp := p.latestReportContentResponse.Load()
|
|
if resp == nil {
|
|
return nil
|
|
}
|
|
|
|
return resp.(*v1alpha1.GetReportContentResponse)
|
|
}
|
|
|
|
// Stop to cancel all context and close notifierCh
|
|
func (p *kubeletPlugin) Stop() {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
|
|
p.cancel()
|
|
close(p.notifierCh)
|
|
|
|
p.StopControl.Stop()
|
|
}
|
|
|
|
// topologyStatusChangeHandler is called by topology adapter when topology status changes
|
|
func (p *kubeletPlugin) topologyStatusChangeHandler() {
|
|
p.mutex.RLock()
|
|
defer p.mutex.RUnlock()
|
|
|
|
select {
|
|
case p.notifierCh <- struct{}{}:
|
|
klog.Infof("send topology change notification to plugin %s", PluginName)
|
|
default:
|
|
klog.Warningf("plugin %s is busy, skip topology change notification", PluginName)
|
|
}
|
|
}
|
|
|
|
func (p *kubeletPlugin) setCache(resp *v1alpha1.GetReportContentResponse) {
|
|
p.latestReportContentResponse.Store(resp)
|
|
}
|
|
|
|
// getReportContent get report content from all collectors
|
|
func (p *kubeletPlugin) getReportContent(ctx context.Context) (*v1alpha1.GetReportContentResponse, error) {
|
|
reportContent, err := p.getTopologyStatusContent(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &v1alpha1.GetReportContentResponse{
|
|
Content: reportContent,
|
|
}, nil
|
|
}
|
|
|
|
// getTopologyStatusContent get topology status content from topologyStatusAdapter
|
|
func (p *kubeletPlugin) getTopologyStatusContent(ctx context.Context) ([]*v1alpha1.ReportContent, error) {
|
|
topologyStatus, err := p.topologyStatusAdapter.GetTopologyZones(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "get numa topology status from adapter failed")
|
|
}
|
|
|
|
value, err := json.Marshal(&topologyStatus)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "marshal topology status failed")
|
|
}
|
|
|
|
return []*v1alpha1.ReportContent{
|
|
{
|
|
GroupVersionKind: &util.CNRGroupVersionKind,
|
|
Field: []*v1alpha1.ReportField{
|
|
{
|
|
FieldType: v1alpha1.FieldType_Status,
|
|
FieldName: util.CNRFieldNameTopologyZone,
|
|
Value: value,
|
|
},
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (p *kubeletPlugin) getNumaInfo() ([]info.Node, error) {
|
|
if p.metaServer == nil || p.metaServer.MachineInfo == nil {
|
|
return nil, fmt.Errorf("get metaserver machine info is nil")
|
|
}
|
|
return p.metaServer.MachineInfo.Topology, nil
|
|
}
|