Offering: openGaussDev

More detail: 修复删除分区表报错 (fix the error reported when dropping a partitioned table)

# Conflicts:
#	src/common/backend/utils/init/globals.cpp
#	src/include/access/xlogproc.h

Match-id-978d16029c7f5ac4681e6191b78392f89718edba
This commit is contained in:
openGaussDev 2022-03-04 16:14:56 +08:00 committed by yanghao
parent 5e577476a4
commit 5cfddc1b52
16 changed files with 1562 additions and 1264 deletions

View File

@ -59,7 +59,7 @@ bool open_join_children = true;
bool will_shutdown = false;
/* hard-wired binary version number */
const uint32 GRAND_VERSION_NUM = 92604;
const uint32 GRAND_VERSION_NUM = 92605;
const uint32 PREDPUSH_SAME_LEVEL_VERSION_NUM = 92522;
const uint32 UPSERT_WHERE_VERSION_NUM = 92514;
@ -114,6 +114,8 @@ const uint32 SUPPORT_HASH_XLOG_VERSION_NUM = 92603;
/* This variable indicates wheather the instance is in progress of upgrade as a whole */
uint32 volatile WorkingGrandVersionNum = GRAND_VERSION_NUM;
const uint32 INVALID_INVISIBLE_TUPLE_VERSION = 92605;
const uint32 ENHANCED_TUPLE_LOCK_VERSION_NUM = 92583;
const uint32 TWOPHASE_FILE_VERSION = 92414;

View File

@ -1920,6 +1920,15 @@ double CopyUHeapDataInternal(Relation oldHeap, Relation oldIndex, Relation newHe
return tups_vacuumed;
}
/*
 * Decide whether a HEAPTUPLE_DEAD tuple may really be treated as dead.
 * Returns false for tuples that must be kept despite being invisible
 * (per HeapKeepInvisibleTuple) and that are not HOT-updated; true otherwise.
 */
