sysom1/sysom_server/sysom_channel/lib/channels/base.py

126 lines
4.2 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2022/11/14 14:32
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File base.py
Description:
"""
from abc import ABCMeta, abstractmethod
import time
import anyio
import asyncio
import functools
import json
from typing import Optional
from enum import Enum, unique
from clogger import logger
@unique
class ChannelCode(Enum):
SUCCESS = 0
SERVER_ERROR = 1
REQUEST_PARAM_ERROR = 2
CHANNEL_CONNECT_FAILED = 3
CHANNEL_CONNECT_TIMEOUT = 4
CHANNEL_EXEC_FAILED = 5
class ChannelException(Exception):
"""Exception which raise while use specific channel to communicate with instance
Args:
message(str): Detailed error messages for developers to locate problems
code(str): Error type
summary(str): Error summary for presentation to the user
"""
def __init__(self, message: str, code: int = ChannelCode.SERVER_ERROR.value,
summary: Optional[str] = None) -> None:
self.message = message
self.code = code
self.summary = summary if summary is not None else message
def __str__(self) -> str:
return self.message
class ChannelResult:
def __init__(self, code: int = 0, result: str = "", err_msg: str = "") -> None:
self.code = code
self.result = result
self.err_msg = err_msg
class BaseChannel(metaclass=ABCMeta):
@staticmethod
@abstractmethod
def initial(**kwargs) -> ChannelResult:
raise NotImplementedError
@staticmethod
async def initial_async(**kwargs) -> ChannelResult:
return await anyio.to_thread.run_sync(
functools.partial(BaseChannel.initial, **kwargs)
)
@abstractmethod
def run_command(self, **kwargs) -> ChannelResult:
raise NotImplementedError
def get_params(self, **kwargs) -> dict:
return {}
async def run_command_async(self, **kwargs) -> ChannelResult:
return await anyio.to_thread.run_sync(
functools.partial(self.run_command, **kwargs)
)
def run_command_auto_retry(self, **kwargs) -> ChannelResult:
timeout = kwargs.pop("timeout", 1000)
if timeout is None:
timeout = 1000
if kwargs.pop("auto_retry", False):
max_wait_time = time.time() + timeout / 1000
remain_time = int((max_wait_time - time.time()) * 1000)
kwargs["timeout"] = remain_time
res = self.run_command(**kwargs)
while res.code in [ChannelCode.CHANNEL_CONNECT_FAILED, ChannelCode.CHANNEL_CONNECT_TIMEOUT]:
logger.warning(
f"Channel retry due to: {res.err_msg}, params: {json.dumps(self.get_params())}")
time.sleep(1)
remain_time = int((max_wait_time - time.time()) * 1000)
if remain_time <= 0:
break
kwargs["timeout"] = remain_time
res = self.run_command(**kwargs)
return res
else:
kwargs["timeout"] = timeout
return self.run_command(**kwargs)
async def run_command_auto_retry_async(self, **kwargs) -> ChannelResult:
timeout = kwargs.pop("timeout", 1000)
if timeout is None:
timeout = 1000
if kwargs.pop("auto_retry", False):
max_wait_time = time.time() + timeout / 1000
remain_time = int((max_wait_time - time.time()) * 1000)
kwargs["timeout"] = remain_time
res = await self.run_command_async(**kwargs)
while res.code in [ChannelCode.CHANNEL_CONNECT_FAILED, ChannelCode.CHANNEL_CONNECT_TIMEOUT]:
logger.warning(
f"Channel retry due to: {res.err_msg}, params: {json.dumps(self.get_params())}")
await asyncio.sleep(1)
remain_time = int((max_wait_time - time.time()) * 1000)
if remain_time <= 0:
break
kwargs["timeout"] = remain_time
res = await self.run_command_async(**kwargs)
return res
else:
kwargs["timeout"] = timeout
return await self.run_command_async(**kwargs)