发布订阅支持以二进制格式发送数据

This commit is contained in:
xue_meng_en 2022-04-07 11:08:45 +08:00
parent 5054ddd002
commit 8292238381
16 changed files with 301 additions and 113 deletions

View File

@ -233,6 +233,7 @@ char* all_data_nodename_list = NULL;
const uint32 USTORE_UPGRADE_VERSION = 92368;
const uint32 PACKAGE_ENHANCEMENT = 92444;
const uint32 SUBSCRIPTION_VERSION = 92580;
const uint32 SUBSCRIPTION_BINARY_VERSION_NUM = 92607;
#ifdef DUMPSYSLOG
char* syslogpath = NULL;
@ -4444,7 +4445,9 @@ void getSubscriptions(Archive *fout)
int i_subslotname;
int i_subsynccommit;
int i_subpublications;
int i, ntups;
int i_subbinary;
int i;
int ntups;
if (no_subscriptions || GetVersionNum(fout) < SUBSCRIPTION_VERSION) {
return;
@ -4469,14 +4472,20 @@ void getSubscriptions(Archive *fout)
resetPQExpBuffer(query);
/* Get the subscriptions in current database. */
appendPQExpBuffer(query,
"SELECT s.tableoid, s.oid, s.subname,"
"(%s s.subowner) AS rolname, "
" s.subconninfo, s.subslotname, s.subsynccommit, s.subpublications "
"FROM pg_catalog.pg_subscription s "
appendPQExpBuffer(query, "SELECT s.tableoid, s.oid, s.subname,"
"(%s s.subowner) AS rolname, s.subconninfo, s.subslotname, "
"s.subsynccommit, s.subpublications, \n", username_subquery);
if (GetVersionNum(fout) >= SUBSCRIPTION_BINARY_VERSION_NUM) {
appendPQExpBuffer(query, " s.subbinary\n");
} else {
appendPQExpBuffer(query, " false AS subbinary\n");
}
appendPQExpBuffer(query, "FROM pg_catalog.pg_subscription s "
"WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
" WHERE datname = current_database())",
username_subquery);
" WHERE datname = current_database())");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
@ -4494,6 +4503,7 @@ void getSubscriptions(Archive *fout)
i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit");
i_subpublications = PQfnumber(res, "subpublications");
i_subbinary = PQfnumber(res, "subbinary");
subinfo = (SubscriptionInfo *)pg_malloc(ntups * sizeof(SubscriptionInfo));
@ -4512,6 +4522,7 @@ void getSubscriptions(Archive *fout)
}
subinfo[i].subsynccommit = gs_strdup(PQgetvalue(res, i, i_subsynccommit));
subinfo[i].subpublications = gs_strdup(PQgetvalue(res, i, i_subpublications));
subinfo[i].subbinary = gs_strdup(PQgetvalue(res, i, i_subbinary));
if (strlen(subinfo[i].rolname) == 0) {
write_msg(NULL, "WARNING: owner of subscription \"%s\" appears to be invalid\n", subinfo[i].dobj.name);
@ -4578,6 +4589,10 @@ static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, "NONE");
}
if (strcmp(subinfo->subbinary, "t") == 0) {
appendPQExpBuffer(query, ", binary = true");
}
if (strcmp(subinfo->subsynccommit, "off") != 0) {
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
}

View File

@ -498,6 +498,7 @@ typedef struct _SubscriptionInfo {
char *subslotname;
char *subsynccommit;
char *subpublications;
char *subbinary;
} SubscriptionInfo;
/* global decls */

View File

@ -91,6 +91,10 @@ Subscription *GetSubscription(Oid subid, bool missing_ok)
}
sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subbinary, &isnull);
Assert(!isnull);
sub->binary = DatumGetBool(datum);
ReleaseSysCache(tup);
return sub;

View File

