Revert "发布订阅支持主备切换不断开"

This reverts commit aadf88ecc6.
This commit is contained in:
xue_meng_en 2022-03-31 14:54:58 +08:00
parent ee30df5221
commit e7795cd01c
12 changed files with 92 additions and 434 deletions

View File

@ -28,6 +28,7 @@
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/syscache.h"
#include "replication/worker_internal.h"
static List *textarray_to_stringlist(ArrayType *textarray);
@ -68,7 +69,7 @@ Subscription *GetSubscription(Oid subid, bool missing_ok)
/* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subslotname, &isnull);
if (unlikely(isnull)) {
if (!isnull) {
sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
} else {
sub->slotname = NULL;
@ -91,10 +92,7 @@ Subscription *GetSubscription(Oid subid, bool missing_ok)
sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subbinary, &isnull);
if (unlikely(isnull)) {
ereport(ERROR, (errcode(ERRCODE_UNEXPECTED_NULL_VALUE),
errmsg("null binary for subscription %u", subid)));
}
Assert(!isnull);
sub->binary = DatumGetBool(datum);
ReleaseSysCache(tup);
@ -189,7 +187,7 @@ char *get_subscription_name(Oid subid, bool missing_ok)
}
/* Clear the list content, only deal with DefElem and string content */
void ClearListContent(List *list)
static void ClearListContent(List *list)
{
ListCell *cell = NULL;
foreach(cell, list) {
@ -209,6 +207,25 @@ void ClearListContent(List *list)
}
}
/*
* Decrypt conninfo for subscription.
* IMPORTANT: caller should clear and free the memory after using it immediately
*/
char *DecryptConninfo(char *encryptConninfo)
{
const char* sensitiveOptionsArray[] = {"password"};
const int sensitiveArrayLength = lengthof(sensitiveOptionsArray);
List *defList = ConninfoToDefList(encryptConninfo);
DecryptOptions(defList, sensitiveOptionsArray, sensitiveArrayLength, SUBSCRIPTION_MODE);
char *decryptConninfo = DefListToString(defList);
/* defList has plain content, clear it before free */
ClearListContent(defList);
list_free_ext(defList);
/* IMPORTANT: caller should clear and free the memory after using it immediately */
return decryptConninfo;
}
/*
* Convert text array to list of strings.
*

View File

@ -44,7 +44,7 @@
#include "utils/array.h"
#include "utils/acl.h"
static bool ConnectPublisher(char* conninfo, char* slotname);
static void ConnectPublisher(char *conninfo, char* slotname);
static void CreateSlotInPublisher(char *slotname);
static void ValidateReplicationSlot(char *slotname, List *publications);
@ -210,82 +210,26 @@ static Datum publicationListToArray(List *publist)
}
/*
* Parse the original connection string which is encrypted, poll all hosts and ports,
* and try to connect to the publisher.
* When checkRemoteMode is true, the remotemode must be normal or primary.
* Return true to indicate successful connection.
* connect publisher and create slot.
* the input conninfo should be encrypt, we will decrypt password inside
*/
bool AttemptConnectPublisher(const char *conninfoOriginal, char* slotname, bool checkRemoteMode)
{
size_t conninfoLen = strlen(conninfoOriginal) + 1;
char* conninfo = NULL;
StringInfoData conninfoWithoutHostport;
initStringInfo(&conninfoWithoutHostport);
HostPort* hostPortList[MAX_REPLNODE_NUM] = {NULL};
ParseConninfo(conninfoOriginal, &conninfoWithoutHostport, hostPortList);
if (hostPortList[0] == NULL) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg(
"invalid connection string syntax, missing host and port")));
}
bool connectSuccess = false;
conninfo = (char*)palloc(conninfoLen * sizeof(char));
for (int i = 0; i < MAX_REPLNODE_NUM; ++i) {
if (hostPortList[i] == NULL) {
break;
}
int ret = snprintf_s(conninfo, conninfoLen, conninfoLen - 1,
"%s host=%s port=%s", conninfoWithoutHostport.data,
hostPortList[i]->host, hostPortList[i]->port);
securec_check_ss(ret, "\0", "\0");
connectSuccess = ConnectPublisher(conninfo, slotname);
if (!connectSuccess) {
/* try next host */
continue;
}
if (!checkRemoteMode) {
break;
}
ServerMode publisherServerMde = IdentifyRemoteMode();
if (publisherServerMde == NORMAL_MODE || publisherServerMde == PRIMARY_MODE) {
break;
}
/* it's a standby, try next host */
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
connectSuccess = false;
}
pfree_ext(conninfo);
/* clean up */
FreeStringInfo(&conninfoWithoutHostport);
for (int i = 0; i < MAX_REPLNODE_NUM; ++i) {
if (hostPortList[i] == NULL) {
break;
}
pfree_ext(hostPortList[i]->host);
pfree_ext(hostPortList[i]->port);
pfree_ext(hostPortList[i]);
}
return connectSuccess;
}
/*
* connect to publisher with conninfo
*/
static bool ConnectPublisher(char* conninfo, char* slotname)
static void ConnectPublisher(char *conninfo, char *slotname)
{
/* Try to connect to the publisher. */
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
SpinLockAcquire(&walrcv->mutex);
walrcv->conn_target = REPCONNTARGET_PUBLICATION;
SpinLockRelease(&walrcv->mutex);
char* decryptConninfo = EncryptOrDecryptConninfo(conninfo, 'D');
char *decryptConninfo = DecryptConninfo(conninfo);
bool connectSuccess = (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_connect(decryptConninfo, NULL, slotname, -1);
int rc = memset_s(decryptConninfo, strlen(decryptConninfo), 0, strlen(decryptConninfo));
securec_check(rc, "", "");
pfree_ext(decryptConninfo);
return connectSuccess;
if (!connectSuccess) {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not connect to the publisher")));
}
}
/*
@ -362,6 +306,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
bool enabled_given = false;
bool enabled = true;
char *synchronous_commit;
char *conninfo;
char *slotname;
bool slotname_given;
bool binary;
@ -403,10 +348,11 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
synchronous_commit = "off";
}
conninfo = stmt->conninfo;
publications = stmt->publication;
/* Check the connection info string. */
libpqrcv_check_conninfo(stmt->conninfo);
libpqrcv_check_conninfo(conninfo);
/* Everything ok, form a new tuple. */
rc = memset_s(values, sizeof(values), 0, sizeof(values));
@ -421,9 +367,16 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
/* encrypt conninfo */
char *encryptConninfo = EncryptOrDecryptConninfo(stmt->conninfo, 'E');
List *conninfoList = ConninfoToDefList(stmt->conninfo);
/* Sensitive options for subscription, will be encrypted when saved to catalog. */
const char* sensitiveOptionsArray[] = {"password"};
const int sensitiveArrayLength = lengthof(sensitiveOptionsArray);
EncryptGenericOptions(conninfoList, sensitiveOptionsArray, sensitiveArrayLength, SUBSCRIPTION_MODE);
char *encryptConninfo = DefListToString(conninfoList);
values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(encryptConninfo);
pfree_ext(conninfoList);
if (enabled) {
if (!slotname_given) {
slotname = stmt->subname;
@ -459,14 +412,11 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
*/
if (enabled) {
Assert(slotname);
if (!AttemptConnectPublisher(encryptConninfo, slotname, true)) {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("Failed to connect to publisher.")));
}
ConnectPublisher(encryptConninfo, slotname);
CreateSlotInPublisher(slotname);
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
}
pfree_ext(encryptConninfo);
heap_close(rel, RowExclusiveLock);
rc = memset_s(stmt->conninfo, strlen(stmt->conninfo), 0, strlen(stmt->conninfo));
@ -558,15 +508,23 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
if (conninfo) {
/* Check the connection info string. */
libpqrcv_check_conninfo(conninfo);
encryptConninfo = EncryptOrDecryptConninfo(conninfo, 'E');
rc = memset_s(conninfo, strlen(conninfo), 0, strlen(conninfo));
securec_check(rc, "\0", "\0");
values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(encryptConninfo);
replaces[Anum_pg_subscription_subconninfo - 1] = true;
/* encrypt conninfo */
List *conninfoList = ConninfoToDefList(conninfo);
/* Sensitive options for subscription, will be encrypted when saved to catalog. */
const char* sensitiveOptionsArray[] = {"password"};
const int sensitiveArrayLength = lengthof(sensitiveOptionsArray);
EncryptGenericOptions(conninfoList, sensitiveOptionsArray, sensitiveArrayLength, SUBSCRIPTION_MODE);
encryptConninfo = DefListToString(conninfoList);
needFreeConninfo = true;
/* need to check whether new conninfo can be used to connect to new publisher */
values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(encryptConninfo);
replaces[Anum_pg_subscription_subconninfo - 1] = true;
pfree_ext(conninfoList);
if (sub->enabled || (enabled_given && enabled)) {
/* we need to check whether new conninfo can be used to connect to new publisher */
checkConn = true;
}
}
@ -634,18 +592,16 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
if (sub->enabled && !enabled) {
ereport(ERROR, (errmsg("If you want to deactivate this subscription, use DROP SUBSCRIPTION.")));
}
/* enabling subscription, but slot hasn't been created,
* then mark createSlot to true.
*/
if (!sub->enabled && enabled && (!sub->slotname || !*(sub->slotname))) {
/* enable subscription */
if (!sub->enabled && enabled) {
/* if slot hasn't been created, then create it */
if (!sub->slotname || !*(sub->slotname)) {
createSlot = true;
}
}
if (checkConn || createSlot || validateSlot) {
if (!AttemptConnectPublisher(encryptConninfo, finalSlotName, true)) {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg(
checkConn ? "The new conninfo cannot connect to new publisher." : "Failed to connect to publisher.")));
}
ConnectPublisher(encryptConninfo, finalSlotName);
if (createSlot) {
CreateSlotInPublisher(finalSlotName);
@ -663,6 +619,12 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
if (needFreeConninfo) {
pfree_ext(encryptConninfo);
}
if (conninfo) {
rc = memset_s(conninfo, strlen(conninfo), 0, strlen(conninfo));
securec_check(rc, "", "");
}
return myself;
}
@ -813,11 +775,7 @@ void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
initStringInfo(&cmd);
appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
if (!AttemptConnectPublisher(conninfo, slotname, true)) {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg(
"could not connect to publisher.")));
}
ConnectPublisher(conninfo, slotname);
PG_TRY();
{
int sqlstate = 0;
@ -843,7 +801,6 @@ void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
pfree_ext(conninfo);
pfree(cmd.data);
heap_close(rel, NoLock);
}
@ -973,149 +930,3 @@ void RenameSubscription(List *oldname, const char *newname)
return;
}
/*
* Parse the host or port string into a string array,
* where host and port are separated by ",".
* input: conn --- host or port string separated by ","
* output: connArray --- host or port string array
* return: the length of connArray
* for example:
* (1):
* conn = 1.1.1.1,2.2.2.2,...,9.9.9.9
* connArray = {
* 1,.1.1.1,
* 2.2.2.2,
* ...,
* 9.9.9.9
* }
* return 9
* (2):
* conn = 1,2,...,9
* connArray = {1,2,...,9}
* return 9
*/
static int HostsPortsToArray(const char* conn, char** connArray)
{
if (conn == NULL) {
return 0;
}
char* cp = NULL;
char* cur = NULL;
char *buf = pstrdup(conn);
cp = buf;
int i = 0;
while (*cp) {
cur = cp;
while (*cp && *cp != ',') {
++cp;
}
if (*cp == ',') {
*cp = '\0';
++cp;
}
if (i >= MAX_REPLNODE_NUM) {
ereport(ERROR, (errmsg("Currently, a maximum of %d servers are "
"supported.", MAX_REPLNODE_NUM)));
}
connArray[i++] = pstrdup(cur);
if (*cp == 0) {
break;
}
}
pfree(buf);
return i;
}
/*
* parse host and port
*/
static void ParseHostPort(char* hoststr, char* portstr, HostPort** hostPortList)
{
char* hosts[MAX_REPLNODE_NUM] = {NULL};
char* ports[MAX_REPLNODE_NUM] = {NULL};
int hostNum = HostsPortsToArray(hoststr, hosts);
int portNum = HostsPortsToArray(portstr, ports);
if (hostNum != portNum) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("The number of host and port are inconsistent.")));
}
for (int i = 0; i < hostNum; ++i) {
hostPortList[i] = (HostPort*)palloc(sizeof(HostPort));
hostPortList[i]->host = hosts[i];
hostPortList[i]->port = ports[i];
}
}
/*
* Parse conninfo
* conninfo format:
* 'dbname=abc user=username password=xxxx host=ip1,ip2,...,ip9 port=p1,p2,...,p9'
* after parsing:
* conninfoWithoutHostPort:
* 'dbname=abc user=username password=xxxx'
* hostPortList:
* {
* {host=ip1, port=p1},
* {host=ip2, port=p2},
* ...
* {host=ip9, port=p9}
* }
*/
void ParseConninfo(const char* conninfo, StringInfoData* conninfoWithoutHostPort, HostPort** hostPortList)
{
List* conninfoList = ConninfoToDefList(conninfo);
ListCell* l = NULL;
char* hostStr = NULL;
char* portStr = NULL;
foreach (l, conninfoList) {
DefElem* defel = (DefElem*)lfirst(l);
if (pg_strcasecmp(defel->defname, "host") == 0) {
hostStr = defGetString(defel);
} else if (pg_strcasecmp(defel->defname, "port") == 0) {
portStr = defGetString(defel);
} else {
appendStringInfo(conninfoWithoutHostPort, "%s=%s ", defel->defname, defGetString(defel));
}
}
if (hostPortList != NULL) {
ParseHostPort(hostStr, portStr, hostPortList);
}
}
/*
* encrypt conninfo when action = 'E'
* decrypt conninfo when action = 'D'
* conninfoNew: encrypted or decrypted conninfo
*/
char* EncryptOrDecryptConninfo(const char* conninfo, const char action)
{
/* parse conninfo to list */
List *conninfoList = ConninfoToDefList(conninfo);
/* Sensitive options for subscription */
const char* sensitiveOptionsArray[] = {"password"};
const int sensitiveArrayLength = lengthof(sensitiveOptionsArray);
switch (action) {
/* Encrypt */
case 'E':
EncryptGenericOptions(conninfoList, sensitiveOptionsArray, sensitiveArrayLength, SUBSCRIPTION_MODE);
break;
/* Decrypt */
case 'D':
DecryptOptions(conninfoList, sensitiveOptionsArray, sensitiveArrayLength, SUBSCRIPTION_MODE);
break;
default:
break;
}
char* conninfoNew = DefListToString(conninfoList);
ClearListContent(conninfoList);
list_free_ext(conninfoList);
return conninfoNew;
}

