mirror of https://gitee.com/anolis/sysom.git
76 lines
3.1 KiB
Python
76 lines
3.1 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/4/3 20:04
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File service_invoker.py
|
|
Description:
|
|
"""
|
|
import requests
|
|
from urllib.parse import urljoin
|
|
from abc import ABCMeta
|
|
from cmg_base import dispatch_service_discovery, LoadBalancingStrategy
|
|
|
|
|
|
class ServiceInvoker(metaclass=ABCMeta):
    """Common base for invokers that locate services through discovery.

    The constructor creates a service-discovery client with
    ``dispatch_service_discovery`` and remembers the load-balancing strategy
    so that subclasses can use both when picking a service instance.
    """

    def __init__(
        self,
        cmg_url: str,
        fetch_interval: int = 5,
        load_balance_strategy: LoadBalancingStrategy = (
            LoadBalancingStrategy.ROBIN
        ),
    ):
        """Create the discovery client and store the balancing strategy.

        Args:
            cmg_url: URL handed to ``dispatch_service_discovery`` to select
                and configure the discovery backend.
            fetch_interval: Forwarded unchanged to
                ``dispatch_service_discovery`` as ``fetch_interval``.
            load_balance_strategy: Strategy kept on the instance for later
                instance selection; defaults to
                ``LoadBalancingStrategy.ROBIN``.
        """
        discovery_client = dispatch_service_discovery(
            cmg_url,
            fetch_interval=fetch_interval,
        )
        self.discover = discovery_client
        self.load_balance_strategy = load_balance_strategy
|
|
|
|
|
|
class HttpServiceInvoker(ServiceInvoker):
    """Send HTTP requests to services located through service discovery.

    Each request method asks the discovery client (``self.discover``) for an
    instance of ``service_name``, builds the target URL from that instance's
    host and port, and delegates to the matching ``requests`` function.
    Extra keyword arguments are passed through to ``requests`` unchanged.
    """

    def __init__(self, cmg_url: str, fetch_interval: int = 5,
                 load_balance_strategy: LoadBalancingStrategy =
                 LoadBalancingStrategy.ROBIN):
        """Initialise the invoker; all arguments go to ``ServiceInvoker``.

        Args:
            cmg_url: URL used to create the service-discovery client.
            fetch_interval: Forwarded to the base class as ``fetch_interval``.
            load_balance_strategy: Strategy used when picking an instance.
        """
        super().__init__(cmg_url, fetch_interval=fetch_interval,
                         load_balance_strategy=load_balance_strategy)

    def _get_http_base_url(self, service_name):
        """Return ``http://<host>:<port>`` for an instance of the service.

        The instance is obtained from ``self.discover.get_instance`` using
        the configured load-balancing strategy.
        """
        instance = self.discover.get_instance(service_name,
                                              self.load_balance_strategy)
        # TODO: Support automatic recognition of http and https
        return f"http://{instance.host}:{instance.port}"

    def _build_url(self, service_name: str, path: str) -> str:
        """Join the discovered base URL of ``service_name`` with ``path``."""
        return urljoin(self._get_http_base_url(service_name), path)

    def request(self, service_name: str, method: str, path: str,
                **kwargs) -> requests.Response:
        """Send a request with an arbitrary HTTP ``method``.

        Args:
            service_name: Name of the service to look up.
            method: HTTP method name passed to ``requests.request``.
            path: Path joined onto the discovered base URL.
            **kwargs: Extra options forwarded to ``requests.request``.

        Returns:
            The ``requests.Response`` of the call.
        """
        url = self._build_url(service_name, path)
        return requests.request(method, url, **kwargs)

    def get(self, service_name: str, path: str, params=None,
            **kwargs) -> requests.Response:
        """Send a GET request; ``params`` becomes the query string."""
        url = self._build_url(service_name, path)
        return requests.get(url, params=params, **kwargs)

    def options(self, service_name: str, path: str,
                **kwargs) -> requests.Response:
        """Send an OPTIONS request."""
        url = self._build_url(service_name, path)
        return requests.options(url, **kwargs)

    def head(self, service_name: str, path: str,
             **kwargs) -> requests.Response:
        """Send a HEAD request.

        Uses ``requests.head`` (the previous code issued OPTIONS by mistake).
        Note that ``requests.head`` does not follow redirects unless
        ``allow_redirects=True`` is passed in ``kwargs``.
        """
        url = self._build_url(service_name, path)
        return requests.head(url, **kwargs)

    def post(self, service_name: str, path: str, data=None, json=None,
             **kwargs) -> requests.Response:
        """Send a POST request with an optional ``data`` or ``json`` body."""
        url = self._build_url(service_name, path)
        return requests.post(url, data=data, json=json, **kwargs)

    def put(self, service_name: str, path: str, data=None,
            **kwargs) -> requests.Response:
        """Send a PUT request with an optional ``data`` body."""
        url = self._build_url(service_name, path)
        return requests.put(url, data=data, **kwargs)

    def patch(self, service_name: str, path: str, data=None,
              **kwargs) -> requests.Response:
        """Send a PATCH request with an optional ``data`` body."""
        url = self._build_url(service_name, path)
        return requests.patch(url, data=data, **kwargs)

    def delete(self, service_name: str, path: str,
               **kwargs) -> requests.Response:
        """Send a DELETE request."""
        url = self._build_url(service_name, path)
        return requests.delete(url, **kwargs)