mirror of https://gitee.com/anolis/sysom.git
247 lines
6.8 KiB
Python
247 lines
6.8 KiB
Python
"""
|
|
Time                2023/06/19 11:32
Author:             mingfeng (SunnyQjm)
Email               mfeng@linux.alibaba.com
File                base.py
Description:        Base data types (jobs, tasks, results) and abstract
                    pre-/post-/hook-processor interfaces for diagnosis.
|
|
"""
|
|
from abc import ABC, abstractmethod
|
|
from typing import List, Optional, Union
|
|
from apps.task.models import JobModel
|
|
from asgiref.sync import sync_to_async
|
|
|
|
|
|
class FileItem:
    """Plain record holding a file's name, its remote path and its local path."""

    def __init__(self, name: str, remote_path: str, local_path: str = "") -> None:
        # local_path stays an empty string when the caller does not supply one.
        self.name, self.remote_path, self.local_path = name, remote_path, local_path

    def to_dict(self):
        """Return the three attributes as a dict keyed by attribute name."""
        return dict(
            name=self.name,
            remote_path=self.remote_path,
            local_path=self.local_path,
        )
|
|
|
|
|
|
class DiagnosisJob:
    """A diagnosis command to be executed on a specified instance.

    Args:
        instance(str): Instance IP
        cmd(str): Diagnosis command
        fetch_file_list(List[FileItem]): Files to be fetched after diagnosis.
            Omitted or ``None`` means "no files"; every job then gets its
            own empty list.
    """

    def __init__(
        self,
        instance: str,
        cmd: str,
        fetch_file_list: Optional[List["FileItem"]] = None,
    ) -> None:
        self.instance = instance
        self.cmd = cmd
        # A literal ``[]`` default would be created once and shared by every
        # job built without an explicit list, so appending to one job's list
        # would leak into all the others.
        self.fetch_file_list = [] if fetch_file_list is None else fetch_file_list

    @classmethod
    def from_dict(cls, data: dict) -> "DiagnosisJob":
        """Build a job from a dict shaped like the output of :meth:`to_dict`.

        Missing ``instance`` / ``cmd`` keys default to ``""``. Entries of
        ``fetch_file_list`` that are dicts are converted to ``FileItem``
        (missing fields default to ``""``); any other entry is kept as-is.

        Args:
            data (dict): Serialized job

        Returns:
            DiagnosisJob: Instance of ``cls`` (so subclasses round-trip too)
        """
        fetch_file_list = [
            FileItem(
                name=item.get("name", ""),
                remote_path=item.get("remote_path", ""),
                local_path=item.get("local_path", ""),
            )
            if isinstance(item, dict)
            else item
            # ``or []`` also covers an explicit ``"fetch_file_list": None``.
            for item in data.get("fetch_file_list") or []
        ]
        return cls(
            instance=data.get("instance", ""),
            cmd=data.get("cmd", ""),
            fetch_file_list=fetch_file_list,
        )

    def to_dict(self):
        """Serialize the job; each file item is serialized via its ``to_dict()``."""
        return {
            "instance": self.instance,
            "cmd": self.cmd,
            "fetch_file_list": [item.to_dict() for item in self.fetch_file_list],
        }
|
|
|
|
|
|
class DiagnosisTask:
    """Diagnosis Task

    Args:
        jobs([DiagnosisJob]): Diagnosis jobs. Omitted or ``None`` means an
            empty, per-instance list.
        in_order(bool): Whether to execute all jobs in order
                        False => Concurrent execution of all Jobs
                        True => Execute each job in turn
        offline_mode(bool): Whether to execute in offline mode
        offline_results([str]): Stored and serialized as-is; presumably the
            already-collected results used in offline mode (confirm against
            callers). Omitted or ``None`` means an empty, per-instance list.
    """

    def __init__(
        self,
        jobs: Optional[List["DiagnosisJob"]] = None,
        in_order: bool = False,
        offline_mode: bool = False,
        offline_results: Optional[List[str]] = None,
    ) -> None:
        # ``None`` sentinels instead of ``[]`` defaults: a list literal in the
        # signature is created once and would be shared by every task.
        self.jobs = [] if jobs is None else jobs
        self.in_order = in_order
        self.offline_mode = offline_mode
        self.offline_results = [] if offline_results is None else offline_results

    def to_dict(self):
        """Serialize the task; each job is serialized via its ``to_dict()``."""
        return {
            "in_order": self.in_order,
            "jobs": [item.to_dict() for item in self.jobs],
            "offline_mode": self.offline_mode,
            "offline_results": self.offline_results,
        }
