# sysom1/environment/1_sdk/sysom_utils/config_parser.py
#
# 255 lines
# 9.1 KiB
# Python

# -*- coding: utf-8 -*- #
"""
Time 2023/03/10 10:01
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File config_parser.py
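Description: Loads the global and per-service YAML configuration, applies
    environment-variable overrides and builds the connection URLs derived
    from the merged result.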
"""
import copy
import os
from enum import Enum

import yaml
from clogger import logger

from .yaml_concat import YamlConcatConstructor
from .adddict import Dict
def dict_merge(dct: dict, merge_dct: dict) -> None:
    """Recursively merge ``merge_dct`` into ``dct`` in place.

    Inspired by :meth:`dict.update`, but instead of updating only top-level
    keys, dict_merge recurses down into dicts nested to an arbitrary depth.
    When a key holds a dict on both sides the two dicts are merged; in every
    other case the value from ``merge_dct`` replaces the one in ``dct``.

    Values taken over from ``merge_dct`` are deep-copied, so ``dct`` never
    shares nested containers with ``merge_dct``. Without the copy, a later
    merge into ``dct`` would recurse into (and silently modify) the nested
    dicts of an earlier ``merge_dct``.

    Args:
        dct(dict): dict onto which the merge is executed (mutated in place)
        merge_dct(dict): dict merged into ``dct``; left unmodified

    Return:
        None. Nothing happens when either argument is ``None``.
    """
    if dct is None or merge_dct is None:
        return
    for k, v in merge_dct.items():
        if k in dct and isinstance(dct[k], dict) and isinstance(v, dict):
            dict_merge(dct[k], v)
        else:
            # Copy so that dct does not alias containers owned by merge_dct.
            dct[k] = copy.deepcopy(v)
def get_all_sysom_env_from_proc_1(environ_path: str = "/proc/1/environ") -> dict:
    """Collect the ``sysom``-prefixed environment variables of PID 1.

    The file is expected to hold ``KEY=VALUE`` entries separated by NUL
    bytes. Entries are stripped of surrounding whitespace; empty entries and
    entries without ``=`` are skipped individually, so one malformed entry
    does not discard the ones that follow it.

    Args:
        environ_path(str): file to read; defaults to ``/proc/1/environ``.

    Return:
        dict mapping each variable name starting with ``sysom`` to its value.
        An empty dict is returned (and a warning logged) when the file
        cannot be read or decoded.
    """
    env = {}
    try:
        with open(environ_path, "r") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as e:
        # Best effort: the file may be missing or unreadable for this user.
        logger.warning(e)
        return env

    # Split on NUL only (not on newlines) so values that contain line breaks
    # stay in one piece.
    for entry in content.split("\0"):
        key, sep, value = entry.strip().partition("=")
        if not sep:
            # Empty entry or one without "=": nothing to record.
            continue
        if key.startswith("sysom"):
            env[key] = value
    return env
# Names of the top-level sections looked up in the merged configuration by
# the ConfigParser.get_*_config() accessors.
SYSOM_CONFIG_SECTION_GLOBAL = "sysom_global"
SYSOM_CONFIG_SECTION_SERVER = "sysom_server"
SYSOM_CONFIG_SECTION_WEB = "sysom_web"
SYSOM_CONFIG_SECTION_NODE = "sysom_node"
SYSOM_CONFIG_SECTION_SERVICE = "sysom_service"
# Maps an environment variable name to the dotted configuration key it
# overrides. ConfigParser._overwrite_from_env applies an entry only when the
# variable is set to a non-empty value.
ENV_LIST = {
# MySQL connection settings
"DB_MYSQL_HOST": "sysom_server.db.mysql.host",
"DB_MYSQL_PORT": "sysom_server.db.mysql.port",
"DB_MYSQL_USERNAME": "sysom_server.db.mysql.user",
"DB_MYSQL_PASSWORD": "sysom_server.db.mysql.password",
"DB_MYSQL_DATABASE": "sysom_server.db.mysql.database",
# Redis connection settings
"REDIS_HOST": "sysom_server.db.redis.host",
"REDIS_PORT": "sysom_server.db.redis.port",
"REDIS_USERNAME": "sysom_server.db.redis.username",
"REDIS_PASSWORD": "sysom_server.db.redis.password",
}
class ConfigParserException(Exception):
    """Raised when the configuration cannot be turned into a usable value."""
class CecTarget(Enum):
    """Role a CEC URL is built for.

    The member value is the key looked up under ``cec.special_param`` to
    find the parameters specific to that role.
    """

    ADMIN = "admin"
    CONSUMER = "consumer"
    PRODUCER = "producer"
class ConfigParser:
    """Load, merge and expose the global and per-service configuration.

    The global YAML file is loaded first and the service YAML file is merged
    on top of it. The merged result is then overridden from environment
    variables (see ``_overwrite_from_env``).
    """

    def __init__(self, global_config_path: str, service_config_path: str) -> None:
        """Load both config files and apply environment overrides.

        Args:
            global_config_path(str): path of the global YAML config file
            service_config_path(str): path of the service YAML config file
        """
        self.global_config_path = global_config_path
        self.service_config_path = service_config_path
        self._config: Dict = self._load_config()
        self._overwrite_from_env()

    def _overwrite_from_env(self) -> None:
        """Override loaded config values from environment variables.

        Sources are applied in this order, so a later source wins:

        1. ``sysom``-prefixed variables returned by
           ``get_all_sysom_env_from_proc_1()``;
        2. ``sysom``-prefixed variables of the current process;
        3. the variables listed in ``ENV_LIST`` that are set and non-empty.

        For the first two sources every ``___`` in the variable name is
        replaced by ``.`` to form the key passed to ``Dict.set_multi``.
        Values are stored as the strings found in the environment.
        """
        for name, value in get_all_sysom_env_from_proc_1().items():
            self._config.set_multi(name.replace("___", "."), value)

        for name, value in os.environ.items():
            if name.startswith("sysom"):
                self._config.set_multi(name.replace("___", "."), value)

        for name, key_str in ENV_LIST.items():
            value = os.getenv(name)
            if value:
                self._config.set_multi(key_str, value)

    def _load_config(self) -> Dict:
        """Read both YAML files and return them merged into one ``Dict``.

        Keys of the service config override those of the global config;
        nested dicts are merged recursively by ``dict_merge``. A file whose
        document is empty (loads as ``None``) contributes nothing.
        """
        YamlConcatConstructor.add_to_loader_class(loader_class=yaml.FullLoader)
        result_config: dict = {}
        # Global first, service second, so service values take precedence.
        for path in (self.global_config_path, self.service_config_path):
            with open(path, "r") as f:
                dict_merge(result_config, yaml.full_load(f.read()))
        return Dict(result_config)

    def get_config(self) -> Dict:
        """Return the complete merged configuration."""
        return self._config

    def get_global_config(self) -> Dict:
        """Return the ``sysom_global`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_GLOBAL, {}))

    def get_server_config(self) -> Dict:
        """Return the ``sysom_server`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_SERVER, {}))

    def get_web_config(self) -> Dict:
        """Return the ``sysom_web`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_WEB, {}))

    def get_node_config(self) -> Dict:
        """Return the ``sysom_node`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_NODE, {}))

    def get_service_config(self) -> Dict:
        """Return the ``sysom_service`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_SERVICE, {}))

    def get_plugin_config(self, plugin_name) -> Dict:
        """Return the ``plugins.<plugin_name>`` entry of the service section.

        An empty ``Dict`` is returned when the plugin has no entry.
        """
        return Dict(self.get_service_config().plugins.get(plugin_name, {}))

    ##############################################################################
    # Helper functions
    ##############################################################################

    @staticmethod
    def _join_query(url: str, params: list) -> str:
        """Append ``params`` to ``url`` as ``&``-separated query arguments.

        A separating ``&`` is inserted only when ``url`` is non-empty and
        does not already end with ``?``. With no params, ``url`` is returned
        unchanged (no dangling ``&``).
        """
        if not params:
            return url
        if url and not url.endswith("?"):
            url += "&"
        return url + "&".join(params)

    @staticmethod
    def _build_redis_backed_url(protocol, server_config: Dict, special_param: dict) -> str:
        """Build ``<protocol>://<host>:<port>?<params>`` from the redis config.

        Host, port and the optional username/password come from
        ``server_config.db.redis``; they are followed by the entries of
        ``special_param``. When ``protocol`` is not ``"redis"`` no scheme or
        host part is produced and only the joined ``special_param`` entries
        are returned.

        NOTE(review): values are inserted verbatim, without URL-encoding.
        """
        url = ""
        params = []
        if protocol == "redis":
            redis_config = server_config.db.redis
            url = f"{protocol}://{redis_config.host}:{redis_config.port}?"
            if redis_config.username:
                params.append(f"username={redis_config.username}")
            if redis_config.password:
                params.append(f"password={redis_config.password}")
        params.extend(f"{k}={v}" for k, v in special_param.items())
        return url + "&".join(params)

    def get_consumer_group(self) -> str:
        """Return ``cec.consumer_group`` from the server section."""
        return self.get_server_config().cec.consumer_group

    def get_cec_url(self, target: CecTarget) -> str:
        """Build the CEC URL for the given role.

        The query carries the redis credentials (when set), then the
        ``cec.special_param.comm`` entries overlaid with the entries found
        under ``cec.special_param.<target.value>``.

        Args:
            target(CecTarget): role whose specific parameters are included

        Raises:
            ConfigParserException: if ``cec.protocol`` is not ``"redis"``
        """
        server_config = self.get_server_config()
        cec_config = server_config.cec
        special_param = {}
        dict_merge(special_param, cec_config.special_param.comm)
        dict_merge(special_param, cec_config.special_param.get(target.value, {}))
        if cec_config.protocol != "redis":
            raise ConfigParserException(
                f"Not support cec protocol: {cec_config.protocol}"
            )
        return self._build_redis_backed_url(
            cec_config.protocol, server_config, special_param
        )

    def get_cmg_url(self) -> str:
        """Build the CMG URL from the server section's ``cmg`` settings.

        Unlike ``get_cec_url`` this does not raise for a protocol other than
        ``"redis"``; the result then holds only the joined
        ``cmg.special_param`` entries, with no scheme or host.
        """
        server_config = self.get_server_config()
        cmg_config = server_config.cmg
        special_param = {}
        if cmg_config.special_param:
            dict_merge(special_param, cmg_config.special_param)
        return self._build_redis_backed_url(
            cmg_config.protocol, server_config, special_param
        )

    def get_gcache_url(self) -> str:
        """Build the gcache URL from ``framework.gcache`` of the service section.

        The redis host and credentials are taken from the server section.
        For a protocol other than ``"redis"`` the result holds only the
        joined ``special_param`` entries, with no scheme or host.
        """
        service_config = self.get_service_config()
        server_config = self.get_server_config()
        gcache_config = service_config.framework.gcache
        special_param = {}
        if gcache_config.special_param:
            dict_merge(special_param, gcache_config.special_param)
        return self._build_redis_backed_url(
            gcache_config.protocol, server_config, special_param
        )

    def get_gclient_url(self, service_name: str) -> str:
        """Build the gclient URL from ``framework.gclient`` of the service section.

        With protocol ``"cmg"`` the URL is the CMG URL with its scheme
        replaced by ``cmg://``, followed by the ``special_param`` entries
        and, when ``service_name`` is non-empty, ``cmg_service_name``. For
        any other protocol only the joined parameters are returned.

        Args:
            service_name(str): value for the ``cmg_service_name`` parameter;
                omitted from the URL when empty
        """
        service_config = self.get_service_config()
        gclient_config = service_config.framework.gclient
        special_param = {}
        if gclient_config.special_param:
            dict_merge(special_param, gclient_config.special_param)
        if service_name:
            special_param["cmg_service_name"] = service_name
        gclient_url = ""
        if gclient_config.protocol == "cmg":
            cmg_url = self.get_cmg_url()
            gclient_url = f"cmg://{cmg_url.split('://')[1]}"
        params = [f"{k}={v}" for k, v in special_param.items()]
        # Shared join rule: no trailing "&" without params, no leading "&"
        # on an empty base.
        return self._join_query(gclient_url, params)

    def get_local_channel_job_url(self) -> str:
        """Build the channel-job URL on top of the producer CEC URL.

        Appends the target topic, listen topic and consumer group found in
        the server section's ``channel_job`` settings.

        Raises:
            ConfigParserException: propagated from ``get_cec_url``
        """
        server_config = self.get_server_config()
        channel_job_config = server_config.channel_job
        cec_url = self.get_cec_url(CecTarget.PRODUCER)
        params = [
            f"channel_job_target_topic={channel_job_config.target_topic}",
            f"channel_job_listen_topic={channel_job_config.listen_topic}",
            f"channel_job_consumer_group={channel_job_config.consumer_group}",
        ]
        return self._join_query(cec_url, params)