修复发布订阅死锁问题

This commit is contained in:
xue_meng_en 2022-04-16 16:27:37 +08:00
parent 7f97d633f1
commit d8b89ceea5
3 changed files with 16 additions and 14 deletions

View File

@ -6114,6 +6114,9 @@ void ProcessInterrupts(void)
/* The logical replication launcher can be stopped at any time. */ /* The logical replication launcher can be stopped at any time. */
proc_exit(0); proc_exit(0);
} else if (IsLogicalWorker()) {
ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating logical replication worker due to administrator command")));
#endif #endif
} else if (IsTxnSnapCapturerProcess()) { } else if (IsTxnSnapCapturerProcess()) {
ereport(FATAL, ereport(FATAL,

View File

@ -121,18 +121,6 @@ static void LogicalrepWorkerSighub(SIGNAL_ARGS)
t_thrd.applyworker_cxt.got_SIGHUP = true; t_thrd.applyworker_cxt.got_SIGHUP = true;
} }
/* SIGTERM: time to die */
static void LogicalrepWorkerSigterm(SIGNAL_ARGS)
{
int saveErrno = errno;
t_thrd.applyworker_cxt.got_SIGTERM = true;
if (t_thrd.proc)
SetLatch(&t_thrd.proc->procLatch);
errno = saveErrno;
}
/* /*
* Make sure that we started local transaction. * Make sure that we started local transaction.
* *
@ -1068,13 +1056,15 @@ static void ApplyLoop(void)
/* mark as idle, before starting to loop */ /* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
while (!t_thrd.applyworker_cxt.got_SIGTERM) { for (;;) {
MemoryContextSwitchTo(t_thrd.applyworker_cxt.messageContext); MemoryContextSwitchTo(t_thrd.applyworker_cxt.messageContext);
int len; int len;
char *buf = NULL; char *buf = NULL;
unsigned char type; unsigned char type;
CHECK_FOR_INTERRUPTS();
/* Wait a while for data to arrive */ /* Wait a while for data to arrive */
if ((WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { if ((WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) {
StringInfoData s; StringInfoData s;
@ -1371,7 +1361,7 @@ void ApplyWorkerMain()
*/ */
gspqsignal(SIGHUP, LogicalrepWorkerSighub); gspqsignal(SIGHUP, LogicalrepWorkerSighub);
gspqsignal(SIGINT, StatementCancelHandler); gspqsignal(SIGINT, StatementCancelHandler);
gspqsignal(SIGTERM, LogicalrepWorkerSigterm); gspqsignal(SIGTERM, die);
gspqsignal(SIGQUIT, quickdie); gspqsignal(SIGQUIT, quickdie);
gspqsignal(SIGALRM, handle_sig_alarm); gspqsignal(SIGALRM, handle_sig_alarm);
@ -1727,3 +1717,11 @@ static void UpdateConninfo(char* standbysInfo)
ereport(LOG, (errmsg("Update conninfo successfully, new conninfo %s.", standbysInfo))); ereport(LOG, (errmsg("Update conninfo successfully, new conninfo %s.", standbysInfo)));
} }
/*
* Is current process a logical replication worker?
*/
bool IsLogicalWorker(void)
{
return t_thrd.applyworker_cxt.curWorker != NULL;
}

View File

@ -13,5 +13,6 @@
#define LOGICALWORKER_H #define LOGICALWORKER_H
extern void ApplyWorkerMain(); extern void ApplyWorkerMain();
extern bool IsLogicalWorker(void);
#endif /* LOGICALWORKER_H */ #endif /* LOGICALWORKER_H */