|
|
|
|
|
|
class DiagnosisJobResult:
    """Outcome of one diagnosis job.

    Args:
        code(int): Result code of the job
        job(Optional[DiagnosisJob]): The job this result belongs to, if any
        err_msg(str): Error message, empty by default
        stdout(str): Captured standard output, empty by default
        file_list(List[FileItem]): File items attached to the result.
            Omitted or ``None`` means an empty, per-instance list.
    """

    def __init__(
        self,
        code: int,
        job: Optional["DiagnosisJob"] = None,
        err_msg: str = "",
        stdout: str = "",
        file_list: Optional[List["FileItem"]] = None,
    ) -> None:
        self.job = job
        self.code = code
        self.err_msg = err_msg
        self.stdout = stdout
        # Fresh list per result; a ``[]`` default in the signature would be a
        # single object shared by every result created without ``file_list``.
        self.file_list = [] if file_list is None else file_list
|
|
|
|
|
|
class DiagnosisTaskResult:
    """Outcome of a whole diagnosis task: a code, an error message, the
    per-job results and the ``in_order`` flag.
    """

    def __init__(
        self,
        code: int,
        job_results: List[DiagnosisJobResult],
        err_msg: str = "",
        in_order: bool = False,
    ) -> None:
        # All four arguments are stored unchanged under the same names.
        self.code, self.err_msg = code, err_msg
        self.job_results, self.in_order = job_results, in_order
|
|
|
|
|
|
class PostProcessResult:
    """Result produced by a post-processor.

    Args:
        code(int): Result code
        result(Union[str, dict]): Processed result payload
        err_msg(str): Error message, empty by default
    """

    def __init__(self, code: int, result: Union[str, dict], err_msg: str = "") -> None:
        self.code = code
        self.result = result
        self.err_msg = err_msg

    def to_dict(self):
        """Return ``code``, ``result`` and ``err_msg`` as a dict."""
        return {"code": self.code, "result": self.result, "err_msg": self.err_msg}

    @classmethod
    def from_dict(cls, data: dict) -> "PostProcessResult":
        """Build a result from a dict shaped like the output of :meth:`to_dict`.

        Missing keys default to ``code=1``, ``result=""`` and ``err_msg=""``.

        Returns:
            PostProcessResult: Instance of ``cls``, so calling this on a
            subclass yields that subclass rather than the base class.
        """
        return cls(
            code=data.get("code", 1),
            result=data.get("result", ""),
            err_msg=data.get("err_msg", ""),
        )
|
|
|
|
|
|
class DiagnosisProcessorBase(ABC):
    """Common base of the diagnosis processors; provides the tool version gate."""

    @staticmethod
    def _version_key(version: str) -> Optional[tuple]:
        """Convert a version such as ``"1.3.0-2"`` into ``(1, 3, 0, 2)``.

        The string is split on ``.`` and ``-``; the leading digits of each
        field are taken, and parsing stops at the first field that has no
        leading digits or carries a non-numeric suffix (so ``"1.3.0-2.an8"``
        yields ``(1, 3, 0, 2)``).

        Returns:
            Optional[tuple]: Tuple of ints, or ``None`` when ``version`` is
            not a string or does not start with a number.
        """
        if not isinstance(version, str):
            return None
        key = []
        for token in version.strip().replace("-", ".").split("."):
            digits = token[: len(token) - len(token.lstrip("0123456789"))]
            if not digits:
                break
            key.append(int(digits))
            if digits != token:
                # e.g. "2rc1": keep the 2, ignore whatever follows the suffix.
                break
        return tuple(key) or None

    def version_check(self, version: str) -> bool:
        """Override this method to check tool version

        The default compares ``version`` numerically, field by field,
        against :meth:`get_min_version_support`. A plain string comparison
        would order "1.10.0-1" and "1.3.0-10" before "1.3.0-2".

        Args:
            version (str): Tool version, e.g. ``"1.3.0-2"``

        Returns:
            bool: True when ``version`` is at least the minimum supported one
        """
        minimum = self.get_min_version_support()
        current_key = self._version_key(version)
        minimum_key = self._version_key(minimum)
        if current_key is None or minimum_key is None:
            # Not parseable as a numeric version: keep the legacy string
            # comparison so such inputs behave exactly as they did before.
            return version >= minimum
        return current_key >= minimum_key

    def get_min_version_support(self) -> str:
        """Return the minimum supported tool version (``"1.3.0-2"`` by default)."""
        return "1.3.0-2"
