mirror of https://gitee.com/anolis/sysom.git
255 lines
9.1 KiB
Python
255 lines
9.1 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/03/10 10:01
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File config_parser.py
|
|
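Description: Load and merge the SysOM global/service YAML configs, apply environment-variable overrides, and build connection URLs.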
|
|
"""
|
|
import yaml
|
|
from enum import Enum
|
|
import os
|
|
from clogger import logger
|
|
from .yaml_concat import YamlConcatConstructor
|
|
from .adddict import Dict
|
|
|
|
|
|
def dict_merge(dct: dict, merge_dct: dict) -> None:
    """Recursively merge ``merge_dct`` into ``dct`` in place.

    Inspired by :meth:`dict.update`, but instead of replacing only top-level
    keys it recurses into nested dicts to an arbitrary depth. When both sides
    hold a dict under the same key the two are merged; in every other case
    the value from ``merge_dct`` replaces the value in ``dct``.

    Dict values taken from ``merge_dct`` are deep-copied before being stored,
    so a later merge into ``dct`` never writes through into ``merge_dct``.

    Args:
        dct(dict): dict onto which the merge is executed (mutated in place)
        merge_dct(dict): dict whose content is merged into ``dct``; it is
            left untouched

    Return:
        None. The call is a no-op when either argument is ``None``.
    """
    # Local import so this function does not depend on the module's import
    # block being changed.
    import copy

    if dct is None or merge_dct is None:
        return
    for key, value in merge_dct.items():
        if key in dct and isinstance(dct[key], dict) and isinstance(value, dict):
            dict_merge(dct[key], value)
        elif isinstance(value, dict):
            # Store a private copy: sharing the nested dict would let a
            # subsequent dict_merge(dct, ...) mutate the caller's source data.
            dct[key] = copy.deepcopy(value)
        else:
            dct[key] = value
|
|
|
|
|
|
def get_all_sysom_env_from_proc_1() -> dict:
    """Collect the ``sysom``-prefixed environment variables of PID 1.

    ``/proc/1/environ`` holds ``NAME=value`` entries separated by NUL bytes.
    The file is read in one go and split on NUL only, so a value containing a
    newline stays in one piece. Entries without ``=`` are skipped instead of
    aborting the whole scan.

    Reading is best effort: if the file cannot be read, the error is logged
    as a warning and an empty dict is returned.

    Return:
        dict: mapping of variable name to value for every variable whose name
        starts with ``sysom``.
    """
    env = {}
    try:
        with open("/proc/1/environ", "r") as f:
            raw = f.read()
    except Exception as e:
        # Deliberately broad: a missing or unreadable file must not stop
        # configuration loading.
        logger.warning(e)
        return env

    for entry in raw.split("\0"):
        name, sep, value = entry.strip().partition("=")
        if not sep:
            # Blank chunk (e.g. after the trailing NUL) or malformed entry.
            continue
        if name.startswith("sysom"):
            env[name] = value
    return env
|
|
|
|
|
|
# Names of the top-level sections looked up in the merged configuration.
SYSOM_CONFIG_SECTION_GLOBAL = "sysom_global"
SYSOM_CONFIG_SECTION_SERVER = "sysom_server"
SYSOM_CONFIG_SECTION_WEB = "sysom_web"
SYSOM_CONFIG_SECTION_NODE = "sysom_node"
SYSOM_CONFIG_SECTION_SERVICE = "sysom_service"

# Environment variable name -> dotted configuration key it overrides.
ENV_LIST = {
    # MySQL connection settings
    "DB_MYSQL_HOST": "sysom_server.db.mysql.host",
    "DB_MYSQL_PORT": "sysom_server.db.mysql.port",
    "DB_MYSQL_USERNAME": "sysom_server.db.mysql.user",
    "DB_MYSQL_PASSWORD": "sysom_server.db.mysql.password",
    "DB_MYSQL_DATABASE": "sysom_server.db.mysql.database",
    # Redis connection settings
    "REDIS_HOST": "sysom_server.db.redis.host",
    "REDIS_PORT": "sysom_server.db.redis.port",
    "REDIS_USERNAME": "sysom_server.db.redis.username",
    "REDIS_PASSWORD": "sysom_server.db.redis.password",
}
|
|
|
|
|
|
class ConfigParserException(Exception):
    """Error raised by the config parser, e.g. for an unsupported protocol."""
|
|
|
|
|
|
class CecTarget(Enum):
    """Role a CEC URL is built for.

    The member value is the key used to look up role-specific parameters
    under the ``special_param`` section of the cec configuration.
    """

    ADMIN = "admin"
    CONSUMER = "consumer"
    PRODUCER = "producer"
|
|
|
|
|
|
class ConfigParser:
    """Load, merge and expose the SysOM configuration.

    The global and the service YAML files are merged into one configuration
    (service values override global ones), after which selected values are
    overridden from environment variables. Accessors return individual
    sections, and helper methods assemble the connection URLs derived from
    the configuration.

    Args:
        global_config_path(str): path of the global YAML config file
        service_config_path(str): path of the service YAML config file
    """

    def __init__(self, global_config_path: str, service_config_path: str) -> None:
        self.global_config_path = global_config_path
        self.service_config_path = service_config_path
        self._config: Dict = self._load_config()
        self._overwrite_from_env()

    def _overwrite_from_env(self) -> None:
        """Override config values from environment variables.

        Sources are applied in this order, so later ones win:

        1. ``sysom``-prefixed variables of PID 1,
        2. ``sysom``-prefixed variables of the current process,
        3. the well-known variables listed in ``ENV_LIST`` (only when set to
           a non-empty value).

        For the first two sources every ``___`` in the variable name is
        replaced by ``.`` to form the key passed to ``set_multi``.
        NOTE(review): ``set_multi`` comes from ``.adddict``; it presumably
        treats the dotted key as a nested path - confirm there.
        """
        for name, value in get_all_sysom_env_from_proc_1().items():
            self._config.set_multi(name.replace("___", "."), value)

        for name, value in os.environ.items():
            if name.startswith("sysom"):
                self._config.set_multi(name.replace("___", "."), value)

        for name, config_key in ENV_LIST.items():
            value = os.getenv(name)
            if value:
                self._config.set_multi(config_key, value)

    def _load_config(self) -> Dict:
        """Read both YAML files and return their merged content.

        The files are opened as UTF-8 explicitly so that parsing does not
        depend on the process locale.

        Return:
            Dict: global config with the service config merged on top.
        """
        YamlConcatConstructor.add_to_loader_class(loader_class=yaml.FullLoader)
        global_config: dict = {}
        service_config: dict = {}
        result_config: dict = {}
        # Load config
        with open(self.global_config_path, "r", encoding="utf-8") as f:
            global_config = yaml.full_load(f.read())
        with open(self.service_config_path, "r", encoding="utf-8") as f:
            service_config = yaml.full_load(f.read())
        # Merge order matters: service values override global ones.
        dict_merge(result_config, global_config)
        dict_merge(result_config, service_config)
        return Dict(result_config)

    def get_config(self) -> Dict:
        """Return the complete merged configuration."""
        return self._config

    def get_global_config(self) -> Dict:
        """Return the ``sysom_global`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_GLOBAL, {}))

    def get_server_config(self) -> Dict:
        """Return the ``sysom_server`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_SERVER, {}))

    def get_web_config(self) -> Dict:
        """Return the ``sysom_web`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_WEB, {}))

    def get_node_config(self) -> Dict:
        """Return the ``sysom_node`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_NODE, {}))

    def get_service_config(self) -> Dict:
        """Return the ``sysom_service`` section (empty if absent)."""
        return Dict(self._config.get(SYSOM_CONFIG_SECTION_SERVICE, {}))

    def get_plugin_config(self, plugin_name) -> Dict:
        """Return the config stored under ``plugins.<plugin_name>`` of the
        service section (empty if absent)."""
        return Dict(self.get_service_config().plugins.get(plugin_name, {}))

    ##############################################################################
    # Helper functions
    ##############################################################################

    @staticmethod
    def _build_url(protocol, server_config, special_param: dict) -> str:
        """Assemble a URL for ``protocol`` plus its query parameters.

        For the ``redis`` protocol the URL starts with
        ``redis://<host>:<port>?`` taken from ``server_config.db.redis``,
        followed by optional ``username`` / ``password`` parameters. For any
        other protocol no scheme/host part is produced and only the query
        parameters are returned; callers that must reject such protocols
        check before calling.

        NOTE(review): parameter values are inserted verbatim, without URL
        encoding.

        Args:
            protocol: protocol name from the relevant config section
            server_config: the ``sysom_server`` section
            special_param(dict): extra ``key=value`` query parameters

        Return:
            str: the assembled URL
        """
        url = ""
        params = []
        if protocol == "redis":
            redis_config = server_config.db.redis
            url = f"{protocol}://{redis_config.host}:{redis_config.port}?"
            if redis_config.username:
                params.append(f"username={redis_config.username}")
            if redis_config.password:
                params.append(f"password={redis_config.password}")
        params.extend(f"{k}={v}" for k, v in special_param.items())
        return url + "&".join(params)

    def get_consumer_group(self) -> str:
        """Return ``cec.consumer_group`` from the server section."""
        return self.get_server_config().cec.consumer_group

    def get_cec_url(self, target: CecTarget) -> str:
        """Build the CEC URL for the given target role.

        The query parameters are ``cec.special_param.comm`` overlaid with the
        parameters stored under the target's value (``admin`` / ``consumer``
        / ``producer``).

        Args:
            target(CecTarget): role the URL is built for

        Return:
            str: the CEC URL

        Raises:
            ConfigParserException: if ``cec.protocol`` is not ``redis``
        """
        server_config = self.get_server_config()
        cec_config = server_config.cec
        special_param = {}
        dict_merge(special_param, cec_config.special_param.comm)
        dict_merge(special_param, cec_config.special_param.get(target.value, {}))
        if cec_config.protocol != "redis":
            raise ConfigParserException(
                f"Not support cec protocol: {cec_config.protocol}"
            )
        return self._build_url(cec_config.protocol, server_config, special_param)

    def get_cmg_url(self) -> str:
        """Build the CMG URL from the ``cmg`` part of the server section.

        Return:
            str: the CMG URL; when ``cmg.protocol`` is not ``redis`` only the
            joined special parameters are returned (no scheme/host part).
        """
        server_config = self.get_server_config()
        cmg_config = server_config.cmg
        special_param = {}
        if cmg_config.special_param:
            dict_merge(special_param, cmg_config.special_param)
        return self._build_url(cmg_config.protocol, server_config, special_param)

    def get_gcache_url(self) -> str:
        """Build the gcache URL from ``framework.gcache`` of the service
        section, using the redis settings of the server section.

        Return:
            str: the gcache URL; when the protocol is not ``redis`` only the
            joined special parameters are returned (no scheme/host part).
        """
        service_config = self.get_service_config()
        server_config = self.get_server_config()
        gcache_config = service_config.framework.gcache
        special_param = {}
        if gcache_config.special_param:
            dict_merge(special_param, gcache_config.special_param)
        return self._build_url(gcache_config.protocol, server_config, special_param)

    def get_gclient_url(self, service_name: str) -> str:
        """Build the gclient URL from ``framework.gclient`` of the service
        section.

        With the ``cmg`` protocol the URL is the CMG URL with its scheme
        replaced by ``cmg``, followed by the gclient special parameters.

        Args:
            service_name(str): when non-empty, added as the
                ``cmg_service_name`` parameter

        Return:
            str: the gclient URL
        """
        service_config = self.get_service_config()
        gclient_config = service_config.framework.gclient
        special_param = {}
        if gclient_config.special_param:
            dict_merge(special_param, gclient_config.special_param)
        if service_name:
            special_param["cmg_service_name"] = service_name

        gclient_url = ""
        if gclient_config.protocol == "cmg":
            cmg_url = self.get_cmg_url()
            # Split on the first "://" only, so a later "://" (for instance
            # inside the password) does not truncate the URL.
            gclient_url = f"cmg://{cmg_url.split('://', 1)[1]}"
        if not gclient_url.endswith("?"):
            # The URL already carries parameters; separate the new ones.
            gclient_url += "&"
        params = [f"{k}={v}" for k, v in special_param.items()]
        return gclient_url + "&".join(params)

    def get_local_channel_job_url(self) -> str:
        """Build the channel-job URL: the producer CEC URL extended with the
        ``channel_job`` topic and consumer-group parameters of the server
        section.

        Return:
            str: the channel-job URL

        Raises:
            ConfigParserException: if ``cec.protocol`` is not ``redis``
        """
        server_config = self.get_server_config()
        channel_job_config = server_config.channel_job
        cec_url = self.get_cec_url(CecTarget.PRODUCER)
        params = [
            f"channel_job_target_topic={channel_job_config.target_topic}",
            f"channel_job_listen_topic={channel_job_config.listen_topic}",
            f"channel_job_consumer_group={channel_job_config.consumer_group}",
        ]
        if cec_url and not cec_url.endswith("?"):
            cec_url += "&"
        return cec_url + "&".join(params)
|