new channel design

This commit is contained in:
fuqiu.ffq 2023-06-27 00:26:07 +08:00
parent 398ad85946
commit d4ba32fc13
5 changed files with 576 additions and 170 deletions

View File

@ -65,7 +65,7 @@ class DiagnosisTaskExecutor(AsyncEventExecutor):
# 2. Execute and Postprocess # 2. Execute and Postprocess
if res: if res:
job_result = await DiagnosisHelper.execute(instance) job_result = await DiagnosisHelper.execute(instance, res)
await DiagnosisHelper.postprocess(instance, job_result) await DiagnosisHelper.postprocess(instance, job_result)
# 3. TODO: produce task execute result to cec # 3. TODO: produce task execute result to cec

View File

@ -1,15 +1,26 @@
import json import json
import os import os
import subprocess import subprocess
import asyncio
from clogger import logger from clogger import logger
import tempfile import tempfile
import ast import ast
from typing import List from typing import List, Type, Optional
from apps.task.models import JobModel from apps.task.models import JobModel
from importlib import import_module
from django.conf import settings from django.conf import settings
from lib.utils import uuid_8 from lib.utils import uuid_8
from asgiref.sync import sync_to_async from asgiref.sync import sync_to_async
from channel_job.job import default_channel_job_executor, JobResult from channel_job.job import default_channel_job_executor, JobResult
from service_scripts.base import (
DiagnosisJob,
DiagnosisJobResult,
DiagnosisTask,
DiagnosisTaskResult,
DiagnosisPreProcessor,
PostProcessResult,
DiagnosisPostProcessor,
)
class DiagnosisHelper: class DiagnosisHelper:
@ -20,7 +31,7 @@ class DiagnosisHelper:
"""Update JobModel object""" """Update JobModel object"""
try: try:
if isinstance(kwargs.get("result", ""), dict): if isinstance(kwargs.get("result", ""), dict):
kwargs['result'] = json.dumps(kwargs.get('result', "")) kwargs["result"] = json.dumps(kwargs.get("result", ""))
instance.__dict__.update(**kwargs) instance.__dict__.update(**kwargs)
await sync_to_async(instance.save)() await sync_to_async(instance.save)()
except Exception as e: except Exception as e:
@ -40,13 +51,17 @@ class DiagnosisHelper:
# 1. Determines if there is a task with the same parameters # 1. Determines if there is a task with the same parameters
# and a status of Running. # and a status of Running.
if JobModel.objects.filter( if (
status__in=["Ready", "Running"], service_name=service_name, JobModel.objects.filter(
params=json.dumps(params) status__in=["Ready", "Running"],
).first() \ service_name=service_name,
is not None: params=json.dumps(params),
).first()
is not None
):
raise Exception( raise Exception(
f"node:{data.get('instance', '')}, There are tasks in progress, {service_name}") f"node:{data.get('instance', '')}, There are tasks in progress, {service_name}"
)
# 2. Create a task with Ready status # 2. Create a task with Ready status
task_params = { task_params = {
@ -55,7 +70,7 @@ class DiagnosisHelper:
"created_by": user_id, "created_by": user_id,
"params": json.dumps(params), "params": json.dumps(params),
"service_name": service_name, "service_name": service_name,
"status": "Ready" "status": "Ready",
} }
return JobModel.objects.create(**task_params) return JobModel.objects.create(**task_params)
@ -71,34 +86,22 @@ class DiagnosisHelper:
"created_by": user_id, "created_by": user_id,
"params": json.dumps(data), "params": json.dumps(data),
"service_name": service_name, "service_name": service_name,
"status": "Ready" "status": "Ready",
} }
return JobModel.objects.create(**task_params) return JobModel.objects.create(**task_params)
@staticmethod @staticmethod
async def run_subprocess(cmd: List[str]) -> dict: async def run_subprocess(cmd: List[str]) -> dict:
resp = subprocess.run(cmd, stdout=subprocess.PIPE, resp = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stderr=subprocess.PIPE)
return { return {
"stdout": resp.stdout.decode("utf-8"), "stdout": resp.stdout.decode("utf-8"),
"stderr": resp.stderr.decode("utf-8"), "stderr": resp.stderr.decode("utf-8"),
"returncode": resp.returncode "returncode": resp.returncode,
} }
# proc = await asyncio.create_subprocess_shell(
# cmd,
# stdout=asyncio.subprocess.PIPE,
# stderr=asyncio.subprocess.PIPE
# )
# stdout, stderr = await proc.communicate()
# return {
# "stdout": stdout.decode("utf-8"),
# "stderr": stderr.decode("utf-8"),
# "returncode": proc.returncode
# }
@staticmethod @staticmethod
async def preprocess(instance: JobModel) -> bool: async def preprocess_v1(instance: JobModel) -> DiagnosisTask:
""""Perform diagnosis preprocessing """ "Perform diagnosis preprocessing
{ {
"commands":[ "commands":[
@ -110,106 +113,327 @@ class DiagnosisHelper:
} }
} }
] ]
} }
""" """
success = False service_name = instance.service_name
try: params = instance.params
service_name = instance.service_name if isinstance(params, str):
params = instance.params
if isinstance(params, str):
try:
params = json.loads(params)
except Exception as exc:
logger.exception(
f"Task params loads error: {str(exc)}")
# 2. Invoke preprocessing scriptpreprocessing script
SCRIPTS_DIR = settings.SCRIPTS_DIR
service_path = os.path.join(SCRIPTS_DIR, service_name)
if not os.path.exists(service_path):
raise Exception(
"Can not find script file, please check service name")
try: try:
resp = await DiagnosisHelper.run_subprocess([service_path, json.dumps(params)]) params = json.loads(params)
except Exception as exc: except Exception as exc:
raise Exception( logger.exception(f"Task params loads error: {str(exc)}")
f"Execute preprocess script error: {str(exc)}"
) from exc
# 3. If the preprocessing script executes with an error # 2. Invoke preprocessing scriptpreprocessing script
if resp["returncode"] != 0: SCRIPTS_DIR = settings.SCRIPTS_DIR
raise (Exception( service_path = os.path.join(SCRIPTS_DIR, service_name)
f"Execute preprocess script error: {resp['stderr']}" if not os.path.exists(service_path):
)) raise Exception("Can not find script file, please check service name")
try:
# 4. If the preprocessing script executes successfully, resp = await DiagnosisHelper.run_subprocess(
# take out the processing result [service_path, json.dumps(params)]
stdout = resp["stdout"]
resp = ast.literal_eval(stdout)
#
resp_scripts = resp.get("commands")
# 5. If the preprocessing result not contains 'commands', it's a not expect bug
if not resp_scripts:
raise (Exception(
f"Not find commands, please check the preprocess script return"
))
# 6. If the preprocessing script executes successfully, the parameters are compliant
# and the Job instance is updated
await DiagnosisHelper._update_job(
instance, command=json.dumps(resp_scripts), status="Running"
) )
success = True
except Exception as exc: except Exception as exc:
logger.exception( raise Exception(f"Execute preprocess script error: {str(exc)}") from exc
f"Diagnosis preprocess error: {str(exc)}")
await DiagnosisHelper._update_job( # 3. If the preprocessing script executes with an error
instance, result="Diagnosis preprocess error", status="Fail", if resp["returncode"] != 0:
code=1, err_msg=f"Diagnosis preprocess error: {str(exc)}") raise (Exception(f"Execute preprocess script error: {resp['stderr']}"))
return success
# 4. If the preprocessing script executes successfully,
# take out the processing result
stdout = resp["stdout"]
resp = ast.literal_eval(stdout)
resp_scripts = resp.get("commands")
# 5. If the preprocessing result not contains 'commands', it's a not expect bug
if not resp_scripts:
raise (
Exception(
f"Not find commands, please check the preprocess script return"
)
)
diagnosis_task = DiagnosisTask(
jobs=[DiagnosisJob.from_dict(item) for item in resp_scripts], in_order=True
)
return diagnosis_task
@staticmethod @staticmethod
async def execute(instance: JobModel) -> JobResult: async def preprocess_v2(instance: JobModel) -> Optional[DiagnosisTask]:
"""Execute diagnosis task""" """Pre-processing V2
job_result = JobResult(code=1, err_msg="Diagnosis execute task error")
Args:
instance (JobModel): JobModel
Returns:
Optional[DiagnosisTask]: Diagnosis task
"""
def _get_pre_processor(service_name: str) -> Type[DiagnosisPreProcessor]:
"""
根据要执行的命令动态引入一个 PreProcessor 的实现用于执行前处理
"""
try:
return import_module(f"service_scripts.{service_name}_pre").PreProcessor
except Exception as e:
raise Exception(f"No Pre-processor available => {str(e)}")
# 1. Get params
service_name = instance.service_name
params = {}
if isinstance(instance.params, str):
try:
params = json.loads(instance.params)
except Exception as exc:
raise Exception(f"Task params loads error: {str(exc)}")
instances = params.pop("instances", [])
if "instance" in params and len(instances) == 0:
instances.append(params.pop("instance", ""))
if len(instances) == 0:
raise Exception("Missing params, instances or instance")
# 2. Use PreProcessor to check if the version of the tool meets the requirements
try:
pre_processor = _get_pre_processor(service_name)(service_name, **params)
except Exception as e:
return None
for instance in instances:
job_params = {
"channel_type": params.pop("channel", "ssh"),
"channel_opt": "cmd",
"params": {
**params,
"instance": instance,
"command": pre_processor.get_version_cmd(),
},
"timeout": 1000 * 60, # 1 minutes
}
job_result = await default_channel_job_executor.dispatch_job(
**job_params
).execute_async()
if job_result.code != 0:
raise Exception(
f"Check tool version for '{instance}' failed: {job_result.err_msg}"
)
if not pre_processor.version_check(job_result.result.strip()):
raise Exception(
f"Tool version less than {pre_processor.get_min_version_support()}"
)
# 3. Use PreProcessor to convert params to diagnosis jobs
diagnosis_task = pre_processor.get_diagnosis_cmds(instances, params)
if diagnosis_task is None or len(diagnosis_task.jobs) == 0:
raise Exception(f"Pre-processor not return any diagnosis job")
return diagnosis_task
@staticmethod
async def preprocess(instance: JobModel) -> Optional[DiagnosisTask]:
""" "Perform diagnosis preprocessing
{
"commands":[
{
"instance":"xxx",
"cmd":"xxx",
"params":{ => overide initial param
"region":"target_region"
}
}
]
}
"""
diagnosis_task: Optional[DiagnosisTask] = None
try:
diagnosis_task = await DiagnosisHelper.preprocess_v2(instance)
if diagnosis_task is None:
diagnosis_task = await DiagnosisHelper.preprocess_v1(instance)
# If the pre-processor executes successfully, the parameters are compliant
# and the Job instance is updated
await DiagnosisHelper._update_job(
instance, command=json.dumps(diagnosis_task.to_dict()), status="Running"
)
except Exception as exc:
logger.exception(f"Diagnosis preprocess error: {str(exc)}")
await DiagnosisHelper._update_job(
instance,
result="Diagnosis preprocess error",
status="Fail",
code=1,
err_msg=f"Diagnosis preprocess error: {str(exc)}",
)
return diagnosis_task
@staticmethod
async def execute(
instance: JobModel, diagnosis_task: DiagnosisTask
) -> DiagnosisTaskResult:
"""Execute diagnosis task"""
diagnosis_task_result = DiagnosisTaskResult(
code=1,
job_results=[],
err_msg="Diagnosis execute task error",
in_order=diagnosis_task.in_order,
)
try: try:
resp_scripts = json.loads(instance.command)
params = instance.params params = instance.params
if isinstance(params, str): if isinstance(params, str):
try: try:
params = json.loads(params) params = json.loads(params)
except Exception as exc: except Exception as exc:
raise Exception( raise Exception(f"Task params loads error: {str(exc)}") from exc
f"Task params loads error: {str(exc)}") from exc
for idx, script in enumerate(resp_scripts): if diagnosis_task.in_order:
job_params = { # Execute diagnosis jobs in order
"channel_type": params.pop("channel", "ssh"), for job in diagnosis_task.jobs:
"channel_opt": "cmd", job_params = {
"params": { "channel_type": params.pop("channel", "ssh"),
**params, "channel_opt": "cmd",
"instance": script.get("instance", "Unknown"), "params": {
"command": script.get("cmd", "Unknown"), **params,
}, "instance": job.instance,
"echo": { "command": job.cmd,
"task_id": instance.task_id },
}, "echo": {"task_id": instance.task_id},
"timeout": 1000 * 60 * 10, # 10 minutes "timeout": 1000 * 60 * 10, # 10 minutes
} }
job_result = await default_channel_job_executor.dispatch_job(**job_params).execute_async() job_result = await default_channel_job_executor.dispatch_job(
if job_result.code != 0: **job_params
raise Exception( ).execute_async()
f"Task execute failed: {job_result.err_msg}")
diagnosis_task_result.job_results.append(
DiagnosisJobResult(
code=job_result.code,
err_msg=job_result.err_msg,
job=job,
stdout=job_result.result,
)
)
if job_result.code != 0:
raise Exception(f"Task execute failed: {job_result.err_msg}")
else:
# Execute diagnosis jobs in parallel
tasks = []
for job in diagnosis_task.jobs:
job_params = {
"channel_type": params.pop("channel", "ssh"),
"channel_opt": "cmd",
"params": {
**params,
"instance": job.instance,
"command": job.cmd,
},
"echo": {"task_id": instance.task_id},
"timeout": 1000 * 60 * 10, # 10 minutes
}
tasks.append(
default_channel_job_executor.dispatch_job(
**job_params
).execute_async()
)
job_results = asyncio.gather(*tasks)
diagnosis_task_result.job_results = [
DiagnosisJobResult(
code=job_result.code,
err_msg=job_result.err_msg,
job=diagnosis_task.jobs[idx],
stdout=job_result.result,
)
for idx, job_result in enumerate(job_results)
]
for job_result in job_results:
if job_result.code != 0:
raise Exception(f"Task execut failed: {job_result.err_msg}")
diagnosis_task_result.code = 0
diagnosis_task_result.err_msg = ""
except Exception as exc: except Exception as exc:
job_result.err_msg = f"Diagnosis execute task error: {str(exc)}" diagnosis_task_result.err_msg = f"Diagnosis execute task error: {str(exc)}"
logger.exception(job_result.err_msg) logger.exception(diagnosis_task_result.err_msg)
await DiagnosisHelper._update_job( await DiagnosisHelper._update_job(
instance, result="Diagnosis execute task error", status="Fail", instance,
code=1, err_msg=job_result.err_msg) result="Diagnosis execute task error",
return job_result status="Fail",
code=1,
err_msg=diagnosis_task_result.err_msg,
)
return diagnosis_task_result
@staticmethod @staticmethod
async def postprocess(instance: JobModel, job_result: JobResult): async def postprocess_v1(
instance: JobModel, diagnosis_task_result: DiagnosisTaskResult
) -> PostProcessResult:
service_name = instance.service_name
# 执行后处理脚本,将结果整理成前端可识别的规范结构
SCRIPTS_DIR = settings.SCRIPTS_DIR
service_post_name = service_name + "_post"
service_post_path = os.path.join(SCRIPTS_DIR, service_post_name)
if not os.path.exists(service_post_path):
raise Exception(
f"No matching post-processing script found: {service_post_path}"
)
# 创建一个临时文件,用于暂存中间结果
with tempfile.NamedTemporaryFile(mode="w") as tmp_file:
try:
# 将要传递的中间结果写入到临时文件当中
tmp_file.write(diagnosis_task_result.job_results[0].stdout)
tmp_file.flush()
resp = await DiagnosisHelper.run_subprocess(
[service_post_path, tmp_file.name, instance.task_id]
)
except Exception as exc:
raise Exception(f"Execute postprocess script error: {str(exc)}")
if resp["returncode"] != 0:
raise (
Exception(
f"Execute postprocess script error: {str(resp['stderr'])}"
)
)
result = json.loads(resp["stdout"].strip())
return PostProcessResult.from_dict(result)
@staticmethod
async def postprocess_v2(
instance: JobModel, diagnosis_task_result: DiagnosisTaskResult
) -> Optional[PostProcessResult]:
"""Post-processing V2
Args:
instance (JobModel): JobModel
diagnosis_task_result(DiagnosisTaskResult): Diagnosis task result, contain all job results
Returns:
Optional[DiagnosisTask]: Diagnosis task
"""
def _get_post_processor(service_name: str) -> Type[DiagnosisPostProcessor]:
"""
根据要执行的命令动态引入一个 PreProcessor 的实现用于执行前处理
"""
try:
return import_module(
f"service_scripts.{service_name}_post"
).PostProcessor
except Exception as e:
raise Exception(f"No Pre-processor available => {str(e)}")
service_name = instance.service_name
# Use PostProcessor to convert diagnosis result to fron-end format data
try:
post_processor = _get_post_processor(service_name)(service_name)
except Exception as e:
return None
return post_processor.parse_diagnosis_result(diagnosis_task_result.job_results)
@staticmethod
async def postprocess(
instance: JobModel, diagnosis_task_result: DiagnosisTaskResult
):
"""Perform diagnosis postprocessing """Perform diagnosis postprocessing
JobResult -> { JobResult -> {
"code": 0, => 0 表示成功1表示失败 "code": 0, => 0 表示成功1表示失败
@ -227,66 +451,47 @@ class DiagnosisHelper:
"result": {} => 后处理脚本处理的结果应该是一个 JSON Object "result": {} => 后处理脚本处理的结果应该是一个 JSON Object
} }
""" """
try:
code = job_result.code
err_msg = job_result.err_msg
if code != 0:
await DiagnosisHelper._update_job(
instance, status="Fail", code=code,
result=job_result.result, err_msg=err_msg)
return
# 如果任务执行成功,则执行后处理脚本
params = instance.params
if isinstance(params, str):
try:
params = json.loads(params)
except Exception as exc:
raise Exception(
f"Task params loads error: {str(exc)}") from exc
service_name = instance.service_name code = diagnosis_task_result.code
# 执行后处理脚本,将结果整理成前端可识别的规范结构 err_msg = diagnosis_task_result.err_msg
SCRIPTS_DIR = settings.SCRIPTS_DIR if code != 0:
service_post_name = service_name + '_post' # Diagnosis task execute failed
service_post_path = os.path.join(
SCRIPTS_DIR, service_post_name)
if not os.path.exists(service_post_path):
raise Exception(
f"No matching post-processing script found: {service_post_path}")
# 创建一个临时文件,用于暂存中间结果
with tempfile.NamedTemporaryFile(mode="w") as tmp_file:
try:
# 将要传递的中间结果写入到临时文件当中
tmp_file.write(job_result.result)
tmp_file.flush()
resp = await DiagnosisHelper.run_subprocess([service_post_path, tmp_file.name, instance.task_id])
except Exception as exc:
raise Exception(
f"Execute postprocess script error: {str(exc)}"
)
if resp["returncode"] != 0:
raise (Exception(
f"Execute postprocess script error: {str(resp['stderr'])}"
))
result = json.loads(resp["stdout"].strip())
code = result.get("code", 1)
if code != 0:
err_msg = result.get("err_msg", "Postprocess error")
# 后处理脚本认为诊断出错
await DiagnosisHelper._update_job(
instance, err_msg=err_msg, status="Fail")
else:
rel_result = result.get("result", {})
# 后处理脚本执行结束,更新任务状态
await DiagnosisHelper._update_job(
instance, result=rel_result, status="Success"
)
pass
except Exception as exc:
logger.exception(
f"Diagnosis postprocess error: {str(exc)}")
await DiagnosisHelper._update_job( await DiagnosisHelper._update_job(
instance, result="Diagnosis postprocess error", status="Fail", instance,
code=1, err_msg=f"Diagnosis postprocess error: {str(exc)}") status="Fail",
code=code,
result=diagnosis_task_result.job_results[0].stdout,
err_msg=err_msg,
)
return
post_process_result: Optional[PostProcessResult] = None
try:
# 1. 优先尝试使用 V2 方案
post_process_result = await DiagnosisHelper.postprocess_v2(
instance, diagnosis_task_result
)
if post_process_result is None:
# 2. 没有为目标诊断匹配到 v2 方案,回退尝试 v1 方案
post_process_result = await DiagnosisHelper.postprocess_v1(
instance, diagnosis_task_result
)
if post_process_result.code != 0:
# 后处理脚本认为诊断出错
await DiagnosisHelper._update_job(
instance, err_msg=post_process_result.err_msg, status="Fail"
)
else:
# 后处理脚本执行成功,更新任务状态
await DiagnosisHelper._update_job(
instance, result=post_process_result.result, status="Success"
)
except Exception as exc:
logger.exception(f"Diagnosis postprocess error: {str(exc)}")
await DiagnosisHelper._update_job(
instance,
result="Diagnosis postprocess error",
status="Fail",
code=1,
err_msg=f"Diagnosis postprocess error: {str(exc)}",
)

