!1675 修复发布订阅死锁问题

Merge pull request !1675 from 薛蒙恩/pubsub_deadlock
This commit is contained in:
opengauss-bot 2022-04-24 01:53:42 +00:00 committed by Gitee
commit a548f5c3c6
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 16 additions and 14 deletions

View File

@ -6114,6 +6114,9 @@ void ProcessInterrupts(void)
/* The logical replication launcher can be stopped at any time. */
proc_exit(0);
} else if (IsLogicalWorker()) {
ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating logical replication worker due to administrator command")));
#endif
} else if (IsTxnSnapCapturerProcess()) {
ereport(FATAL,

View File

@ -121,18 +121,6 @@ static void LogicalrepWorkerSighub(SIGNAL_ARGS)
t_thrd.applyworker_cxt.got_SIGHUP = true;
}
/* SIGTERM: time to die */
static void LogicalrepWorkerSigterm(SIGNAL_ARGS)
{
int saveErrno = errno;
t_thrd.applyworker_cxt.got_SIGTERM = true;
if (t_thrd.proc)
SetLatch(&t_thrd.proc->procLatch);
errno = saveErrno;
}
/*
* Make sure that we started local transaction.
*
@ -1068,13 +1056,15 @@ static void ApplyLoop(void)
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
while (!t_thrd.applyworker_cxt.got_SIGTERM) {
for (;;) {
MemoryContextSwitchTo(t_thrd.applyworker_cxt.messageContext);
int len;
char *buf = NULL;
unsigned char type;
CHECK_FOR_INTERRUPTS();
/* Wait a while for data to arrive */
if ((WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) {
StringInfoData s;
@ -1371,7 +1361,7 @@ void ApplyWorkerMain()
*/
gspqsignal(SIGHUP, LogicalrepWorkerSighub);
gspqsignal(SIGINT, StatementCancelHandler);
gspqsignal(SIGTERM, LogicalrepWorkerSigterm);
gspqsignal(SIGTERM, die);
gspqsignal(SIGQUIT, quickdie);
gspqsignal(SIGALRM, handle_sig_alarm);
@ -1727,3 +1717,11 @@ static void UpdateConninfo(char* standbysInfo)
ereport(LOG, (errmsg("Update conninfo successfully, new conninfo %s.", standbysInfo)));
}
/*
* Is current process a logical replication worker?
*/
bool IsLogicalWorker(void)
{
return t_thrd.applyworker_cxt.curWorker != NULL;
}

View File

@ -13,5 +13,6 @@
#define LOGICALWORKER_H
extern void ApplyWorkerMain();
extern bool IsLogicalWorker(void);
#endif /* LOGICALWORKER_H */