sysom1/environment/1_sdk/metric_reader/metric_reader.py

168 lines
5.0 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/5/5 10:31
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File metric_reader.py
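Description: Abstract MetricReader base class, its protocol registry, and the URL-based dispatch_metric_reader factory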
"""
import importlib
from typing import List
from abc import ABCMeta, abstractmethod
from .result import MetricResult
from .url import MetricReaderUrl
from .exceptions import MetricReaderProtoAlreadyExistsException, \
MetricReaderProtoNotExistsException, MetricReaderException
from .task import RangeQueryTask, InstantQueryTask
from .common import StaticConst
class MetricReader(metaclass=ABCMeta):
    """Abstract base class for protocol-specific metric readers

    Subclasses implement the abstract query methods below. Implementation
    classes are stored by protocol name in ``proto_dict`` through
    ``protocol_register``.
    """

    # Registry mapping a protocol name to the class registered for it.
    # It lives on MetricReader itself, so all registrations share one dict.
    proto_dict = {}

    def __init__(self, url: MetricReaderUrl, **kwargs):
        """Initialize the reader from a parsed url

        Args:
            url(MetricReaderUrl): Parsed reader url; ``url.params`` is handed
                to ``StaticConst.parse_special_parameter`` and the result is
                what ``get_special_param`` reads from.
            **kwargs: Accepted but not used by this base class.
        """
        self._special_params = StaticConst.parse_special_parameter(url.params)

    def get_special_param(self, key: str, default='', required: bool = False):
        """Get specialization parameter by key

        Args:
            key(str): specialization parameter key
            default(Any): value returned if key not exists
            required(bool): if True, raise instead of returning ``default``
                when the parameter is missing

        Returns:
            The value stored for ``key``, or ``default`` if key not exists.

        Raises:
            MetricReaderException: ``required`` is True and the parameter is
                missing or has a falsy value.
        """
        # A present-but-falsy value (e.g. '') counts as missing here.
        if required and not self._special_params.get(key, None):
            raise MetricReaderException(f"Missing required param '{key}'")
        return self._special_params.get(key, default)

    @abstractmethod
    def get_metric_names(self, limit: int = -1) -> MetricResult:
        """Get metric names

        Args:
            limit(int): Up to the number of metric names pulled,
                        -1 indicate unlimited

        Returns:
            MetricResult
        """

    @abstractmethod
    def get_metric_labels(self, metric_name) -> MetricResult:
        """Get label list of specific metric_name

        Args:
            metric_name: Name of the metric whose labels are requested

        Returns:
            MetricResult
        """

    @abstractmethod
    def get_label_values(self, label_name) -> MetricResult:
        """Get values list of specific label_name

        Args:
            label_name: Name of the label whose values are requested

        Returns:
            MetricResult
        """

    @abstractmethod
    def instant_query(self, queries: List[InstantQueryTask]) -> MetricResult:
        """Query data using query API for a specified metric with promql
        aggregation function

        Args:
            queries([InstantQueryTask]): Query tasks

        Returns:
            MetricResult
        """

    @abstractmethod
    def range_query(self, queries: List[RangeQueryTask]) -> MetricResult:
        """Query data within a specific time period for a specified metric

        Args:
            queries([RangeQueryTask]): Query tasks

        Returns:
            MetricResult
        """

    @staticmethod
    def protocol_register(proto, sub_class):
        """Register one new protocol => indicate one execution module

        Stores ``sub_class`` in ``MetricReader.proto_dict`` under ``proto``.
        (Usually when the execution module is implemented according to the
        specification, there is no need for the developer to call this method
        manually, the abstraction layer will dynamically import)

        Args:
            proto(str): Protocol identification
            sub_class: Implementation class of MetricReader

        Raises:
            MetricReaderProtoAlreadyExistsException: ``proto`` is already
                registered; the existing entry is left untouched.

        Examples:
            >>> MetricReader.protocol_register('prometheus',
            ...                                PrometheusMetricReader)
        """
        if proto in MetricReader.proto_dict:
            raise MetricReaderProtoAlreadyExistsException(
                f"Proto '{proto}' already exists in MetricReader."
            )
        MetricReader.proto_dict[proto] = sub_class
def dispatch_metric_reader(url: str, **kwargs) -> MetricReader:
    """Construct one MetricReader instance according the url

    The protocol parsed from ``url`` selects the implementation class from
    ``MetricReader.proto_dict``. If nothing is registered for that protocol
    yet, the module ``metric_reader.<proto>_metric_reader`` is imported and
    its ``<Proto>MetricReader`` class (protocol name passed through
    ``str.capitalize``) is registered before the instance is built.

    Args:
        url(str): Reader url, parsed with ``MetricReaderUrl.parse``

    Keyword Args:
        **kwargs: Forwarded unchanged to the implementation class constructor

    Returns:
        MetricReader: one MetricReader instance

    Raises:
        MetricReaderProtoNotExistsException: The implementation module could
            not be imported, or it does not define the expected class.

    Examples:
        >>> metric_reader = dispatch_metric_reader(
        ...     "prometheus://localhost:9090")
    """
    metric_reader_url = MetricReaderUrl.parse(url)
    proto = metric_reader_url.proto
    if proto not in MetricReader.proto_dict:
        # Check if dynamic import is possible
        target_module = f"metric_reader.{proto}_metric_reader"
        class_name = f"{proto.capitalize()}MetricReader"
        try:
            module = importlib.import_module(target_module)
        except ModuleNotFoundError as exc:
            raise MetricReaderProtoNotExistsException(
                f"Proto '{proto}' not exists in MetricReader."
            ) from exc
        # Looked up outside the import's try block, so an AttributeError
        # raised while the module itself is executing is not mistaken for a
        # missing implementation class.
        try:
            reader_class = getattr(module, class_name)
        except AttributeError as exc:
            raise MetricReaderProtoNotExistsException(
                f"Proto '{proto}' not exists in MetricReader: module "
                f"'{target_module}' does not define '{class_name}'."
            ) from exc
        MetricReader.protocol_register(proto, reader_class)
    return MetricReader.proto_dict[proto](metric_reader_url, **kwargs)