sysom1/environment/1_sdk/cec_redis/redis_admin.py

613 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*- #
"""
Time 2022/7/29 11:25
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File redis_admin.py
Description:
"""
import json
from typing import Optional, List
import redis.exceptions
from cec_base.admin import Admin, ConsumeStatusItem
from cec_base.exceptions import CecException
from cec_base.event import Event
from cec_base.meta import ConsumerGroupMeta, TopicMeta
from clogger import logger
from cec_base.url import CecUrl
from redis import Redis
from redis.exceptions import ResponseError
from .utils import raise_if_not_ignore, do_connect_by_cec_url
from .consume_status_storage import ConsumeStatusStorage
from .common import StaticConst, ClientBase
from .admin_static import static_create_topic, static_del_topic, \
static_is_topic_exist, static_get_topic_list, \
static_create_consumer_group, static_del_consumer_group, \
static_is_consumer_group_exist, static_get_consumer_group_list, \
get_topic_consumer_group_meta, get_sub_list_key
class RedisAdmin(Admin, ClientBase):
"""A redis-based execution module implement of Admin
Admin implementation in an execution module based on the Redis.
"""
def __init__(self, url: CecUrl):
    """Initialise both base classes and open the Redis connection.

    Args:
        url(CecUrl): Parsed CEC url; passed to ``ClientBase.__init__`` and
                     then used to establish the connection.
    """
    Admin.__init__(self)
    ClientBase.__init__(self, url)
    # Both attributes are overwritten by connect_by_cec_url() below; they
    # are declared first so they exist even if connecting raises.
    self._current_url: str = ""
    self._redis_client: Optional[Redis] = None
    self.connect_by_cec_url(url)
####################################################################
# Event Center Admin Interface Implementation
####################################################################
def create_topic(self, topic_name: str = "", num_partitions: int = 1,
                 replication_factor: int = 1, **kwargs) -> bool:
    """Create one topic

    A topic maps onto a Redis Stream. The actual work is delegated to
    ``static_create_topic``; the intended procedure is:
        1. Check whether the topic already exists and, if so, throw an
           exception;
        2. Use the xadd command to trigger the stream creation;
        3. Delete the test data that was just inserted so the stream is
           empty;
        4. Add topic_name to a dedicated set that records every topic
           created through the CEC.

    TODO: Consider wrapping these steps in a transaction so that a failure
          at an intermediate step cannot leave an inconsistent state.

    Args:
        topic_name(str): Topic name (unique identification of the topic)
        num_partitions(int): Number of partitions of the topic
        replication_factor(int): Redundancy factor (how many copies of the
                                 topic's data the event center should keep)

    Keyword Args:
        ignore_exception: Whether to ignore exceptions that may be thrown
        expire_time: Event timeout time (in ms, default: 1day)

    Returns:
        bool: True if successful, False otherwise.

    Raises:
        TopicAlreadyExistsException: If topic already exists
        CecException: Get lock failed

    Examples:
        >>> admin = dispatch_admin("redis://localhost:6379")
        >>> admin.create_topic("test_topic")
        True
    """
    created = static_create_topic(
        self._redis_client, topic_name, num_partitions, replication_factor,
        **kwargs)
    return created
def del_topic(self, topic_name: str, **kwargs) -> bool:
    """Delete one topic

    Deleting a topic corresponds to deleting a Redis stream. The work is
    delegated to ``static_del_topic``, which is expected to:
        1. Delete the key corresponding to the stream;
        2. Clear the related metadata.

    Args:
        topic_name(str): Topic name

    Keyword Args:
        ignore_exception: Whether to ignore exceptions that may be thrown

    Returns:
        bool: True if successful, False otherwise.

    Raises:
        TopicNotExistsException: If topic not exists

    Examples:
        >>> admin = dispatch_admin("redis://localhost:6379")
        >>> admin.del_topic("test_topic")
        True
    """
    deleted = static_del_topic(self._redis_client, topic_name, **kwargs)
    return deleted
def is_topic_exist(self, topic_name: str, **kwargs) -> bool:
    """Judge whether one specific topic exists

    Checking for a topic corresponds to checking whether the matching
    Redis stream exists. The check is delegated to
    ``static_is_topic_exist``, which is expected to use the type command
    to test whether the key's type is 'stream'.

    Args:
        topic_name(str): Topic name

    Keyword Args:
        Forwarded unchanged to ``static_is_topic_exist``.

    Returns:
        bool: True if topic exists, False otherwise.

    Examples:
        >>> admin = dispatch_admin("redis://localhost:6379")
        >>> admin.is_topic_exist("test_topic")
        True
    """
    exists = static_is_topic_exist(self._redis_client, topic_name, **kwargs)
    return exists