@ -59,7 +59,7 @@ bool open_join_children = true;
bool will_shutdown = false;
/* hard-wired binary version number */
const uint32 GRAND_VERSION_NUM = 92605;
const uint32 GRAND_VERSION_NUM = 92606;
const uint32 PREDPUSH_SAME_LEVEL_VERSION_NUM = 92522;
const uint32 UPSERT_WHERE_VERSION_NUM = 92514;
@ -101,6 +101,7 @@ const uint32 PRIVS_DIRECTORY_VERSION_NUM = 92460;
const uint32 COMMENT_RECORD_PARAM_VERSION_NUM = 92484;
const uint32 SCAN_BATCH_MODE_VERSION_NUM = 92568;
const uint32 PUBLICATION_VERSION_NUM = 92580;
const uint32 SUBSCRIPTION_BINARY_VERSION_NUM = 92606;
/* Version number of the guc parameter backend_version added in V500R001C20 */
const uint32 V5R1C20_BACKEND_VERSION_NUM = 92305;

View File

@ -56,7 +56,7 @@ static void ValidateReplicationSlot(char *slotname, List *publications);
* accommodate that.
*/
static void parse_subscription_options(const List *options, char **conninfo, List **publications, bool *enabled_given,
bool *enabled, bool *slot_name_given, char **slot_name, char **synchronous_commit)
bool *enabled, bool *slot_name_given, char **slot_name, char **synchronous_commit, bool *binary_given, bool *binary)
{
ListCell *lc;
@ -76,6 +76,10 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis
if (synchronous_commit) {
*synchronous_commit = NULL;
}
if (binary) {
*binary_given = false;
*binary = false;
}
/* Parse options */
foreach (lc, options) {
@ -124,6 +128,15 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis
/* Test if the given value is valid for synchronous_commit GUC. */
(void)set_config_option("synchronous_commit", *synchronous_commit, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
false, 0, false);
} else if (strcmp(defel->defname, "binary") == 0 && binary) {
if (*binary_given) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
}
*binary_given = true;
*binary = defGetBoolean(defel);
} else {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname)));
@ -296,6 +309,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
char *conninfo;
char *slotname;
bool slotname_given;
bool binary;
bool binary_given;
char originname[NAMEDATALEN];
List *publications;
int rc;
@ -305,7 +320,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Connection and publication should not be specified here.
*/
parse_subscription_options(stmt->options, NULL, NULL, &enabled_given, &enabled, &slotname_given, &slotname,
&synchronous_commit);
&synchronous_commit, &binary_given, &binary);
/*
* Since creating a replication slot is not transactional, rolling back
@ -349,6 +364,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
/* encrypt conninfo */
List *conninfoList = ConninfoToDefList(stmt->conninfo);
@ -439,6 +455,8 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
Oid subid;
bool enabled_given = false;
bool enabled;
bool binary_given;
bool binary;
char *synchronous_commit;
char *conninfo;
char *slot_name;
@ -473,7 +491,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
/* Parse options. */
parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given,
&slot_name, &synchronous_commit);
&slot_name, &synchronous_commit, &binary_given, &binary);
/* Form a new tuple. */
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
@ -548,6 +566,10 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit);
replaces[Anum_pg_subscription_subsynccommit - 1] = true;
}
if (binary_given) {
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
if (publications != NIL) {
values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications);
replaces[Anum_pg_subscription_subpublications - 1] = true;

View File

@ -255,8 +255,14 @@ void StartRemoteStreaming(const LibpqrcvConnectParam *options)
stringlist_to_identifierstr(t_thrd.libwalreceiver_cxt.streamConn, options->publicationNames);
appendStringInfo(&cmd, ", publication_names %s",
PQescapeLiteral(t_thrd.libwalreceiver_cxt.streamConn, pubnames_str, strlen(pubnames_str)));
appendStringInfoChar(&cmd, ')');
pfree(pubnames_str);
if (options->binary && PQserverVersion(t_thrd.libwalreceiver_cxt.streamConn) >= 90204) {
appendStringInfoString(&cmd, ", binary 'true'");
ereport(DEBUG5, ( errmsg("append binary true")));
}
appendStringInfoChar(&cmd, ')');
}
PGresult *res = libpqrcv_PQexec(cmd.data);