static inline bool tuple_invisible_not_hotupdate(HeapTuple tuple, Relation relation)
{
    return !(HeapKeepInvisibleTuple(tuple, RelationGetDescr(relation)) && !HeapTupleIsHotUpdated(tuple));
}
double copy_heap_data_internal(Relation OldHeap, Relation OldIndex, Relation NewHeap, TransactionId OldestXmin,
TransactionId FreezeXid, bool verbose, bool use_sort, AdaptMem* memUsage)
{
@ -2071,7 +2080,7 @@ double copy_heap_data_internal(Relation OldHeap, Relation OldIndex, Relation New
switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf)) {
case HEAPTUPLE_DEAD:
/* Definitely dead */
isdead = true;
isdead = tuple_invisible_not_hotupdate(tuple, OldHeap);
break;
case HEAPTUPLE_RECENTLY_DEAD:
tups_recently_dead += 1;

View File

@ -1051,9 +1051,11 @@ static IndexBulkDeleteResult** lazy_scan_heap(
OffsetNumber offnum, maxoff;
bool tupgone = false;
bool hastup = false;
bool keepThisInvisbleTuple = false;
bool keepThisInvisibleTuple = false;
int prev_dead_count;
OffsetNumber invalid[MaxOffsetNumber];
OffsetNumber frozen[MaxOffsetNumber];
int ninvalid = 0;
int nfrozen;
Size freespace;
bool all_visible_according_to_vm = false;
@ -1335,7 +1337,7 @@ static IndexBulkDeleteResult** lazy_scan_heap(
tuple.t_bucketId = RelationGetBktid(onerel);
HeapTupleCopyBaseFromPage(&tuple, page);
tupgone = false;
keepThisInvisbleTuple = false;
keepThisInvisibleTuple = false;
if (u_sess->attr.attr_storage.enable_debug_vacuum)
t_thrd.utils_cxt.pRelatedRel = onerel;
@ -1358,8 +1360,8 @@ static IndexBulkDeleteResult** lazy_scan_heap(
* cheaper to get rid of it in the next pruning pass than
* to treat it like an indexed tuple.
*/
keepThisInvisbleTuple = HeapKeepInvisbleTuple(&tuple, RelationGetDescr(onerel));
if (HeapTupleIsHotUpdated(&tuple) || HeapTupleIsHeapOnly(&tuple) || keepThisInvisbleTuple) {
keepThisInvisibleTuple = HeapKeepInvisibleTuple(&tuple, RelationGetDescr(onerel));
if (HeapTupleIsHotUpdated(&tuple) || HeapTupleIsHeapOnly(&tuple) || keepThisInvisibleTuple) {
nkeep += 1;
} else {
tupgone = true; /* we can delete the tuple */
@ -1437,8 +1439,15 @@ static IndexBulkDeleteResult** lazy_scan_heap(
tups_vacuumed += 1;
has_dead_tuples = true;
} else if (keepThisInvisbleTuple) {
vacrelstats->hasKeepInvisbleTuples = true;
} else if (keepThisInvisibleTuple) {
if (t_thrd.proc->workingVersionNum >= INVALID_INVISIBLE_TUPLE_VERSION
&& !HeapTupleIsHotUpdated(&tuple)) {
heap_invalid_invisible_tuple(&tuple);
Assert(tuple.t_tableOid == PartitionRelationId);
invalid[ninvalid++] = offnum;
} else {
vacrelstats->hasKeepInvisbleTuples = true;
}
} else {
num_tuples += 1;
hastup = true;
@ -1478,6 +1487,23 @@ static IndexBulkDeleteResult** lazy_scan_heap(
}
}
if (ninvalid > 0) {
START_CRIT_SECTION();
MarkBufferDirty(buf);
if (RelationNeedsWAL(onerel)) {
XLogRecPtr recptr;
recptr = log_heap_invalid(onerel, buf, u_sess->cmd_cxt.FreezeLimit,
invalid, ninvalid);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
if (TransactionIdPrecedes(((HeapPageHeader)page)->pd_xid_base, u_sess->utils_cxt.RecentXmin)) {
if (u_sess->utils_cxt.RecentXmin - ((HeapPageHeader)page)->pd_xid_base > CHANGE_XID_BASE)
(void)heap_change_xidbase_after_freeze(onerel, buf);
}
}
/*
* If there are no indexes then we can vacuum the page right now
* instead of doing a second scan.

View File

@ -3401,7 +3401,7 @@ void heap_slot_store_heap_tuple(HeapTuple tuple, TupleTableSlot* slot, Buffer bu
*
* Note: Only the dead tuple of pg_partition needs to be verified in the current code.
*/
bool HeapKeepInvisbleTuple(HeapTuple tuple, TupleDesc tupleDesc, KeepInvisbleTupleFunc checkKeepFunc)
bool HeapKeepInvisibleTuple(HeapTuple tuple, TupleDesc tupleDesc, KeepInvisbleTupleFunc checkKeepFunc)
{
static KeepInvisbleOpt keepInvisibleArray[] = {
{PartitionRelationId, Anum_pg_partition_parttype, PartitionLocalIndexSkipping},

View File

@ -3454,6 +3454,34 @@ static void HeapPageShiftBase(Buffer buffer, Page page, bool multi, int64 delta)
}
}
/*
 * Rewrite a dead-but-kept-invisible tuple's header so it reads as never
 * inserted: xmin and xmax are cleared and HEAP_XMIN_INVALID is set (any
 * xmax/xmin-committed hint bits are dropped first).  Also used at redo time,
 * so it must not depend on any live-transaction state.
 *
 * Caller must hold the buffer lock; the tuple must not be HOT-updated.
 *
 * Fix: the log message previously passed t_xmin for both the "t_xmin" and
 * the "xmax" format arguments; the last argument is now t_xmax as the
 * format string claims.
 */
void heap_invalid_invisible_tuple(HeapTuple tuple)
{
    HeapTupleSetXmin(tuple, InvalidTransactionId);
    HeapTupleSetXmax(tuple, InvalidTransactionId);
    tuple->t_data->t_infomask &= ~HEAP_XMAX_BITS;
    tuple->t_data->t_infomask &= ~HEAP_XMIN_COMMITTED;
    tuple->t_data->t_infomask |= HEAP_XMIN_INVALID;

    Assert(!HeapTupleIsHotUpdated(tuple));

    ereport(LOG, (errmsg("Dead and invisible tuple: t_ctid = { ip_blkid = { bi_hi = %hu, bi_lo = %hu }, "
        "ip_posid = %hu }, t_xmin = %u, xmax = %u, infomask = %hu",
        tuple->t_data->t_ctid.ip_blkid.bi_hi,
        tuple->t_data->t_ctid.ip_blkid.bi_lo,
        tuple->t_data->t_ctid.ip_posid,
        tuple->t_data->t_choice.t_heap.t_xmin,
        tuple->t_data->t_choice.t_heap.t_xmax,
        tuple->t_data->t_infomask)));
}
/*
 * Should this tuple be invalidated instead of frozen?  True only when the
 * cluster-wide working version supports invalidation, the tuple is not
 * HOT-updated, it is a keep-invisible tuple, and vacuum sees it as DEAD.
 * The checks run in the original order so the cheap version/flag tests
 * short-circuit before HeapTupleSatisfiesVacuum (which may set hint bits).
 */
static inline bool heap_check_invalid_invisible_tuple(HeapTuple tuple, TupleDesc tupleDesc,
    TransactionId cutoff_xid, Buffer buffer)
{
    if (t_thrd.proc->workingVersionNum < INVALID_INVISIBLE_TUPLE_VERSION) {
        return false;
    }
    if (HeapTupleIsHotUpdated(tuple)) {
        return false;
    }
    if (!HeapKeepInvisibleTuple(tuple, tupleDesc)) {
        return false;
    }
    return HeapTupleSatisfiesVacuum(tuple, cutoff_xid, buffer) == HEAPTUPLE_DEAD;
}
/*
* Freeze xids in the single heap page. Useful when we can't fit new xid even
* with base shift.
@ -3466,7 +3494,9 @@ static int freeze_single_heap_page(Relation relation, Buffer buffer)
OffsetNumber maxoff = InvalidOffsetNumber;
HeapTupleData tuple;
int nfrozen = 0;
int ninvalid = 0;
OffsetNumber frozen[MaxOffsetNumber];
OffsetNumber invalid[MaxOffsetNumber];
TransactionId latest_removed_xid = InvalidTransactionId;
TransactionId oldest_xmin = InvalidTransactionId;
TransactionId freeze_xid = InvalidTransactionId;
@ -3534,13 +3564,18 @@ static int freeze_single_heap_page(Relation relation, Buffer buffer)
tuple.t_len = ItemIdGetLength(itemid);
tuple.t_tableOid = RelationGetRelid(relation);
tuple.t_bucketId = RelationGetBktid(relation);
ItemPointerSet(&(tuple.t_self), BufferGetBlockNumber(buffer), offnum);
HeapTupleCopyBaseFromPage(&tuple, page);
/*
* Each non-removable tuple must be checked to see if it needs
* freezing. Note we already have exclusive buffer lock.
*/
if (heap_freeze_tuple(&tuple, freeze_xid, freeze_mxid, &changedMultiXid)) {
if (heap_check_invalid_invisible_tuple(&tuple, RelationGetDescr(relation), freeze_xid, buffer)) {
heap_invalid_invisible_tuple(&tuple);
Assert(tuple.t_tableOid == PartitionRelationId);
invalid[ninvalid++] = offnum;
} else if (heap_freeze_tuple(&tuple, freeze_xid, freeze_mxid, &changedMultiXid)) {
frozen[nfrozen++] = offnum;
}
} /* scan along page */
@ -3565,6 +3600,20 @@ static int freeze_single_heap_page(Relation relation, Buffer buffer)
END_CRIT_SECTION();
}
if (ninvalid > 0) {
START_CRIT_SECTION();
MarkBufferDirty(buffer);
/* Now WAL-log freezing if necessary */
if (RelationNeedsWAL(relation)) {
XLogRecPtr recptr = log_heap_invalid(relation, buffer, freeze_xid,
invalid, ninvalid);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
}
gstrace_exit(GS_TRC_ID_freeze_single_heap_page);
return nfrozen;
}
@ -7976,7 +8025,7 @@ XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xi
OffsetNumber* offsets, int offcnt)
{
xl_heap_freeze xlrec;
XLogRecPtr recptr;
XLogRecPtr recptr = InvalidXLogRecPtr;
bool useOldXlog = t_thrd.proc->workingVersionNum < ENHANCED_TUPLE_LOCK_VERSION_NUM ||
!MultiXactIdIsValid(cutoff_multi);
#ifdef ENABLE_MULTIPLE_NODES
@ -8002,7 +8051,41 @@ XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xi
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
XLogRegisterBufData(0, (char*)offsets, offcnt * sizeof(OffsetNumber));
recptr = XLogInsert(RM_HEAP2_ID, useOldXlog ? XLOG_HEAP2_FREEZE : XLOG_HEAP2_FREEZE | XLOG_TUPLE_LOCK_UPGRADE_FLAG);
recptr = XLogInsert(RM_HEAP2_ID,
useOldXlog ? XLOG_HEAP2_FREEZE : XLOG_HEAP2_FREEZE | XLOG_TUPLE_LOCK_UPGRADE_FLAG);
return recptr;
}
/*
* Perform XLogInsert for a heap-invalid operation. Caller must already
* have modified the buffer and marked it dirty.
*/
/*
 * Perform XLogInsert for a heap-invalid operation.  Caller must already
 * have modified the buffer and marked it dirty.
 *
 * The record carries the cutoff xid (for hot-standby conflict resolution)
 * plus the array of invalidated tuple offsets as block data.
 */
XLogRecPtr log_heap_invalid(Relation reln, Buffer buffer, TransactionId cutoff_xid, OffsetNumber* offsets,
    int offcnt)
{
    xl_heap_invalid xlrec;

    /* Caller should not call me on a non-WAL-logged relation */
    Assert(RelationNeedsWAL(reln));
    /* nor when there are no tuples to invalid */
    Assert(offcnt > 0);

    xlrec.cutoff_xid = cutoff_xid;

    XLogBeginInsert();
    XLogRegisterData((char*)&xlrec, SizeOfHeapInvalid);

    /*
     * The tuple-offsets array is not actually in the buffer, but pretend that
     * it is.  When XLogInsert stores the whole buffer, the offsets array need
     * not be stored too.
     */
    XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
    XLogRegisterBufData(0, (char*)offsets, offcnt * sizeof(OffsetNumber));

    return XLogInsert(RM_HEAP3_ID, XLOG_HEAP3_INVALID);
}
@ -8665,6 +8748,37 @@ static void heap_xlog_freeze(XLogReaderState* record)
}
}
/*
 * Replay an XLOG_HEAP3_INVALID record: re-apply heap_invalid_invisible_tuple()
 * to every offset carried in the record's block data.
 */
static void heap_xlog_invalid(XLogReaderState* record)
{
    xl_heap_invalid* xlrecInvalid = (xl_heap_invalid*)XLogRecGetData(record);
    TransactionId cutoff_xid = xlrecInvalid->cutoff_xid;
    RedoBufferInfo buffer;

    /*
     * In Hot Standby mode, ensure that there's no queries running which still
     * consider the invalidated xids as running.
     */
    if (InHotStandby && g_supportHotStandby) {
        RelFileNode rnode;
        /* the page was registered at the same block id FREEZE records use */
        (void)XLogRecGetBlockTag(record, HEAP_FREEZE_ORIG_BLOCK_NUM, &rnode, NULL, NULL);
        XLogRecPtr lsn = record->EndRecPtr;
        ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode, lsn);
    }

    /* Skip redo when the page already carries a newer LSN (BLK_DONE etc.) */
    if (XLogReadBufferForRedo(record, HEAP_FREEZE_ORIG_BLOCK_NUM, &buffer) == BLK_NEEDS_REDO) {
        Size blkdatalen;
        char* blkdata = XLogRecGetBlockData(record, HEAP_FREEZE_ORIG_BLOCK_NUM, &blkdatalen);

        HeapXlogInvalidOperatorPage(&buffer, (void*)blkdata, blkdatalen);
        MarkBufferDirty(buffer.buf);
    }
    if (BufferIsValid(buffer.buf)) {
        UnlockReleaseBuffer(buffer.buf);
    }
}
/*
* Replay XLOG_HEAP2_VISIBLE record.
* The critical integrity requirement here is that we must never end up with
@ -9289,6 +9403,9 @@ void heap3_redo(XLogReaderState* record)
break;
case XLOG_HEAP3_REWRITE:
break;
case XLOG_HEAP3_INVALID:
heap_xlog_invalid(record);
break;
default:
ereport(PANIC, (errmsg("heap3_redo: unknown op code %hhu", info)));
}

View File

@ -344,6 +344,7 @@ static int heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rooto
HeapTupleData tup;
tup.t_tableOid = RelationGetRelid(relation);
tup.t_bucketId = RelationGetBktid(relation);
bool keepInvisible = false;
gstrace_entry(GS_TRC_ID_heap_prune_chain);
@ -389,7 +390,7 @@ static int heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rooto
if (HeapTupleSatisfiesVacuum(&tup, oldest_xmin, buffer) == HEAPTUPLE_DEAD &&
!HeapTupleHeaderIsHotUpdated(htup)) {
if (HeapKeepInvisbleTuple(&tup, RelationGetDescr(relation))) {
if (HeapKeepInvisibleTuple(&tup, RelationGetDescr(relation))) {
return ndeleted;
}
@ -485,9 +486,8 @@ static int heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rooto
}
switch (HeapTupleSatisfiesVacuum(&tup, oldest_xmin, buffer)) {
case HEAPTUPLE_DEAD:
if (!HeapKeepInvisbleTuple(&tup, RelationGetDescr(relation))) {
tupdead = true;
}
keepInvisible = HeapKeepInvisibleTuple(&tup, RelationGetDescr(relation));
tupdead = true;
break;
case HEAPTUPLE_RECENTLY_DEAD:
@ -557,6 +557,11 @@ static int heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rooto
prior_xmax = HeapTupleGetUpdateXid(&tup);
}
/* There is only one dead tuple that needs to be retained and no processing is performed */
if (keepInvisible && (nchain == 1)) {
latestdead = InvalidOffsetNumber;
}
/*
* If we found a DEAD tuple in the chain, adjust the HOT chain so that all
* the DEAD tuples at the start of the chain are removed and the root line
@ -571,6 +576,10 @@ static int heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rooto
* right candidate for redirection.
*/
for (i = 1; (i < nchain) && (chainitems[i - 1] != latestdead); i++) {
// The entire chain is dead, but need to keep invisble tuple
if (keepInvisible && (i == nchain - 1)) {
break;
}
heap_prune_record_unused(prstate, chainitems[i]);
ndeleted++;
}

View File

@ -151,6 +151,32 @@ void HeapXlogFreezeOperatorPage(RedoBufferInfo *buffer, void *recorddata, void *
PageSetLSN(page, buffer->lsn);
}
/*
 * Page-level worker for XLOG_HEAP3_INVALID redo: blkdata is an array of
 * one-based tuple offsets (datalen bytes); each referenced tuple is marked
 * invalid via heap_invalid_invisible_tuple().  Finally stamps the page LSN.
 */
void HeapXlogInvalidOperatorPage(RedoBufferInfo *buffer, void *blkdata, Size datalen)
{
    Page page = buffer->pageinfo.page;

    if (datalen > 0) {
        OffsetNumber *offsets = (OffsetNumber *)blkdata;
        Size noffsets = datalen / sizeof(OffsetNumber);
        HeapTupleData tuple;

        for (Size i = 0; i < noffsets; i++) {
            /* offsets[] entries are one-based */
            OffsetNumber offnum = offsets[i];
            ItemId lp = PageGetItemId(page, offnum);

            tuple.t_data = (HeapTupleHeader)PageGetItem(page, lp);
            tuple.t_len = ItemIdGetLength(lp);
            HeapTupleCopyBaseFromPage(&tuple, page);
            ItemPointerSet(&(tuple.t_self), buffer->blockinfo.blkno, offnum);
            heap_invalid_invisible_tuple(&tuple);
        }
    }
    PageSetLSN(page, buffer->lsn);
}
void HeapXlogVisibleOperatorPage(RedoBufferInfo *buffer, void *recorddata)
{
xl_heap_visible *xlrec = (xl_heap_visible *)recorddata;
@ -1083,6 +1109,45 @@ static XLogRecParseState *HeapXlogFreezeParseBlock(XLogReaderState *record, uint
return recordstatehead;
}
/*
 * Parse an XLOG_HEAP3_INVALID record into per-block redo states for the
 * parallel-redo framework.  Always emits the data-block state; on hot
 * standby it additionally emits an invalid-message state carrying the
 * cutoff xid so snapshot conflicts can be resolved.  Returns NULL on
 * parse-buffer allocation failure.
 */
static XLogRecParseState *HeapXlogInvalidParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    *blocknum = 1;
    XLogRecParseState *blockstate = NULL;
    XLogRecParseState *recordstatehead = NULL;

    XLogParseBufferAllocListFunc(record, &recordstatehead, NULL);
    if (recordstatehead == NULL) {
        return NULL;
    }
    /* the invalid record registers its page at the FREEZE block id */
    XLogRecSetBlockDataState(record, HEAP_FREEZE_ORIG_BLOCK_NUM, recordstatehead);

    /*
     * In Hot Standby mode, ensure that there's no queries running which still consider the
     * invalid xids as running.
     */
    if (g_supportHotStandby) {
        (*blocknum)++;
        /* need notify hot standby */
        XLogParseBufferAllocListFunc(record, &blockstate, recordstatehead);
        if (blockstate == NULL) {
            return NULL;
        }

        /* get cutoff xid */
        xl_heap_invalid *xlrecInvalid = (xl_heap_invalid *)XLogRecGetData(record);
        TransactionId cutoff_xid = xlrecInvalid->cutoff_xid;
        RelFileNode rnode;
        XLogRecGetBlockTag(record, HEAP_FREEZE_ORIG_BLOCK_NUM, &rnode, NULL, NULL);
        /* block number / fork are irrelevant for a conflict message */
        RelFileNodeForkNum filenode =
            RelFileNodeForkNumFill(&rnode, InvalidBackendId, InvalidForkNumber, InvalidBlockNumber);
        XLogRecSetBlockCommonState(record, BLOCK_DATA_INVALIDMSG_TYPE, filenode, blockstate);
        XLogRecSetInvalidMsgState(&blockstate->blockparse.extra_rec.blockinvalidmsg, cutoff_xid);
    }
    return recordstatehead;
}
static XLogRecParseState *HeapXlogCleanParseBlock(XLogReaderState *record, uint32 *blocknum)
{
xl_heap_clean *xlrec = (xl_heap_clean *)XLogRecGetData(record);
@ -1526,6 +1591,19 @@ static void HeapXlogFreezeBlock(XLogBlockHead *blockhead, XLogBlockDataParse *bl
}
}
/*
 * Parallel-redo block handler for XLOG_HEAP3_INVALID: applies the
 * invalidation to the page when redo is required and dirties the buffer.
 */
static void HeapXlogInvalidBlock(XLogBlockHead *blockhead, XLogBlockDataParse *blockdatarec, RedoBufferInfo *bufferinfo)
{
    XLogBlockDataParse *datadecode = blockdatarec;

    if (XLogCheckBlockDataRedoAction(datadecode, bufferinfo) != BLK_NEEDS_REDO) {
        return;
    }

    Size blkdatalen;
    char *blkdata = XLogBlockDataGetBlockData(datadecode, &blkdatalen);
    Assert(blkdata != NULL);
    HeapXlogInvalidOperatorPage(bufferinfo, (void *)blkdata, blkdatalen);
    MakeRedoBufferDirty(bufferinfo);
}
static void HeapXlogCleanBlock(XLogBlockHead *blockhead, XLogBlockDataParse *blockdatarec, RedoBufferInfo *bufferinfo)
{
XLogBlockDataParse *datadecode = blockdatarec;
@ -1638,6 +1716,9 @@ XLogRecParseState *Heap3RedoParseToBlock(XLogReaderState *record, uint32 *blockn
break;
case XLOG_HEAP3_REWRITE:
break;
case XLOG_HEAP3_INVALID:
recordblockstate = HeapXlogInvalidParseBlock(record, blocknum);
break;
default:
ereport(PANIC, (errmsg("Heap3RedoParseToBlock: unknown op code %u", info)));
}
@ -1654,6 +1735,9 @@ void Heap3RedoDataBlock(XLogBlockHead *blockhead, XLogBlockDataParse *blockdatar
break;
case XLOG_HEAP3_REWRITE:
break;
case XLOG_HEAP3_INVALID:
HeapXlogInvalidBlock(blockhead, blockdatarec, bufferinfo);
break;
default:
ereport(PANIC, (errmsg("heap3_redo_block: unknown op code %u", info)));
}

View File

@ -337,6 +337,8 @@ const char* heap3_type_name(uint8 subtype)
return "heap3_new_cid";
} else if (info == XLOG_HEAP3_REWRITE) {
return "heap3_rewrite";
} else if (info == XLOG_HEAP3_INVALID) {
return "heap3_invalid";
} else {
return "unkown_type";
}
@ -350,6 +352,26 @@ void heap3_desc(StringInfo buf, XLogReaderState *record)
appendStringInfo(buf, "XLOG_HEAP_NEW_CID");
} else if (info == XLOG_HEAP3_REWRITE) {
appendStringInfo(buf, "XLOG_HEAP2_REWRITE");
} else if (info == XLOG_HEAP3_INVALID) {
xl_heap_invalid *xlrecInvalid = (xl_heap_invalid *)XLogRecGetData(record);
appendStringInfo(buf, "invalid: cutoff xid %lu", xlrecInvalid->cutoff_xid);
if (!XLogRecHasBlockImage(record, 0)) {
Size datalen;
OffsetNumber *offsets = (OffsetNumber *)XLogRecGetBlockData(record, 0, &datalen);
if (datalen > 0) {
OffsetNumber *offsets_end = (OffsetNumber *)((char *)offsets + datalen);
appendStringInfo(buf, " offsets: [");
while (offsets < offsets_end) {
appendStringInfo(buf, " %d ", *offsets);
offsets++;
}
appendStringInfo(buf, "]");
}
}
} else {
appendStringInfo(buf, "UNKNOWN");
}
}

