sysom1/environment/1_sdk/test_cec_redis/test_redis_admin.py

230 lines
8.3 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2022/8/10 14:48
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File test_redis_admin.py
Description:
"""
import unittest
import uuid
from cec_base.exceptions import TopicAlreadyExistsException, \
TopicNotExistsException, ConsumerGroupAlreadyExistsException, \
ConsumerGroupNotExistsException
from cec_base.admin import Admin, dispatch_admin
from cec_redis.redis_admin import RedisAdmin
from cec_redis.admin_static import get_topic_meta, store_topic_meta, \
del_topic_meta
class TestRedisAdmin(unittest.TestCase):
"""A test class to test RedisAdmin
This test class tests the functionality of the redis-based Admin
"""
@classmethod
def setUpClass(cls) -> None:
Admin.register('redis', RedisAdmin)
def setUp(self) -> None:
self.admin = dispatch_admin("redis://cms-sysomserver-daily.redis.zhangbei.rds.aliyuncs.com:6379?cec_default_max_len=1000&cec_auto_mk_topic=true&password=sysom@123456")
self.test_topic_name = str(uuid.uuid4())
self.assertEqual(self.admin.create_topic(self.test_topic_name), True)
self.test_group_id = str(uuid.uuid4())
self.assertEqual(self.admin.create_consumer_group(self.test_group_id),
True)
self.assertEqual(
RedisAdmin.add_group_to_stream(self.admin.client(),
self.test_topic_name,
self.test_group_id),
True)
def tearDown(self) -> None:
self.assertEqual(self.admin.del_topic(self.test_topic_name), True)
self.assertEqual(self.admin.del_consumer_group(self.test_group_id),
True)
self.admin.disconnect()
def test_get_consumer_group_list(self):
"""Test get consumer group list
Returns:
"""
print(self.admin.get_consumer_group_list())
def test_topic_meta_info_opt(self):
"""Test store and get meta info
Returns:
"""
redis_admin: RedisAdmin = self.admin
random_topic = str(uuid.uuid4())
# Gets metadata that does not exist and should return None
self.assertEqual(get_topic_meta(redis_admin.client(),
random_topic, "test-none"),
None)
# Store a metadata that should succeed
self.assertEqual(
store_topic_meta(redis_admin.client(), random_topic,
'test', 1),
True)
# Getting the metadata just stored should be successful
self.assertEqual(
int(get_topic_meta(redis_admin.client(), random_topic,
'test')),
1)
# Updating metadata should be successful
self.assertEqual(
store_topic_meta(redis_admin.client(), random_topic,
'test', "2333"), True)
# Obtaining the updated data should be successful
self.assertEqual(
get_topic_meta(redis_admin.client(), random_topic,
'test'),
'2333')
# Deleted successfully
del_topic_meta(redis_admin.client(), random_topic)
def test_topic(self):
"""Integration testing of the topic management functions
Integration testing of the topic management functions, including
adding, deleting and list fetching
Returns:
"""
# 1. Test connection
admin = self.admin
random_id = str(uuid.uuid4())
# 2. First creation should be successful
self.assertEqual(admin.create_topic(random_id), True)
# 3. Creating a topic with the same name should throw an exception
with self.assertRaises(TopicAlreadyExistsException):
admin.create_topic(random_id)
# 4. Gets meta information about a Topic that does not exist and should
# be empty
self.assertEqual(
RedisAdmin.get_meta_info(admin.client(),
f"{random_id}-no"), None
)
# 5. Get the meta information of the existing topic, the ID should be
# the same
self.assertEqual(
RedisAdmin.get_meta_info(admin.client(), random_id)['topic_name'],
random_id)
# 6. Topic that exists, is_topic_exist should return True
self.assertEqual(admin.is_topic_exist(random_id), True)
# 7. Topic that does not exist, is_topic_exist should return False
self.assertEqual(admin.is_topic_exist(f"{random_id}-no"), False)
# 8. Get the list of Topics, the Topic you just created should be
# included in it
self.assertIn(random_id,
[topic.topic_name for topic in admin.get_topic_list()])
# 9. Deleting Topic for the first time should be successful
self.assertEqual(admin.del_topic(random_id), True)
# 10. Deleting Topic a second time should throw an exception
with self.assertRaises(TopicNotExistsException):
admin.del_topic(random_id)
# 11. Deleting a non-existent Topic should also throw an exception
with self.assertRaises(TopicNotExistsException):
admin.del_topic(f"{random_id}-no")
# 12. After a topic has been successfully deleted, is_topic_exist
# should return False
self.assertEqual(admin.is_topic_exist(random_id), False)
# 13. After a topic has been successfully deleted, the Topic list
# should no longer contain
self.assertNotIn(random_id, [topic.topic_name for topic in
admin.get_topic_list()])
def test_consumer_group(self):
"""Integration testing of the consumer group management functions
Integration testing of the consumer group management functions,
including adding, deleting and list fetching
Returns:
"""
# 1. Test connection
admin = self.admin
random_id = str(uuid.uuid4())
consumer_id = str(uuid.uuid4())
# 2. Create a ConsumerGroup
self.assertEqual(admin.create_consumer_group(consumer_id), True)
# 3. Creating another ConsumerGroup with the same name should throw an
# exception that the consumer group already exists
with self.assertRaises(ConsumerGroupAlreadyExistsException):
admin.create_consumer_group(consumer_id)
# 4. ConsumerGroup that exists, is_consumer_group_exist should return
# True
self.assertEqual(admin.is_consumer_group_exist(consumer_id), True)
# 5. ConsumerGroup that does not exist, is_consumer_group_exist should
# return False
self.assertEqual(admin.is_consumer_group_exist(f"{consumer_id}-no"),
False)
# 6. Get the ConsumerGroup list, the consumer group you just created
# should be included in it
self.assertIn(consumer_id, [group.group_id for group in
admin.get_consumer_group_list()])
# 7. Create a Topic for testing
self.assertEqual(admin.create_topic(random_id), True)
# 8. Testing the addition of a consumption group to a topic
self.assertEqual(
RedisAdmin.add_group_to_stream(admin.client(), random_id,
consumer_id), True)
# 9. Repeated joins of consumer groups to a topic should fail
self.assertEqual(
RedisAdmin.add_group_to_stream(admin.client(), random_id,
consumer_id), False)
# 10. Deleting ConsumerGroup for the first time should be successful
self.assertEqual(admin.del_consumer_group(consumer_id), True)
# 11. The second deletion should throw an exception
with self.assertRaises(ConsumerGroupNotExistsException):
admin.del_consumer_group(consumer_id)
# 12. Deleting a ConsumerGroup that does not exist should also throw an
# exception
with self.assertRaises(ConsumerGroupNotExistsException):
admin.del_consumer_group(f"{consumer_id}-no")
# 13. Delete the Topic for testing
self.assertEqual(admin.del_topic(random_id), True)
if __name__ == '__main__':
unittest.main()