support to dynamically switched to transformed informer (#70)

This commit is contained in:
shaowei 2023-05-23 11:11:50 +08:00 committed by GitHub
parent 0be9a136fc
commit 715fd595f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 163 additions and 13 deletions

View File

@ -73,6 +73,10 @@ type GenericContext struct {
DynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
DynamicResourcesManager *native.DynamicResourcesManager
// if we want to support for transformed informer for a certain object,
// it must be enabled transparently
transformedInformerForPod bool
// DisabledByDefault is the set of components which is disabled by default
DisabledByDefault sets.String
}
@ -152,16 +156,17 @@ func NewGenericContext(
Handler: httpHandler.WithHandleChain(mux),
Addr: genericConf.GenericEndpoint,
},
healthChecker: NewHealthzChecker(),
DisabledByDefault: disabledByDefault,
MetaInformerFactory: metaInformerFactory,
KubeInformerFactory: kubeInformerFactory,
InternalInformerFactory: internalInformerFactory,
DynamicInformerFactory: dynamicInformerFactory,
BroadcastAdapter: broadcastAdapter,
Client: clientSet,
EmitterPool: metricspool.NewCustomMetricsEmitterPool(emitterPool),
DynamicResourcesManager: dynamicResourceManager,
healthChecker: NewHealthzChecker(),
DisabledByDefault: disabledByDefault,
MetaInformerFactory: metaInformerFactory,
KubeInformerFactory: kubeInformerFactory,
InternalInformerFactory: internalInformerFactory,
DynamicInformerFactory: dynamicInformerFactory,
BroadcastAdapter: broadcastAdapter,
Client: clientSet,
EmitterPool: metricspool.NewCustomMetricsEmitterPool(emitterPool),
DynamicResourcesManager: dynamicResourceManager,
transformedInformerForPod: genericConf.TransformedInformerForPod,
}
// add profiling and health check http paths listening on generic endpoint
@ -201,6 +206,9 @@ func (c *GenericContext) StartInformer(ctx context.Context) {
}
if c.KubeInformerFactory != nil {
if transformers, ok := native.GetPodTransformer(); ok && c.transformedInformerForPod {
_ = c.KubeInformerFactory.Core().V1().Pods().Informer().SetTransform(transformers)
}
c.KubeInformerFactory.Start(ctx.Done())
}

View File

@ -36,11 +36,13 @@ type GenericOptions struct {
MasterURL string
KubeConfig string
TransformedInformerForPod bool
// todo actually those auth info should be stored in secrets or somewhere like that
GenericEndpoint string
GenericEndpointHandleChains []string
// todo actually those auth info should be stored in secrets or somewhere like that
GenericAuthStaticUser string
GenericAuthStaticPasswd string
GenericAuthStaticUser string
GenericAuthStaticPasswd string
qosOptions *QoSOptions
metricsOptions *MetricsOptions
@ -51,6 +53,7 @@ type GenericOptions struct {
func NewGenericOptions() *GenericOptions {
return &GenericOptions{
DryRun: false,
TransformedInformerForPod: false,
GenericEndpoint: ":9316",
qosOptions: NewQoSOptions(),
metricsOptions: NewMetricsOptions(),
@ -70,6 +73,9 @@ func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs.BoolVar(&o.DryRun, "dry-run", o.DryRun, "A bool to enable and disable dry-run.")
fs.BoolVar(&o.TransformedInformerForPod, "transformed-informer-pod", o.TransformedInformerForPod,
"whether we should enable the ability of transformed informer for pods")
fs.StringVar(&o.MasterURL, "master", o.MasterURL,
`The url of the Kubernetes API server, will overrides any value in kubeconfig, only required if out-of-cluster.`)
fs.StringVar(&o.KubeConfig, "kubeconfig", o.KubeConfig, "The path of kubeconfig file")
@ -94,6 +100,8 @@ func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) {
func (o *GenericOptions) ApplyTo(c *generic.GenericConfiguration) error {
c.DryRun = o.DryRun
c.TransformedInformerForPod = o.TransformedInformerForPod
c.GenericEndpoint = o.GenericEndpoint
c.GenericEndpointHandleChains = o.GenericEndpointHandleChains
c.GenericAuthStaticUser = o.GenericAuthStaticUser

View File

@ -25,6 +25,9 @@ import (
type GenericConfiguration struct {
DryRun bool
// for some cases, we may need to enable the ability of transformed informer
TransformedInformerForPod bool
GenericEndpoint string
GenericEndpointHandleChains []string
GenericAuthStaticUser string

View File

@ -305,6 +305,7 @@ func NewEvictionController(ctx context.Context,
ec.metricsEmitter = metricsEmitter.WithTags("agent-monitor")
}
native.WithPodTransformer(podTransformerFunc)
return ec, nil
}
@ -734,3 +735,7 @@ func (ec *EvictionController) handleDisruption(healthState string) {
}...)
klog.Infof("eviction controller detect nodes are %v.", healthState)
}
func podTransformerFunc(src, dest *corev1.Pod) {
dest.Spec.NodeName = src.Spec.NodeName
}

View File

@ -184,6 +184,7 @@ func NewSPDController(ctx context.Context, controlCtx *katalystbase.GenericConte
return nil, err
}
native.WithPodTransformer(podTransformerFunc)
return spdController, nil
}
@ -678,3 +679,5 @@ func (sc *SPDController) cleanPodSPDAnnotation(pod *core.Pod) error {
klog.Infof("[spd] successfully clear annotations for pod %v", pod.GetName())
return nil
}
func podTransformerFunc(_, _ *core.Pod) {}

View File

@ -0,0 +1,62 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package native
import (
"testing"
"github.com/stretchr/testify/require"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestPodTransformer(t *testing.T) {
as := require.New(t)
WithPodTransformer(func(src, dest *core.Pod) {
dest.Spec.NodeName = src.Spec.NodeName
})
transform, ok := GetPodTransformer()
as.Equal(true, ok)
p := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-1",
Annotations: map[string]string{
"t-anno-key": "ann",
},
Labels: map[string]string{
"t-label-key": "label",
},
},
Spec: core.PodSpec{
NodeName: "t-node",
Containers: []core.Container{
{},
},
},
Status: core.PodStatus{
Phase: core.PodPending,
},
}
p1, err := transform(p)
as.NoError(err)
as.Equal(core.PodStatus{}, p1.(*core.Pod).Status)
as.Equal(core.PodSpec{NodeName: "t-node"}, p1.(*core.Pod).Spec)
as.Equal(p.ObjectMeta, p1.(*core.Pod).ObjectMeta)
}

View File

@ -0,0 +1,61 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package native
import (
"fmt"
"sync"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
type PodTransformerFunc func(src, dest *corev1.Pod)
var podTransformers []PodTransformerFunc
var podTransformerMtx sync.RWMutex
func WithPodTransformer(f PodTransformerFunc) {
podTransformerMtx.Lock()
defer podTransformerMtx.Unlock()
podTransformers = append(podTransformers, f)
}
func GetPodTransformer() (cache.TransformFunc, bool) {
podTransformerMtx.RLock()
defer podTransformerMtx.RUnlock()
if len(podTransformers) == 0 {
return nil, false
}
return func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object type: %T", obj)
}
returnedPod := &corev1.Pod{
TypeMeta: pod.TypeMeta,
ObjectMeta: pod.ObjectMeta,
}
for _, f := range podTransformers {
f(pod, returnedPod)
}
return returnedPod, nil
}, true
}