mirror of https://gitee.com/anolis/sysom.git
246 lines
9.0 KiB
Python
246 lines
9.0 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/03/09 13:58
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File node_dispatcher.py
|
|
Description:
|
|
"""
|
|
import aiohttp
|
|
import aiofiles
|
|
import shutil
|
|
import tarfile
|
|
import os
|
|
from asyncer import syncify
|
|
from typing import List
|
|
from channel_job.job import ChannelJobExecutor
|
|
from .config_parser import ConfigParser
|
|
from .adddict import Dict
|
|
|
|
|
|
class NodeManagerException(Exception):
|
|
"""Base exception for ChanneJob sdk"""
|
|
|
|
|
|
class NodeManager:
|
|
def __init__(self, config: ConfigParser, channel_job_executor: ChannelJobExecutor) -> None:
|
|
self._config = config
|
|
self._service_config = config.get_service_config()
|
|
self._node_config = config.get_node_config()
|
|
self._channel_job_executor = channel_job_executor
|
|
|
|
##############################################################################
|
|
# Helper functions
|
|
##############################################################################
|
|
|
|
def _get_target_package_dir(self, arch: str) -> str:
|
|
return f"{self._service_config.service_name}-{arch}"
|
|
|
|
def _get_service_root_path(self) -> str:
|
|
return os.path.join(
|
|
self._service_config.path.root_path,
|
|
self._service_config.service_dir
|
|
)
|
|
|
|
def _get_from_dir_path(self) -> str:
|
|
return os.path.join(
|
|
self._get_service_root_path(),
|
|
self._node_config.delivery.from_dir
|
|
)
|
|
|
|
def _get_to_dir_path(self, arch: str) -> str:
|
|
return os.path.join(
|
|
self._get_service_root_path(),
|
|
self._node_config.delivery.to_dir,
|
|
self._get_target_package_dir(arch)
|
|
)
|
|
|
|
def _get_tar_package_name(self, arch: str) -> str:
|
|
return f"{self._get_target_package_dir(arch)}.tar.gz"
|
|
|
|
def _get_tar_package_path(self, arch) -> str:
|
|
return os.path.join(
|
|
self._get_service_root_path(),
|
|
"node", "tars",
|
|
self._get_tar_package_name(arch)
|
|
)
|
|
|
|
def _get_node_service_dir(self) -> str:
|
|
return os.path.join(
|
|
self._node_config.path.root_path,
|
|
self._service_config.service_name
|
|
)
|
|
|
|
def _is_tar_exists(self, arch: str):
|
|
"""Check if the tar package for the specified architecture already exists"""
|
|
return os.path.exists(self._get_tar_package_path(arch))
|
|
|
|
def _get_node_envs(self, arch: str):
|
|
envs = self._node_config.envs
|
|
envs["ARCH"] = arch
|
|
envs["SERVICE_NAME"] = self._service_config.service_name
|
|
res = ""
|
|
for env_k in envs:
|
|
res += f"export {env_k}={envs[env_k]};"
|
|
return res.strip()
|
|
|
|
##############################################################################
|
|
# Sync APIs
|
|
##############################################################################
|
|
|
|
def prepare_files(self, arch: str):
|
|
return syncify(self.prepare_files_async, raise_sync_error=False)(
|
|
arch=arch
|
|
)
|
|
|
|
def perform_init(self, arch: str, target_instance: str):
|
|
return syncify(self.perform_init_async, raise_sync_error=False)(
|
|
arch=arch,
|
|
target_instance=target_instance
|
|
)
|
|
|
|
def perform_update(self, arch: str, target_instance: str):
|
|
return syncify(self.perform_update_async, raise_sync_error=False)(
|
|
arch=arch,
|
|
target_instance=target_instance
|
|
)
|
|
|
|
def perform_clear(self, arch: str, target_instance: str):
|
|
return syncify(self.perform_clear_async, raise_sync_error=False)(
|
|
arch=arch,
|
|
target_instance=target_instance
|
|
)
|
|
|
|
##############################################################################
|
|
# Async APIs
|
|
##############################################################################
|
|
async def prepare_files_async(self, arch: str):
|
|
async def _do_copy(file: Dict):
|
|
src_file = os.path.join(self._get_from_dir_path(), file.local)
|
|
|
|
# not exists, try to download from remote
|
|
if not os.path.exists(src_file):
|
|
if not file.remote:
|
|
raise NodeManagerException(
|
|
f"Src file not exists: {src_file}"
|
|
f", And could not download from: {file.remote}"
|
|
)
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(file.remote) as resp:
|
|
async with aiofiles.open(src_file, 'wb') as fd:
|
|
async for chunk in resp.content.iter_chunked(1024):
|
|
await fd.write(chunk)
|
|
|
|
if not os.path.exists(src_file):
|
|
raise NodeManagerException(
|
|
f"Src file not exists: {file.local}"
|
|
f", And could not download from: {file.remote}"
|
|
)
|
|
|
|
# ensure parent dirs of dst_file exists
|
|
to_dir_path = self._get_to_dir_path(arch)
|
|
if not os.path.exists(to_dir_path):
|
|
os.makedirs(to_dir_path)
|
|
|
|
# copy to dst_file
|
|
dst_file = os.path.join(to_dir_path, file.local)
|
|
if os.path.exists(dst_file):
|
|
# in case of the src and dst are the same file
|
|
if os.path.samefile(src_file, dst_file):
|
|
return self
|
|
os.remove(dst_file)
|
|
|
|
# do copy
|
|
shutil.copy2(src_file, dst_file)
|
|
|
|
# 1. Check whether the specified architecture is supported.
|
|
files = self._node_config.delivery.files[arch]
|
|
if not files:
|
|
raise NodeManagerException(f"Unsupported architectures: {arch}")
|
|
|
|
# 2. Check whether the specified tar package is exists.
|
|
if self._is_tar_exists(arch):
|
|
return self
|
|
|
|
# 3. Copy files
|
|
for file in files:
|
|
if isinstance(file, List):
|
|
for f in file:
|
|
await _do_copy(f)
|
|
else:
|
|
await _do_copy(file)
|
|
|
|
# 4. Compressed package
|
|
to_dir_path = self._get_to_dir_path(arch)
|
|
tar_path = self._get_tar_package_path(arch)
|
|
tar_dir = os.path.dirname(tar_path)
|
|
if not os.path.exists(tar_dir):
|
|
os.makedirs(tar_dir)
|
|
with tarfile.open(tar_path, "w:gz") as tar:
|
|
tar.add(to_dir_path, arcname=self._get_target_package_dir(arch))
|
|
return self
|
|
|
|
async def perform_init_async(self, arch: str, target_instance: str):
|
|
# 1. Try to dispatcher tar file to remote node
|
|
node_service_dir = self._get_node_service_dir()
|
|
result = await self._channel_job_executor.dispatch_file_job(
|
|
params={
|
|
"local_path": self._get_tar_package_path(arch),
|
|
"remote_path": os.path.join(
|
|
node_service_dir,
|
|
self._get_tar_package_name(arch)
|
|
),
|
|
"instances": [target_instance]
|
|
}
|
|
).execute_async()
|
|
if result.code != 0:
|
|
# Dispatch file failed
|
|
raise NodeManagerException(
|
|
f"Dispatch file to node failed: {result.err_msg}"
|
|
)
|
|
|
|
# 2. Unzip tar file, execute init script
|
|
cd_workspace = f"cd {node_service_dir}"
|
|
unzip_cmd = f"tar -zxvf {self._get_tar_package_name(arch)}"
|
|
cd_tar_unzip_dir = f"cd {self._get_target_package_dir(arch)}"
|
|
init_cmd = f"bash {self._node_config.scripts.get('init', '')}"
|
|
result = await self._channel_job_executor.dispatch_job(
|
|
channel_opt="cmd",
|
|
params={
|
|
"instance": target_instance,
|
|
"command": f"{self._get_node_envs(arch)} {cd_workspace} && {unzip_cmd} && {cd_tar_unzip_dir} &&{init_cmd}",
|
|
},
|
|
timeout=self._node_config.get("timeout", 60000),
|
|
auto_retry=True
|
|
).execute_async()
|
|
if result.code != 0:
|
|
raise NodeManagerException(
|
|
f"Execute init script failed: {result.err_msg}"
|
|
)
|
|
return self
|
|
|
|
async def perform_update_async(self, arch: str, target_instance: str):
|
|
return await self.perform_init_async(arch, target_instance)
|
|
|
|
async def perform_clear_async(self, arch: str, target_instance: str):
|
|
node_service_dir = os.path.join(
|
|
self._get_node_service_dir(),
|
|
self._get_target_package_dir(arch)
|
|
)
|
|
cd_workspace = f"cd {node_service_dir}"
|
|
clear_cmd = f"bash {self._node_config.scripts.get('clear', '')}"
|
|
result = await self._channel_job_executor.dispatch_job(
|
|
channel_opt="cmd",
|
|
params={
|
|
"instance": target_instance,
|
|
"command": f"{self._get_node_envs(arch)} {cd_workspace} && {clear_cmd}",
|
|
},
|
|
timeout=self._node_config.get("timeout", 60000),
|
|
auto_retry=True
|
|
).execute_async()
|
|
if result.code != 0:
|
|
raise NodeManagerException(
|
|
f"Execute clear script failed: {result.err_msg}"
|
|
)
|
|
return self
|