View File

@ -18,7 +18,6 @@
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@ -28,7 +27,7 @@
static const int LOGICALREP_IS_REPLICA_IDENTITY = 1;
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple);
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@ -115,7 +114,7 @@ void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr orig
/*
* Write INSERT to the output stream.
*/
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'I'); /* action INSERT */
@ -123,7 +122,7 @@ void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newtuple);
logicalrep_write_tuple(out, rel, newtuple, binary);
}
/*
@ -151,7 +150,7 @@ LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtu
/*
* Write UPDATE to the output stream.
*/
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'U'); /* action UPDATE */
@ -166,11 +165,11 @@ void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, H
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple);
logicalrep_write_tuple(out, rel, oldtuple, binary);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newtuple);
logicalrep_write_tuple(out, rel, newtuple, binary);
}
/*
@ -213,7 +212,7 @@ LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, Logica
/*
* Write DELETE to the output stream.
*/
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
{
char relreplident = RelationGetRelReplident(rel);
Assert(relreplident == REPLICA_IDENTITY_DEFAULT ||
@ -229,7 +228,7 @@ void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple);
logicalrep_write_tuple(out, rel, oldtuple, binary);
}
/*
@ -344,7 +343,7 @@ void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
/*
* Write a tuple to the outputstream, in the most efficient format possible.
*/
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
{
TupleDesc desc;
Datum values[MaxTupleAttributeNumber];
@ -371,7 +370,6 @@ static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple
HeapTuple typtup;
Form_pg_type typclass;
Form_pg_attribute att = desc->attrs[i];
char *outputstr;
/* skip dropped columns */
if (att->attisdropped || GetGeneratedCol(desc, i)) {
@ -379,7 +377,7 @@ static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple
}
if (isnull[i]) {
pq_sendbyte(out, 'n'); /* null column */
pq_sendbyte(out, LOGICALREP_COLUMN_NULL); /* null column */
continue;
}
@ -388,61 +386,93 @@ static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple
elog(ERROR, "cache lookup failed for type %u", att->atttypid);
typclass = (Form_pg_type)GETSTRUCT(typtup);
pq_sendbyte(out, 't'); /* 'text' data follows */
if (!typclass->typbyval && typclass->typlen == -1) {
/* definitely detoasted Datum */
Datum val = PointerGetDatum(PG_DETOAST_DATUM(values[i]));
outputstr = OidOutputFunctionCall(typclass->typoutput, val);
/*
* Send in binary if requested and type has suitable send function.
*/
if (binary && OidIsValid(typclass->typsend)) {
bytea* outputbytes = NULL;
pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
if (!typclass->typbyval && typclass->typlen == -1) {
/* definitely detoasted Datum */
Datum val = PointerGetDatum(PG_DETOAST_DATUM(values[i]));
outputbytes = OidSendFunctionCall(typclass->typsend, val);
} else {
outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
}
int len = VARSIZE(outputbytes) - VARHDRSZ;
pq_sendint(out, len, 4); /* length */
pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
if (outputbytes != NULL) {
pfree(outputbytes);
}
} else {
outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
char *outputstr;
pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
if (!typclass->typbyval && typclass->typlen == -1) {
/* definitely detoasted Datum */
Datum val = PointerGetDatum(PG_DETOAST_DATUM(values[i]));
outputstr = OidOutputFunctionCall(typclass->typoutput, val);
} else {
outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
}
pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
if (outputstr != NULL) {
pfree(outputstr);
}
}
pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
pfree(outputstr);
ReleaseSysCache(typtup);
}
}
/*
* Read tuple in remote format from stream.
*
* The returned tuple points into the input stringinfo.
* Read tuple in logical replication format from stream.
*/
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
{
uint16 i;
uint16 natts;
int rc;
/* Get number of attributes. */
natts = pq_getmsgint(in, sizeof(uint16));
uint16 natts = pq_getmsgint(in, sizeof(uint16));
rc = memset_s(tuple->changed, sizeof(tuple->changed), 0, sizeof(tuple->changed));
securec_check(rc, "", "");
/* Allocate space for per-column values; zero out unused StringInfoDatas */
tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
tuple->colstatus = (char *) palloc(natts * sizeof(char));
tuple->ncols = natts;
/* Read the data */
for (i = 0; i < natts; i++) {
char kind;
kind = pq_getmsgbyte(in);
for (uint16 i = 0; i < natts; i++) {
char kind = pq_getmsgbyte(in);
tuple->colstatus[i] = kind;
uint32 len;
StringInfo value = &tuple->colvalues[i];
switch (kind) {
case 'n': /* null */
tuple->values[i] = NULL;
tuple->changed[i] = true;
case LOGICALREP_COLUMN_NULL: /* null */
/* nothing more to do */
break;
case 't': { /* text formatted value */
uint32 len;
tuple->changed[i] = true;
len = pq_getmsgint(in, sizeof(uint32)); /* read length */
case LOGICALREP_COLUMN_TEXT:
len = pq_getmsgint(in, sizeof(uint32)); /* read length */
/* and data */
tuple->values[i] = (char *)palloc(len + 1);
pq_copymsgbytes(in, tuple->values[i], len);
tuple->values[i][len] = '\0';
value->data = (char *) palloc((len + 1) * sizeof(char));
pq_copymsgbytes(in, value->data, len);
value->data[len] = '\0';
/* make StringInfo fully valid */
value->len = len;
value->cursor = 0;
value->maxlen = len;
break;
case LOGICALREP_COLUMN_BINARY:
len = pq_getmsgint(in, sizeof(uint32)); /* read length */
/* and data */
value->data = (char *) palloc((len + 1) * sizeof(char));
pq_copymsgbytes(in, value->data, len);
/* not strictly necessary but per StringInfo practice */
value->data[len] = '\0';
/* make StringInfo fully valid */
value->len = len;
value->cursor = 0;
value->maxlen = len;
break;
}
default:
elog(ERROR, "unrecognized data representation type '%c'", kind);
break;
@ -451,7 +481,7 @@ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
}
/*
* Write relation attributes to the stream.
* Write relation attributes metadata to the stream.
*/
static void logicalrep_write_attrs(StringInfo out, Relation rel)
{
@ -504,7 +534,7 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel)
}
/*
* Read relation attribute names from the stream.
* Read relation attribute metadata from the stream.
*/
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
{

View File

@ -265,11 +265,11 @@ static void slot_store_error_callback(void *arg)
}
/*
* Store data in C string form into slot.
* This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
* use better.
* Store tuple data into slot.
*
* Incoming data can be either text or binary format.
*/
static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, char **values)
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
@ -286,19 +286,52 @@ static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel
errcallback.previous = t_thrd.log_cxt.error_context_stack;
t_thrd.log_cxt.error_context_stack = &errcallback;
/* Call the "in" function for each non-dropped attribute */
/* Call the "in" function for each non-dropped, non-null attribute */
for (i = 0; i < natts; i++) {
Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
int remoteattnum = rel->attrmap[i];
Oid typinput;
Oid typioparam;
if (!att->attisdropped && remoteattnum >= 0 && values[remoteattnum] != NULL) {
if (!att->attisdropped && remoteattnum >= 0) {
StringInfo colvalue = &tupleData->colvalues[remoteattnum];
errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput, values[remoteattnum], typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) {
Oid typinput;
Oid typioparam;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput, colvalue->data, typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
} else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) {
Oid typreceive;
Oid typioparam;
/*
* In some code paths we may be asked to re-parse the same
* tuple data. Reset the StringInfo's cursor so that works.
*/
colvalue->cursor = 0;
getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
slot->tts_values[i] = OidReceiveFunctionCall(typreceive, colvalue, typioparam, att->atttypmod);
/* Trouble if it didn't eat the whole buffer */
if (colvalue->cursor != colvalue->len) {
ereport(ERROR, (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("incorrect binary data format in logical replication column %d",
remoteattnum + 1)));
}
slot->tts_isnull[i] = false;
} else {
/*
* NULL value from remote. (We don't expect to see
* LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
* NULL.)
*/
slot->tts_values[i] = (Datum) 0;
slot->tts_isnull[i] = true;
}
/* Reset attnum for error callback */
errarg.remote_attnum = -1;
} else {
/*
@ -318,18 +351,19 @@ static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel
}
/*
* Replace selected columns with user data provided as C strings.
* Replace updated columns with data from the LogicalRepTupleData struct.
* This is somewhat similar to heap_modify_tuple but also calls the type
* input functions on the user data.
* "slot" is filled with a copy of the tuple in "srcslot", with
* columns selected by the "replaces" array replaced with data values
* from "values".
*
* "slot" is filled with a copy of the tuple in "srcslot", replacing
* columns provided in "tupleData" and leaving others as-is.
*
* Caution: unreplaced pass-by-ref columns in "slot" will point into the
* storage for "srcslot". This is OK for current usage, but someday we may
* need to materialize "slot" at the end to make it independent of "srcslot".
*/
static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel,
char **values, const bool *replaces)
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel,
LogicalRepTupleData *tupleData)
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
@ -364,23 +398,47 @@ static void slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
int remoteattnum = rel->attrmap[i];
if (remoteattnum < 0 || !replaces[remoteattnum]) {
if (remoteattnum < 0) {
continue;
}
if (values[remoteattnum] != NULL) {
Oid typinput;
Oid typioparam;
if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) {
StringInfo colvalue = &tupleData->colvalues[remoteattnum];
errarg.remote_attnum = remoteattnum;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput, values[remoteattnum], typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) {
Oid typinput;
Oid typioparam;
getTypeInputInfo(att->atttypid, &typinput, &typioparam);
slot->tts_values[i] = OidInputFunctionCall(typinput, colvalue->data, typioparam, att->atttypmod);
slot->tts_isnull[i] = false;
} else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) {
Oid typreceive;
Oid typioparam;
/*
* In some code paths we may be asked to re-parse the same
* tuple data. Reset the StringInfo's cursor so that works.
*/
colvalue->cursor = 0;
getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
slot->tts_values[i] = OidReceiveFunctionCall(typreceive, colvalue, typioparam, att->atttypmod);
/* Trouble if it didn't eat the whole buffer */
if (colvalue->cursor != colvalue->len) {
ereport(ERROR, (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("incorrect binary data format in logical replication column %d", remoteattnum + 1)));
}
slot->tts_isnull[i] = false;
} else {
/* must be LOGICALREP_COLUMN_NULL */
slot->tts_values[i] = (Datum) 0;
slot->tts_isnull[i] = true;
}
errarg.remote_attnum = -1;
} else {
slot->tts_values[i] = (Datum)0;
slot->tts_isnull[i] = true;
}
}
@ -516,7 +574,7 @@ static void apply_handle_insert(StringInfo s)
PushActiveSnapshot(GetTransactionSnapshot());
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_store_data(remoteslot, rel, &newtup);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
@ -646,7 +704,7 @@ static void apply_handle_update(StringInfo s)
int remoteattnum = rel->attrmap[i];
if (!att->attisdropped && remoteattnum >= 0) {
Assert(remoteattnum < newtup.ncols);
if (newtup.changed[i]) {
if (newtup.colstatus[i] != LOGICALREP_COLUMN_UNCHANGED) {
target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
i + 1 - FirstLowInvalidHeapAttributeNumber);
}
@ -660,7 +718,7 @@ static void apply_handle_update(StringInfo s)
/* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, has_oldtup ? oldtup.values : newtup.values);
slot_store_data(remoteslot, rel, has_oldtup ? &oldtup : &newtup);
MemoryContextSwitchTo(oldctx);
/*
@ -683,7 +741,7 @@ static void apply_handle_update(StringInfo s)
if (found) {
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_modify_cstrings(remoteslot, localslot, rel, newtup.values, newtup.changed);
slot_modify_data(remoteslot, localslot, rel, &newtup);
MemoryContextSwitchTo(oldctx);
EvalPlanQualSetSlot(&epqstate, remoteslot);
@ -748,7 +806,7 @@ static void apply_handle_delete(StringInfo s)
/* Find the tuple using the replica identity index. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, oldtup.values);
slot_store_data(remoteslot, rel, &oldtup);
MemoryContextSwitchTo(oldctx);
/*
@ -1224,6 +1282,17 @@ static void reread_subscription(void)
proc_exit(0);
}
/*
* Exit if any parameter that affects the remote connection was changed.
* The launcher will start a new worker.
*/
if (strcmp(newsub->name, t_thrd.applyworker_cxt.mySubscription->name) != 0 ||
newsub->binary != t_thrd.applyworker_cxt.mySubscription->binary) {
ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" "
"will restart because of a parameter change", t_thrd.applyworker_cxt.mySubscription->name)));
proc_exit(0);
}
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
@ -1447,6 +1516,7 @@ void ApplyWorkerMain()
options.slotname = t_thrd.applyworker_cxt.mySubscription->slotname;
options.protoVersion = LOGICALREP_PROTO_VERSION_NUM;
options.publicationNames = t_thrd.applyworker_cxt.mySubscription->publications;
options.binary = t_thrd.applyworker_cxt.mySubscription->binary;
/* Start streaming from the slot. */
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_startstreaming(&options);

