!392 Upgrade: bump the cec sdk library to the new version, and adapt the related parts of the notification module accordingly

Merge pull request !392 from SunnyQjm/cec-new
huangtuq 2022-09-14 02:00:20 +00:00 committed by Gitee
commit 684ef8c78e
16 changed files with 1409 additions and 308 deletions

sysom_api/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
conf/ssh-key

View File

@@ -10,21 +10,31 @@ from apps.alarm.models import AlarmModel, SubscribeModel
 from lib import *
 from django.conf import settings
 from sdk.cec_base.producer import Producer, dispatch_producer
+from sdk.cec_base.admin import Admin, dispatch_admin

 logger = logging.getLogger(__name__)

-producer: Producer = dispatch_producer(settings.SYSOM_CEC_URL, default_max_len=settings.SYSOM_CEC_ALARM_MAXLEN)
+producer: Producer = dispatch_producer(settings.SYSOM_CEC_URL)
+admin: Admin = dispatch_admin(settings.SYSOM_CEC_URL)
+
+# Global state of this module
+storage = {
+    'alarm_topic_exist': False
+}


 def _create_alarm_message(kwargs):
     alarm_serializer = serializer.AddAlarmSerializer(data=kwargs)
     alarm_serializer.is_valid(raise_exception=True)
     alarm_serializer.save()
-    channel = alarm_serializer.instance.sub.title
+    subscriber = alarm_serializer.instance.sub.title
     ser = serializer.AlarmSerializer(alarm_serializer.instance)
-    producer.produce(f"sysom_alarm-{channel}", {
-        "sub": channel,
+    if not storage['alarm_topic_exist']:
+        storage['alarm_topic_exist'] = admin.is_topic_exist(
+            settings.SYSOM_CEC_ALARM_TOPIC) or admin.create_topic(settings.SYSOM_CEC_ALARM_TOPIC)
+    producer.produce(settings.SYSOM_CEC_ALARM_TOPIC, {
+        "sub": subscriber,
         "message": ser.data
-    }, auto_mk_topic=True)
+    })
     producer.flush()
     return alarm_serializer.data

View File

@@ -152,9 +152,11 @@ CHANNEL_SERVICE = 'http://127.0.0.1:7003'
 # key path
 KEY_PATH=os.path.join(BASE_DIR, 'conf', 'ssh-key')

-# cec settings
-SYSOM_CEC_URL = "redis://localhost:6379"
-SYSOM_CEC_ALARM_MAXLEN = 1000
+##################################################################
+# Cec settings
+##################################################################
+SYSOM_CEC_URL = "redis://localhost:6379?cec_default_max_len=1000"
+SYSOM_CEC_ALARM_TOPIC = "CEC-SYSOM-ALARM"

 # API
 TASK_API = f'{TASK_SERVICE}/api/v1/tasks/'
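
With this change the alarm queue cap moves out of a dedicated Django setting and into the connection URL as a CEC "special parameter". A minimal sketch of how the two new settings are consumed (the payload is invented for illustration):

```python
from sdk.cec_base.producer import dispatch_producer

# cec_default_max_len now rides along in the URL, so no extra keyword
# argument is needed when dispatching the producer.
producer = dispatch_producer("redis://localhost:6379?cec_default_max_len=1000")
producer.produce("CEC-SYSOM-ALARM", {"sub": "admin", "message": {"text": "demo"}})
producer.flush()
```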

View File

@@ -118,7 +118,7 @@ class NoticelconConsumer(JsonWebsocketConsumer):
         self._user = self.scope['user']
         if self._user:
             self.accept()
-            self.consumer = dispatch_consumer(settings.SYSOM_CEC_URL, f"sysom_alarm-{self._user.username}",
+            self.consumer = dispatch_consumer(settings.SYSOM_CEC_URL, settings.SYSOM_CEC_ALARM_TOPIC,
                                               consumer_id=Consumer.generate_consumer_id(), start_from_now=True)
             Thread(target=self.loop_message).start()
         else:

View File

@@ -5,14 +5,49 @@ Created: 2022/07/24
 Description:
 """
 import importlib
+import json
 from abc import ABCMeta, abstractmethod
 from .base import Connectable, Disconnectable
 from .base import Registrable, ProtoAlreadyExistsException
-from .base import ProtoNotExistsException
+from .base import ProtoNotExistsException, CecException
+from .event import Event
+from .meta import TopicMeta, \
+    ConsumerGroupMemberMeta
 from .url import CecUrl
 from loguru import logger


+class ConsumeStatusItem(object):
+    """Consume status => how one consumer group consumes one topic
+
+    1. Minimum ID (smallest offset)
+    2. Maximum ID (largest offset)
+    3. Total number of events stored in the partition (consumed and not yet consumed)
+    4. ID of the last event acknowledged by this consumer group in the
+       partition (the event most recently acked by a consumer)
+    5. LAG of the partition (events committed to the partition but not yet
+       consumed or acknowledged by this consumer group)
+    """
+
+    def __init__(self, topic: str, consumer_group_id: str, partition: int,
+                 min_id: str = "", max_id: str = "",
+                 total_event_count: int = 0, last_ack_id: str = "",
+                 lag: int = 0):
+        self.topic = topic
+        self.consumer_group_id = consumer_group_id
+        self.partition = partition
+        self.min_id = min_id
+        self.max_id = max_id
+        self.total_event_count = total_event_count
+        self.last_ack_id = last_ack_id
+        self.lag = lag
+
+    def tojson(self):
+        return json.dumps(self.__dict__)
+
+    def __repr__(self):
+        return self.tojson()
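
For orientation, a small sketch of what one of these status items serializes to (the IDs are copied from the doctest below and purely illustrative):

```python
item = ConsumeStatusItem("topic1", "group-a", 0,
                         min_id="1661516434003-0", max_id="1661516434004-4",
                         total_event_count=10, last_ack_id="1661516434003-4",
                         lag=5)
print(item)  # __repr__ delegates to tojson(): a JSON dump of __dict__
# {"topic": "topic1", "consumer_group_id": "group-a", "partition": 0, ...}
```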
 class Admin(Connectable, Disconnectable, Registrable, metaclass=ABCMeta):
     """Common Event Center Management interface definition
@@ -111,7 +146,7 @@ class Admin(Connectable, Disconnectable, Registrable, metaclass=ABCMeta):
         pass

     @abstractmethod
-    def get_topic_list(self) -> [str]:
+    def get_topic_list(self) -> [TopicMeta]:
         """Get topic list

         Gets the list of topics.

@@ -124,7 +159,7 @@ class Admin(Connectable, Disconnectable, Registrable, metaclass=ABCMeta):
         Examples:
             >>> admin = dispatch_admin("redis://localhost:6379")
             >>> admin.get_topic_list()
-            ['test_topic']
+            [TopicMeta(faeec676-60db-4418-a775-c5f1121d5331, 1)]
         """
         pass
@@ -197,7 +232,7 @@ class Admin(Connectable, Disconnectable, Registrable, metaclass=ABCMeta):
         pass

     @abstractmethod
-    def get_consumer_group_list(self) -> [str]:
+    def get_consumer_group_list(self) -> [ConsumerGroupMemberMeta]:
         """Get consumer group list

         Gets the list of consumer groups.

@@ -208,7 +243,81 @@ class Admin(Connectable, Disconnectable, Registrable, metaclass=ABCMeta):
         Examples:
             >>> admin = dispatch_admin("redis://localhost:6379")
             >>> admin.get_consumer_group_list()
-            ['test_group']
+        """
+        pass
+
+    @abstractmethod
+    def get_consume_status(self, topic: str, consumer_group_id: str = "",
+                           partition: int = 0) -> [ConsumeStatusItem]:
+        """Get consumption info for specific <topic, consumer_group, partition>
+
+        Gets how a specific consumer group consumes a given partition of a
+        topic; the result should contain:
+        1. Minimum ID (smallest offset)
+        2. Maximum ID (largest offset)
+        3. Total number of events stored in the partition (consumed and not yet consumed)
+        4. ID of the last event acknowledged by this consumer group in the
+           partition (the event most recently acked by a consumer)
+        5. LAG of the partition (events committed to the partition but not yet
+           consumed or acknowledged by this consumer group)
+
+        Args:
+            topic: topic name
+            consumer_group_id: consumer group ID
+                1. If consumer_group_id is an empty string or None, the
+                   consumption status of every consumer group subscribed to
+                   this topic is returned => the partition argument is then
+                   ignored and data for all partitions is fetched.
+                2. If consumer_group_id is an invalid group ID, an exception
+                   is raised.
+                3. If consumer_group_id is a valid group ID, only that group's
+                   consumption status is fetched.
+            partition: partition ID
+                1. A valid non-negative integer => return that partition's
+                   consumption status.
+                2. An invalid non-negative integer => raise an exception.
+                3. A negative integer => return the consumption status of
+                   every partition of this topic.
+
+        Raises:
+            CecException
+
+        Examples:
+            >>> admin = dispatch_admin("redis://localhost:6379")
+            >>> admin.get_consume_status("topic1")
+            [
+                {
+                    "topic":"topic1",
+                    "consumer_group_id":"c78e8b71-45b9-4e11-8f8e-05a98b534cc0",
+                    "min_id":"1661516434003-0",
+                    "max_id":"1661516434004-4",
+                    "total_event_count":10,
+                    "last_ack_id":"1661516434003-4",
+                    "lag":5
+                },
+                {
+                    "topic":"topic1",
+                    "consumer_group_id":"d1b39ec3-6ae9-42a6-83b5-257d875788e6",
+                    "min_id":"1661516434003-0",
+                    "max_id":"1661516434004-4",
+                    "total_event_count":10,
+                    "last_ack_id":"1661516434003-1",
+                    "lag":8
+                }
+            ]
+
+        Returns:
+
+        """
+        pass
+
+    @abstractmethod
+    def get_event_list(self, topic: str, partition: int, offset: str,
+                       count: int) -> [Event]:
+        """ Get event list for specific <topic, partition>
+
+        Gets the list of events in the given partition of a topic.
+        1. offset and count are used for paging.
+
+        Args:
+            topic: topic name
+            partition: partition ID
+            offset: offset; read events whose IDs are after this one
+            count: maximum number of events to read
+
+        Returns:
+
         """
         pass
@@ -309,21 +418,21 @@ def dispatch_admin(url: str, **kwargs) -> Admin:
     return admin_instance


-class TopicAlreadyExistsException(Exception):
+class TopicAlreadyExistsException(CecException):
     """Raised when creating a Topic that already exists"""
     pass


-class TopicNotExistsException(Exception):
+class TopicNotExistsException(CecException):
     """Raised when deleting a Topic that does not exist"""
     pass


-class ConsumerGroupAlreadyExistsException(Exception):
+class ConsumerGroupAlreadyExistsException(CecException):
     """Raised when creating a consumer group that already exists"""
     pass


-class ConsumerGroupNotExistsException(Exception):
+class ConsumerGroupNotExistsException(CecException):
     """Raised when deleting a consumer group that does not exist"""
     pass

View File

@@ -57,7 +57,11 @@ class Dispatchable(metaclass=ABCMeta):
     pass


-class ProtoAlreadyExistsException(Exception):
+class CecException(Exception):
+    pass
+
+
+class ProtoAlreadyExistsException(CecException):
     """Protocol-already-exists exception

     1. Raised when registering a new protocol whose name is already registered.

@@ -65,7 +69,7 @@ class ProtoAlreadyExistsException(CecException):
     pass


-class ProtoNotExistsException(Exception):
+class ProtoNotExistsException(CecException):
     """Protocol-not-exists exception

     1. Raised when dispatching an instance by URL and the corresponding
        protocol has not been registered.

View File

@@ -70,7 +70,7 @@ class Consumer(Connectable, Disconnectable, Registrable, metaclass=ABCMeta):
         self.consume_mode = ConsumeMode.CONSUME_FROM_NOW

     @abstractmethod
-    def consume(self, timeout: int = 0, auto_ack: bool = False,
+    def consume(self, timeout: int = -1, auto_ack: bool = False,
                 batch_consume_limit: int = 0) -> [Event]:
         """Start to consume the event from event center according to the
         corresponding ConsumeMode

@@ -78,7 +78,7 @@ class Consumer(Connectable, Disconnectable, Registrable, metaclass=ABCMeta):
         Consumes events from the event center according to the consume mode.

         Args:
-            timeout(int): wait timeout in ms; 0 means block and wait
+            timeout(int): wait timeout in ms; <= 0 means block and wait
             auto_ack(bool): enable auto-acknowledge (group-consume mode only)
                 1. With auto-ack on, every event is acknowledged as soon as it
                    is successfully read.

@@ -110,14 +110,16 @@ class Consumer(Connectable, Disconnectable, Registrable, metaclass=ABCMeta):
         pass

     @abstractmethod
-    def ack(self, event_id: str) -> int:
+    def ack(self, event: Event) -> int:
         """Confirm that the specified event has been successfully consumed

         Acknowledges the specified event.
         1. An event should normally be acknowledged only after it has been
            taken out and processed successfully.

         Args:
-            event_id: event ID
+            event(Event): the event to acknowledge
+                1. It must be an Event instance obtained through this Consumer.
+                2. Passing a self-constructed Event does not guarantee the
+                   expected result.

         Returns:
             int: 1 if successfully, 0 otherwise

@@ -130,7 +132,7 @@ class Consumer(Connectable, Disconnectable, Registrable, metaclass=ABCMeta):
             ... , start_from_now=False)
             >>> msgs = consumer.consume(200, auto_ack=False, batch_consume_limit=1)
             >>> msg = msgs[0]
-            >>> consumer.ack(msg.event_id)
+            >>> consumer.ack(msg)
         """
         pass
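
Taken together with the websocket change above, a minimal consume-and-ack loop against the new API might look like this (the import path, URL, topic and group names are illustrative, and `handle` is a placeholder for application logic):

```python
from sdk.cec_base.consumer import Consumer, dispatch_consumer

consumer = dispatch_consumer("redis://localhost:6379", "CEC-SYSOM-ALARM",
                             consumer_id=Consumer.generate_consumer_id(),
                             group_id="demo-group")
while True:
    for event in consumer.consume(timeout=200, batch_consume_limit=10):
        handle(event.value)   # process the payload first ...
        consumer.ack(event)   # ... then ack with the Event itself (new API)
```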

View File

@@ -25,3 +25,18 @@ class Event(object):
             value = dict()
         self.value = value
         self.event_id = event_id
+        # cache used by the underlying implementation to stash data;
+        # user code should not rely on this attribute
+        self._cache = dict()
+
+    def put(self, key: str, value):
+        self._cache[key] = value
+
+    def get(self, key):
+        return self._cache.get(key)
+
+    def __repr__(self):
+        return f"Event({self.event_id}, {self.value}) "
+
+    def __str__(self):
+        return self.event_id

View File

@@ -15,29 +15,89 @@ class TopicMeta:

     Args:
         topic_name(str): topic name
-        num_partitions(int): number of partitions
-        replication_factor(int): replication factor
-        expire_time(int): event time-to-live

     Attributes:
         topic_name(str): topic name
-        num_partitions(int): number of partitions
-        replication_factor(int): replication factor
-        expire_time(int): event time-to-live
     """

-    def __init__(self, topic_name: str = "", num_partitions: int = 1,
-                 replication_factor: int = 1,
-                 expire_time: int = 24 * 60 * 60 * 1000):
+    def __init__(self, topic_name: str = ""):
         self.topic_name = topic_name
-        self.num_partitions = int(num_partitions)
-        self.replication_factor = int(replication_factor)
-        self.expire_time = int(expire_time)
-
-    def to_dict(self):
-        """Convert TopicMeta to dict
-
-        Returns:
-            dict: A dict contains topic meta info
-        """
-        return self.__dict__
+        # example: 0 -> PartitionMeta(partition_id=xxx)
+        self.partitions = {}
+        self.error = None
+
+    def __repr__(self):
+        if self.error is not None:
+            return f"TopicMeta({self.topic_name}, {len(self.partitions)} " \
+                   f"partitions, {self.error})"
+        else:
+            return f"TopicMeta({self.topic_name}, {len(self.partitions)} " \
+                   f"partitions)"
+
+    def __str__(self):
+        return self.topic_name
+
+
+class PartitionMeta:
+    """Common Partition meta info definition
+
+    Args:
+
+    """
+
+    def __init__(self, partition_id: int = -1):
+        self.partition_id = partition_id
+        self.error = None
+
+    def __repr__(self):
+        if self.error is not None:
+            return f"PartitionMeta({self.partition_id}, {self.error})"
+        else:
+            return f"PartitionMeta({self.partition_id})"
+
+    def __str__(self):
+        return f"{self.partition_id}"
+
+
+class ConsumerGroupMeta:
+    """Common Consumer Group meta info definition
+    """
+
+    def __init__(self, group_id: str = ""):
+        self.group_id = group_id
+        self.members: [ConsumerGroupMemberMeta] = []
+        self.error = None
+
+    def __repr__(self):
+        if self.error is not None:
+            return f"ConsumerGroupMeta({self.group_id}, {len(self.members)} " \
+                   f"members, {self.error})"
+        else:
+            return f"ConsumerGroupMeta({self.group_id}, {len(self.members)} " \
+                   f"members)"
+
+    def __str__(self):
+        return self.group_id
+
+
+class ConsumerGroupMemberMeta:
+    """Common Consumer Group Member meta info definition
+    """
+
+    def __init__(self, client_id: str = ""):
+        self.client_id = client_id
+        self.error = None
+
+    def __repr__(self):
+        if self.error is not None:
+            return f"ConsumerGroupMemberMeta({self.client_id}, {self.error})"
+        else:
+            return f"ConsumerGroupMemberMeta({self.client_id})"

View File

@@ -26,8 +26,8 @@ class Producer(Connectable, Disconnectable, Registrable,

     @abstractmethod
     def produce(self, topic_name: str, message_value: dict,
-                auto_mk_topic: bool = False,
                 callback: Callable[[Exception, Event], None] = None,
+                partition: int = -1,
                 **kwargs):
         """Generate one new event, then put it to event center

@@ -37,8 +37,12 @@ class Producer(Connectable, Disconnectable, Registrable,
         Args:
             topic_name: topic name
             message_value: event payload
-            auto_mk_topic: whether to create the topic automatically if it
-                does not exist
             callback(Callable[[Exception, Event], None]): callback invoked
                 once the event is delivered to the event center
+            partition(int): partition number
+                1. A valid partition number => deliver the event to the given
+                   partition (not recommended).
+                2. A positive number with no matching partition => raise an
+                   exception.
+                3. A negative number, e.g. -1 => spread events across all
+                   partitions using the built-in strategy (recommended).

         Examples:
             >>> producer = dispatch_producer(

@@ -48,11 +52,14 @@ class Producer(Connectable, Disconnectable, Registrable,
         pass

     @abstractmethod
-    def flush(self):
+    def flush(self, timeout: int = -1):
         """Flush all cached event to event center

         Blocking call that flushes every event still cached locally to the
         event center.

+        Args:
+            timeout: wait timeout in ms; <= 0 means block and wait
+
         Examples:
             >>> producer = dispatch_producer(
             ..."redis://localhost:6379?password=123456")

@@ -118,10 +125,7 @@ def dispatch_producer(url: str, **kwargs) -> Producer:
             f"Proto '{cec_url.proto}' not exists in Cec-base-Producer."
         )
         raise err
-    producer_instance = Producer.protoDict[cec_url.proto](
-        cec_url,
-        **kwargs
-    )
+    producer_instance = Producer.protoDict[cec_url.proto](cec_url, **kwargs)
     logger.success(
         f"Cec-base-Producer dispatch one producer instance success. "
         f"proto={cec_url.proto}, url={url}")

View File

@@ -0,0 +1,169 @@
# -*- coding: utf-8 -*- #
"""
Time                2022/8/31 23:38
Author:             mingfeng (SunnyQjm)
Email               mfeng@linux.alibaba.com
File                common.py
Description:
"""
from redis import Redis
from ..cec_base.url import CecUrl


class StaticConst:
    CEC_REDIS_PREFIX = "CEC-REDIS:"
    REDIS_CEC_EVENT_VALUE_KEY = "redis-cec-event-value-key"

    _REDIS_ADMIN_META_PREFIX = f"{CEC_REDIS_PREFIX}META:"

    # A set => holds the keys of all Streams
    REDIS_ADMIN_TOPIC_LIST_SET = f"{_REDIS_ADMIN_META_PREFIX}" \
                                 f"TOPIC-LIST-SET"

    # A set => holds the keys of all Consumer Groups
    REDIS_ADMIN_CONSUMER_GROUP_LIST_SET \
        = f"{_REDIS_ADMIN_META_PREFIX}" \
          f"CONSUMER-GROUP-LIST-SET"

    # Prefix of a consumer group's subscription list => the list stores every
    # topic the consumer group has subscribed to
    REDIS_ADMIN_CONSUMER_GROUP_SUB_LIST_PREFIX \
        = f"{_REDIS_ADMIN_META_PREFIX}" \
          f"SUB-LIST-PREFIX:"

    # A common prefix shared by all STREAM keys, which makes listing all
    # streams easy
    REDIS_ADMIN_STREAM_KEY_PREFIX \
        = f"{CEC_REDIS_PREFIX}" \
          f"STREAM-PREFIX:"

    # Prefix for topic meta info
    REDIS_ADMIN_TOPIC_META_PREFIX \
        = f"{_REDIS_ADMIN_META_PREFIX}TOPIC-META:"

    # Prefix for topic/consumer-group meta info
    REDIS_ADMIN_TOPIC_CONSUMER_GROUP_META_PREFIX \
        = f"{_REDIS_ADMIN_META_PREFIX}TOPIC-CONSUMER-GROUP-META:"

    # topic/consumer-group meta key: last acknowledged event ID
    TOPIC_CONSUMER_GROUP_META_KEY_LAST_ACK_ID \
        = f"LAST-ACK-ID"

    # Prefix of topic locks
    REDIS_ADMIN_TOPIC_LOCKER_PREFIX \
        = f"{_REDIS_ADMIN_META_PREFIX}TOPIC-LOCKER:"

    # Prefix of consumer-group locks
    REDIS_ADMIN_CONSUMER_GROUP_LOCKER_PREFIX \
        = f"{_REDIS_ADMIN_META_PREFIX}CONSUMER-GROUP-LOCKER:"

    # Special parameter list:
    REDIS_SPECIAL_PARM_CEC_DEFAULT_MAX_LEN = 'cec_default_max_len'
    REDIS_SPECIAL_PARM_CEC_AUTO_MK_TOPIC = 'cec_auto_mk_topic'
    REDIS_SPECIAL_PARM_CEC_PENDING_EXPIRE_TIME = 'cec_pending_expire_time'

    _redis_special_parameter_list = [
        # cec_default_max_len => default cap on queue length
        # 1. Scope: Producer
        # 2. Meaning: when the Producer delivers events into a Stream, this is
        #    the queue length the Stream is expected to keep. Redis streams
        #    use a tree structure underneath and exact trimming hurts
        #    performance, so the cap is approximate: the real queue may be
        #    slightly longer than this value.
        REDIS_SPECIAL_PARM_CEC_DEFAULT_MAX_LEN,

        # cec_auto_mk_topic => create topics automatically
        # 1. Scope: Producer
        # 2. Meaning: whether the Producer should create the Topic on the fly
        #    when delivering a message to a Topic that does not exist yet.
        REDIS_SPECIAL_PARM_CEC_AUTO_MK_TOPIC,

        # cec_pending_expire_time => expiry-transfer interval
        # 1. Scope: Consumer
        # 2. Meaning: how long an event may sit unacknowledged in the pending
        #    list before it is transferred to another consumer in the group;
        #    on every consume batch each consumer tries to claim the expired
        #    events in the pending list for itself.
        REDIS_SPECIAL_PARM_CEC_PENDING_EXPIRE_TIME,
    ]

    _redis_special_parameters_default_value = {
        REDIS_SPECIAL_PARM_CEC_DEFAULT_MAX_LEN: (int, 1000),
        REDIS_SPECIAL_PARM_CEC_AUTO_MK_TOPIC: (bool, False),
        REDIS_SPECIAL_PARM_CEC_PENDING_EXPIRE_TIME: (int, 5 * 60 * 1000)
    }

    @staticmethod
    def parse_special_parameter(params: dict) -> dict:
        """Parses the special parameters and removes them from the param dict

        Args:
            params:

        Returns:

        """
        res = {}
        for key in StaticConst._redis_special_parameter_list:
            _type, default = \
                StaticConst._redis_special_parameters_default_value[key]
            res[key] = _type(params.pop(key, default))
        return res

    @staticmethod
    def get_inner_topic_name(topic_name) -> str:
        """Get inner topic name by topic name

        Maps a topic_name to its inner_topic_name.
        1. Each event topic is backed by one STREAM in Redis.
        2. This module prepends a common prefix, as a namespace, to every
           STREAM created by cec-redis.
        3. inner_topic_name = ALI-CEC-REDIS-STREAM-PREFIX:{topic_name}

        Args:
            topic_name: topic name

        Returns:

        """
        return f"{StaticConst.REDIS_ADMIN_STREAM_KEY_PREFIX}{topic_name}"

    @staticmethod
    def get_topic_name_by_inner_topic_name(inner_topic_name: str) -> str:
        """inner_topic_name => topic_name

        Args:
            inner_topic_name:

        Returns:

        """
        return inner_topic_name[
               len(StaticConst.REDIS_ADMIN_STREAM_KEY_PREFIX):]


class ClientBase:
    """Base class of cec clients; every Redis* class should inherit from this
    class, which provides some common behaviour.
    """

    def __init__(self, url: CecUrl):
        self._redis_version = None
        self._special_params = StaticConst.parse_special_parameter(url.params)

    def get_special_param(self, key: str, default=''):
        return self._special_params.get(key, default)

    def is_gte_6_2(self, redis_client: Redis):
        """Checks whether the Redis version is >= 6.2

        Returns:

        """
        if not self._redis_version:
            self._redis_version = redis_client.info('server')['redis_version']
        return self._redis_version >= '6.2'

    def is_gte_7(self, redis_client: Redis):
        """Checks whether the Redis version is >= 7

        Args:
            redis_client:

        Returns:

        """
        if not self._redis_version:
            self._redis_version = redis_client.info('server')['redis_version']
        return self._redis_version >= '7'
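
To make the special-parameter mechanics concrete, a small sketch (assuming, as `ClientBase` relies on, that `CecUrl.parse` fills `url.params` with the query parameters; the URL mirrors the one added to the Django settings above):

```python
url = CecUrl.parse("redis://localhost:6379?cec_default_max_len=1000&password=123456")
special = StaticConst.parse_special_parameter(url.params)
print(special['cec_default_max_len'])   # -> 1000, cast to int
print(url.params)                       # -> {'password': '123456'}; specials were popped
```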

View File

@@ -0,0 +1,186 @@
# -*- coding: utf-8 -*- #
"""
Time                2022/8/31 17:24
Author:             mingfeng (SunnyQjm)
Email               mfeng@linux.alibaba.com
File                consume_status_storage.py
Description:
"""
from redis import Redis
from redis.client import Pipeline
from ..cec_base.event import Event
from .common import StaticConst
from typing import Union


class ConsumeStatusStorage:
    """A consumer group consume status storage

    A store that tracks the consume status of a consumer group.

    1. Its main purpose is computing the LAG (number of piled-up messages).
    2. Redis 7.0 exposes this through xinfo groups:
       - 'lag' => number of messages in the stream still waiting to be
         delivered to consumers of the group (including delivered but
         unacknowledged ones)
       - 'pending' => number of messages delivered to consumers of the group
         but not yet acknowledged
       - 'lag' + 'pending' => number of messages in the topic not yet consumed
         or acknowledged by the group
    3. Redis < 7 cannot provide the data above, so this class implements a
       compatible fallback along these lines:
       1. Use a Redis zset (sorted set) per <topic, group>, storing each event
          ID converted to a score.
       2. On every consume() call RedisConsumer uses xinfo stream to obtain:
          - 'length' => current length of the queue
          - 'first-entry' => ID of the earliest message in the queue
       3. Then ZREMRANGEBYSCORE deletes every entry smaller than the earliest
          message ID, to avoid unbounded growth.
       4. On every ACK the corresponding ID is inserted into the zset.
       5. The class then offers static helpers such as:
          - get_lag(topic, group) => LAG of the topic
    """

    _CEC_REDIS_CONSUME_STATUS_STORAGE_PREFIX = \
        f"{StaticConst.CEC_REDIS_PREFIX}CONSUME_STATUS_STORAGE:"

    max_float = float("inf")    # +inf, larger than any number
    min_float = float("-inf")   # -inf, smaller than any number

    def __init__(self, _redis_client: Redis, stream: str, group_id: str):
        self._redis_client = _redis_client

        # Fetch the Redis server version
        self._version = self._redis_client.info('server')['redis_version']

        # Whether the Redis version is at least 7
        self._is_gt_version_7 = self._version >= '7'
        self.stream = stream
        self.inner_stream_key = StaticConst.get_inner_topic_name(stream)
        self.group_id = group_id

    def update(self):
        """Uses xinfo stream to read the earliest message ID in the stream and
        drops the zset entries that fall below it.

        Returns:

        """
        if self._is_gt_version_7:
            return False
        stream_info = self._redis_client.xinfo_stream(self.inner_stream_key)
        if 'first-entry' in stream_info:
            min_id = stream_info['first-entry'][0]
            min_score = ConsumeStatusStorage._get_score_by_id(min_id)
            self._redis_client.zremrangebyscore(
                ConsumeStatusStorage._get_z_set_key(self.stream,
                                                    self.group_id),
                0,
                min_score - 1
            )
            return True
        return False

    def do_after_ack_by_pl(self, pipeline: Pipeline, event: Event):
        """Called after a message is ACKed: stores its ID into the zset.

        Args:
            pipeline:
            event:

        References:
            https://redis.io/commands/zadd/

        Returns:

        """
        if self._is_gt_version_7:
            return False
        pipeline.zadd(
            ConsumeStatusStorage._get_z_set_key(self.stream,
                                                self.group_id),
            {
                event.event_id: ConsumeStatusStorage._get_score_by_id(
                    event.event_id)
            },
        )

    @staticmethod
    def get_already_ack_count(redis_client: Union[Redis, Pipeline],
                              stream: str,
                              group_id: str, ):
        """Returns the number of messages the given <stream, group> has
        acknowledged so far; can be used to compute the LAG.

        Args:
            redis_client:
            stream:
            group_id:

        References
            https://redis.io/commands/zcount/

        Returns:

        """
        return redis_client.zcount(
            ConsumeStatusStorage._get_z_set_key(stream, group_id),
            ConsumeStatusStorage.min_float,
            ConsumeStatusStorage.max_float
        )

    @staticmethod
    def destroy_by_stream_group(redis_or_pl: Union[Redis, Pipeline],
                                stream: str,
                                group_id: str):
        """Deletes the zset of <stream, group> => usually called when a
        consumer group leaves a stream.

        Args:
            redis_or_pl:
            stream:
            group_id:

        Returns:

        """
        return redis_or_pl.delete(
            ConsumeStatusStorage._get_z_set_key(stream, group_id))

    @staticmethod
    def destroy_by_stream(redis_client: Redis, stream: str):
        """Deletes every zset of <stream, *> => usually called when a stream
        is deleted.

        Args:
            redis_client:
            stream:

        Returns:

        """
        keys = redis_client.keys(
            f"{ConsumeStatusStorage._CEC_REDIS_CONSUME_STATUS_STORAGE_PREFIX}"
            f"{stream}:*")
        if len(keys) > 0:
            return redis_client.delete(*keys)
        return 0

    @staticmethod
    def _get_score_by_id(message_id: str):
        """Converts an auto-generated Redis ID to a float:
        '1526985054069-0' => 1526985054069.0

        Args:
            message_id:

        Returns:

        """
        return float(message_id.replace('-', '.'))

    @staticmethod
    def _get_z_set_key(stream: str, group_id: str):
        """Returns the key of the zset that stores the IDs for <stream, group>.

        Args:
            stream:
            group_id:

        Returns:

        """
        return \
            f"{ConsumeStatusStorage._CEC_REDIS_CONSUME_STATUS_STORAGE_PREFIX}" \
            f"{stream}:{group_id}"

View File

@@ -6,42 +6,47 @@ Email mfeng@linux.alibaba.com
 File                redis_admin.py
 Description:
 """
-from ..cec_base.admin import Admin
+import json
+import sys
+from typing import Optional
+from ..cec_base.admin import Admin, ConsumeStatusItem
 from ..cec_base.admin import TopicNotExistsException, TopicAlreadyExistsException
 from ..cec_base.admin import ConsumerGroupNotExistsException
 from ..cec_base.admin import ConsumerGroupAlreadyExistsException
-from ..cec_base.base import raise_if_not_ignore
-from ..cec_base.meta import TopicMeta
+from ..cec_base.base import raise_if_not_ignore, CecException
+from ..cec_base.event import Event
+from ..cec_base.meta import TopicMeta, PartitionMeta, ConsumerGroupMeta, \
+    ConsumerGroupMemberMeta
 from ..cec_base.log import LoggerHelper
 from ..cec_base.url import CecUrl
 from redis import Redis
 from redis.exceptions import ResponseError
 from .utils import do_connect_by_cec_url
 from loguru import logger
+from itertools import chain
+from .consume_status_storage import ConsumeStatusStorage
+from .common import StaticConst, ClientBase


-class RedisAdmin(Admin):
+class RedisAdmin(Admin, ClientBase):
     """A redis-based execution module implement of Admin

     An Admin implementation of the execution module based on Redis.
     """
-    # A set => holds the keys of all Streams
-    _REDIS_ADMIN_TOPIC_LIST_SET = "ali-cec-redis-admin-topic-list-set"
-
-    # A set => holds the keys of all Consumer Groups
-    _REDIS_ADMIN_CONSUMER_GROUP_LIST_SET \
-        = "ali-cec-redis-admin-consumer-group-list-set"
-
-    # A special prefix => joined with the Topic name to store the Topic's
-    # meta info
-    _REDIS_ADMIN_TOPIC_META_KEY_PREFIX \
-        = "ali-cec-redis-admin-topic-meta-key-prefix"

     def __init__(self, url: CecUrl):
+        Admin.__init__(self)
+        ClientBase.__init__(self, url)
         self._redis_client: Redis = None
         self._current_url: str = ""
         self.connect_by_cec_url(url)

+    ####################################################################
+    # Event-center interface implementation
+    ####################################################################
+
     @staticmethod
     @logger.catch(reraise=True)
     def static_create_topic(redis_client: Redis, topic_name: str = "",
@@ -54,50 +59,45 @@ class RedisAdmin(Admin):
             f"{topic_name}, num_partitions={num_partitions}"
             f", replication_factor={replication_factor},"
             f" expire_time={expire_time}>.")
-        # Lock create/del operations with a set to avoid duplicated
-        # create/delete in concurrent scenarios
-        if redis_client.set(
-                f"{topic_name}_ali-ece-redis-create-and-del-topic",
-                1, nx=True, ex=10) == 0:
-            return raise_if_not_ignore(ignore_exception,
-                                       TopicAlreadyExistsException(
-                                           f"Someone else is creating or"
-                                           f" deleting this topic."
-                                       ))
-        # 1. Check whether the Topic exists
-        if RedisAdmin.static_is_topic_exist(redis_client, topic_name):
-            return raise_if_not_ignore(ignore_exception,
-                                       TopicAlreadyExistsException(
-                                           f"Topic {topic_name} already "
-                                           f"exists."
-                                       ))
-        else:
+        # Key of the Stream backing this Topic internally, namespaced by a
+        # special prefix
+        inner_topic_name = StaticConst.get_inner_topic_name(
+            topic_name)
+        result = True
+        try:
+            # Take the lock
+            if not RedisAdmin._lock_topic(redis_client, topic_name,
+                                          ignore_exception):
+                return False
+            # 1. Check whether the Topic exists
+            if RedisAdmin.static_is_topic_exist(redis_client,
+                                                topic_name):
+                raise TopicAlreadyExistsException(
+                    f"Topic {topic_name} already "
+                    f"exists."
+                )
             # 2. Trigger stream creation with xadd
-            event_id = redis_client.xadd(topic_name, {
+            event_id = redis_client.xadd(inner_topic_name, {
                 "test": 1
             })
+            pl = redis_client.pipeline()
             # 3. Delete the probe event just added, emptying the stream
-            redis_client.xdel(topic_name, event_id)
+            pl.xdel(inner_topic_name, event_id)
             # 4. Add the new Topic to the Topic set (so all Topics can be
             #    listed)
-            redis_client.sadd(RedisAdmin._REDIS_ADMIN_TOPIC_LIST_SET,
-                              topic_name)
+            pl.sadd(StaticConst.REDIS_ADMIN_TOPIC_LIST_SET,
+                    inner_topic_name)
+            pl.execute()
+        except Exception as e:
+            raise_if_not_ignore(ignore_exception, e)
+        finally:
+            # Release the lock
+            RedisAdmin._unlock_topic(redis_client, topic_name)

-        # 5. Build and store the meta info
-        redis_client.hset(
-            f"{RedisAdmin._REDIS_ADMIN_TOPIC_META_KEY_PREFIX}-{topic_name}",
-            mapping={
-                "topic_name": topic_name,
-                "num_partitions": num_partitions,
-                "replication_factor": replication_factor,
-                "expire_time": expire_time
-            }
-        )
-        # Release the lock
-        redis_client.delete(
-            f"{topic_name}_ali-ece-redis-create-and-del-topic")
         LoggerHelper.get_lazy_logger().success(
             f"{redis_client} create_topic '{topic_name}' successfully.")
-        return True
+        return result

     @logger.catch(reraise=True)
     def create_topic(self, topic_name: str = "", num_partitions: int = 1,
@@ -112,7 +112,7 @@ class RedisAdmin(Admin):
         3. Finally delete the probe event just inserted (emptying the stream)
         4. Add topic_name to a dedicated set
            (the set lists the names of every Topic created through CEC)
-        5. Build and store the meta info

         TODO: Consider whether a transaction is needed here to avoid an
               inconsistent state when a step fails halfway.

         Args:

@@ -145,13 +145,15 @@ class RedisAdmin(Admin):

         Raises:
             TopicAlreadyExistsException: If topic already exists
+            CecException: Get lock failed

         Examples:
             >>> admin = dispatch_admin("redis://localhost:6379")
             >>> admin.create_topic("test_topic")
             True
         """
-        return RedisAdmin.static_create_topic(self._redis_client, topic_name,
+        return RedisAdmin.static_create_topic(self._redis_client,
+                                              topic_name,
                                               num_partitions,
                                               replication_factor,
                                               ignore_exception,
@@ -164,32 +166,42 @@ class RedisAdmin(Admin):
         LoggerHelper.get_lazy_logger().debug(
             f"{redis_client} try to del_topic <topic_name={topic_name}>.")

-        # Lock create/del operations with a set to avoid duplicated
-        # create/delete in concurrent scenarios
-        if redis_client.set(
-                f"{topic_name}_ali-ece-redis-create-and-del-topic",
-                1, nx=True, ex=10) == 0:
-            raise_if_not_ignore(ignore_exception,
-                                TopicNotExistsException(
-                                    f"Someone else is creating or deleting "
-                                    f"this topic."
-                                ))
-        # 1. Check existence
-        if not RedisAdmin.static_is_topic_exist(redis_client, topic_name):
-            raise_if_not_ignore(ignore_exception, TopicNotExistsException(
-                f"Topic {topic_name} not exists."
-            ))
+        inner_topic_name = StaticConst.get_inner_topic_name(
+            topic_name)
+
+        try:
+            # Take the lock
+            if not RedisAdmin._lock_topic(redis_client, topic_name,
+                                          ignore_exception):
+                return False
+            # 1. Check existence
+            if not RedisAdmin.static_is_topic_exist(redis_client,
+                                                    topic_name):
+                raise_if_not_ignore(ignore_exception,
+                                    TopicNotExistsException(
+                                        f"Topic {topic_name} not exists."
+                                    ))
+            pl = redis_client.pipeline()

-        # 2. Delete the backing stream (the topic)
-        redis_client.delete(topic_name)
+            # 2. Delete the backing stream (the topic)
+            pl.delete(inner_topic_name)

-        # 3. Remove the topic from the topic list
-        redis_client.srem(RedisAdmin._REDIS_ADMIN_TOPIC_LIST_SET, topic_name)
+            # 3. Remove the topic from the topic list
+            pl.srem(StaticConst.REDIS_ADMIN_TOPIC_LIST_SET,
+                    inner_topic_name)
+            pl.execute()

-        # 4. Delete the topic's meta info
-        redis_client.delete(
-            f"{RedisAdmin._REDIS_ADMIN_TOPIC_META_KEY_PREFIX}-{topic_name}")
-        # Release the lock
-        redis_client.delete(
-            f"{topic_name}_ali-ece-redis-create-and-del-topic")
+            # 4. Clean up the TOPIC-related meta info
+            RedisAdmin.del_topic_meta(redis_client, topic_name)
+
+            # 5. Delete the consume-status structures attached to the TOPIC
+            ConsumeStatusStorage.destroy_by_stream(redis_client, topic_name)
+        except Exception as e:
+            raise_if_not_ignore(ignore_exception, e)
+        finally:
+            # Release the lock
+            RedisAdmin._unlock_topic(redis_client, topic_name)

         LoggerHelper.get_lazy_logger().success(
             f"{redis_client} del_topic '{topic_name}' successfully.")
@@ -202,6 +214,7 @@ class RedisAdmin(Admin):
         Deletes a Topic => in Redis this means deleting a Stream.
         1. Simply delete the key backing the Stream.
+        2. Clean up the related meta info.

         Args:
             topic_name: topic name (the topic's unique identifier)

@@ -223,8 +236,10 @@ class RedisAdmin(Admin):

     @staticmethod
     @logger.catch(reraise=True)
-    def static_is_topic_exist(redis_client: Redis, topic_name: str) -> bool:
-        res = redis_client.type(topic_name) == 'stream'
+    def static_is_topic_exist(redis_client: Redis,
+                              topic_name: str) -> bool:
+        res = redis_client.type(
+            StaticConst.get_inner_topic_name(topic_name)) == 'stream'
         LoggerHelper.get_lazy_logger().debug(
             f"Is topic {topic_name} exists? => {res}.")
         return res

@@ -247,16 +262,26 @@ class RedisAdmin(Admin):
             >>> admin.is_topic_exist("test_topic")
             True
         """
-        return RedisAdmin.static_is_topic_exist(self._redis_client, topic_name)
+        return RedisAdmin.static_is_topic_exist(self._redis_client,
+                                                topic_name)

     @staticmethod
     @logger.catch(reraise=True)
     def static_get_topic_list(redis_client: Redis) -> [str]:
-        res = [topic_name for topic_name in
-               redis_client.smembers(RedisAdmin._REDIS_ADMIN_TOPIC_LIST_SET)]
+        res = redis_client.smembers(StaticConst.REDIS_ADMIN_TOPIC_LIST_SET)
+        topics = []
+        for inner_topic_name in res:
+            topic_meta = TopicMeta(
+                StaticConst.get_topic_name_by_inner_topic_name(
+                    inner_topic_name))
+            topic_meta.partitions = {
+                0: PartitionMeta(0)
+            }
+            topics.append(topic_meta)
         LoggerHelper.get_lazy_logger().debug(
             f"get_topic_list => {res}.")
-        return res
+        return topics

     @logger.catch(reraise=True)
     def get_topic_list(self) -> [str]:
@@ -280,7 +305,7 @@ class RedisAdmin(Admin):

     @staticmethod
     @logger.catch(reraise=True)
-    def get_meta_info(client: Redis, topic_name: str) -> TopicMeta:
+    def get_meta_info(client: Redis, topic_name: str) -> Optional[dict]:
         """Get topic's meta info

         Gets the meta info of a specific Topic.

@@ -292,11 +317,17 @@ class RedisAdmin(Admin):
         Returns:
             TopicMeta: the topic meta info object
         """
-        values = client.hgetall(
-            f"{RedisAdmin._REDIS_ADMIN_TOPIC_META_KEY_PREFIX}-{topic_name}")
+        try:
+            res = client.xinfo_stream(
+                StaticConst.get_inner_topic_name(topic_name))
+        except ResponseError:
+            return None
         LoggerHelper.get_lazy_logger().debug(
-            f"get_meta_info => {values}.")
-        return TopicMeta(**values)
+            f"get_meta_info => {res}.")
+        return {
+            'topic_name': topic_name,
+            **res
+        }

     @staticmethod
     @logger.catch(reraise=True)
@@ -308,15 +339,12 @@ class RedisAdmin(Admin):
             f"<consumer_group_id={consumer_group_id}>.")

-        # Lock create/del operations with a set to avoid duplicated
-        # create/delete in concurrent scenarios
-        if redis_client.set(
-                f"{consumer_group_id}_ali-ece-redis-create-and-del-consumer-"
-                f"group-id",
-                1, nx=True, ex=10) == 0:
-            raise_if_not_ignore(ignore_exception,
-                                ConsumerGroupAlreadyExistsException(
-                                    f"Someone else is creating or deleting "
-                                    f"this consumer group."
-                                ))
+        try:
+            # Take the lock
+            if not RedisAdmin._lock_consumer_group(redis_client,
+                                                   consumer_group_id,
+                                                   ignore_exception):
+                return False
             if RedisAdmin.static_is_consumer_group_exist(redis_client,
                                                          consumer_group_id):
                 if ignore_exception:

@@ -325,13 +353,15 @@ class RedisAdmin(Admin):
                 raise ConsumerGroupAlreadyExistsException(
                     f"Consumer group {consumer_group_id} already exists.")

             # Add it to the set of consumer-group keys
-            redis_client.sadd(RedisAdmin._REDIS_ADMIN_CONSUMER_GROUP_LIST_SET,
-                              consumer_group_id)
-
-        # Release the lock
-        redis_client.delete(
-            f"{consumer_group_id}_ali-ece-redis-create-and-del-consumer-"
-            f"group-id")
+            redis_client.sadd(
+                StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LIST_SET,
+                consumer_group_id)
+        except Exception as e:
+            raise_if_not_ignore(ignore_exception, e)
+        finally:
+            # Release the lock
+            RedisAdmin._unlock_consumer_group(redis_client,
+                                              consumer_group_id)

         LoggerHelper.get_lazy_logger().debug(
             f"{redis_client} create_consumer_group "
             f"'{consumer_group_id}' successfully.")
@@ -372,22 +402,20 @@ class RedisAdmin(Admin):

     @staticmethod
     @logger.catch(reraise=True)
-    def static_del_consumer_group(redis_client: Redis, consumer_group_id: str,
+    def static_del_consumer_group(redis_client: Redis,
+                                  consumer_group_id: str,
                                   ignore_exception: bool = False) -> bool:
         LoggerHelper.get_lazy_logger().debug(
             f"{redis_client} try to del_consumer_group "
             f"<consumer_group_id={consumer_group_id}>.")

-        # Lock create/del operations with a set to avoid duplicated
-        # create/delete in concurrent scenarios
-        if redis_client.set(
-                f"{consumer_group_id}_ali-ece-redis-create-and-del-consumer-"
-                f"group-id",
-                1, nx=True, ex=10) == 0:
-            raise_if_not_ignore(ignore_exception,
-                                ConsumerGroupNotExistsException(
-                                    f"Someone else is creating or deleting "
-                                    f"this consumer group."
-                                ))
+        try:
+            # Take the lock
+            if not RedisAdmin._lock_consumer_group(
+                    redis_client, consumer_group_id, ignore_exception
+            ):
+                return False
             # 1. First check whether the consumer group exists; if not, raise
             #    depending on ignore_exception
             if not RedisAdmin.static_is_consumer_group_exist(redis_client,
                                                              consumer_group_id):

@@ -398,23 +426,38 @@ class RedisAdmin(Admin):
                 ))

             # 2. Remove it from the consumer-group set
-            redis_client.srem(RedisAdmin._REDIS_ADMIN_CONSUMER_GROUP_LIST_SET,
-                              consumer_group_id)
+            redis_client.srem(
+                StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LIST_SET,
+                consumer_group_id)

             # 3. Destroy the same-named consumer-group structure in every
             #    stream associated with this consumer group
-        stream = redis_client.lpop(
-            f"{consumer_group_id}__ali-cec-redis-consumer-group-list")
-        while stream is not None:
-            # If the topic has not been deleted, remove the consumer group
-            # from it; otherwise do nothing beyond the pop
-            if RedisAdmin.static_is_topic_exist(redis_client, stream):
-                redis_client.xgroup_destroy(stream, consumer_group_id)
-            stream = redis_client.lpop(
-                f"{consumer_group_id}__ali-cec-redis-consumer-group-list")
-        # Release the lock
-        redis_client.delete(
-            f"{consumer_group_id}_ali-ece-redis-create-and-del-consumer-"
-            f"group-id")
+            streams = redis_client.lpop(
+                RedisAdmin.get_sub_list_key(consumer_group_id),
+                sys.maxsize
+            )
+            pl = redis_client.pipeline()
+            for stream in streams:
+                # Unsubscribe from the topic
+                pl.xgroup_destroy(stream, consumer_group_id)
+                # Delete the corresponding zset
+                ConsumeStatusStorage.destroy_by_stream_group(pl, stream,
+                                                             consumer_group_id)
+            pl.execute()
+            for stream in streams:
+                # Clean up the topic/consumer-group meta info
+                RedisAdmin.del_topic_consumer_group_meta(redis_client, stream,
+                                                         consumer_group_id)
+        except ConsumerGroupNotExistsException as e:
+            raise_if_not_ignore(ignore_exception, e)
+        except Exception as e:
+            print(e)
+            # Errors raised by the pipelined cleanup are ignored here
+            pass
+        finally:
+            # Release the lock
+            RedisAdmin._unlock_consumer_group(redis_client,
+                                              consumer_group_id)

         LoggerHelper.get_lazy_logger().debug(
             f"{redis_client} del_consumer_group "
@@ -455,7 +498,7 @@ class RedisAdmin(Admin):
     def static_is_consumer_group_exist(redis_client: Redis,
                                        consumer_group_id: str) -> bool:
         res = redis_client.sismember(
-            RedisAdmin._REDIS_ADMIN_CONSUMER_GROUP_LIST_SET,
+            StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LIST_SET,
             consumer_group_id)
         LoggerHelper.get_lazy_logger().debug(

@@ -483,21 +526,46 @@ class RedisAdmin(Admin):
             >>> admin.is_consumer_group_exist("test_group")
             True
         """
-        return RedisAdmin.static_is_consumer_group_exist(self._redis_client,
-                                                         consumer_group_id)
+        return RedisAdmin.static_is_consumer_group_exist(
+            self._redis_client,
+            consumer_group_id)

     @staticmethod
     @logger.catch(reraise=True)
-    def static_get_consumer_group_list(redis_client: Redis) -> [str]:
-        res = [topic_name for topic_name in
-               redis_client.smembers(
-                   RedisAdmin._REDIS_ADMIN_CONSUMER_GROUP_LIST_SET)]
+    def static_get_consumer_group_list(redis_client: Redis) \
+            -> [ConsumerGroupMeta]:
+        res = redis_client.smembers(
+            StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LIST_SET)
+        group_metas = []
+        for group_id in res:
+            group_meta = ConsumerGroupMeta(group_id)
+            try:
+                # Fetch every topic this consumer group has subscribed to
+                sub_topics = redis_client.lrange(
+                    RedisAdmin.get_sub_list_key(group_id), 0, -1
+                )
+                # Walk the topics and collect all the members
+                pl = redis_client.pipeline(transaction=True)
+                for topic in sub_topics:
+                    pl.xinfo_consumers(topic, group_id)
+                # {"name":"Alice","pending":1,"idle":9104628}
+                for consumer in chain.from_iterable(pl.execute()):
+                    group_meta.members.append(
+                        ConsumerGroupMemberMeta(consumer['name']))
+            except Exception as e:
+                group_meta.error = e
+            else:
+                group_metas.append(group_meta)
         LoggerHelper.get_lazy_logger().debug(
             f"get_consumer_group_list => {res}.")
-        return res
+        return group_metas

     @logger.catch(reraise=True)
-    def get_consumer_group_list(self) -> [str]:
+    def get_consumer_group_list(self) -> [ConsumerGroupMeta]:
         """Get consumer group list

         Gets the list of consumer groups.
@@ -516,7 +584,166 @@ class RedisAdmin(Admin):
             >>> admin.get_consumer_group_list()
             ['test_group']
         """
-        return RedisAdmin.static_get_consumer_group_list(self._redis_client)
+        return RedisAdmin.static_get_consumer_group_list(
+            self._redis_client)
+
+    @logger.catch(reraise=True)
+    def get_consume_status(self, topic: str, consumer_group_id: str = "",
+                           partition: int = 0) -> [ConsumeStatusItem]:
+        """Get consumption info for specific <topic, consumer_group, partition>
+
+        Gets how a specific consumer group consumes a given partition of a
+        topic, including:
+        1. Minimum ID (smallest offset) => xinfo stream (first-entry)
+        2. Maximum ID (largest offset) => xinfo stream (last-entry)
+        3. Total number of events stored in the partition, consumed or not
+           => xlen / xinfo stream (length)
+        4. ID of the last event acknowledged by this consumer group in the
+           partition => extracted from a topic-related Redis key that stores
+           the last acked ID
+        5. LAG of the partition (events committed to the partition but not
+           yet consumed or acknowledged by this consumer group)
+           => xinfo stream (entries-added) gives the number of events ever
+              added to the topic
+           => xinfo group (entries-read) gives the number of events the
+              group has read
+           => subtracting the two yields the number of piled-up events
+
+        Args:
+            topic: topic name
+            consumer_group_id: consumer group ID
+                1. If consumer_group_id is an empty string or None, the
+                   consumption status of every consumer group subscribed to
+                   this topic is returned => the partition argument is then
+                   ignored and data for all partitions is fetched.
+                2. If consumer_group_id is an invalid group ID, an exception
+                   is raised.
+                3. If consumer_group_id is a valid group ID, only that
+                   group's consumption status is fetched.
+            partition: partition ID (Redis has no partitions, so in the
+                cec-redis implementation the only valid value is 0)
+                1. A valid non-negative integer => return that partition's
+                   consumption status.
+                2. An invalid non-negative integer => raise an exception.
+                3. A negative integer => return the consumption status of
+                   every partition of this topic.
+
+        Raises:
+            CecException
+
+        Returns:
+
+        """
+        inner_topic_name = StaticConst.get_inner_topic_name(topic)
+        # Fetch the topic info with xinfo stream
+        try:
+            stream_info = self._redis_client.xinfo_stream(inner_topic_name)
+            min_id, max_id, length, entries_added = None, None, 0, 0
+            if 'first-entry' in stream_info:
+                min_id = stream_info['first-entry'][0]
+            if 'last-entry' in stream_info:
+                max_id = stream_info['last-entry'][0]
+            if 'length' in stream_info:
+                length = stream_info['length']
+            groups = self._redis_client.xinfo_groups(inner_topic_name)
+            if consumer_group_id != '' and consumer_group_id is not None:
+                select_group = None
+                # Try to find the consume info of the requested group
+                for group in groups:
+                    if group.get('name') == consumer_group_id:
+                        select_group = group
+                        break
+                if select_group is None:
+                    # The consumer group does not exist
+                    raise CecException(
+                        f"Consumer group {consumer_group_id} not exists or did "
+                        f"not subscribe topic {topic}")
+
+                # The current cec-redis implementation has no partitions, so
+                # every topic has exactly one partition, numbered 0. With a
+                # consumer group given, a partition number <= 0 is treated as
+                # valid and a partition number > 0 as invalid.
+                if partition > 0:
+                    raise CecException(
+                        f"Topic {topic} did not contains partition {partition}"
+                    )
+
+                # Only the requested group's consume status is returned here
+                last_ack_id = self.get_topic_consumer_group_meta(
+                    self._redis_client, topic, select_group.get('name'),
+                    StaticConst.TOPIC_CONSUMER_GROUP_META_KEY_LAST_ACK_ID
+                )
+
+                # Compute the LAG
+                if self.is_gte_7(self._redis_client):
+                    lag = select_group['lag'] + select_group['pending']
+                else:
+                    lag = ConsumeStatusStorage.get_already_ack_count(
+                        self._redis_client, topic, consumer_group_id
+                    )
+
+                # Return the consume status of the requested group
+                return [
+                    ConsumeStatusItem(
+                        topic, consumer_group_id, 0,
+                        min_id, max_id, length,
+                        last_ack_id, lag
+                    )
+                ]
+            else:
+                # Fetch the consume status of every group (the partition
+                # argument is ignored here)
+                res, counts_map = [], {}
+                if not self.is_gte_7(self._redis_client):
+                    # On Redis < 7, use ConsumeStatusStorage to get the lag
+                    pl = self._redis_client.pipeline()
+                    for group in groups:
+                        ConsumeStatusStorage.get_already_ack_count(
+                            pl, topic, group.get('name')
+                        )
+                    counts = pl.execute()
+                    for i in range(len(groups)):
+                        counts_map[groups[i].get('name')] = counts[i]
+                for group in groups:
+                    last_ack_id = self.get_topic_consumer_group_meta(
+                        self._redis_client, topic, group.get('name'),
+                        StaticConst.TOPIC_CONSUMER_GROUP_META_KEY_LAST_ACK_ID
+                    )
+                    # Compute the LAG
+                    if 'lag' in group and 'pending' in group:
+                        lag = group['lag'] + group['pending']
+                    else:
+                        lag = length - counts_map[group.get('name')]
+                    res.append(ConsumeStatusItem(
+                        topic, group['name'], 0,
+                        min_id, max_id, length,
+                        last_ack_id, lag
+                    ))
+                return res
+        except Exception as e:
+            raise CecException(e)
+
+    @logger.catch(reraise=True)
+    def get_event_list(self, topic: str, partition: int, offset: str,
+                       count: int) -> [Event]:
+        """ Get event list for specific <topic, partition>
+
+        Gets the list of events in the given partition of a topic => uses the
+        Redis xrange command to read the events from the stream.
+        1. offset and count are used for paging.
+
+        Args:
+            topic: topic name
+            partition: partition ID => Redis has no partitions, so this
+                argument is ignored
+            offset: offset; read events whose IDs are after this one
+            count: maximum number of events to read
+
+        References:
+            https://redis.io/commands/xrange/
+
+        Returns:
+
+        """
+        inner_topic_name = StaticConst.get_inner_topic_name(topic)
+        messages = self._redis_client.xrange(
+            inner_topic_name,
+            min=f"({offset}",
+            max='+',
+            count=count
+        )
+        res = []
+        for message in messages:
+            message_content = json.loads(
+                message[1][StaticConst.REDIS_CEC_EVENT_VALUE_KEY])
+            res.append(Event(message_content, message[0]))
+        return res

     @staticmethod
     @logger.catch(reraise=True)
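
Because xrange is called with an exclusive lower bound (`min=f"({offset}"`), paging can be driven by the last event ID of each batch. A hypothetical usage sketch (`admin` comes from `dispatch_admin` as in the doctests; `process` is a placeholder):

```python
offset = '0-0'
while True:
    events = admin.get_event_list("test_topic", 0, offset, 100)
    if not events:
        break
    for ev in events:
        process(ev.value)            # process is a placeholder
    offset = events[-1].event_id     # continue after the last ID seen
```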
@@ -534,11 +761,14 @@ class RedisAdmin(Admin):
         Returns:
             bool: True if successfully, False otherwise.
         """
+        inner_topic_name = StaticConst.get_inner_topic_name(stream)
         try:
             LoggerHelper.get_lazy_logger().debug(
                 f"try to add consumer group '{consumer_group_id}"
                 f"' to topic '{stream}'.")
-            redis_client.xgroup_create(stream, consumer_group_id, id="0-0")
+            redis_client.xgroup_create(
+                inner_topic_name,
+                consumer_group_id, id="0-0")
         except ResponseError:
             # The consumer group already exists
             LoggerHelper.get_lazy_logger().debug(

@@ -550,8 +780,8 @@ class RedisAdmin(Admin):
         else:
             # Consumer group created successfully; associate it
             redis_client.lpush(
-                f"{consumer_group_id}__ali-cec-redis-consumer-group-list",
-                stream
+                RedisAdmin.get_sub_list_key(consumer_group_id),
+                inner_topic_name
             )
             LoggerHelper.get_lazy_logger().debug(
                 f"Add consumer group '{consumer_group_id}"

@@ -594,10 +824,6 @@ class RedisAdmin(Admin):
         cec_url = CecUrl.parse(url)
         return self.connect_by_cec_url(cec_url)

-    @logger.catch()
-    def __del__(self):
-        self.disconnect()
-
     @logger.catch(reraise=True)
     def disconnect(self):
         """Disconnect from redis server
@@ -613,23 +839,251 @@ class RedisAdmin(Admin):
         LoggerHelper.get_lazy_logger().success(
             f"{self} disconnect from '{self._current_url}' successfully.")

+    ####################################################################
+    # Helper functions
+    ####################################################################
+
+    @staticmethod
+    def get_topic_consumer_group_meta_info_key(topic: str, group_id: str,
+                                               key: str):
+        return f"{StaticConst.REDIS_ADMIN_TOPIC_CONSUMER_GROUP_META_PREFIX}" \
+               f"{topic + ':' if topic is not None else ''}" \
+               f"{group_id + ':' if group_id is not None else ''}" \
+               f"{key + ':' if key is not None else ''}"
+
+    @staticmethod
+    def get_topic_meta_info_key(topic: str, key: str):
+        return f"{StaticConst.REDIS_ADMIN_TOPIC_META_PREFIX}" \
+               f"{topic + ':' if topic is not None else ''}" \
+               f"{key + ':' if key is not None else ''}"
+
+    @staticmethod
+    def get_sub_list_key(group_id: str) -> str:
+        return f"{StaticConst.REDIS_ADMIN_CONSUMER_GROUP_SUB_LIST_PREFIX}" \
+               f"{group_id}"
+
+    @staticmethod
+    def store_meta(redis_client: Redis, key: str, value: str):
+        return redis_client.set(key, value)
+
+    @staticmethod
+    def get_meta(redis_client: Redis, key: str):
+        return redis_client.get(key)
+
+    @staticmethod
+    def del_meta(redis_client: Redis, prefix: str):
+        next_cursor = 0
+        while True:
+            next_cursor, key_list = redis_client.scan(
+                next_cursor,
+                match=f"{prefix}*",
+                count=100
+            )
+            if len(key_list) > 0:
+                redis_client.delete(*key_list)
+            if next_cursor == 0:
+                break
+        return True
+
+    @staticmethod
+    def store_topic_consumer_group_meta(redis_client: Redis, topic: str,
+                                        key: str, group_id: str, value):
+        return RedisAdmin.store_meta(
+            redis_client,
+            RedisAdmin.get_topic_consumer_group_meta_info_key(
+                topic, group_id, key
+            ),
+            value
+        )
+
+    @staticmethod
+    def store_topic_meta(redis_client: Redis, topic: str, key: str, value):
+        """Store topic meta info
+
+        Stores a piece of topic-related meta info.
+
+        Args:
+            redis_client:
+            topic:
+            key:
+            value:
+
+        Returns:
+
+        """
+        return RedisAdmin.store_meta(
+            redis_client,
+            RedisAdmin.get_topic_meta_info_key(topic, key),
+            value
+        )
+
+    @staticmethod
+    def get_topic_consumer_group_meta(redis_client: Redis, topic: str,
+                                      group_id: str, key: str):
+        return RedisAdmin.get_meta(
+            redis_client,
+            RedisAdmin.get_topic_consumer_group_meta_info_key(
+                topic, group_id, key
+            )
+        )
+
+    @staticmethod
+    def get_topic_meta(redis_client: Redis, topic: str, key: str):
+        """Get topic meta info
+
+        Gets a piece of topic-related meta info.
+
+        Args:
+            redis_client:
+            topic:
+            key:
+
+        Returns:
+
+        """
+        return RedisAdmin.get_meta(
+            redis_client,
+            RedisAdmin.get_topic_meta_info_key(topic, key)
+        )
+
+    @staticmethod
+    def del_topic_consumer_group_meta(redis_client: Redis,
+                                      topic: str, group_id: str):
+        return RedisAdmin.del_meta(
+            redis_client,
+            RedisAdmin.get_topic_consumer_group_meta_info_key(
+                topic, group_id, None
+            )
+        )
+
+    @staticmethod
+    def del_topic_meta(redis_client: Redis, topic: str):
+        """Delete all meta info for specific topic
+
+        Deletes every piece of meta info attached to the given topic.
+
+        Args:
+            redis_client:
+            topic:
+
+        Returns:
+
+        """
+        res1 = RedisAdmin.del_meta(
+            redis_client,
+            RedisAdmin.get_topic_consumer_group_meta_info_key(topic, None,
+                                                              None)
+        )
+        res2 = RedisAdmin.del_meta(
+            redis_client,
+            RedisAdmin.get_topic_meta_info_key(topic, None)
+        )
+        return res1 and res2
+
+    @staticmethod
+    def _lock_topic(redis_client: Redis, topic: str,
+                    ignore_exception: bool = False) -> bool:
+        """Locks a topic to avoid duplicated operations under concurrency.
+
+        Args:
+            redis_client:
+            topic:
+            ignore_exception:
+
+        Returns:
+
+        """
+        # Lock create/del operations with SET NX to avoid duplicated
+        # create/delete in concurrent scenarios
+        if redis_client.set(
+                f"{StaticConst.REDIS_ADMIN_TOPIC_LOCKER_PREFIX}{topic}",
+                topic, nx=True, ex=10) == 0:
+            return raise_if_not_ignore(ignore_exception,
+                                       CecException(
+                                           f"Someone else is creating or"
+                                           f" deleting this topic."
+                                       ))
+        return True
+
+    @staticmethod
+    def _unlock_topic(redis_client: Redis, topic: str) -> bool:
+        """Releases the lock on a topic; must pair with _lock_topic.
+
+        Args:
+            redis_client:
+            topic:
+
+        Returns:
+
+        """
+        # Release the lock
+        if redis_client.get(
+                f"{StaticConst.REDIS_ADMIN_TOPIC_LOCKER_PREFIX}{topic}"
+        ) == topic:
+            return redis_client.delete(
+                f"{StaticConst.REDIS_ADMIN_TOPIC_LOCKER_PREFIX}{topic}") == 1
+        else:
+            return False
+
+    @staticmethod
+    def _lock_consumer_group(redis_client: Redis, consumer_group_id: str,
+                             ignore_exception: bool = False) -> bool:
+        """Locks a consumer group to avoid duplicated operations under
+        concurrency.
+
+        Args:
+            redis_client:
+            consumer_group_id:
+            ignore_exception:
+
+        Returns:
+
+        """
+        # Lock create/del operations with SET NX to avoid duplicated
+        # create/delete in concurrent scenarios
+        if redis_client.set(
+                f"{StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LOCKER_PREFIX}"
+                f"{consumer_group_id}",
+                consumer_group_id, nx=True, ex=10) == 0:
+            return raise_if_not_ignore(ignore_exception,
+                                       CecException(
+                                           f"Someone else is creating or"
+                                           f" deleting this consumer group."
+                                       ))
+        return True
+
+    @staticmethod
+    def _unlock_consumer_group(redis_client: Redis,
+                               consumer_group_id: str) -> bool:
+        """Releases the lock on a consumer group; must pair with
+        _lock_consumer_group.
+
+        Args:
+            redis_client:
+            consumer_group_id:
+
+        Returns:
+
+        """
+        # Release the lock
+        if redis_client.get(
+                f"{StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LOCKER_PREFIX}"
+                f"{consumer_group_id}"
+        ) == consumer_group_id:
+            return redis_client.delete(
+                f"{StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LOCKER_PREFIX}"
+                f"{consumer_group_id}") == 1
+        else:
+            return False
+
+    @logger.catch()
+    def __del__(self):
+        self.disconnect()
+
     @logger.catch(reraise=True)
     def client(self):
         return self._redis_client
+
+    @staticmethod
+    @logger.catch(reraise=True)
+    def perform_periodic_cleanup(redis_client: Redis,
+                                 topic_name: str,
+                                 expire_duration: int = 0):
+        # First read the current server time
+        time_res = redis_client.time()
+        time_ms = int(time_res[0] * 1000 + time_res[1] / 1000)
+
+        # Then run xtrim to clean up
+        clean_count = redis_client.xtrim(topic_name,
+                                         minid=f"{time_ms - expire_duration}")
+        LoggerHelper.get_lazy_logger().debug(
+            f"perform_periodic_cleanup => {clean_count}"
+        )
+        return clean_count
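
For example, trimming everything older than 24 hours would pass an expire_duration of 86,400,000 ms; the minid is derived from the Redis server clock, not the client's. A sketch (assumes a connected `redis_client` and a `stream_key`, which this helper receives as-is, so the caller likely has to pass the inner stream key):

```python
DAY_MS = 24 * 60 * 60 * 1000
removed = RedisAdmin.perform_periodic_cleanup(redis_client, stream_key, DAY_MS)
print(f"trimmed {removed} events")
```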

View File

@@ -15,12 +15,13 @@ from ..cec_base.log import LoggerHelper
 from redis import Redis
 from .utils import do_connect_by_cec_url
 from .redis_admin import RedisAdmin
-from .redis_producer import RedisProducer
 from loguru import logger
 from queue import Queue
+from .consume_status_storage import ConsumeStatusStorage
+from .common import StaticConst, ClientBase


-class RedisConsumer(Consumer):
+class RedisConsumer(Consumer, ClientBase):
     """A redis-based execution module implement of Consumer

     A Consumer implementation of the execution module based on Redis.
@@ -30,13 +31,24 @@ class RedisConsumer(Consumer):
     def __init__(self, url: CecUrl, topic_name: str, consumer_id: str = "",
                  group_id: str = "", start_from_now: bool = True,
                  default_batch_consume_limit: int = 10):
-        super().__init__(topic_name, consumer_id, group_id, start_from_now,
-                         default_batch_consume_limit)
+        Consumer.__init__(self, topic_name, consumer_id, group_id,
+                          start_from_now, default_batch_consume_limit)
+        ClientBase.__init__(self, url)
+
+        # Special param 1: pending_expire_time => pending-message timeout
+        # - messages that stayed in the pending list longer than this will be
+        #   fetched and consumed by the current consumer
+        self.pending_expire_time = self.get_special_param(
+            StaticConst.REDIS_SPECIAL_PARM_CEC_PENDING_EXPIRE_TIME
+        )
         self._current_url = ""
         self._redis_client: Redis = None
         self.connect_by_cec_url(url)
         self._last_event_id: str = None  # ID of the most recently consumed event
         self._message_cache_queue = Queue()  # message cache queue
+        self.consume_status_storage = None
+        self.inner_topic_name = StaticConst.get_inner_topic_name(
+            topic_name)

         # In group-consume mode, check whether the consumer group exists
         if self.consume_mode == ConsumeMode.CONSUME_GROUP:
@@ -44,19 +56,24 @@ class RedisConsumer(Consumer):
             RedisAdmin.static_create_consumer_group(self._redis_client,
                                                     group_id,
                                                     True)
+            self.consume_status_storage = ConsumeStatusStorage(
+                self._redis_client,
+                topic_name,
+                group_id
+            )

         # This flag marks whether messages still have to be pulled from the
         # pending list
         self._is_need_fetch_pending_message = True

     @logger.catch(reraise=True)
-    def consume(self, timeout: int = 0, auto_ack: bool = False,
+    def consume(self, timeout: int = -1, auto_ack: bool = False,
                 batch_consume_limit: int = 0) -> [Event]:
         """Consume some event from cec

         Tries to consume a batch of events from the event center.

         Args:
-            timeout(int): wait timeout in ms; 0 means block and wait
+            timeout(int): wait timeout in ms; <=0 means block and wait
             auto_ack(bool): enable auto-acknowledge (group-consume mode only)
                 1. With auto-ack on, every event is acknowledged as soon as it
                    is successfully read.
@@ -85,18 +102,65 @@ class RedisConsumer(Consumer):
             ...     , start_from_now=False)
             >>> consumer.consume(200, auto_ack=False, batch_consume_limit=20)
         """
+        if timeout <= 0:
+            timeout = 0
         batch_consume_limit = self.default_batch_consume_limit if \
             batch_consume_limit <= 0 else batch_consume_limit

         LoggerHelper.get_lazy_logger().debug(
             f"{self} try to consume one message from "
             f"{self.topic_name} in {self.consume_mode}.")
         if self.consume_mode == ConsumeMode.CONSUME_GROUP:
+            message_ret = [[[], [], []]]
             if self._last_event_id is None:
                 # Make sure the consumer group exists
                 RedisAdmin.add_group_to_stream(
                     self._redis_client, self.topic_name, self.group_id)
+            # First, handle the pending-list transfer:
+            # 1. Filter the group-wide pending list for events that have gone
+            #    un-ACKed for a long time, i.e. events that have stayed in the
+            #    pending list longer than 'pending_expire_time';
+            # 2. Transfer those expired events to the current consumer.
+            if self.is_gte_6_2(self._redis_client):
+                # Redis >= 6.2 supports xautoclaim, which merges the
+                # xpending + xclaim pair into one operation, so use it directly
+                # to transfer expired messages from the group-wide pending
+                # list to the current consumer
+                message_ret = [
+                    self._redis_client.xautoclaim(
+                        self.inner_topic_name, self.group_id,
+                        self.consumer_id,
+                        min_idle_time=self.pending_expire_time,
+                        count=batch_consume_limit
+                    )
+                ]
+            else:
+                # Redis < 6.2 lacks xautoclaim, so fall back to
+                # xpending + xclaim to transfer expired messages from the
+                # group-wide pending list to the current consumer
+                pending_list = self._redis_client.xpending_range(
+                    self.inner_topic_name, self.group_id,
+                    min='-',
+                    max='+' if self._last_event_id is None else self._last_event_id,
+                    count=batch_consume_limit,
+                )
+                if len(pending_list) > 0:
+                    pending_list = list(filter(
+                        lambda item: item.get('time_since_delivered',
+                                              0) > self.pending_expire_time,
+                        pending_list
+                    ))
+                    pending_ids = list(map(
+                        lambda item: item.get('message_id', '0-0'),
+                        pending_list
+                    ))
+                    if len(pending_ids) > 0:
+                        message_ret = [[[], self._redis_client.xclaim(
+                            self.inner_topic_name, self.group_id,
+                            self.consumer_id, self.pending_expire_time,
+                            pending_ids
+                        )]]
+            if len(message_ret[0][1]) <= 0:
                 # Decide whether messages need to be pulled from the pending
                 # list:
                 # 1. After the instance is created, the first consume call tries
                 #    to fetch messages from the pending list, i.e. the current
                 #    consumer (consumers are distinguished by consumer_id;
                 #    RedisConsumer instances created with the same consumer_id
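
The version split above exists because XAUTOCLAIM only landed in Redis 6.2. A standalone sketch of the same fallback idea, outside the class (the helper name and parameters are illustrative, not the SDK's API):

    from redis import Redis

    def claim_expired(client: Redis, stream: str, group: str, consumer: str,
                      min_idle_ms: int, count: int = 10):
        """Claim messages pending longer than min_idle_ms for this consumer."""
        major, minor, *_ = client.info("server")["redis_version"].split(".")
        if (int(major), int(minor)) >= (6, 2):
            # One round trip: scan the PEL and claim expired entries
            _cursor, entries, *_ = client.xautoclaim(
                stream, group, consumer, min_idle_time=min_idle_ms, count=count)
            return entries
        # Pre-6.2: list pending entries, filter by idle time, then claim
        pending = client.xpending_range(stream, group, min="-", max="+",
                                        count=count)
        ids = [p["message_id"] for p in pending
               if p["time_since_delivered"] > min_idle_ms]
        return client.xclaim(stream, group, consumer, min_idle_ms, ids) if ids else []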
@@ -109,33 +173,41 @@ class RedisConsumer(Consumer):
                 if self._is_need_fetch_pending_message:
                     message_ret = self._redis_client.xreadgroup(
                         self.group_id, self.consumer_id, {
-                            self.topic_name: '0-0'
-                        }, count=batch_consume_limit, block=timeout
+                            self.inner_topic_name: '0-0'
+                        }, count=batch_consume_limit, block=timeout,
+                        noack=auto_ack
                     )
                     if len(message_ret[0][1]) == 0:
                         self._is_need_fetch_pending_message = False
                         message_ret = self._redis_client.xreadgroup(
                             self.group_id, self.consumer_id, {
-                                self.topic_name: '>'
-                            }, count=batch_consume_limit, block=timeout)
+                                self.inner_topic_name: '>'
+                            }, count=batch_consume_limit, block=timeout,
+                            noack=auto_ack
+                        )
                 else:
                     # Group-consume mode is handled separately
                     message_ret = self._redis_client.xreadgroup(
                         self.group_id, self.consumer_id, {
-                            self.topic_name: '>'
-                        }, count=batch_consume_limit, block=timeout)
+                            self.inner_topic_name: '>'
+                        }, count=batch_consume_limit, block=timeout,
+                        noack=auto_ack
+                    )
+            # Update the consume status and perform any required cleanup tasks
+            self.consume_status_storage.update()
         else:
             # Fan-out broadcast consumption is handled below
             if self._last_event_id is None:
                 # First consume call since this Consumer was instantiated;
                 # perform some initialization
                 message_ret = self._redis_client.xread({
-                    self.topic_name: '$' if self.consume_mode ==
+                    self.inner_topic_name: '$' if self.consume_mode ==
                     ConsumeMode.CONSUME_FROM_NOW else '0-0'
                 }, count=batch_consume_limit, block=timeout)
             else:
                 # Fetch messages in order
                 message_ret = self._redis_client.xread({
-                    self.topic_name: self._last_event_id
+                    self.inner_topic_name: self._last_event_id
                 }, count=batch_consume_limit, block=timeout)
         if len(message_ret) < 1 or len(message_ret[0]) < 2 or len(
                 message_ret[0][1]) < 1:
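
The special start IDs used above drive all of this behavior: '>' asks Redis for entries never delivered to any consumer in the group, '0-0' replays entries already delivered to this consumer but not yet ACKed, and '$' means only entries that arrive from now on. A minimal standalone illustration (the stream and group names are made up):

    from redis import Redis

    client = Redis(decode_responses=True)
    client.xgroup_create("demo-stream", "demo-group", id="$", mkstream=True)
    client.xadd("demo-stream", {"k": "v"})

    # '>' : entries never delivered to any consumer in this group
    new = client.xreadgroup("demo-group", "c1", {"demo-stream": ">"}, count=10)

    # '0-0': replay this consumer's own pending (delivered, un-ACKed) entries
    pending = client.xreadgroup("demo-group", "c1", {"demo-stream": "0-0"})

    # '$' with plain XREAD: block up to 1s waiting for entries newer than "now"
    latest = client.xread({"demo-stream": "$"}, block=1000)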
@@ -149,36 +221,28 @@ class RedisConsumer(Consumer):
             self._last_event_id = message_tuple[0]

             # Filter out events that were not delivered through the cec
             # interface
-            if RedisProducer.REDIS_CEC_EVENT_VALUE_KEY not in message_tuple[1]:
+            if StaticConst.REDIS_CEC_EVENT_VALUE_KEY not in message_tuple[1]:
                 continue
             message_content = json.loads(
-                message_tuple[1][RedisProducer.REDIS_CEC_EVENT_VALUE_KEY])
+                message_tuple[1][StaticConst.REDIS_CEC_EVENT_VALUE_KEY])
             msg = Event(message_content, message_tuple[0])
             messages.append(msg)
             LoggerHelper.get_lazy_logger().debug(
                 f"{self} read one message from {self.topic_name} success "
                 f"=> {msg}")
-            if auto_ack:
-                self._redis_client.xack(self.topic_name, self.group_id,
-                                        self._last_event_id)
-                LoggerHelper.get_lazy_logger().debug(
-                    f"{self} auto_ack => <id={msg.event_id}, topic_name="
-                    f"{self.topic_name}, consumer_group_id={self.group_id}>")
-            LoggerHelper.get_lazy_logger().info(
-                f"{self} consume one message from {self.topic_name} "
-                f"success => {msg}")
         return messages
     @logger.catch(reraise=True)
-    def ack(self, event_id: str) -> int:
+    def ack(self, event: Event) -> int:
         """Confirm that the specified event has been successfully consumed

         Event acknowledgement (call this method after an event has been
         received and successfully processed)

         Args:
-            event_id: the event ID
+            event(Event): the event to acknowledge
+                1. It must be an Event instance obtained by consuming through
+                   this Consumer
+                2. Passing in a self-constructed Event does not guarantee the
+                   expected result

         Returns:
             int: 1 if successfully, 0 otherwise

@@ -191,16 +255,32 @@ class RedisConsumer(Consumer):
             ...     , start_from_now=False)
             >>> msgs = consumer.consume(200, auto_ack=False, batch_consume_limit=1)
             >>> msg = msgs[0]
-            >>> consumer.ack(msg.event_id)
+            >>> consumer.ack(msg)
         """
         LoggerHelper.get_lazy_logger().debug(
-            f"{self} try to ack => {event_id}"
+            f"{self} try to ack => {event.event_id}"
         )
-        ret = self._redis_client.xack(self.topic_name, self.group_id, event_id)
+        # Use a pipeline to speed things up
+        # 1. Record the latest acknowledged ID for this topic/consumer-group
+        pl = self._redis_client.pipeline()
+        key = RedisAdmin.get_topic_consumer_group_meta_info_key(
+            self.topic_name,
+            self.group_id,
+            StaticConst.TOPIC_CONSUMER_GROUP_META_KEY_LAST_ACK_ID)
+        pl.set(
+            key,
+            event.event_id
+        )
+        # 2. Acknowledge the event
+        pl.xack(self.inner_topic_name, self.group_id, event.event_id)
+        # 3. Record the acknowledged ID
+        self.consume_status_storage.do_after_ack_by_pl(pl, event)
+        rets = pl.execute()
         LoggerHelper.get_lazy_logger().info(
-            f"{self} ack '{event_id}' successfully"
+            f"{self} ack '{event.event_id}' successfully"
         )
-        return ret
+        return rets[1]

     @logger.catch(reraise=True)
     def __getitem__(self, item):
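
Putting the new contract together: consume() returns Event objects, and ack() now takes the Event itself rather than an ID, which lets the pipeline update per-group metadata alongside the XACK. A plausible worker loop (the topic and group names are placeholders, and passing group_id through dispatch_consumer is assumed from the constructor signature above):

    consumer = dispatch_consumer(
        "redis://localhost:6379", "demo-topic",
        consumer_id="worker-1", group_id="demo-workers")

    while True:
        # timeout <= 0 would block forever; 1000 ms keeps the loop responsive
        events = consumer.consume(timeout=1000, auto_ack=False,
                                  batch_consume_limit=10)
        for event in events:
            process(event)       # hypothetical application-specific handler
            consumer.ack(event)  # pass the Event itself, not event.event_id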

@@ -18,23 +18,29 @@ from redis import Redis
 from .utils import do_connect_by_cec_url
 from .redis_admin import RedisAdmin
 from loguru import logger
+from .common import StaticConst, ClientBase


-class RedisProducer(Producer):
+class RedisProducer(Producer, ClientBase):
     """A redis-based execution module implement of Producer
     A Redis-based Producer implementation for the execution module
     """
-    REDIS_CEC_EVENT_VALUE_KEY = "redis-cec-event-value-key"
-
-    def __init__(self, url: CecUrl, **kwargs):
+    def __init__(self, url: CecUrl):
+        Producer.__init__(self)
+        ClientBase.__init__(self, url)
         self._current_url = ""
-        if 'default_max_len' in kwargs:
-            self.default_max_len = kwargs['default_max_len']
-        else:
-            self.default_max_len = 1000
+
+        # Handle the specialized parameters of the Redis-backed event center
+        self.default_max_len = self.get_special_param(
+            StaticConst.REDIS_SPECIAL_PARM_CEC_DEFAULT_MAX_LEN
+        )
+        self.auto_mk_topic = self.get_special_param(
+            StaticConst.REDIS_SPECIAL_PARM_CEC_AUTO_MK_TOPIC
+        )

         # 1. First, connect to the Redis server
         self._redis_client: Redis = None
         self.connect_by_cec_url(url)
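
With this change, producer tuning moves out of constructor kwargs and onto the URL, so every client built from the same URL gets the same behavior. A sketch of the before/after call sites (the cec_auto_mk_topic query key is an assumption inferred from the StaticConst constant name):

    # Before: tuning via constructor kwargs
    # producer = dispatch_producer("redis://localhost:6379", default_max_len=1000)

    # After: specialized parameters ride on the CEC URL itself
    producer = dispatch_producer(
        "redis://localhost:6379"
        "?cec_default_max_len=1000"     # cap on stream length applied at XADD time
        "&cec_auto_mk_topic=true")      # assumed key: auto-create missing topics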
@@ -46,8 +52,8 @@ class RedisProducer(Producer):
     @logger.catch(reraise=True)
     def produce(self, topic_name: str, message_value: dict,
-                auto_mk_topic: bool = False,
                 callback: Callable[[Exception, Event], None] = None,
+                partition: int = -1,
                 **kwargs):
         """Generate one new event, then put it to event center
@@ -56,9 +62,12 @@ class RedisProducer(Producer):
         Args:
             topic_name: topic name
             message_value: event content
-            auto_mk_topic: whether to auto-create the topic if it does not exist
             callback(Callable[[Exception, Event], None]): callback invoked once
                 the event has been delivered to the event center
-            max_len: the maximum number of messages expected to accumulate in this topic
+            partition(int): partition number
+                1. If a valid partition number is given, the message is
+                   delivered to that specific partition (not recommended);
+                2. Passing a positive partition number that does not exist
+                   raises an exception;
+                3. Passing a negative partition number (e.g. -1) lets the
+                   built-in strategy balance the message across all
+                   partitions (recommended).

         Examples:
             >>> producer = dispatch_producer(
@@ -70,16 +79,17 @@ class RedisProducer(Producer):
             f"{topic_name}.")

         topic_exist = False
+        inner_topic_name = StaticConst.get_inner_topic_name(topic_name)

         # Check whether metadata for the target topic is present
-        if topic_name not in self._topic_metas or \
-                self._topic_metas[topic_name].topic_name == "":
+        if inner_topic_name not in self._topic_metas or \
+                self._topic_metas[inner_topic_name] is None:
             # Pull the metadata
-            self._topic_metas[topic_name] = RedisAdmin.get_meta_info(
+            self._topic_metas[inner_topic_name] = RedisAdmin.get_meta_info(
                 self._redis_client, topic_name)

         # If the metadata is invalid, the topic does not exist
-        if self._topic_metas[topic_name].topic_name == "":
-            if auto_mk_topic:
+        if self._topic_metas[inner_topic_name] is None:
+            if self.auto_mk_topic:
                 # If auto-creation of missing topics is enabled, try to
                 # create the topic
                 topic_exist = RedisAdmin.static_create_topic(
                     self._redis_client,
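
A plausible end-to-end produce call under the new flow; note that creating a missing topic is now governed by the URL-level auto_mk_topic switch rather than the removed per-call flag (the topic name and handler below are illustrative):

    def on_delivery(err, event):
        # Invoked after the event is written (or fails to be written)
        if err is not None:
            print(f"delivery failed: {err}")
        else:
            print(f"delivered event {event.event_id}")

    producer.produce("demo-topic",
                     {"sub": "admin", "message": {"level": "warning"}},
                     callback=on_delivery)
    producer.flush()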
@@ -103,8 +113,8 @@ class RedisProducer(Producer):
         # Put the message into the corresponding topic
         if 'maxlen' not in kwargs:
             kwargs['maxlen'] = self.default_max_len
-        event_id = self._redis_client.xadd(topic_name, {
-            RedisProducer.REDIS_CEC_EVENT_VALUE_KEY: json.dumps(
+        event_id = self._redis_client.xadd(inner_topic_name, {
+            StaticConst.REDIS_CEC_EVENT_VALUE_KEY: json.dumps(
                 message_value)
         }, **kwargs)
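
On the wire, each event is therefore a single stream entry whose one field holds the JSON-encoded payload, with MAXLEN bounding the stream's growth. Reproduced standalone (the inner topic name is an assumption; the field-name string matches the constant value removed from RedisProducer above):

    import json
    from redis import Redis

    client = Redis(decode_responses=True)

    # One stream entry per event: the whole payload is one JSON field
    event_id = client.xadd(
        "cec-inner-demo-topic",    # assumed inner topic name
        {"redis-cec-event-value-key": json.dumps({"msg": "hello"})},
        maxlen=1000)

    # Reading it back reverses the encoding
    entry_id, fields = client.xrange("cec-inner-demo-topic", count=1)[0]
    payload = json.loads(fields["redis-cec-event-value-key"])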
@@ -122,7 +132,7 @@ class RedisProducer(Producer):
                 callback(e, Event(message_value, event_id))

     @logger.catch(reraise=True)
-    def flush(self):
+    def flush(self, timeout: int = -1):
         """Flush all cached event to event center

         TODO: RedisProducer's produce implementation is currently blocking,
         so flush can be a no-op
@@ -182,4 +192,3 @@ class RedisProducer(Producer):
         self._redis_client = None
         LoggerHelper.get_lazy_logger().success(
             f"{self} disconnect from '{self._current_url}' successfully.")

@@ -6,7 +6,6 @@ Email mfeng@linux.alibaba.com
File utils.py
Description:
"""
-import uuid
from redis import Redis
from ..cec_base.url import CecUrl
from ..cec_base.base import ConnectException
@@ -27,9 +26,6 @@ def do_connect_by_cec_url(cec_url: CecUrl) -> Redis:
     try:
         redis_client = Redis(host=host, port=port, db=0, decode_responses=True,
                              **cec_url.params)
-        # Run a simple GET so the client actually connects to the Redis
-        # server; if the connection fails an exception is raised
-        # redis_client.get(str(uuid.uuid4()))
     except ConnectionError as e:
         raise ConnectException(e)
     return redis_client
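
With the commented-out connection probe removed, the Redis client now connects lazily on first command. A hedged sketch of what an eager health check would look like if a caller wanted one (this helper is not part of the patch):

    from redis import Redis
    from redis.exceptions import ConnectionError as RedisConnectionError

    def connect_and_verify(host: str, port: int) -> Redis:
        client = Redis(host=host, port=port, db=0, decode_responses=True)
        try:
            client.ping()  # eager round trip; raises if the server is unreachable
        except RedisConnectionError as exc:
            raise RuntimeError(f"cannot reach redis at {host}:{port}") from exc
        return client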