forked from kubewharf/katalyst-core
275 lines
8.4 KiB
Go
275 lines
8.4 KiB
Go
/*
|
|
Copyright 2022 The Katalyst Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package fetcher
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"io/ioutil"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/kubernetes/pkg/kubelet/config"
|
|
"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
|
|
plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
|
|
|
|
"github.com/kubewharf/katalyst-api/pkg/plugins/registration"
|
|
"github.com/kubewharf/katalyst-api/pkg/plugins/skeleton"
|
|
"github.com/kubewharf/katalyst-api/pkg/protocol/reporterplugin/v1alpha1"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/fetcher/plugin"
|
|
"github.com/kubewharf/katalyst-core/pkg/agent/resourcemanager/reporter"
|
|
katalystconfig "github.com/kubewharf/katalyst-core/pkg/config"
|
|
"github.com/kubewharf/katalyst-core/pkg/config/agent"
|
|
"github.com/kubewharf/katalyst-core/pkg/config/agent/global"
|
|
reporterconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/reporter"
|
|
"github.com/kubewharf/katalyst-core/pkg/metrics"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/general"
|
|
)
|
|
|
|
const (
|
|
testPluginName = "fake-reporter-plugin-1"
|
|
testPluginNameSecond = "fake-reporter-plugin-2"
|
|
)
|
|
|
|
var (
|
|
testGroupVersionKind = v1.GroupVersionKind{
|
|
Group: "test-group",
|
|
Kind: "test-kind",
|
|
Version: "test-version",
|
|
}
|
|
)
|
|
|
|
func tmpSocketDir() (socketDir string, err error) {
|
|
socketDir, err = ioutil.TempDir("", "reporter_plugin")
|
|
if err != nil {
|
|
return
|
|
}
|
|
_ = os.MkdirAll(socketDir, 0755)
|
|
return
|
|
}
|
|
|
|
func generateTestConfiguration(dir string) *katalystconfig.Configuration {
|
|
return &katalystconfig.Configuration{
|
|
AgentConfiguration: &agent.AgentConfiguration{
|
|
GenericAgentConfiguration: &agent.GenericAgentConfiguration{
|
|
MetaServerConfiguration: &global.MetaServerConfiguration{CheckpointManagerDir: dir},
|
|
GenericReporterConfiguration: &reporterconfig.GenericReporterConfiguration{
|
|
CollectInterval: 5 * time.Second,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func TestNewManagerImpl(t *testing.T) {
|
|
socketDir, err := tmpSocketDir()
|
|
testReporter := reporter.NewReporterManagerStub()
|
|
require.NoError(t, err)
|
|
defer os.RemoveAll(socketDir)
|
|
_, err = NewReporterPluginManager(testReporter, metrics.DummyMetrics{}, nil, generateTestConfiguration(socketDir))
|
|
require.NoError(t, err)
|
|
os.RemoveAll(socketDir)
|
|
}
|
|
|
|
// Tests that the device plugin manager correctly handles registration and re-registration by
|
|
// making sure that after registration, devices are correctly updated and if a re-registration
|
|
// happens, we will NOT delete devices; and no orphaned devices left.
|
|
func TestReporterPluginReRegistration(t *testing.T) {
|
|
// change default klog level
|
|
flagSet := flag.FlagSet{}
|
|
klog.InitFlags(&flagSet)
|
|
_ = flagSet.Parse([]string{
|
|
"-v", "0",
|
|
})
|
|
|
|
socketDir, err := tmpSocketDir()
|
|
require.NoError(t, err)
|
|
defer os.RemoveAll(socketDir)
|
|
|
|
content1 := []*v1alpha1.ReportContent{
|
|
{
|
|
GroupVersionKind: &testGroupVersionKind,
|
|
Field: []*v1alpha1.ReportField{
|
|
{
|
|
FieldType: v1alpha1.FieldType_Spec,
|
|
FieldName: "fieldName_a",
|
|
Value: []byte("Value_a"),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
testReporter := reporter.NewReporterManagerStub()
|
|
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
defer cancel()
|
|
_, ch, p1 := setup(t, ctx, content1, nil, socketDir, testPluginName, testReporter)
|
|
|
|
select {
|
|
case <-ch:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("timeout while waiting for manager update")
|
|
}
|
|
|
|
p1GetReportContentResponse := testReporter.GetReportContentResponse(p1.Name())
|
|
require.NotNil(t, p1GetReportContentResponse)
|
|
reporterContentsEqual(t, content1, p1GetReportContentResponse.Content)
|
|
|
|
content2 := []*v1alpha1.ReportContent{
|
|
{
|
|
GroupVersionKind: &testGroupVersionKind,
|
|
Field: []*v1alpha1.ReportField{
|
|
{
|
|
FieldType: v1alpha1.FieldType_Spec,
|
|
FieldName: "fieldName_b",
|
|
Value: []byte("Value_b"),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
p2 := setupReporterPlugin(t, content2, socketDir, testPluginNameSecond)
|
|
|
|
select {
|
|
case <-ch:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("timeout while waiting for manager update")
|
|
}
|
|
|
|
p1GetReportContentResponse = testReporter.GetReportContentResponse(p1.Name())
|
|
require.NotNil(t, p1GetReportContentResponse)
|
|
reporterContentsEqual(t, content1, p1GetReportContentResponse.Content)
|
|
|
|
p2GetReportContentResponse := testReporter.GetReportContentResponse(p2.Name())
|
|
require.NotNil(t, p2GetReportContentResponse)
|
|
reporterContentsEqual(t, content2, p2GetReportContentResponse.Content)
|
|
|
|
// test the scenario that plugin de-register and graceful shut down
|
|
_ = p1.Stop()
|
|
_ = p2.Stop()
|
|
}
|
|
|
|
func TestHealthz(t *testing.T) {
|
|
// change default klog level
|
|
flagSet := flag.FlagSet{}
|
|
klog.InitFlags(&flagSet)
|
|
_ = flagSet.Parse([]string{
|
|
"-v", "0",
|
|
})
|
|
|
|
socketDir, err := tmpSocketDir()
|
|
require.NoError(t, err)
|
|
defer os.RemoveAll(socketDir)
|
|
|
|
content1 := []*v1alpha1.ReportContent{
|
|
{
|
|
GroupVersionKind: &testGroupVersionKind,
|
|
Field: []*v1alpha1.ReportField{
|
|
{
|
|
FieldType: v1alpha1.FieldType_Spec,
|
|
FieldName: "fieldName_a",
|
|
Value: []byte("Value_a"),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
testReporter := reporter.NewReporterManagerStub()
|
|
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
defer cancel()
|
|
_, ch, p := setup(t, ctx, content1, nil, socketDir, testPluginName, testReporter)
|
|
|
|
select {
|
|
case <-ch:
|
|
case <-time.After(6 * time.Second):
|
|
t.Fatalf("timeout while waiting for manager update")
|
|
}
|
|
|
|
results := general.CheckHealthz()
|
|
for name, response := range results {
|
|
if reporterFetcherRules.Has(string(name)) {
|
|
require.Equal(t, response.State, general.HealthzCheckStateReady)
|
|
}
|
|
}
|
|
|
|
_ = p.Stop()
|
|
}
|
|
|
|
func setup(t *testing.T, ctx context.Context, content []*v1alpha1.ReportContent, callback plugin.ListAndWatchCallback, socketDir string, pluginSocketName string, reporter reporter.Manager) (registration.AgentPluginHandler, <-chan interface{}, skeleton.GenericPlugin) {
|
|
m, updateChan := setupReporterManager(t, ctx, content, socketDir, callback, reporter)
|
|
p := setupReporterPlugin(t, content, socketDir, pluginSocketName)
|
|
return m, updateChan, p
|
|
}
|
|
|
|
func setupReporterManager(t *testing.T, ctx context.Context, content []*v1alpha1.ReportContent, socketDir string, callback plugin.ListAndWatchCallback, reporter reporter.Manager) (registration.AgentPluginHandler, <-chan interface{}) {
|
|
m, err := NewReporterPluginManager(reporter, metrics.DummyMetrics{}, nil, generateTestConfiguration(socketDir))
|
|
require.NoError(t, err)
|
|
updateChan := make(chan interface{})
|
|
|
|
if callback != nil {
|
|
m.callback = callback
|
|
}
|
|
|
|
originalCallback := m.callback
|
|
m.callback = func(pluginName string, response *v1alpha1.GetReportContentResponse) {
|
|
originalCallback(pluginName, response)
|
|
updateChan <- new(interface{})
|
|
}
|
|
|
|
go m.Run(ctx)
|
|
|
|
pluginManager := pluginmanager.NewPluginManager(
|
|
socketDir,
|
|
&record.FakeRecorder{},
|
|
)
|
|
|
|
pluginManager.AddHandler(m.GetHandlerType(), plugincache.PluginHandler(m))
|
|
|
|
go pluginManager.Run(config.NewSourcesReady(func(_ sets.String) bool { return true }), ctx.Done())
|
|
|
|
return m, updateChan
|
|
}
|
|
|
|
func setupReporterPlugin(t *testing.T, content []*v1alpha1.ReportContent, socketDir string, pluginName string) skeleton.GenericPlugin {
|
|
p, _ := skeleton.NewRegistrationPluginWrapper(
|
|
skeleton.NewReporterPluginStub(content, pluginName),
|
|
[]string{socketDir}, nil)
|
|
err := p.Start()
|
|
require.NoError(t, err)
|
|
return p
|
|
}
|
|
|
|
func reporterContentsEqual(t *testing.T, expected, actual []*v1alpha1.ReportContent) {
|
|
require.Equal(t, len(expected), len(actual))
|
|
for idx := range expected {
|
|
require.Equal(t, expected[idx].GroupVersionKind, actual[idx].GroupVersionKind)
|
|
require.Equal(t, len(expected[idx].Field), len(actual[idx].Field))
|
|
for fieldIdx := range expected[idx].Field {
|
|
require.Equal(t, expected[idx].Field[fieldIdx].FieldType, actual[idx].Field[fieldIdx].FieldType)
|
|
require.Equal(t, expected[idx].Field[fieldIdx].FieldName, actual[idx].Field[fieldIdx].FieldName)
|
|
require.Equal(t, expected[idx].Field[fieldIdx].Value, actual[idx].Field[fieldIdx].Value)
|
|
}
|
|
}
|
|
}
|