mirror of https://gitee.com/anolis/sysom.git
561 lines
23 KiB
Python
561 lines
23 KiB
Python
# -*- coding: utf-8 -*- #
|
||
"""
|
||
Time 2022/8/25 10:25
|
||
Author: mingfeng (SunnyQjm)
|
||
Email mfeng@linux.alibaba.com
|
||
File test_redis_admin.py
|
||
Description: Unit tests for the redis-based Consumer (RedisConsumer)
|
||
"""
|
||
import time
|
||
import unittest
|
||
import uuid
|
||
from threading import Thread
|
||
|
||
from cec_base.consumer import dispatch_consumer, Consumer
|
||
from cec_base.admin import dispatch_admin, Admin
|
||
from cec_base.url import CecUrl
|
||
from cec_base.exceptions import ConsumerGroupAlreadyExistsException
|
||
from cec_base.producer import dispatch_producer, Producer
|
||
from cec_redis.redis_consumer import RedisConsumer
|
||
from cec_redis.redis_admin import RedisAdmin
|
||
from cec_redis.redis_producer import RedisProducer
|
||
from cec_redis.common import StaticConst
|
||
from cec_redis.admin_static import get_topic_consumer_group_meta
|
||
|
||
# Connection URL of the Redis server used by the tests; it is passed to
# dispatch_admin / dispatch_producer / dispatch_consumer below.
URL = "redis://localhost:6379"
|
||
|
||
|
||
class TestRedisConsumer(unittest.TestCase):
    """A test class to test RedisConsumer

    This test class tests the functionality of the redis-based Consumer.

    Each test case works on its own randomly named topic, which setUp
    pre-fills with ``_INIT_EVENT_COUNT`` events (``{"seq": i}``). The event
    handed to the produce callback for sequence number ``i`` is stored on
    the test instance as ``self.msg<i>`` so that tests can compare consumed
    events against what was produced.
    """

    # Number of events produced into the test topic by setUp
    _INIT_EVENT_COUNT = 10

    @classmethod
    def setUpClass(cls) -> None:
        """
        This initialization function is executed when the test program
        starts

        It registers the redis-based Consumer / Admin / Producer
        implementations under the 'redis' name.
        """
        Consumer.register('redis', RedisConsumer)
        Admin.register('redis', RedisAdmin)
        Producer.register('redis', RedisProducer)

    def setUp(self) -> None:
        """
        This initialization function is executed before the execution of each
        test case

        It connects an admin and a producer, creates a fresh topic and
        produces the initial events into it.
        """
        # The disconnects are registered as cleanups (instead of being done
        # in tearDown) so the connections are released even when setUp
        # itself or the topic deletion in tearDown fails.
        self.admin = dispatch_admin(URL)
        self.addCleanup(self.admin.disconnect)
        self.redis_admin: RedisAdmin = self.admin
        self.producer = dispatch_producer(URL)
        self.addCleanup(self.producer.disconnect)

        # 1. Create a topic for testing
        self.topic_name = str(uuid.uuid4())
        self.assertEqual(self.admin.create_topic(self.topic_name), True)

        # 2. Produce some test data for consumption (idx=i binds the current
        #    loop value, avoiding the late-binding closure pitfall)
        for i in range(self._INIT_EVENT_COUNT):
            self.producer.produce(self.topic_name, {
                "seq": i
            }, callback=lambda e, msg, idx=i: setattr(self, f"msg{idx}", msg))

        # 3. Check that the messages produced are normal. A missing
        #    attribute (callback never invoked) is reported as a regular
        #    assertion failure instead of an AttributeError.
        for i in range(self._INIT_EVENT_COUNT):
            self.assertNotEqual(getattr(self, f"msg{i}", ""), "")

    def tearDown(self) -> None:
        """
        After each test case is executed, this function is executed to perform
        some cleanup operations

        The admin / producer connections are closed afterwards by the
        cleanups registered in setUp.
        """
        try:
            self.assertEqual(self.admin.del_topic(self.topic_name), True)
        finally:
            for i in range(self._INIT_EVENT_COUNT):
                setattr(self, f"msg{i}", "")

    def _event_id(self, seq: int):
        """Return the event_id of the produced event stored as msg<seq>"""
        return getattr(self, f"msg{seq}").event_id

    def _assert_event_matches(self, event, seq: int):
        """Assert that a consumed event is the produced event number seq"""
        self.assertEqual(event.event_id, self._event_id(seq))
        self.assertEqual(int(event.value["seq"]), seq)

    def _consume_single(self, consumer: Consumer):
        """Consume exactly one event, failing cleanly if none is returned"""
        events = consumer.consume(batch_consume_limit=1)
        self.assertTrue(events, "consume() returned no event")
        return events[0]

    def _consume_and_check_last_ack(self, consumer: Consumer, group_id: str,
                                    target_seq: int):
        """Consume and ACK one event, then verify the group's last-ack ID

        Args:
            consumer: Consumer to read from
            group_id: ID of the consumer group the consumer belongs to
            target_seq: Sequence number the consumed event must carry
        """
        self.consume_one_event(consumer, target_seq, True)
        # Check if the last submitted ID is the one just ACK
        self.assertEqual(get_topic_consumer_group_meta(
            self.redis_admin.client(),
            self.topic_name,
            group_id,
            StaticConst.TOPIC_CONSUMER_GROUP_META_KEY_LAST_ACK_ID
        ), self._event_id(target_seq))

    def _assert_consume_status(self, status, group_id: str,
                               last_ack_seq: int, expected_lag: int):
        """Check one consume-status item of the test topic

        Args:
            status: Item returned by admin.get_consume_status
            group_id: Consumer group the item must belong to
            last_ack_seq: Sequence number of the last event ACKed by the group
            expected_lag: Expected value of the item's ``lag`` field
        """
        self.assertEqual(status.topic, self.topic_name)
        self.assertEqual(status.consumer_group_id, group_id)
        self.assertEqual(status.min_id, self._event_id(0))
        self.assertEqual(status.max_id,
                         self._event_id(self._INIT_EVENT_COUNT - 1))
        self.assertEqual(status.total_event_count, self._INIT_EVENT_COUNT)
        self.assertEqual(status.last_ack_id, self._event_id(last_ack_seq))
        self.assertEqual(status.lag, expected_lag)

    def consume_one_event(self, consumer: Consumer, target_seq: int,
                          need_ack: bool):
        """Consume one event and check it against the produced one

        Args:
            consumer: Consumer to read from
            target_seq: Sequence number the consumed event must carry
            need_ack: Whether to ACK the event after it has been checked
        """
        new_msg = self._consume_single(consumer)
        # Several tests depend on this pause for their timing
        # (e.g. test_pending_transfer), so it must stay in place.
        time.sleep(0.1)
        self._assert_event_matches(new_msg, target_seq)
        if need_ack:
            self.assertEqual(consumer.ack(new_msg), 1)

    def test_pending_transfer(self):
        """
        Test transferring overdue messages to the current consumer
        """

        # 1. Creation of consumer group 1
        group1_id = str(uuid.uuid4())
        self.assertEqual(self.admin.create_consumer_group(group1_id),
                         True)

        # 2. The first consumer joins the first consumer group and consumes
        #    5 events without ACKing them, which should succeed.
        consumer1_id = Consumer.generate_consumer_id()
        url = CecUrl("redis", "localhost:6379", {
            StaticConst.REDIS_SPECIAL_PARM_CEC_PENDING_EXPIRE_TIME: 1400,
            StaticConst.REDIS_SPECIAL_PARM_CEC_ENABLE_PENDING_LIST_TRANSFER:
                True
        })
        with dispatch_consumer(str(url),
                               self.topic_name,
                               consumer_id=consumer1_id,
                               group_id=group1_id) as consumer1:
            for i in range(5):
                self.consume_one_event(consumer1, i, False)
                time.sleep(0.2)
            time.sleep(0.2)
            # 3. NOTE(review): timing-sensitive. Presumably the expire time
            #    is in milliseconds, so by now only the two oldest un-ACKed
            #    events have exceeded it and are handed out again - confirm
            #    against the consumer implementation.
            self.assertEqual(len(consumer1.consume(batch_consume_limit=5)), 2)

        self.assertEqual(self.admin.del_consumer_group(group1_id), True)

    def test_heart_beat(self):
        """
        Test heartbeat func
        """
        # 1. Creation of consumer group 1
        group1_id = str(uuid.uuid4())
        self.assertEqual(self.admin.create_consumer_group(group1_id),
                         True)

        # 2. The first consumer joins the first consumer group and consumes
        #    5 events without ACKing them, which should succeed.
        #    (heartbeat values are presumably in seconds - TODO confirm)
        url = CecUrl("redis", "localhost:6379", {
            StaticConst.REDIS_SPECIAL_PARM_CEC_HEARTBEAT_INTERVAL: 1,
            StaticConst.REDIS_SPECIAL_PARM_CEC_HEARTBEAT_CHECK_INTERVAL: 2
        })
        with dispatch_consumer(str(url),
                               self.topic_name,
                               consumer_id=Consumer.generate_consumer_id(),
                               group_id=group1_id) as consumer1:
            for i in range(5):
                self.consume_one_event(consumer1, i, False)

        with dispatch_consumer(str(url),
                               self.topic_name,
                               consumer_id=Consumer.generate_consumer_id(),
                               group_id=group1_id) as consumer2:
            # Wait first consumer offline
            time.sleep(5)

            # 1. First it will get the 5 messages that consumer one did not
            #    process successfully;
            # 2. Secondly, get 5 unconsumed messages.
            for i in range(10):
                self.consume_one_event(consumer2, i, True)

        self.assertEqual(self.admin.del_consumer_group(group1_id), True)

    def test_2_consumer_heart_beat(self):
        """
        Test two consumers competing to resume messages from offline consumers
        """
        # 1. Creation of consumer group 1
        group1_id = str(uuid.uuid4())
        self.assertEqual(self.admin.create_consumer_group(group1_id),
                         True)

        # 2. The first consumer joins the first consumer group and consumes
        #    4 events without ACKing them, which should succeed.
        url = CecUrl("redis", "localhost:6379", {
            StaticConst.REDIS_SPECIAL_PARM_CEC_HEARTBEAT_INTERVAL: 1,
            StaticConst.REDIS_SPECIAL_PARM_CEC_HEARTBEAT_CHECK_INTERVAL: 2
        })
        with dispatch_consumer(str(url),
                               self.topic_name,
                               consumer_id=Consumer.generate_consumer_id(),
                               group_id=group1_id) as consumer1:
            for i in range(4):
                self.consume_one_event(consumer1, i, False)

        # 3. Two further consumers join the same group and alternately pick
        #    up the events, starting with the ones left behind by consumer1
        with dispatch_consumer(str(url),
                               self.topic_name,
                               consumer_id=Consumer.generate_consumer_id(),
                               group_id=group1_id) as consumer2, \
                dispatch_consumer(str(url),
                                  self.topic_name,
                                  consumer_id=Consumer.generate_consumer_id(),
                                  group_id=group1_id) as consumer3:
            # Wait first consumer offline
            time.sleep(5)
            self.consume_one_event(consumer2, 0, True)
            self.consume_one_event(consumer3, 1, True)
            self.consume_one_event(consumer2, 2, True)
            self.consume_one_event(consumer3, 3, True)
            self.consume_one_event(consumer2, 4, True)
            self.consume_one_event(consumer3, 5, True)

        self.assertEqual(self.admin.del_consumer_group(group1_id), True)

    def test_same_consumer(self):
        """
        Test use same consumer ID to consume
        """
        # 1. Create consumer group 1
        group1_id = str(uuid.uuid4())
        self.assertEqual(self.admin.create_consumer_group(group1_id), True)

        # 2. Two consumers join the group using the very same consumer ID;
        #    both connections should be established without an error
        shared_consumer_id = Consumer.generate_consumer_id()
        consumers = []
        try:
            for _ in range(2):
                consumers.append(
                    dispatch_consumer(URL, self.topic_name,
                                      consumer_id=shared_consumer_id,
                                      group_id=group1_id))
        finally:
            # Disconnect whatever got connected, even on failure
            for consumer in consumers:
                consumer.disconnect()

        self.assertEqual(self.admin.del_consumer_group(group1_id), True)

    def test_get_consume_status(self):
        """
        Testing get consumption status interface
        """
        # 1. Create consumer group 1
        group1_id = str(uuid.uuid4())
        self.assertEqual(self.admin.create_consumer_group(group1_id), True)

        # 2. The first consumer joins the first consumption group,
        #    consumes 5 events, and should succeed
        with dispatch_consumer(URL, self.topic_name,
                               consumer_id=Consumer.generate_consumer_id(),
                               group_id=group1_id) as consumer1:
            for i in range(5):
                self._consume_and_check_last_ack(consumer1, group1_id, i)

            # 3. Create consumer group 2
            group2_id = str(uuid.uuid4())
            self.assertEqual(self.admin.create_consumer_group(group2_id),
                             True)

            # 4. A second consumer joins the second consumption group,
            #    consuming 2 messages, and should succeed
            with dispatch_consumer(
                    URL, self.topic_name,
                    consumer_id=Consumer.generate_consumer_id(),
                    group_id=group2_id) as consumer2:
                for i in range(2):
                    self._consume_and_check_last_ack(consumer2, group2_id, i)

                # Test to get the consumption of a given consumer group
                group1_status = self.admin.get_consume_status(
                    self.topic_name, group1_id)[0]
                self._assert_consume_status(group1_status, group1_id,
                                            last_ack_seq=4, expected_lag=5)

                # Test to get the consumption of all consumption groups
                results = self.admin.get_consume_status(self.topic_name)
                group2_items = [item for item in results
                                if item.consumer_group_id == group2_id]
                # Without this check the loop below would pass vacuously
                # if group 2 were missing from the result
                self.assertTrue(
                    group2_items,
                    "consumer group 2 missing from consume status")
                for item in group2_items:
                    self._assert_consume_status(item, group2_id,
                                                last_ack_seq=1,
                                                expected_lag=8)

        self.assertEqual(self.admin.del_consumer_group(group1_id), True)
        self.assertEqual(self.admin.del_consumer_group(group2_id), True)

    def test_get_event_list(self):
        """
        Test get event list
        """
        # First get the first two messages
        res = self.admin.get_event_list(self.topic_name, 0, '0', 2)
        self.assertEqual(len(res), 2)
        self.assertEqual(res[0].event_id, self._event_id(0))
        self.assertEqual(res[1].event_id, self._event_id(1))

        # Get the next 8 messages (a count of 20 will also only get 8 events,
        # since there are only 8 left)
        res = self.admin.get_event_list(self.topic_name, 0, res[1].event_id,
                                        20)
        self.assertEqual(len(res), 8)
        for offset, event in enumerate(res):
            self.assertEqual(event.event_id, self._event_id(offset + 2))

        # Offset exceeds the maximum message value and should get the empty
        # list
        res = self.admin.get_event_list(self.topic_name, 0, res[-1].event_id,
                                        20)
        self.assertEqual(res, [])

    def test_broadcast_consume_from_early(self):
        """
        Test broadcast consumption, starting with the earliest messages
        """
        with dispatch_consumer(URL, self.topic_name,
                               start_from_now=False) as consumer, \
                dispatch_consumer(URL, self.topic_name,
                                  start_from_now=False) as consumer2:
            # 1. The first consumer takes out the first 10 and determines
            #    whether they are successfully taken out
            for i in range(10):
                self._assert_event_matches(self._consume_single(consumer), i)

            # 2. The second consumer takes out the first 10, and since it
            #    starts with the earliest message, it should also be able to
            #    successfully take out
            for i in range(10):
                self._assert_event_matches(self._consume_single(consumer2), i)

            # 3. The consumer tries to consume the 11th message, which will
            #    time out after 100ms and will get an empty array
            self.assertEqual(len(consumer.consume(100)), 0)
            self.assertEqual(len(consumer2.consume(100)), 0)

    def test_broadcast_consume_start_from_now(self):
        """
        Testing broadcast consumption, listening for consumption from the
        moment of access
        """

        def produce_new_msg(seq: int):
            # Runs in a worker thread: no assertions here, a failure raised
            # in a thread would never reach the test result.
            time.sleep(0.1)
            self.producer.produce(self.topic_name, {
                "seq": seq
            }, callback=lambda e, _msg: setattr(self, f"msg{seq}", _msg))

        def produce_then_consume(target: Consumer, seq: int):
            """Produce event seq in the background and consume it"""
            producer_thread = Thread(target=produce_new_msg, args=[seq])
            producer_thread.start()
            try:
                events = target.consume(batch_consume_limit=1)
            finally:
                # Joining before comparing guarantees the produce call (and
                # with it the callback storing msg<seq>) has finished
                producer_thread.join()
            self.assertNotEqual(getattr(self, f"msg{seq}", ""), "")
            self.assertTrue(events, "consume() returned no event")
            self._assert_event_matches(events[0], seq)

        with dispatch_consumer(URL, self.topic_name,
                               start_from_now=True) as consumer:
            # 1. The first consumer accesses and attempts to read the event
            #    should fail, although the producer has already produced 10
            #    events before, but since it is specified to listen from the
            #    access moment, it will only receive events generated after
            #    the access moment
            self.assertEqual(len(consumer.consume(100)), 0)

            # 2. At this point another event is produced with the producer,
            #    it can be consumed by the consumer
            produce_then_consume(consumer, 10)

            # 3. The second consumer accesses it at this point and attempts
            #    to read the event should fail
            with dispatch_consumer(URL, self.topic_name,
                                   start_from_now=True) as consumer2:
                self.assertEqual(len(consumer2.consume(100)), 0)

                # 4. At this point another event is produced with the
                #    producer, then it can be consumed by consumer2
                produce_then_consume(consumer2, 11)

                # 5. Since the consumer has been accessed before, the
                #    consumer should also be able to consume the message
                #    numbered 11
                self._assert_event_matches(self._consume_single(consumer),
                                           11)

    def test_group_consume(self):
        """
        Test group consumption
        """
        # 1. Create a consumer group, which should succeed
        group1_id = str(uuid.uuid4())
        self.assertEqual(self.admin.create_consumer_group(group1_id), True)

        # 2. Attempts to create duplicate consumer groups should throw an
        #    exception by default
        with self.assertRaises(ConsumerGroupAlreadyExistsException):
            self.admin.create_consumer_group(group1_id)

        # 3. Use the ignore_exception parameter and then try to create the
        #    duplicate consumption group, which should not throw an exception
        #    and directly return False
        self.assertEqual(
            self.admin.create_consumer_group(group1_id, ignore_exception=True),
            False)

        # Every consumer connected below is tracked here so that all of them
        # are disconnected even if an assertion fails midway
        consumers = []
        try:
            # 4. The first consumer joins the first consumption group,
            #    consumes 5 events, and should succeed
            consumer1_id = Consumer.generate_consumer_id()
            consumer1 = dispatch_consumer(URL, self.topic_name,
                                          consumer_id=consumer1_id,
                                          group_id=group1_id)
            consumers.append(consumer1)
            for i in range(5):
                self._consume_and_check_last_ack(consumer1, group1_id, i)

            # 5. The second consumer joins the first consumption group and
            #    consumes the remaining 5 events (seq 5 to 9), which should
            #    succeed
            consumer2 = dispatch_consumer(URL, self.topic_name,
                                          group_id=group1_id)
            consumers.append(consumer2)
            for i in range(5):
                self._consume_and_check_last_ack(consumer2, group1_id, i + 5)

            # 6. The second consumer tries again to consume a message, which
            #    should fail because all the messages of this topic have been
            #    consumed by the group before
            self.assertEqual(len(consumer2.consume(100)), 0)

            # 7. This time create another consumption group
            group2_id = str(uuid.uuid4())
            self.assertEqual(self.admin.create_consumer_group(group2_id),
                             True)

            # 8. Consumer 1 joins consumer group 2 and consumes 5 events
            #    (seq 0 to 4), which should succeed
            consumer1_in_group2 = dispatch_consumer(URL, self.topic_name,
                                                    consumer_id=consumer1_id,
                                                    group_id=group2_id)
            consumers.append(consumer1_in_group2)
            for i in range(5):
                self._consume_and_check_last_ack(consumer1_in_group2,
                                                 group2_id, i)

            # 9. Consumer 3 joins consumer group 2 and consumes 5 events
            #    (seq 5 to 9), which should succeed
            consumer3 = dispatch_consumer(URL, self.topic_name,
                                          group_id=group2_id)
            consumers.append(consumer3)
            for i in range(5):
                self._consume_and_check_last_ack(consumer3, group2_id, i + 5)
        finally:
            for connected in consumers:
                connected.disconnect()

        # 10. Delete consumer groups 1 and 2
        self.assertEqual(self.admin.del_consumer_group(group1_id), True)
        self.assertEqual(self.admin.del_consumer_group(group2_id), True)

    def test_broadcast_consume_bytes_string_event(self):
        """Test broadcast consume bytes event"""
        with dispatch_consumer(URL, self.topic_name,
                               start_from_now=False) as consumer:
            # 1. The consumer takes out the first 10 and determines whether
            #    they are successfully taken out
            for i in range(10):
                self._assert_event_matches(self._consume_single(consumer), i)

            # 2. Produce bytes value and consume it
            test_byte_value = b"test bytes value"
            self.producer.produce(
                self.topic_name, test_byte_value,
                callback=lambda e, msg, idx=10: setattr(self, f"msg{idx}", msg)
            )

            msg = self._consume_single(consumer)
            self.assertEqual(msg.event_id, self._event_id(10))
            self.assertEqual(msg.value, test_byte_value)

            # 3. Produce string value and consume it
            test_string_value = "test bytes value"
            self.producer.produce(
                self.topic_name, test_string_value,
                callback=lambda e, msg, idx=11: setattr(self, f"msg{idx}", msg)
            )

            msg = self._consume_single(consumer)
            self.assertEqual(msg.event_id, self._event_id(11))
            self.assertEqual(msg.value, test_string_value)
|
||
|
||
|
||
if __name__ == "__main__":
    # Allow running this test module directly as a script
    unittest.main()
|