mirror of https://gitee.com/anolis/sysom.git
230 lines
8.3 KiB
Python
230 lines
8.3 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2022/8/10 14:48
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File test_redis_admin.py
|
|
Description:
|
|
"""
|
|
import unittest
|
|
import uuid
|
|
|
|
from cec_base.exceptions import TopicAlreadyExistsException, \
|
|
TopicNotExistsException, ConsumerGroupAlreadyExistsException, \
|
|
ConsumerGroupNotExistsException
|
|
from cec_base.admin import Admin, dispatch_admin
|
|
from cec_redis.redis_admin import RedisAdmin
|
|
from cec_redis.admin_static import get_topic_meta, store_topic_meta, \
|
|
del_topic_meta
|
|
|
|
|
|
class TestRedisAdmin(unittest.TestCase):
|
|
"""A test class to test RedisAdmin
|
|
|
|
This test class tests the functionality of the redis-based Admin
|
|
|
|
"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls) -> None:
|
|
Admin.register('redis', RedisAdmin)
|
|
|
|
def setUp(self) -> None:
|
|
self.admin = dispatch_admin("redis://cms-sysomserver-daily.redis.zhangbei.rds.aliyuncs.com:6379?cec_default_max_len=1000&cec_auto_mk_topic=true&password=sysom@123456")
|
|
|
|
self.test_topic_name = str(uuid.uuid4())
|
|
self.assertEqual(self.admin.create_topic(self.test_topic_name), True)
|
|
self.test_group_id = str(uuid.uuid4())
|
|
self.assertEqual(self.admin.create_consumer_group(self.test_group_id),
|
|
True)
|
|
self.assertEqual(
|
|
RedisAdmin.add_group_to_stream(self.admin.client(),
|
|
self.test_topic_name,
|
|
self.test_group_id),
|
|
True)
|
|
|
|
def tearDown(self) -> None:
|
|
self.assertEqual(self.admin.del_topic(self.test_topic_name), True)
|
|
self.assertEqual(self.admin.del_consumer_group(self.test_group_id),
|
|
True)
|
|
self.admin.disconnect()
|
|
|
|
def test_get_consumer_group_list(self):
|
|
"""Test get consumer group list
|
|
|
|
Returns:
|
|
|
|
"""
|
|
print(self.admin.get_consumer_group_list())
|
|
|
|
def test_topic_meta_info_opt(self):
|
|
"""Test store and get meta info
|
|
|
|
Returns:
|
|
|
|
"""
|
|
redis_admin: RedisAdmin = self.admin
|
|
random_topic = str(uuid.uuid4())
|
|
|
|
# Gets metadata that does not exist and should return None
|
|
self.assertEqual(get_topic_meta(redis_admin.client(),
|
|
random_topic, "test-none"),
|
|
None)
|
|
|
|
# Store a metadata that should succeed
|
|
self.assertEqual(
|
|
store_topic_meta(redis_admin.client(), random_topic,
|
|
'test', 1),
|
|
True)
|
|
|
|
# Getting the metadata just stored should be successful
|
|
self.assertEqual(
|
|
int(get_topic_meta(redis_admin.client(), random_topic,
|
|
'test')),
|
|
1)
|
|
|
|
# Updating metadata should be successful
|
|
self.assertEqual(
|
|
store_topic_meta(redis_admin.client(), random_topic,
|
|
'test', "2333"), True)
|
|
|
|
# Obtaining the updated data should be successful
|
|
self.assertEqual(
|
|
get_topic_meta(redis_admin.client(), random_topic,
|
|
'test'),
|
|
'2333')
|
|
# Deleted successfully
|
|
del_topic_meta(redis_admin.client(), random_topic)
|
|
|
|
def test_topic(self):
|
|
"""Integration testing of the topic management functions
|
|
|
|
Integration testing of the topic management functions, including
|
|
adding, deleting and list fetching
|
|
|
|
Returns:
|
|
|
|
"""
|
|
# 1. Test connection
|
|
admin = self.admin
|
|
random_id = str(uuid.uuid4())
|
|
|
|
# 2. First creation should be successful
|
|
self.assertEqual(admin.create_topic(random_id), True)
|
|
|
|
# 3. Creating a topic with the same name should throw an exception
|
|
with self.assertRaises(TopicAlreadyExistsException):
|
|
admin.create_topic(random_id)
|
|
|
|
# 4. Gets meta information about a Topic that does not exist and should
|
|
# be empty
|
|
self.assertEqual(
|
|
RedisAdmin.get_meta_info(admin.client(),
|
|
f"{random_id}-no"), None
|
|
)
|
|
|
|
# 5. Get the meta information of the existing topic, the ID should be
|
|
# the same
|
|
self.assertEqual(
|
|
RedisAdmin.get_meta_info(admin.client(), random_id)['topic_name'],
|
|
random_id)
|
|
|
|
# 6. Topic that exists, is_topic_exist should return True
|
|
self.assertEqual(admin.is_topic_exist(random_id), True)
|
|
|
|
# 7. Topic that does not exist, is_topic_exist should return False
|
|
self.assertEqual(admin.is_topic_exist(f"{random_id}-no"), False)
|
|
|
|
# 8. Get the list of Topics, the Topic you just created should be
|
|
# included in it
|
|
self.assertIn(random_id,
|
|
[topic.topic_name for topic in admin.get_topic_list()])
|
|
|
|
# 9. Deleting Topic for the first time should be successful
|
|
self.assertEqual(admin.del_topic(random_id), True)
|
|
|
|
# 10. Deleting Topic a second time should throw an exception
|
|
with self.assertRaises(TopicNotExistsException):
|
|
admin.del_topic(random_id)
|
|
|
|
# 11. Deleting a non-existent Topic should also throw an exception
|
|
with self.assertRaises(TopicNotExistsException):
|
|
admin.del_topic(f"{random_id}-no")
|
|
|
|
# 12. After a topic has been successfully deleted, is_topic_exist
|
|
# should return False
|
|
self.assertEqual(admin.is_topic_exist(random_id), False)
|
|
|
|
# 13. After a topic has been successfully deleted, the Topic list
|
|
# should no longer contain
|
|
self.assertNotIn(random_id, [topic.topic_name for topic in
|
|
admin.get_topic_list()])
|
|
|
|
def test_consumer_group(self):
|
|
"""Integration testing of the consumer group management functions
|
|
|
|
Integration testing of the consumer group management functions,
|
|
including adding, deleting and list fetching
|
|
|
|
Returns:
|
|
|
|
"""
|
|
# 1. Test connection
|
|
admin = self.admin
|
|
random_id = str(uuid.uuid4())
|
|
consumer_id = str(uuid.uuid4())
|
|
|
|
# 2. Create a ConsumerGroup
|
|
self.assertEqual(admin.create_consumer_group(consumer_id), True)
|
|
|
|
# 3. Creating another ConsumerGroup with the same name should throw an
|
|
# exception that the consumer group already exists
|
|
with self.assertRaises(ConsumerGroupAlreadyExistsException):
|
|
admin.create_consumer_group(consumer_id)
|
|
|
|
# 4. ConsumerGroup that exists, is_consumer_group_exist should return
|
|
# True
|
|
self.assertEqual(admin.is_consumer_group_exist(consumer_id), True)
|
|
|
|
# 5. ConsumerGroup that does not exist, is_consumer_group_exist should
|
|
# return False
|
|
self.assertEqual(admin.is_consumer_group_exist(f"{consumer_id}-no"),
|
|
False)
|
|
|
|
# 6. Get the ConsumerGroup list, the consumer group you just created
|
|
# should be included in it
|
|
self.assertIn(consumer_id, [group.group_id for group in
|
|
admin.get_consumer_group_list()])
|
|
|
|
# 7. Create a Topic for testing
|
|
self.assertEqual(admin.create_topic(random_id), True)
|
|
|
|
# 8. Testing the addition of a consumption group to a topic
|
|
self.assertEqual(
|
|
RedisAdmin.add_group_to_stream(admin.client(), random_id,
|
|
consumer_id), True)
|
|
|
|
# 9. Repeated joins of consumer groups to a topic should fail
|
|
self.assertEqual(
|
|
RedisAdmin.add_group_to_stream(admin.client(), random_id,
|
|
consumer_id), False)
|
|
|
|
# 10. Deleting ConsumerGroup for the first time should be successful
|
|
self.assertEqual(admin.del_consumer_group(consumer_id), True)
|
|
|
|
# 11. The second deletion should throw an exception
|
|
with self.assertRaises(ConsumerGroupNotExistsException):
|
|
admin.del_consumer_group(consumer_id)
|
|
|
|
# 12. Deleting a ConsumerGroup that does not exist should also throw an
|
|
# exception
|
|
with self.assertRaises(ConsumerGroupNotExistsException):
|
|
admin.del_consumer_group(f"{consumer_id}-no")
|
|
|
|
# 13. Delete the Topic for testing
|
|
self.assertEqual(admin.del_topic(random_id), True)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|