def get_topic_list(self, **kwargs) -> List[TopicMeta]:
    """Get topic list

    Listing topics corresponds to listing all streams created through
    this interface. The lookup is delegated to ``static_get_topic_list``:
        1. This implementation maintains a special Set holding the keys of
           every stream created by this set of interfaces;
        2. So querying that Set yields all topics.

    Keyword Args:
        Forwarded unchanged to ``static_get_topic_list``.

    Returns:
        List[TopicMeta]: The topic list, exactly as returned by
        ``static_get_topic_list``.
        NOTE(review): the example below shows plain topic names while the
        annotation says TopicMeta -- confirm which one the helper returns.

    Examples:
        >>> admin = dispatch_admin("redis://localhost:6379")
        >>> admin.get_topic_list()
        ['test_topic']
    """
    topics = static_get_topic_list(self._redis_client, **kwargs)
    return topics
@staticmethod
def get_meta_info(client: Redis, topic_name: str) -> Optional[dict]:
    """Get topic's meta info

    Runs ``XINFO STREAM`` against the stream backing ``topic_name`` and
    returns the reply with the topic name added.

    Args:
        client(Redis): Redis client
        topic_name(str): topic name

    Returns:
        Optional[dict]: ``{'topic_name': topic_name, **xinfo_reply}``, or
        None when Redis answers with a ResponseError.
    """
    try:
        stream_info = client.xinfo_stream(
            StaticConst.get_inner_topic_name(topic_name))
    except ResponseError:
        return None
    else:
        logger.debug(
            f"get_meta_info => {stream_info}.")
        # Keys of the XINFO reply are merged last, so they win over
        # 'topic_name' if there is ever a clash.
        meta_info = {'topic_name': topic_name, **stream_info}
        return meta_info
def create_consumer_group(self, consumer_group_id: str, **kwargs) -> bool:
    """Create one consumer group

    Delegates to ``static_create_consumer_group``. Design background:
        1. In Redis a consumer group belongs to a single Stream; one
           group cannot natively consume several Streams;
        2. This implementation therefore keeps a special Set holding all
           consumer groups created through this interface, and creates
           the group on a Stream lazily when group consumption is first
           performed on that Stream;
        3. A list keyed by the consumer group id records every Stream the
           group is attached to, so that deleting the group can remove it
           from all subscribed streams.

    Args:
        consumer_group_id(str): Consumer group ID

    Keyword Args:
        ignore_exception: Whether to ignore exceptions that may be thrown

    Returns:
        bool: True if successful, False otherwise.

    Raises:
        ConsumerGroupAlreadyExistsException: If consumer group already
                                             exists

    Examples:
        >>> admin = dispatch_admin("redis://localhost:6379")
        >>> admin.create_consumer_group("test_group")
        True
    """
    created = static_create_consumer_group(
        self._redis_client, consumer_group_id, **kwargs)
    return created
def del_consumer_group(self, consumer_group_id: str, **kwargs) -> bool:
    """Delete one consumer group

    Delegates to ``static_del_consumer_group``, which is expected to:
        1. Check whether the consumer group exists and, if not, throw an
           exception as appropriate;
        2. Remove it from the set of consumer groups;
        3. Destroy the same-named consumer group structure in every
           stream associated with this consumer group.

    Args:
        consumer_group_id(str): Consumer group ID

    Keyword Args:
        ignore_exception: Whether to ignore exceptions that may be thrown

    Returns:
        bool: True if successful, False otherwise.

    Raises:
        ConsumerGroupNotExistsException: If consumer group not exists

    Examples:
        >>> admin = dispatch_admin("redis://localhost:6379")
        >>> admin.del_consumer_group("test_group")
        True
    """
    deleted = static_del_consumer_group(
        self._redis_client, consumer_group_id, **kwargs)
    return deleted
