sysom1/environment/1_sdk/cmg_base/service_invoker.py

76 lines
3.1 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/4/3 20:04
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File service_invoker.py
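Description: Service invokers that resolve a service instance through CMG service discovery and send HTTP requests to it.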
"""
import requests
from urllib.parse import urljoin
from abc import ABCMeta
from cmg_base import dispatch_service_discovery, LoadBalancingStrategy
class ServiceInvoker(metaclass=ABCMeta):
    """Base class for invokers that locate services through discovery.

    The constructor builds a service-discovery client for the given CMG URL
    and remembers which load-balancing strategy to use; subclasses decide
    how the discovered instance is actually called.
    """

    def __init__(
        self,
        cmg_url: str,
        fetch_interval: int = 5,
        load_balance_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROBIN,
    ):
        """Create the discovery client and store the balancing strategy.

        Args:
            cmg_url: URL handed to ``dispatch_service_discovery`` to select
                and configure the discovery backend.
            fetch_interval: Forwarded unchanged to
                ``dispatch_service_discovery`` as ``fetch_interval``
                (presumably a refresh period in seconds -- TODO confirm).
            load_balance_strategy: Strategy stored on the instance for use
                when an instance is requested from ``self.discover``.
        """
        self.load_balance_strategy = load_balance_strategy
        self.discover = dispatch_service_discovery(
            cmg_url,
            fetch_interval=fetch_interval,
        )
class HttpServiceInvoker(ServiceInvoker):
    """Invoke discovered services over plain HTTP using ``requests``.

    Every call asks ``self.discover`` for an instance of the named service,
    builds ``http://<host>:<port>`` from it, joins ``path`` onto that base
    with ``urljoin`` and passes the resulting URL to the matching
    ``requests`` function. Extra keyword arguments are forwarded to
    ``requests`` untouched.
    """

    def __init__(self, cmg_url: str, fetch_interval: int = 5,
                 load_balance_strategy: LoadBalancingStrategy =
                 LoadBalancingStrategy.ROBIN):
        """Initialise the invoker; all arguments go to ``ServiceInvoker``.

        Args:
            cmg_url: URL used to set up service discovery.
            fetch_interval: Forwarded to the base class as ``fetch_interval``.
            load_balance_strategy: Strategy used when picking an instance.
        """
        super().__init__(cmg_url, fetch_interval=fetch_interval,
                         load_balance_strategy=load_balance_strategy)

    def _get_http_base_url(self, service_name):
        """Return ``http://host:port`` for one instance of ``service_name``.

        The instance is obtained from ``self.discover.get_instance`` using
        the configured load-balancing strategy.
        """
        instance = self.discover.get_instance(service_name,
                                              self.load_balance_strategy)
        # TODO: Support automatic recognition of http and https
        return f"http://{instance.host}:{instance.port}"

    def _build_url(self, service_name: str, path: str) -> str:
        """Join ``path`` onto the base URL of a discovered instance."""
        return urljoin(self._get_http_base_url(service_name), path)

    def request(self, service_name: str, method: str, path: str,
                **kwargs) -> requests.Response:
        """Send a request with an arbitrary HTTP ``method``.

        Args:
            service_name: Name of the service to look up.
            method: HTTP method name passed to ``requests.request``.
            path: Path joined onto the instance base URL.
            **kwargs: Forwarded to ``requests.request``.

        Returns:
            The ``requests.Response`` of the call.
        """
        return requests.request(method, self._build_url(service_name, path),
                                **kwargs)

    def get(self, service_name: str, path: str, params=None,
            **kwargs) -> requests.Response:
        """Send a GET request; ``params`` is passed as the query parameters."""
        return requests.get(self._build_url(service_name, path),
                            params=params, **kwargs)

    def options(self, service_name: str, path: str,
                **kwargs) -> requests.Response:
        """Send an OPTIONS request."""
        return requests.options(self._build_url(service_name, path), **kwargs)

    def head(self, service_name: str, path: str,
             **kwargs) -> requests.Response:
        """Send a HEAD request.

        This previously called ``requests.options`` and therefore issued an
        OPTIONS request; it now uses ``requests.head`` as the name promises.
        """
        return requests.head(self._build_url(service_name, path), **kwargs)

    def post(self, service_name: str, path: str, data=None, json=None,
             **kwargs) -> requests.Response:
        """Send a POST request with an optional ``data`` and/or ``json`` body."""
        return requests.post(self._build_url(service_name, path),
                             data=data, json=json, **kwargs)

    def put(self, service_name: str, path: str, data=None,
            **kwargs) -> requests.Response:
        """Send a PUT request with an optional ``data`` body."""
        return requests.put(self._build_url(service_name, path),
                            data=data, **kwargs)

    def patch(self, service_name: str, path: str, data=None,
              **kwargs) -> requests.Response:
        """Send a PATCH request with an optional ``data`` body."""
        return requests.patch(self._build_url(service_name, path),
                              data=data, **kwargs)

    def delete(self, service_name: str, path: str,
               **kwargs) -> requests.Response:
        """Send a DELETE request."""
        return requests.delete(self._build_url(service_name, path), **kwargs)