mirror of https://gitee.com/anolis/sysom.git
80 lines
2.8 KiB
Python
80 lines
2.8 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2022/9/29 14:55
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File test_redis_admin.py
|
|
Description: Unit tests for the heartbeat module of cec-redis
|
|
"""
|
|
import time
|
|
import unittest
|
|
from cec_base.admin import dispatch_admin
|
|
from cec_redis.heartbeat import Heartbeat
|
|
|
|
|
|
class MyTestCase(unittest.TestCase):
    """Tests for the heartbeat module of cec-redis.

    Every test runs against two started ``Heartbeat`` instances whose last
    positional identifiers are "c1" and "c2" and which share the identifiers
    "t1" and "g1" (presumably topic and consumer group -- confirm against the
    ``Heartbeat`` signature).

    The fixtures connect through ``dispatch_admin("redis://localhost:6379")``,
    so a Redis server is expected to be reachable at that address.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Class-level fixture hook; intentionally does nothing."""

    def setUp(self) -> None:
        """Create both heartbeats and assert that each one starts."""
        self.admin = dispatch_admin("redis://localhost:6379")
        # The tests below sleep for multiples of this value, i.e. they treat
        # it as seconds (presumably Heartbeat does too -- TODO confirm).
        self.heartbeat_interval = 1
        self.heartbeat1 = Heartbeat(
            self.admin.client(), "t1", "g1", "c1",
            heartbeat_interval=self.heartbeat_interval,
            heartbeat_check_interval=2,
        )
        self.heartbeat2 = Heartbeat(
            self.admin.client(), "t1", "g1", "c2",
            heartbeat_interval=self.heartbeat_interval,
            heartbeat_check_interval=2,
        )
        self.assertEqual(self.heartbeat1.start(), True)
        self.assertEqual(self.heartbeat2.start(), True)

    def tearDown(self) -> None:
        """Stop both heartbeats and assert that each stop succeeds."""
        self.assertEqual(self.heartbeat1.stop(), True)
        self.assertEqual(self.heartbeat2.stop(), True)

    def _assert_heartbeat_count(self, expected: int) -> None:
        """Assert both heartbeats report ``expected`` for "c1" and "c2".

        ``get_consumers()`` is queried once per (heartbeat, consumer) pair;
        a consumer missing from the result is treated as 0.
        """
        for heartbeat in (self.heartbeat1, self.heartbeat2):
            for consumer_id in ("c1", "c2"):
                self.assertEqual(
                    heartbeat.get_consumers().get(consumer_id, 0), expected
                )

    def test_heartbeat(self):
        """Test the basic functions of the heartbeat module.

        After waiting half a second longer than one heartbeat interval, the
        value reported by ``get_consumers()`` for each consumer is expected
        to be 1, and to grow by one after every further interval (presumably
        it counts the heartbeats seen so far -- confirm against Heartbeat).
        """
        # The extra 0.5 moves the sampling point away from the interval edge.
        time.sleep(self.heartbeat_interval + 0.5)
        self._assert_heartbeat_count(1)
        for expected in (2, 3):
            time.sleep(self.heartbeat_interval)
            self._assert_heartbeat_count(expected)

    def test_offline_consumer_check(self):
        """Test the offline consumer detection function.

        Stops the "c1" heartbeat, waits five heartbeat intervals and expects
        the other heartbeat's ``get_next_offline_consumer()`` to return "c1".
        """
        time.sleep(self.heartbeat_interval * 1.5)
        self.assertEqual(self.heartbeat1.stop(), True)
        try:
            time.sleep(self.heartbeat_interval * 5)
            self.assertEqual(
                self.heartbeat2.get_next_offline_consumer(), "c1"
            )
        finally:
            # tearDown asserts that heartbeat1.stop() succeeds, so the
            # heartbeat is restarted even when the assertion above fails;
            # otherwise the cleanup could add a second, misleading failure.
            self.assertEqual(self.heartbeat1.start(), True)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the test cases only when this file is executed as a script,
    # not when it is imported by a test runner.
    unittest.main()
|