View File

@ -198,7 +198,7 @@ static const RmgrDispatchData g_dispatchTable[RM_MAX_ID + 1] = {
{ DispatchSeqRecord, RmgrRecordInfoValid, RM_SEQ_ID, XLOG_SEQ_LOG, XLOG_SEQ_LOG },
{ DispatchSpgistRecord, RmgrRecordInfoValid, RM_SPGIST_ID, XLOG_SPGIST_CREATE_INDEX, XLOG_SPGIST_VACUUM_REDIRECT },
{ DispatchRepSlotRecord, RmgrRecordInfoValid, RM_SLOT_ID, XLOG_SLOT_CREATE, XLOG_TERM_LOG },
{ DispatchHeap3Record, RmgrRecordInfoValid, RM_HEAP3_ID, XLOG_HEAP3_NEW_CID, XLOG_HEAP3_REWRITE },
{ DispatchHeap3Record, RmgrRecordInfoValid, RM_HEAP3_ID, XLOG_HEAP3_NEW_CID, XLOG_HEAP3_INVALID },
{ DispatchBarrierRecord, RmgrRecordInfoValid, RM_BARRIER_ID, XLOG_BARRIER_CREATE, XLOG_BARRIER_SWITCHOVER },
#ifdef ENABLE_MOT
{DispatchMotRecord, NULL, RM_MOT_ID, 0, 0},
@ -867,7 +867,13 @@ static bool DispatchRepSlotRecord(XLogReaderState *record, List *expectedTLIs, T
/* Run from the dispatcher thread. */
/* Run from the dispatcher thread. */
static bool DispatchHeap3Record(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime)
{
    uint8 info = (XLogRecGetInfo(record) & (~XLR_INFO_MASK));

    /*
     * Fix: a stale unconditional DispatchTxnRecord() call preceded this
     * branch, dispatching every HEAP3 record twice; the record must be
     * dispatched exactly once, by page for INVALID, by txn otherwise.
     */
    if (info == XLOG_HEAP3_INVALID) {
        /* page-touching record: route to the page redo workers */
        DispatchRecordWithPages(record, expectedTLIs);
    } else {
        DispatchTxnRecord(record, expectedTLIs);
    }
    return false;
}

View File

@ -189,7 +189,7 @@ static const RmgrDispatchData g_dispatchTable[RM_MAX_ID + 1] = {
{ DispatchSeqRecord, RmgrRecordInfoValid, RM_SEQ_ID, XLOG_SEQ_LOG, XLOG_SEQ_LOG },
{ DispatchSpgistRecord, RmgrRecordInfoValid, RM_SPGIST_ID, XLOG_SPGIST_CREATE_INDEX, XLOG_SPGIST_VACUUM_REDIRECT },
{ DispatchRepSlotRecord, RmgrRecordInfoValid, RM_SLOT_ID, XLOG_SLOT_CREATE, XLOG_TERM_LOG },
{ DispatchHeap3Record, RmgrRecordInfoValid, RM_HEAP3_ID, XLOG_HEAP3_NEW_CID, XLOG_HEAP3_REWRITE },
{ DispatchHeap3Record, RmgrRecordInfoValid, RM_HEAP3_ID, XLOG_HEAP3_NEW_CID, XLOG_HEAP3_INVALID },
{ DispatchBarrierRecord, RmgrRecordInfoValid, RM_BARRIER_ID, XLOG_BARRIER_CREATE, XLOG_BARRIER_SWITCHOVER },
#ifdef ENABLE_MOT
@ -720,7 +720,13 @@ static bool DispatchRepSlotRecord(XLogReaderState *record, List *expectedTLIs, T
/* Run from the dispatcher thread. */
/* Run from the dispatcher thread. */
static bool DispatchHeap3Record(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime)
{
    uint8 info = ((XLogRecGetInfo(record) & (~XLR_INFO_MASK)) & XLOG_HEAP_OPMASK);

    /*
     * Fix: a stale unconditional DispatchTxnRecord() call preceded this
     * branch, dispatching every HEAP3 record twice; the record must be
     * dispatched exactly once, by page for INVALID, by txn otherwise.
     */
    if (info == XLOG_HEAP3_INVALID) {
        /* page-touching record: route to the page redo workers */
        DispatchRecordWithPages(record, expectedTLIs, SUPPORT_FPAGE_DISPATCH);
    } else {
        DispatchTxnRecord(record, expectedTLIs, recordXTime, false);
    }
    return false;
}

View File

@ -745,6 +745,8 @@ static void DecodeHeap3Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec, bucket_id);
break;
}
case XLOG_HEAP3_INVALID:
break;
case XLOG_HEAP3_REWRITE:
break;
default:

View File

@ -479,6 +479,8 @@ void ParseHeap3Op(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa
}
case XLOG_HEAP3_REWRITE:
break;
case XLOG_HEAP3_INVALID:
break;
default:
ereport(WARNING,
(errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE), errmsg("unexpected RM_HEAP3_ID record type: %u", info)));

View File

@ -290,6 +290,8 @@ extern TableScanDesc heap_beginscan_sampling(Relation relation, Snapshot snapsho
extern void heapgetpage(TableScanDesc scan, BlockNumber page);
extern void heap_invalid_invisible_tuple(HeapTuple tuple);
extern void heap_rescan(TableScanDesc sscan, ScanKey key);
extern void heap_endscan(TableScanDesc scan);
extern HeapTuple heap_getnext(TableScanDesc scan, ScanDirection direction);
@ -367,8 +369,10 @@ extern XLogRecPtr log_heap_cleanup_info(const RelFileNode* rnode, TransactionId
extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer, OffsetNumber* redirected, int nredirected,
OffsetNumber* nowdead, int ndead, OffsetNumber* nowunused, int nunused, TransactionId latestRemovedXid,
bool repair_fragmentation);
extern XLogRecPtr log_heap_freeze(
Relation reln, Buffer buffer, TransactionId cutoff_xid, MultiXactId cutoff_multi, OffsetNumber* offsets, int offcnt);
extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, MultiXactId cutoff_multi,
OffsetNumber* offsets, int offcnt);
extern XLogRecPtr log_heap_invalid(Relation reln, Buffer buffer, TransactionId cutoff_xid, OffsetNumber* offsets,
int offcnt);
extern XLogRecPtr log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer heap_buffer, Buffer vm_buffer,
TransactionId cutoff_xid, bool free_dict);
extern XLogRecPtr log_cu_bcm(const RelFileNode* rnode, int col, uint64 block, int status, int count);

