mirror of https://gitee.com/anolis/sysom.git
223 lines
8.2 KiB
Python
223 lines
8.2 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2022/8/11 14:30
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File redis_producer.py
|
|
Description:
|
|
"""
|
|
import json
|
|
import atexit
|
|
from typing import Callable, Union, Optional
|
|
|
|
from redis import Redis
|
|
from clogger import logger
|
|
from cec_base.producer import Producer
|
|
from cec_base.event import Event
|
|
from cec_base.url import CecUrl
|
|
from cec_base.exceptions import TopicNotExistsException, CecException
|
|
from .utils import do_connect_by_cec_url
|
|
from .redis_admin import RedisAdmin
|
|
from .common import StaticConst, ClientBase
|
|
from .admin_static import static_create_topic
|
|
|
|
|
|
class RedisProducer(Producer, ClientBase):
|
|
"""A redis-based execution module implement of Producer
|
|
|
|
Producer implementation in an execution module based on the Redis.
|
|
|
|
"""
|
|
|
|
def __init__(self, url: CecUrl):
|
|
Producer.__init__(self)
|
|
ClientBase.__init__(self, url)
|
|
self._current_url = ""
|
|
|
|
# Handles Redis implementation of event-centric specialization
|
|
# parameters
|
|
self.default_max_len = self.get_special_param(
|
|
StaticConst.REDIS_SPECIAL_PARM_CEC_DEFAULT_MAX_LEN
|
|
)
|
|
self.auto_mk_topic = self.get_special_param(
|
|
StaticConst.REDIS_SPECIAL_PARM_CEC_AUTO_MK_TOPIC
|
|
)
|
|
|
|
# 1. Connect to the Redis server
|
|
self._redis_client: Optional[Redis] = None
|
|
self.connect_by_cec_url(url)
|
|
|
|
# 2. Create a new dict to hold the topic_name => TopicMeta mapping
|
|
# relationship
|
|
self._topic_metas = {
|
|
|
|
}
|
|
|
|
def _ensure_topic_meta_info(self, topic_name: str) -> bool:
|
|
"""Auto fetch topic's meta info"""
|
|
inner_topic_name = StaticConst.get_inner_topic_name(topic_name)
|
|
topic_exist = False
|
|
# Determine whether there is metadata information for the target
|
|
# topic
|
|
if inner_topic_name not in self._topic_metas or \
|
|
self._topic_metas[inner_topic_name] is None:
|
|
# Pulling metadata information
|
|
self._topic_metas[inner_topic_name] = RedisAdmin.get_meta_info(
|
|
self._redis_client, topic_name)
|
|
|
|
# If the metadata information is invalid, the topic does not exist
|
|
if self._topic_metas[inner_topic_name] is None:
|
|
if self.auto_mk_topic:
|
|
# If you set the theme to be created automatically if it
|
|
# does not exist, try to create the theme
|
|
topic_exist = static_create_topic(
|
|
self._redis_client,
|
|
topic_name)
|
|
logger.debug(
|
|
f"{self} try to auto create topic: {topic_exist}"
|
|
)
|
|
else:
|
|
topic_exist = True
|
|
else:
|
|
topic_exist = True
|
|
return topic_exist
|
|
|
|
def produce(self, topic_name: str, message_value: Union[bytes, str, dict],
|
|
callback: Callable[[Exception, Event], None] = None,
|
|
**kwargs):
|
|
"""Generate one new event, then put it to event center
|
|
|
|
发布一个事件到事件中心 => 对应到 Redis 就是生产一个消息注入到 Stream 当中
|
|
|
|
Args:
|
|
topic_name(str): Topic name
|
|
message_value(bytes | str | dict): Event value
|
|
callback(Callable[[Exception, Event], None]): Event delivery
|
|
results callback
|
|
|
|
Keyword Args
|
|
partition(int): Partition ID
|
|
1. If a valid partition number is specified, the event is
|
|
deliverd to the specified partition (not recommended);
|
|
2. A positive partition ID is passed, but no such partition is
|
|
available, an exception will be thrown.
|
|
3. A negative partition number is passed (e.g. -1), then the
|
|
event will be cast to all partitions in a balanced manner
|
|
using the built-in policy (recommended).
|
|
|
|
Examples:
|
|
>>> producer = dispatch_producer(
|
|
..."redis://localhost:6379?password=123456")
|
|
>>> producer.produce("test_topic", {"value": "hhh"})
|
|
"""
|
|
logger.debug(
|
|
f"{self} try to produce one message {message_value} to "
|
|
f"{topic_name}.")
|
|
|
|
inner_topic_name = StaticConst.get_inner_topic_name(topic_name)
|
|
topic_exist = self._ensure_topic_meta_info(topic_name)
|
|
|
|
err, event_id = None, None
|
|
if not topic_exist:
|
|
logger.error(
|
|
f"{self} Topic ({topic_name}) not exists.")
|
|
# Topic not exists
|
|
err = TopicNotExistsException(
|
|
f"Topic ({topic_name}) not exists.")
|
|
else:
|
|
# Deliver the message in the corresponding topic
|
|
kwargs.setdefault("maxlen", self.default_max_len)
|
|
value = ""
|
|
value_type = StaticConst.REDIS_CEC_EVENT_VALUE_TYPE_DICT
|
|
if isinstance(message_value, dict):
|
|
value = json.dumps(message_value)
|
|
value_type = StaticConst.REDIS_CEC_EVENT_VALUE_TYPE_DICT
|
|
elif isinstance(message_value, str):
|
|
value = message_value
|
|
value_type = StaticConst.REDIS_CEC_EVENT_VALUE_TYPE_STRING
|
|
elif isinstance(message_value, bytes):
|
|
value = message_value.decode(encoding="utf-8")
|
|
value_type = StaticConst.REDIS_CEC_EVENT_VALUE_TYPE_BYTES
|
|
else:
|
|
raise CecException(
|
|
f"Not support value type: {type(message_value)}")
|
|
event_id = self._redis_client.xadd(inner_topic_name, {
|
|
StaticConst.REDIS_CEC_EVENT_VALUE_KEY: value,
|
|
StaticConst.REDIS_CEC_EVENT_VALUE_TYPE_KEY: value_type
|
|
}, **kwargs)
|
|
# Additional processing if topice does not exist
|
|
if event_id is None:
|
|
err = TopicNotExistsException(
|
|
f"Topic ({topic_name}) not exists.")
|
|
else:
|
|
logger.info(
|
|
f"{self} produce one message '{event_id}'"
|
|
)
|
|
|
|
if callback is not None:
|
|
callback(err, Event(message_value, event_id))
|
|
|
|
def flush(self, timeout: int = -1, **kwargs):
|
|
"""Flush all cached event to event center
|
|
|
|
Deliver all events in the cache that have not yet been committed into
|
|
the event center (this is a blocking call)
|
|
|
|
Args:
|
|
timeout(int): Blocking wait time
|
|
(Negative numbers represent infinite blocking wait)
|
|
|
|
Notes: The RedisProducer's produce func is currently blocking, so the
|
|
flush func can be empty
|
|
|
|
Examples:
|
|
>>> producer = dispatch_producer(
|
|
..."redis://localhost:6379?password=123456")
|
|
>>> producer.produce("test_topic", {"value": "hhh"})
|
|
>>> producer.flush()
|
|
"""
|
|
|
|
def connect_by_cec_url(self, url: CecUrl):
|
|
"""Connect to redis server by CecUrl
|
|
|
|
Connecting to the Redis server via CecUrl
|
|
|
|
Args:
|
|
url(str): CecUrl
|
|
"""
|
|
logger.debug(
|
|
f"{self} try to connect to '{url}'.")
|
|
self._redis_client = do_connect_by_cec_url(url)
|
|
self._current_url = str(url)
|
|
logger.info(
|
|
f"{self} connect to '{url}' successfully.")
|
|
return self
|
|
|
|
def connect(self, url: str):
|
|
"""Connect to redis server by url
|
|
|
|
Connecting to the remote message queue => Corresponding to this module
|
|
is connecting to the Redis server.
|
|
|
|
Args:
|
|
url(str): CecUrl
|
|
"""
|
|
cec_url = CecUrl.parse(url)
|
|
return self.connect_by_cec_url(cec_url)
|
|
|
|
def disconnect(self):
|
|
"""Disconnect from redis server
|
|
|
|
Disconnect from remote server => Corresponds to this module as
|
|
disconnecting the Redis server.
|
|
|
|
"""
|
|
if self._redis_client is None:
|
|
return
|
|
logger.debug(
|
|
f"{self} try to disconnect from '{self._current_url}'.")
|
|
self._redis_client.quit()
|
|
self._redis_client = None
|
|
logger.info(
|
|
f"{self} disconnect from '{self._current_url}' successfully.")
|