# Source listing: sysom1/environment/1_sdk/metric_reader/opentsdb_metric_reader.py
# (129 lines, 4.1 KiB, Python)

# -*- coding: utf-8 -*- #
"""
Time 2023/6/1 17:01
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File opentsdb_metric_reader.py
Description: MetricReader implementation that queries an OpenTSDB-style HTTP API
"""
import json
import requests
from typing import List
from urllib.parse import urljoin
from .result import MetricResult, RangeVectorResult
from .metric_reader import MetricReader
from .exceptions import MetricReaderException
from .url import MetricReaderUrl
from .task import RangeQueryTask
from .filter import FilterType
from .common import StaticConst
# HTTP API paths. Each one is joined onto the reader's base URL by
# OpentsdbMetricReader._get_url before a request is sent.

# Path requested by OpentsdbMetricReader.get_metric_names (GET).
GET_METRIC_NAMES = "/api/suggestmetric"
# Path requested by OpentsdbMetricReader.get_metric_labels (GET).
GET_TAG_K = "/api/suggesttagk"
# Path requested by OpentsdbMetricReader.range_query (POST).
QUERY = "/api/query"
class OpentsdbMetricReader(MetricReader):
    """MetricReader implementation backed by an OpenTSDB-style HTTP API.

    The reader builds its base URL from the network location of the supplied
    ``MetricReaderUrl`` and reads the access token, aggregator and
    down-sample settings from the reader's special parameters.
    """

    def __init__(self, url: MetricReaderUrl, **kwargs):
        """Initialise the reader from ``url``.

        Args:
            url: Reader URL; its ``netloc`` becomes the host part of every
                request URL.
            **kwargs: Forwarded unchanged to ``MetricReader.__init__``.
        """
        super().__init__(url, **kwargs)
        # Switch to HTTPS only when the TLS special parameter is truthy.
        protocol = "https" if self.get_special_param(StaticConst.TLS) else "http"
        self.base_url = f"{protocol}://{url.netloc}"
        self.token = self.get_special_param(StaticConst.OPENTSDB_TOKEN)
        self.aggregator = self.get_special_param(
            StaticConst.OPENTSDB_AGGREGATOR)
        self.down_sample = self.get_special_param(
            StaticConst.OPENTSDB_DOWN_SAMPLE)

    def _get_url(self, api: str):
        """Return the absolute URL for the API path ``api``."""
        return urljoin(self.base_url, api)

    def _get_json(self, api: str, params: dict, err_msg: str) -> MetricResult:
        """Issue a GET request and wrap the JSON body in a MetricResult.

        Args:
            api: API path, resolved against the base URL.
            params: Query-string parameters for the request.
            err_msg: Message stored on the result when the HTTP status is
                not 200.

        Returns:
            A MetricResult with ``code`` 0 and the decoded JSON as ``data``
            on HTTP 200; otherwise ``code`` 1, ``err_msg`` set and ``data``
            left as an empty list.
        """
        res = requests.get(self._get_url(api), params)
        mr = MetricResult(0, [])
        if res.status_code != 200:
            mr.code = 1
            mr.err_msg = err_msg
        else:
            mr.data = res.json()
        return mr

    def get_metric_names(self, limit: int = -1) -> MetricResult:
        """Fetch metric names from the backend.

        Args:
            limit: Maximum number of names to request. Values <= 0 mean the
                ``max`` parameter is not sent at all.

        Returns:
            MetricResult whose ``data`` is the decoded JSON response, or a
            result with ``code`` 1 when the HTTP status is not 200.
        """
        params = {
            # NOTE(review): the query string is hard-coded to "e"; presumably
            # a match pattern required by the backend -- confirm.
            "q": "e",
            "start": "1w-ago",
            "token": self.token
        }
        if limit > 0:
            params["max"] = limit
        return self._get_json(
            GET_METRIC_NAMES, params, "Request failed, status_code != 200")

    def get_metric_labels(self, metric_name) -> MetricResult:
        """Fetch the label (tag key) names associated with ``metric_name``.

        Args:
            metric_name: Name of the metric to look up.

        Returns:
            MetricResult whose ``data`` is the decoded JSON response, or a
            result with ``code`` 1 when the HTTP status is not 200.
        """
        params = {
            "metric": metric_name,
            "start": "1w-ago",
            "token": self.token
        }
        return self._get_json(
            GET_TAG_K, params,
            f"Get metric labels for {metric_name} failed!")

    def get_label_values(self, label_name: str) -> List[str]:
        """Not implemented for this backend; currently returns ``None``."""
        pass

    def range_query(self, queries: List[RangeQueryTask]) -> MetricResult:
        """Run a batch of range queries in a single POST request.

        The time window of the whole request is taken from the FIRST task
        only; ``start_time`` / ``end_time`` of the remaining tasks are not
        used. Both values are multiplied by 1000 before sending (presumably
        seconds -> milliseconds; verify against the backend).

        Args:
            queries: Tasks to execute. An empty list yields an empty,
                successful result without contacting the backend.

        Returns:
            MetricResult whose ``data`` holds one ``RangeVectorResult`` per
            series in the response, or a result with ``code`` 1 when the
            HTTP status is not 200.
        """
        mr = MetricResult(0, [])
        if not queries:
            # Nothing to ask for; previously this crashed on queries[0].
            return mr

        first = queries[0]
        params = {
            "start": first.start_time * 1000,
            "end": first.end_time * 1000,
            "queries": [
                {
                    "metric": task.metric_name,
                    "aggregator": self.aggregator,
                    "granularity": f"{task.step}s",
                    "downsample": self.down_sample,
                    # Only label name and value are forwarded for each filter.
                    "tags": {
                        flt.label_name: flt.value for flt in task.filters
                    }
                }
                for task in queries
            ]
        }
        res = requests.post(
            self._get_url(QUERY),
            json=params,
            params={
                "token": self.token
            }
        )
        if res.status_code != 200:
            mr.code = 1
            mr.err_msg = "Request failed, status_code != 200"
            return mr

        # Each element of the response is expected to look like:
        # {
        #     "dps": {
        #         "1685601660": 69.87
        #     },
        #     "id": 0,
        #     "integrity": 1,
        #     "messageWatermark": 1685603658214,
        #     "metric": "node_load_1m",
        #     "summary": 104.88153846153844,
        #     "tags": {
        #         "cluster": "asi_sh_hippo_ea119_01",
        #         "instance": "33.23.47.127",
        #         "cpu": ""
        #     }
        # }
        for item in res.json():
            # Data points become (timestamp_key, value) tuples, in the order
            # the response lists them.
            values = list(item["dps"].items())
            mr.data.append(
                RangeVectorResult(item["metric"], item["tags"], values)
            )
        return mr