View File

@ -748,6 +748,7 @@ inline HeapTuple heaptup_alloc(Size size)
/* XLOG_HEAP_NEW_CID with 0x30 in heap is XLOGHEAP2_NEW_CID with 0x70 in heap2 in PG9.4 */
#define XLOG_HEAP3_NEW_CID 0x00
#define XLOG_HEAP3_REWRITE 0x10
#define XLOG_HEAP3_INVALID 0x20
/* we used to put all xl_heap_* together, which made us run out of opcodes (quickly)
* when trying to add a DELETE_IS_SUPER operation. Thus we split the codes carefully
@ -980,6 +981,12 @@ typedef struct xl_heap_freeze {
#define SizeOfOldHeapFreeze (offsetof(xl_heap_freeze, cutoff_xid) + sizeof(TransactionId))
#define SizeOfHeapFreeze (offsetof(xl_heap_freeze, cutoff_multi) + sizeof(MultiXactId))
/*
 * WAL payload for XLOG_HEAP3_INVALID (invalidate kept-invisible dead tuples).
 * The affected one-based tuple offsets follow as registered block data, so
 * they are omitted when a full-page image is logged.
 */
typedef struct xl_heap_invalid {
    TransactionId cutoff_xid; /* standby queries with snapshots older than this may conflict */
    /* TUPLE OFFSET NUMBERS FOLLOW AT THE END */
} xl_heap_invalid;

