sysom1/environment/1_sdk/cec_redis/admin_static.py

774 lines
22 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2022/9/26 21:13
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File admin_static.py
Description:
"""
import sys
from typing import Optional, List
from itertools import chain
from redis import Redis
from clogger import logger
import redis.exceptions
from cec_base.exceptions import TopicNotExistsException, \
TopicAlreadyExistsException, ConsumerGroupNotExistsException, \
ConsumerGroupAlreadyExistsException
from cec_base.meta import TopicMeta, PartitionMeta, ConsumerGroupMeta, \
ConsumerGroupMemberMeta
from cec_base.exceptions import CecException
from .utils import raise_if_not_ignore
from .consume_status_storage import ConsumeStatusStorage
from .common import StaticConst
####################################################################
# Static function implementation of the management interface
####################################################################
def static_create_topic(redis_client, topic_name: str = "",
num_partitions: int = 1,
replication_factor: int = 1, **kwargs) -> bool:
"""A static method to create one topic
Args:
redis_client(Redis): Redis client
topic_name(str): Topic name
num_partitions(int): Number of partitions
replication_factor(int): Number of replications
Keyword Args:
ignore_exception(bool): Whether ignore exception
expire_time(int): Event expire time
Returns:
bool: True if create successful else failed
"""
ignore_exception = kwargs.get("ignore_exception", False)
expire_time = kwargs.get("expire_time", 24 * 60 * 60 * 1000)
logger.debug(
f"{redis_client} try to create_topic <topic_name="
f"{topic_name}, num_partitions={num_partitions}"
f", replication_factor={replication_factor},"
f" expire_time={expire_time}>.")
# The Key of the Stream that internally characterizes Topic, spliced with
# a special prefix as a namespace
inner_topic_name = StaticConst.get_inner_topic_name(
topic_name)
result = True
try:
if not _lock_topic(redis_client, topic_name,
ignore_exception):
return False
# 1. Determine whether Topic exists
if static_is_topic_exist(redis_client,
topic_name):
raise TopicAlreadyExistsException(
f"Topic {topic_name} already "
f"exists."
)
# 2. Use xadd to trigger stream creation
event_id = redis_client.xadd(inner_topic_name, {
"test": 1
})
pipeline = redis_client.pipeline()
# 3. Delete the test event just added and empty the stream.
pipeline.xdel(inner_topic_name, event_id)
# 4. add the new Topic to the Topic collection (for easy access to
# the list of all Topics)
pipeline.sadd(StaticConst.REDIS_ADMIN_TOPIC_LIST_SET,
inner_topic_name)
pipeline.execute()
except redis.exceptions.RedisError as exc:
raise_if_not_ignore(ignore_exception, exc)
finally:
_unlock_topic(redis_client, topic_name)
logger.info(
f"{redis_client} create_topic '{topic_name}' successfully.")
return result
def static_del_topic(redis_client: Redis, topic_name: str, **kwargs) -> bool:
"""A static method to delete one topic
Args:
redis_client(Redis): Redis client
topic_name(str): Topic name
Keyword Args:
ignore_exception(bool): Whether ignore exception
Returns:
bool: True if delete successful else failed
"""
ignore_exception = kwargs.get("ignore_exception", False)
logger.debug(
f"{redis_client} try to del_topic <topic_name={topic_name}>.")
inner_topic_name = StaticConst.get_inner_topic_name(
topic_name)
try:
if not _lock_topic(redis_client, topic_name, ignore_exception):
return False
# 1. Determine whether topic exists.
if not static_is_topic_exist(redis_client,
topic_name):
raise_if_not_ignore(ignore_exception,
TopicNotExistsException(
f"Topic {topic_name} not exists."
))
pipeline = redis_client.pipeline()
# 2. Delete the corresponding stream (topic)
pipeline.delete(inner_topic_name)
# 3. Remove the current topic from the topic list
pipeline.srem(StaticConst.REDIS_ADMIN_TOPIC_LIST_SET,
inner_topic_name)
pipeline.execute()
# 4. Clear topic-related metadata information
del_topic_meta(redis_client, topic_name)
# 5. Delete the structure associated with topic for storing consumption
# status
ConsumeStatusStorage.destroy_by_stream(redis_client, topic_name)
except redis.exceptions.RedisError as exc:
raise_if_not_ignore(ignore_exception, exc)
except CecException as exc:
raise_if_not_ignore(ignore_exception, exc)
finally:
_unlock_topic(redis_client, topic_name)
logger.info(
f"{redis_client} del_topic '{topic_name}' successfully.")
return True
def static_is_topic_exist(redis_client: Redis, topic_name: str,
**kwargs) -> bool:
"""A static method to determine whether specific topic exists
Args:
redis_client(Redis): Redis client
topic_name(str): Topic name
**kwargs:
Returns:
"""
ignore_exception = kwargs.get("ignore_exception", False)
res = False
try:
res = redis_client.type(
StaticConst.get_inner_topic_name(topic_name)) == 'stream'
logger.debug(
f"Is topic {topic_name} exists? => {res}, {kwargs}.")
except redis.exceptions.RedisError as exc:
raise_if_not_ignore(ignore_exception, exc)
return res
def static_get_topic_list(redis_client: Redis, **kwargs) -> List[TopicMeta]:
"""A static method to get topic list
Args:
redis_client(Redis): Redis client
Keyword Args:
Returns:
"""
ignore_exception = kwargs.get("ignore_exception", False)
topics = []
try:
res = redis_client.smembers(StaticConst.REDIS_ADMIN_TOPIC_LIST_SET)
for inner_topic_name in res:
topic_meta = TopicMeta(
StaticConst.get_topic_name_by_inner_topic_name(
inner_topic_name))
topic_meta.partitions = {
0: PartitionMeta(0)
}
topics.append(topic_meta)
logger.debug(
f"get_topic_list => {res}.")
except redis.exceptions.RedisError as exc:
raise_if_not_ignore(ignore_exception, exc)
return topics
def static_create_consumer_group(redis_client: Redis,
consumer_group_id: str,
**kwargs) -> bool:
"""A static method to create a consumer group
Args:
redis_client(Redis): Redis client
consumer_group_id(str): Consumer group ID
Keyword Args:
ignore_exception: Whether to ignore exceptions that may be thrown
Returns:
bool: True if successful, False otherwise.
"""
ignore_exception = kwargs.get("ignore_exception", False)
logger.debug(
f"{redis_client} try to create_consumer_group "
f"<consumer_group_id={consumer_group_id}>.")
try:
if not _lock_consumer_group(redis_client,
consumer_group_id,
ignore_exception):
return False
if static_is_consumer_group_exist(redis_client,
consumer_group_id):
if ignore_exception:
return False
raise ConsumerGroupAlreadyExistsException(
f"Consumer group {consumer_group_id} already exists.")
# Add to the consumer group key collection
redis_client.sadd(
StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LIST_SET,
consumer_group_id)
except redis.exceptions.RedisError as exc:
raise_if_not_ignore(ignore_exception, exc)
except CecException as exc:
raise_if_not_ignore(ignore_exception, exc)
finally:
_unlock_consumer_group(redis_client,
consumer_group_id)
logger.debug(
f"{redis_client} create_consumer_group "
f"'{consumer_group_id}' successfully.")
return True
def static_del_consumer_group(redis_client: Redis,
consumer_group_id: str,
**kwargs) -> bool:
"""A static method to delete a consumer group
Args:
redis_client(Redis): Redis client
consumer_group_id(str): Consumer group ID
Keyword Args:
ignore_exception: 是否忽略可能会抛出的异常
Returns:
"""
ignore_exception = kwargs.get("ignore_exception", False)
logger.debug(
f"{redis_client} try to del_consumer_group "
f"<consumer_group_id={consumer_group_id}>.")
try:
if not _lock_consumer_group(
redis_client, consumer_group_id, ignore_exception
):
return False
# 1. First determine if the consumer group exists, and if not, throw
# an exception as appropriate.
if not static_is_consumer_group_exist(redis_client,
consumer_group_id):
raise_if_not_ignore(ignore_exception,
ConsumerGroupNotExistsException(
f"Consumer group {consumer_group_id} "
f"not exists."
))
# 2. Removal from the set of consumer groups.
redis_client.srem(
StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LIST_SET,
consumer_group_id)
# 3. Destroy all consumer group structures of the same name in all
# streams associated with the current consumption group.
streams = redis_client.lpop(
get_sub_list_key(consumer_group_id),
sys.maxsize
)
if streams is None:
streams = []
pipeline = redis_client.pipeline()
for stream in streams:
# Unsubscribe from topics
pipeline.xgroup_destroy(stream, consumer_group_id)
# Delete the corresponding zset
ConsumeStatusStorage.destroy_by_stream_group(pipeline, stream,
consumer_group_id)
pipeline.execute()
for stream in streams:
# Clear metadata information related to topic-consumer groups
del_topic_consumer_group_meta(redis_client, stream,
consumer_group_id)
except ConsumerGroupNotExistsException as exc:
raise_if_not_ignore(ignore_exception, exc)
except redis.exceptions.RedisError:
# Ignore errors that may be generated by Pipeline performing cleanup
# operations here
pass
finally:
_unlock_consumer_group(redis_client, consumer_group_id)
logger.debug(
f"{redis_client} del_consumer_group "
f"'{consumer_group_id}' successfully.")
return True
def static_is_consumer_group_exist(redis_client: Redis,
consumer_group_id: str,
**kwargs) -> bool:
"""A static method to determine whether the specific consumer group exists
Args:
redis_client(Redis): Redis client
consumer_group_id(str): Consumer group ID
Keyword Args:
ignore_exception: Whether to ignore exceptions that may be thrown
Returns:
[ConsumerGroupMeta]: The consumer group list
"""
ignore_exception = kwargs.get("ignore_exception", False)
res = False
try:
res = redis_client.sismember(
StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LIST_SET,
consumer_group_id)
logger.debug(
f"{redis_client} Is consumer group '{consumer_group_id}' "
f"exists => {res}")
except redis.exceptions.RedisError as exc:
raise_if_not_ignore(ignore_exception, exc)
return res
def static_get_consumer_group_list(redis_client: Redis, **kwargs) \
-> List[ConsumerGroupMeta]:
"""A static method to get consumer group list
Args:
redis_client(Redis): Redis client
Keyword Args:
ignore_exception: Whether to ignore exceptions that may be thrown
Returns:
"""
ignore_exception = kwargs.get("ignore_exception", True)
res = redis_client.smembers(
StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LIST_SET)
group_metas = []
for group_id in res:
group_meta = ConsumerGroupMeta(group_id)
try:
# Get information about all subscribed topics for this consumer
# group
sub_topics = redis_client.lrange(
get_sub_list_key(group_id), 0, -1
)
# Iterate through all topics to get all members
pipeline = redis_client.pipeline(transaction=True)
for topic in sub_topics:
pipeline.xinfo_consumers(topic, group_id)
# {"name":"Alice","pending":1,"idle":9104628}
for consumer in chain.from_iterable(pipeline.execute()):
group_meta.members.append(
ConsumerGroupMemberMeta(consumer['name']))
except redis.exceptions.RedisError as exc:
raise_if_not_ignore(ignore_exception, exc)
group_meta.error = exc
except CecException as exc:
raise_if_not_ignore(ignore_exception, exc)
group_meta.error = exc
else:
group_metas.append(group_meta)
logger.debug(
f"get_consumer_group_list => {res}.")
return group_metas
def static_del_consumer(redis_client: Redis, topic: str, group: str,
consumer: str):
"""A static method to remove consumer from consumer group
Args:
redis_client(Redis):
topic(str):
group(str):
consumer(str):
Returns:
"""
return redis_client.xgroup_delconsumer(topic, group, consumer) == 1
def _lock_consumer_group(redis_client: Redis, consumer_group_id: str,
ignore_exception: bool = False) -> bool:
"""Lock specific consumer group
Lock a consumer group to prevent repeated operation problems in concurrent
scenarios.
Args:
redis_client:
consumer_group_id:
ignore_exception:
Returns:
"""
if redis_client.set(
f"{StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LOCKER_PREFIX}"
f"{consumer_group_id}",
consumer_group_id, nx=True, ex=10) == 0:
return raise_if_not_ignore(ignore_exception,
CecException(
"Someone else is creating or"
" deleting this consumer group."
))
return True
def _unlock_consumer_group(redis_client: Redis,
consumer_group_id: str) -> bool:
"""Unlock specific consumer group
Releasing a lock placed on a consumer group should be used in conjunction
with lock_consumer_group
Args:
redis_client:
consumer_group_id:
Returns:
"""
if redis_client.get(
f"{StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LOCKER_PREFIX}"
f"{consumer_group_id}"
) == consumer_group_id:
return redis_client.delete(
f"{StaticConst.REDIS_ADMIN_CONSUMER_GROUP_LOCKER_PREFIX}"
f"{consumer_group_id}") == 1
return False
def _lock_topic(redis_client: Redis, topic: str,
ignore_exception: bool = False) -> bool:
"""Lock specific topic
Lock a topic to prevent repeated operation problems in concurrent scenes.
Args:
redis_client:
topic:
ignore_exception:
Returns:
"""
if redis_client.set(
f"{StaticConst.REDIS_ADMIN_TOPIC_LOCKER_PREFIX}{topic}",
topic, nx=True, ex=10) == 0:
return raise_if_not_ignore(ignore_exception,
CecException(
"Someone else is creating or deleting "
"this topic."
))
return True
def _unlock_topic(redis_client: Redis, topic: str) -> bool:
"""Unlock specific topic
Releasing a lock placed on a topic should be used in conjunction with
lock_topic
Args:
redis_client:
topic:
Returns:
"""
# 释放锁
if redis_client.get(
f"{StaticConst.REDIS_ADMIN_TOPIC_LOCKER_PREFIX}{topic}"
) == topic:
return redis_client.delete(
f"{StaticConst.REDIS_ADMIN_TOPIC_LOCKER_PREFIX}{topic}") == 1
return False
####################################################################
# 一些辅助函数
####################################################################
def get_topic_consumer_group_meta_info_key(
topic: Optional[str], group_id: Optional[str], key: Optional[str]
):
"""Get topic-group meta info
Get <topic, group> meta info key
Args:
topic:
group_id:
key:
Returns:
"""
return f"{StaticConst.REDIS_ADMIN_TOPIC_CONSUMER_GROUP_META_PREFIX}" \
f"{topic + ':' if topic is not None else ''}" \
f"{group_id + ':' if group_id is not None else ''}" \
f"{key + ':' if key is not None else ''}"
def get_topic_meta_info_key(topic: str, key: Optional[str]):
"""Get topic meta info
Get <topic> meta info key
Args:
topic:
key:
Returns:
"""
return f"{StaticConst.REDIS_ADMIN_TOPIC_META_PREFIX}" \
f"{topic + ':' if topic is not None else ''}" \
f"{key + ':' if key is not None else ''}"
def get_sub_list_key(group_id: str) -> str:
"""Get sub list
Get sub list key => Each topic is associated with a sub list containing the
IDs of all consumer groups that are subscribed to the topic
Args:
group_id:
Returns:
"""
return f"{StaticConst.REDIS_ADMIN_CONSUMER_GROUP_SUB_LIST_PREFIX}" \
f"{group_id}"
def store_meta(redis_client: Redis, key: str, value: str):
"""Store meta info
Store metadata
Args:
redis_client:
key:
value:
Returns:
"""
return redis_client.set(key, value)
def get_meta(redis_client: Redis, key: str):
"""Get meta info
Get metadata
Args:
redis_client:
key:
Returns:
"""
return redis_client.get(key)
def del_meta(redis_client: Redis, prefix: str):
"""Delete meta info
Delete metadata
Args:
redis_client:
prefix:
Returns:
"""
next_cursor = 0
while True:
next_cursor, key_list = redis_client.scan(
next_cursor,
match=f"{prefix}*",
count=100
)
if len(key_list) > 0:
redis_client.delete(*key_list)
if next_cursor == 0:
break
return True
def store_topic_consumer_group_meta(redis_client: Redis, topic: str,
key: str, group_id: str, value):
"""Store topic-group meta info
Store <topic, group> metadata
Args:
redis_client:
topic:
key:
group_id:
value:
Returns:
"""
return store_meta(
redis_client,
get_topic_consumer_group_meta_info_key(
topic, group_id, key
),
value
)
def store_topic_meta(redis_client: Redis, topic: str, key: str, value):
"""Store topic meta info
Store <topic> metadata
Args:
redis_client:
topic:
key:
value:
Returns:
"""
return store_meta(
redis_client,
get_topic_meta_info_key(topic, key),
value
)
def get_topic_consumer_group_meta(redis_client: Redis, topic: str,
group_id: str, key: str):
"""Get topic-group meta info
Get <topic, group> metadata
Args:
redis_client:
topic:
group_id:
key:
Returns:
"""
return get_meta(
redis_client,
get_topic_consumer_group_meta_info_key(
topic, group_id, key
)
)
def get_topic_meta(redis_client: Redis, topic: str, key: str):
"""Get topic meta info
Get topic-related metadata information
Args:
redis_client:
topic:
key:
Returns:
"""
return get_meta(
redis_client,
get_topic_meta_info_key(topic, key)
)
def del_topic_consumer_group_meta(redis_client: Redis,
topic: str, group_id: str):
"""Delete topic-group meta info
Delete <topic, group> metadata information
Args:
redis_client:
topic:
group_id:
Returns:
"""
return del_meta(
redis_client,
get_topic_consumer_group_meta_info_key(
topic, group_id, None
)
)
def del_topic_meta(redis_client: Redis, topic: str):
"""Delete all meta info for specific topic
Delete all metadata information for a specific topic
Args:
redis_client:
topic:
Returns:
"""
res1 = del_meta(
redis_client,
get_topic_consumer_group_meta_info_key(topic, None, None)
)
res2 = del_meta(
redis_client, get_topic_meta_info_key(topic, None)
)
return res1 and res2