sysom1/sysom_server/sysom_diagnosis/service_scripts/generate_cmds.py

200 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import os
import json
import ast
import logging
import re
from typing import List, Optional, Type
from importlib import import_module
from base import DiagnosisTask, DiagnosisJob, DiagnosisPreProcessor
logger = logging.getLogger("generate_cmds")
def run_subprocess(cmd: List[str]) -> dict:
    """Run *cmd* to completion and collect its decoded output.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list.

    Returns:
        A dict with three keys: ``stdout`` and ``stderr`` (the captured
        streams decoded as UTF-8) and ``returncode`` (the exit status).
    """
    completed = subprocess.run(
        cmd,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    # Decode both captured byte streams before packaging the result.
    out_text = completed.stdout.decode("utf-8")
    err_text = completed.stderr.decode("utf-8")
    return dict(
        stdout=out_text,
        stderr=err_text,
        returncode=completed.returncode,
    )
def preprocess_v1_async(service_name: str, params: dict) -> DiagnosisTask:
    """Run the V1 preprocessing script of a service and build its task.

    The script at ``./<service_name>`` is executed with the JSON-encoded
    ``params`` as its only argument. Its standard output must describe the
    commands to run, in this shape::

        {
            "commands": [
                {
                    "instance": "xxx",
                    "cmd": "xxx",
                    "params": {          # overrides the initial params
                        "region": "target_region"
                    }
                }
            ]
        }

    Args:
        service_name: Name of the preprocessing script, resolved relative
            to the current working directory.
        params: Parameters serialized with ``json.dumps`` and handed to the
            script.

    Returns:
        A ``DiagnosisTask`` holding one ``DiagnosisJob`` per returned
        command, created with ``in_order=True``.

    Raises:
        Exception: If the script is missing, cannot be executed, exits with
            a non-zero code, prints output that cannot be parsed, or
            returns no commands.
    """
    # 1. Locate the preprocessing script
    SCRIPTS_DIR = "."
    service_path = os.path.join(SCRIPTS_DIR, service_name)
    if not os.path.exists(service_path):
        raise Exception("Can not find script file, please check service name")

    # 2. Invoke the preprocessing script
    try:
        resp = run_subprocess([service_path, json.dumps(params)])
    except Exception as exc:
        raise Exception(f"Execute preprocess script error: {str(exc)}") from exc

    # 3. If the preprocessing script executes with an error
    if resp["returncode"] != 0:
        raise Exception(f"Execute preprocess script error: {resp['stderr']}")

    # 4. If the preprocessing script executes successfully, take out the
    #    processing result. A Python literal is tried first so existing
    #    scripts keep parsing exactly as before; JSON is the fallback so
    #    that output containing true/false/null is accepted as well.
    stdout = resp["stdout"]
    try:
        result = ast.literal_eval(stdout)
    except (ValueError, SyntaxError, TypeError):
        try:
            result = json.loads(stdout)
        except ValueError as exc:
            raise Exception(
                f"Parse preprocess script output error: {str(exc)}"
            ) from exc

    # 5. A result without 'commands' is an unexpected bug in the script
    resp_scripts = result.get("commands") if isinstance(result, dict) else None
    if not resp_scripts:
        raise Exception(
            "Not find commands, please check the preprocess script return"
        )

    return DiagnosisTask(
        jobs=[DiagnosisJob.from_dict(item) for item in resp_scripts],
        in_order=True,
    )
def preprocess_v2_async(service_name: str, params: dict) -> Optional[DiagnosisTask]:
    """Pre-processing V2: build the diagnosis task through a PreProcessor.

    Args:
        service_name: Used to import ``service_scripts.<service_name>_pre``
            and passed as the first argument to its ``PreProcessor``.
        params: Diagnosis parameters. The ``"service_name"`` key, if
            present, is removed from this dict in place; the remaining
            items are passed to the ``PreProcessor`` constructor as keyword
            arguments and to ``get_diagnosis_cmds``.

    Returns:
        Optional[DiagnosisTask]: The diagnosis task, or ``None`` when no
        PreProcessor could be imported or constructed.

    Raises:
        Exception: If the PreProcessor returns ``None`` or a task that has
            no jobs.
    """

    def _get_pre_processor(service_name: str) -> Type[DiagnosisPreProcessor]:
        """
        Dynamically import the PreProcessor implementation that matches the
        command to execute; it is used to perform the pre-processing.
        """
        try:
            return import_module(f"service_scripts.{service_name}_pre").PreProcessor
        except Exception as e:
            # Chain the cause so the original import error stays visible.
            raise Exception(f"No Pre-processor available => {str(e)}") from e

    # 2. Use PreProcessor to check if the version of the tool meets the requirements
    try:
        params.pop("service_name", "")
        pre_processor = _get_pre_processor(service_name)(service_name, **params)
    except Exception as e:
        # Returning None is the deliberate "V2 not usable" signal; record
        # the reason instead of discarding it silently.
        logger.warning(
            "V2 pre-processor unavailable for %s: %s", service_name, e
        )
        return None

    # 3. Use PreProcessor to convert params to diagnosis jobs
    diagnosis_task = pre_processor.get_diagnosis_cmds(params)
    if diagnosis_task is None or len(diagnosis_task.jobs) == 0:
        raise Exception("Pre-processor not return any diagnosis job")
    return diagnosis_task
def preprocess_async(service_name: str, params: dict) -> Optional[DiagnosisTask]:
    """Perform diagnosis preprocessing, trying V2 first and then V1.

    ``preprocess_v2_async`` is called first; when it returns ``None`` the
    result of ``preprocess_v1_async`` is used instead. Any exception raised
    by either step is logged and ``None`` is returned.

    The V1 preprocessing script is expected to print a result shaped like::

        {
            "commands": [
                {
                    "instance": "xxx",
                    "cmd": "xxx",
                    "params": {          # overrides the initial params
                        "region": "target_region"
                    }
                }
            ]
        }

    Args:
        service_name: Name of the diagnosis service to preprocess.
        params: Diagnosis parameters forwarded to both preprocessors.

    Returns:
        Optional[DiagnosisTask]: The generated task, or ``None`` on failure.
    """
    try:
        task = preprocess_v2_async(service_name, params)
        if task is not None:
            return task
        # V2 produced nothing: fall back to the script-based V1 flow.
        return preprocess_v1_async(service_name, params)
    except Exception as exc:
        logger.exception(f"Diagnosis preprocess error: {str(exc)}")
        return None
def get_trim_match_obj_value(match_obj, idx: int):
    """Return capture group *idx* of *match_obj* with outer whitespace removed.

    Args:
        match_obj: Object exposing ``group(idx)``, e.g. a regex match.
        idx: Index of the group to read.

    Returns:
        The stripped group text, or ``""`` when the group is ``None``.
    """
    captured = match_obj.group(idx)
    return "" if captured is None else captured.strip()
if __name__ == "__main__":
    # Sample parameters for each diagnosis service. Every entry is run
    # through preprocess_async and the resulting job commands are turned
    # into shell function definitions printed to stdout.
    diagnosis_cmds = {
        "filecache": {"instance": "127.0.0.1", "value": 1},
        "iofsstat": {"instance": "127.0.0.1"},
        "iohang": {"instance": "127.0.0.1"},
        "iolatency": {"instance": "127.0.0.1"},
        "iosdiag_latency": {"instance": "127.0.0.1"},
        "jitter": {"instance": "127.0.0.1", "time": 1},
        "loadtask": {"instance": "127.0.0.1"},
        "memgraph": {"instance": "127.0.0.1"},
        "oomcheck": {"instance": "127.0.0.1"},
        "ossre": {"instance": "127.0.0.1"},
        "packetdrop": {"instance": "127.0.0.1", "time": 1},
        "pingtrace": {"origin_instance": "127.0.0.1", "pkg_num": 1, "time_gap": 1, "target_instance": "192.168.0.22"},
        "retran": {"instance": "127.0.0.1", "time": 1},
        "schedmoni": {"instance": "127.0.0.1"},
        "taskprofile": {"instance": "127.0.0.1", "timeout": 1},
    }

    # Compiled once, outside the loops. Capture groups:
    #   1 => command text before "sysak"
    #   2 => "-g " or None
    #   3 => sysak subcommand (memgraph|podmem|iofsstat|iosdiag|rtrace|loadtask|oomcheck|ossre_client|pingtrace|schedmoni)
    #   4 => sysak subcommand arguments
    #   5 => separator: >> | && | > | &
    #   6 => command text after the separator
    sysak_pattern = re.compile(
        r'(.*)sysak (-g )*(memgraph|podmem|iofsstat|iosdiag|rtrace|loadtask|oomcheck|ossre_client|pingtrace|schedmoni)(.*?)(>>|&&|>|&)(.*)',
        re.I,
    )

    for service_name, params in diagnosis_cmds.items():
        diagnosis_task = preprocess_async(service_name, params)
        if diagnosis_task is None:
            # Preprocessing failed or produced nothing; skip this service.
            continue
        for job in diagnosis_task.jobs:
            match_obj = sysak_pattern.match(job.cmd)
            if not match_obj:
                raise Exception(f"{service_name} genrate failed")

            pre = get_trim_match_obj_value(match_obj, 1)
            sysak_params = get_trim_match_obj_value(match_obj, 4)
            sysak_sep = get_trim_match_obj_value(match_obj, 5)
            sysak_post = get_trim_match_obj_value(match_obj, 6)
            post = f"{sysak_sep} {sysak_post}"

            # pingtrace gets separate client/server functions, chosen by
            # whether "-c" appears in the sysak arguments.
            func_name = f"sub_{service_name}"
            if service_name == "pingtrace":
                func_name += "_client" if "-c" in sysak_params else "_server"

            # NOTE(review): the function body line is emitted without
            # leading indentation, matching the text of the visible source.
            print(f"\n{func_name}() {{\n{pre} sysak $@ {post}\n}}\n")
# (.*)sysak (-g )*(memgraph|podmem|iofsstat|iosdiag|rtrace|loadtask|oomcheck|ossre_client|pingtrace|schedmoni)(.*?)(>|>>|&|&&)(.*)