# Mirror of https://gitee.com/anolis/sysom.git (Python, 466 lines, 18 KiB).
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/06/21 15:37
|
|
Author: chenshiyan
|
|
Email chenshiyan@linux.alibaba.com
|
|
File main.py
|
|
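Description: FastAPI entry point of the SysOM RCA service (the /api/v1/rca/rca_call and /api/v1/rca/rca_entry endpoints).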
|
|
"""
|
|
from clogger import logger
|
|
from fastapi import FastAPI
|
|
from app.routers import health
|
|
from conf.settings import YAML_CONFIG
|
|
from sysom_utils import CmgPlugin, SysomFramework
|
|
import traceback
|
|
import sys,os,json
|
|
sys.path.append("%s/lib"%(os.path.dirname(os.path.abspath(__file__))))
|
|
import time
|
|
import datetime
|
|
from datetime import date, datetime, timedelta
|
|
import requests
|
|
from dtw import accelerated_dtw,Dtw
|
|
from metric_reader import MetricReader, dispatch_metric_reader,RangeQueryTask
|
|
from sysom_monitor_item import sysommonit_items,sysommonit_items_obser,sysommonit_items_general_name
|
|
from rca_methods import rca_analysis_entry
|
|
from sysom_utils import SysomFramework
|
|
from fastapi.responses import RedirectResponse
|
|
from app.database import SessionLocal
|
|
from app.crud import create_rcacallrecord,get_rcacallrecord_by_recordid,update_or_create_rcacallrecord,update_rcacallrecord_state
|
|
from app.crud import create_rcaitemsrecord,get_rcaitemsrecord_by_recordid,update_or_create_rcaitemsrecord
|
|
from app.schemas import RcaCallRecord,RcaItemsRecord
|
|
|
|
# Client for the "sysom_diagnosis" microservice; used below to create
# diagnosis tasks (POST api/v1/tasks/) and poll them (GET api/v1/tasks/<id>/).
g_client = SysomFramework.gclient("sysom_diagnosis")

# Reader for monitoring metrics, pointed at a Prometheus server on localhost:9090.
metric_reader = dispatch_metric_reader("prometheus://localhost:9090")

app = FastAPI()

# Health-check routes are mounted under /api/v1/rca/health.
app.include_router(health.router, prefix="/api/v1/rca/health")

# One module-level DB session shared by every handler in this file.
# NOTE(review): presumably a SQLAlchemy session, which is not thread-safe; this
# only works because the handlers are `async def` and run on the event loop —
# confirm before turning any handler into a plain `def` (threadpool).
db = SessionLocal()

# rca_call_raw polls a submitted diagnosis task for at most
# SYSOM_POLL_TIMEOUT seconds, sleeping SYSOM_POLL_INTERVAL seconds between polls.
SYSOM_POLL_TIMEOUT = 5
SYSOM_POLL_INTERVAL = 1
|
|
#############################################################################
|
|
# Write your API interface here, or add to app/routes
|
|
#############################################################################
|
|
|
|
def format_list(alist):
    """Split a list of samples into an index->value map and a timestamp list.

    Args:
        alist: sequence of samples; element 0 of each sample is kept as-is
            (the timestamp), element 1 is converted with ``float``.

    Returns:
        ``(values, timestamps)`` where ``values`` maps the sample position
        (0, 1, ...) to its float value and ``timestamps`` lists element 0 of
        every sample in the same order.
    """
    values = {}
    timestamps = []
    for position, sample in enumerate(alist):
        values[position] = float(sample[1])
        timestamps.append(sample[0])
    return values, timestamps
|
|
|
|
def get_diagnose_result(taskid):
    """Fetch the current state of a diagnosis task from sysom_diagnosis.

    Args:
        taskid: task id returned by ``POST api/v1/tasks/``.

    Returns:
        The decoded JSON body of ``GET api/v1/tasks/<taskid>/``. When the
        request or JSON decoding fails, or the body has no ``data.status``
        field, the fallback ``{"data": {"status": "Failed"}}`` is returned so
        callers can always read ``result["data"]["status"]``.
    """
    fallback = {"data": {"status": "Failed"}}
    try:
        retdict = g_client.get("api/v1/tasks/%s/" % taskid).json()
    except Exception as e:
        logger.exception(e)
        return fallback

    # Callers index ["data"]["status"] directly; guard the shape once here.
    data = retdict.get("data") if isinstance(retdict, dict) else None
    if not isinstance(data, dict) or "status" not in data:
        logger.warning("get_diagnose_result: unexpected response for task %s: %s" % (taskid, retdict))
        return fallback
    return retdict
|
|
|
|
def rca_call_raw(diag_dict):
    """Start (or reuse) an RCA diagnosis task and return the page URL to show.

    Args:
        diag_dict: payload forwarded to sysom_diagnosis ``POST api/v1/tasks/``.
            Its ``base_item``, ``instance`` and ``time`` entries also form the
            RcaCallRecord id ``"<base_item>-<instance>-<time>"``.

    Returns:
        The URL to redirect to:
        * the stored URL of an existing record that already succeeded or whose
          timestamp is at most 180 seconds old (no new task is submitted);
        * otherwise the ``url`` reported by a task that finished with
          ``Success`` within SYSOM_POLL_TIMEOUT seconds, else
          ``/diagnose/detail/<task_id>`` if it was still running;
        * ``/diagnose/custom/rca`` when submission failed or the task failed.
    """
    recordid = "%s-%s-%s" % (diag_dict["base_item"], diag_dict["instance"], diag_dict["time"])
    timestamp = time.time()
    state = "calling"
    url = "/diagnose/custom/rca"
    user = "rca_call"

    try:
        record_item = get_rcacallrecord_by_recordid(db, recordid)
        if record_item is None:
            # First call for this id: persist a "calling" placeholder.
            create_rcacallrecord(db, RcaCallRecord(
                recordid=recordid, state=state, url=url, user=user, timestamp=timestamp))
        elif (timestamp - record_item.timestamp) <= 180 or record_item.state == "success":
            # Do not resubmit a task that finished or was started very recently.
            return record_item.url
    except Exception as e:
        logger.exception(e)

    returl = url
    try:
        retdiag_dict = g_client.post("api/v1/tasks/", json=diag_dict).json()
        if retdiag_dict.get("success"):
            taskid = retdiag_dict["data"]["task_id"]
            returl = "/diagnose/detail/%s" % taskid
            # monotonic() is immune to wall-clock jumps during the wait.
            deadline = time.monotonic() + SYSOM_POLL_TIMEOUT
            while time.monotonic() < deadline:
                result = get_diagnose_result(taskid)
                state = result["data"]["status"]
                if state == "Success":
                    returl = result["data"].get("url", returl)
                    state = "success"
                    # Stop here: further polls add latency and a transient
                    # poll failure could overwrite the "success" state.
                    break
                if state == "Fail":
                    returl = url
                    logger.info("diagnose task %s failed: %s" % (
                        taskid, json.dumps(result, ensure_ascii=False, indent=4)))
                    break
                time.sleep(SYSOM_POLL_INTERVAL)
    except Exception as e:
        logger.exception(e)

    try:
        update_or_create_rcacallrecord(db, RcaCallRecord(
            recordid=recordid, state=state, url=returl, user=user, timestamp=timestamp))
    except Exception as e:
        logger.exception(e)

    return returl
|
|
|
|
@app.post("/api/v1/rca/rca_call")
async def rca_call_post(request: dict):
    """Start an RCA call from a JSON body and return the URL to open.

    The body must contain ``time``, ``base_item`` and ``machine_ip``. When any
    is missing, or anything fails, the generic ``/diagnose/custom/rca`` URL is
    returned; otherwise the URL comes from ``rca_call_raw``.
    """
    retdict = {"success":False,"errmsg":"","dist":[],"summary":"","url":"/diagnose/custom/rca"}
    param_list = ["time","base_item","machine_ip"]
    try:
        # Report every missing parameter at once, not just the first one.
        errmsg = "".join("Missing parametre: %s\n" % param
                         for param in param_list if param not in request)
        if errmsg:
            retdict["errmsg"] = errmsg
            print("%s; params: %s\n" % (errmsg, retdict))
            return retdict["url"]

        diag_dict = {
            "rca_type": "rca",
            "time": request["time"],
            "base_item": request["base_item"],
            "instance": request["machine_ip"],
            "service_name": "rca",
        }
        retdict["url"] = rca_call_raw(diag_dict)
    except Exception as e:
        logger.exception(e)

    print("Redirect Url: %s\n" % retdict["url"])
    return retdict["url"]
|
|
|
|
@app.get("/api/v1/rca/rca_call")
async def rca_call_get(timestamp: str = None, base_item: str = None,machine_ip: str = None):
    """Start an RCA call from query parameters and redirect to the result page.

    All of ``timestamp``, ``base_item`` and ``machine_ip`` are required. When
    any is missing, or anything fails, the redirect goes to the generic
    ``/diagnose/custom/rca`` page; otherwise to the URL from ``rca_call_raw``.
    """
    retdict = {"success":False,"errmsg":"","dist":[],"summary":"","url":"/diagnose/custom/rca"}
    try:
        supplied = (("timestamp", timestamp), ("base_item", base_item), ("machine_ip", machine_ip))
        errmsg = "".join("Missing parametre: %s\n" % name
                         for name, value in supplied if value is None)
        if errmsg:
            retdict["errmsg"] = errmsg
            # A redirect cannot carry the reason, so log it instead of dropping it.
            print("%s; params: %s\n" % (errmsg, retdict))
            return RedirectResponse(retdict["url"])

        diag_dict = {
            "rca_type": "rca",
            "time": timestamp,
            "base_item": base_item,
            "instance": machine_ip,
            "service_name": "rca",
        }
        retdict["url"] = rca_call_raw(diag_dict)
    except Exception as e:
        logger.exception(e)

    print("Redirect Url: %s\n" % retdict["url"])
    return RedirectResponse(retdict["url"])
|
|
|
|
def format_time_to_timestamp(t):
    """Convert an alert time to an integer Unix timestamp.

    Args:
        t: a number or numeric string (seconds since the epoch; any fraction
            is truncated), or a local-time string ``"%Y-%m-%d %H:%M:%S"``.

    Returns:
        ``{"success": True, "timestamp": <int>}`` on success, otherwise
        ``{"success": False, "timestamp": t}`` with the input left unchanged.
        Both accepted formats yield an ``int``.
    """
    ret = {"success": True, "timestamp": t}

    try:
        ret["timestamp"] = int(float(t))
        return ret
    except (TypeError, ValueError, OverflowError):
        # Not numeric (OverflowError covers "inf"); try the datetime format.
        pass

    try:
        struct_time = time.strptime(t, '%Y-%m-%d %H:%M:%S')
        # mktime has whole-second precision here, so int() loses nothing.
        ret["timestamp"] = int(time.mktime(struct_time))
    except (TypeError, ValueError, OverflowError):
        ret["success"] = False
    return ret
|
|
|
|
def pull_baseitem_data(warn_time,sp_item,machine_ip):
    """Query the base item's series for one machine around ``warn_time``.

    ``sp_item`` is either ``"<metric>-<v1>-<v2>..."`` (a metric name followed
    by label values) or a bare metric name. The query window is
    ``[warn_time - 600, warn_time + 180]``.

    Returns:
        ``{"success": bool, "errmsg": str, "values": list}``; ``values`` is the
        ``"values"`` entry of the matched series' ``to_dict()``.

    NOTE(review): the source this was restored from had lost its indentation.
    The ``else:`` of the bare-metric branch is reconstructed as pairing with
    ``if "-" in sp_item``; it may instead have paired with the outer
    sysom/sysak check — confirm against upstream.
    """
    data_dict = {"success":False,"errmsg":"","values":[]}
    # Label-name lists for every known metric, from both item tables.
    sysommonit_items_all = {**sysommonit_items,**sysommonit_items_obser}
    try:
        if "sysom" in sp_item or "sysak" in sp_item:
            if "-" in sp_item:
                # "<metric>-<v1>-<v2>..." -> metric name plus requested label values.
                tag_list = sp_item.split('-')
                sysom_tag = tag_list[0]
                if sysom_tag not in sysommonit_items_all:
                    data_dict["errmsg"] = "no record of %s in sysommonit_items_all"%sysom_tag
                    return data_dict
                # Compared against the full label list (which includes "instance").
                if len(tag_list)-1 > len(sysommonit_items_all[sysom_tag]):
                    data_dict["errmsg"] = "sysommonit_items_all not match tag: %s"%tag_list
                    return data_dict
                tag_value = tag_list[1:]
                # Label names to match, excluding "instance".
                tag_name = []
                for i in range(len(sysommonit_items_all[sysom_tag])):
                    if sysommonit_items_all[sysom_tag][i] != "instance":
                        tag_name.append(sysommonit_items_all[sysom_tag][i])

                # Exporter instance of the target machine is "<ip>:8400".
                task = RangeQueryTask(sysom_tag, warn_time - 600, warn_time + 180).append_wildcard_filter("instance", "%s:8400"%machine_ip)
                ret_tmp = metric_reader.range_query([task])
                if len(ret_tmp.data) <= 0:
                    data_dict["errmsg"] = "no data of %s"%sp_item
                    return data_dict
                if len(ret_tmp.data) > 0:
                    for i in range(len(ret_tmp.data)):
                        match_cnt = 0
                        if "labels" in ret_tmp.data[i].to_dict():
                            if ret_tmp.data[i].to_dict()["labels"]["instance"] != "%s:8400"%machine_ip:
                                continue
                            item_labels = ret_tmp.data[i].to_dict()["labels"]
                            # Count (label name, requested value) pairs with equal
                            # values. NOTE(review): this is not positional, so a
                            # value can match under a different label name.
                            for j in range(len(tag_name)):
                                if tag_name[j] not in item_labels:
                                    data_dict["errmsg"] = "%s has no tag: %s"%(sysom_tag,tag_name[j])
                                    return data_dict
                                for k in range(len(tag_value)):
                                    if item_labels[tag_name[j]] == tag_value[k]:
                                        match_cnt += 1
                        else:
                            data_dict["errmsg"] = "fail to get data of %s: no labels"%sysom_tag
                            return data_dict
                        # First series whose match count equals the number of
                        # requested values wins.
                        if match_cnt == len(tag_value):
                            data_dict["values"] = ret_tmp.data[i].to_dict()["values"]
                            data_dict["success"] = True
                            return data_dict
                data_dict["errmsg"] = "fail to get data of %s: not match!"%sysom_tag
            else:
                # Bare metric name: query every instance and take the first
                # series. NOTE(review): not filtered by machine_ip.
                task = RangeQueryTask(sp_item, warn_time - 600, warn_time + 180).append_wildcard_filter("instance", "*")
                ret_tmp = metric_reader.range_query([task])
                if len(ret_tmp.data) <= 0:
                    data_dict["success"] = False
                    data_dict["errmsg"] = "no data of %s"%sp_item
                    return data_dict
                data_dict["success"] = True
                data_dict["values"] = ret_tmp.data[0].to_dict()["values"]

    except:
        traceback.print_exc()
        data_dict["success"] = False
        data_dict["errmsg"] = "fail to get data of %s"%sp_item
        pass
    return data_dict
|
|
|
|
def pull_monidata(warn_time,sp_item,machine_ip):
    """Collect the base series and all candidate series around an alert time.

    Args:
        warn_time: alert time accepted by ``format_time_to_timestamp``.
        sp_item: base item name; names found in
            ``sysommonit_items_general_name`` are translated first.
        machine_ip: target host; any ``:port`` suffix is stripped.

    Returns:
        A dict with ``item``, ``timestamp``, ``base_item_name``, ``success`` and
        ``errmsg``; on success it also has ``base``. ``base`` and every
        ``item[<key>]`` are ``{"value": ..., "timestamp": ...}`` as produced by
        ``format_list``. Item keys are ``"<metric>-<label values...>"``. On any
        unexpected error ``success`` is False and ``errmsg`` is set (the dict
        is always returned, never None).
    """
    adict = {"item": {}, "timestamp": warn_time, "base_item_name": sp_item,
             "success": True, "errmsg": ""}
    try:
        print("original time:%s" % warn_time)
        rettime = format_time_to_timestamp(warn_time)
        if ":" in machine_ip:
            machine_ip = machine_ip.split(":")[0]
        if rettime["success"] != True:
            adict["success"] = False
            adict["errmsg"] = "time format wrong! should be timestamp or type: Y-M-D H:M:S"
            return adict
        warn_time = rettime["timestamp"]
        print("transfer time:%s" % warn_time)
        adict["timestamp"] = warn_time

        if sp_item in sysommonit_items_general_name:
            sp_item = sysommonit_items_general_name[sp_item]
            adict["base_item_name"] = sp_item

        retdata_dict = pull_baseitem_data(warn_time, sp_item, machine_ip)
        if retdata_dict["success"] != True:
            adict["success"] = False
            adict["errmsg"] = retdata_dict["errmsg"]
            return adict

        adict["base"] = {}
        adict["base"]["value"], adict["base"]["timestamp"] = format_list(retdata_dict["values"])

        instance = "%s:8400" % machine_ip
        ret_items = metric_reader.get_metric_names()
        for item in ret_items.data:
            if item == sp_item or item not in sysommonit_items:
                continue
            task = RangeQueryTask(item, warn_time - 600, warn_time + 180).append_wildcard_filter("instance", "*")
            ret_tmp = metric_reader.range_query([task])
            if len(ret_tmp.data) <= 0:
                print("ignore: ------", item, ret_tmp.data)
                continue

            tag_names = [tag for tag in sysommonit_items[item] if tag != "instance"]
            for series in ret_tmp.data:
                series_dict = series.to_dict()
                labels = series_dict.get("labels")
                # Keys are built from label values, so series that lack labels
                # (or an expected tag) cannot be named; skip them instead of
                # aborting the whole pull.
                if not labels or labels.get("instance") != instance:
                    continue
                if any(tag not in labels for tag in tag_names):
                    continue
                item_key = "-".join([item] + [str(labels[tag]) for tag in tag_names])
                values = series_dict["values"]
                if len(values) > 0 and item_key != sp_item:
                    value_map, timestamps = format_list(values)
                    adict["item"][item_key] = {"value": value_map, "timestamp": timestamps}
        return adict
    except Exception as e:
        logger.exception(e)
        adict["success"] = False
        adict["errmsg"] = "fail to pull monitor data of %s" % sp_item
        return adict
|
|
|
|
def _dtw_input_from_request(request):
    """Build the analysis input from caller-supplied series (``rca_type == "dtw"``).

    ``request["item"]`` must map item names to sample lists accepted by
    ``format_list``. The entry named ``request["base_item"]`` becomes the base
    series; every other entry becomes a candidate item.

    Returns:
        ``(adict, errmsg)``; ``adict`` is None when ``errmsg`` is non-empty.
    """
    base_item = request["base_item"]
    items = request.get("item")
    # Validate against the request payload: no metric dict exists yet here.
    if not isinstance(items, dict) or base_item not in items:
        return None, "base_item not included in item!\n"
    adict = {"item": {}, "timestamp": request["timestamp"],
             "base_item_name": base_item, "base": {}}
    adict["base"]["value"], adict["base"]["timestamp"] = format_list(items[base_item])
    for name, series in items.items():
        if name == base_item:
            continue
        values, timestamps = format_list(series)
        adict["item"][name] = {"value": values, "timestamp": timestamps}
    return adict, ""


def _publish_rca_picture(pic_pathname):
    """Move the DTW plot from the service directory into the web root (best effort)."""
    import shutil

    src = "/usr/local/sysom/server/sysom_rca/%s" % pic_pathname
    dst = "/usr/local/sysom/web/diagnose/rca/%s" % pic_pathname
    print("move %s -> %s" % (src, dst))
    try:
        shutil.move(src, dst)
    except OSError as e:
        # A missing plot must not prevent the analysis result from being returned.
        logger.exception(e)


@app.post("/api/v1/rca/rca_entry")
async def rca_entry(request: dict):
    """Run the DTW-based root cause analysis for one base item.

    ``request`` needs ``rca_type``, ``timestamp`` and ``base_item``. The
    ``rca_type`` value selects the data source:

    * ``"rca"``: metrics for ``request["machine_ip"]`` (required) are pulled
      from monitoring via ``pull_monidata``.
    * ``"dtw"``: the series are supplied in ``request["item"]``, which must
      contain ``base_item``; ``machine_ip`` is optional.

    Returns:
        ``retdict`` with ``success``, ``errmsg``, ``dist`` (``(item, distance)``
        pairs sorted by ascending DTW distance), ``summary``, ``sum_dict`` and
        ``pic_url``.

    NOTE(review): the metric queries, DTW and DB work all block inside this
    async handler, so the event loop stalls while it runs.
    """
    retdict = {"success":False,"errmsg":"","dist":[],"summary":"","sum_dict":{"ref_item_sum":"","fix_sum":"","ref_item_list":""},"pic_url":""}
    param_list = ["rca_type","timestamp","base_item"]
    try:
        log_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
        print(log_time)
        errmsg = "".join("Missing parametre: %s\n" % param
                         for param in param_list if param not in request)
        if errmsg:
            retdict["errmsg"] = errmsg
            return retdict

        rca_type = request["rca_type"]
        if rca_type == "rca":
            if "machine_ip" not in request:
                retdict["errmsg"] = "Missing parametre: machine_ip\n"
                return retdict
            adict = pull_monidata(request["timestamp"], request["base_item"], request["machine_ip"])
            if not adict or adict.get("success") is not True:
                retdict["errmsg"] = adict["errmsg"] if adict else "fail to pull monitor data of %s" % request["base_item"]
                return retdict
        elif rca_type == "dtw":
            adict, errmsg = _dtw_input_from_request(request)
            if errmsg:
                retdict["errmsg"] = errmsg
                return retdict
        else:
            retdict["errmsg"] = "Unsupported rca_type: %s\n" % rca_type
            return retdict

        if len(adict["item"]) <= 1:
            retdict["errmsg"] = "items not enough!\n"
            return retdict

        # "dtw" requests may omit machine_ip.
        machine_ip = request.get("machine_ip", "")
        recordid = "%s-%s-%s" % (request["base_item"], machine_ip, request["timestamp"])

        # Columns shared by both RcaItemsRecord writes; serialised once, before
        # DTW runs on adict. Stays None if the base series is empty.
        record_fields = None
        try:
            record_fields = {
                "time_occur": adict["timestamp"],
                "time_start": adict["base"]["timestamp"][0],
                "time_end": adict["base"]["timestamp"][-1],
                "metric_dict": json.dumps(adict, ensure_ascii=False),
            }
            update_or_create_rcaitemsrecord(db, RcaItemsRecord(
                recordid=recordid, state="items_got", url="", user="", machine_ip=machine_ip,
                rca_conclusion="", final_conclusion="", **record_fields))
        except Exception as e:
            logger.exception(e)

        adtw = Dtw()
        ret = adtw.dtw_loop(adtw.dtw_pre(adict))
        retdict["dist"] = sorted(
            ((name, info["dist"]) for name, info in ret["item"].items()),
            key=lambda pair: pair[1])

        pic_pathname = "test-%s.png" % log_time
        adtw.dtw_plot(pic_pathname, 19)
        rca_ret = rca_analysis_entry(ret)

        url = "/diagnose/rca/%s" % pic_pathname
        _publish_rca_picture(pic_pathname)
        try:
            update_rcacallrecord_state(db, "success", recordid)
            if record_fields is not None:
                update_or_create_rcaitemsrecord(db, RcaItemsRecord(
                    recordid=recordid, state="items_analyzed", url=url, user="",
                    machine_ip=machine_ip, rca_conclusion=rca_ret["summary"],
                    final_conclusion="", **record_fields))
        except Exception as e:
            logger.exception(e)

        # The web front end renders these as markdown; widen the line breaks.
        ret_msg = rca_ret["summary"].replace("\n", "\n\n\n\n")
        for key in rca_ret["sum_dict"]:
            rca_ret["sum_dict"][key] = rca_ret["sum_dict"][key].replace("\n", "\n\n\n\n")
        retdict["pic_url"] = url
        retdict["success"] = True
        retdict["summary"] = ret_msg
        retdict["sum_dict"] = rca_ret["sum_dict"]
        return retdict
    except Exception as e:
        logger.exception(e)
        retdict["errmsg"] += "Internal error!"
        return retdict
|
|
|
|
def init_framwork():
    """Initialise and start SysomFramework for this service.

    Initialises the framework from YAML_CONFIG, loads the CmgPlugin plugin
    class, starts it, and logs once done.
    """
    framework = SysomFramework.init(YAML_CONFIG)
    framework = framework.load_plugin_cls(CmgPlugin)
    framework.start()
    logger.info("SysomFramework init finished!")
|
|
|
|
|
|
@app.on_event("startup")
async def on_start():
    """FastAPI startup hook: initialise SysomFramework via init_framwork()."""
    init_framwork()
|
|
|
|
#############################################################################
|
|
# Perform some microservice initialization operations over here
|
|
#############################################################################
|
|
|
|
|
|
@app.on_event("shutdown")
async def on_shutdown():
    """FastAPI shutdown hook; currently performs no cleanup."""
    pass
|