def is_consumer_group_exist(self, consumer_group_id: str,
                            **kwargs) -> bool:
    """Judge whether one specific consumer group exists

    Delegates to ``static_is_consumer_group_exist``, which is expected to:
        1. Check whether the set storing all consumer group keys contains
           the given consumer group id;
        2. Also check whether consumer_group_id is already used as a key;
           if it is occupied, report it as existing so it cannot be
           created.

    Args:
        consumer_group_id(str): Consumer group ID

    Keyword Args:
        Accepted for interface compatibility; this method does not
        forward them to ``static_is_consumer_group_exist``.

    Returns:
        bool: True if consumer group exists, False otherwise.

    Examples:
        >>> admin = dispatch_admin("redis://localhost:6379")
        >>> admin.is_consumer_group_exist("test_group")
        True
    """
    redis_client = self._redis_client
    exists = static_is_consumer_group_exist(redis_client, consumer_group_id)
    return exists
def get_consumer_group_list(self, **kwargs) -> List[ConsumerGroupMeta]:
    """Get consumer group list

    Delegates to ``static_get_consumer_group_list``. Design background:
        1. In Redis, consumer groups belong to Streams and the groups of
           different Streams are independent. To let one consumer group
           consume several topics, a special set
           _REDIS_ADMIN_CONSUMER_GROUP_LIST_SET stores the names of all
           consumer groups;
        2. When a consumer group tries to consume a Stream, cec-redis
           checks whether the Stream already has that group and creates
           it automatically if not;
        3. The list of consumer groups can therefore be read directly
           from _REDIS_ADMIN_CONSUMER_GROUP_LIST_SET.

    Keyword Args:
        Forwarded unchanged to ``static_get_consumer_group_list``.

    Returns:
        List[ConsumerGroupMeta]: The consumer group list, exactly as
        returned by ``static_get_consumer_group_list``.
        NOTE(review): the example below shows plain group names while the
        annotation says ConsumerGroupMeta -- confirm against the helper.

    Examples:
        >>> admin = dispatch_admin("redis://localhost:6379")
        >>> admin.get_consumer_group_list()
        ['test_group']
    """
    groups = static_get_consumer_group_list(self._redis_client, **kwargs)
    return groups
