!669 set WalSndCaughtUp to thread context variable

Merge pull request !669 from chenxiaobin/110
This commit is contained in:
opengauss-bot 2021-02-02 11:35:43 +08:00 committed by Gitee
commit e3b1f129dc
3 changed files with 20 additions and 18 deletions

View File

@ -1297,6 +1297,7 @@ static void knl_t_walsender_init(knl_t_walsender_context* walsender_cxt)
walsender_cxt->reply_message = (StringInfoData*)palloc0(sizeof(StringInfoData)); walsender_cxt->reply_message = (StringInfoData*)palloc0(sizeof(StringInfoData));
walsender_cxt->tmpbuf = (StringInfoData*)palloc0(sizeof(StringInfoData)); walsender_cxt->tmpbuf = (StringInfoData*)palloc0(sizeof(StringInfoData));
walsender_cxt->remotePort = 0; walsender_cxt->remotePort = 0;
walsender_cxt->walSndCaughtUp = false;
} }
static void knl_t_tsearch_init(knl_t_tsearch_context* tsearch_cxt) static void knl_t_tsearch_init(knl_t_tsearch_context* tsearch_cxt)

View File

@ -108,8 +108,6 @@ extern void *internal_load_library(const char *libname);
extern char *expand_dynamic_library_name(const char *name); extern char *expand_dynamic_library_name(const char *name);
extern bool PMstateIsRun(void); extern bool PMstateIsRun(void);
/* Are we there yet? */
static bool WalSndCaughtUp = false;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
bool WalSegmemtRemovedhappened = false; bool WalSegmemtRemovedhappened = false;
volatile bool bSyncStat = false; volatile bool bSyncStat = false;
@ -1577,7 +1575,7 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
break; break;
/* Waiting for new WAL. Since we need to wait, we're now caught up. */ /* Waiting for new WAL. Since we need to wait, we're now caught up. */
WalSndCaughtUp = true; t_thrd.walsender_cxt.walSndCaughtUp = true;
/* /*
* Try to flush pending output to the client. Also wait for the socket * Try to flush pending output to the client. Also wait for the socket
@ -3020,7 +3018,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data)
* sender is "out of work". * sender is "out of work".
*/ */
if (WalSndCaughtup()) { if (WalSndCaughtup()) {
WalSndCaughtUp = true; t_thrd.walsender_cxt.walSndCaughtUp = true;
t_thrd.walsender_cxt.sentPtr = InvalidXLogRecPtr; t_thrd.walsender_cxt.sentPtr = InvalidXLogRecPtr;
/* Close open wal file */ /* Close open wal file */
@ -3048,10 +3046,10 @@ static int WalSndLoop(WalSndSendDataCallback send_data)
if (!pq_is_send_pending()) if (!pq_is_send_pending())
send_data(); send_data();
else else
WalSndCaughtUp = false; t_thrd.walsender_cxt.walSndCaughtUp = false;
/* Send DummyStandby end message */ /* Send DummyStandby end message */
if (WalSndCaughtUp) { if (t_thrd.walsender_cxt.walSndCaughtUp) {
/* Try to flush pending output to the client */ /* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0) if (pq_flush_if_writable() != 0)
break; break;
@ -3070,9 +3068,9 @@ static int WalSndLoop(WalSndSendDataCallback send_data)
if (!pq_is_send_pending()) if (!pq_is_send_pending())
send_data(); send_data();
else else
WalSndCaughtUp = false; t_thrd.walsender_cxt.walSndCaughtUp = false;
if (WalSndCaughtUp && dummyStandbyMode) { if (t_thrd.walsender_cxt.walSndCaughtUp && dummyStandbyMode) {
if (!pq_is_send_pending()) { if (!pq_is_send_pending()) {
WalSndSyncDummyStandbyDone(false); WalSndSyncDummyStandbyDone(false);
(void)pq_flush(); (void)pq_flush();
@ -3090,7 +3088,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data)
} }
/* If nothing remains to be sent right now ... */ /* If nothing remains to be sent right now ... */
if (WalSndCaughtUp && !pq_is_send_pending()) { if (t_thrd.walsender_cxt.walSndCaughtUp && !pq_is_send_pending()) {
/* /*
* If we're in catchup state, move to streaming. This is an * If we're in catchup state, move to streaming. This is an
* important state change for users to know about, since before * important state change for users to know about, since before
@ -3127,7 +3125,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data)
else else
send_data(); send_data();
if (WalSndCaughtUp && !pq_is_send_pending()) { if (t_thrd.walsender_cxt.walSndCaughtUp && !pq_is_send_pending()) {
if (dummyStandbyMode || if (dummyStandbyMode ||
XLByteEQ(t_thrd.walsender_cxt.sentPtr, t_thrd.walsender_cxt.MyWalSnd->flush)) XLByteEQ(t_thrd.walsender_cxt.sentPtr, t_thrd.walsender_cxt.MyWalSnd->flush))
t_thrd.walsender_cxt.walsender_shutdown_requested = true; t_thrd.walsender_cxt.walsender_shutdown_requested = true;
@ -3156,7 +3154,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data)
* loaded a subset of the available data but then pq_flush_if_writable * loaded a subset of the available data but then pq_flush_if_writable
* flushed it all --- we should immediately try to send more. * flushed it all --- we should immediately try to send more.
*/ */
if (WalSndCaughtUp || pq_is_send_pending()) { if (t_thrd.walsender_cxt.walSndCaughtUp || pq_is_send_pending()) {
long sleeptime; long sleeptime;
int wakeEvents; int wakeEvents;
@ -3694,7 +3692,7 @@ static void XLogSendLogical(void)
* XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait - * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
* i.e. when we're shutting down. * i.e. when we're shutting down.
*/ */
WalSndCaughtUp = false; t_thrd.walsender_cxt.walSndCaughtUp = false;
record = XLogReadRecord(t_thrd.walsender_cxt.logical_decoding_ctx->reader, t_thrd.walsender_cxt.logical_startptr, record = XLogReadRecord(t_thrd.walsender_cxt.logical_decoding_ctx->reader, t_thrd.walsender_cxt.logical_startptr,
&errm); &errm);
@ -3718,7 +3716,7 @@ static void XLogSendLogical(void)
* then we're caught up. * then we're caught up.
*/ */
if (t_thrd.walsender_cxt.logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr()) if (t_thrd.walsender_cxt.logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
WalSndCaughtUp = true; t_thrd.walsender_cxt.walSndCaughtUp = true;
} }
/* Update shared memory status */ /* Update shared memory status */
@ -3776,7 +3774,7 @@ static void XLogSendPhysical(void)
ereport(LOG, (errmsg("terminating walsender process to force cascaded standby " ereport(LOG, (errmsg("terminating walsender process to force cascaded standby "
"to update timeline and reconnect"))); "to update timeline and reconnect")));
t_thrd.walsender_cxt.walsender_ready_to_stop = true; t_thrd.walsender_cxt.walsender_ready_to_stop = true;
WalSndCaughtUp = true; t_thrd.walsender_cxt.walSndCaughtUp = true;
return; return;
} }
} else if (dummyStandbyMode) } else if (dummyStandbyMode)
@ -3786,7 +3784,7 @@ static void XLogSendPhysical(void)
/* Quick exit if nothing to do */ /* Quick exit if nothing to do */
if (!u_sess->attr.attr_storage.enable_stream_replication || XLByteLE(SendRqstPtr, t_thrd.walsender_cxt.sentPtr)) { if (!u_sess->attr.attr_storage.enable_stream_replication || XLByteLE(SendRqstPtr, t_thrd.walsender_cxt.sentPtr)) {
WalSndCaughtUp = true; t_thrd.walsender_cxt.walSndCaughtUp = true;
return; return;
} }
@ -3808,11 +3806,11 @@ static void XLogSendPhysical(void)
/* if we went beyond SendRqstPtr, back off */ /* if we went beyond SendRqstPtr, back off */
if (XLByteLE(SendRqstPtr, endptr)) { if (XLByteLE(SendRqstPtr, endptr)) {
endptr = SendRqstPtr; endptr = SendRqstPtr;
WalSndCaughtUp = true; t_thrd.walsender_cxt.walSndCaughtUp = true;
} else { } else {
/* round down to page boundary. */ /* round down to page boundary. */
endptr -= (endptr % XLOG_BLCKSZ); endptr -= (endptr % XLOG_BLCKSZ);
WalSndCaughtUp = false; t_thrd.walsender_cxt.walSndCaughtUp = false;
t_thrd.walsender_cxt.catchup_threshold = XLByteDifference(SendRqstPtr, endptr); t_thrd.walsender_cxt.catchup_threshold = XLByteDifference(SendRqstPtr, endptr);
} }
@ -3848,7 +3846,8 @@ static void XLogSendPhysical(void)
msghdr.walEnd = SendRqstPtr; msghdr.walEnd = SendRqstPtr;
msghdr.sendTime = GetCurrentTimestamp(); msghdr.sendTime = GetCurrentTimestamp();
msghdr.sender_sent_location = endptr; msghdr.sender_sent_location = endptr;
msghdr.catchup = (t_thrd.walsender_cxt.MyWalSnd->state == WALSNDSTATE_CATCHUP && !WalSndCaughtUp); msghdr.catchup = (t_thrd.walsender_cxt.MyWalSnd->state == WALSNDSTATE_CATCHUP &&
!t_thrd.walsender_cxt.walSndCaughtUp);
SpinLockAcquire(&hashmdata->mutex); SpinLockAcquire(&hashmdata->mutex);
local_role = hashmdata->current_mode; local_role = hashmdata->current_mode;
SpinLockRelease(&hashmdata->mutex); SpinLockRelease(&hashmdata->mutex);

View File

@ -2183,6 +2183,8 @@ typedef struct knl_t_walsender_context {
struct LogicalDecodingContext* logical_decoding_ctx; struct LogicalDecodingContext* logical_decoding_ctx;
XLogRecPtr logical_startptr; XLogRecPtr logical_startptr;
int remotePort; int remotePort;
/* Have we caught up with primary? */
bool walSndCaughtUp;
} knl_t_walsender_context; } knl_t_walsender_context;
typedef struct knl_t_walreceiverfuncs_context { typedef struct knl_t_walreceiverfuncs_context {