# Source path: sysom1/environment/1_sdk/channel_job/job.py
# -*- coding: utf-8 -*- #
"""
Time 2022/11/7 14:16
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File job.py
Description:
"""
from typing import Optional, Callable, List, Dict
import threading
import anyio
import requests
import json
import time
from cec_base.consumer import Consumer, dispatch_consumer
from cec_base.producer import Producer, dispatch_producer
from cec_base.admin import Admin, dispatch_admin
from cec_base.event import Event
from cec_base.url import CecUrl
from cec_base.utils import StoppableThread
from cec_base.exceptions import TopicAlreadyExistsException
from gclient_base import GClient, dispatch_g_client
from .model import JobEntry, JobResult
from .exception import ChannelJobException
from .exception import ChannelJobException
from .model import JobEntry, JobResult
class ChannelJob:
    """A channel job.

    Describes a channel job and supports obtaining the execution results
    either synchronously (``execute``) or asynchronously
    (``execute_async`` / ``execute_async_with_callback``).

    Args:
        entry(JobEntry): Request parameters for channel job
    """

    def __init__(self, entry: JobEntry) -> None:
        self.job_entry: JobEntry = entry
        # Invoked for every result chunk received (streaming data)
        self._chunk_callback: Optional[Callable[[JobResult], None]] = None
        # Invoked once when the final result is received
        self._finish_callback: Optional[Callable[[JobResult], None]] = None
        # CEC producer used to deliver the job event; bound via _bind_producer
        self._producer: Optional[Producer] = None
        self._target_topic = ""
        # A list to collect each result chunk; only needed when callers
        # care about intermediate chunks
        self._results: List[JobResult] = []
        self._final_result: Optional[JobResult] = None
        # Condition used to block a synchronous execute() until finished
        self._finish_conn: threading.Condition = threading.Condition()

    def execute(
        self, chunk_callback: Optional[Callable[[JobResult], None]] = None,
        **kwargs
    ) -> JobResult:
        """Deliver the job and wait synchronously for the final result.

        Args:
            chunk_callback(Optional[Callable[[JobResult], None]]): A callback
                that is called every time a result chunk is received; can be
                used to fetch streaming data.

        Keyword Args:
            timeout: Maximum time to wait for the final result (ms).

        Returns:
            JobResult: The final result, or a code-1 "Execute job timeout"
            result if the timeout expires first.

        Raises:
            ChannelJobException: Unexpected errors occur during job delivery
        """
        # 1. Bind optional chunk callback and resolve the effective timeout
        self._chunk_callback = chunk_callback
        self.job_entry.timeout = kwargs.get("timeout", self.job_entry.timeout)
        timeout = self.job_entry.timeout
        with self._finish_conn:
            # 2. Deliver the job to CEC while holding the lock, so a fast
            #    result cannot notify the condition before we start waiting
            self._delivery_to_cec(self.job_entry)
            # 3. Condition.wait() takes seconds; the job timeout is in ms
            if self._finish_conn.wait(
                    None if timeout is None else timeout / 1000):
                return self._final_result
            # Timed out => synthesize an error result
            self._final_result = JobResult(
                code=1,
                err_msg="Execute job timeout",
                result="",
                echo=self.job_entry.echo
            )
            return self._final_result

    async def execute_async(
        self, chunk_callback: Optional[Callable[[JobResult], None]] = None
    ) -> JobResult:
        """Execute the channel job asynchronously.

        Args:
            chunk_callback(Optional[Callable[[JobResult], None]]): A callback
                that is called every time a result chunk is received.
        """
        # Off-load the blocking execute() to a worker thread
        return await anyio.to_thread.run_sync(
            self.execute, chunk_callback
        )

    def execute_async_with_callback(
        self,
        finish_callback: Callable[[JobResult], None],
        chunk_callback: Optional[Callable[[JobResult], None]] = None,
    ):
        """Deliver the job and receive results via callbacks (non-blocking).

        Args:
            finish_callback(Callable[[JobResult], None]): Called once with
                the final JobResult.
            chunk_callback(Optional[Callable[[JobResult], None]]): Called for
                every received result chunk.
        """
        self._finish_callback = finish_callback
        self._chunk_callback = chunk_callback
        self._delivery_to_cec(self.job_entry)

    ############################################################################
    # Inner functions
    ############################################################################
    def _bind_producer(self, producer: Producer, target_topic: str):
        """Bind the CEC producer used to deliver events to CEC."""
        self._producer = producer
        self._target_topic = target_topic

    def _delivery_to_cec(self, entry: JobEntry):
        """Deliver the channel job to CEC.

        Generates an event based on the job entry and delivers it to CEC.

        Args:
            entry(JobEntry): Channel job entry

        Raises:
            ChannelJobException: Producer/topic not bound, or delivery failed
        """
        if self._chunk_callback is not None:
            # A chunk callback implies the caller wants streaming results
            entry.return_as_stream = True
        if self._producer is None or self._target_topic == "":
            raise ChannelJobException(
                "ChannelJob not bind producer or target_topic")
        try:
            # Deliver the event to CEC
            # NOTE(review): 'to_channel_vlaue' spelling follows the JobEntry
            # API -- confirm against the model before renaming.
            self._producer.produce(
                self._target_topic, entry.to_channel_vlaue())
            self._producer.flush()
        except Exception as exc:
            # Chain the cause so the original traceback is preserved
            raise ChannelJobException(exc) from exc

    def _update_chunk(self, result: JobResult):
        """Invoked each time a result chunk is received."""
        def invoke_chunk_callback(chunk: JobResult):
            if self._chunk_callback is not None:
                self._chunk_callback(chunk)

        if result.is_finished:
            if len(self._results) <= 0:
                # No intermediate chunks arrived: the final chunk is also
                # the only chunk, so record and report it
                self._results.append(result)
                invoke_chunk_callback(result)
            self._final_result = result
            # Wake up a synchronous execute() waiting on the condition
            with self._finish_conn:
                self._finish_conn.notify()
            if self._finish_callback is not None:
                self._finish_callback(result)
        else:
            invoke_chunk_callback(result)
            self._results.append(result)