View File

@ -15,6 +15,8 @@
#include "catalog/pg_publication.h"
#include "commands/defrem.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
@ -74,11 +76,14 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->shutdown_cb = pgoutput_shutdown;
}
static void parse_output_parameters(List *options, PGOutputData *data)
static void parse_output_parameters(List* options, PGOutputData* data)
{
ListCell *lc;
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
data->binary = false;
foreach (lc, options) {
DefElem *defel = (DefElem *)lfirst(lc);
@ -108,6 +113,12 @@ static void parse_output_parameters(List *options, PGOutputData *data)
if (!SplitIdentifierString(strVal(defel->arg), ',', &(data->publication_names)))
ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax")));
} else if (strcmp(defel->defname, "binary") == 0) {
if (binary_option_given)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
binary_option_given = true;
data->binary = defGetBoolean(defel);
} else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
@ -300,19 +311,19 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
switch (change->action) {
case REORDER_BUFFER_CHANGE_INSERT:
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple);
logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple, data->binary);
OutputPluginWrite(ctx, true);
break;
case REORDER_BUFFER_CHANGE_UINSERT:
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation, (HeapTuple)(&change->data.utp.newtuple->tuple));
logicalrep_write_insert(ctx->out, relation, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
OutputPluginWrite(ctx, true);
break;
case REORDER_BUFFER_CHANGE_UPDATE: {
HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple);
logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple, data->binary);
OutputPluginWrite(ctx, true);
break;
}
@ -320,14 +331,14 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
HeapTuple oldtuple = change->data.utp.oldtuple ? ((HeapTuple)(&change->data.utp.oldtuple->tuple)) : NULL;
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple));
logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
OutputPluginWrite(ctx, true);
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple) {
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_delete(ctx->out, relation, &change->data.tp.oldtuple->tuple);
logicalrep_write_delete(ctx->out, relation, &change->data.tp.oldtuple->tuple, data->binary);
OutputPluginWrite(ctx, true);
} else
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
@ -335,7 +346,7 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_UDELETE:
if (change->data.utp.oldtuple) {
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_delete(ctx->out, relation, (HeapTuple)(&change->data.utp.oldtuple->tuple));
logicalrep_write_delete(ctx->out, relation, (HeapTuple)(&change->data.utp.oldtuple->tuple), data->binary);
OutputPluginWrite(ctx, true);
} else
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");