|
|
|
|
|
|
class DiagnosisPreProcessor(DiagnosisProcessorBase):
    """Pre-processing stage: maps request <params> to <diagnosis cmd>s."""

    def __init__(self, service_name: str, **kwargs):
        # Extra keyword arguments are accepted but not used by this base class.
        self.service_name = service_name

    def get_version_cmd(self) -> str:
        """Return the command used to query the tool version.

        Diagnosis uses sysak by default, so the default implementation
        asks rpm for the sysak version and release.

        Returns:
            str: Command whose output is "<VERSION>-<RELEASE>" of sysak
        """
        query_format = "%{VERSION}-%{RELEASE}"
        return 'rpm -q sysak --queryformat "' + query_format + '"'

    @abstractmethod
    def get_diagnosis_cmds(self, params: dict) -> DiagnosisTask:
        """Translate diagnosis parameters into the jobs to run.

        For example, params such as

            { "instance": "127.0.0.1", "service_name": "xxx", "time": 15 }

        may become a task whose jobs are

            [
                DiagnosisJob(instance="127.0.0.1", cmd="sysak memleak"),
                DiagnosisJob(instance="192.168.0.1", cmd="sysak memleak")
            ]

        Args:
            params (dict): Diagnosis parameters

        Returns:
            DiagnosisTask: Diagnosis task
        """
|
|
|
|
|
|
class DiagnosisPostProcessor(DiagnosisProcessorBase):
    """Post-processing stage: maps <diagnosis result> to <front-end formatted data>."""

    def __init__(self, service_name: str, **kwargs):
        # Extra keyword arguments are accepted but not used by this base class.
        self.service_name = service_name

    @abstractmethod
    def parse_diagnosis_result(
        self, results: List[DiagnosisJobResult]
    ) -> PostProcessResult:
        """Convert the job results into front-end formatted data.

        Args:
            results (List[DiagnosisJobResult]): Diagnosis results

        Returns:
            PostProcessResult: Formatted data for the front end
        """
|
|
|
|
|
|
class HookProcessResult:
    """Result produced by a hook processor.

    Args:
        code(int): Result code
        data(dict): Data returned by the hook
        err_msg(str): Error message, empty by default
    """

    def __init__(self, code: int, data: dict, err_msg: str = "") -> None:
        self.code = code
        self.data = data
        self.err_msg = err_msg

    def to_dict(self):
        """Return ``code``, ``data`` and ``err_msg`` as a dict."""
        return {"code": self.code, "data": self.data, "err_msg": self.err_msg}

    @classmethod
    def from_dict(cls, data: dict) -> "HookProcessResult":
        """Build a result from a dict shaped like the output of :meth:`to_dict`.

        Missing keys default to ``code=1``, ``data={}`` and ``err_msg=""``.
        The ``data`` default is an empty dict (not ``""``) so the attribute
        always matches its declared ``dict`` type.

        Returns:
            HookProcessResult: Instance of ``cls``, so calling this on a
            subclass yields that subclass rather than the base class.
        """
        return cls(
            code=data.get("code", 1),
            # The literal is evaluated per call, so each result gets its own dict.
            data=data.get("data", {}),
            err_msg=data.get("err_msg", ""),
        )
|
|
|
|
|
|
class DiagnosisHookProcessor(DiagnosisProcessorBase):
    """Hook-processor used to invoke hook scripts."""

    def __init__(self, service_name: str, **kwargs):
        # Extra keyword arguments are accepted but not used by this base class.
        self.service_name = service_name

    async def save_job(self, instance: JobModel):
        """Save ``instance`` by awaiting its ``save()`` wrapped in ``sync_to_async``.

        Any exception raised by ``save()`` propagates to the caller unchanged;
        the former ``try/except ... raise e`` wrapper did nothing else.

        Args:
            instance (JobModel): Job record to save
        """
        await sync_to_async(instance.save)()

    @abstractmethod
    async def invoke_hook(self, instance: JobModel, params: dict) -> HookProcessResult:
        """Invoke hook scripts

        Abstract: concrete processors must override it. The body below is
        only reachable through ``super().invoke_hook(...)`` and returns a
        code-200 result with empty data.

        Args:
            instance (JobModel): Job record the hook is invoked for
            params (dict): Diagnosis parameters

        Returns:
            HookProcessResult: Hook process result
        """
        return HookProcessResult(code=200, data={}, err_msg="")
|