forked from kubewharf/katalyst-core
185 lines
5.4 KiB
Go
185 lines
5.4 KiB
Go
/*
|
|
Copyright 2022 The Katalyst Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package pod
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
kubetypes "k8s.io/apimachinery/pkg/types"
|
|
cri "k8s.io/cri-api/pkg/apis"
|
|
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/kubernetes/pkg/kubelet/cri/remote"
|
|
"k8s.io/kubernetes/pkg/kubelet/types"
|
|
|
|
"github.com/kubewharf/katalyst-core/pkg/config"
|
|
"github.com/kubewharf/katalyst-core/pkg/util/general"
|
|
)
|
|
|
|
// RuntimePod is a group of containers.
|
|
type RuntimePod struct {
|
|
// The UID of the pod, which can be used to retrieve a particular pod
|
|
// from the pod list returned by GetPods().
|
|
UID kubetypes.UID
|
|
// The name and namespace of the pod, which is readable by human.
|
|
Name string
|
|
Namespace string
|
|
// List of containers that belongs to this pod. It may contain only
|
|
// running containers, or mixed with dead ones (when GetPods(true)).
|
|
Containers []*runtimeapi.Container
|
|
// List of sandboxes associated with this pod. This is only populated
|
|
// by kuberuntime.
|
|
Sandboxes []*runtimeapi.PodSandbox
|
|
}
|
|
|
|
type labeledContainerInfo struct {
|
|
ContainerName string
|
|
PodName string
|
|
PodNamespace string
|
|
runtimeName string
|
|
PodUID kubetypes.UID
|
|
}
|
|
|
|
type RuntimePodFetcher interface {
|
|
GetPods(all bool) ([]*RuntimePod, error)
|
|
}
|
|
|
|
type runtimePodFetcherImpl struct {
|
|
runtimeService cri.RuntimeService
|
|
}
|
|
|
|
func NewRuntimePodFetcher(conf *config.Configuration) (RuntimePodFetcher, error) {
|
|
runtimeService, err := remote.NewRemoteRuntimeService(conf.RemoteRuntimeEndpoint, 2*time.Minute)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create remote runtime service failed %s", err)
|
|
}
|
|
|
|
return &runtimePodFetcherImpl{runtimeService: runtimeService}, nil
|
|
}
|
|
|
|
// GetPods returns a list of containers grouped by pods. The boolean parameter
|
|
// specifies whether the runtime returns all containers including those already
|
|
// exited and dead containers (used for garbage collection).
|
|
func (r *runtimePodFetcherImpl) GetPods(all bool) ([]*RuntimePod, error) {
|
|
pods := make(map[kubetypes.UID]*RuntimePod)
|
|
sandboxes, err := r.getKubeletSandboxes(all)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for i := range sandboxes {
|
|
s := sandboxes[i]
|
|
if s.Metadata == nil {
|
|
klog.V(4).InfoS("Sandbox does not have metadata", "sandbox", s)
|
|
continue
|
|
}
|
|
podUID := kubetypes.UID(s.Metadata.Uid)
|
|
if _, ok := pods[podUID]; !ok {
|
|
pods[podUID] = &RuntimePod{
|
|
UID: podUID,
|
|
Name: s.Metadata.Name,
|
|
Namespace: s.Metadata.Namespace,
|
|
}
|
|
}
|
|
p := pods[podUID]
|
|
p.Sandboxes = append(p.Sandboxes, s)
|
|
}
|
|
|
|
containers, err := r.getKubeletContainers(all)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for i := range containers {
|
|
c := containers[i]
|
|
if c.Metadata == nil {
|
|
klog.V(4).InfoS("Container does not have metadata", "container", c)
|
|
continue
|
|
}
|
|
|
|
labelledInfo := getContainerInfoFromLabels(c.Labels)
|
|
pod, found := pods[labelledInfo.PodUID]
|
|
if !found {
|
|
pod = &RuntimePod{
|
|
UID: labelledInfo.PodUID,
|
|
Name: labelledInfo.PodName,
|
|
Namespace: labelledInfo.PodNamespace,
|
|
}
|
|
pods[labelledInfo.PodUID] = pod
|
|
}
|
|
pod.Containers = append(pod.Containers, c)
|
|
}
|
|
|
|
// Convert map to list.
|
|
var result []*RuntimePod
|
|
for _, pod := range pods {
|
|
result = append(result, pod)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// getKubeletSandboxes lists all (or just the running) sandboxes managed by kubelet.
|
|
func (r *runtimePodFetcherImpl) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) {
|
|
var filter *runtimeapi.PodSandboxFilter
|
|
if !all {
|
|
readyState := runtimeapi.PodSandboxState_SANDBOX_READY
|
|
filter = &runtimeapi.PodSandboxFilter{
|
|
State: &runtimeapi.PodSandboxStateValue{
|
|
State: readyState,
|
|
},
|
|
}
|
|
}
|
|
|
|
resp, err := r.runtimeService.ListPodSandbox(filter)
|
|
if err != nil {
|
|
klog.ErrorS(err, "Failed to list pod sandboxes")
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// getKubeletContainers lists containers managed by kubelet.
|
|
// The boolean parameter specifies whether returns all containers including
|
|
// those already exited and dead containers (used for garbage collection).
|
|
func (r *runtimePodFetcherImpl) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
|
|
filter := &runtimeapi.ContainerFilter{}
|
|
if !allContainers {
|
|
filter.State = &runtimeapi.ContainerStateValue{
|
|
State: runtimeapi.ContainerState_CONTAINER_RUNNING,
|
|
}
|
|
}
|
|
|
|
containers, err := r.runtimeService.ListContainers(filter)
|
|
if err != nil {
|
|
klog.ErrorS(err, "ListContainers failed")
|
|
return nil, err
|
|
}
|
|
|
|
return containers, nil
|
|
}
|
|
|
|
// getContainerInfoFromLabels gets labeledContainerInfo from labels.
|
|
func getContainerInfoFromLabels(labels map[string]string) *labeledContainerInfo {
|
|
return &labeledContainerInfo{
|
|
PodName: general.GetStringValueFromMap(labels, types.KubernetesPodNameLabel),
|
|
PodNamespace: general.GetStringValueFromMap(labels, types.KubernetesPodNamespaceLabel),
|
|
PodUID: kubetypes.UID(general.GetStringValueFromMap(labels, types.KubernetesPodUIDLabel)),
|
|
ContainerName: general.GetStringValueFromMap(labels, types.KubernetesContainerNameLabel),
|
|
}
|
|
}
|