#define SizeOfHeapInvalid (offsetof(xl_heap_invalid, cutoff_xid) + sizeof(TransactionId))
typedef struct xl_heap_freeze_tuple {
TransactionId xmax;
OffsetNumber offset;
@ -1212,7 +1219,7 @@ typedef struct KeepInvisbleOpt {
KeepInvisbleTupleFunc checkKeepFunc;
} KeepInvisbleOpt;
bool HeapKeepInvisbleTuple(HeapTuple tuple, TupleDesc tupleDesc, KeepInvisbleTupleFunc checkKeepFunc = NULL);
bool HeapKeepInvisibleTuple(HeapTuple tuple, TupleDesc tupleDesc, KeepInvisbleTupleFunc checkKeepFunc = NULL);
void HeapCopyTupleNoAlloc(HeapTuple dest, HeapTuple src);
// for ut test

File diff suppressed because it is too large Load Diff

View File

@ -80,6 +80,7 @@ extern const uint32 TWOPHASE_FILE_VERSION;
extern const uint32 CLIENT_ENCRYPTION_PROC_VERSION_NUM;
extern const uint32 PRIVS_DIRECTORY_VERSION_NUM;
extern const uint32 COMMENT_RECORD_PARAM_VERSION_NUM;
extern const uint32 INVALID_INVISIBLE_TUPLE_VERSION;
extern const uint32 ENHANCED_TUPLE_LOCK_VERSION_NUM;
extern const uint32 HASUID_VERSION_NUM;
extern const uint32 CREATE_INDEX_CONCURRENTLY_DIST_VERSION_NUM;