class ChannelFileJob:
    """A channel job that transfers a file via the channel HTTP service.

    Args:
        g_client(GClient): HTTP client pointing at the channel service
        opt(str): Operation type, "send-file" or "get-file"
        local_path(str): Path of the file on the local host
        remote_path(str): Path of the file on the remote instance(s)
        instances(Optional[List[str]]): Target instances ("send-file" only)
        instance(str): Single source instance ("get-file" only)
    """

    # Maps operation name to the channel service endpoint
    _OPT_TABLE = {
        "send-file": "/api/v1/channel/file/send",
        "get-file": "/api/v1/channel/file/get"
    }

    def __init__(
        self, g_client: GClient,
        opt: str = "send-file",
        local_path: str = "",
        remote_path: str = "",
        instances: Optional[List[str]] = None,
        instance: str = "",
    ) -> None:
        self.g_client = g_client
        self.opt: str = opt
        self.local_path: str = local_path
        self.remote_path: str = remote_path
        # Fix: avoid the shared mutable-default-argument pitfall (original
        # used `instances: List[str] = []`) and correct the wrong `str`
        # annotation on this attribute
        self.instances: List[str] = [] if instances is None else instances
        self.instance = instance

    def _send_file(self) -> JobResult:
        """Upload ``local_path`` to ``remote_path`` on all target instances."""
        payload = {
            "target_path": self.remote_path,
            "target_instances": ";".join(self.instances)
        }
        headers = {
            'User-Agent': 'sysom_channel_job/1.0.0'
        }
        # Fix: close the uploaded file handle (original leaked it)
        with open(self.local_path, 'rb') as upload_fp:
            files = [
                ('file', (self.local_path, upload_fp,
                          'application/octet-stream'))
            ]
            response = self.g_client.post(
                self._OPT_TABLE[self.opt], headers=headers, data=payload,
                files=files)
        if response.status_code != 200:
            return JobResult(
                code=1,
                err_msg=f"{response.reason}: {response.text}",
                result=response.text
            )
        channel_result = response.json()
        return JobResult(
            code=channel_result.get("code", 1),
            err_msg=channel_result.get("err_msg", "Unknown error"),
            result=json.dumps(channel_result.get("result", []))
        )

    def _get_file(self) -> JobResult:
        """Download ``remote_path`` from ``instance`` into ``local_path``."""
        payload = {
            "target_instance": self.instance,
            "remote_path": self.remote_path,
        }
        headers = {
            'User-Agent': 'sysom_channel_job/1.0.0',
            'Content-Type': 'application/json'
        }
        with self.g_client.get(self._OPT_TABLE[self.opt], headers=headers,
                               data=json.dumps(payload)) as r:
            # Stream the body to disk in 1 MiB chunks
            # NOTE(review): on a non-200 response this still writes the error
            # body into local_path -- preserved from the original behavior.
            with open(self.local_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
            if r.status_code == 200:
                return JobResult(
                    code=0,
                    result=""
                )
            return JobResult(
                code=1,
                err_msg=r.text,
                result=""
            )

    def execute(self) -> JobResult:
        """Execute the file job synchronously.

        Raises:
            ChannelJobException: ``opt`` is not a supported operation
        """
        if self.opt == "send-file":
            return self._send_file()
        if self.opt == "get-file":
            return self._get_file()
        # Fix: original silently returned None on an unknown operation
        raise ChannelJobException(f"Unknown file job opt: {self.opt}")

    async def execute_async(self) -> JobResult:
        """Execute the file job in a worker thread."""
        return await anyio.to_thread.run_sync(self.execute)
class ChannelJobExecutorConfig:
    """Configuration holder for ChannelJobExecutor (currently minimal).

    Args:
        host(str): Channel server host.
    """

    def __init__(self, host: str = "") -> None:
        # Fix: the original silently discarded ``host``; store it so the
        # value can be read back by users of this config object.
        self.host = host
class ChannelJobExecutor:
    """Dispatches channel jobs through CEC and routes results back.

    Holds CEC consumer/producer/admin instances plus a background thread
    that listens on the result topic and forwards each received result
    chunk to the originating ChannelJob instance.
    """

    def __init__(self) -> None:
        self._consumer: Optional[Consumer] = None
        self._producer: Optional[Producer] = None
        self._admin: Optional[Admin] = None
        # Topic jobs are delivered to, and topic results are listened on
        self._target_topic = ""
        self._listen_topic = ""
        # HTTP client used by file jobs (set in init_config)
        self._g_client: Optional[GClient] = None
        self._auto_recover = True
        # job_id -> ChannelJob; used to route received results to callbacks
        self._job_mapper: Dict[str, ChannelJob] = {}
        # Background thread consuming result events; see start()/stop()
        self._job_executor_thread: Optional[StoppableThread] = None

    def initial_from_remote_server(self, channel_server_url: str):
        """Auto-initialize the ChannelJobExecutor.

        1. Pull the configuration from the remote server;
        2. Initialize automatically from the configuration returned.

        Args:
            channel_server_url(str): Channel server http url

        Raises:
            ChannelJobException: Request failed or configuration invalid
        """
        try:
            response = requests.get(channel_server_url)
            if response.status_code != 200:
                raise ChannelJobException(
                    f"Request config from {channel_server_url} failed: "
                    f"{response.status_code}")
            # "data" carries a JSON-encoded dict of init_config kwargs
            configs = json.loads(response.json()["data"])
            self.init_config(**configs)
        except ChannelJobException:
            # Fix: the original's broad handler re-wrapped its own
            # ChannelJobException a second time
            raise
        except Exception as e:
            raise ChannelJobException(e) from e

    def init_config(
        self, url: str, g_client: Optional[GClient] = None,
    ):
        """Initialize from a CEC url carrying channel_job_* parameters.

        Args:
            url(str): CEC url with channel_job_target_topic,
                channel_job_listen_topic and channel_job_consumer_group params
            g_client(Optional[GClient]): HTTP client for file jobs; when
                omitted, a client for http://127.0.0.1:7003 is created.

        Raises:
            ChannelJobException: Required url parameters are missing
        """
        cec_url = CecUrl.parse(url)
        # Fix: the original built the default GClient via a call in the
        # default argument, which executes at import time; build it lazily.
        self._g_client = g_client if g_client is not None \
            else dispatch_g_client("http://127.0.0.1:7003")
        # 1. Check required params
        self._target_topic = cec_url.params.pop(
            "channel_job_target_topic", None
        )
        self._listen_topic = cec_url.params.pop(
            "channel_job_listen_topic", None
        )
        # NOTE(review): popped but unused below (the consumer uses a
        # generated id); kept because its presence is validated.
        listen_consumer_group = cec_url.params.pop(
            "channel_job_consumer_group", None
        )
        # NOTE(review): url params are typically strings, so "false" would
        # still be truthy here -- confirm the expected value format.
        self._auto_recover = cec_url.params.pop(
            "channel_job_auto_recover", True
        )
        if None in [self._target_topic, self._listen_topic,
                    listen_consumer_group]:
            raise ChannelJobException(
                f"CecUrl missing parameters: {cec_url}")
        # 2. Create Consumer, Producer, Admin instances
        self._consumer: Consumer = dispatch_consumer(
            str(cec_url), self._listen_topic, Consumer.generate_consumer_id(),
            start_from_now=True
        )
        self._producer: Producer = dispatch_producer(str(cec_url))
        self._admin: Admin = dispatch_admin(str(cec_url))
        # 3. Ensure both topics exist
        self._ensure_topic_exist(self._target_topic)
        self._ensure_topic_exist(self._listen_topic)
        return self

    def _ensure_topic_exist(self, topic: str):
        """Ensure a specific topic exists, creating it if necessary.

        Args:
            topic(str): Topic name

        Raises:
            ChannelJobException: Topic missing and the creation attempt failed
        """
        if self._admin.is_topic_exist(topic):
            return
        try:
            if not self._admin.create_topic(topic):
                # Fix: original message rendered a literal "$" before the
                # topic and ran "and"/"the" together
                raise ChannelJobException(f"Topic {topic} not exists, and "
                                          "the attempt to create failed. ")
        except TopicAlreadyExistsException:
            # Created concurrently elsewhere; not an error for "ensure"
            pass

    def dispatch_job(self, channel_type: str = "ssh", channel_opt: str = "cmd",
                     params: Optional[dict] = None,
                     echo: Optional[dict] = None, **kwargs) -> ChannelJob:
        """Dispatch one channel job to channel services.

        Args:
            channel_type(str): The type of channel used to execute the job
            channel_opt(str): The operation type, e.g.: cmd, init
            params(Optional[dict]): Channel job parameters
            echo(Optional[dict]): Echo information

        Keyword Args:
            timeout: The maximum time to wait for a channel job to
                execute. (ms)
            auto_retry: If the channel is not established successfully, it is
                automatically retried before the timeout period expires.
        """
        # Fix: avoid shared mutable default arguments
        params = {} if params is None else params
        echo = {} if echo is None else echo
        # 1. Generate JobEntry
        job_entry = JobEntry(
            channel_type=channel_type, channel_opt=channel_opt,
            params=params, echo=echo, listen_topic=self._listen_topic,
            **kwargs
        )
        # 2. Cache the job instance, used to trigger callbacks on results
        channel_job = ChannelJob(job_entry)
        channel_job._bind_producer(self._producer, self._target_topic)
        self._job_mapper[job_entry.job_id] = channel_job
        return channel_job

    def dispatch_file_job(self, opt: str = "send-file",
                          params: Optional[dict] = None) -> ChannelFileJob:
        """Create a file transfer job backed by the configured GClient.

        Args:
            opt(str): "send-file" or "get-file"
            params(Optional[dict]): ChannelFileJob keyword arguments

        Raises:
            ChannelJobException: init_config has not provided a GClient
        """
        # Fix: avoid a shared mutable default argument
        params = {} if params is None else params
        if self._g_client is None:
            raise ChannelJobException("GClient is None")
        return ChannelFileJob(g_client=self._g_client, opt=opt, **params)

    def _deal_received_event(self, event: Event):
        """Route a received CEC result event to the job that issued it."""
        job_result = JobResult.parse_by_cec_event_value(event.value)
        if job_result.job_id in self._job_mapper:
            self._job_mapper[job_result.job_id]._update_chunk(job_result)

    def _run(self):
        """Consume result events until the executor thread is stopped."""
        while self._job_executor_thread is not None and \
                not self._job_executor_thread.stopped():
            # Deal with each received event
            for event in self._consumer:
                self._deal_received_event(event)
            if self._job_executor_thread is not None and \
                    not self._job_executor_thread.stopped():
                # Consumer iteration ended (e.g. connection dropped);
                # back off briefly before retrying
                time.sleep(5)

    def start(self):
        """Start the background result-consuming thread (idempotent)."""
        if self._job_executor_thread is None or \
                self._job_executor_thread.stopped():
            self._job_executor_thread = StoppableThread(
                target=self._run, daemon=True
            )
            self._job_executor_thread.start()

    def stop(self):
        """Stop the background thread and disconnect all CEC clients.

        Fix: guarded so calling stop() before start()/init_config() is a
        safe no-op instead of an AttributeError.
        """
        if self._job_executor_thread is not None:
            self._job_executor_thread.stop()
        if self._consumer is not None:
            self._consumer.disconnect()
        if self._producer is not None:
            self._producer.disconnect()
        if self._admin is not None:
            self._admin.disconnect()
        if self._job_executor_thread is not None:
            self._job_executor_thread.join()
# Module-level default executor instance; presumably shared by SDK users
# who call init_config()/start() on it -- verify against callers.
default_channel_job_executor = ChannelJobExecutor()