View File

@ -0,0 +1,149 @@
"""
Time 2023/06/19 11:32
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File base.py
Description:
"""
from abc import ABC, abstractmethod
from typing import List, Dict
class DiagnosisJob:
def __init__(self, instance: str, cmd: str) -> None:
self.instance = instance
self.cmd = cmd
@classmethod
def from_dict(cls, data: dict) -> "DiagnosisJob":
return DiagnosisJob(instance=data.get("instance", ""), cmd=data.get("cmd", ""))
def to_dict(self):
return {"instance": self.instance, "cmd": self.cmd}
class DiagnosisTask:
"""Diagnosis Task
Args:
jobs([DiagnosisJob]): Diagnosis jobs
in_order(bool): Whether to execute all jobs in order
False => Concurrent execution of all Jobs
True => Execute each job in turn
"""
def __init__(self, jobs: List[DiagnosisJob] = [], in_order: bool = False) -> None:
self.jobs = jobs
self.in_order = in_order
def to_dict(self):
return {
"in_order": self.in_order,
"jobs": [item.to_dict() for item in self.jobs],
}
class DiagnosisJobResult:
def __init__(
self, code: int, job: DiagnosisJob, err_msg: str = "", stdout: str = ""
) -> None:
self.job = job
self.code = code
self.err_msg = err_msg
self.stdout = stdout
class DiagnosisTaskResult:
def __init__(
self,
code: int,
job_results: List[DiagnosisJobResult],
err_msg: str = "",
in_order: bool = False,
) -> None:
self.code = code
self.err_msg = err_msg
self.job_results = job_results
self.in_order = in_order
class PostProcessResult:
def __init__(self, code: int, result: str, err_msg: str = "") -> None:
self.code = code
self.result = result
self.err_msg = err_msg
def to_dict(self):
return {"code": self.code, "result": self.result, "err_msg": self.err_msg}
@classmethod
def from_dict(cls, data: dict) -> 'PostProcessResult':
return PostProcessResult(
code=data.get("code", 1),
result=data.get("result", ""),
err_msg=data.get("err_msg", ""),
)
class DiagnosisProcessorBase(ABC):
def version_check(self, version: str) -> bool:
"""Override this method to check tool version"""
return version >= self.get_min_version_support()
def get_min_version_support(self) -> str:
return "1.3.0-2"
class DiagnosisPreProcessor(DiagnosisProcessorBase):
"""Pre-processor used to perform: <parms> -> <diagnosis cmd>"""
def __init__(self, service_name: str, **kwargs):
self.service_name = service_name
def get_version_cmd(self) -> str:
"""Get tool version
The default is to use the sysak command for diagnostics, so
the default implement is to query the sysak version number
Returns:
str: tool version
"""
return 'rpm -q sysak --queryformat "%{VERSION}-%{RELEASE}"'
@abstractmethod
def get_diagnosis_cmds(self, instances: List[str], params: dict) -> DiagnosisTask:
"""Convert params to diagnosis cmds
params => { "service_name": "xxx", "time": 15 }
cmds => [
DiagnosisJob(instance="127.0.0.1", command="sysak memleak"),
DiagnosisJob(instance="192.168.0.1", command="sysak memleak")
]
Args:
params (dict): Diagnosis parameters
Returns:
DiagnosisTask: Diagnosis task
"""
class DiagnosisPostProcessor(DiagnosisProcessorBase):
"""Post-processor used to perform: <diagnosis result> -> <Front-end formatted data>
Args:
DiagnosisProcessorBase (_type_): _description_
"""
def __init__(self, service_name: str, **kwargs):
self.service_name = service_name
@abstractmethod
def parse_diagnosis_result(self, results: List[DiagnosisJobResult]) -> PostProcessResult:
"""Parse diagnosis results to front-end formmated data
Args:
results (List[DiagnosisResult]): Diagnosis results
"""

