# sysom1/sysom_server/sysom_cluster_health/lib/utils.py
#
# 105 lines
# 3.4 KiB
# Python

import time
import conf.settings as settings
from typing import List, Tuple
from metric_reader.metric_reader import MetricReader, RangeQueryTask
from clogger import logger
# Metric name queried by collect_instances_of_cluster() to discover instances.
# NOTE(review): the identifier is misspelled ("VRAIABLE"); it is left as-is
# here because other modules may import it by this name -- confirm before
# renaming.
INSTANCES_VRAIABLE = "sysom_proc_cpu_total"
# Metric name queried by collect_pods_of_instance() to discover pods.
PODS_VARIABLE = "sysom_container_memory_oomcnt"
# Label keys, taken from the service settings, that are looked up in the
# "labels" mapping of every returned series: NODE_LABELS identifies the
# instance, POD_LABELS the pod and NAMESPACE_LABELS the pod's namespace.
NODE_LABELS = settings.NODE_LABEL
POD_LABELS = settings.POD_LABEL
NAMESPACE_LABELS = settings.NAMESPACE_LABEL
def collect_all_clusters(metric_reader: MetricReader) -> List[str]:
    """ Collect all clusters

    Asks the metric reader for every value of the "cluster" label.

    Args:
        metric_reader: MetricReader instance of metric_reader sdk

    Returns:
        A new list with every value returned for the "cluster" label, or an
        empty list (after logging an error) when no value is returned.
    """
    res = metric_reader.get_label_values("cluster")
    # Truthiness covers an empty result as well as a missing (None) payload,
    # which the previous len() check would have crashed on.
    if not res.data:
        logger.error("Collect all cluster failed!")
        return []
    # Hand back a copy so callers cannot mutate the reader's result in place.
    return list(res.data)
def collect_instances_of_cluster(cluster_id: str, metric_reader: MetricReader,
                                 interval: int) -> List[str]:
    """ Collect all instances of specific cluster

    Use "sysom_proc_cpu_total" metric (series whose "mode" label equals
    "total") to collect all instances of specific cluster, need to make sure
    the metric has been correctly exported (similar to grafana variables).

    Args:
        cluster_id: cluster id; the value "default" means no filter on the
            "cluster" label is added to the query
        metric_reader: MetricReader instance of metric_reader sdk
        interval: length, in seconds, of the query window ending now

    Returns:
        List of instances without duplicates (order is not guaranteed), or
        an empty list (after logging an error) when the query returns no data
    """
    # Sample the clock once so both ends of the window share the same "now".
    now = time.time()
    task = RangeQueryTask(INSTANCES_VRAIABLE,
                          start_time=now - interval,
                          end_time=now) \
        .append_equal_filter("mode", "total")
    if cluster_id != "default":
        # Return value intentionally ignored, as before: the filter is
        # expected to be recorded on the task itself.
        task.append_equal_filter("cluster", cluster_id)

    node_metric_res = metric_reader.range_query([task])
    if not node_metric_res.data:
        logger.error(
            f"Collect instances of {cluster_id} info: no instances found!")
        return []

    # Several series can belong to the same instance; a set removes the
    # duplicates. Series without the node label are skipped.
    instances = set()
    for series in node_metric_res.data:
        labels = series.to_dict()["labels"]
        if NODE_LABELS in labels:
            instances.add(labels[NODE_LABELS])
    return list(instances)
def collect_pods_of_instance(instance_id: str, metric_reader: MetricReader,
                             interval: int) -> List[Tuple[str, str]]:
    """ Collect all pods of specific instance

    Use "sysom_container_memory_oomcnt" metric to collect all pods
    of specific instance, need to make sure the metric has been correctly
    exported.

    Args:
        instance_id: instance id, matched against the node label of the
            metric
        metric_reader: MetricReader instance of metric_reader sdk
        interval: length, in seconds, of the query window ending now

    Returns:
        List of (pod name, namespace) tuples without duplicates (order is
        not guaranteed), or an empty list (after logging an error) when the
        query returns no data
    """
    # Sample the clock once so both ends of the window share the same "now".
    now = time.time()
    task = RangeQueryTask(PODS_VARIABLE,
                          start_time=now - interval,
                          end_time=now) \
        .append_equal_filter(NODE_LABELS, instance_id)

    pod_metric_res = metric_reader.range_query([task])
    if not pod_metric_res.data:
        logger.error(f"Collect pod of {instance_id} info: no pod found!")
        return []

    # Several series can describe the same pod; a set removes the
    # duplicates. Series missing either the pod or the namespace label are
    # skipped.
    pods = set()
    for series in pod_metric_res.data:
        labels = series.to_dict()["labels"]
        if POD_LABELS in labels and NAMESPACE_LABELS in labels:
            pods.add((labels[POD_LABELS], labels[NAMESPACE_LABELS]))
    return list(pods)