def get_consume_status(
        self, topic: str, consumer_group_id: str = "", partition: int = 0,
        **kwargs) -> List[ConsumeStatusItem]:
    """Get consumption info for specific <topic, consumer_group, partition>

    Reads ``XINFO STREAM`` and ``XINFO GROUPS`` for the stream backing
    ``topic`` and reports, for one or for all consumer groups, the stream's
    id range and length, the group's last acknowledged id and its lag.

    Args:
        topic(str): Topic name
        consumer_group_id(str): Consumer group ID
            1. If consumer_group_id == '' or None, returns the consumption
               info of all consumer groups subscribed to the topic;
               => In this case the partition parameter is ignored
            2. Throws an exception if consumer_group_id is not a group
               attached to the topic;
            3. Otherwise only the consumption info of the specified
               consumer group is returned.
        partition(int): Partition ID. cec-redis does not support
            partitioning, so every topic has exactly one partition (0):
            1. partition <= 0 is accepted;
            2. partition > 0 raises an exception (only checked when a
               consumer group is specified).

    Keyword Args:
        Currently unused.

    Returns:
        List[ConsumeStatusItem]: One item per reported consumer group,
        always with partition 0.

    Raises:
        CecException: If the consumer group is not attached to the topic,
                      if partition > 0 is requested for a specific group,
                      or if Redis reports an error.

    Examples:
        >>> admin = dispatch_admin("redis://localhost:6379")
        >>> admin.get_consume_status("topic1")
        [
            {
                "topic":"topic1",
                "consumer_group_id":"c78e8b71-45b9-4e11-8f8e-05a98b534cc0",
                "min_id":"1661516434003-0",
                "max_id":"1661516434004-4",
                "total_event_count":10,
                "last_ack_id":"1661516434003-4",
                "lag":5
            },
            {
                "topic":"topic1",
                "consumer_group_id":"d1b39ec3-6ae9-42a6-83b5-257d875788e6",
                "min_id":"1661516434003-0",
                "max_id":"1661516434004-4",
                "total_event_count":10,
                "last_ack_id":"1661516434003-1",
                "lag":8
            }
        ]
    """

    # NOTE: both helpers close over min_id / max_id / length, which are
    # assigned in the try-block at the bottom before either helper runs.

    def _get_one_group_consume_status(_groups: List[dict]) \
            -> List[ConsumeStatusItem]:
        """Get the consumption of the specified consumer group"""
        # Attempt to obtain consumption information for the specified
        # consumer group
        select_group = next(
            (_group for _group in _groups
             if _group.get('name') == consumer_group_id),
            None
        )
        if select_group is None:
            # Consumer group not exists
            raise CecException(
                f"Consumer group {consumer_group_id} not exists or did "
                f"not subscribe topic {topic}")

        # Since the current implementation of cec-redis does not support
        # partitioning, each topic has one and only one partition number
        # and the partition number is 0. If a consumption group is
        # specified, the partition number passed in <= 0 is considered
        # valid; the partition number passed in > 0 is considered invalid.
        if partition > 0:
            raise CecException(
                f"Topic {topic} did not contains partition {partition}"
            )

        # Here it is sufficient to return the consumption of the
        # specified consumer group
        last_ack_id = get_topic_consumer_group_meta(
            self._redis_client, topic, select_group.get('name'),
            StaticConst.TOPIC_CONSUMER_GROUP_META_KEY_LAST_ACK_ID
        )

        # Get LAG
        if self.is_gte_7(self._redis_client):
            # NOTE(review): Redis 7 may report 'lag' as NULL when it cannot
            # be determined; that case is not handled here -- confirm
            # whether it can occur for cec-managed streams.
            lag = select_group['lag'] + select_group['pending']
        else:
            # Older servers do not report lag, so derive it from the
            # number of events already acknowledged -- the same formula
            # the all-groups path uses (length - acked).
            already_ack_count = ConsumeStatusStorage.get_already_ack_count(
                self._redis_client, topic, consumer_group_id
            )
            lag = length - already_ack_count

        return [
            ConsumeStatusItem(
                topic, consumer_group_id, 0,
                min_id=min_id,
                max_id=max_id,
                total_event_count=length,
                last_ack_id=last_ack_id,
                lag=lag
            )
        ]

    def _get_all_group_consume_status(_groups: List[dict]) \
            -> List[ConsumeStatusItem]:
        """Get the consumption of the all consumer group"""
        # Get the consumption of every consumer group (the partition
        # parameter is ignored in this case)
        res, counts_map = [], {}
        if not self.is_gte_7(self._redis_client):
            # For Redis versions below 7, lag is derived from
            # ConsumeStatusStorage; the per-group queries are batched
            # into one pipeline round trip.
            pipeline = self._redis_client.pipeline()
            for _group in _groups:
                ConsumeStatusStorage.get_already_ack_count(
                    pipeline, topic, _group.get('name')
                )
            counts = pipeline.execute()
            for i, _group in enumerate(_groups):
                counts_map[_group.get('name')] = counts[i]

        for _group in _groups:
            last_ack_id = get_topic_consumer_group_meta(
                self._redis_client, topic, _group.get('name'),
                StaticConst.TOPIC_CONSUMER_GROUP_META_KEY_LAST_ACK_ID
            )

            # Get LAG
            if 'lag' in _group and 'pending' in _group:
                lag = _group['lag'] + _group['pending']
            else:
                lag = length - counts_map[_group.get('name')]

            res.append(ConsumeStatusItem(
                topic, _group.get('name'), 0,
                min_id=min_id,
                max_id=max_id,
                total_event_count=length,
                last_ack_id=last_ack_id,
                lag=lag
            ))
        return res

    inner_topic_name = StaticConst.get_inner_topic_name(topic)

    # Use 'xinfo stream' to get topic information
    try:
        stream_info = self._redis_client.xinfo_stream(inner_topic_name)
        first_entry = stream_info.get("first-entry", None)
        last_entry = stream_info.get("last-entry", None)
        # An empty stream has no first/last entry, hence the None guards.
        min_id = first_entry[0] if first_entry is not None else None
        max_id = last_entry[0] if last_entry is not None else None
        length = stream_info.get("length", 0)

        groups = self._redis_client.xinfo_groups(inner_topic_name)

        if consumer_group_id != '' and consumer_group_id is not None:
            return _get_one_group_consume_status(groups)
        return _get_all_group_consume_status(groups)
    except redis.exceptions.RedisError as exc:
        # Surface every Redis-level failure as the CEC exception type.
        raise CecException(exc) from exc
