sysom1/sysom_server/sysom_rca/main.py

466 lines
18 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/06/21 15:37
Author: chenshiyan
Email chenshiyan@linux.alibaba.com
File main.py
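Description: FastAPI application of the SysOM RCA service; serves /api/v1/rca/rca_call (GET/POST) and /api/v1/rca/rca_entry.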
"""
from clogger import logger
from fastapi import FastAPI
from app.routers import health
from conf.settings import YAML_CONFIG
from sysom_utils import CmgPlugin, SysomFramework
import traceback
import sys,os,json
sys.path.append("%s/lib"%(os.path.dirname(os.path.abspath(__file__))))
import time
import datetime
from datetime import date, datetime, timedelta
import requests
from dtw import accelerated_dtw,Dtw
from metric_reader import MetricReader, dispatch_metric_reader,RangeQueryTask
from sysom_monitor_item import sysommonit_items,sysommonit_items_obser,sysommonit_items_general_name
from rca_methods import rca_analysis_entry
from sysom_utils import SysomFramework
from fastapi.responses import RedirectResponse
from app.database import SessionLocal
from app.crud import create_rcacallrecord,get_rcacallrecord_by_recordid,update_or_create_rcacallrecord,update_rcacallrecord_state
from app.crud import create_rcaitemsrecord,get_rcaitemsrecord_by_recordid,update_or_create_rcaitemsrecord
from app.schemas import RcaCallRecord,RcaItemsRecord
# Client for the "sysom_diagnosis" service; used below to create diagnosis
# tasks (POST api/v1/tasks/) and poll them (GET api/v1/tasks/<id>/).
g_client = SysomFramework.gclient("sysom_diagnosis")
# Metric reader used for every range query in this module; the Prometheus
# address is hard-coded here rather than read from YAML_CONFIG.
metric_reader = dispatch_metric_reader("prometheus://localhost:9090")
app = FastAPI()
app.include_router(health.router, prefix="/api/v1/rca/health")
# One module-level DB session shared by every request handler below.
# NOTE(review): presumably a SQLAlchemy session (SessionLocal) — such sessions
# are not thread-safe, which is only acceptable while all handlers stay
# `async def` on the event-loop thread; confirm before moving work to threads.
db = SessionLocal()
# Polling limits (seconds) for diagnosis tasks started by rca_call_raw.
SYSOM_POLL_TIMEOUT = 5
SYSOM_POLL_INTERVAL = 1
#############################################################################
# Write your API interface here, or add to app/routes
#############################################################################
def format_list(alist):
    """Split a sequence of ``[timestamp, value]`` samples into two parts.

    Returns a tuple ``(values, timestamps)``: ``values`` maps each sample's
    position to ``float(sample[1])``; ``timestamps`` lists ``sample[0]`` for
    every sample, in the same order.
    """
    values = {idx: float(sample[1]) for idx, sample in enumerate(alist)}
    timestamps = [sample[0] for sample in alist]
    return values, timestamps
def get_diagnose_result(taskid):
    """Fetch the current state of a diagnosis task from sysom_diagnosis.

    Args:
        taskid: id returned when the task was created.

    Returns:
        The decoded JSON body of ``GET api/v1/tasks/<taskid>/``. If the request
        or JSON decoding fails, the error is logged and
        ``{"data": {"status": "Failed"}}`` is returned instead, so callers can
        always read ``["data"]["status"]``.
    """
    try:
        return g_client.get("api/v1/tasks/%s/" % taskid).json()
    except Exception as e:
        # Narrowed from a bare `except:` so Ctrl-C / SystemExit still propagate.
        print("get_diagnose_result exception!")
        logger.exception(e)
    return {"data": {"status": "Failed"}}
def rca_call_raw(diag_dict):
    """Start (or reuse) an RCA diagnosis task and return the page URL to show.

    ``diag_dict`` is posted unchanged to ``api/v1/tasks/`` of sysom_diagnosis;
    its ``base_item``, ``instance`` and ``time`` values form the record id used
    in the RcaCallRecord table.

    Flow:
      1. If a record with this id exists and is younger than 180 s or already
         in state "success", its stored URL is returned immediately (no new
         task). If no record exists, one is created in state "calling".
      2. A diagnosis task is created and polled every SYSOM_POLL_INTERVAL
         seconds for at most SYSOM_POLL_TIMEOUT seconds, stopping as soon as
         it reports "Success" or "Fail".
      3. The final state and URL are written back to the record (best effort).

    Returns:
        The task's result URL when it succeeded and reported one, the task
        detail page when it was created but did not finish in time, or
        "/diagnose/custom/rca" as a fallback (creation failed or task failed).

    NOTE(review): polling uses time.sleep and is called from async handlers,
    so it blocks the event loop for up to SYSOM_POLL_TIMEOUT seconds.
    """
    recordid = "%s-%s-%s" % (diag_dict["base_item"], diag_dict["instance"], diag_dict["time"])
    timestamp = time.time()
    state = "calling"
    default_url = "/diagnose/custom/rca"
    user = "rca_call"

    # De-duplicate: reuse a recent (<= 180 s) or already successful call.
    try:
        record_item = get_rcacallrecord_by_recordid(db, recordid)
        if record_item is None:
            create_rcacallrecord(db, RcaCallRecord(
                recordid=recordid, state=state, url=default_url, user=user, timestamp=timestamp))
        elif (timestamp - record_item.timestamp) <= 180 or record_item.state == "success":
            return record_item.url
    except Exception as e:
        logger.exception(e)

    returl = default_url
    try:
        # Task creation lives inside the try so a transport error still lets
        # the record be updated below instead of escaping to the caller.
        retdiag_dict = g_client.post("api/v1/tasks/", json=diag_dict).json()
        if retdiag_dict.get("success") is True:
            taskid = retdiag_dict["data"]["task_id"]
            returl = "/diagnose/detail/%s" % taskid
            # Monotonic clock: immune to wall-clock adjustments during polling.
            deadline = time.monotonic() + SYSOM_POLL_TIMEOUT
            while time.monotonic() < deadline:
                retdict_t = get_diagnose_result(taskid)
                status = retdict_t["data"]["status"]
                state = status
                if status == "Success":
                    if "url" in retdict_t["data"]:
                        returl = retdict_t["data"]["url"]
                    state = "success"
                    # Previously missing: the loop kept polling until timeout.
                    break
                if status == "Fail":
                    returl = default_url
                    print(json.dumps(retdict_t, ensure_ascii=False, indent=4))
                    break
                time.sleep(SYSOM_POLL_INTERVAL)
    except Exception as e:
        logger.exception(e)

    # Best effort: a storage failure must not hide the URL from the caller.
    try:
        update_or_create_rcacallrecord(db, RcaCallRecord(
            recordid=recordid, state=state, url=returl, user=user, timestamp=timestamp))
    except Exception as e:
        logger.exception(e)
    return returl
@app.post("/api/v1/rca/rca_call")
async def rca_call_post(request: dict):
    """Start (or reuse) an RCA diagnosis and return the URL to open.

    The JSON body must contain non-empty ``time``, ``base_item`` and
    ``machine_ip``. Returns the URL string produced by rca_call_raw, or
    "/diagnose/custom/rca" when a parameter is missing or anything fails.
    """
    url = "/diagnose/custom/rca"
    try:
        # Report every missing/empty parameter at once rather than the first.
        missing = [p for p in ("time", "base_item", "machine_ip") if request.get(p) in (None, "")]
        if missing:
            errmsg = "".join("Missing parametre: %s\n" % p for p in missing)
            print("%s; params: %s\n" % (errmsg, request))
            return url
        diag_dict = {
            "rca_type": "rca",
            "time": request["time"],
            "base_item": request["base_item"],
            "instance": request["machine_ip"],
            "service_name": "rca",
        }
        # A stored record may carry a NULL url; keep the fallback then.
        url = rca_call_raw(diag_dict) or url
    except Exception as e:
        logger.exception(e)
    print("Redirect Url: %s\n" % url)
    return url
@app.get("/api/v1/rca/rca_call")
async def rca_call_get(timestamp: str = None, base_item: str = None,machine_ip: str = None):
    """GET variant of rca_call: redirect the client to the RCA result page.

    Query parameters ``timestamp``, ``base_item`` and ``machine_ip`` are all
    required. When one is missing or empty, or anything fails, the client is
    redirected to "/diagnose/custom/rca" instead.
    """
    url = "/diagnose/custom/rca"
    try:
        params = {"timestamp": timestamp, "base_item": base_item, "machine_ip": machine_ip}
        # `?timestamp=` arrives as "" rather than None; treat both as missing.
        missing = [name for name, value in params.items() if value in (None, "")]
        if missing:
            errmsg = "".join("Missing parametre: %s\n" % name for name in missing)
            print("%s; params: %s\n" % (errmsg, params))
            return RedirectResponse(url)
        diag_dict = {
            "rca_type": "rca",
            "time": timestamp,
            "base_item": base_item,
            "instance": machine_ip,
            "service_name": "rca",
        }
        # A stored record may carry a NULL url; keep the fallback then.
        url = rca_call_raw(diag_dict) or url
    except Exception as e:
        logger.exception(e)
    print("Redirect Url: %s\n" % url)
    return RedirectResponse(url)
def format_time_to_timestamp(t):
    """Normalise an alert time into an integer Unix timestamp.

    Accepts a number or numeric string (fractional seconds are truncated), or
    a local-time string formatted as "%Y-%m-%d %H:%M:%S".

    Returns:
        ``{"success": True, "timestamp": <int>}`` on success, otherwise
        ``{"success": False, "timestamp": t}`` with the input echoed back.
    """
    ret = {"success": True, "timestamp": t}
    try:
        ret["timestamp"] = int(float(t))
        return ret
    except (TypeError, ValueError, OverflowError):
        pass  # not numeric; try the date-string form below
    try:
        struct_time = time.strptime(t, '%Y-%m-%d %H:%M:%S')
        # mktime returns a float; cast so both accepted forms yield an int.
        ret["timestamp"] = int(time.mktime(struct_time))
    except (TypeError, ValueError, OverflowError):
        ret["success"] = False
        traceback.print_exc()
    return ret
def pull_baseitem_data(warn_time,sp_item,machine_ip):
    """Fetch the sample series of the base item around the alert time.

    Args:
        warn_time: alert time; used in arithmetic below, so it must already
            be numeric (the caller converts it via format_time_to_timestamp).
        sp_item: either a plain metric name, or "<metric>-<v1>-<v2>..." where
            each vN is compared against the metric's label values (label
            names come from sysommonit_items / sysommonit_items_obser,
            "instance" excluded).
        machine_ip: host address; sysom series are filtered on the
            "instance" label "<machine_ip>:8400".

    Returns:
        {"success": bool, "errmsg": str, "values": list} where "values" is
        the "values" entry of the first series that matches. The query window
        is [warn_time - 600, warn_time + 180].

    NOTE(review): indentation of this block was reconstructed from a
    whitespace-stripped copy. The final `else:` is assumed to pair with the
    "sysom"/"sysak" check, so a sysom/sysak item without "-" returns
    success=False with an empty errmsg — confirm against version control.
    """
    data_dict = {"success":False,"errmsg":"","values":[]}
    # Label layout of every known sysom metric (core and "obser" tables merged).
    sysommonit_items_all = {**sysommonit_items,**sysommonit_items_obser}
    try:
        if "sysom" in sp_item or "sysak" in sp_item:
            if "-" in sp_item:
                # "<metric>-<v1>-<v2>..." -> metric name plus wanted label values.
                # NOTE(review): label values that themselves contain "-" cannot
                # be expressed in this format.
                tag_list = sp_item.split('-')
                sysom_tag = tag_list[0]
                if sysom_tag not in sysommonit_items_all:
                    data_dict["errmsg"] = "no record of %s in sysommonit_items_all"%sysom_tag
                    return data_dict
                if len(tag_list)-1 > len(sysommonit_items_all[sysom_tag]):
                    data_dict["errmsg"] = "sysommonit_items_all not match tag: %s"%tag_list
                    return data_dict
                tag_value = tag_list[1:]
                # Label names to compare against; "instance" is handled separately.
                tag_name = []
                for i in range(len(sysommonit_items_all[sysom_tag])):
                    if sysommonit_items_all[sysom_tag][i] != "instance":
                        tag_name.append(sysommonit_items_all[sysom_tag][i])
                task = RangeQueryTask(sysom_tag, warn_time - 600, warn_time + 180).append_wildcard_filter("instance", "%s:8400"%machine_ip)
                ret_tmp = metric_reader.range_query([task])
                if len(ret_tmp.data) <= 0:
                    data_dict["errmsg"] = "no data of %s"%sp_item
                    return data_dict
                if len(ret_tmp.data) > 0:
                    for i in range(len(ret_tmp.data)):
                        match_cnt = 0
                        if "labels" in ret_tmp.data[i].to_dict():
                            # Host re-check; presumably redundant with the query's
                            # instance filter.
                            if ret_tmp.data[i].to_dict()["labels"]["instance"] != "%s:8400"%machine_ip:
                                continue
                            item_labels = ret_tmp.data[i].to_dict()["labels"]
                            for j in range(len(tag_name)):
                                # Gives up on the first series missing a label, even
                                # if a later series would have matched.
                                if tag_name[j] not in item_labels:
                                    data_dict["errmsg"] = "%s has no tag: %s"%(sysom_tag,tag_name[j])
                                    return data_dict
                                # Position-insensitive: each (label, wanted value)
                                # pair with equal values adds one to match_cnt.
                                for k in range(len(tag_value)):
                                    if item_labels[tag_name[j]] == tag_value[k]:
                                        match_cnt += 1
                        else:
                            data_dict["errmsg"] = "fail to get data of %s: no labels"%sysom_tag
                            return data_dict
                        # First series whose labels cover every wanted value wins.
                        if match_cnt == len(tag_value):
                            data_dict["values"] = ret_tmp.data[i].to_dict()["values"]
                            data_dict["success"] = True
                            return data_dict
                    data_dict["errmsg"] = "fail to get data of %s: not match!"%sysom_tag
        else:
            # Plain metric: no host filter; the first returned series is used.
            task = RangeQueryTask(sp_item, warn_time - 600, warn_time + 180).append_wildcard_filter("instance", "*")
            ret_tmp = metric_reader.range_query([task])
            if len(ret_tmp.data) <= 0:
                data_dict["success"] = False
                data_dict["errmsg"] = "no data of %s"%sp_item
                return data_dict
            data_dict["success"] = True
            data_dict["values"] = ret_tmp.data[0].to_dict()["values"]
    except:
        # NOTE(review): bare except also traps KeyboardInterrupt/SystemExit;
        # consider narrowing to `except Exception:`.
        traceback.print_exc()
        data_dict["success"] = False
        data_dict["errmsg"] = "fail to get data of %s"%sp_item
        pass
    return data_dict
def pull_monidata(warn_time,sp_item,machine_ip):
    """Collect the base series and every candidate series for one alert.

    Args:
        warn_time: alert time; anything format_time_to_timestamp accepts
            (Unix timestamp, or "%Y-%m-%d %H:%M:%S").
        sp_item: base item name; a key of sysommonit_items_general_name is
            first translated to its mapped item name.
        machine_ip: target host, optionally "ip:port" (the port is dropped and
            candidate series are matched on instance "<ip>:8400").

    Returns:
        Always a dict (never None) with "success", "errmsg", "timestamp",
        "base_item_name" and "item"; on success also "base". "base" and each
        "item[<key>]" are {"value": {idx: float}, "timestamp": [...]} as built
        by format_list. Candidate keys are "<metric>-<label value>-..." in the
        label order of sysommonit_items, "instance" excluded. Every series is
        read over [warn_time - 600, warn_time + 180].
    """
    adict = {"item": {}, "timestamp": warn_time, "base_item_name": sp_item,
             "success": True, "errmsg": ""}
    try:
        print("original time:%s" % warn_time)
        rettime = format_time_to_timestamp(warn_time)
        if ":" in machine_ip:
            machine_ip = machine_ip.split(":")[0]
        if not rettime["success"]:
            adict["success"] = False
            adict["errmsg"] = "time format wrong! should be timestamp or type: Y-M-D H:M:S"
            return adict
        warn_time = rettime["timestamp"]
        print("transfer time:%s" % warn_time)
        adict["timestamp"] = warn_time

        sp_item = sysommonit_items_general_name.get(sp_item, sp_item)
        adict["base_item_name"] = sp_item
        retdata_dict = pull_baseitem_data(warn_time, sp_item, machine_ip)
        if retdata_dict["success"] != True:
            adict["success"] = False
            adict["errmsg"] = retdata_dict["errmsg"]
            return adict
        base_value, base_ts = format_list(retdata_dict["values"])
        adict["base"] = {"value": base_value, "timestamp": base_ts}

        instance = "%s:8400" % machine_ip
        for item in metric_reader.get_metric_names().data:
            if item == sp_item or item not in sysommonit_items:
                continue
            tag_names = [name for name in sysommonit_items[item] if name != "instance"]
            task = RangeQueryTask(item, warn_time - 600, warn_time + 180).append_wildcard_filter("instance", "*")
            ret_tmp = metric_reader.range_query([task])
            if len(ret_tmp.data) <= 0:
                print("ignore: ------", item, ret_tmp.data)
                continue
            for series in ret_tmp.data:
                series_dict = series.to_dict()  # hoisted: was re-built per access
                labels = series_dict.get("labels") or {}
                if "labels" in series_dict and labels.get("instance") != instance:
                    continue
                # A series lacking a key label used to raise KeyError and abort
                # the whole collection; skip just that series instead.
                if any(name not in labels for name in tag_names):
                    continue
                item_key = "-".join([item] + [str(labels[name]) for name in tag_names])
                values = series_dict.get("values") or []
                if not values or item_key == sp_item:
                    continue
                value, ts = format_list(values)
                adict["item"][item_key] = {"value": value, "timestamp": ts}
        return adict
    except Exception:
        # Previously fell through and returned None, which crashed rca_entry.
        traceback.print_exc()
        adict["success"] = False
        adict["errmsg"] = "fail to pull monitor data of %s" % sp_item
        return adict
def _dtw_input_from_request(request):
    """Build the DTW input dict from caller-supplied series (rca_type "dtw").

    ``request["item"]`` maps item names to ``[[ts, value], ...]`` samples; the
    entry named by ``request["base_item"]`` becomes "base", every other entry
    goes into "item".
    """
    base_name = request["base_item"]
    base_value, base_ts = format_list(request["item"][base_name])
    adict = {"item": {}, "timestamp": request["timestamp"],
             "base_item_name": base_name,
             "base": {"value": base_value, "timestamp": base_ts}}
    for name, samples in request["item"].items():
        if name == base_name:
            continue
        value, ts = format_list(samples)
        adict["item"][name] = {"value": value, "timestamp": ts}
    return adict


def _save_items_record(recordid, state, url, adict, machine_ip, metric_dict, rca_conclusion):
    """Best-effort upsert of the RcaItemsRecord row for this analysis.

    Errors (including an empty base series) are logged and swallowed so a
    storage problem never fails the RCA request itself.
    """
    try:
        base_ts = adict["base"]["timestamp"]
        update_or_create_rcaitemsrecord(db, RcaItemsRecord(
            recordid=recordid, state=state, url=url, user="",
            time_occur=adict["timestamp"], time_start=base_ts[0], time_end=base_ts[-1],
            machine_ip=machine_ip, metric_dict=metric_dict,
            rca_conclusion=rca_conclusion, final_conclusion=""))
    except Exception as e:
        logger.exception(e)


def _publish_plot(pic_pathname):
    """Move a generated plot from the service directory into the web root.

    Uses shutil.move instead of spawning "/bin/mv" through a shell. A failure
    is only logged (the old popen call ignored failures too), so the analysis
    result is still returned.
    """
    import shutil
    src = "/usr/local/sysom/server/sysom_rca/%s" % pic_pathname
    dst = "/usr/local/sysom/web/diagnose/rca/%s" % pic_pathname
    try:
        shutil.move(src, dst)
    except OSError as e:
        logger.exception(e)


@app.post("/api/v1/rca/rca_entry")
async def rca_entry(request: dict):
    """Rank candidate items by how closely they track a base item.

    Request JSON:
        rca_type:   "rca" to pull series from the metric store for
                    ``machine_ip``, or "dtw" to analyse the series given in
                    ``item`` ({name: [[ts, value], ...]}).
        timestamp:  alert time.
        base_item:  name of the reference series.
        machine_ip: required for "rca"; optional for "dtw" (only used in the
                    record id).

    Returns:
        A dict with "success", "errmsg", "dist" (``(name, dist)`` pairs from
        Dtw.dtw_loop sorted ascending by dist), "summary" and "sum_dict" (from
        rca_analysis_entry) and "pic_url" (URL of the generated plot).
    """
    retdict = {"success":False,"errmsg":"","dist":[],"summary":"","sum_dict":{"ref_item_sum":"","fix_sum":"","ref_item_list":""},"pic_url":""}
    try:
        log_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
        print(log_time)
        missing = [p for p in ("rca_type", "timestamp", "base_item") if request.get(p) in (None, "")]
        if missing:
            retdict["errmsg"] = "".join("Missing parametre: %s\n" % p for p in missing)
            return retdict

        rca_type = request["rca_type"]
        if rca_type == "rca":
            if request.get("machine_ip") in (None, ""):
                retdict["errmsg"] = "Missing parametre: machine_ip\n"
                return retdict
            adict = pull_monidata(request["timestamp"], request["base_item"], request["machine_ip"])
            # Guard against a None result as well as an explicit failure.
            if not adict or adict.get("success") is not True:
                retdict["errmsg"] = (adict or {}).get("errmsg") or "fail to get data of %s" % request["base_item"]
                return retdict
        elif rca_type == "dtw":
            # Fixed: previously tested adict["item"] before adict existed.
            items = request.get("item")
            if not isinstance(items, dict) or request["base_item"] not in items:
                retdict["errmsg"] = "base_item not included in item!\n"
                return retdict
            adict = _dtw_input_from_request(request)
        else:
            retdict["errmsg"] = "unsupported rca_type: %s\n" % rca_type
            return retdict

        if len(adict["item"]) <= 1:
            retdict["errmsg"] = "items not enough!\n"
            return retdict

        machine_ip = request.get("machine_ip", "")
        recordid = "%s-%s-%s" % (request["base_item"], machine_ip, request["timestamp"])
        # Serialised once, before DTW runs, and reused for both record writes.
        try:
            metric_dict = json.dumps(adict, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            logger.exception(e)
            metric_dict = ""
        _save_items_record(recordid, "items_got", "", adict, machine_ip, metric_dict, "")

        adtw = Dtw()
        ret = adtw.dtw_loop(adtw.dtw_pre(adict))
        dists = {name: info["dist"] for name, info in ret["item"].items()}
        retdict["dist"] = sorted(dists.items(), key=lambda kv: kv[1])
        # NOTE(review): second-resolution name; two requests in the same
        # second write the same file.
        pic_pathname = "test-%s.png" % log_time
        adtw.dtw_plot(pic_pathname, 19)
        rca_ret = rca_analysis_entry(ret)

        url = "/diagnose/rca/%s" % pic_pathname
        _publish_plot(pic_pathname)
        try:
            update_rcacallrecord_state(db, "success", recordid)
        except Exception as e:
            logger.exception(e)
        _save_items_record(recordid, "items_analyzed", url, adict, machine_ip, metric_dict, rca_ret["summary"])

        retdict["pic_url"] = url
        retdict["success"] = True
        retdict["summary"] = rca_ret["summary"].replace("\n", "\n\n\n\n")
        retdict["sum_dict"] = {k: v.replace("\n", "\n\n\n\n") for k, v in rca_ret["sum_dict"].items()}
        return retdict
    except Exception as e:
        traceback.print_exc()
        logger.exception(e)
        retdict["errmsg"] += "Internal error!"
        return retdict
def init_framwork():
    """Initialise the SysOM framework from YAML_CONFIG and start it.

    CmgPlugin is registered before start(); a log line marks completion.
    """
    framework = SysomFramework.init(YAML_CONFIG)
    framework = framework.load_plugin_cls(CmgPlugin)
    framework.start()
    logger.info("SysomFramework init finished!")
@app.on_event("startup")
async def on_start():
    """FastAPI startup hook: bring up the SysOM framework before serving."""
    init_framwork()
    # -------------------------------------------------------------------------
    # Further microservice start-up steps belong here.
    # -------------------------------------------------------------------------
@app.on_event("shutdown")
async def on_shutdown():
    """FastAPI shutdown hook; currently performs no cleanup."""