mirror of https://gitee.com/anolis/sysom.git
53 lines
1.5 KiB
Python
53 lines
1.5 KiB
Python
# -*- coding: utf-8 -*- #
"""
Time:           2022/10/8 15:08
Author:         mingfeng (SunnyQjm)
Email:          mfeng@linux.alibaba.com
File:           test_redis_admin.py
Description:    Unit test for the RedisLocker helper from cec_redis.utils.
"""
|
|
import time
|
|
import unittest
|
|
import uuid
|
|
|
|
from cec_base.admin import Admin, dispatch_admin
|
|
from cec_redis.utils import RedisLocker
|
|
from cec_redis.redis_admin import RedisAdmin
|
|
|
|
|
|
class MyTestCase(unittest.TestCase):
    """Test case exercising the RedisLocker helper from cec_redis.utils.

    NOTE(review): the test connects to ``redis://localhost:6379``, so it
    presumably needs a reachable Redis server at that address -- confirm
    before enabling it in CI.
    """

    # Expiry, in seconds, handed to RedisLocker through its ``ex`` argument.
    LOCK_EXPIRE_SECONDS = 5
    # Extra wait added on top of the expiry. Sleeping for exactly the expiry
    # time would let the re-lock attempt race the instant the lock expires.
    EXPIRE_MARGIN_SECONDS = 0.5

    @classmethod
    def setUpClass(cls) -> None:
        """Register RedisAdmin under the 'redis' name before the tests run.

        Presumably this is what lets ``dispatch_admin`` resolve a
        ``redis://`` URL -- verify against cec_base.admin.
        """
        Admin.register('redis', RedisAdmin)

    def test_redis_lock(self):
        """Test redis locker.

        Checks four situations in order:
          1. locking a fresh key succeeds;
          2. locking the same key again while it is held fails;
          3. locking succeeds again after the first ``with`` block is left;
          4. locking succeeds again once the held lock has timed out.

        Returns:
            None
        """
        client = dispatch_admin("redis://localhost:6379").client()
        # Use a random key for every run.
        test_key = str(uuid.uuid4())
        expire = self.LOCK_EXPIRE_SECONDS

        # 1. The first locking should be successful
        with RedisLocker(client, test_key, ex=expire) as lock_result:
            self.assertEqual(lock_result, True)
            # 2. Repeated locking should fail
            with RedisLocker(client, test_key, ex=expire) as lock_result_2:
                self.assertEqual(lock_result_2, False)

        # 3. Unlocked and re-locked should be successful
        with RedisLocker(client, test_key, ex=expire) as lock_result:
            self.assertEqual(lock_result, True)
            # Wait strictly longer than the expiry so the lock is certainly
            # gone before the next attempt.
            time.sleep(expire + self.EXPIRE_MARGIN_SECONDS)
            # 4. Re-locking after lock timeout should succeed
            with RedisLocker(client, test_key, ex=expire) as lock_result_2:
                self.assertEqual(lock_result_2, True)
|
|
|
|
|
|
if __name__ == '__main__':
    # Run the tests in this module when the file is executed as a script.
    unittest.main()
|