View File

@ -0,0 +1,22 @@
"""
Time 2023/06/19 17:32
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File command.py
Description:
"""
from typing import List
from .base import DiagnosisJobResult, DiagnosisPostProcessor
class CommandPostProcessor(DiagnosisPostProcessor):
def parse_diagnosis_result(self, results: List[DiagnosisJobResult]) -> dict:
postprocess_result = {
"code": 0,
"err_msg": "",
"result": {},
}
postprocess_result["result"] = {
"CommandResult": {"data": [{"key": "", "value": results[0].stdout}]}
}
return postprocess_result

View File

@ -0,0 +1,30 @@
"""
Time 2023/06/19 17:32
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File command.py
Description:
"""
from typing import List
from sysom_server.sysom_diagnosis.service_scripts.base import DiagnosisTask
from .base import DiagnosisJob, DiagnosisPreProcessor
class PreProcessor(DiagnosisPreProcessor):
"""Command diagnosis
Just invoke command in target instance and get stdout result
Args:
DiagnosisPreProcessor (_type_): _description_
"""
def get_diagnosis_cmds(self, instances: List[str], params: dict) -> DiagnosisTask:
command = params.get("command", "")
return DiagnosisTask(
jobs=[
DiagnosisJob(instance=instance, cmd=command) for instance in instances
],
in_order=False,
)