sysom1/environment/1_sdk/test_cec_redis/test_redis_producer.py

82 lines
2.4 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2022/8/15 10:25
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File test_redis_producer.py
Description: Unit tests for the redis-based Producer (RedisProducer).
"""
import unittest
import uuid
from typing import Optional
from cec_base.producer import dispatch_producer, Producer
from cec_base.exceptions import TopicNotExistsException
from cec_base.admin import dispatch_admin, Admin
from cec_base.event import Event
from cec_redis.redis_producer import RedisProducer
from cec_redis.redis_admin import RedisAdmin
class TestRedisProducer(unittest.TestCase):
    """Tests for the redis-based Producer (``RedisProducer``).

    Every test works on its own uniquely named topic, created in ``setUp``
    and deleted in ``tearDown``, on the server at ``redis://localhost:6379``.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Register the redis Producer and Admin classes under 'redis'."""
        # NOTE(review): presumably this is what lets the "redis://" URLs used
        # in setUp be dispatched to these classes -- confirm in cec_base.
        Producer.register('redis', RedisProducer)
        Admin.register('redis', RedisAdmin)

    def setUp(self) -> None:
        """Connect a producer and an admin, then create a scratch topic."""
        self.producer_msg: Optional[Event] = None
        self.error = None

        # Register each disconnect as soon as its connection exists, so the
        # connection is released even when a later step of setUp, the test
        # itself, or tearDown fails. unittest runs cleanups after tearDown,
        # in reverse registration order.
        self.producer = dispatch_producer("redis://localhost:6379")
        self.addCleanup(self.producer.disconnect)
        self.admin = dispatch_admin("redis://localhost:6379")
        self.addCleanup(self.admin.disconnect)

        # 1. Create a topic for testing
        self.random_id = str(uuid.uuid4())
        self.assertEqual(self.admin.create_topic(self.random_id), True)

    def tearDown(self) -> None:
        """Delete the scratch topic created in ``setUp``."""
        # 2. Clear topics for testing (connections are closed afterwards by
        #    the cleanups registered in setUp)
        self.assertEqual(self.admin.del_topic(self.random_id), True)

    def _record_result(self, e, msg) -> None:
        """Store the outcome handed to a ``produce`` callback on the test."""
        self.error = e
        self.producer_msg = msg

    def test_produce(self):
        """Unit testing of the produce function.

        Produces one message to an existing topic and checks that it was
        given an event_id, then produces to a topic that was never created
        and checks that a TopicNotExistsException is reported.
        """
        # 3. Produce a message without throwing any exceptions
        self.producer.produce(
            self.random_id,
            {"test": "test_msg"},
            callback=self._record_result,
        )

        # 4. Check that the messages produced are normal (normal if they have
        #    an automatically assigned event_id).
        #    NOTE(review): this relies on the callback having already run
        #    when produce() returns -- confirm against RedisProducer.
        self.assertIsNotNone(
            self.producer_msg, "produce() did not invoke the callback"
        )
        self.assertNotIn(self.producer_msg.event_id, ["", None])

        # Forget the first outcome so step 5 only sees what the second
        # produce() reports.
        self.error = None
        self.producer_msg = None

        # 5. Producing a message to a non-existent topic should throw an
        #    exception
        self.producer.produce(
            f"{self.random_id}-no",
            {"test": "test_msg"},
            callback=self._record_result,
        )
        self.assertIsInstance(self.error, TopicNotExistsException)
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()