sysom1/environment/1_sdk/test_cec_redis/test_utils.py

53 lines
1.5 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2022/10/8 15:08
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File test_redis_admin.py
Description:
"""
import time
import unittest
import uuid
from cec_base.admin import Admin, dispatch_admin
from cec_redis.utils import RedisLocker
from cec_redis.redis_admin import RedisAdmin
class MyTestCase(unittest.TestCase):
"""A test class to test utils
"""
@classmethod
def setUpClass(cls) -> None:
Admin.register('redis', RedisAdmin)
def test_redis_lock(self):
""" Test redis locker
Returns:
"""
client = dispatch_admin("redis://localhost:6379").client()
test_key = str(uuid.uuid4())
# 1. The first locking should be successful
with RedisLocker(client, test_key, ex=5) as lock_result:
self.assertEqual(lock_result, True)
# 2. Repeated locking should fail
with RedisLocker(client, test_key, ex=5) as lock_result_2:
self.assertEqual(lock_result_2, False)
# 3. Unlocked and re-locked should be successful
with RedisLocker(client, test_key, ex=5) as lock_result:
self.assertEqual(lock_result, True)
time.sleep(5)
# 2. Re-locking after lock timeout should succeed
with RedisLocker(client, test_key, ex=5) as lock_result_2:
self.assertEqual(lock_result_2, True)
if __name__ == '__main__':
unittest.main()