sysom1/sysom_server/sysom_diagnosis/service_scripts/clustermem_pre.py

import time
import traceback
import json

from clogger import logger
from .base import DiagnosisPreProcessor, DiagnosisTask, DiagnosisJob
from metric_reader import MetricReader, dispatch_metric_reader, RangeQueryTask

metric_reader = dispatch_metric_reader("prometheus://localhost:9090")


def format_time_to_timestamp(t):
    """Convert an alarm time (epoch string or '%Y-%m-%d %H:%M:%S') to a numeric timestamp."""
    ret = {"success": True, "timestamp": ""}
    ret["timestamp"] = t
    if t.endswith(".000"):
        t = t.replace(".000", "")
    try:
        ret["timestamp"] = int(float(t))
    except Exception:
        try:
            struct_time = time.strptime(t, '%Y-%m-%d %H:%M:%S')
            ret["timestamp"] = time.mktime(struct_time)
        except Exception:
            ret["success"] = False
            traceback.print_exc()
    return ret
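# Illustrative usage (values are invented, not from a real alarm): both an epoch
# string and a "%Y-%m-%d %H:%M:%S" string are accepted; anything else flips
# "success" to False and leaves the original string in "timestamp".
#   format_time_to_timestamp("1683266928.000")       -> {"success": True, "timestamp": 1683266928}
#   format_time_to_timestamp("2023-05-05 14:08:48")  -> {"success": True, "timestamp": <local-time epoch float>}
#   format_time_to_timestamp("yesterday")            -> {"success": False, "timestamp": "yesterday"}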


def pull_monidata(warn_time, machine_ip, pod_name):
    """Pull the node, pod and per-file podmem metrics needed for diagnosis from Prometheus."""
    if machine_ip.endswith(":8400"):
        instance = machine_ip
    else:
        instance = "%s:8400" % machine_ip
    node_needed_metrics = {
        'sysom_proc_meminfo': {'MemTotal': 0, 'MemFree': 0, 'Cached': 0,
                               'Buffers': 0, 'kernel_used': 0, 'user_anon': 0,
                               'user_mlock': 0, 'user_used': 0, 'user_filecache': 0}
    }
    pod_needed_metrics = {
        'sysom_cg_memUtil': {'cache': 0, 'usage': 0, 'mem_util': 0, 'cache_ratio': 0,
                             'inactive_file': 0, 'active_file': 0}
    }
    # podmem_metrics = {
    #     "podname1": [(filename1, cache size), (filename2, cache size)],
    #     "podname2": [(filename1, cache size), (filename2, cache size)]
    # }
    podmem_metrics = {}
    try:
        ret = {}
        ret["success"] = True
        ret["errmsg"] = ""
        ret["node_data"] = {}
        ret["pod_data"] = {}
        ret["podmem_data"] = {}
        logger.info("original time:%s" % warn_time)
        rettime = format_time_to_timestamp(warn_time)
        if rettime["success"]:
            warn_time = rettime["timestamp"]
        else:
            ret["success"] = False
            ret["errmsg"] = "time format wrong! should be timestamp or type: Y-M-D H:M:S"
            return ret
        logger.info("transfer time:%s" % warn_time)
        # get all node level memory metrics (like sysom_proc_meminfo)
        for table_key, table_value in node_needed_metrics.items():
            # value = "total"...
            for metric_key, _ in table_value.items():
                # logger.info(table_key + "\t" + metric_key)
                table_task = RangeQueryTask(table_key, warn_time - 30, warn_time) \
                    .append_equal_filter("instance", instance) \
                    .append_equal_filter("value", metric_key)
                node_metrics_res = metric_reader.range_query([table_task])
                if len(node_metrics_res.data) <= 0:
                    ret["success"] = False
                    ret["errmsg"] = "failed to get node metrics data: " + metric_key
                    return ret
                else:
                    for i in range(len(node_metrics_res.data)):
                        # "values": [(1683266928.960442, 2), ...] -- every sample in the range vector
                        values = node_metrics_res.data[i].to_dict()["values"]
                        if len(values) > 0:
                            # only take the instant value at the alarm time,
                            # i.e. the last sample of the range vector
                            val = float(values[-1][1])
                            node_needed_metrics[table_key][metric_key] += val
        ret["node_data"] = node_needed_metrics
        # get podmem metrics
        podmem_task = RangeQueryTask("sysom_podmem", warn_time - 30, warn_time) \
            .append_equal_filter("instance", instance) \
            .append_equal_filter("podns", "default") \
            .append_equal_filter("value", "cached") \
            .append_wildcard_filter("podname", "*")  # podmem of every pod on this instance
        podmem_metrics_res = metric_reader.range_query([podmem_task])
        if len(podmem_metrics_res.data) <= 0:
            ret["success"] = False
            ret["errmsg"] = "failed to get podmem metrics data"
            return ret
        else:
            for i in range(len(podmem_metrics_res.data)):
                tmp_dist = podmem_metrics_res.data[i].to_dict()
                values = tmp_dist["values"]
                if len(values) > 0:
                    val = float(values[-1][1])
                    filename_cached = (tmp_dist["labels"]["file"], val)
                    podname = tmp_dist["labels"]["podname"]
                    if podname not in podmem_metrics:
                        podmem_metrics[podname] = []
                    podmem_metrics[podname].append(filename_cached)
        ret["podmem_data"] = podmem_metrics
        # get the requested pod's memory metrics
        if pod_name != "":
            for table_key, table_value in pod_needed_metrics.items():
                for metric_key, _ in table_value.items():
                    pod_task = RangeQueryTask(table_key, warn_time - 30, warn_time) \
                        .append_equal_filter("instance", instance) \
                        .append_equal_filter("podname", pod_name) \
                        .append_equal_filter("value", metric_key)
                    pod_metrics_res = metric_reader.range_query([pod_task])
                    if len(pod_metrics_res.data) <= 0:
                        ret["success"] = False
                        ret["errmsg"] = "failed to get pod metrics data: " + metric_key
                        return ret
                    else:
                        for i in range(len(pod_metrics_res.data)):
                            values = pod_metrics_res.data[i].to_dict()["values"]
                            if len(values) > 0:
                                # only take the instant value at the alarm time (the last sample
                                # of the range vector); a pod may return several series, one per
                                # container, so the values are accumulated
                                val = float(values[-1][1])
                                pod_needed_metrics[table_key][metric_key] += val
                        # utilization metrics are averaged over the number of containers (len(data))
                        if metric_key == "mem_util" or metric_key == "cache_ratio":
                            pod_needed_metrics[table_key][metric_key] /= len(pod_metrics_res.data)
            ret["pod_data"] = pod_needed_metrics
        return ret
    except Exception as e:
        logger.exception(e)
        traceback.print_exc()
        return ret
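# Shape of the dict returned by pull_monidata() (values are illustrative; units are
# whatever the sysom exporters report and are not normalized here):
#   {
#       "success": True, "errmsg": "",
#       "node_data": {"sysom_proc_meminfo": {"MemTotal": ..., "MemFree": ..., ...}},
#       "pod_data": {"sysom_cg_memUtil": {"cache": ..., "usage": ..., "mem_util": ..., ...}},
#       "podmem_data": {"podname1": [("filename1", cached), ("filename2", cached)], ...}
#   }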


def check_node_memory_high(node_metrics):
    """Judge whether node memory usage is high and build root_cause/suggestion text."""
    ret = {}
    ret["root_cause"] = ""
    ret["suggestion"] = ""
    node_proc_meminfo = node_metrics["sysom_proc_meminfo"]
    mem_total = node_proc_meminfo["MemTotal"]
    mem_used = node_proc_meminfo["MemTotal"] - node_proc_meminfo["MemFree"]
    # mem_high_watermark = mem_total * 0.90
    anon_high_watermark = mem_used * 0.6
    cache_high_watermark = mem_used * 0.45
    kernel_high_watermark = mem_total * 0.3
    if mem_used < mem_total * 0.8:
        ret["root_cause"] += "节点内存用量处于正常水平\n"
        ret["suggestion"] += "可继续密切关注集群监控中节点资源详情中节点具体内存使用情况。\n"
    if node_proc_meminfo["user_anon"] >= anon_high_watermark:
        ret["root_cause"] += "已用内存中,节点应用申请内存多。\n"
        ret["suggestion"] += "终止占用内存过多的无关进程或检查应用是否存在内存泄露。\n"
    if node_proc_meminfo["user_filecache"] >= cache_high_watermark:
        ret["root_cause"] += "已用内存中,节点缓存(cache)使用多。\n"
        ret["suggestion"] += "查看下方节点文件缓存占用详情,考虑删除其中无用文件\n"
        ret["show_cache"] = "node"
    if node_proc_meminfo["kernel_used"] >= kernel_high_watermark:
        ret["root_cause"] += "已用内存中,节点存在内核内存泄漏问题。\n"
        ret["suggestion"] += "查看集群监控资源详情中内核内存监控详情或联系内核支持。\n"
    if node_proc_meminfo["user_anon"] < anon_high_watermark and \
            node_proc_meminfo["user_filecache"] < cache_high_watermark and \
            node_proc_meminfo["kernel_used"] < kernel_high_watermark:
        ret["root_cause"] += "节点缓存(cache),应用申请内存正常。\n"
        ret["suggestion"] += "密切关注集群监控资源详情中的节点详细内存使用情况。\n"
    return ret
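# Worked example for check_node_memory_high() (all numbers are invented; the checks
# are ratio-based, so the unit reported by the exporter cancels out): with
# MemTotal = 128 GiB and MemFree = 16 GiB, mem_used is 112 GiB, so the watermarks
# become anon >= 67.2 GiB (0.6 * used), filecache >= 50.4 GiB (0.45 * used) and
# kernel_used >= 38.4 GiB (0.3 * total). A node with user_anon = 80 GiB would hit
# the anonymous-memory branch, while user_filecache = 30 GiB would leave the cache
# branch untriggered.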


def check_pod_memory_high(pod_metrics):
    """Judge whether pod (cgroup) memory usage is high and build root_cause/suggestion text."""
    ret = {}
    ret["root_cause"] = ""
    ret["suggestion"] = ""
    cg_memutils = pod_metrics["sysom_cg_memUtil"]
    mem_usage = cg_memutils["usage"]
    anon_mem = mem_usage - cg_memutils["active_file"] - cg_memutils["inactive_file"]
    cache_mem = cg_memutils["cache"]
    anon_high_watermark = mem_usage * 0.8
    cache_high_watermark = mem_usage * 0.6
    if cg_memutils["mem_util"] < 80:
        ret["root_cause"] += "Pod内存用量处于正常水平或Pod未设置内存限制。\n"
        ret["suggestion"] += "可继续密切关注容器监控中Pod资源详情中Pod具体内存使用情况。\n"
    if anon_mem >= anon_high_watermark:
        ret["root_cause"] += "已用内存中Pod应用申请内存过多。\n"
        ret["suggestion"] += "终止Pod内占用内存过多的无关进程或检查Pod内应用是否存在内存泄露。\n"
    if cache_mem >= cache_high_watermark:
        ret["root_cause"] += "已用内存中Pod文件缓存(cache)使用过多。\n"
        ret["suggestion"] += "查看下方Pod文件缓存占用详情删除其中无用文件。\n"
        ret["show_cache"] = "pod"
    if anon_mem < anon_high_watermark and cache_mem < cache_high_watermark:
        ret["root_cause"] += "Pod文件缓存(cache),应用申请内存正常。\n"
        ret["suggestion"] += "密切关注容器监控中Pod资源详情中Pod其他类型内存使用情况。\n"
    return ret


# Hand-written rules that decide whether node memory is abnormal; returns the
# root cause and suggestion.
def diagnosis_node_mem(node_metrics, diagnosis_type):
    node_ret = {}
    node_ret["root_cause"] = ""
    node_ret["suggestion"] = ""
    node_proc_meminfo = node_metrics["sysom_proc_meminfo"]
    mem_used = node_proc_meminfo["MemTotal"] - node_proc_meminfo["MemFree"]
    ret = {"root_cause": "", "suggestion": ""}
    # currently both the memory-high diagnosis and the memory-latency diagnosis
    # check for high node memory usage
    if diagnosis_type == "内存高诊断":
        ret = check_node_memory_high(node_metrics)
    elif diagnosis_type == "内存延时诊断":
        if mem_used >= node_proc_meminfo["MemTotal"] * 0.9:
            node_ret["root_cause"] += "节点内存用量过高导致节点内存延时。"
        ret = check_node_memory_high(node_metrics)
    node_ret["root_cause"] += ret["root_cause"]
    node_ret["suggestion"] += ret["suggestion"]
    if "show_cache" in ret:
        node_ret["show_cache"] = ret["show_cache"]
    return node_ret


def diagnosis_pod_mem(node_metrics, pod_metrics, diagnosis_type):
    """Diagnose pod memory issues, falling back to node-level checks when the node is under pressure."""
    pod_ret = {}
    pod_ret["root_cause"] = ""
    pod_ret["suggestion"] = ""
    node_proc_meminfo = node_metrics["sysom_proc_meminfo"]
    mem_total = node_proc_meminfo["MemTotal"]
    mem_used = node_proc_meminfo["MemTotal"] - node_proc_meminfo["MemFree"]
    cg_memutil = pod_metrics["sysom_cg_memUtil"]["mem_util"]
    mem_high_watermark = mem_total * 0.95
    util_watermark = 95
    ret = {"root_cause": "", "suggestion": ""}
    if diagnosis_type == "内存高诊断":
        ret = check_pod_memory_high(pod_metrics)
    elif diagnosis_type == "内存延时诊断":
        # memory latency caused by the pod hitting its cgroup memory limit
        if cg_memutil > util_watermark:
            pod_ret["root_cause"] += "容器内存用量达到内存限制。\n"
            pod_ret["suggestion"] += "考虑增加容器内存容量。\n"
            ret = check_pod_memory_high(pod_metrics)
        elif mem_used > mem_high_watermark:  # memory latency caused by memory pressure on the node
            pod_ret["root_cause"] += "节点内存紧张导致容器发生内存延时。\n"
            ret = check_node_memory_high(node_metrics)
        else:
            pod_ret["root_cause"] += "容器和节点内存均未超过阈值\n"
            pod_ret["suggestion"] += "继续密切关注内存延时指标\n"
            return pod_ret
    pod_ret["root_cause"] += ret["root_cause"]
    pod_ret["suggestion"] += ret["suggestion"]
    if "show_cache" in ret:
        pod_ret["show_cache"] = ret["show_cache"]
    return pod_ret


class PreProcessor(DiagnosisPreProcessor):
    """Cluster memory diagnosis pre-processor.

    Pulls node/pod memory metrics from Prometheus, runs the rule-based
    diagnosis above and returns the result in offline mode, so no command
    is executed on the target instance.
    """

    def get_diagnosis_cmds(self, params: dict) -> DiagnosisTask:
        pod_name = params.get("pod_name", "")
        ts = params.get("time", "")
        machine_ip = params.get("instance", "")
        diagnosis_type = params.get("diagnosis_type", "内存高诊断")
        diagnosis_type = str(diagnosis_type).strip()
        pre_result = {}
        pre_result["success"] = True
        pre_result["errmsg"] = ""
        pre_result["root_cause"] = ""
        pre_result["suggestion"] = ""
        pre_result["show_cache"] = ""
        pre_result["pod_name"] = pod_name
        # pull the required node/pod metrics from Prometheus
        data_ret = pull_monidata(ts, machine_ip, pod_name)
        if not data_ret["success"]:
            pre_result["success"] = data_ret["success"]
            pre_result["errmsg"] = data_ret["errmsg"]
            result_str = json.dumps(pre_result)
            return DiagnosisTask(
                jobs=[DiagnosisJob(instance="", cmd="")],
                offline_mode=True,
                offline_results=[
                    result_str
                ]
            )
        # diagnose the node or the pod
        if pod_name != "":
            diagnosis_ret = diagnosis_pod_mem(data_ret["node_data"], data_ret["pod_data"],
                                              diagnosis_type)
        else:
            diagnosis_ret = diagnosis_node_mem(data_ret["node_data"], diagnosis_type)
        pre_result["root_cause"] = diagnosis_ret["root_cause"]
        pre_result["suggestion"] = diagnosis_ret["suggestion"]
        if "show_cache" in diagnosis_ret:
            pre_result["show_cache"] = diagnosis_ret["show_cache"]
        pre_result["node_data"] = data_ret["node_data"]
        pre_result["pod_data"] = data_ret["pod_data"]
        pre_result["podmem_data"] = data_ret["podmem_data"]
        result_str = json.dumps(pre_result)
        return DiagnosisTask(
            # offline mode does not run anything on the node, so instance and cmd stay empty
            jobs=[DiagnosisJob(instance="", cmd="")],
            # enable offline mode
            offline_mode=True,
            # offline diagnosis results (each item in the list ends up in DiagnosisJobResult.stdout)
            offline_results=[
                result_str
            ]
        )
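# Illustrative params dict for PreProcessor.get_diagnosis_cmds(); the field values
# are invented and only the keys below are read by this module:
#   {
#       "instance": "192.168.0.10",        # node IP; pull_monidata appends ":8400" when missing
#       "time": "2023-05-05 14:08:48",     # or an epoch timestamp string
#       "pod_name": "nginx-abc123",        # empty string -> node-level diagnosis
#       "diagnosis_type": "内存高诊断",     # or "内存延时诊断"
#   }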