sysom1/environment/1_sdk/gclient_cmg/cmg_gclient.py

90 lines
3.5 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/5/9 10:28
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File cmg_gclient.py
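Description: GClient implementation that picks a service instance through CMG service discovery and sends HTTP requests to it.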
"""
import requests
from urllib.parse import urljoin
from gclient_base import GClient, GClientUrl, GClientException
from cmg_base import dispatch_service_discovery, LoadBalancingStrategy
from .common import StaticConst
class CmgGClient(GClient):
    """GClient that resolves its HTTP target through CMG service discovery.

    Every request method asks the discovery client for an instance of the
    configured service (using the configured load-balancing strategy) and
    sends the request to that instance's host and port over plain HTTP.
    """

    def __init__(self, url: GClientUrl):
        """Set up service discovery from the special parameters in *url*.

        Args:
            url: Client URL; its ``params`` supply the CMG options and its
                ``netloc`` is reused as the address of the discovery URL.

        Raises:
            GClientException: If the service-name parameter is absent, or if
                the requested strategy is not a ``LoadBalancingStrategy``
                member name.
        """
        special = StaticConst.parse_special_parameter(url.params)
        protocol = special.get(StaticConst.SPECIAL_PARM_CMG_PROTOCOL, "redis")
        service_name = special.get(
            StaticConst.SPECIAL_PARM_CMG_SERVICE_NAME, None
        )
        strategy_name = special.get(
            StaticConst.SPECIAL_PARM_CMG_LOAD_BALANCE_STRATEGY, "RANDOM"
        )
        # NOTE(review): the fetch interval is forwarded as returned by
        # parse_special_parameter; confirm it is already numeric.
        fetch_interval = special.get(
            StaticConst.SPECIAL_PARM_CMG_FETCH_INTERVAL, 5
        )

        # The discovery URL carries every entry of url.params as its query.
        query = "&".join(f"{key}={url.params[key]}" for key in url.params)
        cmg_url = f"{protocol}://{url.netloc}?{query}"

        if service_name is None:
            raise GClientException(
                f"CmgGClient: required "
                f"{StaticConst.SPECIAL_PARM_CMG_SERVICE_NAME}")

        self.discover = dispatch_service_discovery(
            cmg_url, fetch_interval=fetch_interval
        )
        self.load_balance_strategy = self._get_load_balance_strategy(
            strategy_name
        )
        self.service_name = service_name

    @staticmethod
    def _get_load_balance_strategy(strategy_str: str) -> LoadBalancingStrategy:
        """Return the ``LoadBalancingStrategy`` member named *strategy_str*.

        Raises:
            GClientException: If no member has that exact name.
        """
        if strategy_str in LoadBalancingStrategy.__members__:
            return LoadBalancingStrategy[strategy_str]
        raise GClientException(
            f"CmgGClient: Not support strategy => '{strategy_str}'"
        )

    def _get_http_base_url(self):
        """Ask service discovery for an instance and return its base URL."""
        target = self.discover.get_instance(
            self.service_name, self.load_balance_strategy
        )
        host, port = target.host, target.port
        # TODO: Support automatic recognition of http and https
        return f"http://{host}:{port}"

    def _resolve_url(self, path: str) -> str:
        """Join *path* onto the base URL of a freshly discovered instance."""
        return urljoin(self._get_http_base_url(), path)

    def get(self, path: str, params=None, **kwargs) -> requests.Response:
        """Send a GET request for *path* to a discovered instance."""
        return requests.get(self._resolve_url(path), params=params, **kwargs)

    def options(self, path: str, **kwargs) -> requests.Response:
        """Send an OPTIONS request for *path* to a discovered instance."""
        return requests.options(self._resolve_url(path), **kwargs)

    def head(self, path: str, **kwargs) -> requests.Response:
        """Send a HEAD request for *path* to a discovered instance."""
        return requests.head(self._resolve_url(path), **kwargs)

    def post(self, path: str, data=None, json=None,
             **kwargs) -> requests.Response:
        """Send a POST request for *path* to a discovered instance."""
        return requests.post(
            self._resolve_url(path), data=data, json=json, **kwargs
        )

    def put(self, path: str, data=None, **kwargs) -> requests.Response:
        """Send a PUT request for *path* to a discovered instance."""
        return requests.put(self._resolve_url(path), data=data, **kwargs)

    def patch(self, path: str, data=None, **kwargs) -> requests.Response:
        """Send a PATCH request for *path* to a discovered instance."""
        return requests.patch(self._resolve_url(path), data=data, **kwargs)

    def delete(self, path: str, **kwargs) -> requests.Response:
        """Send a DELETE request for *path* to a discovered instance."""
        return requests.delete(self._resolve_url(path), **kwargs)