View File

@ -259,7 +259,7 @@ void StartRemoteStreaming(const LibpqrcvConnectParam *options)
if (options->binary && PQserverVersion(t_thrd.libwalreceiver_cxt.streamConn) >= 90204) {
appendStringInfoString(&cmd, ", binary 'true'");
ereport(DEBUG5, (errmsg("append binary true")));
ereport(DEBUG5, ( errmsg("append binary true")));
}
appendStringInfoChar(&cmd, ')');
@ -426,7 +426,7 @@ void IdentifyRemoteSystem(bool checkRemote)
}
/* identify remote mode, should do this after connect success. */
ServerMode IdentifyRemoteMode()
static ServerMode IdentifyRemoteMode()
{
Assert(t_thrd.libwalreceiver_cxt.streamConn != NULL);
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
@ -449,9 +449,7 @@ ServerMode IdentifyRemoteMode()
num_fields)));
}
remoteMode = (ServerMode)pg_strtoint32(PQgetvalue(res, 0, 0));
if (walrcv->conn_target != REPCONNTARGET_PUBLICATION &&
!t_thrd.walreceiver_cxt.AmWalReceiverForFailover &&
(!IS_PRIMARY_NORMAL(remoteMode)) &&
if (!t_thrd.walreceiver_cxt.AmWalReceiverForFailover && (!IS_PRIMARY_NORMAL(remoteMode)) &&
/* remoteMode of cascade standby is a standby */
!t_thrd.xlog_cxt.is_cascade_standby && !IS_SHARED_STORAGE_MODE) {
PQclear(res);

View File

@ -603,25 +603,3 @@ static const char *logicalrep_read_namespace(StringInfo in)
return nspname;
}
/*
* Write conninfo to the output stream.
*/
void logicalrep_write_conninfo(StringInfo out, char* conninfo)
{
pq_sendbyte(out, 'S'); /* action */
pq_writestring(out, conninfo); /* conninfo follows */
}
/*
* Read conninfo from stream.
*/
void logicalrep_read_conninfo(StringInfo in, char** conninfo)
{
const char* conninfoTemp = pq_getmsgstring(in);
size_t conninfoLen = strlen(conninfoTemp) + 1;
*conninfo = (char*)palloc(conninfoLen);
int rc = strcpy_s(*conninfo, conninfoLen, conninfoTemp);
securec_check(rc, "", "");
}

View File

@ -41,7 +41,6 @@
#include "catalog/pg_partition_fn.h"
#include "commands/trigger.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "executor/node/nodeModifyTable.h"
@ -112,8 +111,6 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void reread_subscription(void);
static void ApplyWorkerProcessMsg(char type, StringInfo s, XLogRecPtr *lastRcv);
static void apply_dispatch(StringInfo s);
static void apply_handle_conninfo(StringInfo s);
static void UpdateConninfo(char* standbysInfo);
/* SIGHUP: set flag to re-read config file at next convenient time */
static void LogicalrepWorkerSighub(SIGNAL_ARGS)
@ -885,9 +882,6 @@ static void apply_dispatch(StringInfo s)
case 'O':
apply_handle_origin(s);
break;
case 'S':
apply_handle_conninfo(s);
break;
default:
ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid logical replication message type \"%c\"", action)));
@ -1497,9 +1491,14 @@ void ApplyWorkerMain()
CommitTransactionCommand();
if (!AttemptConnectPublisher(t_thrd.applyworker_cxt.mySubscription->conninfo,
t_thrd.applyworker_cxt.mySubscription->name, true)) {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("Failed to connect to publisher.")));
char *decryptConnInfo = DecryptConninfo(t_thrd.applyworker_cxt.mySubscription->conninfo);
bool connectSuccess = (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_connect(decryptConnInfo, NULL,
t_thrd.applyworker_cxt.mySubscription->name, -1);
rc = memset_s(decryptConnInfo, strlen(decryptConnInfo), 0, strlen(decryptConnInfo));
securec_check(rc, "", "");
pfree_ext(decryptConnInfo);
if (!connectSuccess) {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not connect to the publisher")));
}
/*
@ -1666,64 +1665,3 @@ char* DefListToString(const List *defList)
return buf.data;
}
/*
* Handle conninfo update message.
*/
static void apply_handle_conninfo(StringInfo s)
{
char* standbysInfo = NULL;
logicalrep_read_conninfo(s, &standbysInfo);
UpdateConninfo(standbysInfo);
pfree_ext(standbysInfo);
}
static void UpdateConninfo(char* standbysInfo)
{
Relation rel;
bool nulls[Natts_pg_subscription];
bool replaces[Natts_pg_subscription];
Datum values[Natts_pg_subscription];
HeapTuple tup;
Subscription* sub = t_thrd.applyworker_cxt.mySubscription;
Oid subid = sub->oid;
StartTransactionCommand();
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
/* Fetch the existing tuple. */
tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, u_sess->proc_cxt.MyDatabaseId,
CStringGetDatum(t_thrd.applyworker_cxt.mySubscription->name));
if (!HeapTupleIsValid(tup)) {
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("subscription \"%s\" does not exist",
t_thrd.applyworker_cxt.mySubscription->name)));
}
subid = HeapTupleGetOid(tup);
/* Form a new tuple. */
int rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "", "");
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "", "");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "", "");
/* get conninfoWithoutHostport */
StringInfoData conninfoWithoutHostport;
initStringInfo(&conninfoWithoutHostport);
ParseConninfo(sub->conninfo, &conninfoWithoutHostport, (HostPort**)NULL);
/* join conninfoWithoutHostport together with standbysinfo */
appendStringInfo(&conninfoWithoutHostport, " %s", standbysInfo);
/* Replace connection information */
values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfoWithoutHostport.data);
replaces[Anum_pg_subscription_subconninfo - 1] = true;
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);
/* Update the catalog. */
simple_heap_update(rel, &tup->t_self, tup);
CatalogUpdateIndexes(rel, tup);
heap_close(rel, RowExclusiveLock);
CommitTransactionCommand();
ereport(LOG, (errmsg("Update conninfo successfully, new conninfo %s.", standbysInfo)));
}