def get_event_list(self, topic: str, partition: int, offset: str,
                   count: int, **kwargs) -> List[Event]:
    """ Get event list for specific <topic, partition>

    Reads events of a topic with the Redis xrange command; offset and
    count together provide paging.

    Args:
        topic(str): Topic name
        partition(int): Partition ID => There is no partition in Redis, so
                        this parameter is ignored
        offset(str): Stream ID to start after (exclusive); only events
                     with a later ID are returned
        count(int): Maximum number of events to read

    Keyword Args:
        ignore_exception: Passed to ``raise_if_not_ignore`` together with
                          any RedisError raised while reading
                          (default: False)

    References:
        https://redis.io/commands/xrange/

    Returns:
        List[Event]: The events read so far; empty if nothing was read.
    """
    events = []
    should_ignore = kwargs.get("ignore_exception", False)
    stream_key = StaticConst.get_inner_topic_name(topic)
    try:
        # The "(" prefix makes the lower bound exclusive.
        entries = self._redis_client.xrange(
            stream_key, min=f"({offset}", max='+', count=count)
        for entry_id, fields in entries:
            payload = json.loads(
                fields[StaticConst.REDIS_CEC_EVENT_VALUE_KEY])
            events.append(Event(payload, entry_id))
    except redis.exceptions.RedisError as exc:
        raise_if_not_ignore(should_ignore, exc)
    return events
@staticmethod
def add_group_to_stream(redis_client: Redis, stream: str,
                        consumer_group_id: str) -> bool:
    """Add one consumer group to stream

    Creates the consumer group on the stream backing ``stream`` (starting
    at id "0-0") and, on success, pushes the stream key onto the group's
    subscription list.

    Args:
        redis_client(Redis): Redis client
        stream(str): Stream(Topic) name
        consumer_group_id(str): Consumer group id

    Returns:
        bool: True if the group was created on the stream, False if
        ``xgroup_create`` raised a ResponseError (logged as "already
        exists").
    """
    stream_key = StaticConst.get_inner_topic_name(stream)
    logger.debug(
        f"try to add consumer group '{consumer_group_id}"
        f"' to topic '{stream}'.")
    try:
        redis_client.xgroup_create(stream_key, consumer_group_id, id="0-0")
    except ResponseError:
        logger.debug(
            f"Consumer group '{consumer_group_id}"
            f"' already exists.")
        return False

    # Record the subscription so the group can later be removed from
    # every stream it is attached to.
    redis_client.lpush(get_sub_list_key(consumer_group_id), stream_key)
    logger.debug(
        f"Add consumer group '{consumer_group_id}"
        f"' to topic '{stream}' successfully.")
    return True
def is_support_partitions(self, **kwargs) -> bool:
    """Report whether this backend supports partitions.

    Returns:
        bool: Always False.
    """
    return False
def is_support_replication(self, **kwargs) -> bool:
    """Report whether this backend supports replication.

    Returns:
        bool: Always False.
    """
    return False
def connect_by_cec_url(self, url: CecUrl):
    """Connect to redis server by CecUrl

    Opens a connection through ``do_connect_by_cec_url`` and remembers
    both the client and the url's string form on this instance.

    Args:
        url(CecUrl): Parsed CEC url describing the server to connect to
    """
    logger.debug(
        f"{self} try to connect to '{url}'.")
    connection = do_connect_by_cec_url(url)
    # Only record the new state once connecting has succeeded.
    self._redis_client = connection
    self._current_url = str(url)
    logger.info(
        f"{self} connect to '{url}' successfully.")
def connect(self, url: str):
    """Connect to redis server by url

    Connecting to the remote message queue => for this module that means
    connecting to the Redis server. The string is parsed with
    ``CecUrl.parse`` and handed to ``connect_by_cec_url``.

    Args:
        url(str): CEC url in string form
    """
    return self.connect_by_cec_url(CecUrl.parse(url))
def disconnect(self):
    """Disconnect from redis server

    Disconnect from remote server => for this module that means quitting
    the Redis connection. Does nothing when no client is connected.
    """
    active_client = self._redis_client
    if active_client is not None:
        logger.debug(
            f"{self} try to disconnect from '{self._current_url}'.")
        active_client.quit()
        # Drop the reference so a second disconnect() is a no-op.
        self._redis_client = None
        logger.info(
            f"{self} disconnect from '{self._current_url}' successfully.")
def client(self):
    """Get inner redis client

    Returns:
        The Redis client currently held by this instance, or None when
        not connected.
    """
    inner_client = self._redis_client
    return inner_client