# Mirror of https://gitee.com/anolis/sysom.git
# 129 lines, 4.1 KiB, Python
# -*- coding: utf-8 -*- #
"""
Time                2023/6/1 17:01
Author:             mingfeng (SunnyQjm)
Email               mfeng@linux.alibaba.com
File                opentsdb_metric_reader.py
Description:        Metric reader that queries an OpenTSDB-style HTTP API.
"""
|
|
import json
|
|
|
|
import requests
|
|
from typing import List
|
|
from urllib.parse import urljoin
|
|
from .result import MetricResult, RangeVectorResult
|
|
from .metric_reader import MetricReader
|
|
from .exceptions import MetricReaderException
|
|
from .url import MetricReaderUrl
|
|
from .task import RangeQueryTask
|
|
from .filter import FilterType
|
|
from .common import StaticConst
|
|
|
|
# REST paths of the metric backend; each one is joined onto the reader's
# base URL before a request is sent.
GET_METRIC_NAMES = "/api/suggestmetric"  # list metric names
GET_TAG_K = "/api/suggesttagk"  # list tag keys (labels) of a metric
QUERY = "/api/query"  # time-range data query
|
|
|
|
|
|
class OpentsdbMetricReader(MetricReader):
    """Metric reader that fetches data from an OpenTSDB-style HTTP API.

    Every public query method returns a ``MetricResult`` whose ``code`` is
    ``0`` on success and ``1`` when the server answers with a non-200
    status; in the failure case ``err_msg`` carries a short description.

    NOTE(review): none of the ``requests`` calls pass a ``timeout``, so a
    stalled server blocks the caller indefinitely. Network-level exceptions
    raised by ``requests`` are not caught here and propagate to the caller.
    """

    def __init__(self, url: MetricReaderUrl, **kwargs):
        """Build the base URL and read the backend-specific parameters.

        Args:
            url: Parsed reader URL; its ``netloc`` becomes the host part of
                the base URL.
            **kwargs: Forwarded unchanged to ``MetricReader.__init__``.
        """
        super().__init__(url, **kwargs)
        # Use HTTPS only when the TLS special parameter is truthy.
        protocol = (
            "https" if self.get_special_param(StaticConst.TLS) else "http"
        )
        self.base_url = f"{protocol}://{url.netloc}"
        self.token = self.get_special_param(StaticConst.OPENTSDB_TOKEN)
        self.aggregator = self.get_special_param(
            StaticConst.OPENTSDB_AGGREGATOR
        )
        self.down_sample = self.get_special_param(
            StaticConst.OPENTSDB_DOWN_SAMPLE
        )

    def _get_url(self, api: str):
        """Return the absolute URL for the endpoint path ``api``."""
        return urljoin(self.base_url, api)

    def get_metric_names(self, limit: int = -1) -> MetricResult:
        """Fetch the metric names known to the backend.

        Args:
            limit: Maximum number of names to request. Values ``<= 0`` mean
                no ``max`` parameter is sent.

        Returns:
            ``MetricResult`` whose ``data`` is the decoded JSON response on
            success, or with ``code == 1`` on a non-200 status.
        """
        params = {
            "q": "e",
            "start": "1w-ago",
            "token": self.token,
        }
        if limit > 0:
            params["max"] = limit
        res = requests.get(self._get_url(GET_METRIC_NAMES), params)
        mr = MetricResult(0, [])
        if res.status_code != 200:
            mr.code = 1
            mr.err_msg = "Request failed, status_code != 200"
        else:
            mr.data = res.json()
        return mr

    def get_metric_labels(self, metric_name) -> MetricResult:
        """Fetch the tag keys (labels) of ``metric_name``.

        Args:
            metric_name: Name of the metric whose tag keys are requested.

        Returns:
            ``MetricResult`` whose ``data`` is the decoded JSON response on
            success, or with ``code == 1`` on a non-200 status.
        """
        res = requests.get(
            self._get_url(GET_TAG_K),
            {
                "metric": metric_name,
                "start": "1w-ago",
                "token": self.token,
            },
        )
        mr = MetricResult(0, [])
        if res.status_code != 200:
            mr.code = 1
            mr.err_msg = f"Get metric labels for {metric_name} failed!"
        else:
            mr.data = res.json()
        return mr

    def get_label_values(self, label_name: str) -> List[str]:
        """Not implemented for this backend; currently returns ``None``.

        TODO(review): the annotation promises ``List[str]`` but no value is
        produced -- implement it or raise explicitly.
        """
        pass

    def range_query(self, queries: List[RangeQueryTask]) -> MetricResult:
        """Run all ``queries`` in a single request to the query endpoint.

        The time window of the whole request is taken from the FIRST task
        only; ``start_time``/``end_time`` of the remaining tasks are ignored.
        Every filter of a task is sent as an exact ``tag -> value`` pair.

        Args:
            queries: Tasks to execute. An empty list yields an empty,
                successful result without contacting the server.

        Returns:
            ``MetricResult`` whose ``data`` holds one ``RangeVectorResult``
            per series returned by the server, or with ``code == 1`` on a
            non-200 status.
        """
        if not queries:
            # Nothing to ask for; previously this crashed with IndexError
            # on ``queries[0]``.
            return MetricResult(0, [])

        first = queries[0]
        params = {
            # presumably seconds -> milliseconds; confirm with RangeQueryTask
            "start": first.start_time * 1000,
            "end": first.end_time * 1000,
            "queries": [
                {
                    "metric": task.metric_name,
                    "aggregator": self.aggregator,
                    "granularity": f"{task.step}s",
                    "downsample": self.down_sample,
                    "tags": {
                        flt.label_name: flt.value for flt in task.filters
                    },
                }
                for task in queries
            ],
        }
        res = requests.post(
            self._get_url(QUERY),
            json=params,
            params={"token": self.token},
        )
        mr = MetricResult(0, [])
        if res.status_code != 200:
            mr.code = 1
            mr.err_msg = "Request failed, status_code != 200"
            return mr

        # Each element of the response is expected to look like:
        # {
        #     "dps":{
        #         "1685601660":69.87
        #     },
        #     "id":0,
        #     "integrity":1,
        #     "messageWatermark":1685603658214,
        #     "metric":"node_load_1m",
        #     "summary":104.88153846153844,
        #     "tags":{
        #         "cluster":"asi_sh_hippo_ea119_01",
        #         "instance":"33.23.47.127",
        #         "cpu":""
        #     }
        # }
        for item in res.json():
            # Keep the data points as (timestamp, value) tuples in the
            # order the server returned them.
            values = list(item["dps"].items())
            mr.data.append(
                RangeVectorResult(item["metric"], item["tags"], values)
            )
        return mr
|