mirror of https://gitee.com/anolis/sysom.git
86 lines
2.7 KiB
Python
86 lines
2.7 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/3/17 18:56
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File service_instance.py
|
|
Description:
|
|
"""
|
|
import json
|
|
import uuid
|
|
from typing import Optional, List
|
|
|
|
|
|
class ServiceInstance:
|
|
"""This class is used to represent a microservice.
|
|
|
|
Args:
|
|
service_name(str): Name of service
|
|
service_id(str): ID of service, set to name if not provided
|
|
host(str): Listen host of service
|
|
port(int): Listen port of service
|
|
tags(List[str]): Used to classification, filtering and
|
|
discovery of services
|
|
metadata(dict): Metadata of service
|
|
check(dict): An optional health *check* can be created for
|
|
this service is one of `ServiceCheck.http`_
|
|
"""
|
|
|
|
def __init__(self, service_name: str, service_id: Optional[str] = None,
|
|
host: str = "127.0.0.1", port: int = 80,
|
|
tags=None,
|
|
metadata=None,
|
|
check=None):
|
|
if check is None:
|
|
check = {}
|
|
if metadata is None:
|
|
metadata = {}
|
|
if tags is None:
|
|
tags = []
|
|
self.service_name = service_name
|
|
if service_id is not None:
|
|
self.service_id = service_id
|
|
else:
|
|
self.service_id = str(uuid.uuid4())
|
|
self.host = host
|
|
self.port = port
|
|
self.tags = tags
|
|
self.metadata = metadata
|
|
self.check = check
|
|
|
|
@classmethod
|
|
def from_redis_mapping(cls, value: dict):
|
|
service_name = value.pop("service_name", "")
|
|
raw_tags = value.get("tags", "[]")
|
|
if isinstance(raw_tags, str):
|
|
value["tags"] = json.loads(raw_tags)
|
|
raw_metadata = value.get("metadata", "{}")
|
|
if isinstance(raw_metadata, str):
|
|
value["metadata"] = json.loads(raw_metadata)
|
|
raw_check = value.get('check', "{}")
|
|
if isinstance(raw_check, str):
|
|
value["check"] = json.loads(raw_check)
|
|
return cls(service_name, **value)
|
|
|
|
def to_redis_mapping(self):
|
|
return {
|
|
"service_id": self.service_id,
|
|
"service_name": self.service_name,
|
|
"host": self.host,
|
|
"port": self.port,
|
|
"tags": json.dumps(self.tags),
|
|
"metadata": json.dumps(self.metadata),
|
|
"check": json.dumps(self.check)
|
|
}
|
|
|
|
def to_dict(self):
|
|
return {
|
|
"service_id": self.service_id,
|
|
"service_name": self.service_name,
|
|
"host": self.host,
|
|
"port": self.port,
|
|
"tags": self.tags,
|
|
"metadata": self.metadata,
|
|
"check": self.check
|
|
}
|