sysom1/environment/1_sdk/test_cec_redis/test_redis_consumer.py

561 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*- #
"""
Time 2022/8/25 10:25
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File test_redis_consumer.py
Description: Tests for the Redis-based CEC consumer (RedisConsumer).
"""
import time
import unittest
import uuid
from contextlib import ExitStack
from threading import Thread

from cec_base.consumer import dispatch_consumer, Consumer
from cec_base.admin import dispatch_admin, Admin
from cec_base.url import CecUrl
from cec_base.exceptions import ConsumerGroupAlreadyExistsException
from cec_base.producer import dispatch_producer, Producer
from cec_redis.redis_consumer import RedisConsumer
from cec_redis.redis_admin import RedisAdmin
from cec_redis.redis_producer import RedisProducer
from cec_redis.common import StaticConst
from cec_redis.admin_static import get_topic_consumer_group_meta
# Connection URL handed to dispatch_admin / dispatch_producer /
# dispatch_consumer by the tests below.
URL = "redis://localhost:6379"
class TestRedisConsumer(unittest.TestCase):
    """Tests for the Redis-based implementation of the CEC ``Consumer``.

    Every test works on its own freshly created topic which ``setUp`` fills
    with ten events carrying ``{"seq": 0}`` .. ``{"seq": 9}``. The event
    returned to the producer callback for sequence ``n`` is stored on the test
    instance as attribute ``msg<n>``.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Register the Redis implementations under the ``redis`` scheme.

        Executed once, before the first test of this class runs.
        """
        Consumer.register('redis', RedisConsumer)
        Admin.register('redis', RedisAdmin)
        Producer.register('redis', RedisProducer)

    def setUp(self) -> None:
        """Create a fresh topic and produce ten events into it.

        Executed before every test case.
        """
        self.admin = dispatch_admin(URL)
        # Registered immediately: cleanups also run when a later assertion in
        # setUp fails (tearDown does not), so the connection is never leaked.
        self.addCleanup(self.admin.disconnect)
        self.redis_admin: RedisAdmin = self.admin
        self.producer = dispatch_producer(URL)
        self.addCleanup(self.producer.disconnect)

        # 1. Create a topic for testing
        self.topic_name = str(uuid.uuid4())
        self.assertEqual(self.admin.create_topic(self.topic_name), True)

        # 2. Produce some test data for consumption. ``idx=i`` binds the
        #    current loop value so each callback stores into its own slot.
        for i in range(10):
            self.producer.produce(self.topic_name, {
                "seq": i
            }, callback=lambda e, msg, idx=i: setattr(self, f"msg{idx}", msg))

        # 3. Check that every callback delivered an event. The "" default
        #    turns a callback that never ran into an assertion failure
        #    instead of an AttributeError.
        for i in range(10):
            self.assertNotEqual(getattr(self, f"msg{i}", ""), "")

    def tearDown(self) -> None:
        """Delete the per-test topic and reset the stored events.

        Executed after every test case. The admin and producer connections
        are closed by the cleanups registered in ``setUp``, which run after
        this method even if the assertion below fails.
        """
        self.assertEqual(self.admin.del_topic(self.topic_name), True)
        for i in range(10):
            setattr(self, f"msg{i}", "")

    # ------------------------------------------------------------------ #
    # Helpers
    # ------------------------------------------------------------------ #
    def _produced(self, seq: int):
        """Return the event stored by the producer callback for ``seq``."""
        return getattr(self, f"msg{seq}")

    def _create_group(self) -> str:
        """Create a consumer group with a random ID and return that ID."""
        group_id = str(uuid.uuid4())
        self.assertEqual(self.admin.create_consumer_group(group_id), True)
        return group_id

    @staticmethod
    def _heartbeat_url() -> str:
        """Build the consumer URL used by the heartbeat tests.

        NOTE(review): the units of the two interval values are not visible in
        this file (presumably seconds) - confirm against cec_redis.
        """
        return str(CecUrl("redis", "localhost:6379", {
            StaticConst.REDIS_SPECIAL_PARM_CEC_HEARTBEAT_INTERVAL: 1,
            StaticConst.REDIS_SPECIAL_PARM_CEC_HEARTBEAT_CHECK_INTERVAL: 2
        }))

    def _consume_single(self, consumer: Consumer):
        """Consume with ``batch_consume_limit=1`` and return the first event.

        Fails the test with a readable message (rather than an IndexError)
        when the consumer returns nothing.
        """
        events = consumer.consume(batch_consume_limit=1)
        self.assertTrue(events, "consumer returned no event")
        return events[0]

    def _assert_is_event(self, event, seq: int) -> None:
        """Assert that ``event`` is the produced event with sequence ``seq``."""
        self.assertEqual(event.event_id, self._produced(seq).event_id)
        self.assertEqual(int(event.value["seq"]), seq)

    def _consume_and_check_last_ack(self, consumer: Consumer, group_id: str,
                                    target_seq: int) -> None:
        """Consume and ACK event ``target_seq``, then check the group meta.

        After the ACK, the "last ack id" recorded for ``group_id`` on the
        test topic must be the ID of the event that was just acknowledged.
        """
        new_msg = self.consume_one_event(consumer, target_seq, True)
        self.assertEqual(get_topic_consumer_group_meta(
            self.redis_admin.client(),
            self.topic_name,
            group_id,
            StaticConst.TOPIC_CONSUMER_GROUP_META_KEY_LAST_ACK_ID
        ), new_msg.event_id)

    def _assert_consume_status(self, status, group_id: str,
                               last_ack_seq: int, lag: int) -> None:
        """Check one consume-status item of the test topic.

        Args:
            status: Item returned by ``admin.get_consume_status``.
            group_id: Consumer group the item must belong to.
            last_ack_seq: Sequence number of the event expected as last ACK.
            lag: Expected value of the item's ``lag`` field.
        """
        self.assertEqual(status.topic, self.topic_name)
        self.assertEqual(status.consumer_group_id, group_id)
        self.assertEqual(status.min_id, self._produced(0).event_id)
        self.assertEqual(status.max_id, self._produced(9).event_id)
        self.assertEqual(status.total_event_count, 10)
        self.assertEqual(status.last_ack_id,
                         self._produced(last_ack_seq).event_id)
        self.assertEqual(status.lag, lag)

    def consume_one_event(self, consumer: Consumer, target_seq: int,
                          need_ack: bool):
        """Consume one event and check that it is event ``target_seq``.

        Args:
            consumer: Consumer to read from.
            target_seq: Sequence number the consumed event must carry.
            need_ack: When true, ACK the event and expect ``ack`` to return 1.

        Returns:
            The consumed event.
        """
        new_msg = self._consume_single(consumer)
        # Pause kept from the original test.
        # NOTE(review): its purpose is not documented here - presumably it
        # spaces out consecutive reads; confirm before removing.
        time.sleep(0.1)
        self._assert_is_event(new_msg, target_seq)
        if need_ack:
            self.assertEqual(consumer.ack(new_msg), 1)
        return new_msg

    # ------------------------------------------------------------------ #
    # Tests
    # ------------------------------------------------------------------ #
    def test_pending_transfer(self):
        """Overdue pending events are transferred to the current consumer."""
        # 1. Create consumer group 1
        group1_id = self._create_group()

        # 2. A consumer with pending-list transfer enabled takes five events
        #    without ACKing them.
        consumer1_id = Consumer.generate_consumer_id()
        url = CecUrl("redis", "localhost:6379", {
            StaticConst.REDIS_SPECIAL_PARM_CEC_PENDING_EXPIRE_TIME: 1400,
            StaticConst.REDIS_SPECIAL_PARM_CEC_ENABLE_PENDING_LIST_TRANSFER:
                True
        })
        with dispatch_consumer(str(url),
                               self.topic_name,
                               consumer_id=consumer1_id,
                               group_id=group1_id) as consumer1:
            # NOTE(review): indentation was reconstructed. The first 0.2 s
            # pause is placed inside the loop because only then have roughly
            # two of the five events been pending longer than the configured
            # expire time (assumed to be milliseconds) when the final consume
            # runs, which matches the expected count of 2 - confirm.
            for i in range(5):
                self.consume_one_event(consumer1, i, False)
                time.sleep(0.2)
            time.sleep(0.2)
            self.assertEqual(
                len(consumer1.consume(batch_consume_limit=5)), 2)
        self.assertEqual(self.admin.del_consumer_group(group1_id), True)

    def test_heart_beat(self):
        """A second consumer takes over events of an offline consumer."""
        # 1. Create consumer group 1
        group1_id = self._create_group()
        url = self._heartbeat_url()

        # 2. The first consumer takes events 0..4 without ACKing them.
        #    NOTE(review): indentation was reconstructed; the second consumer
        #    is opened after this block so that the first one has left (the
        #    original comment says "Wait first consumer offline").
        with dispatch_consumer(url,
                               self.topic_name,
                               consumer_id=Consumer.generate_consumer_id(),
                               group_id=group1_id) as consumer1:
            for i in range(5):
                self.consume_one_event(consumer1, i, False)

        with dispatch_consumer(url,
                               self.topic_name,
                               consumer_id=Consumer.generate_consumer_id(),
                               group_id=group1_id) as consumer2:
            # Wait for the first consumer to be considered offline
            time.sleep(5)
            # First come the 5 events the first consumer did not ACK, then
            # the 5 events nobody has consumed yet.
            for i in range(10):
                self.consume_one_event(consumer2, i, True)
        self.assertEqual(self.admin.del_consumer_group(group1_id), True)

    def test_2_consumer_heart_beat(self):
        """Two consumers compete for the events of an offline consumer."""
        # 1. Create consumer group 1
        group1_id = self._create_group()
        url = self._heartbeat_url()

        # 2. The first consumer takes events 0..3 without ACKing them and
        #    then leaves.
        with dispatch_consumer(url,
                               self.topic_name,
                               consumer_id=Consumer.generate_consumer_id(),
                               group_id=group1_id) as consumer1:
            for i in range(4):
                self.consume_one_event(consumer1, i, False)

        # 3. Two further consumers of the same group read alternately; the
        #    test expects them to see events 0..5 in order.
        with dispatch_consumer(url,
                               self.topic_name,
                               consumer_id=Consumer.generate_consumer_id(),
                               group_id=group1_id) as consumer2, \
                dispatch_consumer(url,
                                  self.topic_name,
                                  consumer_id=Consumer.generate_consumer_id(),
                                  group_id=group1_id) as consumer3:
            # Wait for the first consumer to be considered offline
            time.sleep(5)
            self.consume_one_event(consumer2, 0, True)
            self.consume_one_event(consumer3, 1, True)
            self.consume_one_event(consumer2, 2, True)
            self.consume_one_event(consumer3, 3, True)
            self.consume_one_event(consumer2, 4, True)
            self.consume_one_event(consumer3, 5, True)
        self.assertEqual(self.admin.del_consumer_group(group1_id), True)

    def test_same_consumer(self):
        """Two consumers may join a group with the same consumer ID."""
        # 1. Create consumer group 1
        group1_id = self._create_group()

        # 2. Open two consumers that share one consumer ID in that group,
        #    then disconnect both; nothing is consumed in this test.
        consumer1_id = Consumer.generate_consumer_id()
        consumer1 = dispatch_consumer(URL, self.topic_name,
                                      consumer_id=consumer1_id,
                                      group_id=group1_id)
        consumer2 = dispatch_consumer(URL, self.topic_name,
                                      consumer_id=consumer1_id,
                                      group_id=group1_id)
        consumer1.disconnect()
        consumer2.disconnect()
        self.assertEqual(self.admin.del_consumer_group(group1_id), True)

    def test_get_consume_status(self):
        """``get_consume_status`` reports per-group consumption progress."""
        # ExitStack disconnects every consumer opened below, also when an
        # assertion fails half-way through.
        with ExitStack() as stack:
            # 1. Create consumer group 1
            group1_id = self._create_group()

            # 2. The first consumer joins group 1 and consumes 5 events
            consumer1_id = Consumer.generate_consumer_id()
            consumer1 = stack.enter_context(
                dispatch_consumer(URL, self.topic_name,
                                  consumer_id=consumer1_id,
                                  group_id=group1_id))
            for i in range(5):
                self._consume_and_check_last_ack(consumer1, group1_id, i)

            # 3. Create consumer group 2
            group2_id = self._create_group()

            # 4. A second consumer joins group 2 and consumes 2 events
            consumer2_id = Consumer.generate_consumer_id()
            consumer2 = stack.enter_context(
                dispatch_consumer(URL, self.topic_name,
                                  consumer_id=consumer2_id,
                                  group_id=group2_id))
            for i in range(2):
                self._consume_and_check_last_ack(consumer2, group2_id, i)

            # Status of one given consumer group
            group1_status = self.admin.get_consume_status(self.topic_name,
                                                          group1_id)[0]
            self._assert_consume_status(group1_status, group1_id,
                                        last_ack_seq=4, lag=5)

            # Status of all consumer groups: pick out group 2. Requiring at
            # least one match keeps the checks from being skipped silently.
            results = self.admin.get_consume_status(self.topic_name)
            group2_statuses = [item for item in results
                               if item.consumer_group_id == group2_id]
            self.assertTrue(group2_statuses,
                            "consume status of group 2 is missing")
            for item in group2_statuses:
                self._assert_consume_status(item, group2_id,
                                            last_ack_seq=1, lag=8)

        self.assertEqual(self.admin.del_consumer_group(group1_id), True)
        self.assertEqual(self.admin.del_consumer_group(group2_id), True)

    def test_get_event_list(self):
        """``get_event_list`` pages through the events of a topic."""
        # First get the first two events
        res = self.admin.get_event_list(self.topic_name, 0, '0', 2)
        self.assertEqual(len(res), 2)
        self.assertEqual(res[0].event_id, self._produced(0).event_id)
        self.assertEqual(res[1].event_id, self._produced(1).event_id)

        # Get the next 8 events (a count of 20 still yields only 8, since
        # only 8 are left)
        res = self.admin.get_event_list(self.topic_name, 0, res[1].event_id,
                                        20)
        self.assertEqual(len(res), 8)
        for offset, event in enumerate(res):
            self.assertEqual(event.event_id,
                             self._produced(offset + 2).event_id)

        # Starting after the last event must yield an empty list
        res = self.admin.get_event_list(self.topic_name, 0, res[7].event_id,
                                        20)
        self.assertEqual(res, [])

    def test_broadcast_consume_from_early(self):
        """Broadcast consumption starting from the earliest event."""
        with dispatch_consumer(URL, self.topic_name,
                               start_from_now=False) as consumer, \
                dispatch_consumer(URL, self.topic_name,
                                  start_from_now=False) as consumer2:
            # 1. The first consumer reads the 10 produced events in order
            for i in range(10):
                self._assert_is_event(self._consume_single(consumer), i)

            # 2. The second consumer also starts from the earliest event, so
            #    it reads the same 10 events
            for i in range(10):
                self._assert_is_event(self._consume_single(consumer2), i)

            # 3. There is no 11th event: consuming with the argument 100
            #    (a 100 ms timeout according to the original comment) yields
            #    an empty list
            self.assertEqual(len(consumer.consume(100)), 0)
            self.assertEqual(len(consumer2.consume(100)), 0)

    def test_broadcast_consume_start_from_now(self):
        """Broadcast consumption that only sees events produced after joining.
        """

        def produce_new_msg(seq: int):
            # Runs in a worker thread; the short delay lets the main thread
            # enter consume() before the event is produced.
            time.sleep(0.1)
            self.producer.produce(self.topic_name, {
                "seq": seq
            }, callback=lambda e, _msg: setattr(self, f"msg{seq}", _msg))

        def produce_and_consume(consumer: Consumer, seq: int):
            worker = Thread(target=produce_new_msg, args=[seq])
            worker.start()
            try:
                new_msg = self._consume_single(consumer)
            finally:
                worker.join()
            # Checked here, after join(): an assertion failing inside the
            # worker thread would not fail the test, and after join() the
            # produce call (and, as setUp also relies on, its callback) has
            # finished.
            self.assertNotEqual(getattr(self, f"msg{seq}", ""), "")
            self._assert_is_event(new_msg, seq)

        with ExitStack() as stack:
            consumer = stack.enter_context(
                dispatch_consumer(URL, self.topic_name, start_from_now=True))
            # 1. Ten events already exist, but this consumer listens from the
            #    moment it joined, so nothing is returned
            self.assertEqual(len(consumer.consume(100)), 0)

            # 2. An event produced now can be consumed by the consumer
            produce_and_consume(consumer, 10)

            # 3. A second consumer joining at this point sees nothing either
            consumer2 = stack.enter_context(
                dispatch_consumer(URL, self.topic_name, start_from_now=True))
            self.assertEqual(len(consumer2.consume(100)), 0)

            # 4. Another new event can be consumed by consumer2
            produce_and_consume(consumer2, 11)

            # 5. The first consumer joined earlier, so it receives event 11
            #    as well
            self._assert_is_event(self._consume_single(consumer), 11)

    def test_group_consume(self):
        """Group consumption: a group's members share the topic's events."""
        # 1. Create a consumer group, which should succeed
        group1_id = self._create_group()

        # 2. Creating the same group again raises by default
        with self.assertRaises(ConsumerGroupAlreadyExistsException):
            self.admin.create_consumer_group(group1_id)

        # 3. With ignore_exception the duplicate creation returns False
        #    instead of raising
        self.assertEqual(
            self.admin.create_consumer_group(group1_id, ignore_exception=True),
            False)

        # ExitStack disconnects every consumer opened below, also when an
        # assertion fails half-way through.
        with ExitStack() as stack:
            # 4. The first consumer joins group 1 and consumes events 0..4
            consumer1_id = Consumer.generate_consumer_id()
            consumer1 = stack.enter_context(
                dispatch_consumer(URL, self.topic_name,
                                  consumer_id=consumer1_id,
                                  group_id=group1_id))
            for i in range(5):
                self._consume_and_check_last_ack(consumer1, group1_id, i)

            # 5. The second consumer joins group 1 and consumes events 5..9
            consumer2 = stack.enter_context(
                dispatch_consumer(URL, self.topic_name,
                                  group_id=group1_id))
            for i in range(5):
                self._consume_and_check_last_ack(consumer2, group1_id, i + 5)

            # 6. Nothing is left for group 1, so another consume is empty
            self.assertEqual(len(consumer2.consume(100)), 0)

            # 7. Create another consumer group
            group2_id = self._create_group()

            # 8. Consumer 1 joins group 2 and consumes events 0..4
            consumer1_in_group2 = stack.enter_context(
                dispatch_consumer(URL, self.topic_name,
                                  consumer_id=consumer1_id,
                                  group_id=group2_id))
            for i in range(5):
                self._consume_and_check_last_ack(consumer1_in_group2,
                                                 group2_id, i)

            # 9. Consumer 3 joins group 2 and consumes events 5..9
            consumer3 = stack.enter_context(
                dispatch_consumer(URL, self.topic_name,
                                  group_id=group2_id))
            for i in range(5):
                self._consume_and_check_last_ack(consumer3, group2_id, i + 5)

        # 10. Delete consumer groups 1 and 2
        self.assertEqual(self.admin.del_consumer_group(group1_id), True)
        self.assertEqual(self.admin.del_consumer_group(group2_id), True)

    def test_broadcast_consume_bytes_string_event(self):
        """Broadcast consumption of events whose value is bytes or str."""
        with dispatch_consumer(URL, self.topic_name,
                               start_from_now=False) as consumer:
            # 1. Read the 10 events produced by setUp
            for i in range(10):
                self._assert_is_event(self._consume_single(consumer), i)

            # 2. Produce a bytes value and consume it
            test_byte_value = b"test bytes value"
            self.producer.produce(
                self.topic_name, test_byte_value,
                callback=lambda e, msg, idx=10: setattr(self, f"msg{idx}", msg)
            )
            msg = self._consume_single(consumer)
            self.assertEqual(msg.event_id, self._produced(10).event_id)
            self.assertEqual(msg.value, test_byte_value)

            # 3. Produce a string value and consume it
            test_string_value = "test bytes value"
            self.producer.produce(
                self.topic_name, test_string_value,
                callback=lambda e, msg, idx=11: setattr(self, f"msg{idx}", msg)
            )
            msg = self._consume_single(consumer)
            self.assertEqual(msg.event_id, self._produced(11).event_id)
            self.assertEqual(msg.value, test_string_value)
if __name__ == '__main__':
    # Run the tests of this module when the file is executed directly.
    unittest.main()