View File

@ -50,13 +50,15 @@ CATALOG(pg_subscription,6126) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6128) BKI_SCHE
NameData subslotname; /* Slot name on publisher */
text subsynccommit; /* Synchronous commit setting for worker */
text subpublications[1]; /* List of publications subscribed to */
bool subbinary; /* True if the subscription wants the
* publisher to send data in binary */
#endif
}
FormData_pg_subscription;
typedef FormData_pg_subscription *Form_pg_subscription;
#define Natts_pg_subscription 8
#define Natts_pg_subscription 9
#define Anum_pg_subscription_subdbid 1
#define Anum_pg_subscription_subname 2
#define Anum_pg_subscription_subowner 3
@ -65,6 +67,7 @@ typedef FormData_pg_subscription *Form_pg_subscription;
#define Anum_pg_subscription_subslotname 6
#define Anum_pg_subscription_subsynccommit 7
#define Anum_pg_subscription_subpublications 8
#define Anum_pg_subscription_subbinary 9
typedef struct Subscription {
@ -77,6 +80,7 @@ typedef struct Subscription {
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
List *publications; /* List of publication names to subscribe to */
bool binary; /* Indicates if the subscription wants data in binary format */
} Subscription;

View File

@ -90,6 +90,7 @@ extern const uint32 SUPPORT_DATA_REPAIR;
extern const uint32 SCAN_BATCH_MODE_VERSION_NUM;
extern const uint32 RELMAP_4K_VERSION_NUM;
extern const uint32 PUBLICATION_VERSION_NUM;
extern const uint32 SUBSCRIPTION_BINARY_VERSION_NUM;
extern const uint32 ANALYZER_HOOK_VERSION_NUM;
extern const uint32 SUPPORT_HASH_XLOG_VERSION_NUM;
extern const uint32 PITR_INIT_VERSION_NUM;

View File

@ -35,6 +35,7 @@ typedef struct LibpqrcvConnectParam {
bool logical;
uint32 protoVersion; /* Logical protocol version */
List *publicationNames; /* String list of publications */
bool binary; /* Ask publisher to use binary */
}LibpqrcvConnectParam;
extern int32 pg_atoi(char* s, int size, int c);

View File

@ -35,12 +35,21 @@
* Keep in mind that the columns correspond to the *remote* table.
*/
typedef struct LogicalRepTupleData {
char *values[MaxTupleAttributeNumber]; /* value in out function format or NULL if values is NULL */
bool changed[MaxTupleAttributeNumber]; /* marker for changed/unchanged values */
/* Array of StringInfos, one per column; some may be unused */
StringInfoData *colvalues;
/* Array of markers for null/unchanged/text/binary, one per column */
char *colstatus;
/* Length of above arrays */
int ncols;
} LogicalRepTupleData;
/* Possible values for LogicalRepTupleData.colstatus[colnum] */
/* These values are also used in the on-the-wire protocol */
#define LOGICALREP_COLUMN_NULL 'n'
#define LOGICALREP_COLUMN_UNCHANGED 'u'
#define LOGICALREP_COLUMN_TEXT 't'
#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */
typedef uint32 LogicalRepRelId;
/* Relation information */
@ -81,12 +90,12 @@ extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data
extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data);
extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn);
extern void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple);
extern void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple);
extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary);
extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple);
extern void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup);
extern void logicalrep_write_rel(StringInfo out, Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);