View File

@ -46,8 +46,6 @@ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId orig
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue);
static bool ReplconninfoChanged();
static void GetConninfo(StringInfoData* standbysInfo);
/* Entry in the map used to remember which relation schemas we sent. */
typedef struct RelationSyncEntry {
@ -219,22 +217,6 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *t
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
/*
* Send the newest connecttion information to the subscriber,
* when the connection information about the standby changes.
*/
if (ReplconninfoChanged()) {
StringInfoData standbysInfo;
initStringInfo(&standbysInfo);
GetConninfo(&standbysInfo);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_conninfo(ctx->out, standbysInfo.data);
OutputPluginWrite(ctx, true);
FreeStringInfo(&standbysInfo);
}
}
/*
@ -634,43 +616,3 @@ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashval
entry->pubactions.pubdelete = false;
}
}
static void GetConninfo(StringInfoData* standbysInfo)
{
bool primaryJoined = false;
StringInfoData hosts;
StringInfoData ports;
initStringInfo(&hosts);
initStringInfo(&ports);
for (int i = 1; i < MAX_REPLNODE_NUM + 1; ++i) {
t_thrd.postmaster_cxt.ReplConnChangeType[i] = 0;
if (t_thrd.postmaster_cxt.ReplConnArray[i] == NULL) {
continue;
}
if (!primaryJoined) {
appendStringInfo(&hosts, "%s,%s",
t_thrd.postmaster_cxt.ReplConnArray[i]->localhost,
t_thrd.postmaster_cxt.ReplConnArray[i]->remotehost);
appendStringInfo(&ports, "%d,%d",
t_thrd.postmaster_cxt.ReplConnArray[i]->localport,
t_thrd.postmaster_cxt.ReplConnArray[i]->remoteport);
primaryJoined = true;
} else {
appendStringInfo(&hosts, ",%s",
t_thrd.postmaster_cxt.ReplConnArray[i]->remotehost);
appendStringInfo(&ports, ",%d",
t_thrd.postmaster_cxt.ReplConnArray[i]->remoteport);
}
}
appendStringInfo(standbysInfo, "host=%s port=%s", hosts.data, ports.data);
}
static inline bool ReplconninfoChanged()
{
for (int i = 1; i < MAX_REPLNODE_NUM; ++i) {
if (t_thrd.postmaster_cxt.ReplConnChangeType[i]) {
return true;
}
}
return false;
}

View File

@ -90,7 +90,6 @@ extern Oid get_subscription_oid(const char *subname, bool missing_ok);
extern char *get_subscription_name(Oid subid, bool missing_ok);
extern int CountDBSubscriptions(Oid dbid);
extern void ClearListContent(List *list);
extern char *DecryptConninfo(char *encryptConninfo);
#endif /* PG_SUBSCRIPTION_H */

