sysom1/environment/1_sdk/cec_redis/heartbeat.py

271 lines
9.3 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2022/9/22 15:24
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File heartbeat.py
Description:
"""
import threading
from threading import Thread, Event as ThreadEvent
from typing import Optional
from collections import deque
import redis
from redis import Redis
from redis.client import PubSub
from schedule import Scheduler
from cec_base.exceptions import CecException
from clogger import logger
from .common import StaticConst
from .admin_static import static_del_consumer
from .utils import AtomicLong
class Heartbeat:
"""A daemon thread/process use to send and listen the heartbeat of consumer
A daemon thread/process to generate and listen to the consumer's heartbeat
1. First, the CEC creates a pub/sub channel for each consumer group.
2. Then, when each member of the consumer group accesses the CEC, a
Heartbeat instance is initiated that periodically sends heartbeat
data to the "heartbeat channel" of the corresponding consumer group.
3. Consumers also subscribe to the heartbeat channel of the consumer
group and monitor the heartbeat data of other members of the group
in real time.
4. If the current consumer detects that a group member has not sent a
heartbeat for a long time, it assumes that the member is dead and
does the following.
4. If the current consumer detects that a member of the group has not
sent heartbeat data for some time, it assumes that the member is
offline and does the following:
- If the member does not have a pending message, move it out of the
consumer group;
- If the member has a pending message, it attempts to transfer the
message to the current consumer, and moves the member out of the
consumer group after a successful transfer.
Args:
redis_client(Redis): Redis client
topic(str): Topic
consumer_group(str): Consumer group ID
consumer_id(str): Consumer ID
Keyword Args
heartbeat_interval(int): Heartbeat interval in seconds
heartbeat_process_mod(bool): Whether to use a separate process to run
heartbeat_check_interval(bool): Time interval to check if a consumer in
a group is online
Attributes:
"""
# pylint: disable=too-many-instance-attributes
# Eleven is reasonable in this case.
def __init__(self, redis_client: Redis, topic: str, consumer_group: str,
consumer_id: str, **kwargs):
self._process_mod = kwargs.get("heartbeat_process_mod", False)
self._check_heart_beat_interval = 3
check_interval = kwargs.get("heartbeat_check_interval", 0)
if check_interval > 0:
self._check_heart_beat_interval = check_interval
self._redis_client = redis_client
self._topic = topic
self._consumer_group = consumer_group
self._consumer_id = consumer_id
self._heartbeat_interval = kwargs.get("heartbeat_interval", 5)
self._stop_event: ThreadEvent = ThreadEvent()
self._heartbeat_listen_thread: Optional[Thread] = None
self._channel_name = f"{StaticConst.REDIS_HEARTBEAT_CHANNEL_PREFIX}" \
f"{self._topic}:{self._consumer_group}"
self._ps: PubSub = self._redis_client.pubsub()
self._heartbeat_timeline = AtomicLong(0)
self._heartbeat_check_schedule = Scheduler()
# <consumer_id> -> heartbeat timeline
self._consumers = {
}
self._offline_consumers = deque()
def _send_heart_beat(self):
"""Current send heartbeat to consumer group's heartbeat channel
The current consumer sends a heartbeat to the heartbeat channel of the
consumer group
Returns:
"""
self._heartbeat_timeline.inc(1)
self._redis_client.publish(self._channel_name, self._consumer_id)
def _deal_recv_heartbeat(self, message: dict):
"""Deal received heartbeat
Processing incoming heartbeat data
Returns:
"""
msg_type = message.get("type", "")
channel, consumer = message.get('channel', ''), message.get('data', '')
if msg_type == "message" and channel == self._channel_name and len(
consumer) > 0:
self._consumers[consumer] = self._heartbeat_timeline.value
def _check_offline_consumer(self):
"""Check if there are offline consumers
Returns:
"""
cur, offline_consumers = self._heartbeat_timeline.value, []
for consumer, time in self._consumers.items():
if time + self._check_heart_beat_interval < cur:
# Detect some consumer offline
self._offline_consumers.append(consumer)
offline_consumers.append(consumer)
for consumer in offline_consumers:
self._consumers.pop(consumer)
def get_next_offline_consumer(self) -> Optional[str]:
"""Get next offline consumer
Returns:
"""
if self._offline_consumers:
return self._offline_consumers[0]
return None
def remove_consumer(self, consumer: str):
"""Remove monitoring of a consumer
Removal of monitoring of a specific consumer requires the following
conditions to be met:
1. Consumer offline
2. All messages inside the PEL have been transferred;
Args:
consumer(str): Consumer ID
Returns:
"""
try:
self._offline_consumers.remove(consumer)
static_del_consumer(self._redis_client, self._topic,
self._consumer_group,
consumer)
except redis.exceptions.RedisError:
pass
def run(self):
"""Listen and send heartbeat
1. Periodically sends heartbeat data to the heartbeat channel of the
consumer group;
2. Monitor and record the heartbeat information of consumers in the
group from the heartbeat channel of the consumer group;
Returns:
"""
timeout = 1 if self._heartbeat_interval >= 2 \
else self._heartbeat_interval / 2
try:
while not self._stop_event.is_set():
message = self._ps.get_message(
timeout=timeout)
if message is not None:
self._deal_recv_heartbeat(message)
self._heartbeat_check_schedule.run_pending()
except redis.exceptions.ConnectionError:
# ignore clo
pass
def get_consumers(self) -> dict:
"""Get consumer list
Returns:
"""
return self._consumers
def start(self) -> bool:
"""Start heartbeat thread
Returns:
Examples:
> XINFO CONSUMERS mystream mygroup
1) 1) name
2) "Alice"
3) pending
4) (integer) 1
5) idle
6) (integer) 9104628
2) 1) name
2) "Bob"
3) pending
4) (integer) 1
5) idle
6) (integer) 83841983
"""
if self._heartbeat_listen_thread is not None and \
self._heartbeat_listen_thread.is_alive():
return False
self._ps.subscribe(self._channel_name)
self._heartbeat_timeline.get_and_set(0)
self._consumers = {}
# On startup, get a list of consumers in the group
try:
consumers = self._redis_client.xinfo_consumers(
self._topic, self._consumer_group)
for consumer in consumers:
self._consumers[consumer.get('name', '')] = 0
except redis.exceptions.ResponseError as error:
logger.error(error)
# Add a schedule task to send heartbeats periodically.
self._heartbeat_check_schedule.every(self._heartbeat_interval) \
.seconds.do(self._send_heart_beat)
self._heartbeat_check_schedule.every(
self._check_heart_beat_interval * self._heartbeat_interval) \
.seconds.do(self._check_offline_consumer)
self._heartbeat_listen_thread = threading.Thread(
target=self.run,
name=f"CEC-{self._consumer_group}:{self._consumer_id}-HEARTBEAT"
)
self._heartbeat_listen_thread.setDaemon(True)
self._heartbeat_listen_thread.start()
return True
def stop(self) -> bool:
"""Stop heartbeat thread
Returns:
"""
try:
if self._heartbeat_listen_thread is None:
return False
if not self._heartbeat_listen_thread.is_alive():
self._heartbeat_listen_thread = None
return False
self._heartbeat_check_schedule.clear()
self._stop_event.set()
self._heartbeat_listen_thread.join()
self._stop_event.clear()
self._heartbeat_listen_thread = None
self._ps.unsubscribe()
self._ps.close()
return True
except (redis.RedisError, CecException):
# Ignoring errors arising from the stop phase
return False