View File

@ -24,6 +24,7 @@ typedef struct PGOutputData {
List *publication_names;
List *publications;
bool binary;
} PGOutputData;
#endif /* PGOUTPUT_H */

View File

@ -37,7 +37,7 @@ CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''123
ALTER SUBSCRIPTION testsub CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''';
ALTER SUBSCRIPTION testsub CONNECTION 'dbname=does_not_exist';
reset client_min_messages;
select subname, pg_get_userbyid(subowner) as Owner, subenabled, subconninfo, subpublications from pg_subscription where subname='testsub';
select subname, pg_get_userbyid(subowner) as Owner, subenabled, subconninfo, subpublications, subbinary from pg_subscription where subname='testsub';
--- alter subscription
------ set publication
ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3;
@ -57,6 +57,9 @@ select subname, subenabled, subsynccommit from pg_subscription where subname='t
ALTER SUBSCRIPTION testsub SET (slot_name='testsub');
-- alter owner
ALTER SUBSCRIPTION testsub owner to regress_subscription_user2;
-- alter subbinary to true
ALTER SUBSCRIPTION testsub SET (binary=true);
select subname, subbinary from pg_subscription where subname='testsub';
--rename
ALTER SUBSCRIPTION testsub rename to testsub_rename;
--- inside a transaction block

View File

@ -76,10 +76,10 @@ CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''123
ALTER SUBSCRIPTION testsub CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''';
ALTER SUBSCRIPTION testsub CONNECTION 'dbname=does_not_exist';
reset client_min_messages;
select subname, pg_get_userbyid(subowner) as Owner, subenabled, subconninfo, subpublications from pg_subscription where subname='testsub';
subname | owner | subenabled | subconninfo | subpublications
---------+---------------------------+------------+------------------------+-----------------
testsub | regress_subscription_user | f | dbname=does_not_exist | {testpub}
select subname, pg_get_userbyid(subowner) as Owner, subenabled, subconninfo, subpublications, subbinary from pg_subscription where subname='testsub';
subname | owner | subenabled | subconninfo | subpublications | subbinary
---------+---------------------------+------------+----------------------+-----------------+-----------
testsub | regress_subscription_user | f | dbname=doesnotexist | {testpub} | f
(1 row)
--- alter subscription
@ -122,6 +122,14 @@ ALTER SUBSCRIPTION testsub SET (slot_name='testsub');
ERROR: Currently enabled=false, cannot change slot_name to a non-null value.
-- alter owner
ALTER SUBSCRIPTION testsub owner to regress_subscription_user2;
-- alter subbinary to true
ALTER SUBSCRIPTION testsub SET (binary=true);
select subname, subbinary from pg_subscription where subname='testsub';
subname | subbinary
---------+-----------
testsub | t
(1 row)
--rename
ALTER SUBSCRIPTION testsub rename to testsub_rename;
--- inside a transaction block
@ -281,10 +289,11 @@ SELECT object_name,detail_info FROM pg_query_audit('2022-01-13 9:30:00', '2031-1
testsub_maskconninfo | ALTER SUBSCRIPTION testsub_maskconninfo SET (conninfo='*************************************************************************************************************************;
testsub | ALTER SUBSCRIPTION testsub SET (synchronous_commit=on);
testsub | ALTER SUBSCRIPTION testsub owner to regress_subscription_user2;
testsub | ALTER SUBSCRIPTION testsub SET (binary=true);
testsub | ALTER SUBSCRIPTION testsub rename to testsub_rename;
testsub_rename | DROP SUBSCRIPTION IF EXISTS testsub_rename;
testsub_maskconninfo | DROP SUBSCRIPTION IF EXISTS testsub_maskconninfo;
(14 rows)
(15 rows)
--clear audit log
SELECT pg_delete_audit('1012-11-10', '3012-11-11');