View File

@ -17,11 +17,6 @@
#include "nodes/parsenodes.h"
typedef struct HostPort {
char* host;
char* port;
} HostPort;
extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel);
extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt);
extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
@ -29,12 +24,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
extern void RenameSubscription(List* oldname, const char* newname);
extern void AddStandbysInfo(char* standbysInfo);
extern void DropStandbysInfo(char* standbysInfo);
extern void ParseConninfo(const char* conninfo, StringInfoData* conninfoWithoutHostPort, HostPort** hostPortList);
extern char* EncryptOrDecryptConninfo(const char* conninfo, const char action);
extern bool AttemptConnectPublisher(const char *conninfoOriginal, char* slotname, bool checkRemoteMode);
#endif /* SUBSCRIPTIONCMDS_H */

View File

@ -54,6 +54,5 @@ extern bool libpqrcv_command(const char *cmd, char **err, int *sqlstate);
extern void IdentifyRemoteSystem(bool checkRemote);
extern void CreateRemoteReplicationSlot(XLogRecPtr startpoint, const char* slotname, bool isLogical);
extern void StartRemoteStreaming(const LibpqrcvConnectParam *options);
extern ServerMode IdentifyRemoteMode();
#endif

View File

@ -101,7 +101,5 @@ extern void logicalrep_write_rel(StringInfo out, Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, Oid typoid);
extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
extern void logicalrep_write_conninfo(StringInfo out, char* conninfo);
extern void logicalrep_read_conninfo(StringInfo in, char** conninfo);
#endif /* LOGICALREP_PROTO_H */

