mirror of https://gitee.com/anolis/sysom.git
200 lines
7.2 KiB
Python
200 lines
7.2 KiB
Python
import subprocess
|
||
import os
|
||
import json
|
||
import ast
|
||
import logging
|
||
import re
|
||
from typing import List, Optional, Type
|
||
from importlib import import_module
|
||
from base import DiagnosisTask, DiagnosisJob, DiagnosisPreProcessor
|
||
|
||
logger = logging.getLogger("generate_cmds")
|
||
|
||
def run_subprocess(cmd: List[str]) -> dict:
|
||
resp = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
return {
|
||
"stdout": resp.stdout.decode("utf-8"),
|
||
"stderr": resp.stderr.decode("utf-8"),
|
||
"returncode": resp.returncode,
|
||
}
|
||
|
||
def preprocess_v1_async(service_name: str, params: dict) -> DiagnosisTask:
|
||
""" "Perform diagnosis preprocessing
|
||
|
||
{
|
||
"commands":[
|
||
{
|
||
"instance":"xxx",
|
||
"cmd":"xxx",
|
||
"params":{ => overide initial param
|
||
"region":"target_region"
|
||
}
|
||
}
|
||
]
|
||
}
|
||
"""
|
||
# 2. Invoke preprocessing script(preprocessing script)
|
||
SCRIPTS_DIR = "."
|
||
service_path = os.path.join(SCRIPTS_DIR, service_name)
|
||
if not os.path.exists(service_path):
|
||
raise Exception("Can not find script file, please check service name")
|
||
try:
|
||
resp = run_subprocess([service_path, json.dumps(params)])
|
||
except Exception as exc:
|
||
raise Exception(f"Execute preprocess script error: {str(exc)}") from exc
|
||
|
||
# 3. If the preprocessing script executes with an error
|
||
if resp["returncode"] != 0:
|
||
raise (Exception(f"Execute preprocess script error: {resp['stderr']}"))
|
||
|
||
# 4. If the preprocessing script executes successfully,
|
||
# take out the processing result
|
||
stdout = resp["stdout"]
|
||
resp = ast.literal_eval(stdout)
|
||
resp_scripts = resp.get("commands")
|
||
|
||
# 5. If the preprocessing result not contains 'commands', it's a not expect bug
|
||
if not resp_scripts:
|
||
raise (
|
||
Exception(
|
||
f"Not find commands, please check the preprocess script return"
|
||
)
|
||
)
|
||
diagnosis_task = DiagnosisTask(
|
||
jobs=[DiagnosisJob.from_dict(item) for item in resp_scripts], in_order=True
|
||
)
|
||
|
||
return diagnosis_task
|
||
|
||
def preprocess_v2_async(service_name: str, params: dict) -> Optional[DiagnosisTask]:
|
||
"""Pre-processing V2
|
||
|
||
Args:
|
||
instance (JobModel): JobModel
|
||
|
||
Returns:
|
||
Optional[DiagnosisTask]: Diagnosis task
|
||
"""
|
||
|
||
def _get_pre_processor(service_name: str) -> Type[DiagnosisPreProcessor]:
|
||
"""
|
||
根据要执行的命令,动态引入一个 PreProcessor 的实现用于执行前处理
|
||
"""
|
||
try:
|
||
return import_module(f"service_scripts.{service_name}_pre").PreProcessor
|
||
except Exception as e:
|
||
raise Exception(f"No Pre-processor available => {str(e)}")
|
||
|
||
# 2. Use PreProcessor to check if the version of the tool meets the requirements
|
||
try:
|
||
params.pop("service_name", "")
|
||
pre_processor = _get_pre_processor(service_name)(service_name, **params)
|
||
except Exception as e:
|
||
return None
|
||
|
||
# 3. Use PreProcessor to convert params to diagnosis jobs
|
||
diagnosis_task = pre_processor.get_diagnosis_cmds(params)
|
||
if diagnosis_task is None or len(diagnosis_task.jobs) == 0:
|
||
raise Exception(f"Pre-processor not return any diagnosis job")
|
||
|
||
return diagnosis_task
|
||
|
||
def preprocess_async(service_name: str, params: dict) -> Optional[DiagnosisTask]:
|
||
""" "Perform diagnosis preprocessing
|
||
{
|
||
"commands":[
|
||
{
|
||
"instance":"xxx",
|
||
"cmd":"xxx",
|
||
"params":{ => overide initial param
|
||
"region":"target_region"
|
||
}
|
||
}
|
||
]
|
||
}
|
||
"""
|
||
diagnosis_task: Optional[DiagnosisTask] = None
|
||
try:
|
||
diagnosis_task = preprocess_v2_async(service_name, params)
|
||
if diagnosis_task is None:
|
||
diagnosis_task = preprocess_v1_async(service_name, params)
|
||
except Exception as exc:
|
||
logger.exception(f"Diagnosis preprocess error: {str(exc)}")
|
||
return diagnosis_task
|
||
|
||
|
||
def get_trim_match_obj_value(match_obj, idx: int):
|
||
value = match_obj.group(idx)
|
||
if value is None:
|
||
return ""
|
||
else:
|
||
return value.strip()
|
||
|
||
if __name__ == "__main__":
|
||
diagnosis_cmds = {
|
||
"filecache": {"instance": "127.0.0.1", "value": 1},
|
||
"iofsstat": {"instance": "127.0.0.1"},
|
||
"iohang": {"instance": "127.0.0.1"},
|
||
"iolatency": {"instance": "127.0.0.1"},
|
||
"iosdiag_latency": {"instance": "127.0.0.1"},
|
||
"jitter": {"instance": "127.0.0.1", "time": 1},
|
||
"loadtask": {"instance": "127.0.0.1"},
|
||
"memgraph": {"instance": "127.0.0.1"},
|
||
"oomcheck": {"instance": "127.0.0.1"},
|
||
"ossre": {"instance": "127.0.0.1"},
|
||
"packetdrop": {"instance": "127.0.0.1", "time": 1},
|
||
"pingtrace": {"origin_instance": "127.0.0.1", "pkg_num": 1, "time_gap": 1, "target_instance": "192.168.0.22"},
|
||
"retran": {"instance": "127.0.0.1", "time": 1},
|
||
"schedmoni": {"instance": "127.0.0.1"},
|
||
"taskprofile": {"instance": "127.0.0.1", "timeout": 1}
|
||
}
|
||
for service_name, params in diagnosis_cmds.items():
|
||
diagnosis_task = preprocess_async(service_name, params)
|
||
if diagnosis_task is not None:
|
||
for idx, job in enumerate(diagnosis_task.jobs):
|
||
match_obj = re.match(r'(.*)sysak (-g )*(memgraph|podmem|iofsstat|iosdiag|rtrace|loadtask|oomcheck|ossre_client|pingtrace|schedmoni)(.*?)(>>|&&|>|&)(.*)', job.cmd, re.I)
|
||
# 1 => sysak 之前的命令
|
||
# 2 => -g or None
|
||
# 3 => sysak subcommand (memgraph|podmem|iofsstat|iosdiag|rtrace|loadtask|oomcheck|ossre_client|pingtrace|schedmoni)
|
||
# 4 => sysak subcommand 参数
|
||
# 5 => >> | && | > | &
|
||
# 6 => sysak 之后的命令
|
||
if match_obj:
|
||
sysak_pre = get_trim_match_obj_value(match_obj, 1)
|
||
sysak_g = get_trim_match_obj_value(match_obj, 2)
|
||
sysak_sub_cmd = get_trim_match_obj_value(match_obj, 3)
|
||
sysak_params = get_trim_match_obj_value(match_obj, 4)
|
||
sysak_sep = get_trim_match_obj_value(match_obj, 5)
|
||
sysak_post = get_trim_match_obj_value(match_obj, 6)
|
||
|
||
|
||
pre = sysak_pre
|
||
sysak_cmd = f"sysak{'' if not sysak_g else ' ' + sysak_g} {sysak_sub_cmd} {sysak_params}"
|
||
post = f"{sysak_sep} {sysak_post}"
|
||
|
||
sub_func = ""
|
||
if service_name == "pingtrace":
|
||
sub_func = \
|
||
f"""
|
||
sub_{service_name}_{"client" if "-c" in sysak_params else "server"}() {"{"}
|
||
{pre} sysak $@ {post}
|
||
{"}"}
|
||
"""
|
||
else:
|
||
sub_func = \
|
||
f"""
|
||
sub_{service_name}() {"{"}
|
||
{pre} sysak $@ {post}
|
||
{"}"}
|
||
"""
|
||
print(sub_func)
|
||
# print(' '.join([
|
||
# pre,
|
||
# sysak_cmd,
|
||
# post
|
||
# ]))
|
||
else:
|
||
raise Exception(f"{service_name} genrate failed")
|
||
|
||
|
||
# (.*)sysak (-g )*(memgraph|podmem|iofsstat|iosdiag|rtrace|loadtask|oomcheck|ossre_client|pingtrace|schedmoni)(.*?)(>|>>|&|&&)(.*) |