sysom1/sysom_server/sysom_diagnosis/service_scripts/base.py

247 lines
6.8 KiB
Python

"""
Time 2023/06/19 11:32
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File base.py
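Description: Base classes and data containers (jobs, tasks, results) for the diagnosis pre-, post- and hook-processors.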
"""
from abc import ABC, abstractmethod
from typing import List, Optional, Union
from apps.task.models import JobModel
from asgiref.sync import sync_to_async
class FileItem:
    """A file entry identified by a name, a remote path and a local path.

    Args:
        name(str): File name
        remote_path(str): Remote path of the file
        local_path(str): Local path of the file, empty string by default
    """

    def __init__(self, name: str, remote_path: str, local_path: str = "") -> None:
        self.name, self.remote_path, self.local_path = name, remote_path, local_path

    def to_dict(self):
        """Return the three fields as a plain dict."""
        return dict(
            name=self.name,
            remote_path=self.remote_path,
            local_path=self.local_path,
        )
class DiagnosisJob:
    """Diagnosis Job indicates a diagnosis command to be executed on a specified instance

    Args:
        instance(str): Instance IP
        cmd(str): Diagnosis command
        fetch_file_list(List[FileItem]): File list to be fetched after diagnosis;
            omitted or None means an empty list
    """

    def __init__(
        self,
        instance: str,
        cmd: str,
        fetch_file_list: Optional[List["FileItem"]] = None,
    ) -> None:
        self.instance = instance
        self.cmd = cmd
        # Build a fresh list per instance: a literal [] default is created once
        # and would be shared by every job constructed without a file list.
        self.fetch_file_list = [] if fetch_file_list is None else fetch_file_list

    @classmethod
    def from_dict(cls, data: dict) -> "DiagnosisJob":
        """Build a job from a dict holding "instance" and "cmd".

        Missing keys default to empty strings. A "fetch_file_list" entry in
        data is not read, so the resulting job has an empty file list.

        Args:
            data(dict): Source dict

        Returns:
            DiagnosisJob: Instance of the class the method was called on
        """
        # Use cls rather than the hard-coded class name so that subclasses
        # get an instance of their own type.
        return cls(instance=data.get("instance", ""), cmd=data.get("cmd", ""))

    def to_dict(self):
        """Return the job as a plain dict; each file item is converted via its to_dict()."""
        return {
            "instance": self.instance,
            "cmd": self.cmd,
            "fetch_file_list": [item.to_dict() for item in self.fetch_file_list],
        }
class DiagnosisTask:
    """Diagnosis Task

    Args:
        jobs([DiagnosisJob]): Diagnosis jobs; omitted or None means an empty list
        in_order(bool): Whether to execute all jobs in order
            False => Concurrent execution of all Jobs
            True => Execute each job in turn
        offline_mode(bool): Whether to execute in offline mode
        offline_results(List[str]): Offline results stored on the task;
            omitted or None means an empty list
    """

    def __init__(
        self,
        jobs: Optional[List["DiagnosisJob"]] = None,
        in_order: bool = False,
        offline_mode: bool = False,
        offline_results: Optional[List[str]] = None,
    ) -> None:
        # Fresh lists per instance: literal [] defaults are created once and
        # would be shared (and mutated) across every task built without them.
        self.jobs = [] if jobs is None else jobs
        self.in_order = in_order
        self.offline_mode = offline_mode
        self.offline_results = [] if offline_results is None else offline_results

    def to_dict(self):
        """Return the task as a plain dict; each job is converted via its to_dict()."""
        return {
            "in_order": self.in_order,
            "jobs": [item.to_dict() for item in self.jobs],
            "offline_mode": self.offline_mode,
            "offline_results": self.offline_results,
        }
class DiagnosisJobResult:
    """Result of a single diagnosis job.

    Args:
        code(int): Result code of the job
        job(Optional[DiagnosisJob]): The job this result belongs to, if any
        err_msg(str): Error message, empty by default
        stdout(str): Standard output text, empty by default
        file_list(List[FileItem]): Files associated with the result;
            omitted or None means an empty list
    """

    def __init__(
        self,
        code: int,
        job: Optional["DiagnosisJob"] = None,
        err_msg: str = "",
        stdout: str = "",
        file_list: Optional[List["FileItem"]] = None,
    ) -> None:
        self.job = job
        self.code = code
        self.err_msg = err_msg
        self.stdout = stdout
        # Fresh list per instance: a literal [] default is created once and
        # would be shared by every result built without a file list.
        self.file_list = [] if file_list is None else file_list
class DiagnosisTaskResult:
    """Result of a whole diagnosis task.

    Args:
        code(int): Result code of the task
        job_results(List[DiagnosisJobResult]): Results of the task's jobs
        err_msg(str): Error message, empty by default
        in_order(bool): In-order flag, False by default
    """

    def __init__(
        self,
        code: int,
        job_results: List[DiagnosisJobResult],
        err_msg: str = "",
        in_order: bool = False,
    ) -> None:
        self.code, self.err_msg = code, err_msg
        self.job_results, self.in_order = job_results, in_order
class PostProcessResult:
    """Result produced by a post-processor.

    Args:
        code(int): Result code
        result(Union[str, dict]): Result payload
        err_msg(str): Error message, empty by default
    """

    def __init__(self, code: int, result: Union[str, dict], err_msg: str = "") -> None:
        self.code = code
        self.result = result
        self.err_msg = err_msg

    def to_dict(self):
        """Return code, result and err_msg as a plain dict."""
        return {"code": self.code, "result": self.result, "err_msg": self.err_msg}

    @classmethod
    def from_dict(cls, data: dict) -> "PostProcessResult":
        """Build a result from a dict.

        Missing keys fall back to code=1, result="" and err_msg="".

        Args:
            data(dict): Source dict

        Returns:
            PostProcessResult: Instance of the class the method was called on
        """
        # Use cls rather than the hard-coded class name so that subclasses
        # get an instance of their own type.
        return cls(
            code=data.get("code", 1),
            result=data.get("result", ""),
            err_msg=data.get("err_msg", ""),
        )
class DiagnosisProcessorBase(ABC):
    """Common base of the diagnosis processors; provides the tool version check."""

    @staticmethod
    def _version_key(version: str) -> tuple:
        """Turn a version string such as "1.3.0-2" into (1, 3, 0, 2).

        Components are split on "." and "-". Parsing stops at the first
        component that does not begin with an ASCII digit, so a trailing
        suffix such as ".el7" is ignored.

        Returns:
            tuple: Leading numeric components, or () if none could be parsed
        """
        numbers = []
        for token in version.strip().replace("-", ".").split("."):
            digits = ""
            for ch in token:
                if ch not in "0123456789":
                    break
                digits += ch
            if not digits:
                break
            numbers.append(int(digits))
        return tuple(numbers)

    def version_check(self, version: str) -> bool:
        """Override this method to check tool version

        The default implementation compares version against
        get_min_version_support() component by component as integers, so
        that "1.10.0-1" is accepted as newer than "1.3.0-2" (a plain string
        comparison would reject it).

        Args:
            version(str): Version string of the tool

        Returns:
            bool: True if version is at least the minimum supported version
        """
        minimum = self.get_min_version_support()
        current_key = self._version_key(version)
        minimum_key = self._version_key(minimum)
        if not current_key or not minimum_key:
            # One side is not a recognisable version string: keep the plain
            # string comparison so such inputs get the same answer as before.
            return version >= minimum
        return current_key >= minimum_key

    def get_min_version_support(self) -> str:
        """Return the minimum supported tool version ("1.3.0-2" by default)."""
        return "1.3.0-2"
class DiagnosisPreProcessor(DiagnosisProcessorBase):
    """Pre-processor used to perform: <params> -> <diagnosis cmd>

    Args:
        service_name(str): Name of the diagnosis service, stored on the instance
        **kwargs: Accepted and ignored by this base implementation
    """

    def __init__(self, service_name: str, **kwargs):
        self.service_name = service_name

    def get_version_cmd(self) -> str:
        """Return the shell command used to query the tool version.

        Diagnoses use the sysak command by default, so the default
        implementation queries the version and release of the sysak rpm
        package.

        Returns:
            str: Command that prints the tool version
        """
        version_query = 'rpm -q sysak --queryformat "%{VERSION}-%{RELEASE}"'
        return version_query

    @abstractmethod
    def get_diagnosis_cmds(self, params: dict) -> DiagnosisTask:
        """Convert diagnosis parameters into a diagnosis task.

        Example:
            params => { "instance": "127.0.0.1", "service_name": "xxx", "time": 15 }
            task   => DiagnosisTask(jobs=[
                DiagnosisJob(instance="127.0.0.1", cmd="sysak memleak"),
                DiagnosisJob(instance="192.168.0.1", cmd="sysak memleak")
            ])

        Args:
            params (dict): Diagnosis parameters

        Returns:
            DiagnosisTask: Diagnosis task
        """
class DiagnosisPostProcessor(DiagnosisProcessorBase):
    """Post-processor used to perform: <diagnosis result> -> <front-end formatted data>

    Args:
        service_name(str): Name of the diagnosis service, stored on the instance
        **kwargs: Accepted and ignored by this base implementation
    """

    def __init__(self, service_name: str, **kwargs):
        self.service_name = service_name

    @abstractmethod
    def parse_diagnosis_result(
        self,
        results: List[DiagnosisJobResult],
    ) -> PostProcessResult:
        """Parse diagnosis results into front-end formatted data.

        Args:
            results (List[DiagnosisJobResult]): Diagnosis job results

        Returns:
            PostProcessResult: Post-process result
        """
class HookProcessResult:
    """Result produced by a hook-processor.

    Args:
        code(int): Result code
        data(dict): Result data
        err_msg(str): Error message, empty by default
    """

    def __init__(self, code: int, data: dict, err_msg: str = "") -> None:
        self.code = code
        self.data = data
        self.err_msg = err_msg

    def to_dict(self):
        """Return code, data and err_msg as a plain dict."""
        return {"code": self.code, "data": self.data, "err_msg": self.err_msg}

    @classmethod
    def from_dict(cls, data: dict) -> "HookProcessResult":
        """Build a result from a dict.

        Missing keys fall back to code=1, data={} and err_msg="".

        Args:
            data(dict): Source dict

        Returns:
            HookProcessResult: Instance of the class the method was called on
        """
        # "data" is declared as a dict, so default to an empty dict (a new
        # one per call) rather than an empty string. cls is used instead of
        # the hard-coded class name so subclasses get their own type.
        return cls(
            code=data.get("code", 1),
            data=data.get("data", {}),
            err_msg=data.get("err_msg", ""),
        )
class DiagnosisHookProcessor(DiagnosisProcessorBase):
    """Hook-processor used to invoke hook scripts

    Args:
        service_name(str): Name of the diagnosis service, stored on the instance
        **kwargs: Accepted and ignored by this base implementation
    """

    def __init__(self, service_name: str, **kwargs):
        self.service_name = service_name

    async def save_job(self, instance: JobModel):
        """Save the job model from async code.

        instance.save is wrapped with sync_to_async and awaited. Any
        exception raised by the save propagates unchanged to the caller.

        Args:
            instance (JobModel): Job model instance to save
        """
        # No try/except here: catching an exception only to re-raise the
        # same object adds nothing.
        await sync_to_async(instance.save)()

    @abstractmethod
    async def invoke_hook(self, instance: JobModel, params: dict) -> HookProcessResult:
        """Invoke hook scripts

        Subclasses must override this method. The body here returns a
        result with code 200 and empty data, which an override can reach
        through super().

        Args:
            instance (JobModel): Job model instance the hook is invoked for
            params (dict): Diagnosis parameters

        Returns:
            HookProcessResult: Hook process result
        """
        return HookProcessResult(code=200, data={}, err_msg="")