mirror of https://gitee.com/anolis/sysom.git
90 lines
3.5 KiB
Python
90 lines
3.5 KiB
Python
# -*- coding: utf-8 -*- #
"""
Time                2023/5/9 10:28
Author:             mingfeng (SunnyQjm)
Email               mfeng@linux.alibaba.com
File                cmg_gclient.py
Description:
"""
|
|
import requests
|
|
from urllib.parse import urljoin
|
|
from gclient_base import GClient, GClientUrl, GClientException
|
|
from cmg_base import dispatch_service_discovery, LoadBalancingStrategy
|
|
from .common import StaticConst
|
|
|
|
|
|
class CmgGClient(GClient):
    """GClient that locates its target service through CMG discovery.

    On every request an instance of the configured service is fetched from
    the discovery object using the configured load-balancing strategy, and
    the request is then sent to that instance with ``requests``.
    """

    def __init__(self, url: GClientUrl):
        """Set up service discovery from the parameters carried by ``url``.

        Args:
            url: Client URL. ``url.params`` supplies the CMG special
                parameters and ``url.netloc`` the address used for the
                discovery URL.

        Raises:
            GClientException: If no service name was supplied, or if the
                requested load-balancing strategy is not a member of
                ``LoadBalancingStrategy``.
        """
        # NOTE(review): parse_special_parameter is defined elsewhere; whether
        # it mutates url.params or converts value types is not visible here.
        options = StaticConst.parse_special_parameter(url.params)
        protocol = options.get(StaticConst.SPECIAL_PARM_CMG_PROTOCOL, "redis")
        service_name = options.get(StaticConst.SPECIAL_PARM_CMG_SERVICE_NAME)
        strategy_name = options.get(
            StaticConst.SPECIAL_PARM_CMG_LOAD_BALANCE_STRATEGY, "RANDOM"
        )
        fetch_interval = options.get(
            StaticConst.SPECIAL_PARM_CMG_FETCH_INTERVAL, 5
        )

        # Forward whatever is in url.params as the discovery URL's query.
        query = "&".join(
            [f"{key}={url.params[key]}" for key in url.params]
        )
        discovery_url = f"{protocol}://{url.netloc}?{query}"

        if service_name is None:
            raise GClientException(
                "CmgGClient: required "
                f"{StaticConst.SPECIAL_PARM_CMG_SERVICE_NAME}"
            )

        self.discover = dispatch_service_discovery(
            discovery_url, fetch_interval=fetch_interval
        )
        self.load_balance_strategy = self._get_load_balance_strategy(
            strategy_name
        )
        self.service_name = service_name

    @staticmethod
    def _get_load_balance_strategy(strategy_str: str) -> LoadBalancingStrategy:
        """Return the ``LoadBalancingStrategy`` member named ``strategy_str``.

        Raises:
            GClientException: If ``strategy_str`` is not a member name.
        """
        if strategy_str in LoadBalancingStrategy.__members__:
            return LoadBalancingStrategy[strategy_str]
        raise GClientException(
            f"CmgGClient: Not support strategy => '{strategy_str}'"
        )

    def _get_http_base_url(self):
        """Pick a service instance and return its ``http://host:port`` URL."""
        target = self.discover.get_instance(
            self.service_name, self.load_balance_strategy
        )
        # TODO: Support automatic recognition of http and https
        return f"http://{target.host}:{target.port}"

    def _resolve_url(self, path: str) -> str:
        """Join ``path`` onto the base URL of a freshly selected instance."""
        return urljoin(self._get_http_base_url(), path)

    def get(self, path: str, params=None, **kwargs) -> requests.Response:
        """Send a GET request for ``path`` to a discovered instance."""
        return requests.get(self._resolve_url(path), params=params, **kwargs)

    def options(self, path: str, **kwargs) -> requests.Response:
        """Send an OPTIONS request for ``path`` to a discovered instance."""
        return requests.options(self._resolve_url(path), **kwargs)

    def head(self, path: str, **kwargs) -> requests.Response:
        """Send a HEAD request for ``path`` to a discovered instance."""
        return requests.head(self._resolve_url(path), **kwargs)

    def post(self, path: str, data=None, json=None,
             **kwargs) -> requests.Response:
        """Send a POST request for ``path`` to a discovered instance."""
        return requests.post(
            self._resolve_url(path), data=data, json=json, **kwargs
        )

    def put(self, path: str, data=None, **kwargs) -> requests.Response:
        """Send a PUT request for ``path`` to a discovered instance."""
        return requests.put(self._resolve_url(path), data=data, **kwargs)

    def patch(self, path: str, data=None, **kwargs) -> requests.Response:
        """Send a PATCH request for ``path`` to a discovered instance."""
        return requests.patch(self._resolve_url(path), data=data, **kwargs)

    def delete(self, path: str, **kwargs) -> requests.Response:
        """Send a DELETE request for ``path`` to a discovered instance."""
        return requests.delete(self._resolve_url(path), **kwargs)