1. Support the feature of shrinking a standby server

2. Fix the issue of #I1N6G7: link https://gitee.com/opengauss/openGauss-server/issues/I1N6G7
3. link #I1N6G7
This commit is contained in:
testinchina 2020-11-18 11:10:38 +08:00 committed by chenc
parent d046a092ac
commit 25fcadc71f
11 changed files with 1191 additions and 21 deletions

View File

@ -1601,7 +1601,7 @@ char* parse_AZ_result(char* AZStr, const char* data_dir)
array[0] = get_nodename_list_by_AZ(azList[0], data_dir);
// input az name is incorrect
if (NULL == array[0]) {
(void)write_stderr("ERROR: The AZ name \"%s\" does not be found on cluster. please makesure the AZ string "
(void)write_log("ERROR: The AZ name \"%s\" does not be found on cluster. please makesure the AZ string "
"\"%s\" is correct.\n",
azList[0],
AZStr);
@ -1613,7 +1613,7 @@ char* parse_AZ_result(char* AZStr, const char* data_dir)
array[1] = get_nodename_list_by_AZ(azList[1], data_dir);
// input az name is incorrect
if (NULL == array[1]) {
(void)write_stderr("ERROR: The AZ name \"%s\" does not be found on cluster. please makesure the AZ string "
(void)write_log("ERROR: The AZ name \"%s\" does not be found on cluster. please makesure the AZ string "
"\"%s\" is correct.\n",
azList[1],
AZStr);
@ -1626,7 +1626,7 @@ char* parse_AZ_result(char* AZStr, const char* data_dir)
array[2] = get_nodename_list_by_AZ(azList[2], data_dir);
// input az name is incorrect
if (NULL == array[2]) {
(void)write_stderr("ERROR: The AZ name \"%s\" does not be found on cluster. please makesure the AZ string "
(void)write_log("ERROR: The AZ name \"%s\" does not be found on cluster. please makesure the AZ string "
"\"%s\" is correct.\n",
azList[2],
AZStr);
@ -1725,6 +1725,10 @@ char* get_AZ_value(const char* value, const char* data_dir)
char* result = NULL;
size_t len = 0;
char* az1 = getAZNamebyPriority(g_az_master);
char* vouter_ptr = NULL;
char delims[] = ",";
char* vptr = NULL;
char emptyvalue[] = "''";
if (az1 != NULL) {
minLen = strlen("ANY X()") + strlen(az1);
@ -1746,7 +1750,7 @@ char* get_AZ_value(const char* value, const char* data_dir)
}
/* check value length */
if (strlen(value) > MAX_VALUE_LEN || strlen(value) < minLen) {
if (strlen(value) > MAX_VALUE_LEN) {
(void)write_stderr("ERROR: The value of pamameter synchronous_standby_names is incorrect.\n");
return NULL;
}
@ -1756,6 +1760,14 @@ char* get_AZ_value(const char* value, const char* data_dir)
// skip the space
while (isspace((unsigned char)*p))
p++;
if (strlen(p) == 0) {
len = strlen(emptyvalue) + 1;
result = (char*)pg_malloc_zero(len * sizeof(char));
nRet = snprintf_s(result, len, len - 1, "%s", emptyvalue);
securec_check_ss_c(nRet, "\0", "\0");
return result;
}
if (0 != strncmp(p, "FIRST ", strlen("FIRST ")) && 0 != strncmp(p, "ANY ", strlen("ANY "))) {
(void)write_stderr("ERROR: The value of pamameter synchronous_standby_names is incorrect.\n");
@ -1833,7 +1845,25 @@ char* get_AZ_value(const char* value, const char* data_dir)
// parse and check the AZName string
nodenameList = parse_AZ_result(q, data_dir);
if (NULL == nodenameList) {
goto failed;
// try dn
nodenameList = get_nodename_list_by_AZ(az1, data_dir);
vptr = strtok_r(q, delims, &vouter_ptr);
while (NULL != vptr) {
p = vptr;
//p like this: dn_6001, dn_6002...
while (isspace((unsigned char)*p))
p++;
if(NULL == strstr(nodenameList, p)) {
goto failed;
}
vptr = strtok_r(NULL, delims, &vouter_ptr);
}
len = strlen(nodenameList);
nRet = snprintf_s(nodenameList, len + 1, len, q);
} else if ('\0' == nodenameList[0]) {
(void)write_stderr("ERROR: There is no standby node name. Please make sure the value of "
"synchronous_standby_names is correct.\n");
@ -1853,7 +1883,7 @@ char* get_AZ_value(const char* value, const char* data_dir)
len = strlen(preStr) + 5 + strlen(nodenameList);
result = (char*)pg_malloc_zero(len * sizeof(char));
nRet = snprintf_s(result, len, len - 1, "'%s(%s)'", preStr, nodenameList);
securec_check_ss_c(nRet, result, "\0");
securec_check_ss_c(nRet, "\0", "\0");
GS_FREE(nodenameList);
return result;

View File

@ -2945,7 +2945,7 @@ static int ServerLoop(void)
if (g_instance.pid_cxt.RemoteServicePID == 0 && !dummyStandbyMode && IS_PGXC_DATANODE &&
t_thrd.postmaster_cxt.HaShmData->current_mode != NORMAL_MODE && !IS_DN_WITHOUT_STANDBYS_MODE() &&
IsRemoteReadModeOn())
IsRemoteReadModeOn() && get_cur_repl_num())
g_instance.pid_cxt.RemoteServicePID = initialize_util_thread(RPC_SERVICE);
}
@ -9310,14 +9310,6 @@ static void CreateHaListenSocket(void)
/* we should not reset pooler port if it is used */
use_pooler_port = NeedPoolerPort();
if (MAX_REPLNODE_NUM == 8)
Assert(t_thrd.postmaster_cxt.ReplConnArray[1] || t_thrd.postmaster_cxt.ReplConnArray[2] ||
t_thrd.postmaster_cxt.ReplConnArray[3] || t_thrd.postmaster_cxt.ReplConnArray[4] ||
t_thrd.postmaster_cxt.ReplConnArray[5] || t_thrd.postmaster_cxt.ReplConnArray[6] ||
t_thrd.postmaster_cxt.ReplConnArray[7]);
else if (MAX_REPLNODE_NUM == 5)
Assert(t_thrd.postmaster_cxt.ReplConnArray[1] || t_thrd.postmaster_cxt.ReplConnArray[2] ||
t_thrd.postmaster_cxt.ReplConnArray[3] || t_thrd.postmaster_cxt.ReplConnArray[4]);
int ss_rc = memset_s(&newListenAddrs, sizeof(newListenAddrs), 0, sizeof(newListenAddrs));
securec_check(ss_rc, "\0", "\0");

View File

@ -191,16 +191,24 @@ void RemoteServiceMain(void)
char listen_address[MAXPGPATH];
int rc = EOK;
int i = 0;
if (!t_thrd.postmaster_cxt.ReplConnArray[1] || 0 == t_thrd.postmaster_cxt.ReplConnArray[1]->localservice) {
ereport(FATAL, (errmodule(MOD_REMOTE), errmsg("remote service port not available")));
for (i = 1; i < MAX_REPLNODE_NUM; i++) {
if (t_thrd.postmaster_cxt.ReplConnArray[i] && 0 != t_thrd.postmaster_cxt.ReplConnArray[i]->localservice) {
break;
}
}
if (i == MAX_REPLNODE_NUM) {
ereport(WARNING, (errmodule(MOD_REMOTE), errmsg("remote service port not available")));
proc_exit(0);
}
rc = snprintf_s(listen_address,
MAXPGPATH,
(MAXPGPATH - 1),
"%s:%d",
t_thrd.postmaster_cxt.ReplConnArray[1]->localhost,
t_thrd.postmaster_cxt.ReplConnArray[1]->localservice);
t_thrd.postmaster_cxt.ReplConnArray[i]->localhost,
t_thrd.postmaster_cxt.ReplConnArray[i]->localservice);
securec_check_ss(rc, "", "");
GRPC_TRY() {

View File

@ -11160,7 +11160,8 @@ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo* logSegNo)
* 5 if enable_xlog_prune not set , When there are not quorum standbys connect, keep xlog
*/
if (t_thrd.xlog_cxt.server_mode == PRIMARY_MODE &&
u_sess->attr.attr_storage.guc_synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) {
u_sess->attr.attr_storage.guc_synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH &&
t_thrd.syncrep_cxt.SyncRepConfig != NULL) {
if (WalSndInProgress(SNDROLE_PRIMARY_BUILDSTANDBY)) {
/* segno = 1 show all file should be keep */
segno = 1;

View File

@ -172,6 +172,9 @@ void UpdateLastHeartbeatTime(const char* remoteHost, int remotePort, TimestampTz
SpinLockAcquire(&stat->mutex);
for (int i = 1; i < MAX_REPLNODE_NUM; i++) {
ReplConnInfo* replconninfo = t_thrd.postmaster_cxt.ReplConnArray[i];
if (replconninfo == NULL) {
continue;
}
if (strncmp((char*)replconninfo->remotehost, (char*)remoteHost, IP_LEN) == 0 &&
replconninfo->remoteheartbeatport == remotePort) {

View File

@ -0,0 +1,319 @@
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description : gs_dropnode is a utility to drop a standby node from the cluster
#############################################################################
import datetime
import os
import re
import subprocess
import sys
sys.path.append(sys.path[0])
from gspylib.common.DbClusterInfo import dbClusterInfo
from gspylib.common.DbClusterStatus import DbClusterStatus
from gspylib.common.GaussLog import GaussLog
from gspylib.common.Common import DefaultValue, ClusterCommand
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.ParallelBaseOM import ParallelBaseOM
from gspylib.common.ParameterParsecheck import Parameter
from gspylib.threads.SshTool import SshTool
from impl.dropnode.DropnodeImpl import DropnodeImpl
ENV_LIST = ["MPPDB_ENV_SEPARATE_PATH", "GPHOME", "PATH",
"LD_LIBRARY_PATH", "PYTHONPATH", "GAUSS_WARNING_TYPE",
"GAUSSHOME", "PATH", "LD_LIBRARY_PATH",
"S3_CLIENT_CRT_FILE", "GAUSS_VERSION", "PGHOST",
"GS_CLUSTER_NAME", "GAUSSLOG", "GAUSS_ENV", "umask"]
class Dropnode(ParallelBaseOM):
"""
"""
def __init__(self):
"""
"""
ParallelBaseOM.__init__(self)
# Add the standby node backip list which need to be deleted
self.hostIpListForDel = []
self.hostMapForDel = {}
self.hostMapForExist = {}
self.clusterInfo = dbClusterInfo()
self.backIpNameMap = {}
self.failureHosts = []
self.flagOnlyPrimary = False
envFile = DefaultValue.getEnv("MPPDB_ENV_SEPARATE_PATH")
if envFile:
self.envFile = envFile
else:
self.envFile = "/etc/profile"
self.userProfile = ''
def usage(self):
"""
gs_dropnode is a utility to delete the standby node from a cluster.
Usage:
gs_dropnode -? | --help
gs_dropnode -V | --version
gs_dropnode -U USER -G GROUP -h nodeList
General options:
-U Cluster user.
-G Group of the cluster user.
-h The standby node backip list which need to be deleted
Separate multiple nodes with commas (,).
such as '-h 192.168.0.1,192.168.0.2'
-?, --help Show help information for this
utility, and exit the command line mode.
-V, --version Show version information.
"""
print(self.usage.__doc__)
def parseCommandLine(self):
"""
parse parameter from command line
"""
ParaObj = Parameter()
ParaDict = ParaObj.ParameterCommandLine("dropnode")
# parameter -h or -?
if (ParaDict.__contains__("helpFlag")):
self.usage()
sys.exit(0)
# Resolves command line arguments
# parameter -U
if (ParaDict.__contains__("user")):
self.user = ParaDict.get("user")
DefaultValue.checkPathVaild(self.user)
# parameter -G
if (ParaDict.__contains__("group")):
self.group = ParaDict.get("group")
# parameter -h
if (ParaDict.__contains__("nodename")):
self.hostIpListForDel = ParaDict.get("nodename")
def checkParameters(self):
"""
function: Check parameter from command line
input: NA
output: NA
"""
# check user | group | node
if len(self.user) == 0:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35801"] % "-U")
if len(self.group) == 0:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35801"] % "-G")
if len(self.hostIpListForDel) == 0:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35801"] % "-h")
# get dbcluster info from static config file
self.clusterInfo.initFromStaticConfig(self.user)
self.backIpNameMap = {}
for node in self.clusterInfo.dbNodes:
self.backIpNameMap[node.name] = node.backIps[0]
if node.backIps[0] in self.hostIpListForDel:
self.hostMapForDel[node.name] = {'ipaddr': node.backIps[0],
'datadir': [], 'dn_id': [],
'port': []}
for i in node.datanodes:
self.hostMapForDel[node.name]['datadir'].append(i.datadir)
self.hostMapForDel[node.name]['dn_id'].append(
'dn_' + str(i.instanceId))
self.hostMapForDel[node.name]['port'].append(str(i.port))
else:
self.hostMapForExist[node.name] = {'ipaddr': node.backIps[0],
'datadir': [], 'dn_id': [],
'port': [],
'replToBeDel': [],
'syncStandbyDel': [],
'pghbaDel': []}
for i in node.datanodes:
self.hostMapForExist[node.name]['datadir'].append(i.datadir)
self.hostMapForExist[node.name]['dn_id'].append(
'dn_' + str(i.instanceId))
self.hostMapForExist[node.name]['port'].append(str(i.port))
for ipLoop in self.hostIpListForDel:
if ipLoop not in self.backIpNameMap.values():
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35802"] % \
self.hostIpListForDel)
if not self.hostMapForDel:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35802"] % \
self.hostIpListForDel)
# check the node ip is the IP of the current server
localIp = self.backIpNameMap[DefaultValue.GetHostIpOrName()]
if localIp in self.hostIpListForDel:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35803"] % \
localIp)
localNode = self.clusterInfo.getDbNodeByName(
DefaultValue.GetHostIpOrName())
localInstanceType = self.clusterInfo.getdataNodeInstanceType(
localNode.id)
if localInstanceType:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35804"])
def check_repeat_process(self):
"""
function: Check whether only one node be left in the cluster
return a flag
"""
cmd = "ps -ef | grep gs_dropnode | grep -v grep"
(status, output) = subprocess.getstatusoutput(cmd)
if status == 0 and len(output.split('\n')) > 1:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35810"])
def flagForOnlyPrimaryLeft(self):
"""
function: Check whether only one node be left in the cluster
return a flag
"""
countClusterNodes = len(self.backIpNameMap.values())
if (countClusterNodes - len(self.hostIpListForDel)) == 1:
flag = input(
"The cluster will have only one standalone node left after the operation!"
"\nDo you want to continue to drop the target node (yes/no)? ")
count_f = 2
while count_f:
if (
flag.upper() != "YES"
and flag.upper() != "NO"
and flag.upper() != "Y" and flag.upper() != "N"):
count_f -= 1
flag = input("Please type 'yes' or 'no': ")
continue
break
if count_f == 0:
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
if flag.upper() == "NO" or flag.upper() == "N":
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
self.flagOnlyPrimary = True
def checkClusterStatus(self):
"""
function: Check whether the status of cluster is normal
input: NA
output: NA
"""
tmpDir = DefaultValue.getTmpDirFromEnv()
tmpFile = os.path.join(tmpDir, "gauss_cluster_status.dat_" + \
str(datetime.datetime.now().strftime(
'%Y%m%d%H%M%S')) + "_" + str(os.getpid()))
cmd = ClusterCommand.getQueryStatusCmd(self.user, "", tmpFile, True)
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
self.logger.debug("The cmd is %s " % cmd)
raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % \
cmd + "Error: \n%s" % output)
# Initialize cluster status information for the temporary file
clusterStatus = DbClusterStatus()
clusterStatus.initFromFile(tmpFile)
clsStatus = clusterStatus.clusterStatus
statusDelHost = " The target node to be dropped is %s \n" % str(
self.hostMapForDel.keys())[9:]
if clsStatus in ["Unknown", "Unavailable"]:
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35806"] % clsStatus)
for dbNode in clusterStatus.dbNodes:
if dbNode.name in self.hostMapForDel.keys():
if dbNode.isNodeHealthy():
statusDelHost += "The status of %s is %s \n" \
% (dbNode.name,
DbClusterStatus.OM_NODE_STATUS_NORMAL)
else:
statusDelHost += "The status of %s is %s \n" \
% (dbNode.name,
DbClusterStatus.OM_NODE_STATUS_ABNORMAL)
flag = input(
statusDelHost + "\n \
Do you want to continue to drop the target node (yes/no)? ")
count_f = 2
while count_f:
if (
flag.upper() != "YES"
and flag.upper() != "NO"
and flag.upper() != "Y" and flag.upper() != "N"):
count_f -= 1
flag = input("Please type 'yes' or 'no': ")
continue
break
if count_f == 0:
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
if flag.upper() == "NO" or flag.upper() == "N":
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
def checkConnection(self, hostnames, env):
"""
check the node connection, change the timeout to 30s as 330s is too long
if the node which will not be deleted can't be connected, report ERR
else continue
"""
command = "echo 1"
sshTool = SshTool(hostnames, None, 30)
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
hostnames, env)
self.logger.debug(outputCollect)
self.failureHosts = re.findall(r"\[FAILURE\] .*:.*\n", outputCollect)
for host in self.failureHosts:
if host in self.hostMapForExist.keys():
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35807"] % host)
def initLogs(self):
"""
init log file
"""
cmd = "echo ~%s" % self.user
(status, output) = subprocess.getstatusoutput(cmd)
self.userProfile = os.path.join(output, ".bashrc")
if not os.path.isfile(self.userProfile):
raise Exception(
ErrorCode.GAUSS_502["GAUSS_50210"] % self.userProfile)
log_path = DefaultValue.getEnvironmentParameterValue("GAUSSLOG",
self.user,
self.userProfile)
self.logFile = os.path.realpath(
"%s/om/%s" % (log_path, DefaultValue.DROPNODE_LOG_FILE))
# if not absolute path
if not os.path.isabs(self.logFile):
GaussLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50213"] % "log")
self.initLogger("gs_dropnode")
self.logger.ignoreErr = True
if __name__ == "__main__":
dropNode = Dropnode()
dropNode.parseCommandLine()
dropNode.initLogs()
dropNode.check_repeat_process()
dropNode.checkParameters()
dropNode.flagForOnlyPrimaryLeft()
dropNode.checkConnection(list(dropNode.backIpNameMap.keys()),
dropNode.envFile)
dropNodeImpl = DropnodeImpl(dropNode)
dropNodeImpl.run()

View File

@ -310,6 +310,7 @@ class DefaultValue():
RESIZE_LOG_FILE = "gs_resize.log"
HOTPATCH_LOG_FILE = "gs_hotpatch.log"
EXPANSION_LOG_FILE = "gs_expansion.log"
DROPNODE_LOG_FILE = "gs_dropnode.log"
# hotpatch action
HOTPATCH_ACTION_LIST = ["load", "unload", "active", "deactive",
"info", "list"]

View File

@ -1116,6 +1116,36 @@ class ErrorCode():
}
##########################################################################
# gs_dropnode
# [GAUSS-358] : gs_dropnode failed
##########################################################################
GAUSS_358 = {
"GAUSS_35800": "[GAUSS-35800] Expansion standby node failed.",
"GAUSS_35801": "[GAUSS-35801] Empty parameter. The %s parameter is "
"missing in the command.",
"GAUSS_35802": "[GAUSS-35802] The IP list of target node: %s"
"is not in the current cluster. Please check!",
"GAUSS_35803": "[GAUSS-35803] The IP of primary node %s is in the "
"target node list. \n"
"The primary node can not be dropped! \n",
"GAUSS_35804": "[GAUSS-35804] The dropnode operation can only be executed"
" at the primary node. \n ",
"GAUSS_35805": "[GAUSS-35805] Input %s. Operation aborted. ",
"GAUSS_35806": "[GAUSS-35806] Current status of cluster is %s .\n"
"It doesn't meet the requirement.! ",
"GAUSS_35807": "[GAUSS-35807] The host %s which still exist in the "
"cluster can't be connected.\n"
"It doesn't meet the requirement.! ",
"GAUSS_35808": "[GAUSS-35808] The %s is running switchover/failover!\n"
"The dropnode operation can only be executed when there is"
" no such operation!",
"GAUSS_35809": "[GAUSS-35809] Some important steps failed to execute. "
"Please refer to log for detail!",
"GAUSS_35810": "[GAUSS-35810] A same process is already running! "
}
class OmError(BaseException):
"""

View File

@ -95,6 +95,8 @@ gs_checkos = ["-?", "--help", "-V", "--version", "-h:", "-f:", "-o:",
"-l:", "-X:"]
gs_expansion = ["-?", "--help", "-V", "--version", "-U:", "-G:", "-L",
"-X:", "-h:", "--sep-env-file="]
gs_dropnode = ["-?", "--help", "-V", "--version", "-U:", "-G:",
"-h:", "--sep-env-file="]
# gs_om child branch
gs_om_start = ["-t:", "-?", "--help", "-V", "--version", "-h:", "-I:",
@ -156,7 +158,8 @@ ParameterDict = {"preinstall": gs_preinstall,
"view": gs_om_view,
"query": gs_om_query,
"refreshconf": gs_om_refreshconf,
"expansion": gs_expansion
"expansion": gs_expansion,
"dropnode": gs_dropnode
}
# List of scripts with the -t parameter

View File

@ -0,0 +1,783 @@
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description : DropnodeImpl.py
#############################################################################
import subprocess
import sys
import re
import os
import pwd
import datetime
import grp
import socket
import stat
sys.path.append(sys.path[0] + "/../../../../")
from gspylib.threads.SshTool import SshTool
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.Common import DefaultValue, ClusterCommand
from gspylib.common.GaussLog import GaussLog
from gspylib.inspection.common.SharedFuncs import cleanFile
from gspylib.inspection.common.Exception import SQLCommandException
sys.path.append(sys.path[0] + "/../../../lib/")
DefaultValue.doConfigForParamiko()
import paramiko
# mode
MODE_PRIMARY = "primary"
MODE_STANDBY = "standby"
MODE_NORMAL = "normal"
SWITCHOVER_FILE = "/switchover"
FAILOVER_FILE = "/failover"
PROMOTE_FILE = "/promote"
# db state
STAT_NORMAL = "normal"
# master
MASTER_INSTANCE = 0
# standby
STANDBY_INSTANCE = 1
# status failed
STATUS_FAIL = "Failure"
class DropnodeImpl():
"""
class for drop a standby node.
step:
1. check whether all standby can be reached or the switchover/failover is happening
2. shutdown the program of the target node if it can be reached
3. flush the configuration on all nodes if it is still a HA cluster
4. flush the configuration on primary if it is the only one left
"""
def __init__(self, dropnode):
"""
"""
self.context = dropnode
self.user = self.context.user
self.userProfile = self.context.userProfile
self.group = self.context.group
self.backupFilePrimary = ''
self.localhostname = DefaultValue.GetHostIpOrName()
self.logger = self.context.logger
self.resultDictOfPrimary = []
self.replSlot = ''
envFile = DefaultValue.getEnv("MPPDB_ENV_SEPARATE_PATH")
if envFile:
self.envFile = envFile
else:
self.envFile = "/etc/profile"
gphomepath = DefaultValue.getEnv("GPHOME")
if gphomepath:
self.gphomepath = gphomepath
else:
(status, output) = subprocess.getstatusoutput("which gs_om")
if "no gs_om in" in output:
raise Exception(ErrorCode.GAUSS_518["GAUSS_51800"] % "$GPHOME")
self.gphomepath = os.path.normpath(output.replace("/script/gs_om",""))
self.appPath = self.context.clusterInfo.appPath
self.gsqlPath = "source %s;%s/bin/gsql" % (self.userProfile, self.appPath)
currentTime = str(datetime.datetime.now()).replace(" ", "_").replace(
".", "_")
self.dnIdForDel = []
for hostDelName in self.context.hostMapForDel.keys():
self.dnIdForDel += self.context.hostMapForDel[hostDelName]['dn_id']
self.commonOper = OperCommon(dropnode)
def changeUser(self):
if os.getuid() == 0:
user = self.user
try:
pw_record = pwd.getpwnam(user)
except Exception:
GaussLog.exitWithError(ErrorCode.GAUSS_503["GAUSS_50300"] % user)
user_name = pw_record.pw_name
user_uid = pw_record.pw_uid
user_gid = pw_record.pw_gid
env = os.environ.copy()
os.setgid(user_gid)
os.setuid(user_uid)
def checkAllStandbyState(self):
"""
check all standby state whether switchover is happening
"""
for hostNameLoop in self.context.hostMapForDel.keys():
if hostNameLoop not in self.context.failureHosts:
sshtool_host = SshTool([hostNameLoop])
for i in self.context.hostMapForDel[hostNameLoop]['datadir']:
# check whether switchover/failover is happening
if not self.commonOper.checkStandbyState(hostNameLoop, i,
sshtool_host,
self.userProfile, True):
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35808"] % hostNameLoop)
self.commonOper.stopInstance(hostNameLoop, sshtool_host, i,
self.userProfile)
self.cleanSshToolFile(sshtool_host)
for hostNameLoop in self.context.hostMapForExist.keys():
sshtool_host = SshTool([hostNameLoop])
for i in self.context.hostMapForExist[hostNameLoop]['datadir']:
# check whether switchover/failover is happening
if not self.commonOper.checkStandbyState(hostNameLoop, i,
sshtool_host,
self.userProfile):
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35808"] % hostNameLoop)
self.cleanSshToolFile(sshtool_host)
def dropNodeOnAllHosts(self):
"""
drop the target node on the other host
"""
for hostNameLoop in self.context.hostMapForExist.keys():
sshtool_host = SshTool([hostNameLoop])
# backup
backupfile = self.commonOper.backupConf(
self.gphomepath, self.user,
hostNameLoop, self.userProfile, sshtool_host)
self.logger.log(
"[gs_dropnode]The backup file of " + hostNameLoop + " is " + backupfile)
if hostNameLoop == self.localhostname:
self.backupFilePrimary = backupfile
indexForuse = 0
for i in self.context.hostMapForExist[hostNameLoop]['datadir']:
# parse
resultDict = self.commonOper.parseConfigFile(hostNameLoop, i,
self.dnIdForDel,
self.context.hostIpListForDel,
sshtool_host,
self.envFile)
resultDictForRollback = self.commonOper.parseBackupFile(
hostNameLoop, backupfile,
self.context.hostMapForExist[hostNameLoop][
'dn_id'][indexForuse],
resultDict['replStr'], sshtool_host,
self.envFile)
if hostNameLoop == self.localhostname:
self.resultDictOfPrimary.append(resultDict)
# try set
try:
self.commonOper.SetPgsqlConf(resultDict['replStr'],
hostNameLoop,
resultDict['syncStandbyStr'],
sshtool_host,
self.userProfile,
self.context.hostMapForExist[
hostNameLoop]['port'][
indexForuse],
'',
self.context.flagOnlyPrimary)
except ValueError:
self.logger.log("[gs_dropnode]Rollback pgsql process.")
self.commonOper.SetPgsqlConf(resultDict['replStr'],
hostNameLoop,
resultDict['syncStandbyStr'],
sshtool_host,
self.userProfile,
self.context.hostMapForExist[
hostNameLoop]['port'][
indexForuse],
resultDictForRollback[
'rollbackReplStr'])
try:
replSlot = self.commonOper.GetReplSlot(hostNameLoop,
sshtool_host, self.userProfile, self.gsqlPath,
self.context.hostMapForExist[hostNameLoop]['port'][
indexForuse])
self.commonOper.SetReplSlot(hostNameLoop, sshtool_host,
self.userProfile, self.gsqlPath,
self.context.hostMapForExist[
hostNameLoop]['port'][indexForuse
], self.dnIdForDel, replSlot)
except ValueError:
self.logger.log("[gs_dropnode]Rollback replslot")
self.commonOper.SetReplSlot(hostNameLoop, sshtool_host,
self.userProfile, self.gsqlPath,
self.context.hostMapForExist[
hostNameLoop]['port'][indexForuse
], self.dnIdForDel, replSlot, True)
indexForuse += 1
self.cleanSshToolFile(sshtool_host)
def operationOnlyOnPrimary(self):
"""
operation only need to be executed on primary node
"""
LocalhostName = self.localhostname
sshtool_host = SshTool([LocalhostName])
try:
self.commonOper.SetPghbaConf(self.userProfile, LocalhostName,
self.resultDictOfPrimary[0][
'pghbaStr'], False,
self.context.flagOnlyPrimary)
except ValueError:
self.logger.log("[gs_dropnode]Rollback pghba conf.")
self.commonOper.SetPghbaConf(self.userProfile, LocalhostName,
self.resultDictOfPrimary[0][
'pghbaStr'], True,
self.context.flagOnlyPrimary)
self.cleanSshToolFile(sshtool_host)
def modifyStaticConf(self):
"""
Modify the cluster static conf and save it
"""
self.logger.log("[gs_dropnode]Start to modify the cluster static conf.")
staticConfigPath = "%s/bin/cluster_static_config" % self.appPath
# first backup, only need to be done on primary node
tmpDir = DefaultValue.getEnvironmentParameterValue("PGHOST", self.user,
self.userProfile)
cmd = "cp %s %s/%s_BACKUP" % (
staticConfigPath, tmpDir, 'cluster_static_config')
(status, output) = subprocess.getstatusoutput(cmd)
if status:
self.logger.debug("[gs_dropnode]Backup cluster_static_config failed"
+ output)
backIpDict = self.context.backIpNameMap
backIpDict_values = list(backIpDict.values())
backIpDict_keys = list(backIpDict.keys())
for ipLoop in self.context.hostIpListForDel:
nameLoop = backIpDict_keys[backIpDict_values.index(ipLoop)]
dnLoop = self.context.clusterInfo.getDbNodeByName(nameLoop)
self.context.clusterInfo.dbNodes.remove(dnLoop)
for dbNode in self.context.clusterInfo.dbNodes:
if dbNode.name == self.localhostname:
self.context.clusterInfo.saveToStaticConfig(staticConfigPath,
dbNode.id)
continue
staticConfigPath_dn = "%s/cluster_static_config_%s" % (
tmpDir, dbNode.name)
self.context.clusterInfo.saveToStaticConfig(staticConfigPath_dn,
dbNode.id)
self.logger.debug(
"[gs_dropnode]Start to scp the cluster static conf to any other node.")
if not self.context.flagOnlyPrimary:
sshtool = SshTool(self.context.clusterInfo.getClusterNodeNames())
cmd = "%s/script/gs_om -t refreshconf" % self.gphomepath
(status, output) = subprocess.getstatusoutput(cmd)
self.logger.debug("[gs_dropnode]Output of refresh dynamic conf :%s." % output)
for hostName in self.context.hostMapForExist.keys():
hostSsh = SshTool([hostName])
if hostName != self.localhostname:
staticConfigPath_name = "%s/cluster_static_config_%s" % (
tmpDir, hostName)
hostSsh.scpFiles(staticConfigPath_name, staticConfigPath,
[hostName], self.envFile)
try:
os.unlink(staticConfigPath_name)
except FileNotFoundError:
pass
self.cleanSshToolFile(hostSsh)
self.logger.log("[gs_dropnode]End of modify the cluster static conf.")
def cleanSshToolFile(self, sshTool):
"""
"""
try:
sshTool.clenSshResultFiles()
except Exception as e:
self.logger.debug(str(e))
def checkUserAndGroupExists(self):
"""
check system user and group exists and be same
on primary and standby nodes
"""
inputUser = self.user
inputGroup = self.group
user_group_id = ""
isUserExits = False
localHost = socket.gethostname()
for user in pwd.getpwall():
if user.pw_name == self.user:
user_group_id = user.pw_gid
isUserExits = True
break
if not isUserExits:
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35704"] \
% ("User", self.user, localHost))
isGroupExits = False
group_id = ""
for group in grp.getgrall():
if group.gr_name == self.group:
group_id = group.gr_gid
isGroupExits = True
if not isGroupExits:
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35704"] \
% ("Group", self.group, localHost))
if user_group_id != group_id:
GaussLog.exitWithError("User [%s] is not in the group [%s]." \
% (self.user, self.group))
hostNames = list(self.context.hostMapForExist.keys())
envfile = self.envFile
sshTool = SshTool(hostNames)
# get username in the other standy nodes
getUserNameCmd = "cat /etc/passwd | grep -w %s" % inputUser
resultMap, outputCollect = sshTool.getSshStatusOutput(getUserNameCmd,
[], envfile)
for hostKey in resultMap:
if resultMap[hostKey] == STATUS_FAIL:
self.cleanSshToolFile(sshTool)
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35704"] \
% ("User", self.user, hostKey))
# get groupname in the other standy nodes
getGroupNameCmd = "cat /etc/group | grep -w %s" % inputGroup
resultMap, outputCollect = sshTool.getSshStatusOutput(getGroupNameCmd,
[], envfile)
for hostKey in resultMap:
if resultMap[hostKey] == STATUS_FAIL:
self.cleanSshToolFile(sshTool)
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35704"] \
% ("Group", self.group, hostKey))
self.cleanSshToolFile(sshTool)
def restartInstance(self):
if self.context.flagOnlyPrimary:
self.logger.log("[gs_dropnode]Remove the dynamic conf.")
dynamicConfigPath = "%s/bin/cluster_dynamic_config" % self.appPath
try:
os.unlink(dynamicConfigPath)
except FileNotFoundError:
pass
flag = input(
"Only one primary node is left."
"It is recommended to restart the node."
"\nDo you want to restart the primary node now (yes/no)? ")
count_f = 2
while count_f:
if (
flag.upper() != "YES"
and flag.upper() != "NO"
and flag.upper() != "Y" and flag.upper() != "N"):
count_f -= 1
flag = input("Please type 'yes' or 'no': ")
continue
break
if count_f == 0:
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
if flag.upper() == "NO" or flag.upper() == "N":
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
sshTool = SshTool([self.localhostname])
for i in self.context.hostMapForExist[self.localhostname]['datadir']:
self.commonOper.stopInstance(self.localhostname, sshTool, i,
self.userProfile)
self.commonOper.startInstance(i, self.userProfile)
self.cleanSshToolFile(sshTool)
else:
pass
def run(self):
"""
start dropnode
"""
self.changeUser()
self.checkUserAndGroupExists()
self.logger.log("[gs_dropnode]Start to drop nodes of the cluster.")
self.checkAllStandbyState()
self.dropNodeOnAllHosts()
self.operationOnlyOnPrimary()
self.modifyStaticConf()
self.restartInstance()
self.logger.log("[gs_dropnode]Success to drop the target nodes.")
class OperCommon:
def __init__(self, dropnode):
"""
"""
self.logger = dropnode.logger
self.user = dropnode.user
def checkStandbyState(self, host, dirDn, sshTool, envfile, isForDel=False):
"""
check the existed standby node state
Exit if the role is not standby or the state of database is not normal
"""
sshcmd = "gs_ctl query -D %s" % dirDn
(statusMap, output) = sshTool.getSshStatusOutput(sshcmd, [host],
envfile)
if 'Is server running?' in output and not isForDel:
return False
if 'Is server running?' in output and isForDel:
return True
res = re.findall(r'db_state\s*:\s*(\w+)', output)
dbState = res[0]
if dbState in ['Promoting', 'Wait', 'Demoting']:
return False
return True
def backupConf(self, appPath, user, host, envfile, sshTool):
"""
backup the configuration file (postgresql.conf and pg_hba.conf)
The Backup.py can do this
"""
self.logger.log(
"[gs_dropnode]Start to backup parameter config file on %s." % host)
tmpPath = '/tmp/gs_dropnode_backup' + \
str(datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
backupPyPath = os.path.join(appPath, '../om/script/local/Backup.py')
cmd = "(find /tmp -type d | grep gs_dropnode_backup | xargs rm -rf;" \
"if [ ! -d '%s' ]; then mkdir -p '%s' -m %s;fi)" \
% (tmpPath, tmpPath, DefaultValue.KEY_DIRECTORY_MODE)
sshTool.executeCommand(cmd, "", DefaultValue.SUCCESS, [host], envfile)
logfile = os.path.join(tmpPath, 'gs_dropnode_call_Backup_py.log')
cmd = "python3 %s -U %s -P %s -p --nodeName=%s -l %s" \
% (backupPyPath, user, tmpPath, host, logfile)
(statusMap, output) = sshTool.getSshStatusOutput(cmd, [host], envfile)
if statusMap[host] != 'Success':
self.logger.debug(
"[gs_dropnode]Backup parameter config file failed." + output)
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"])
self.logger.log(
"[gs_dropnode]End to backup parameter config file on %s." % host)
return '%s/parameter_%s.tar' % (tmpPath, host)
def parseConfigFile(self, host, dirDn, dnId, hostIpListForDel, sshTool,
envfile):
"""
parse the postgresql.conf file and get the replication info
"""
self.logger.log(
"[gs_dropnode]Start to parse parameter config file on %s." % host)
resultDict = {'replStr': '', 'syncStandbyStr': '', 'pghbaStr': ''}
pgConfName = os.path.join(dirDn, 'postgresql.conf')
pghbaConfName = os.path.join(dirDn, 'pg_hba.conf')
cmd = "grep -o '^replconninfo.*' %s | egrep -o '^replconninfo.*'" \
% pgConfName
(statusMap, output1) = sshTool.getSshStatusOutput(cmd, [host], envfile)
if statusMap[host] != 'Success':
self.logger.debug("[gs_dropnode]Parse replconninfo failed:" + output1)
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"])
cmd = "grep -o '^synchronous_standby_names.*' %s" % pgConfName
(statusMap, output) = sshTool.getSshStatusOutput(cmd, [host], envfile)
if statusMap[host] != 'Success':
self.logger.debug(
"[gs_dropnode]Parse synchronous_standby_names failed:" + output)
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"])
output_v = output.split("'")[-2]
if output_v == '*':
resultDict['syncStandbyStr'] = output_v
else:
resultDict['syncStandbyStr'] = self.check_syncStandbyStr(dnId,
output_v)
cmd = "grep '^host.*trust' %s" % pghbaConfName
(statusMap, output) = sshTool.getSshStatusOutput(cmd, [host], envfile)
if statusMap[host] != 'Success':
self.logger.debug("[gs_dropnode]Parse pg_hba file failed:" + output)
for ip in hostIpListForDel:
if ip in output1:
i = output1.rfind('replconninfo', 0, output1.find(ip)) + 12
resultDict['replStr'] += output1[i]
if ip in output:
s = output.rfind('host', 0, output.find(ip))
e = output.find('\n', output.find(ip), len(output))
resultDict['pghbaStr'] += output[s:e] + '|'
self.logger.log(
"[gs_dropnode]End to parse parameter config file on %s." % host)
return resultDict
def check_syncStandbyStr(self, dnlist, output):
output_no = '0'
output_result = output
output_new_no = '1'
if 'ANY' in output or 'FIRST' in output:
output_dn = re.findall(r'\((.*)\)', output)[0]
output_no = re.findall(r'.*(\d) *\(.*\)', output)[0]
else:
output_dn = output
output_dn_nospace = re.sub(' *', '', output_dn)
init_no = len(output_dn_nospace.split(','))
quorum_no = int(init_no / 2) + 1
half_no = quorum_no - 1
count_dn = 0
list_output1 = '*'
for dninst in dnlist:
if dninst in output_dn_nospace:
list_output1 = output_dn_nospace.split(',')
list_output1.remove(dninst)
list_output1 = ','.join(list_output1)
init_no -= 1
count_dn += 1
if count_dn == 0 or list_output1 == '':
return ''
if list_output1 != '*':
output_result = output.replace(output_dn, list_output1)
if output_no == '0':
return output_result
if int(output_no) == quorum_no:
output_new_no = str(int(init_no / 2) + 1)
output_result = output_result.replace(output_no, output_new_no, 1)
return output_result
elif int(output_no) > half_no and (int(output_no) - count_dn) > 0:
output_new_no = str(int(output_no) - count_dn)
elif int(output_no) > half_no and (int(output_no) - count_dn) <= 0:
output_new_no = '1'
elif int(output_no) < half_no and int(output_no) <= init_no:
output_new_no = output_no
elif half_no > int(output_no) > init_no:
output_new_no = str(init_no)
output_result = output_result.replace(output_no, output_new_no, 1)
return output_result
def parseBackupFile(self, host, backupfile, dnId, replstr, sshTool,
envfile):
"""
parse the backup file eg.parameter_host.tar to get the value for rollback
"""
self.logger.log(
"[gs_dropnode]Start to parse backup parameter config file on %s." % host)
resultDict = {'rollbackReplStr': '', 'syncStandbyStr': ''}
backupdir = os.path.dirname(backupfile)
cmd = "tar xf %s -C %s;grep -o '^replconninfo.*' %s/%s/%s_postgresql.conf;" \
"grep -o '^synchronous_standby_names.*' %s/%s/%s_postgresql.conf;" \
% (
backupfile, backupdir, backupdir, 'parameter_' + host, dnId[3:],
backupdir, 'parameter_' + host, dnId[3:])
(statusMap, output) = sshTool.getSshStatusOutput(cmd, [host], envfile)
if statusMap[host] != 'Success':
self.logger.log(
"[gs_dropnode]Parse backup parameter config file failed:" + output)
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"])
for i in replstr:
tmp_v = 'replconninfo' + i
s = output.index(tmp_v)
e = output.find('\n', s, len(output))
resultDict['rollbackReplStr'] += output[s:e].split("'")[-2] + '|'
s = output.index('synchronous_standby_names')
resultDict['syncStandbyStr'] = output[s:].split("'")[-2]
self.logger.log(
"[gs_dropnode]End to parse backup parameter config file %s." % host)
return resultDict
def SetPgsqlConf(self, replNo, host, syncStandbyValue, sshTool, envfile,
port, replValue='', singleLeft=False):
"""
Set the value of postgresql.conf
"""
self.logger.log(
"[gs_dropnode]Start to set postgresql config file on %s." % host)
sqlExecFile = '/tmp/gs_dropnode_sqlExecFile_' + \
str(datetime.datetime.now().strftime(
'%Y%m%d%H%M%S')) + host
checkResultFile = '/tmp/gs_dropnode_sqlResultFile_' + \
str(datetime.datetime.now().strftime(
'%Y%m%d%H%M%S')) + host
sqlvalue = ''
if not replValue and replNo != '':
for i in replNo:
sqlvalue += "ALTER SYSTEM SET replconninfo%s = '';" % i
if len(replValue) > 0:
count = 0
for i in replNo:
sqlvalue += "ALTER SYSTEM SET replconninfo%s = '%s';" % (
i, replValue[:-1].split('|')[count])
count += 1
if not singleLeft and syncStandbyValue != '':
sqlvalue += "ALTER SYSTEM SET synchronous_standby_names = '%s';" \
% syncStandbyValue
if singleLeft:
sqlvalue += "ALTER SYSTEM SET synchronous_standby_names = '';"
if sqlvalue != '':
cmd = "touch %s && chmod %s %s" % \
(sqlExecFile, DefaultValue.MAX_DIRECTORY_MODE, sqlExecFile)
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
self.logger.log(
"[gs_dropnode]Create the SQL command file failed:" + output)
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"])
try:
with os.fdopen(
os.open("%s" % sqlExecFile, os.O_WRONLY | os.O_CREAT,
stat.S_IWUSR | stat.S_IRUSR), 'w') as fo:
fo.write(sqlvalue)
fo.close()
except Exception as e:
cleanFile(sqlExecFile)
raise SQLCommandException(sqlExecFile,
"write into sql query file failed. "
+ str(e))
self.logger.debug(
"[gs_dropnode]Start to send the SQL command file to all hosts.")
sshTool.scpFiles(sqlExecFile, '/tmp', [host])
cmd = "gsql -p %s -d postgres -f %s --output %s;cat %s" % (
port, sqlExecFile, checkResultFile, checkResultFile)
(statusMap, output) = sshTool.getSshStatusOutput(cmd, [host], envfile)
if "ERROR" in output:
self.logger.debug(
"[gs_dropnode]Failed to execute the SQL command file on all "
"hosts:" + output)
raise ValueError(output)
cmd = "ls /tmp/gs_dropnode_sql* | xargs rm -rf"
sshTool.executeCommand(cmd, "", DefaultValue.SUCCESS, [host], envfile)
try:
os.unlink(sqlExecFile)
os.unlink(checkResultFile)
except FileNotFoundError:
pass
self.logger.log(
"[gs_dropnode]End of set postgresql config file on %s." % host)
def SetPghbaConf(self, envProfile, host, pgHbaValue,
flagRollback=False, flagPrimayOnly=False):
"""
Set the value of pg_hba.conf
"""
self.logger.log(
"[gs_dropnode]Start of set pg_hba config file on %s." % host)
cmd = 'source %s;' % envProfile
if not flagPrimayOnly:
hostPara = 'all'
if flagPrimayOnly:
hostPara = host
if len(pgHbaValue):
if not flagRollback:
for i in pgHbaValue[:-1].split('|'):
v = i[0:i.find('/32') + 3]
cmd += "gs_guc set -N %s -I all -h '%s';" % (hostPara, v)
if flagRollback:
for i in pgHbaValue[:-1].split('|'):
cmd += "gs_guc set -N %s -I all -h '%s';" \
% (hostPara, i.strip())
(status, output) = subprocess.getstatusoutput(cmd)
result_v = re.findall(r'Failed instances: (\d)\.', output)
if status:
self.logger.debug(
"[gs_dropnode]Set pg_hba config file failed:" + output)
raise ValueError(output)
if len(result_v):
if result_v[0] != '0':
self.logger.debug(
"[gs_dropnode]Set pg_hba config file failed:" + output)
raise ValueError(output)
else:
self.logger.debug(
"[gs_dropnode]Set pg_hba config file failed:" + output)
raise ValueError(output)
else:
self.logger.log(
"[gs_dropnode]Nothing need to do with pg_hba config file.")
self.logger.log(
"[gs_dropnode]End of set pg_hba config file on %s." % host)
def GetReplSlot(self, host, sshTool, envfile, gsqlPath, port):
"""
Get the replication slot on primary node only
"""
self.logger.log("[gs_dropnode]Start to get repl slot on primary node.")
selectSQL = "SELECT slot_name,plugin,slot_type FROM pg_replication_slots;"
querycmd = "%s -p %s postgres -A -t -c '%s'" % (gsqlPath, port, selectSQL)
(status, output) = sshTool.getSshStatusOutput(querycmd, [host], envfile)
if status[host] != 'Success' or "ERROR" in output:
self.logger.debug(
"[gs_dropnode]Get repl slot failed:" + output)
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"])
return ','.join(output.split('\n')[1:])
def SetReplSlot(self, host, sshTool, envfile, gsqlPath, port, dnid, replslot_output, flagRollback=False):
"""
Drop the replication slot on primary node only
"""
self.logger.log("[gs_dropnode]Start to set repl slot on primary node.")
setcmd = ''
if not flagRollback:
for i in dnid:
if i in replslot_output:
setcmd += "%s -p %s postgres -A -t -c \\\"SELECT pg_drop_" \
"replication_slot('%s');\\\";" % \
(gsqlPath, port, i)
if flagRollback:
list_o = [i.split('|') for i in replslot_output.split(',')]
for r in list_o:
if r[0] in dnid and r[2] == 'physical':
setcmd += "%s -p %s postgres -A -t -c \\\"SELECT * FROM " \
"pg_create_physical_replication_slot('%s', false);\\\";" % \
(gsqlPath, port, r[0])
elif r[0] in dnid and r[2] == 'logical':
setcmd += "%s -p %s postgres -A -t -c \\\"SELECT * FROM " \
"pg_create_logical_replication_slot('%s', '%s');\\\";" % \
(gsqlPath, port, r[0], r[1])
if setcmd != '':
if host == DefaultValue.GetHostIpOrName():
setcmd = setcmd.replace("\\", '')
(status, output) = sshTool.getSshStatusOutput(setcmd, [host], envfile)
if status[host] != 'Success' or "ERROR" in output:
self.logger.debug("[gs_dropnode]Set repl slot failed:" + output)
raise ValueError(output)
self.logger.log("[gs_dropnode]End of set repl slot on primary node.")
def SetSyncCommit(self, dirDn):
"""
Set the synccommit to local when only primary server be left
"""
self.logger.log("[gs_dropnode]Start to set sync_commit on primary node.")
command = "gs_guc set -D %s -c 'synchronous_commit = local'" % dirDn
(status, output) = subprocess.getstatusoutput(command)
if status or '0' not in re.findall(r'Failed instances: (\d)\.', output):
self.logger.debug("[gs_dropnode]Set sync_commit failed:" + output)
raise ValueError(output)
self.logger.log("[gs_dropnode]End of set sync_commit on primary node.")
def stopInstance(self, host, sshTool, dirDn, env):
"""
"""
self.logger.log("[gs_dropnode]Start to stop the target node %s." % host)
command = "source %s ; gs_ctl stop -D %s -M immediate" % (env, dirDn)
resultMap, outputCollect = sshTool.getSshStatusOutput(command, [host],
env)
self.logger.debug(host)
self.logger.debug(outputCollect)
self.logger.log("[gs_dropnode]End of stop the target node %s." % host)
def startInstance(self, dirDn, env):
"""
"""
self.logger.log("[gs_dropnode]Start to start the target node.")
start_retry_num = 1
command = "source %s ; gs_ctl start -D %s" % (env, dirDn)
while start_retry_num <= 3:
(status, output) = subprocess.getstatusoutput(command)
self.logger.debug(output)
if 'done' in output and 'server started' in output:
self.logger.log("[gs_dropnode]End of start the target node.")
break
else:
self.logger.debug("[gs_dropnode]Failed to start the node.")
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"])
start_retry_num += 1