twophase bugfix: correctly delete compressed relation files during two-phase commit, rollback and redo

wuyuechuan 2022-03-18 16:02:52 +08:00
parent 3a552c955d
commit abdcea01c2
14 changed files with 156 additions and 23 deletions
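The fix works by smuggling a "compressed" flag through the fork number of pending-delete records. The serialized ColFileNodeRel written into 2PC and commit/abort WAL records carries spcNode/dbNode/relNode, forknum and ownerid but no compression opt, which is presumably why compressed relation files could leak across two-phase commit (the regression test below counts data-directory files before and after to verify none are left behind). The encode side (smgrGetPendingDeletes) now stores the COMPRESS_FORKNUM sentinel instead of MAIN_FORKNUM when IS_COMPRESSED_RNODE holds; every decode site touched by this commit (smgrDoPendingDeletes, the redo and tracking paths, FinishPreparedTransaction, the unlink helpers) detects the sentinel with IS_COMPRESS_DELETE_FORK, restores a non-zero opt with SET_OPT_BY_NEGATIVE_FORK, and resets the fork to MAIN_FORKNUM before deleting files. A minimal standalone sketch of that round trip; only the macro and field names mirror the diff, the Lite types and helper functions here are illustrative:

    #include <cassert>
    #include <cstdint>

    /*
     * Illustrative stand-ins: RelFileNodeLite/ColFileNodeLite and the two helpers
     * are not real engine types; MAIN_FORKNUM, COMPRESS_FORKNUM, opt, forknum and
     * the macro names in the comments come from the diff.
     */
    constexpr int MAIN_FORKNUM = 0;
    constexpr int COMPRESS_FORKNUM = -9;  /* sentinel carried in the serialized record */

    struct RelFileNodeLite {
        uint32_t spcNode, dbNode, relNode;
        uint16_t opt;                     /* packed compression options, 0 = uncompressed */
    };

    struct ColFileNodeLite {
        RelFileNodeLite filenode;
        int forknum;
    };

    /* Encode side (smgrGetPendingDeletes): the record has no opt field, so a
     * compressed main fork is flagged through the fork number instead. */
    static int EncodeForkForRecord(const RelFileNodeLite& rnode, int forknum)
    {
        bool compressedMain = (rnode.opt != 0 && forknum == MAIN_FORKNUM);  /* IS_COMPRESSED_RNODE */
        return compressedMain ? COMPRESS_FORKNUM : forknum;
    }

    /* Decode side (commit/abort, FinishPreparedTransaction, redo): restore a
     * non-zero opt so the delete path also removes the compressed files, then
     * fall back to MAIN_FORKNUM for the actual unlink. */
    static void DecodeForkFromRecord(ColFileNodeLite& node)
    {
        if (node.forknum == COMPRESS_FORKNUM) {  /* IS_COMPRESS_DELETE_FORK */
            node.filenode.opt = 1;               /* placeholder; see SET_OPT_BY_NEGATIVE_FORK */
            node.forknum = MAIN_FORKNUM;
        }
    }

    int main()
    {
        RelFileNodeLite rel = {1663, 16384, 24576, 0x240};           /* compressed relation */
        ColFileNodeLite rec = {rel, EncodeForkForRecord(rel, MAIN_FORKNUM)};
        rec.filenode.opt = 0;                    /* the serialized record drops opt */
        DecodeForkFromRecord(rec);
        assert(rec.forknum == MAIN_FORKNUM && rec.filenode.opt != 0);
        return 0;
    }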

View File

@@ -1028,6 +1028,10 @@ void smgrDoPendingDeletes(bool isCommit)
u_sess->catalog_cxt.pendingDeletes = next;
/* do deletion if called for */
if (pending->atCommit == isCommit) {
if (IS_COMPRESS_DELETE_FORK(pending->forknum)) {
SET_OPT_BY_NEGATIVE_FORK(pending->relnode, pending->forknum);
pending->forknum = MAIN_FORKNUM;
}
if (!IsValidColForkNum(pending->forknum)) {
RowRelationDoDeleteFiles(
pending->relnode, pending->backend, pending->ownerid, pending->relOid, isCommit);
@@ -1144,7 +1148,11 @@ int smgrGetPendingDeletes(bool forCommit, ColFileNodeRel** ptr, bool skipTemp, i
rptrRel->filenode.spcNode = pending->relnode.spcNode;
rptrRel->filenode.dbNode = pending->relnode.dbNode;
rptrRel->filenode.relNode = pending->relnode.relNode;
rptrRel->forknum = pending->forknum;
if (IS_COMPRESSED_RNODE(pending->relnode, pending->forknum)) {
rptrRel->forknum = COMPRESS_FORKNUM;
} else {
rptrRel->forknum = pending->forknum;
}
rptrRel->ownerid = pending->ownerid;
/* Add bucketid into forknum */
forknum_add_bucketid(rptrRel->forknum, pending->relnode.bucketNode);

View File

@@ -1077,8 +1077,6 @@ Oid DefineIndex(Oid relationId, IndexStmt* stmt, Oid indexRelationId, bool is_al
}
}
}
CheckCompressOption(&indexCreateSupport);
/*
* Parse AM-specific options, convert to text array form, validate.
*/

View File

@@ -1531,6 +1531,10 @@ void XLogBlockDdlDoSmgrAction(XLogBlockHead *blockhead, void *blockrecbody, Redo
ColFileNodeRel *colFileNodeRel = xnodes + i;
ColFileNode colFileNode;
ColFileNodeCopy(&colFileNode, colFileNodeRel);
if (IS_COMPRESS_DELETE_FORK(colFileNode.forknum)) {
SET_OPT_BY_NEGATIVE_FORK(colFileNode.filenode, colFileNode.forknum);
colFileNode.forknum = MAIN_FORKNUM;
}
if (!IsValidColForkNum(colFileNode.forknum)) {
XlogDropRowReation(colFileNode.filenode);
}

View File

@@ -1172,6 +1172,12 @@ static void TrackRelStorageDrop(XLogReaderState *record)
ColFileNodeCopy(&colFileNodeData, colFileNodeRel);
/* set opt to compressOpt if FORKNUM is compress forknum */
if (IS_COMPRESS_DELETE_FORK(colFileNodeData.forknum)) {
SET_OPT_BY_NEGATIVE_FORK(colFileNodeData.filenode, colFileNodeData.forknum);
colFileNodeData.forknum = MAIN_FORKNUM;
}
/* Logic relfilenode delete is ignored */
if (IsSegmentFileNode(colFileNodeData.filenode)) {
continue;

View File

@@ -220,6 +220,12 @@ void PRTrackDropFiles(HTAB *redoItemHash, XLogBlockDdlParse *ddlParse, XLogRecPt
ColFileNode colFileNode;
ColFileNodeRel *colFileNodeRel = xnodes + i;
ColFileNodeCopy(&colFileNode, colFileNodeRel);
if (IS_COMPRESS_DELETE_FORK(colFileNode.forknum)) {
SET_OPT_BY_NEGATIVE_FORK(colFileNode.filenode, colFileNode.forknum);
colFileNode.forknum = MAIN_FORKNUM;
}
if (!IsValidColForkNum(colFileNode.forknum)) {
for (int i = 0; i < MAX_FORKNUM; ++i)
PRTrackRelTruncate(redoItemHash, colFileNode.filenode, i, 0);

View File

@@ -2612,6 +2612,10 @@ void FinishPreparedTransaction(const char *gid, bool isCommit)
ColFileNodeCopy(&colFileNode, colFileNodeRel);
if (IS_COMPRESS_DELETE_FORK(colFileNode.forknum)) {
SET_OPT_BY_NEGATIVE_FORK(colFileNode.filenode, colFileNode.forknum);
colFileNode.forknum = MAIN_FORKNUM;
}
if (!IsValidColForkNum(colFileNode.forknum)) {
RowRelationDoDeleteFiles(colFileNode.filenode, InvalidBackendId, colFileNode.ownerid);
} else {

View File

@@ -7167,6 +7167,10 @@ void push_unlink_rel_to_hashtbl(ColFileNodeRel *xnodes, int nrels)
DelFileTag *entry = NULL;
ColFileNodeCopy(&colFileNode, colFileNodeRel);
if (IS_COMPRESS_DELETE_FORK(colFileNode.forknum)) {
SET_OPT_BY_NEGATIVE_FORK(colFileNode.filenode, colFileNode.forknum);
colFileNode.forknum = MAIN_FORKNUM;
}
if (!IsValidColForkNum(colFileNode.forknum) && !IsSegmentFileNode(colFileNode.filenode)) {
entry = (DelFileTag*)hash_search(relfilenode_hashtbl, &(colFileNode.filenode), HASH_ENTER, &found);
if (!found) {
@@ -7212,7 +7216,10 @@ static void unlink_relfiles(_in_ ColFileNodeRel *xnodes, _in_ int nrels)
ColFileNodeRel *colFileNodeRel = xnodes + i;
ColFileNodeCopy(&colFileNode, colFileNodeRel);
if (IS_COMPRESS_DELETE_FORK(colFileNode.forknum)) {
SET_OPT_BY_NEGATIVE_FORK(colFileNode.filenode, colFileNode.forknum);
colFileNode.forknum = MAIN_FORKNUM;
}
if (!IsValidColForkNum(colFileNode.forknum)) {
RelFileNode relFileNode = colFileNode.filenode;
ForkNumber fork;
@@ -7815,6 +7822,10 @@ void xactApplyXLogDropRelation(XLogReaderState *record)
ColFileNodeRel *nodeRel = xnodes + i;
ColFileNodeCopy(&node, nodeRel);
if (IS_COMPRESS_DELETE_FORK(node.forknum)) {
SET_OPT_BY_NEGATIVE_FORK(node.filenode, node.forknum);
node.forknum = MAIN_FORKNUM;
}
if (!IsValidColForkNum(node.forknum)) {
for (int fork = 0; fork <= MAX_FORKNUM; fork++)
XLogDropRelation(node.filenode, fork);

View File

@@ -1321,6 +1321,10 @@ void XLogForgetDDLRedo(XLogRecParseState *redoblockstate)
ColFileNodeRel *colFileNodeRel = xnodes + i;
ColFileNode colFileNode;
ColFileNodeCopy(&colFileNode, colFileNodeRel);
if (IS_COMPRESS_DELETE_FORK(colFileNode.forknum)) {
SET_OPT_BY_NEGATIVE_FORK(colFileNode.filenode, colFileNode.forknum);
colFileNode.forknum = MAIN_FORKNUM;
}
if (!IsValidColForkNum(colFileNode.forknum)) {
XlogDropRowReation(colFileNode.filenode);
}

View File

@@ -6535,7 +6535,7 @@ int ckpt_buforder_comparator(const void *pa, const void *pb)
} else { /* should not be the same block ... */
return 1;
}
/* do not need to compare opt */
}
/*

View File

@@ -212,24 +212,25 @@ inline void TransCompressOptions(const RelFileNode& node, RelFileCompressOption*
compressOption = compressOption >> g_cmpBitStruct[CMP_BYTE_CONVERT_INDEX].bitLen;
}
#define SET_COMPRESS_OPTION(node, byteConvert, diffConvert, preChunks, symbol, level, algorithm, chunkSize) \
do { \
(node).opt = (node).opt << g_cmpBitStruct[CMP_BYTE_CONVERT_INDEX].bitLen; \
(node).opt += (byteConvert)&g_cmpBitStruct[CMP_BYTE_CONVERT_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_DIFF_CONVERT_INDEX].bitLen; \
(node).opt += (diffConvert)&g_cmpBitStruct[CMP_DIFF_CONVERT_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_PRE_CHUNK_INDEX].bitLen; \
(node).opt += (preChunks)&g_cmpBitStruct[CMP_PRE_CHUNK_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_COMPRESS_LEVEL_SYMBOL].bitLen; \
(node).opt += (symbol)&g_cmpBitStruct[CMP_COMPRESS_LEVEL_SYMBOL].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_LEVEL_INDEX].bitLen; \
(node).opt += (level)&g_cmpBitStruct[CMP_LEVEL_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_ALGORITHM_INDEX].bitLen; \
(node).opt += (algorithm)&g_cmpBitStruct[CMP_ALGORITHM_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_CHUNK_SIZE_INDEX].bitLen; \
(node).opt += (chunkSize)&g_cmpBitStruct[CMP_CHUNK_SIZE_INDEX].mask; \
#define SET_COMPRESS_OPTION(node, byteConvert, diffConvert, preChunks, symbol, level, algorithm, chunkSize) \
do { \
(node).opt = 0; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_BYTE_CONVERT_INDEX].bitLen; \
(node).opt += (byteConvert)&g_cmpBitStruct[CMP_BYTE_CONVERT_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_DIFF_CONVERT_INDEX].bitLen; \
(node).opt += (diffConvert)&g_cmpBitStruct[CMP_DIFF_CONVERT_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_PRE_CHUNK_INDEX].bitLen; \
(node).opt += (preChunks)&g_cmpBitStruct[CMP_PRE_CHUNK_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_COMPRESS_LEVEL_SYMBOL].bitLen; \
(node).opt += (symbol)&g_cmpBitStruct[CMP_COMPRESS_LEVEL_SYMBOL].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_LEVEL_INDEX].bitLen; \
(node).opt += (level)&g_cmpBitStruct[CMP_LEVEL_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_ALGORITHM_INDEX].bitLen; \
(node).opt += (algorithm)&g_cmpBitStruct[CMP_ALGORITHM_INDEX].mask; \
(node).opt = (node).opt << g_cmpBitStruct[CMP_CHUNK_SIZE_INDEX].bitLen; \
(node).opt += (chunkSize)&g_cmpBitStruct[CMP_CHUNK_SIZE_INDEX].mask; \
} while (0)
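The only functional change to SET_COMPRESS_OPTION is the leading (node).opt = 0;. The macro builds the packed option word by repeatedly shifting and adding fields, so any bits already sitting in opt are shifted into the high bits and survive; resetting first makes the macro yield the same value no matter what opt held beforehand, which matters once SET_OPT_BY_NEGATIVE_FORK (below) applies it to nodes reconstructed from 2PC/WAL records. A small illustration with made-up 2-bit fields (the real widths and order come from g_cmpBitStruct):

    #include <cstdint>
    #include <cstdio>

    /* Pack two toy 2-bit fields into opt, twice in a row, with and without the
     * reset that this commit adds at the top of SET_COMPRESS_OPTION. */
    static uint16_t PackTwice(uint16_t opt, bool resetFirst)
    {
        for (int i = 0; i < 2; ++i) {
            if (resetFirst) {
                opt = 0;                    /* the added "(node).opt = 0;" */
            }
            opt = (opt << 2) | (3 & 0x3);   /* field A = 3 */
            opt = (opt << 2) | (1 & 0x3);   /* field B = 1 */
        }
        return opt;
    }

    int main()
    {
        printf("without reset: 0x%x\n", PackTwice(0, false));  /* 0xdd: stale bits kept */
        printf("with reset:    0x%x\n", PackTwice(0, true));   /* 0xd:  stable value   */
        return 0;
    }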
#define GET_ROW_COL_CONVERT(opt) \
(((opt) >> g_cmpBitStruct[CMP_BYTE_CONVERT_INDEX].moveBit) & g_cmpBitStruct[CMP_BYTE_CONVERT_INDEX].mask)
#define GET_DIFF_CONVERT(opt) \
@@ -246,8 +247,8 @@ inline void TransCompressOptions(const RelFileNode& node, RelFileCompressOption*
(((opt) >> g_cmpBitStruct[CMP_CHUNK_SIZE_INDEX].moveBit) & g_cmpBitStruct[CMP_CHUNK_SIZE_INDEX].mask)
#define IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM)
#define IS_COMPRESS_DELETE_FORK(forkNum) ((forkNum) == COMPRESS_FORKNUM)
#define IS_COMPRESSED_RNODE(rnode, forkNum) ((rnode).opt != 0 && (forkNum) == MAIN_FORKNUM)
/* Compress function */
template <bool heapPageData>
extern int TemplateCompressPage(const char* src, char* dst, int dst_size, RelFileCompressOption option);
@@ -261,6 +262,11 @@ int CompressPage(const char* src, char* dst, int dst_size, RelFileCompressOption
int DecompressPage(const char* src, char* dst, uint8 algorithm);
#define SET_OPT_BY_NEGATIVE_FORK(rnode, forkNumber) \
do { \
SET_COMPRESS_OPTION((rnode), 0, 0, 0, 0, 0, COMPRESS_ALGORITHM_ZSTD, 0); \
} while (0)
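Note that SET_OPT_BY_NEGATIVE_FORK does not try to recover the relation's original compression settings (the serialized record no longer has them); it stamps a placeholder option word with every field zero except the algorithm (COMPRESS_ALGORITHM_ZSTD). That is enough to make opt non-zero, so IS_COMPRESSED_RNODE / IS_COMPRESSED_MAINFORK treat the relation as compressed on the delete path, which appears to be all that path needs to know. A tiny self-contained check of that property (NodeStub and the literal value are stand-ins, not the real packed layout):

    #include <cassert>

    /* Stand-in for RelFileNode; only the opt field matters for this check. */
    struct NodeStub { unsigned opt; };
    constexpr int MAIN_FORKNUM = 0;

    /* Mirrors IS_COMPRESSED_RNODE: non-zero opt on the main fork means "compressed". */
    static bool IsCompressedRnode(const NodeStub& node, int forknum)
    {
        return node.opt != 0 && forknum == MAIN_FORKNUM;
    }

    int main()
    {
        NodeStub node = {0};
        node.opt = 2;   /* assumed stand-in for the ZSTD placeholder packed by the macro */
        assert(IsCompressedRnode(node, MAIN_FORKNUM));
        return 0;
    }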
/* Memory mapping function */
extern PageCompressHeader* pc_mmap(int fd, int chunk_size, bool readonly);
extern PageCompressHeader* pc_mmap_real_size(int fd, int size, bool readonly);

View File

@@ -31,6 +31,9 @@ typedef enum {
*/
typedef int ForkNumber;
/* sentinel fork number used when deleting compressed relation files */
#define COMPRESS_FORKNUM -9
#define SEGMENT_EXT_8192_FORKNUM -8
#define SEGMENT_EXT_1024_FORKNUM -7
#define SEGMENT_EXT_128_FORKNUM -6

View File

@@ -0,0 +1,37 @@
create schema "compress_2PC";
create table "compress_2PC".file_count(id int, count int);
checkpoint;
\! @abs_bindir@/gsql -r -p @portstring@ -d regression -c "insert into \"compress_2PC\".file_count values(1, `find @abs_srcdir@ | wc -l`)"
-- create rollback
start transaction;
create table "compress_2PC".normal(a text,b integer);
create table "compress_2PC".compress(a text,b integer) with (compresstype=2);
rollback;
-- drop commit
create table "compress_2PC".normal(id int);
create table "compress_2PC".compress(id int) with (compresstype=2);
start transaction;
drop table "compress_2PC".normal;
drop table "compress_2PC".compress;
commit;
-- 2pc create rollback
begin;
create table "compress_2PC".test_abort1(a text,b integer) with (compresstype=2);
create table "compress_2PC".test_abort2(a text,b integer) with (compresstype=2);
create table "compress_2PC".test_abort3(a text,b integer);
prepare transaction 'the first prepare transaction';
rollback prepared 'the first prepare transaction';
-- 2pc drop commit
create table "compress_2PC".test_commit1(a text,b integer) with (compresstype=2);
create table "compress_2PC".test_commit2(a text,b integer) with (compresstype=2);
create table "compress_2PC".test_commit3(a text,b integer);
begin;
drop table "compress_2PC".test_commit1;
drop table"compress_2PC". test_commit2;
drop table "compress_2PC".test_commit3;
prepare transaction 'the first prepare transaction';
commit prepared 'the first prepare transaction';
-- checkpoint
checkpoint;
\! @abs_bindir@/gsql -r -p @portstring@ -d regression -c "insert into \"compress_2PC\".file_count values(2, `find @abs_srcdir@ | wc -l`)"
select count(distinct(count)) from "compress_2PC".file_count;

View File

@@ -0,0 +1,44 @@
create schema "compress_2PC";
create table "compress_2PC".file_count(id int, count int);
checkpoint;
\! @abs_bindir@/gsql -r -p @portstring@ -d regression -c "insert into \"compress_2PC\".file_count values(1, `find @abs_srcdir@ | wc -l`)"
INSERT 0 1
-- create rollback
start transaction;
create table "compress_2PC".normal(a text,b integer);
create table "compress_2PC".compress(a text,b integer) with (compresstype=2);
rollback;
-- drop commit
create table "compress_2PC".normal(id int);
create table "compress_2PC".compress(id int) with (compresstype=2);
start transaction;
drop table "compress_2PC".normal;
drop table "compress_2PC".compress;
commit;
-- 2pc create rollback
begin;
create table "compress_2PC".test_abort1(a text,b integer) with (compresstype=2);
create table "compress_2PC".test_abort2(a text,b integer) with (compresstype=2);
create table "compress_2PC".test_abort3(a text,b integer);
prepare transaction 'the first prepare transaction';
rollback prepared 'the first prepare transaction';
-- 2pc drop commit
create table "compress_2PC".test_commit1(a text,b integer) with (compresstype=2);
create table "compress_2PC".test_commit2(a text,b integer) with (compresstype=2);
create table "compress_2PC".test_commit3(a text,b integer);
begin;
drop table "compress_2PC".test_commit1;
drop table"compress_2PC". test_commit2;
drop table "compress_2PC".test_commit3;
prepare transaction 'the first prepare transaction';
commit prepared 'the first prepare transaction';
-- checkpoint
checkpoint;
\! @abs_bindir@/gsql -r -p @portstring@ -d regression -c "insert into \"compress_2PC\".file_count values(2, `find @abs_srcdir@ | wc -l`)"
INSERT 0 1
select count(distinct(count)) from "compress_2PC".file_count;
count
-------
1
(1 row)

View File

@@ -903,6 +903,8 @@ test: toomanyparams
test: test_astore_multixact
test: row_compression/pg_table_size row_compression/unsupported_feature row_compression/normal_test
test: row_compression/row_compression_basebackup
test: row_compression/twophase
test: component_view_enhancements
test: single_node_user_mapping