mirror of https://gitee.com/anolis/sysom.git
168 lines
5.0 KiB
Python
168 lines
5.0 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/5/5 10:31
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File metric_reader.py
|
|
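Description: Abstract MetricReader base class and dispatch_metric_reader(), which builds a protocol-specific reader from a URL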
|
|
"""
|
|
import importlib
|
|
from typing import List
|
|
from abc import ABCMeta, abstractmethod
|
|
from .result import MetricResult
|
|
from .url import MetricReaderUrl
|
|
from .exceptions import MetricReaderProtoAlreadyExistsException, \
|
|
MetricReaderProtoNotExistsException, MetricReaderException
|
|
from .task import RangeQueryTask, InstantQueryTask
|
|
from .common import StaticConst
|
|
|
|
|
|
class MetricReader(metaclass=ABCMeta):
    """Abstract base class of all protocol-specific metric readers.

    Concrete readers implement the abstract query methods declared below.
    Implementations are looked up by protocol name through ``proto_dict``,
    which is filled by ``protocol_register``.
    """

    # Registry mapping a protocol name to the class registered for it.
    # It is a class attribute, so every reader shares the same registry.
    proto_dict = {}

    def __init__(self, url: MetricReaderUrl, **kwargs):
        """Extract the specialization parameters carried by ``url``.

        Args:
            url(MetricReaderUrl): Parsed reader url; only ``url.params`` is
                read here.
            **kwargs: Accepted so subclasses can take extra options; the
                base class does not use them.
        """
        # NOTE(review): the result is only ever accessed through ``.get``,
        # so it is presumably a dict-like mapping - confirm in StaticConst.
        self._special_params = StaticConst.parse_special_parameter(url.params)

    def get_special_param(self, key: str, default='', required: bool = False):
        """Get specialization parameter by key

        Args:
            key(str): specialization parameter key
            default(Any): value returned if key does not exist
            required(bool): when True, the parameter must be present with a
                truthy value; an absent key and an empty value are both
                treated as missing

        Returns:
            The stored value for ``key``, or ``default`` if the key is absent.

        Raises:
            MetricReaderException: If ``required`` is True and the parameter
                is missing or falsy.
        """
        # Truthiness (not mere presence) is checked on purpose here: an
        # empty value does not satisfy a required parameter.
        if required and not self._special_params.get(key, None):
            raise MetricReaderException(f"Missing required param '{key}'")
        return self._special_params.get(key, default)

    @abstractmethod
    def get_metric_names(self, limit: int = -1) -> MetricResult:
        """Get metric names

        Args:
            limit(int): Up to the number of metric names pulled,
                        -1 indicate unlimited

        Returns:
            MetricResult: result of the query
        """

    @abstractmethod
    def get_metric_labels(self, metric_name) -> MetricResult:
        """Get label list of specific metric_name

        Args:
            metric_name: name of the metric whose labels are requested

        Returns:
            MetricResult: result of the query
        """

    @abstractmethod
    def get_label_values(self, label_name) -> MetricResult:
        """Get values list of specific label_name

        Args:
            label_name: name of the label whose values are requested

        Returns:
            MetricResult: result of the query
        """

    @abstractmethod
    def instant_query(self, queries: List[InstantQueryTask]) -> MetricResult:
        """Query data using query API for a specified metric with promql
        aggregation function

        Args:
            queries([InstantQueryTask]): Query tasks

        Returns:
            MetricResult: result of the query
        """

    @abstractmethod
    def range_query(self, queries: List[RangeQueryTask]) -> MetricResult:
        """Query data within a specific time period for a specified metric

        Args:
            queries([RangeQueryTask]): Query tasks

        Returns:
            MetricResult: result of the query
        """

    @staticmethod
    def protocol_register(proto, sub_class):
        """Register one new protocol => indicate one execution module

        This function is called to register the MetricReader implementation
        of an execution module so that the module takes effect.
        (Usually when the execution module is implemented according to the
        specification, there is no need for the developer to call this method
        manually, the abstraction layer will dynamically import)

        Args:
            proto(str): Protocol identification
            sub_class: Implementation class of MetricReader

        Raises:
            MetricReaderProtoAlreadyExistsException: If ``proto`` has already
                been registered; the existing entry is left untouched.

        Examples:
            >>> MetricReader.protocol_register('prometheus',
            ...                                PrometheusMetricReader)

        """
        if proto in MetricReader.proto_dict:
            # Refuse to overwrite silently: two implementations claiming the
            # same protocol name is a configuration error.
            raise MetricReaderProtoAlreadyExistsException(
                f"Proto '{proto}' already exists in MetricReader."
            )
        MetricReader.proto_dict[proto] = sub_class
|
|
|
|
|
|
def dispatch_metric_reader(url: str, **kwargs) -> MetricReader:
    """Construct one MetricReader instance according the url

    Construct a MetricReader instance of the corresponding type based on
    the URL passed in. If the protocol of the url has not been registered
    yet, the module ``metric_reader.<proto>_metric_reader`` is imported and
    its ``<Proto>MetricReader`` class is registered first.

    Args:
        url(str): MetricReader url; its protocol selects the implementation

    Keyword Args:
        **kwargs: Passed through unchanged to the constructor of the
            selected implementation.

    Returns:
        MetricReader: one MetricReader instance

    Raises:
        MetricReaderProtoNotExistsException: If the implementation module
            cannot be imported, or it does not define the expected class.

    Examples:
        >>> metric_reader = dispatch_metric_reader(
        ...     "prometheus://localhost:9090")
    """
    metric_reader_url = MetricReaderUrl.parse(url)
    proto = metric_reader_url.proto
    if proto not in MetricReader.proto_dict:
        # Not registered yet: try to load the implementation dynamically,
        # following the module / class naming convention.
        target_module = f"metric_reader.{proto}_metric_reader"
        target_class = f"{proto.capitalize()}MetricReader"
        try:
            # Keep the try body minimal so that only import failures are
            # translated into "proto not exists".
            module = importlib.import_module(target_module)
        except ModuleNotFoundError as exc:
            # NOTE: this also covers a missing dependency of the
            # implementation module; the chained cause tells them apart.
            raise MetricReaderProtoNotExistsException(
                f"Proto '{proto}' not exists in MetricReader."
            ) from exc
        reader_class = getattr(module, target_class, None)
        if reader_class is None:
            # The module exists but breaks the naming convention; report it
            # as the same domain error instead of a bare AttributeError.
            raise MetricReaderProtoNotExistsException(
                f"Proto '{proto}' not exists in MetricReader: module "
                f"'{target_module}' does not define '{target_class}'."
            )
        MetricReader.protocol_register(proto, reader_class)
    return MetricReader.proto_dict[proto](metric_reader_url, **kwargs)
|