View File

@ -24,7 +24,6 @@ CREATE SUBSCRIPTION testsub CONNECTION 'foo';
CREATE SUBSCRIPTION testsub PUBLICATION foo;
-- fail - could not connect to the publisher
create subscription testsub2 connection 'host=abc' publication pub;
create subscription testsub2 connection 'host=abc port=12345' publication pub;
set client_min_messages to error;
-- fail - syntax error, invalid connection string syntax: missing "="
CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
@ -33,10 +32,7 @@ CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off);
-- create SUBSCRIPTION with conninfo in two single quote, used to check mask string bug
CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off);
-- fail - The number of host and port are inconsistent
create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1;
-- fail - a maximum of 9 servers are supported
create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1;
-- alter connection
ALTER SUBSCRIPTION testsub CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''';
ALTER SUBSCRIPTION testsub CONNECTION 'dbname=does_not_exist';

View File

@ -60,10 +60,8 @@ LINE 1: CREATE SUBSCRIPTION testsub PUBLICATION foo;
^
-- fail - could not connect to the publisher
create subscription testsub2 connection 'host=abc' publication pub;
ERROR: The number of host and port are inconsistent.
create subscription testsub2 connection 'host=abc port=12345' publication pub;
WARNING: apply worker could not connect to the remote server
ERROR: Failed to connect to publisher.
ERROR: could not connect to the publisher
set client_min_messages to error;
-- fail - syntax error, invalid connection string syntax: missing "="
CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
@ -74,20 +72,14 @@ ERROR: unrecognized subscription parameter: create_slot
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off);
-- create SUBSCRIPTION with conninfo in two single quote, used to check mask string bug
CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off);
-- fail - The number of host and port are inconsistent
create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1;
ERROR: The number of host and port are inconsistent.
-- fail - a maximum of 9 servers are supported
create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1;
ERROR: Currently, a maximum of 9 servers are supported.
-- alter connection
ALTER SUBSCRIPTION testsub CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''';
ALTER SUBSCRIPTION testsub CONNECTION 'dbname=does_not_exist';
reset client_min_messages;
select subname, pg_get_userbyid(subowner) as Owner, subenabled, subconninfo, subpublications, subbinary from pg_subscription where subname='testsub';
subname | owner | subenabled | subconninfo | subpublications | subbinary
---------+---------------------------+------------+------------------------+-----------------+-----------
testsub | regress_subscription_user | f | dbname=does_not_exist | {testpub} | f
subname | owner | subenabled | subconninfo | subpublications | subbinary
---------+---------------------------+------------+----------------------+-----------------+-----------
testsub | regress_subscription_user | f | dbname=doesnotexist | {testpub} | f
(1 row)
--- alter subscription
@ -150,7 +142,8 @@ COMMIT;
-- -- active SUBSCRIPTION
BEGIN;
ALTER SUBSCRIPTION testsub_rename ENABLE;
ERROR: invalid connection string syntax, missing host and port
WARNING: apply worker could not connect to the remote server
ERROR: could not connect to the publisher
select subname, subenabled from pg_subscription where subname='testsub_rename';
ERROR: current transaction is aborted, commands ignored until end of transaction block, firstChar[Q]
ALTER SUBSCRIPTION testsub_rename SET (ENABLED=false);