openGauss-server/contrib/log_fdw/log_fdw.cpp

3292 lines
111 KiB
C++
Executable File

/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* log_fdw.cpp
* foreign-data wrapper for accessing log data
*
*
* IDENTIFICATION
* contrib/log_fdw/log_fdw.cpp
*
* ---------------------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include "log_fdw_private.h"
#include "log_fdw.h"
#include "access/reloptions.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_foreign_table.h"
#include "catalog/pg_type.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_user_mapping.h"
#include "catalog/pgxc_node.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "funcapi.h"
#include "getaddrinfo.h"
#include "libpq/ip.h"
#include "nodes/makefuncs.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/predtest.h"
#include "optimizer/var.h"
#include "parser/parse_type.h"
#include "postmaster/syslogger.h"
#include "storage/lock/lock.h"
#include "utils/acl.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/palloc.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "utils/snapmgr.h"
#include "utils/zfiles.h"
#include "miscadmin.h"
#include "zlib.h"
#include "unzip.h"
#include "ioapi.h"
/* Names of the log types known to this FDW (where they are checked is outside this chunk). */
static const char* valid_log_type[] = {"pg_log", "gs_profile"};
/* SQL-level function defined elsewhere; presumably returns this node's host name as a text Datum. */
extern Datum get_hostname(PG_FUNCTION_ARGS);
/* SQL functions version: register handler and validator with the version-1 fmgr convention */
PG_FUNCTION_INFO_V1(log_fdw_handler);
PG_FUNCTION_INFO_V1(log_fdw_validator);
/*
 * macro for timing
 *
 * Calls MACRO_FUNC(festate->m_plan_node_id, DESC_TAG) only when
 * festate->m_need_timing is set. NOTE: the expansion refers to a variable
 * named `festate`, which must be in scope wherever PERF_TRACE is used.
 */
#define PERF_TRACE(MACRO_FUNC, DESC_TAG) \
do { \
if (unlikely(festate->m_need_timing)) { \
MACRO_FUNC(festate->m_plan_node_id, DESC_TAG); \
} \
} while (0)
/*
 * FDW callback routines
 *
 * Forward declarations of static helpers (directory/file-name matching, cost
 * estimation) and of the gs_profile scan callbacks defined later in the file.
 */
static bool match_dir_name(const char* subdir_name);
static bool pglog_fname_match(const char* log_name);
static void estimate_costs(
PlannerInfo* root, RelOptInfo* baserel, logFdwPlanState* fdw_private, Cost* startup_cost, Cost* total_cost);
static void profilelog_begin_fs(ForeignScanState* node, int eflags);
static TupleTableSlot* profilelog_iterate_fs(ForeignScanState* node);
static void profilelog_end_fs(ForeignScanState* node);
static void profilelog_rescan_fs(ForeignScanState* node);
static void profilelog_explain_fs(ForeignScanState* node, ExplainState* es);
/*
*----------------------------------------------------------
* Static and Common Utils Area
*----------------------------------------------------------
*/
/*
 * Return this node's host name as a text Datum by calling the SQL-level
 * get_hostname() directly. No fmgr call info is passed to it.
 */
static inline Datum get_hostname_text(void)
{
    return get_hostname((FunctionCallInfo)NULL);
}
/*
 * Return the name of the directory that directly contains `logfile`. This is
 * the path component between the second-to-last '/' (or the start of the
 * string) and the last '/':
 *
 *   "/data/pg_log/dn_6001/postgresql.log" -> "dn_6001"
 *   "dn_6001/postgresql.log"              -> "dn_6001"  (relative path)
 *   "/postgresql.log"                     -> ""         (file directly under '/')
 *
 * Raises ERROR when `logfile` contains no '/' or ends with '/'.
 */
static text* fetch_dirname_from_logfile(const char* logfile)
{
    size_t len = strlen(logfile);
    const char* last_slash = (const char*)memrchr(logfile, '/', len);
    if (NULL == last_slash || '\0' == *(last_slash + 1)) {
        ereport(
            ERROR, (errcode(ERRCODE_UNDEFINED_FILE), errmsg("not found the first '/' in file path \"%s\" ", logfile)));
    }

    size_t prefix_len = (size_t)(last_slash - logfile);
    if (0 == prefix_len) {
        /*
         * Nothing precedes the last '/'. Stepping back one byte here would point
         * before the buffer and give memrchr() a huge length, so stop.
         */
        return cstring_to_text_with_len(logfile, 0);
    }

    /*
     * The byte right before the last '/' belongs to the directory name, so look
     * for the previous '/' only in [logfile, last_slash - 1).
     */
    const char* prev_slash = (const char*)memrchr(logfile, '/', prefix_len - 1);
    const char* dir_start = (NULL != prev_slash) ? (prev_slash + 1) : logfile;
    return cstring_to_text_with_len(dir_start, (int)(last_slash - dir_start));
}
/*
 * Return the last path component of `logfile` (everything after its final '/')
 * as text. Raises ERROR if there is no '/' or nothing follows it.
 */
static text* fetch_filename_from_logfile(const char* logfile)
{
    const char* end = logfile + strlen(logfile);
    const char* sep = (const char*)memrchr(logfile, '/', (size_t)(end - logfile));
    if (NULL == sep || sep + 1 == end) {
        ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FILE), errmsg("not found '/' in file path \"%s\" ", logfile)));
    }
    const char* base = sep + 1;
    return cstring_to_text_with_len(base, (int)(end - base));
}
/*
 * The name of a directory is the last component of its path, which is exactly
 * what fetch_filename_from_logfile() extracts.
 */
static inline text* fetch_dirname_from_path(const char* dpath)
{
    text* last_component = fetch_filename_from_logfile(dpath);
    return last_component;
}
/*
 * Choose festate's open/read/close callbacks from the current log file's
 * extension, compared case-insensitively:
 *   ".gz"  -> gz_open / gz_read / gz_close
 *   ".zip" -> unzip_open / unzip_read / unzip_close
 *   other  -> vfd_file_open / vfd_file_read / vfd_file_close (e.g. ".log")
 */
static void set_io_functions(logFdwPlanState* festate)
{
    const char* fname = festate->logfile_info->name;
    size_t name_len = strlen(fname);

    if (name_len > 3 && pg_strncasecmp(".gz", fname + name_len - 3, 3) == 0) {
        festate->m_open = gz_open;
        festate->m_read = gz_read;
        festate->m_close = gz_close;
    } else if (name_len > 4 && pg_strncasecmp(".zip", fname + name_len - 4, 4) == 0) {
        festate->m_open = unzip_open;
        festate->m_read = unzip_read;
        festate->m_close = unzip_close;
    } else {
        festate->m_open = vfd_file_open;
        festate->m_read = vfd_file_read;
        festate->m_close = vfd_file_close;
    }
}
/*
*----------------------------------------------------------
* Filter and Refuted Area
*
* 1) no target attribute touches log data, so no disk IO is needed at all.
* 2) refuted by hostname info
* 3) refuted by dirname info
* 4) refuted by log file name info
* 5) refuted by log file CREATE and MODIFY time info
*----------------------------------------------------------
*/
/*
 * Decide whether this scan has to read log file contents at all.
 *
 * Returns true when the target list is empty (e.g. COUNT(*)), or when any
 * attribute referenced by the target list or the plan's qual has an attno
 * greater than PGLOG_ATTR_HOSTNAME + 1. Otherwise only file-level attributes
 * (hostname, dirname, filename) are needed, and no log file has to be
 * opened/read/closed, which avoids disk IO.
 */
static bool target_list_need_log_data(ForeignScanState* node)
{
    List* tlist = node->ss.ps.targetlist;
    if (NIL == tlist) {
        return true;
    }

    List* qual = (NULL != node->ss.ps.plan) ? node->ss.ps.plan->qual : NIL;
    List* accessed = GetAccessedVarnoList(tlist, qual);
    bool touches_log_data = false;
    ListCell* lc = NULL;
    foreach (lc, accessed) {
        if ((AttrNumber)lfirst_int(lc) > PGLOG_ATTR_HOSTNAME + 1) {
            touches_log_data = true;
            break;
        }
    }
    list_free(accessed);
    return touches_log_data;
}
/*
 * Inspect the Vars of the plan's WHERE clause and record which file-level
 * columns (dirname, filename, hostname, logtime) it references, so whole
 * directories/files can later be skipped by predicate refutation.
 *
 * For each referenced column, the matching festate->m_need_check_* flag is set
 * and a copy of the Var is saved in festate->m_tmp_*_var, because building a
 * correctly typed Var by hand per column is error-prone. When logtime is
 * referenced, the current time is stored as the largest possible MODIFY time.
 * Unknown log types (>= FLT_UNKNOWN) leave every flag unset.
 */
static void fillup_need_check_flags(ForeignScanState* node, FdwLogType logtype)
{
    /* per log type (row): attribute indexes of dirname, filename, hostname, logtime */
    static const AttrNumber checked_attrs[FLT_UNKNOWN][4] = {
        {PGLOG_ATTR_DIRNAME, PGLOG_ATTR_FILENAME, PGLOG_ATTR_HOSTNAME, PGLOG_ATTR_LOGTIME},
        {PROFILELOG_ATTR_DIRNAME, PROFILELOG_ATTR_FILENAME, PROFILELOG_ATTR_HOSTNAME, PROFILELOG_ATTR_LOGTIME}};

    /* checked_attrs has no row for FLT_UNKNOWN or beyond; indexing it would read out of bounds */
    if ((unsigned int)logtype >= (unsigned int)FLT_UNKNOWN) {
        return;
    }

    /* only the attributes referenced by the WHERE clause matter */
    List* qual = node->ss.ps.plan ? node->ss.ps.plan->qual : NULL;
    if (NULL == qual) {
        return;
    }

    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    const AttrNumber* attrs = checked_attrs[logtype];
    List* vars = pull_var_clause((Node*)qual, PVC_RECURSE_AGGREGATES, PVC_RECURSE_PLACEHOLDERS);
    ListCell* var_cell = NULL;
    foreach (var_cell, vars) {
        Var* var = (Var*)lfirst(var_cell);
        /* varattno is matched against index + 1 (the ATTR_* values appear to be 0-based) */
        if ((attrs[0] + 1) == var->varattno) {
            festate->m_need_check_dirname = true;
            festate->m_tmp_dirname_var = *var;
        } else if ((attrs[1] + 1) == var->varattno) {
            festate->m_need_check_logname = true;
            festate->m_tmp_logname_var = *var;
        } else if ((attrs[2] + 1) == var->varattno) {
            festate->m_need_check_hostname = true;
            festate->m_tmp_hostname_var = *var;
        } else if ((attrs[3] + 1) == var->varattno) {
            festate->m_need_check_logtime = true;
            festate->m_tmp_logtime_var = *var;
        }
    }
    /* free only the list cells: the Vars belong to the qual tree and were copied above */
    list_free(vars);

    if (festate->m_need_check_logtime) {
        /* use the current time as the biggest MODIFY time */
        festate->m_last_modify_tm_maxval = time(NULL);
    }
}
/*
 * Return the operator that implements `strategyNumber` for type `typeID`,
 * taken from the family of the type's default operator class for access
 * method `accessMethodId`.
 *
 * Raises ERROR when the type has no default operator class, or when that
 * class's family has no operator for the strategy. Without the second check,
 * InvalidOid would be passed on to make_opclause()/get_opcode().
 */
static Oid get_operator_by_typeid(Oid typeID, Oid accessMethodId, int16 strategyNumber)
{
    /* Get default operator class from pg_opclass */
    Oid operatorClassId = GetDefaultOpClass(typeID, accessMethodId);
    if (InvalidOid == operatorClassId) {
        /* Oid is unsigned: print it with %u */
        ereport(ERROR,
            (errcode(ERRCODE_CASE_NOT_FOUND), errmodule(MOD_DFS), errmsg("Invalid Oid for operator %u.", typeID)));
    }
    Oid operatorFamily = get_opclass_family(operatorClassId);
    Oid operatorId = get_opfamily_member(operatorFamily, typeID, typeID, strategyNumber);
    if (InvalidOid == operatorId) {
        ereport(ERROR,
            (errcode(ERRCODE_CASE_NOT_FOUND),
                errmodule(MOD_DFS),
                errmsg("operator with strategy %d not found for type %u.", (int)strategyNumber, typeID)));
    }
    return operatorId;
}
/*
 * Build the clause "variable <op> NULL", where <op> is the btree operator for
 * `strategyNumber` on the Var's type. The right operand is a NULL Const with
 * the Var's type, typmod and collation, so callers can fill in the value
 * later. The returned OpExpr references `variable` directly (no copy).
 */
static OpExpr* make_operator_expression(Var* variable, int16 strategyNumber)
{
    Oid typeID = variable->vartype;
    /* vartypmod is a signed int32 (commonly -1); keep it signed instead of storing it in an Oid */
    int32 typeMod = variable->vartypmod;
    Oid collationId = variable->varcollid;

    /* placeholder right operand, typed like the Var */
    Const* nullConst = makeNullConst(typeID, typeMod, collationId);
    /* load the operator from catalogs */
    Oid opId = get_operator_by_typeid(typeID, BTREE_AM_OID, strategyNumber);

    OpExpr* expr = (OpExpr*)make_opclause(opId,
        InvalidOid, /* no result type yet */
        false,      /* no return set */
        (Expr*)variable,
        (Expr*)nullConst,
        InvalidOid,
        collationId);

    /* fill in the implementing function and its result type */
    expr->opfuncid = get_opcode(opId);
    expr->opresulttype = get_func_rettype(expr->opfuncid);
    return expr;
}
/*
 * Store `value` / `isNull` into the Const right operand of `equalExpr`
 * (modified in place) and return the same expression as a Node.
 */
static Node* build_const_constraint(Expr* equalExpr, Datum value, bool isNull)
{
    Node* rhs = get_rightop(equalExpr);
    Assert(NULL != rhs);
    Const* target = (Const*)rhs;
    target->constvalue = value;
    target->constisnull = isNull;
    return (Node*)equalExpr;
}
/*
 * Return true when the plan's qual proves that no row can satisfy
 *     const_v1 <= var_node < const_v2
 * i.e. rows whose column lies in the half-open range [const_v1, const_v2)
 * can be skipped. `range_column` is currently unused.
 */
static bool whether_refuted_by_range(
    ForeignScanState* node, Var* var_node, int range_column, Datum const_v1, Datum const_v2)
{
    /* check before dereferencing plan: the refutation needs a non-empty qual */
    Assert(node->ss.ps.plan && node->ss.ps.plan->qual);
    List* qual = node->ss.ps.plan->qual;
    (void)range_column;

    /* local copy of the Var; the expressions below are only used during this call */
    Var tmp_node = *var_node;
    /* CREATE time <= range < next CREATE time */
    OpExpr* great_than = make_operator_expression(&tmp_node, BTGreaterEqualStrategyNumber);
    OpExpr* less_than = make_operator_expression(&tmp_node, BTLessStrategyNumber);

    Node* min_node = get_rightop((Expr*)great_than);
    Assert(IsA(min_node, Const));
    Const* min_constant = (Const*)min_node;
    min_constant->constvalue = const_v1;
    min_constant->constisnull = false;

    Node* max_node = get_rightop((Expr*)less_than);
    Assert(IsA(max_node, Const));
    Const* max_constant = (Const*)max_node;
    max_constant->constvalue = const_v2;
    max_constant->constisnull = false;

    Node* range_constraint = make_and_qual((Node*)less_than, (Node*)great_than);
    List* base_restriction = lappend(NIL, range_constraint);
    bool refuted = predicate_refuted_by(base_restriction, qual, true);

    list_free(base_restriction);
    base_restriction = NIL;
    pfree_ext(great_than);
    pfree_ext(less_than);
    return refuted;
}
/*
 * Return true when the plan's qual proves that no row can satisfy
 * "var_node = const_val", so rows carrying that value can be skipped.
 * `text_column` is currently unused.
 */
static bool whether_refuted_by(ForeignScanState* node, Var* var_node, int text_column, Datum const_val)
{
    /* check before dereferencing plan: the refutation needs a non-empty qual */
    Assert(node->ss.ps.plan && node->ss.ps.plan->qual);
    List* qual = node->ss.ps.plan->qual;
    (void)text_column;

    /* local copy of the Var; the expression below is only used during this call */
    Var tmp_node = *var_node;
    Expr* expr_node = (Expr*)make_operator_expression(&tmp_node, BTEqualStrategyNumber);
    Node* const_constraint = build_const_constraint(expr_node, const_val, false);
    List* base_restriction = lappend(NIL, const_constraint);
    bool refuted = predicate_refuted_by(base_restriction, qual, true);

    /* free memory */
    list_free(base_restriction);
    base_restriction = NIL;
    pfree_ext(expr_node);
    return refuted;
}
/*
 * Return true when the WHERE clause excludes this node's host name
 * (festate->m_tmp_hostname_text) for the hostname column.
 */
static bool whether_refuted_by_hostname(ForeignScanState* node)
{
    logFdwPlanState* state = (logFdwPlanState*)node->fdw_state;
    Var* hostname_var = &state->m_tmp_hostname_var;
    Datum my_hostname = state->m_tmp_hostname_text;
    return whether_refuted_by(node, hostname_var, PGLOG_ATTR_HOSTNAME, my_hostname);
}
/* Return true when the WHERE clause excludes directory name `dirname` (a text Datum). */
static bool whether_refuted_by_dirname(ForeignScanState* node, Datum dirname)
{
    logFdwPlanState* state = (logFdwPlanState*)node->fdw_state;
    Var* dirname_var = &state->m_tmp_dirname_var;
    return whether_refuted_by(node, dirname_var, PGLOG_ATTR_DIRNAME, dirname);
}
/* Return true when the WHERE clause excludes log file name `fname` (a text Datum). */
static bool whether_refuted_by_logname(ForeignScanState* node, Datum fname)
{
    logFdwPlanState* state = (logFdwPlanState*)node->fdw_state;
    Var* logname_var = &state->m_tmp_logname_var;
    return whether_refuted_by(node, logname_var, PGLOG_ATTR_FILENAME, fname);
}
/*
 * Return true when the WHERE clause on the log time column excludes the whole
 * interval [tm_create, tm_last_modify) of `logfile`. Both time_t bounds are
 * converted to "timestamp with time zone" values before comparison.
 */
static bool whether_refuted_by_logtime(ForeignScanState* node, log_file_info* logfile)
{
    logFdwPlanState* state = (logFdwPlanState*)node->fdw_state;
    Datum range_lo = TimestampTzGetDatum(time_t_to_timestamptz((pg_time_t)logfile->tm_create));
    Datum range_hi = TimestampTzGetDatum(time_t_to_timestamptz((pg_time_t)logfile->tm_last_modify));
    return whether_refuted_by_range(node, &state->m_tmp_logtime_var, PGLOG_ATTR_LOGTIME, range_lo, range_hi);
}
/*
 * Drop from `dir_list` every directory whose name is excluded by the WHERE
 * clause on the dirname column. Elements are full directory paths; only their
 * last component is tested. The number of dropped directories is added to
 * festate->m_refuted_dirname_num, and the (possibly shortened) list is returned.
 */
static List* filter_dirname_list(ForeignScanState* node, List* dir_list)
{
    ListCell* prev = NULL;
    ListCell* cur = list_head(dir_list);
    uint32 removed = 0;

    while (NULL != cur) {
        /* list_delete_cell() frees `cur`, so fetch its successor first */
        ListCell* next = lnext(cur);
        /* same TEXT datum format as the WHERE clause uses */
        text* dname = fetch_dirname_from_path((const char*)lfirst(cur));
        if (whether_refuted_by_dirname(node, PointerGetDatum(dname))) {
            dir_list = list_delete_cell(dir_list, cur, prev);
            ++removed;
            /* prev is unchanged: it is now linked to next */
        } else {
            prev = cur;
        }
        pfree_ext(dname);
        cur = next;
    }

    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    festate->m_refuted_dirname_num += removed;
    return dir_list;
}
/*
 * Drop from `fname_list` (a list of log_file_info*) every file whose name is
 * excluded by the WHERE clause on the filename column. log_file_info::name is
 * a full path; only its last component is tested. The number dropped is added
 * to festate->m_refuted_logfile_num_by_name, and the remaining list is returned.
 */
static List* filter_logname_list_by_name(ForeignScanState* node, List* fname_list)
{
    ListCell* prev = NULL;
    ListCell* cur = list_head(fname_list);
    uint32 removed = 0;

    while (NULL != cur) {
        /* list_delete_cell() frees `cur`, so fetch its successor first */
        ListCell* next = lnext(cur);
        log_file_info* info = (log_file_info*)lfirst(cur);
        /* same TEXT datum format as the WHERE clause uses */
        text* fname = fetch_filename_from_logfile(info->name);
        if (whether_refuted_by_logname(node, PointerGetDatum(fname))) {
            fname_list = list_delete_cell(fname_list, cur, prev);
            ++removed;
            /* prev is unchanged: it is now linked to next */
        } else {
            prev = cur;
        }
        pfree_ext(fname);
        cur = next;
    }

    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    festate->m_refuted_logfile_num_by_name += removed;
    return fname_list;
}
/*
 * Drop from `fname_list` (log_file_info*, in list order) every file whose time
 * span is excluded by the WHERE clause on the log time column.
 *
 * Each file's tm_last_modify is first set to the tm_create of the next file in
 * the list. The last file uses festate->m_last_modify_tm_maxval instead. The
 * number dropped is added to festate->m_refuted_logfile_num_by_time.
 */
static List* filter_logname_list_by_time(ForeignScanState* node, List* fname_list)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    ListCell* prev = NULL;
    ListCell* cur = list_head(fname_list);
    uint32 removed = 0;

    while (NULL != cur) {
        /* list_delete_cell() frees `cur`, so fetch its successor first */
        ListCell* next = lnext(cur);
        log_file_info* info = (log_file_info*)lfirst(cur);

        if (NULL == next) {
            /* newest file: bounded by the recorded upper limit */
            info->tm_last_modify = festate->m_last_modify_tm_maxval;
        } else {
            info->tm_last_modify = ((log_file_info*)lfirst(next))->tm_create;
        }

        if (whether_refuted_by_logtime(node, info)) {
            fname_list = list_delete_cell(fname_list, cur, prev);
            ++removed;
        } else {
            prev = cur;
        }
        cur = next;
    }

    festate->m_refuted_logfile_num_by_time += removed;
    return fname_list;
}
/*
*----------------------------------------------------------
* Directory Task Assignment Area
*----------------------------------------------------------
*/
/*
 * qsort() comparator for an array of C strings. Each argument points at one
 * `char*` array element, and the strings are ordered by strcmp().
 */
static int strcmp_wrapper(const void* s1, const void* s2)
{
    const char* const* lhs = (const char* const*)s1;
    const char* const* rhs = (const char* const*)s2;
    return strcmp(*lhs, *rhs);
}
/* compare function for string-node type */
static int str_node_cmp(const void* s1, const void* s2)
{
Value* v1 = *(Value**)s1;
Value* v2 = *(Value**)s2;
return strcmp(v1->val.str, v2->val.str);
}
/* compare function for log access time */
static int cmp_latest_with_create_time(const void* s1, const void* s2)
{
log_file_info* log1 = *(log_file_info**)s1;
log_file_info* log2 = *(log_file_info**)s2;
/* make the newest file in the front */
return (log2->tm_create - log1->tm_create);
}
/*
 * qsort() comparator for an array of log_file_info*, ordering files by
 * CREATE time with the oldest first. It is the reverse of
 * cmp_latest_with_create_time().
 */
static int rcmp_latest_with_create_time(const void* s1, const void* s2)
{
    /* swapping the operands reverses the order */
    const void* lhs = s2;
    const void* rhs = s1;
    return cmp_latest_with_create_time(lhs, rhs);
}
/*
 * Sort `str_list` in place with qsort() and comparator `cmp`.
 *
 * The element pointers are copied into a temporary array, sorted, and written
 * back into the existing cells, so the List structure is not rebuilt. `cmp`
 * receives pointers to element pointers (the qsort convention). Despite the
 * name, any pointer list works as long as `cmp` understands its elements.
 */
static void make_string_list_sorted(List* str_list, cmp_func cmp)
{
    int count = list_length(str_list);
    /* empty or single-element lists are already sorted */
    if (count < 2) {
        return;
    }

    void** slots = (void**)palloc(count * sizeof(char*));
    ListCell* lc = NULL;
    int pos = 0;
    foreach (lc, str_list) {
        slots[pos] = (void*)lfirst(lc);
        pos++;
    }

    qsort(slots, count, sizeof(void*), cmp);

    pos = 0;
    foreach (lc, str_list) {
        lfirst(lc) = slots[pos];
        pos++;
    }
    pfree_ext(slots);
}
/*
 * Parse up to `width` leading decimal digits of `field` into an int, stopping
 * at the first non-digit character.
 */
static int parse_log_fname_digits(const char* field, int width)
{
    int value = 0;
    for (int i = 0; i < width && field[i] >= '0' && field[i] <= '9'; i++) {
        value = value * 10 + (field[i] - '0');
    }
    return value;
}

/*
 * Extract a log file's CREATE time from its file name.
 *
 * The name is expected to look like
 *
 *     postgresql-2018-06-19_203948.log
 *
 * with the year at offset 11 (4 digits), the month at 16, the day at 19, and
 * hour, minute and second at offsets 22, 24 and 26 (2 digits each). The fields
 * are taken as local time and converted with mktime(). tm_isdst is -1, so
 * mktime() decides whether daylight saving time applies on that date.
 *
 * Returns (time_t)0 when `fname` is too short to contain all fields, rather
 * than reading past its end.
 */
static time_t extract_create_time_from_log_fname(const char* fname)
{
    /* offset just past the seconds field, i.e. the shortest parsable name */
    const size_t min_len = 28;
    if (strlen(fname) < min_len) {
        return (time_t)0;
    }

    struct tm tmp_time = {0};
    /* tm_year counts years since 1900 */
    tmp_time.tm_year = parse_log_fname_digits(fname + 11, 4) - 1900;
    /* tm_mon is 0..11 */
    tmp_time.tm_mon = parse_log_fname_digits(fname + 16, 2) - 1;
    tmp_time.tm_mday = parse_log_fname_digits(fname + 19, 2);
    tmp_time.tm_hour = parse_log_fname_digits(fname + 22, 2);
    tmp_time.tm_min = parse_log_fname_digits(fname + 24, 2);
    tmp_time.tm_sec = parse_log_fname_digits(fname + 26, 2);
    /* DST state unknown: let mktime() determine it from the time zone rules */
    tmp_time.tm_isdst = -1;

    return mktime(&tmp_time);
}
/*
 * Append to scan->dir_list (as palloc'd full paths) every real subdirectory of
 * scan->top_dir. Symbolic links, "." and "..", and entries rejected by
 * scan->dir_cb (when set) are skipped. An unopenable top_dir adds nothing.
 */
static void fill_dirs_list(logdir_scanner scan)
{
    DIR* top = opendir(scan->top_dir);
    if (NULL == top) {
        return;
    }

    char fullpath[MAXPGPATH] = {0};
    struct stat st;
    struct dirent* ent = NULL;
    while ((ent = readdir(top)) != NULL) {
        bool is_dot_entry = (0 == strcmp(ent->d_name, ".")) || (0 == strcmp(ent->d_name, ".."));
        if (is_dot_entry || (scan->dir_cb != NULL && !scan->dir_cb(ent->d_name))) {
            continue;
        }

        int ret = snprintf_s(fullpath, MAXPGPATH, MAXPGPATH - 1, "%s/%s", scan->top_dir, ent->d_name);
        securec_check_ss(ret, "", "");

        /*
         * Use lstat() rather than ent->d_type: POSIX.1-2001 only specifies d_name
         * and d_ino, and d_type is not available on all systems.
         */
        if (0 != lstat(fullpath, &st) || S_ISLNK(st.st_mode) || !S_ISDIR(st.st_mode)) {
            continue;
        }
        scan->dir_list = lappend(scan->dir_list, pstrdup(fullpath));
    }
    (void)closedir(top);
}
/*
 * Build scan->log_file_list (expected to be empty) from the regular files
 * directly inside scan->cur_dir that pass scan->fname_cb (when set). Symbolic
 * links and "."/".." are skipped. Each element is a palloc'd log_file_info
 * holding the full path and the CREATE time parsed from the file name. An
 * unopenable cur_dir leaves the list untouched.
 */
static void fill_files_list(logdir_scanner scan)
{
    DIR* dir = opendir(scan->cur_dir);
    if (NULL == dir) {
        return;
    }
    Assert(NIL == scan->log_file_list);

    char fullpath[MAXPGPATH] = {0};
    struct stat st;
    struct dirent* ent = NULL;
    int ret = 0;
    while ((ent = readdir(dir)) != NULL) {
        if (0 == strcmp(ent->d_name, ".") || 0 == strcmp(ent->d_name, "..")) {
            continue;
        }

        ret = snprintf_s(fullpath, MAXPGPATH, MAXPGPATH - 1, "%s/%s", scan->cur_dir, ent->d_name);
        securec_check_ss(ret, "", "");

        /*
         * Use lstat() rather than ent->d_type: POSIX.1-2001 only specifies d_name
         * and d_ino, and d_type is not available on all systems.
         */
        if (0 != lstat(fullpath, &st) || S_ISLNK(st.st_mode) || !S_ISREG(st.st_mode)) {
            continue; /* vanished, a symlink, or not a regular file */
        }
        if (NULL != scan->fname_cb && !scan->fname_cb(ent->d_name)) {
            continue;
        }

        log_file_info* info = (log_file_info*)palloc0(sizeof(log_file_info));
        info->name = pstrdup(fullpath);
        info->tm_create = extract_create_time_from_log_fname(ent->d_name);
        scan->log_file_list = lappend(scan->log_file_list, info);
    }
    (void)closedir(dir);
}
/*
 * Get all IPv4 address strings of this node itself.
 *
 * Resolves the local host name (gethostname + getaddrinfo, AF_INET only) and
 * returns a List of palloc'd dotted-decimal strings, without duplicates, in
 * resolver order. On success the list contains at least one address.
 *
 * Raises ERROR when the host name cannot be obtained or resolved, or when an
 * address cannot be converted to text.
 */
static List* localhost_to_ipinfo(void)
{
    /* first, get hostname */
    char hostname[LOGFDW_HOSTNAME_MAXLEN] = {0};
    if (0 != gethostname(hostname, LOGFDW_HOSTNAME_MAXLEN)) {
        int tmperr = errno;
        ereport(ERROR,
            (errcode(ERRCODE_SYSTEM_ERROR),
                errmsg("gethostname() failed(errno %d): %s", tmperr, strerror(tmperr))));
    }
    /* POSIX does not guarantee NUL termination when the name was truncated */
    hostname[LOGFDW_HOSTNAME_MAXLEN - 1] = '\0';

    struct addrinfo hint;
    int ret = memset_s(&hint, sizeof(hint), 0, sizeof(hint));
    securec_check(ret, "\0", "\0");
    hint.ai_family = AF_INET;
    /*
     * Ask for a single socket type so each address is not returned once per
     * socket type. AI_CANONNAME is an ai_flags bit, not a socket type, and the
     * canonical name is not used here anyway.
     */
    hint.ai_socktype = SOCK_DGRAM;

    /* get all IPs from hostname */
    struct addrinfo* gai_result = NULL;
    ret = getaddrinfo(hostname, NULL, &hint, &gai_result);
    if (ret != 0) {
        ereport(ERROR,
            (errcode(ERRCODE_SYSTEM_ERROR),
                errmsg("could not translate hostname \"%s\" to address: %s", hostname, gai_strerror(ret))));
    }

    List* thisnode_ips = NIL;
    for (struct addrinfo* ai = gai_result; ai != NULL; ai = ai->ai_next) {
        char ip[LOGFDW_LOCAL_IP_LEN] = {0};
        struct sockaddr_in* h = (struct sockaddr_in*)ai->ai_addr;
        const char* ipstr = inet_ntop(AF_INET, &h->sin_addr.s_addr, ip, LOGFDW_LOCAL_IP_LEN);
        if (NULL == ipstr) {
            /* save errno first; release the libc-allocated result before leaving via ERROR */
            int tmperr = errno;
            freeaddrinfo(gai_result);
            ereport(ERROR,
                (errcode(ERRCODE_SYSTEM_ERROR),
                    errmsg("inet_ntop() failed(errno %d): %s", tmperr, strerror(tmperr))));
        }

        /* append this ip only if it is not in the list yet */
        bool found = false;
        ListCell* ip_cell = NULL;
        foreach (ip_cell, thisnode_ips) {
            if (0 == strcmp(ipstr, (const char*)lfirst(ip_cell))) {
                found = true;
                break;
            }
        }
        if (!found) {
            thisnode_ips = lappend(thisnode_ips, pstrdup(ipstr));
        }
    }
    freeaddrinfo(gai_result);
    return thisnode_ips;
}
/*
 * Return (palloc'd) the node_name of the first tuple in pgxc_node.
 *
 * Meant for datanodes: there pgxc_node holds a single tuple whose node_name
 * matches this node's node_name in the CN's pgxc_node. Raises ERROR when no
 * tuple is found or the name is empty.
 */
static inline char* get_nodename_same_to_cn_pgxcnode_table(void)
{
    Assert(IS_PGXC_DATANODE);
    char* name = NULL;

    Relation rel = heap_open(PgxcNodeRelationId, AccessShareLock);
    TableScanDesc scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
    HeapTuple tuple = heap_getnext(scan, ForwardScanDirection);
    if (NULL != tuple) {
        Form_pgxc_node nodeForm = (Form_pgxc_node)GETSTRUCT(tuple);
        name = pstrdup(NameStr(nodeForm->node_name));
    }
    heap_endscan(scan);
    heap_close(rel, AccessShareLock);

    if (NULL == name || '\0' == name[0]) {
        ereport(ERROR,
            (errcode(ERRCODE_SYSTEM_ERROR), errmsg("Failed to get nodename from pgxc_node which is an empty table")));
    }
    return name;
}
/*
 * Attach `localhost_dnnames` (datanodes configured as "localhost") to a real
 * IP of this machine in `tmp_htbl`.
 *
 * If any local IP is already a key, the names are appended to that entry.
 * Otherwise a new entry is created under the smallest local IP (by strcmp).
 * Raises ERROR if the insertion fails.
 */
static void build_hashtbl_of_localhost(HTAB* tmp_htbl, List* localhost_dnnames)
{
    List* local_ips = localhost_to_ipinfo();
    hvalue_nodes* entry = NULL;
    bool found = false;
    ListCell* ip_cell = NULL;

    /* look for an existing entry keyed by any of this host's addresses */
    foreach (ip_cell, local_ips) {
        entry = (hvalue_nodes*)hash_search(tmp_htbl, (char*)lfirst(ip_cell), HASH_FIND, &found);
        if (found) {
            break;
        }
    }

    if (found) {
        Assert(entry && entry->nodenames);
        entry->nodenames = list_concat(entry->nodenames, localhost_dnnames);
    } else {
        /* key the new entry by the minimum ip */
        make_string_list_sorted(local_ips, strcmp_wrapper);
        char* min_ip = (char*)lfirst(list_head(local_ips));
        entry = (hvalue_nodes*)hash_search(tmp_htbl, min_ip, HASH_ENTER, &found);
        if (NULL == entry) {
            ereport(ERROR,
                (errcode(ERRCODE_SYSTEM_ERROR),
                    errmsg("failed to insert entry into ip-masters hash table for localhost datanode")));
        }
        Assert(!found);
        entry->nodenames = localhost_dnnames;
    }
    list_free_deep(local_ips);
}
/*
 * Fill `tmp_htbl` (key: IP string, value: hvalue_nodes) from pgxc_node. Each
 * datanode master address maps to the names (makeString() Value nodes) of the
 * datanodes whose master runs there.
 *
 * The master address is node_host when hostis_primary is true, otherwise
 * node_host1. Datanodes whose master address is "localhost" or "127.0.0.1"
 * are logged, collected separately, and finally attached to one of this
 * machine's own IPs by build_hashtbl_of_localhost().
 */
static void build_hashtbl_of_master_dnname_and_ip(HTAB* tmp_htbl)
{
    List* localhost_dnnames = NIL;
    Relation rel = heap_open(PgxcNodeRelationId, AccessShareLock);
    TableScanDesc scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
    HeapTuple tuple = NULL;

    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) {
        Form_pgxc_node nodeForm = (Form_pgxc_node)GETSTRUCT(tuple);
        /* only master datanode names and ips are passed down */
        if (PGXC_NODE_DATANODE != nodeForm->node_type) {
            continue;
        }

        char* master_ip = nodeForm->hostis_primary ? NameStr(nodeForm->node_host) : NameStr(nodeForm->node_host1);
        bool is_loopback =
            (0 == pg_strcasecmp(master_ip, "localhost")) || (0 == pg_strcasecmp(master_ip, "127.0.0.1"));
        if (is_loopback) {
            /*
             * Not expected in a normal cluster configuration; it happens when a
             * developer builds a cluster without standbys. Resolve it to a real
             * local IP after the scan.
             */
            ereport(LOG, (errmsg("\"%s\" should not be configured in pgxc_node for datanode type", master_ip)));
            localhost_dnnames = lappend(localhost_dnnames, makeString(pstrdup(NameStr(nodeForm->node_name))));
            continue;
        }

        bool found = false;
        hvalue_nodes* entry = (hvalue_nodes*)hash_search(tmp_htbl, master_ip, HASH_ENTER, &found);
        if (NULL == entry) {
            ereport(ERROR,
                (errcode(ERRCODE_SYSTEM_ERROR),
                    errmsg(
                        "failed to put ip \"%s\" into hash \"get_ip_nodename_pairs_from_pgxc_node\" ", master_ip)));
        }
        if (!found) {
            entry->nodenames = NIL;
        }
        /* T_String nodes so the names can be pushed down to datanodes */
        entry->nodenames = lappend(entry->nodenames, makeString(pstrdup(NameStr(nodeForm->node_name))));
    }
    heap_endscan(scan);
    heap_close(rel, AccessShareLock);

    /* finally handle all the datanodes configured as 'localhost' */
    if (NULL != localhost_dnnames) {
        build_hashtbl_of_localhost(tmp_htbl, localhost_dnnames);
    }
}
/*
 * Record coordinator `node` (a palloc'd copy) under host key `host`. A newly
 * created entry starts as UHT_UNKNOWN with has_datanode = false. An existing
 * entry only gets the name appended.
 */
static inline void htbl_insert_cn_entry(HTAB* tmp_htbl, const char* host, const char* node)
{
    bool found = false;
    hentry_reachable_host* entry = (hentry_reachable_host*)hash_search(tmp_htbl, host, HASH_ENTER, &found);
    if (!found) {
        entry->nodenames = NIL;
        entry->has_datanode = false;
        entry->reachable = UHT_UNKNOWN;
    }
    char* node_copy = pstrdup(node);
    entry->nodenames = lappend(entry->nodenames, node_copy);
}
static inline char* standby_node_tag(const char* node)
{
char host_tag[NAMEDATALEN] = {0};
errno_t ret = strcat_s(host_tag, NAMEDATALEN, node);
securec_check(ret, "\0", "\0");
ret = strcat_s(host_tag, NAMEDATALEN, "(standby)");
securec_check(ret, "\0", "\0");
return pstrdup(host_tag);
}
/*
 * Record datanode `node` under host key `host`.
 *
 * The entry is always marked has_datanode = true, even if it was first
 * created by htbl_insert_cn_entry() with has_datanode = false. Otherwise the
 * unreachable reason computed later would depend on pgxc_node scan order.
 * When `hostis_primary` is true (this host runs the master), the host becomes
 * UHT_OK and the plain node name is stored. Otherwise the name is stored with
 * the "(standby)" tag.
 */
static inline void htbl_insert_dn_entry(HTAB* tmp_htbl, const char* host, const char* node, bool hostis_primary)
{
    bool found = false;
    hentry_reachable_host* entry = (hentry_reachable_host*)hash_search(tmp_htbl, host, HASH_ENTER, &found);
    /* init entry */
    if (!found) {
        entry->reachable = UHT_UNKNOWN;
        entry->nodenames = NIL;
    }
    entry->has_datanode = true;

    if (hostis_primary) {
        entry->reachable = UHT_OK;
        entry->nodenames = lappend(entry->nodenames, pstrdup(node));
    } else {
        entry->nodenames = lappend(entry->nodenames, standby_node_tag(node));
    }
}
/*
 * Attach the coordinators and datanodes configured as "localhost" to a real
 * IP of this machine in `unreach_htbl`.
 *
 * If any local IP is already a key, the names are appended to that entry. A
 * non-OK reachability is replaced by `local_reach`, and has_datanode is set if
 * there are local datanodes. Otherwise a new entry is created under the
 * smallest local IP (by strcmp). Raises ERROR if the insertion fails.
 */
static void htbl_insert_localhost_nodes(
    HTAB* unreach_htbl, List* localhost_cn, List* localhost_dn, unr_host_type local_reach)
{
    List* localhost_ips = localhost_to_ipinfo();
    hentry_reachable_host* entry = NULL;
    bool found = false;
    ListCell* ip_cell = NULL;

    /* look for an existing entry keyed by any of this host's addresses */
    foreach (ip_cell, localhost_ips) {
        entry = (hentry_reachable_host*)hash_search(unreach_htbl, (char*)lfirst(ip_cell), HASH_FIND, &found);
        if (found) {
            break;
        }
    }

    if (found) {
        Assert(entry && entry->nodenames);
        entry->nodenames = list_concat(entry->nodenames, localhost_cn);
        entry->nodenames = list_concat(entry->nodenames, localhost_dn);
        /* an unreachable host now depends on the localhost reachability */
        if (UHT_OK != entry->reachable) {
            entry->reachable = local_reach;
        }
        entry->has_datanode = entry->has_datanode || (list_length(localhost_dn) > 0);
    } else {
        /* key the new entry by the minimum ip */
        make_string_list_sorted(localhost_ips, strcmp_wrapper);
        char* min_ip = (char*)lfirst(list_head(localhost_ips));
        entry = (hentry_reachable_host*)hash_search(unreach_htbl, min_ip, HASH_ENTER, &found);
        if (NULL == entry) {
            ereport(ERROR,
                (errcode(ERRCODE_SYSTEM_ERROR),
                    errmsg("failed to insert entry into ip-masters hash table for localhost datanode")));
        }
        Assert(!found);
        entry->nodenames = NIL;
        entry->nodenames = list_concat(entry->nodenames, localhost_cn);
        entry->nodenames = list_concat(entry->nodenames, localhost_dn);
        entry->reachable = local_reach;
        entry->has_datanode = (list_length(localhost_dn) > 0);
    }
    list_free_deep(localhost_ips);
}
/*
 * Fill `unreach_htbl` (key: host string, value: hentry_reachable_host) from
 * pgxc_node.
 *
 * Datanodes are registered under both node_host and node_host1. The master
 * host is node_host when hostis_primary is true, otherwise node_host1.
 * Coordinators are registered under node_host. Nodes whose host is
 * "localhost" (compared case-insensitively) are collected and finally
 * attached to a real local IP by htbl_insert_localhost_nodes(). Localhost
 * counts as reachable (UHT_OK) only if it runs at least one master datanode.
 */
static void build_hashtbl_of_unreachable(HTAB* unreach_htbl)
{
    List* localhost_cn = NIL;
    List* localhost_dn = NIL;
    unr_host_type localhost_reachable = UHT_UNKNOWN;

    Relation rel = heap_open(PgxcNodeRelationId, AccessShareLock);
    TableScanDesc scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
    HeapTuple tuple = NULL;
    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) {
        Form_pgxc_node nodeForm = (Form_pgxc_node)GETSTRUCT(tuple);
        const char* node_name = NameStr(nodeForm->node_name);

        if (PGXC_NODE_DATANODE != nodeForm->node_type) {
            /* host reachability depends on primary datanodes, not on coordinators */
            if (0 == pg_strcasecmp(NameStr(nodeForm->node_host), "localhost")) {
                localhost_cn = lappend(localhost_cn, pstrdup(node_name));
            } else {
                htbl_insert_cn_entry(unreach_htbl, NameStr(nodeForm->node_host), node_name);
            }
            continue;
        }

        /* datanode: node_host is the master iff hostis_primary, node_host1 otherwise */
        const char* hosts[2] = {NameStr(nodeForm->node_host), NameStr(nodeForm->node_host1)};
        const bool host_is_master[2] = {nodeForm->hostis_primary, !nodeForm->hostis_primary};
        for (int i = 0; i < 2; i++) {
            if (0 != pg_strcasecmp(hosts[i], "localhost")) {
                htbl_insert_dn_entry(unreach_htbl, hosts[i], node_name, host_is_master[i]);
            } else if (host_is_master[i]) {
                /* localhost is reachable through this master */
                localhost_dn = lappend(localhost_dn, pstrdup(node_name));
                localhost_reachable = UHT_OK;
            } else {
                /* a standby alone does not make localhost reachable */
                localhost_dn = lappend(localhost_dn, standby_node_tag(node_name));
            }
        }
    }
    heap_endscan(scan);
    heap_close(rel, AccessShareLock);

    if (NULL != localhost_cn || NULL != localhost_dn) {
        htbl_insert_localhost_nodes(unreach_htbl, localhost_cn, localhost_dn, localhost_reachable);
    }
}
/*
 * Collect the hosts whose reachability, as computed by
 * build_hashtbl_of_unreachable(), is not UHT_OK.
 *
 * Each such entry is tagged with a reason before being copied:
 *   UHT_NO_MASTER    - the host carries datanodes (has_datanode is set);
 *   UHT_DEPLOY_LIMIT - otherwise.
 * The returned List holds palloc'd copies of the hash entries (the nodenames
 * list pointer is copied shallowly); the temporary hash table is destroyed
 * before returning. Release the result with free_unreachable_hosts().
 */
static List* get_unreachable_hosts(void)
{
    HASHCTL hash_ctl;
    int rc = memset_s(&hash_ctl, sizeof(HASHCTL), 0, sizeof(HASHCTL));
    securec_check(rc, "\0", "\0");
    hash_ctl.keysize = sizeof(hkey_ip);
    hash_ctl.entrysize = sizeof(hentry_reachable_host);
    hash_ctl.hash = string_hash;

    const long default_node_number = 1024;
    HTAB* host_table =
        hash_create("get_unreachable_hosts", default_node_number, &hash_ctl, HASH_ELEM | HASH_FUNCTION);
    build_hashtbl_of_unreachable(host_table);

    List* result = NIL;
    HASH_SEQ_STATUS iter;
    hash_seq_init(&iter, host_table);
    for (hentry_reachable_host* host = (hentry_reachable_host*)hash_seq_search(&iter); host != NULL;
         host = (hentry_reachable_host*)hash_seq_search(&iter)) {
        if (UHT_OK == host->reachable) {
            continue;
        }
        /* record why this host cannot be reached */
        host->reachable = host->has_datanode ? UHT_NO_MASTER : UHT_DEPLOY_LIMIT;

        hentry_reachable_host* copy = (hentry_reachable_host*)palloc(sizeof(hentry_reachable_host));
        *copy = *host;
        result = lappend(result, copy);
    }
    hash_destroy(host_table);
    return result;
}
/*
 * Release a list produced by get_unreachable_hosts().
 *
 * For every element, its nodenames list is freed deeply. Then the element
 * itself (palloc'd by get_unreachable_hosts()) and the list cells are freed
 * by list_free_deep(). The original implementation freed only the nodenames
 * lists and the cells, leaking every host element.
 */
static void free_unreachable_hosts(List* unreach)
{
    ListCell* host_cell = NULL;
    foreach (host_cell, unreach) {
        hentry_reachable_host* host = (hentry_reachable_host*)lfirst(host_cell);
        list_free_deep(host->nodenames);
        host->nodenames = NIL;
    }
    /* frees each hentry_reachable_host copy as well as the list itself */
    list_free_deep(unreach);
}
/*
 * Build the "masters info" list: one DistFdwDataNodeTask per host IP entry
 * produced by build_hashtbl_of_master_dnname_and_ip().
 *
 * In each element, dnName is a pstrdup'd copy of the IP string and task takes
 * over the entry's nodenames list (the hash entry's pointer is cleared). The
 * temporary hash table is destroyed before returning.
 */
List* get_ip_nodename_pairs_from_pgxc_node(void)
{
    HASHCTL ctl;
    int rc = memset_s(&ctl, sizeof(HASHCTL), 0, sizeof(HASHCTL));
    securec_check(rc, "\0", "\0");
    ctl.keysize = sizeof(hkey_ip);
    ctl.entrysize = sizeof(hvalue_nodes);
    ctl.hash = string_hash;

    const long default_node_number = 1024;
    HTAB* ip_table =
        hash_create("get_ip_nodename_pairs_from_pgxc_node", default_node_number, &ctl, HASH_ELEM | HASH_FUNCTION);
    build_hashtbl_of_master_dnname_and_ip(ip_table);

    List* pairs = NIL;
    HASH_SEQ_STATUS seq;
    hash_seq_init(&seq, ip_table);
    hvalue_nodes* item = NULL;
    while (NULL != (item = (hvalue_nodes*)hash_seq_search(&seq))) {
        DistFdwDataNodeTask* pair = makeNode(DistFdwDataNodeTask);
        /* dnName carries the IP address */
        pair->dnName = pstrdup(NameStr(item->host_ip.ip));
        /* hand the node-name list over to the pair */
        pair->task = item->nodenames;
        item->nodenames = NULL;
        pairs = lappend(pairs, pair);
    }
    hash_destroy(ip_table);
    return pairs;
}
/*
 * Select this node's share of dir_list by dealing directories round-robin
 * among nodes_num nodes.
 *
 * The node at this_node_pos takes the 0-based positions
 *   this_node_pos, this_node_pos + nodes_num, this_node_pos + 2 * nodes_num, ...
 * Picked strings are moved into the returned list; every other string is
 * pfree'd. The cells of dir_list are always released, so the caller must not
 * use dir_list afterwards.
 */
static List* core_assign_dir_task(List* dir_list, const int nodes_num, const int this_node_pos)
{
    Assert(nodes_num > 0);
    Assert(this_node_pos >= 0 && this_node_pos < nodes_num);

    List* mine = NIL;
    int idx = 0;
    ListCell* lc = NULL;
    foreach (lc, dir_list) {
        const bool is_mine = (idx >= this_node_pos) && (((idx - this_node_pos) % nodes_num) == 0);
        if (is_mine) {
            /* move the string; the cell is released below */
            mine = lappend(mine, lfirst(lc));
            lfirst(lc) = NULL;
        } else {
            pfree_ext(lfirst(lc));
        }
        ++idx;
    }
    list_free(dir_list);
    return mine;
}
/*
 * Populate scan->dir_list via fill_dirs_list() and order it with
 * strcmp_wrapper.
 */
static void get_and_sort_all_dirs(logdir_scanner scan)
{
    fill_dirs_list(scan);
    make_string_list_sorted(scan->dir_list, strcmp_wrapper);
}
/*
 * Decide which log directories this datanode scans, using the masters-info
 * list shipped from the coordinator.
 *
 * ip_nodename_pairs: List of DistFdwDataNodeTask; dnName is a host IP and task
 *   is the List of (Value*) node names located on that IP.
 * scan: the directory scanner whose dir_list is replaced by this node's share.
 *
 * The pairs whose IP is one of this host's addresses (localhost_to_ipinfo())
 * are collected. Among them, this node's name is located after sorting each
 * node list with str_node_cmp, and its position selects a round-robin share
 * of the sorted directory list (core_assign_dir_task()).
 *
 * Raises ERROR if this node's name is not found or is found more than once.
 * The temporary pairs_found list is now released (it was leaked before); its
 * elements remain owned by ip_nodename_pairs.
 */
static void get_dirs_from_ip_nodename_pairs(List* ip_nodename_pairs, logdir_scanner scan)
{
    ListCell* pair_cell = NULL;
    ListCell* ip_cell = NULL;
    List* pairs_found = NIL;
    List* local_ips = localhost_to_ipinfo();

    /* collect the ip-nodes pairs whose IP belongs to this host */
    foreach (ip_cell, local_ips) {
        char* ip = (char*)lfirst(ip_cell);
        foreach (pair_cell, ip_nodename_pairs) {
            DistFdwDataNodeTask* pair = (DistFdwDataNodeTask*)lfirst(pair_cell);
            /* dnName stores ip info */
            if (0 == pg_strcasecmp(ip, pair->dnName)) {
                pairs_found = lappend(pairs_found, pair);
                break;
            }
        }
    }
    list_free_deep(local_ips);
    local_ips = NIL;

    char* nodename = get_nodename_same_to_cn_pgxcnode_table();
    DistFdwDataNodeTask* pair_found = NULL;
    int pos = 0;
    bool found = false;
    foreach (pair_cell, pairs_found) {
        DistFdwDataNodeTask* pair = (DistFdwDataNodeTask*)lfirst(pair_cell);
        /* sort the node names so that the position computed below is deterministic */
        make_string_list_sorted(pair->task, str_node_cmp);
        ListCell* dn_cell = NULL;
        int tmp_pos = 0; /* position inside this pair's node list */
        /* task stores all the datanode names */
        foreach (dn_cell, pair->task) {
            Value* strnode = (Value*)lfirst(dn_cell);
            if (0 == pg_strcasecmp(nodename, strnode->val.str)) {
                if (found) {
                    ereport(ERROR,
                        (errcode(ERRCODE_SYSTEM_ERROR),
                            errmsg("nodename \"%s\" found twice in ip-masters info list", nodename)));
                }
                found = true;
                /* remember matched list and its position info */
                pair_found = pair;
                pos = tmp_pos;
                break;
            }
            ++tmp_pos;
        }
    }
    /* only the list cells belong to us; the pairs are owned by ip_nodename_pairs */
    list_free(pairs_found);
    pairs_found = NIL;

    if (!found) {
        /* this datanode is not found in ip-nodes-pair list */
        ereport(ERROR,
            (errcode(ERRCODE_SYSTEM_ERROR), errmsg("nodename \"%s\" not found in ip-masters info list", nodename)));
    }
    pfree_ext(nodename);

    /* get the sorted directory list, then keep only this node's share */
    get_and_sort_all_dirs(scan);
    scan->dir_list = core_assign_dir_task(scan->dir_list, list_length(pair_found->task), pos);
}
/*
 * Pop the first directory of scan->dir_list into scan->cur_dir and load its
 * log files into scan->log_file_list via fill_files_list().
 *
 * When m_latest_files_num > 0, the files are first ordered with
 * cmp_latest_with_create_time and truncated to that many entries. The list is
 * then ordered with rcmp_latest_with_create_time, its length is added to the
 * plan state's m_total_logfile_num, and it is filtered by file name and/or
 * time if the plan state asks for it.
 *
 * If no directory is left, only cur_dir is cleared.
 */
static void fetch_one_dir_to_fill_filelist(logdir_scanner scan, ForeignScanState* node)
{
    /* cur_dir only borrows the string from dir_list, so it is not freed */
    scan->cur_dir = NULL;
    if (NIL == scan->dir_list || list_length(scan->dir_list) <= 0) {
        return;
    }

    scan->cur_dir = (char*)linitial(scan->dir_list);
    scan->dir_list = list_delete_first(scan->dir_list);
    fill_files_list(scan);

    if (scan->m_latest_files_num > 0) {
        make_string_list_sorted(scan->log_file_list, cmp_latest_with_create_time);
        scan->log_file_list = list_truncate(scan->log_file_list, scan->m_latest_files_num);
    }
    make_string_list_sorted(scan->log_file_list, rcmp_latest_with_create_time);

    logFdwPlanState* plan_state = (logFdwPlanState*)node->fdw_state;
    plan_state->m_total_logfile_num += list_length(scan->log_file_list);

    if (plan_state->m_need_check_logname) {
        scan->log_file_list = filter_logname_list_by_name(node, scan->log_file_list);
    }
    if (plan_state->m_need_check_logtime) {
        scan->log_file_list = filter_logname_list_by_time(node, scan->log_file_list);
    }
}
/*
 * Free a log_file_info and its name. info is taken by reference and handed to
 * pfree_ext(); NOTE(review): pfree_ext() is expected to reset the pointer it is
 * given to NULL, which would also clear the caller's variable. Confirm against
 * its definition. A NULL info is ignored.
 */
static inline void free_loginfo(log_file_info*& info)
{
    if (NULL == info) {
        return;
    }
    pfree_ext(info->name);
    pfree_ext(info);
}
/*
 * Create a directory scanner rooted at topdir, which should be
 * "$GAUSSLOG/pg_log" or "$GAUSSLOG/gs_profile".
 *
 * A private "Log Dir Scanner" memory context is created under
 * CurrentMemoryContext and stored in m_memcnxt. The scanner struct is
 * allocated (zeroed) in it, with a copy of topdir placed inline right after
 * the ld_scanner struct. dircb and fcb are the directory-name and file-name
 * match callbacks. Free the scanner with free_logdir_scanner().
 */
logdir_scanner new_logdir_scanner(const char* topdir, dirname_match dircb, fname_match fcb)
{
    MemoryContext scanner_cxt = AllocSetContextCreate(CurrentMemoryContext,
        "Log Dir Scanner",
        ALLOCSET_DEFAULT_MINSIZE,
        ALLOCSET_DEFAULT_INITSIZE,
        ALLOCSET_DEFAULT_MAXSIZE);
    MemoryContext caller_cxt = MemoryContextSwitchTo(scanner_cxt);

    /* bytes for topdir including its terminating NUL */
    const size_t dir_bytes = strlen(topdir) + 1;
    logdir_scanner scanner = (logdir_scanner)palloc0(sizeof(ld_scanner) + dir_bytes);
    scanner->m_memcnxt = scanner_cxt;
    scanner->top_dir = (char*)scanner + sizeof(ld_scanner);
    int rc = memcpy_s(scanner->top_dir, dir_bytes, topdir, dir_bytes);
    securec_check(rc, "\0", "\0");
    scanner->dir_cb = dircb;
    scanner->fname_cb = fcb;

    (void)MemoryContextSwitchTo(caller_cxt);
    return scanner;
}
/*
 * Destroy a scanner by deleting its private memory context, which also frees
 * the scanner struct allocated in it. A NULL scanner is ignored.
 */
void free_logdir_scanner(logdir_scanner scan)
{
    if (NULL == scan) {
        return;
    }
    MemoryContextDelete(scan->m_memcnxt);
}
/*
 * Set how many log files per directory are kept by
 * fetch_one_dir_to_fill_filelist(); values <= 0 keep all of them.
 */
static inline void set_latest_files_num(logdir_scanner scan, int n)
{
    logdir_scanner target = scan;
    target->m_latest_files_num = n;
}
/*
 * Start scanning: load the first directory's log files. Allocations are made
 * in the scanner's own memory context.
 */
static void begin_scan_dirs(logdir_scanner scan, ForeignScanState* node)
{
    MemoryContext caller_cxt = MemoryContextSwitchTo(scan->m_memcnxt);
    fetch_one_dir_to_fill_filelist(scan, node);
    (void)MemoryContextSwitchTo(caller_cxt);
}
/*
 * Remember the current directory list in dir_list_copy so that a rescan can
 * restart from it. The copy lives in the scanner's memory context.
 */
static inline void copy_dirlist_for_rescan(logdir_scanner scan)
{
    MemoryContext caller_cxt = MemoryContextSwitchTo(scan->m_memcnxt);
    /* list_copy() is shallow: the directory strings are shared and must not be freed */
    List* saved_dirs = list_copy(scan->dir_list);
    scan->dir_list_copy = saved_dirs;
    (void)MemoryContextSwitchTo(caller_cxt);
}
/*
 * Reset the scanner so that the directories saved by copy_dirlist_for_rescan()
 * are scanned again.
 *
 * The pending dir_list is replaced by a fresh shallow copy of dir_list_copy,
 * and the current log file plus all pending log files are freed.
 *
 * The new dir_list is allocated in the scanner's own memory context, like the
 * lists built by copy_dirlist_for_rescan() and master_only_set_dirlist(). It
 * therefore shares the scanner's lifetime and is released by
 * free_logdir_scanner(), instead of living in whatever context happened to be
 * current at rescan time.
 */
static void begin_rescan_dirs(logdir_scanner scan)
{
    /* shallow free: the directory strings are shared with dir_list_copy */
    if (NIL != scan->dir_list) {
        list_free(scan->dir_list);
        scan->dir_list = NIL;
    }

    /* reset dir list, allocated in the scanner's context */
    MemoryContext old_memcnxt = MemoryContextSwitchTo(scan->m_memcnxt);
    scan->dir_list = list_copy(scan->dir_list_copy);
    (void)MemoryContextSwitchTo(old_memcnxt);
    scan->cur_dir = NULL;

    free_loginfo(scan->log_file);
    scan->log_file = NULL;

    /* free current log files list: every log_file_info, then the cells */
    if (NIL != scan->log_file_list) {
        ListCell* cell = NULL;
        foreach (cell, scan->log_file_list) {
            log_file_info* file_info = (log_file_info*)lfirst(cell);
            free_loginfo(file_info);
        }
        list_free(scan->log_file_list);
    }
    scan->log_file_list = NIL;
}
/*
 * Return the next log file to read, or NULL when every directory is exhausted.
 *
 * The previously returned log_file_info is freed first. If the pending file
 * list is empty, the next directory is loaded (in the scanner's memory
 * context). The returned entry is kept in scan->log_file and stays owned by
 * the scanner until the next call.
 */
log_file_info* iterate_scan_dirs(logdir_scanner scan, ForeignScanState* node)
{
    free_loginfo(scan->log_file);

    const bool need_refill = (NIL == scan->log_file_list || list_length(scan->log_file_list) <= 0);
    if (need_refill) {
        MemoryContext caller_cxt = MemoryContextSwitchTo(scan->m_memcnxt);
        fetch_one_dir_to_fill_filelist(scan, node);
        (void)MemoryContextSwitchTo(caller_cxt);

        if (NIL == scan->log_file_list || list_length(scan->log_file_list) <= 0) {
            /* no log file */
            return NULL;
        }
    }

    scan->log_file = (log_file_info*)linitial(scan->log_file_list);
    scan->log_file_list = list_delete_first(scan->log_file_list);
    return scan->log_file;
}
/*
 * Finish scanning the log directories: drop the current directory reference
 * and free the current log file entry.
 */
void end_scan_dirs(logdir_scanner scan)
{
    scan->cur_dir = NULL;
    free_loginfo(scan->log_file);
    /*
     * With a LIMIT clause the scan may stop early, so neither
     * scan->dir_list nor scan->log_file_list is guaranteed to be NIL here;
     * they are left untouched.
     */
}
/*
 * For master_only scans the top directory itself is the only directory to
 * scan: append a copy of it (in the scanner's context) to dir_list.
 */
void master_only_set_dirlist(logdir_scanner scan)
{
    MemoryContext caller_cxt = MemoryContextSwitchTo(scan->m_memcnxt);
    char* only_dir = pstrdup(scan->top_dir);
    scan->dir_list = lappend(scan->dir_list, only_dir);
    (void)MemoryContextSwitchTo(caller_cxt);
}
/*
 * Map one log_line_prefix escape (the character after '%') to the regular
 * expression fragment that matches its rendered text.
 *
 * Escapes that render a value become a capture group:
 *   a c d e h n r i u v -> "([^ ]+)"   any run of non-space characters
 *   p l x               -> "(\\d+)"    decimal digits
 *   m                   -> timestamp with milliseconds and a 2-5 char zone
 *   t s                 -> timestamp without milliseconds, 2-5 char zone
 * "%%" yields a literal "%". Everything else (' ', 'q', unknown escapes and
 * '\0') matches a run of whitespace, "\\s+", without a capture group.
 */
static char* regex_of_line_prefix_option(const char opt)
{
    static const char token_opts[] = "acdehnriuv";
    static const char number_opts[] = "plx";

    /* strchr() would find the terminator itself, so handle NUL up front */
    if ('\0' == opt) {
        return "\\s+";
    }
    if (NULL != strchr(token_opts, opt)) {
        return "([^ ]+)";
    }
    if (NULL != strchr(number_opts, opt)) {
        return "(\\d+)";
    }
    if ('m' == opt) {
        return "(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3} \\S{2,5})";
    }
    if ('t' == opt || 's' == opt) {
        return "(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2} \\S{2,5})";
    }
    if ('%' == opt) {
        return "%";
    }
    return "\\s+";
}
static void fill_attrmap_of_line_prefix(const char opt, int attpos, int* attmap)
{
switch (opt) {
case 'a': /* application name */
attmap[PGLOG_ATTR_APPNAME] = attpos;
break;
case 'c': /* session id */
attmap[PGLOG_ATTR_SESSION_ID] = attpos;
break;
case 'd': /* database name */
attmap[PGLOG_ATTR_DBNAME] = attpos;
break;
case 'h': /* remote host */
case 'r': /* remote host(remote port) */
attmap[PGLOG_ATTR_REMOTE] = attpos;
break;
case 'n': /* node name */
attmap[PGLOG_ATTR_NODENAME] = attpos;
break;
case 'i': /* command tag */
attmap[PGLOG_ATTR_CMDTAG] = attpos;
break;
case 'u': /* user name */
attmap[PGLOG_ATTR_USERNAME] = attpos;
break;
case 'v': /* virtual transaction id */
attmap[PGLOG_ATTR_VIRTUAL_XID] = attpos;
break;
case 'p': /* thread id */
attmap[PGLOG_ATTR_PID] = attpos;
break;
case 'l': /* line number */
attmap[PGLOG_ATTR_LINENO] = attpos;
break;
case 'x': /* transaction id */
attmap[PGLOG_ATTR_XID] = attpos;
break;
case 'e': /* error code */
attmap[PGLOG_ATTR_ECODE] = attpos;
break;
case 'm': /* timestamp with time zone */
case 't': /* timestamp without milliseconds */
attmap[PGLOG_ATTR_LOGTIME] = attpos;
break;
case 's': /* session start timestamp */
attmap[PGLOG_ATTR_SESSION_START] = attpos;
break;
case '%':
case ' ':
case 'q':
default:
break;
}
}
/*
 * Cache, for every column of the pg_log foreign table, what is needed to
 * convert captured text into the column's type:
 *   allattr_fmgrinfo[i]   - the type's input function,
 *   allattr_typioparam[i] - its I/O parameter,
 *   allattr_typmod[i]     - the column typmod.
 * The relation is expected to have exactly PGLOG_ATTR_MAX columns.
 *
 * in_func_oid is an Oid, so it is initialized with InvalidOid. The original
 * used a (Datum)0 cast, which narrows a pointer-sized value into a 32-bit Oid.
 */
static void fill_pglog_planstate_from_logft_rel(pglogPlanState* pg_log, Relation ftrel)
{
    TupleDesc tupdesc = ftrel->rd_att;
    Assert(PGLOG_ATTR_MAX == tupdesc->natts);
    for (int i = 0; i < PGLOG_ATTR_MAX; ++i) {
        Form_pg_attribute att = tupdesc->attrs[i];
        Oid in_func_oid = InvalidOid;
        pg_log->allattr_typmod[i] = att->atttypmod;
        getTypeInputInfo(att->atttypid, &in_func_oid, &pg_log->allattr_typioparam[i]);
        fmgr_info(in_func_oid, &pg_log->allattr_fmgrinfo[i]);
    }
}
/*
 * Translate Log_line_prefix into regex fragments appended to reg, and record
 * in pg_log->attmap where each pg_log column's value lands among the captured
 * groups.
 *
 * attmap[attr] is the 0-based index of the capture group holding that column,
 * i.e. its element in the array built by regexp_match_to_array(). It is -1
 * when the prefix does not produce the column.
 *
 * Every literal (non-'%') character of the prefix is matched as a run of
 * whitespace. After the prefix, three fixed groups follow: the debug query id
 * (digits), the module name, and the log level. The message group "(.*)$" is
 * appended by the caller (format_regrex_text_datum()) and counted here.
 * pg_log->nattrs is set to the total number of groups.
 *
 * Only escapes whose fragment is a capture group advance the group counter.
 * Escapes that expand to plain text ("%%") or to whitespace (' ', 'q',
 * unknown escapes) have no group. Counting them would shift every later
 * column onto the wrong group and make nattrs exceed the number of groups
 * actually matched.
 */
static void fill_pglog_planstate_from_log_line_prefix(pglogPlanState* pg_log, StringInfo reg)
{
    const char* prefix = u_sess->attr.attr_common.Log_line_prefix;
    const int format_len = (NULL != prefix) ? (int)strlen(prefix) : 0;
    int nattrs = 0;

    /* -1 means that this attribute is not set */
    for (int j = 0; j < PGLOG_ATTR_MAX; ++j) {
        pg_log->attmap[j] = -1;
    }

    for (int i = 0; i < format_len; i++) {
        if (prefix[i] != '%') {
            /* it should be a space, so match it as whitespace */
            appendStringInfoString(reg, regex_of_line_prefix_option(' '));
            continue;
        }
        /* go to char after '%' */
        i++;
        if (i >= format_len) {
            break; /* trailing '%': format error - ignore it */
        }
        const char opt = prefix[i];
        const char* opt_regex = regex_of_line_prefix_option(opt);
        appendStringInfoString(reg, opt_regex);
        /* only a capture group yields an element of the match array */
        if ('(' == opt_regex[0]) {
            fill_attrmap_of_line_prefix(opt, nattrs, pg_log->attmap);
            ++nattrs;
        }
    }

    /* debug query id */
    appendStringInfoString(reg, regex_of_line_prefix_option('l'));
    appendStringInfoString(reg, regex_of_line_prefix_option(' '));
    pg_log->attmap[PGLOG_ATTR_QID] = nattrs;
    ++nattrs;
    /* module name */
    appendStringInfoString(reg, regex_of_line_prefix_option('a'));
    appendStringInfoString(reg, regex_of_line_prefix_option(' '));
    pg_log->attmap[PGLOG_ATTR_MODNAME] = nattrs;
    ++nattrs;
    /* log level */
    appendStringInfoString(reg, regex_of_line_prefix_option('a'));
    appendStringInfoString(reg, regex_of_line_prefix_option(' '));
    pg_log->attmap[PGLOG_ATTR_LEVEL] = nattrs;
    ++nattrs;
    /* log message: its group "(.*)$" is appended by the caller */
    pg_log->attmap[PGLOG_ATTR_MESSAGE] = nattrs;
    pg_log->nattrs = nattrs + 1;
}
/*
 * Build the anchored pattern "^<line prefix fragments>(.*)$" for one pg_log
 * line and store it as a text Datum in pg_log->regrex. As a side effect,
 * pg_log->attmap and pg_log->nattrs are filled in.
 */
static void format_regrex_text_datum(pglogPlanState* pg_log)
{
    StringInfo pattern = makeStringInfo();
    appendStringInfoString(pattern, "^");
    fill_pglog_planstate_from_log_line_prefix(pg_log, pattern);
    appendStringInfoString(pattern, "(.*)$");

    pg_log->regrex = CStringGetTextDatum((const char*)pattern->data);

    pfree_ext(pattern->data);
    pfree_ext(pattern);
}
/*
 * Append one line per unreachable host to str, in the form
 *   "\n<ip>[(hint: ...)]: node1, node2, ..."
 * The hint depends on the host's reachable reason (UHT_NO_MASTER or
 * UHT_DEPLOY_LIMIT); other reasons get no hint.
 */
static void descript_unreachable_nodes(StringInfo str, List* unreach_host)
{
    ListCell* hcell = NULL;
    foreach (hcell, unreach_host) {
        hentry_reachable_host* host = (hentry_reachable_host*)lfirst(hcell);

        appendStringInfoChar(str, '\n');
        appendStringInfoString(str, NameStr(host->host_ip.ip));

        if (UHT_NO_MASTER == host->reachable) {
            appendStringInfoString(str, "(hint: no primary datanode, switchover first)");
        } else if (UHT_DEPLOY_LIMIT == host->reachable) {
            appendStringInfoString(str, "(hint: deploy limitation)");
        }

        /* ": " before the first node name, ", " before the others */
        const char* separator = ": ";
        ListCell* ncell = NULL;
        foreach (ncell, host->nodenames) {
            appendStringInfoString(str, separator);
            appendStringInfoString(str, (char*)lfirst(ncell));
            separator = ", ";
        }
    }
}
/*
 * EXPLAIN helper: append a description of the unreachable hosts and their
 * nodes to explain_str. Nothing is appended when every host is reachable.
 */
static void explain_unreachable_nodes(StringInfo explain_str)
{
    List* hosts = get_unreachable_hosts();
    if (NIL == hosts) {
        return;
    }
    descript_unreachable_nodes(explain_str, hosts);
    free_unreachable_hosts(hosts);
}
/*
 * Return true when val equals (case-insensitively) one of the nvalues
 * strings in valid_values.
 */
static bool check_option_value(const char* val, const char** valid_values, size_t nvalues)
{
    for (const char** cand = valid_values; cand < valid_values + nvalues; ++cand) {
        if (0 == pg_strcasecmp(val, *cand)) {
            return true;
        }
    }
    return false;
}
/*
 * Validate the options of a log foreign table.
 *
 * Accepted options:
 *   logtype      - one of valid_log_type[] (case-insensitive);
 *   master_only  - must parse as a boolean;
 *   latest_files - must be a positive int32.
 * Any invalid value, or any other option name, raises ERROR.
 */
static void valid_ft_options(List* option_list)
{
    ListCell* lc = NULL;
    foreach (lc, option_list) {
        DefElem* def = (DefElem*)lfirst(lc);

        if (0 == pg_strcasecmp(def->defname, OPTION_LOGTYPE)) {
            char* typetxt = defGetString(def);
            const size_t ntypes = sizeof(valid_log_type) / sizeof(valid_log_type[0]);
            if (!check_option_value(typetxt, valid_log_type, ntypes)) {
                ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_OBJECT),
                        errmsg("Invalid value \"%s\" of option \"%s\"", typetxt, OPTION_LOGTYPE)));
            }
            continue;
        }

        if (0 == pg_strcasecmp(def->defname, OPTION_MASTER_ONLY)) {
            /* only check that the value parses as a boolean; the result itself is not needed */
            (void)defGetBoolean(def);
            continue;
        }

        if (0 == pg_strcasecmp(def->defname, OPTION_LATEST_FILES)) {
            /* defGetInt64() is not usable because this is not a T_Int type node */
            int32 latest_files = pg_strtoint32(defGetString(def));
            if (latest_files <= 0) {
                ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_OBJECT),
                        errmsg("Invalid value \"%d\" of option \"%s\"", latest_files, OPTION_LATEST_FILES),
                        errdetail("Valid values: 1 ~ 2147483647")));
            }
            continue;
        }

        /* any other option name (e.g. from ALTER FOREIGN TABLE) is rejected */
        ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("Invalid option \"%s\"", def->defname)));
    }
}
/*
 * Return the ftoptions of foreign table ft_rel as a List of DefElem (via
 * untransformRelOptions()), or NIL when the table has no options. Raises
 * ERROR if its pg_foreign_table entry cannot be found.
 */
static List* relation_get_options(Relation ft_rel)
{
    const Oid ft_relid = RelationGetRelid(ft_rel);
    HeapTuple ft_tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(ft_relid));
    if (!HeapTupleIsValid(ft_tuple)) {
        ereport(
            ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("cache lookup failed for foreign table %u", ft_relid)));
    }

    bool isnull = false;
    Datum raw_options = SysCacheGetAttr(FOREIGNTABLEREL, ft_tuple, Anum_pg_foreign_table_ftoptions, &isnull);
    List* options = isnull ? NIL : untransformRelOptions(raw_options);

    ReleaseSysCache(ft_tuple);
    return options;
}
/*
 * Find the "logtype" option in options and map its value to FLT_PG_LOG or
 * FLT_GS_PROFILE (case-insensitive). Unrecognized values are skipped.
 * Returns FLT_UNKNOWN when no recognized logtype is present.
 */
static FdwLogType get_logtype_from_options(List* options)
{
    ListCell* lc = NULL;
    foreach (lc, options) {
        DefElem* def = (DefElem*)lfirst(lc);
        if (0 != pg_strcasecmp(def->defname, OPTION_LOGTYPE)) {
            continue;
        }
        const char* value = defGetString(def);
        if (0 == pg_strcasecmp(value, valid_log_type[FLT_PG_LOG])) {
            return FLT_PG_LOG;
        }
        if (0 == pg_strcasecmp(value, valid_log_type[FLT_GS_PROFILE])) {
            return FLT_GS_PROFILE;
        }
    }
    return FLT_UNKNOWN;
}
/* Return the "logtype" option of foreign table ft_rel (FLT_UNKNOWN if absent or unrecognized). */
static FdwLogType relation_getopt_logtype(Relation ft_rel)
{
    List* opts = relation_get_options(ft_rel);
    const FdwLogType logtype = get_logtype_from_options(opts);
    list_free_deep(opts);
    return logtype;
}
/* Return the "master_only" option of foreign table ft_rel; false when it is not set. */
static bool relation_getopt_masteronly(Relation ft_rel)
{
    List* opts = relation_get_options(ft_rel);
    bool master_only = false;
    bool seen = false;
    for (ListCell* lc = list_head(opts); lc != NULL && !seen; lc = lnext(lc)) {
        DefElem* def = (DefElem*)lfirst(lc);
        if (0 == pg_strcasecmp(def->defname, OPTION_MASTER_ONLY)) {
            master_only = defGetBoolean(def);
            seen = true;
        }
    }
    list_free_deep(opts);
    return master_only;
}
/* Return the "latest_files" option of foreign table ft_rel; 0 when it is not set. */
static int relation_getopt_latest_files_num(Relation ft_rel)
{
    List* opts = relation_get_options(ft_rel);
    int latest_files = 0;
    bool seen = false;
    for (ListCell* lc = list_head(opts); lc != NULL && !seen; lc = lnext(lc)) {
        DefElem* def = (DefElem*)lfirst(lc);
        if (0 == pg_strcasecmp(def->defname, OPTION_LATEST_FILES)) {
            /* defGetInt64() is not usable because this is not a T_Int type node */
            latest_files = pg_strtoint32(defGetString(def));
            seen = true;
        }
    }
    list_free_deep(opts);
    return latest_files;
}
/*
 * Return the "masters_info" list stored in the plan's fdw_private. The
 * coordinator generates it and passes it down to the datanodes.
 *
 * It is absent, and NULL is returned, when the query was
 *   1) sent directly to a datanode, or
 *   2) run on a coordinator as 'EXECUTE DIRECT ON node <query statement>'.
 * Those are legitimate cases, so only a DEBUG2 message is emitted.
 */
static List* fdw_get_option_masters_info(List* fdw_private)
{
    ListCell* lc = NULL;
    foreach (lc, fdw_private) {
        DefElem* def = (DefElem*)lfirst(lc);
        if (0 != strcasecmp(def->defname, logft_opt_masters_info)) {
            continue;
        }
        return (List*)def->arg;
    }

    ereport(DEBUG2,
        (errmsg("Start query directly from DN instead of CN"),
            errdetail_internal("%s not found in FDW private data", logft_opt_masters_info)));
    return NULL;
}
/*
 * Fill dir_scan->dir_list with the directories this node should scan: its
 * share according to the coordinator's masters info when present, otherwise
 * every directory on this host.
 */
static inline void assign_scanned_directorys(ForeignScanState* node, logdir_scanner dir_scan)
{
    ForeignScan* fscan = (ForeignScan*)node->ss.ps.plan;
    List* masters_info = fdw_get_option_masters_info((List*)fscan->fdw_private);
    if (NULL == masters_info) {
        /*
         * The query came straight to a DN, or was run from a CN with
         * 'EXECUTE DIRECT ON(datanode) QUERY'; all directories on this
         * host are still scanned.
         */
        get_and_sort_all_dirs(dir_scan);
        return;
    }
    get_dirs_from_ip_nodename_pairs(masters_info, dir_scan);
}
/*
 * BeginForeignScan work for the pg_log log type.
 *
 * Every node builds the line-matching regex and the per-column input
 * function info; a coordinator stops there because it does not scan data.
 * A datanode then creates the directory scanner:
 *   - master_only: scan only this node's Log_directory;
 *   - otherwise:   scan "$GAUSSLOG/pg_log" (ERROR if GAUSSLOG is not set),
 *                  restricted by assign_scanned_directorys().
 * The directory list is optionally filtered by name, saved for rescans, and
 * the first directory is loaded. eflags is not used.
 */
static void pglog_begin_fs(ForeignScanState* node, int eflags)
{
    logFdwPlanState* plan_state = (logFdwPlanState*)node->fdw_state;
    pglogPlanState* pg_log = &plan_state->log_state.pg_log;

    format_regrex_text_datum(pg_log);
    fill_pglog_planstate_from_logft_rel(pg_log, node->ss.ss_currentRelation);

    /* coordinator node doesn't scan data */
    if (IS_PGXC_COORDINATOR) {
        return;
    }

    logdir_scanner scanner = NULL;
    if (plan_state->m_master_only) {
        /* only query log records of this node itself */
        char log_dir[MAXPGPATH] = {0};
        int rc = strncpy_s(log_dir, MAXPGPATH, u_sess->attr.attr_common.Log_directory, (MAXPGPATH - 1));
        securec_check(rc, "\0", "\0");
        scanner = new_logdir_scanner(log_dir, NULL, pglog_fname_match);
        plan_state->dir_scan = scanner;
        set_latest_files_num(scanner, plan_state->m_latest_files);
        master_only_set_dirlist(scanner);
    } else {
        char* gausslog = gs_getenv_r("GAUSSLOG");
        check_backend_env(gausslog);
        if (NULL == gausslog || '\0' == gausslog[0]) {
            ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("GAUSSLOG not set")));
        }
        char top_dir[MAXPGPATH] = {0};
        int rc = snprintf_s(top_dir, MAXPGPATH, MAXPGPATH - 1, "%s/pg_log", gausslog);
        securec_check_ss(rc, "", "");
        scanner = new_logdir_scanner(top_dir, match_dir_name, pglog_fname_match);
        plan_state->dir_scan = scanner;
        set_latest_files_num(scanner, plan_state->m_latest_files);
        assign_scanned_directorys(node, scanner);
    }

    /* filter dirname if needed */
    plan_state->m_total_dirname_num = list_length(scanner->dir_list);
    if (plan_state->m_need_check_dirname) {
        scanner->dir_list = filter_dirname_list(node, scanner->dir_list);
    }
    copy_dirlist_for_rescan(scanner);

    begin_scan_dirs(plan_state->dir_scan, node);
}
/*
 * Prepare fcinfo for calling the input function of column attidx on the text
 * Datum val. The three arguments are: val as a C string, the type's I/O
 * parameter, and the column typmod. None of them is NULL.
 */
static void set_func_args(FunctionCallInfoData& fcinfo, pglogPlanState* pg_log, int attidx, Datum val)
{
    const int nargs = 3;
    InitFunctionCallInfoData(fcinfo, &pg_log->allattr_fmgrinfo[attidx], nargs, InvalidOid, NULL, NULL);
    fcinfo.arg[0] = CStringGetDatum(text_to_cstring((const text*)val));
    fcinfo.arg[1] = ObjectIdGetDatum(pg_log->allattr_typioparam[attidx]);
    fcinfo.arg[2] = Int32GetDatum(pg_log->allattr_typmod[attidx]);
    for (int k = 0; k < nargs; ++k) {
        fcinfo.argnull[k] = false;
    }
}
/*
 * Copy the groups captured from one matched pg_log line into the scan slot
 * arrays (festate->m_values / m_isnull).
 *
 * Only columns present in the line prefix (attmap >= 0) are filled. Timestamp,
 * pid, xid and query id columns go through the column type's input function;
 * the others are stored as the captured text Datum. Columns are processed in
 * a fixed order, so the first failing conversion is the one reported.
 */
static void fill_target_tuple_values(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    pglogPlanState* pg_log = &festate->log_state.pg_log;
    FunctionCallInfoData fcinfo;
#define COPY_TEXT_DATUM(_attr) \
    do { \
        festate->m_isnull[(_attr)] = false; \
        festate->m_values[(_attr)] = pg_log->real_values[pg_log->attmap[(_attr)]]; \
    } while (0)
    /*
     * Processing order; "convert" columns are parsed by their input function.
     *
     * %l in log_line_prefix is the line no within a transaction, not within
     * the file. The file line no is the usable one, so PGLOG_ATTR_LINENO is
     * not taken from the captured groups here.
     */
    static const struct {
        int attr;
        bool convert;
    } fill_order[] = {
        {PGLOG_ATTR_LOGTIME, true},
        {PGLOG_ATTR_NODENAME, false},
        {PGLOG_ATTR_APPNAME, false},
        {PGLOG_ATTR_SESSION_START, true},
        {PGLOG_ATTR_SESSION_ID, false},
        {PGLOG_ATTR_DBNAME, false},
        {PGLOG_ATTR_REMOTE, false},
        {PGLOG_ATTR_CMDTAG, false},
        {PGLOG_ATTR_USERNAME, false},
        {PGLOG_ATTR_VIRTUAL_XID, false},
        {PGLOG_ATTR_PID, true},
        {PGLOG_ATTR_XID, true},
        {PGLOG_ATTR_QID, true},
        {PGLOG_ATTR_ECODE, false},
        {PGLOG_ATTR_MODNAME, false},
        {PGLOG_ATTR_LEVEL, false},
        {PGLOG_ATTR_MESSAGE, false},
    };
    const size_t ncols = sizeof(fill_order) / sizeof(fill_order[0]);

    for (size_t k = 0; k < ncols; ++k) {
        const int attr = fill_order[k].attr;
        if (pg_log->attmap[attr] < 0) {
            continue; /* not part of the line prefix */
        }
        if (!fill_order[k].convert) {
            COPY_TEXT_DATUM(attr);
            continue;
        }
        set_func_args(fcinfo, pg_log, attr, pg_log->real_values[pg_log->attmap[attr]]);
        festate->m_isnull[attr] = false;
        festate->m_values[attr] = FunctionCallInvoke(&fcinfo);
    }
}
/*
 * Return a pointer to the first character of str that is neither a space nor
 * a tab. This may be the terminating NUL when str holds only blanks.
 */
static const char* skip_left_space(const char* str)
{
    const char* p = str;
    while (*p == ' ' || *p == '\t') {
        ++p;
    }
    return p;
}
/* Mark the first ncols columns of the scan slot arrays as NULL with a zero Datum. */
static inline void reset_all_attvals(logFdwPlanState* festate, int ncols)
{
    Datum* values = festate->m_values;
    bool* nulls = festate->m_isnull;
    for (int col = 0; col < ncols; ++col) {
        values[col] = (Datum)0;
        nulls[col] = true;
    }
}
/* Fill the directory-name and file-name columns from the current log file. */
static inline void set_attval_before_hostname(logFdwPlanState* festate)
{
    Datum* values = festate->m_values;
    bool* nulls = festate->m_isnull;

    values[PGLOG_ATTR_DIRNAME] = PointerGetDatum(festate->dname);
    nulls[PGLOG_ATTR_DIRNAME] = false;

    values[PGLOG_ATTR_FILENAME] = PointerGetDatum(festate->filename);
    nulls[PGLOG_ATTR_FILENAME] = false;
}
/*
 * Fill the directory-name, file-name and host-name columns that precede the
 * parsed log data. The host name comes from m_tmp_hostname_text.
 */
static void set_attval_before_logdata(logFdwPlanState* festate)
{
    set_attval_before_hostname(festate);
    festate->m_values[PGLOG_ATTR_HOSTNAME] = festate->m_tmp_hostname_text;
    festate->m_isnull[PGLOG_ATTR_HOSTNAME] = false;
}
/*
 * For a line that does not match the pg_log regex: set the "matched" column
 * to false and store the whole line in the message column.
 */
static inline void set_not_matched_line_tuple(logFdwPlanState* festate, Datum line)
{
    festate->m_values[PGLOG_ATTR_MESSAGE] = line;
    festate->m_isnull[PGLOG_ATTR_MESSAGE] = false;

    festate->m_values[PGLOG_ATTR_MATCHED] = BoolGetDatum(false);
    festate->m_isnull[PGLOG_ATTR_MATCHED] = false;
}
/*
 * Match one log line against the pg_log regex.
 *
 * On a match, the "matched" column is set to true, the captured groups are
 * copied into pg_log->real_values / real_isnull, and the tuple columns are
 * filled by fill_target_tuple_values(); returns true. Returns false when
 * regexp_match_to_array yields a zero Datum (no match).
 */
static inline bool try_to_match_this_line(ForeignScanState* node, Datum line)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    pglogPlanState* pg_log = &festate->log_state.pg_log;

    /*
     * DEFAULT_COLLATION_OID is the right collation here; see
     * pg_set_regex_collation().
     *
     * line will be freed in cleanup_regexp_matches() called by regexp_match_to_array().
     */
    Datum groups = DirectFunctionCall2Coll(regexp_match_to_array,
        DEFAULT_COLLATION_OID, /* default collation */
        line,
        pg_log->regrex);
    if ((Datum)0 == groups) {
        return false;
    }

    festate->m_isnull[PGLOG_ATTR_MATCHED] = false;
    festate->m_values[PGLOG_ATTR_MATCHED] = BoolGetDatum(true);

    ArrayIterator iter = array_create_iterator(DatumGetArrayTypeP(groups), 0);
    int ngroups = 0;
    for (; array_iterate(iter, &pg_log->real_values[ngroups], &pg_log->real_isnull[ngroups]); ++ngroups) {
        /* each iteration stores one captured group */
    }
    Assert(ngroups == pg_log->nattrs);

    fill_target_tuple_values(node);
    return true;
}
/*
 * Advance to the next log file and open it.
 *
 * Returns false when no log file is left. Otherwise: selects the IO functions
 * for the file, replaces dname/filename (freeing the previous ones), resets
 * the per-file line counter and the log buffer, and stores the opened handle
 * in m_vfd. Raises ERROR if the file cannot be opened.
 */
static bool get_and_open_next_file(ForeignScanState* node)
{
    logFdwPlanState* plan_state = (logFdwPlanState*)node->fdw_state;
    plan_state->logfile_info = iterate_scan_dirs(plan_state->dir_scan, node);
    if (NULL == plan_state->logfile_info) {
        return false; /* no log file to handle */
    }

    /* choose IO functions according to the log file postfix */
    set_io_functions(plan_state);

    /* dname and filename are always set together, so they are freed together */
    if (NULL != plan_state->dname) {
        Assert(plan_state->filename);
        pfree_ext(plan_state->dname);
        pfree_ext(plan_state->filename);
    }
    plan_state->filename = fetch_filename_from_logfile(plan_state->logfile_info->name);
    plan_state->dname = fetch_dirname_from_logfile(plan_state->logfile_info->name);
    plan_state->m_lineno_of_file = 0;

    void* handle = plan_state->m_open(plan_state->logfile_info->name);
    if (NULL == handle) {
        ereport(
            ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("Cannot open log file: %s", plan_state->logfile_info->name)));
    }
    plan_state->m_vfd = handle;

    plan_state->m_log_buf.reset_data_buf();
    return true;
}
/* Close the current log file and forget it, so that the next file can be opened. */
static inline void prepare_for_next_file(logFdwPlanState* festate)
{
    festate->m_close(festate->m_vfd);
    festate->m_vfd = NULL;
    festate->logfile_info = NULL;
}
/*
 * Get the next line of log text into one_line.
 *
 * Lines already in the buffer are returned first. Otherwise more data is
 * loaded from the open file. When the file is exhausted, leaving an
 * incomplete trailing line, that line is discarded (counted in
 * m_incompleted_files) and the next log file is opened. Each returned line
 * bumps m_lineno_of_file. Returns false when no log file is left.
 */
static bool get_next_log_line(ForeignScanState* node, char*& one_line)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    char* line = NULL;

    /* fast path: read line from buffer directly */
    if (festate->m_log_buf.has_unhandled_data()) {
        line = festate->get_next_line();
        if (NULL != line) {
            one_line = line;
            festate->m_lineno_of_file++;
            return true;
        }
    }
    /*
     * Either the buffer holds only a partial line, or it has been fully
     * consumed; both cases prepare the buffer for loading more data.
     */
    festate->m_log_buf.handle_buffered_data();

    for (;;) {
        if (NULL != festate->m_vfd) {
            if (festate->continue_to_load_logdata()) {
                line = festate->get_next_line();
                if (NULL != line) {
                    one_line = line;
                    festate->m_lineno_of_file++;
                    return true;
                }
                /* this is an incomplete line, discard it */
                ++festate->m_incompleted_files;
            }
            prepare_for_next_file(festate);
        }
        /* get and handle the next log file */
        if (!get_and_open_next_file(node)) {
            return false;
        }
    }
}
/*
 * Fetch the next record of a profile log (festate->log_state.profile_log).
 *
 * parser->iter_next() writes the parsed column values into festate->m_values /
 * festate->m_isnull, starting after the first PRFLOG_SYSATTR_NUM entries, for
 * ncols columns. It runs in the per-tuple memory context m_pertup_memcnxt.
 * set_attval_before_hostname() additionally fills the directory and file name
 * columns.
 *
 * Control flow (goto-based):
 *   next_record: try the data already buffered, then load more data from the
 *                open file. When no complete record is found, the parser
 *                iteration is ended and the file is closed.
 *   next_file:   open the next log file, reset the parser to LOG_TYPE_PLOG,
 *                load data and begin iteration (iter_begin() == 0), then go
 *                back to next_record. Files that cannot be loaded, or whose
 *                iteration cannot begin, are counted in m_incompleted_files
 *                and skipped.
 *
 * Returns true when a record was produced (m_lineno_of_file is bumped), false
 * when no more log files are left.
 */
static bool get_next_log_record(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    prflog_parse* parser = festate->log_state.profile_log.parser;
    MemoryContext oldcnxt = NULL;
    /* parsed columns are written after the PRFLOG_SYSATTR_NUM leading slots */
    Datum* vals = festate->m_values + PRFLOG_SYSATTR_NUM;
    bool* nuls = festate->m_isnull + PRFLOG_SYSATTR_NUM;
    int ncols = node->ss.ss_currentRelation->rd_att->natts - PRFLOG_SYSATTR_NUM;
    bool next_found = true;
next_record:
    /* fast path: read line from buffer directly */
    if (festate->m_log_buf.has_unhandled_data()) {
        /* parse in the per-tuple memory context */
        oldcnxt = MemoryContextSwitchTo(festate->m_pertup_memcnxt);
        next_found = parser->iter_next(&(festate->m_log_buf), nuls, vals, ncols);
        (void)MemoryContextSwitchTo(oldcnxt);
        if (next_found) {
            set_attval_before_hostname(festate);
            festate->m_lineno_of_file++;
            return true; /* ok to get the next record */
        } else {
            /* continue to read log data or next log file */
            festate->m_log_buf.handle_buffered_data();
        }
    } else {
        /* the buffer is fully consumed; reset its read position to the start */
        festate->m_log_buf.handle_buffered_data();
    }
    /* continue to load data and iter next tuple */
    if (NULL != festate->m_vfd) {
        if (festate->continue_to_load_logdata()) {
            oldcnxt = MemoryContextSwitchTo(festate->m_pertup_memcnxt);
            next_found = parser->iter_next(&(festate->m_log_buf), nuls, vals, ncols);
            (void)MemoryContextSwitchTo(oldcnxt);
            if (next_found) {
                set_attval_before_hostname(festate);
                festate->m_lineno_of_file++;
                return true; /* ok to get the next record */
            }
            /* the remaining data is an incomplete record; discard it */
            ++festate->m_incompleted_files;
        }
        /* this file is exhausted: end the parser iteration and close it */
        parser->iter_end();
        prepare_for_next_file(festate);
    }
next_file:
    /* get and handle the next log file */
    if (get_and_open_next_file(node)) {
        /* begin to parse this log file and accessing its each record */
        parser->set(LOG_TYPE_PLOG);
        /* load data and begin to iter scanning */
        if (festate->continue_to_load_logdata()) {
            if (0 == parser->iter_begin(&festate->m_log_buf)) {
                goto next_record;
            }
        }
        /* this file could not be loaded or parsed (incomplete), so skip it */
        ++festate->m_incompleted_files;
        /* skip this log file */
        parser->iter_end();
        prepare_for_next_file(festate);
        goto next_file;
    }
    /* no more any file to handle */
    return false;
}
/*
 * finish_scan_fast_path
 *     Return true when this node has nothing to scan at all:
 *       1) no directory scanner exists (dir_scan is NULL), as happens for
 *          "execute direct on (CN) subquery", or
 *       2) the hostname condition already refutes this host.
 */
static inline bool finish_scan_fast_path(logFdwPlanState* festate)
{
    const bool no_dir_scanner = (festate->dir_scan == NULL);
    return no_dir_scanner || festate->m_refuted_by_hostname;
}
/*
 * replace_tz
 *     Replace every occurrence of sMatchStr in sSrc by sReplaceStr, in place.
 *
 * The rewritten string is copied back into sSrc with a bound of MAX_LINE_LEN
 * bytes, so sSrc must be writable for that many bytes.
 * NOTE(review): callers pass lines obtained from the log data buffer; when
 * sReplaceStr is longer than sMatchStr the line grows in place — confirm the
 * buffer leaves room for that.
 *
 * The source is scanned once, left to right. Text produced by a replacement is
 * never searched again, so a replacement containing the match string cannot
 * cascade.
 *
 * Returns 0 on success (including "nothing to replace"). Returns -1 when an
 * argument is NULL, the match string is empty, or the result would not fit in
 * MAX_LINE_LEN bytes; in that case sSrc is left untouched.
 */
static int replace_tz(char* sSrc, const char* sMatchStr, const char* sReplaceStr)
{
    errno_t rc = 0;
    /* an empty pattern would match at every position without advancing */
    if (NULL == sSrc || NULL == sMatchStr || NULL == sReplaceStr || '\0' == sMatchStr[0]) {
        return -1;
    }
    if (strlen(sSrc) > MAX_LINE_LEN) {
        return -1;
    }
    const char* FindPos = strstr(sSrc, sMatchStr);
    /* if sSrc does not contain sMatchStr, do nothing */
    if (NULL == FindPos) {
        return 0;
    }
    const size_t MatchLen = strlen(sMatchStr);
    const size_t ReplaceLen = strlen(sReplaceStr);
    char caNewString[MAX_LINE_LEN] = {0};
    size_t NewLen = 0;          /* bytes already produced in caNewString */
    const char* Cursor = sSrc;  /* first byte of sSrc not copied yet */
    while (NULL != FindPos) {
        size_t PrefixLen = (size_t)(FindPos - Cursor);
        /* keep one byte for the terminating '\0' */
        if (NewLen + PrefixLen + ReplaceLen >= MAX_LINE_LEN) {
            return -1;
        }
        rc = strncpy_s(caNewString + NewLen, MAX_LINE_LEN - NewLen, Cursor, PrefixLen);
        securec_check_c(rc, "\0", "\0");
        NewLen += PrefixLen;
        rc = strcat_s(caNewString, MAX_LINE_LEN, sReplaceStr);
        securec_check_c(rc, "\0", "\0");
        NewLen += ReplaceLen;
        /* resume the search in the source, right after the matched text */
        Cursor = FindPos + MatchLen;
        FindPos = strstr(Cursor, sMatchStr);
    }
    if (NewLen + strlen(Cursor) >= MAX_LINE_LEN) {
        return -1;
    }
    rc = strcat_s(caNewString, MAX_LINE_LEN, Cursor);
    securec_check_c(rc, "\0", "\0");
    rc = strcpy_s(sSrc, MAX_LINE_LEN, caNewString);
    securec_check_c(rc, "\0", "\0");
    return 0;
}
/*
 * trans_tzname_to_digit
 *     Map a timezone name (and, where needed, its current abbreviation) to a
 *     numeric UTC offset string.
 *
 * The abbreviation "CST" is not unique (China / Central / Cuba Standard
 * Time), so such zones are translated to an explicit offset. '+' means east
 * of UTC and '-' means west of UTC.
 *
 * Returns a pointer to a static string literal, which must not be modified or
 * freed. Returns NULL when the zone needs no translation, or when tzname is
 * NULL. A NULL abbrevs only matches entries that ignore the abbreviation.
 */
static char* trans_tzname_to_digit(const char* tzname, const char* abbrevs)
{
    struct tz_digit_map {
        const char* tzname;
        const char* abbrev; /* NULL: any abbreviation */
        const char* digit;
    };
    static const tz_digit_map tz_map[] = {
        /* Timezone for China Standard Time */
        {"PRC", NULL, "+8"},
        {"Asia/Beijing", NULL, "+8"},
        {"Asia/Shanghai", NULL, "+8"},
        {"Asia/Chongqing", NULL, "+8"},
        {"Asia/Harbin", NULL, "+8"},
        {"Asia/Taipei", NULL, "+8"},
        {"Asia/Macao", NULL, "+8"},
        {"Asia/Urumqi", NULL, "+8"},
        /* Timezone for Central Standard Time */
        {"Australia/Darwin", NULL, "+9:30"},
        /* Timezone for Cuba Standard Time */
        {"America/Havana", "CST", "-5"},
        {"Cuba", "CST", "-5"},
        /* Timezone for Cuba Standard Time with daylight */
        {"America/Havana", "CDT", "-4"},
        {"Cuba", "CDT", "-4"},
    };

    if (NULL == tzname) {
        return NULL;
    }
    for (size_t i = 0; i < sizeof(tz_map) / sizeof(tz_map[0]); ++i) {
        const tz_digit_map* entry = &tz_map[i];
        if (strcmp(tzname, entry->tzname) != 0) {
            continue;
        }
        if (NULL == entry->abbrev) {
            return const_cast<char*>(entry->digit);
        }
        if (NULL != abbrevs && strcmp(abbrevs, entry->abbrev) == 0) {
            return const_cast<char*>(entry->digit);
        }
    }
    return NULL;
}
static char* get_abbrevs_name()
{
struct timeval tv;
pg_time_t stamp_time;
gettimeofday(&tv, NULL);
stamp_time = (pg_time_t)tv.tv_sec;
return const_cast<char*>(pg_get_abbrevs_name(&stamp_time, log_timezone));
}
/*
 * pglog_get_next_tuple
 *     Read the next pg_log line and build a heap tuple from it.
 *
 * The tuple and its attribute values are allocated in the per-tuple memory
 * context. An ambiguous timezone abbreviation in the line is first rewritten
 * as a numeric offset. A line that starts with spaces, or that
 * try_to_match_this_line() rejects, is stored as a "not matched" line.
 * Returns NULL once every log file has been consumed.
 */
static HeapTuple pglog_get_next_tuple(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    char* raw_line = NULL;
    if (!get_next_log_line(node, raw_line)) {
        return NULL;
    }
    Assert(PGLOG_ATTR_MAX == node->ss.ss_currentRelation->rd_att->natts);
    reset_all_attvals(festate, PGLOG_ATTR_MAX);

    MemoryContext saved_cxt = MemoryContextSwitchTo(festate->m_pertup_memcnxt);
    set_attval_before_logdata(festate);

    const char* content = skip_left_space(raw_line);
    char* tz_abbrev = get_abbrevs_name();
    char* tz_digit = trans_tzname_to_digit(pg_get_timezone_name(log_timezone), tz_abbrev);
    /* trans CST digit timezone (content equals raw_line when nothing was skipped) */
    if (tz_digit != NULL) {
        (void)replace_tz((char*)content, tz_abbrev, tz_digit);
    }

    if (content != raw_line) {
        /* in most cases it's not matched if this line starts with space */
        set_not_matched_line_tuple(festate, CStringGetTextDatum(content));
    } else if (!try_to_match_this_line(node, CStringGetTextDatum((const char*)raw_line))) {
        set_not_matched_line_tuple(festate, CStringGetTextDatum((const char*)raw_line));
    }

    /* line number within the current file */
    festate->m_isnull[PGLOG_ATTR_LINENO] = false;
    festate->m_values[PGLOG_ATTR_LINENO] = Int64GetDatum(festate->m_lineno_of_file);

    HeapTuple result = heap_form_tuple(node->ss.ss_currentRelation->rd_att, festate->m_values, festate->m_isnull);
    (void)MemoryContextSwitchTo(saved_cxt);
    return result;
}
/*
 * pglog_iterate_fs
 *     IterateForeignScan routine for pg_log tables whose query reads log data.
 *
 * Tuples are produced in batches of up to LOGFDW_TUPLE_CACHE_SIZE and cached
 * in festate->m_tuple_buf. Each call hands out the next cached tuple. Once the
 * cache is drained, the per-tuple memory context is reset and the cache is
 * refilled. Returns NULL when this node has nothing (more) to return.
 */
static TupleTableSlot* pglog_iterate_fs(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    if (finish_scan_fast_path(festate)) {
        return NULL;
    }
    TupleTableSlot* slot = node->ss.ss_ScanTupleSlot;
    if (festate->m_tuple_cur < festate->m_tuple_num) {
        /* serve the next tuple from the cache */
        ExecStoreTuple(festate->m_tuple_buf[festate->m_tuple_cur++], slot, InvalidBuffer, false);
        return slot;
    }

    /* cache drained: forget it and release the memory of the previous batch */
    festate->m_tuple_cur = 0;
    festate->m_tuple_num = 0;
    MemoryContextReset(festate->m_pertup_memcnxt);

    int filled = 0;
    PERF_TRACE(TRACK_START, GET_PG_LOG_TUPLES);
    while (filled < LOGFDW_TUPLE_CACHE_SIZE) {
        HeapTuple next_tuple = pglog_get_next_tuple(node);
        if (next_tuple == NULL) {
            break;
        }
        festate->m_tuple_buf[filled++] = next_tuple;
    }
    PERF_TRACE(TRACK_END, GET_PG_LOG_TUPLES);

    if (filled == 0) {
        return NULL;
    }
    festate->m_tuple_num = filled;
    festate->m_tuple_cur = 1;
    ExecStoreTuple(festate->m_tuple_buf[0], slot, InvalidBuffer, false);
    return slot;
}
/*
 * iterate_fs_without_logdata
 *     IterateForeignScan routine used when the query needs no log content.
 *
 * One tuple is returned per log file found by the directory scan. Every
 * attribute is reset to NULL and then filled by set_attval_before_logdata()
 * after festate->filename / festate->dname are refreshed for the new file.
 * Returns NULL when nothing is scanned on this node or no file is left.
 */
static TupleTableSlot* iterate_fs_without_logdata(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    Relation rel = node->ss.ss_currentRelation;
    if (finish_scan_fast_path(festate)) {
        return NULL;
    }
    festate->logfile_info = iterate_scan_dirs(festate->dir_scan, node);
    if (festate->logfile_info == NULL) {
        return NULL;
    }

    /* drop the names remembered for the previous file */
    if (festate->dname != NULL) {
        Assert(festate->filename);
        pfree_ext(festate->dname);
        pfree_ext(festate->filename);
    }
    festate->filename = fetch_filename_from_logfile(festate->logfile_info->name);
    festate->dname = fetch_dirname_from_logfile(festate->logfile_info->name);

    reset_all_attvals(festate, rel->rd_att->natts);
    MemoryContextReset(festate->m_pertup_memcnxt);
    MemoryContext saved_cxt = MemoryContextSwitchTo(festate->m_pertup_memcnxt);
    set_attval_before_logdata(festate);

    HeapTuple file_tuple = heap_form_tuple(rel->rd_att, festate->m_values, festate->m_isnull);
    TupleTableSlot* slot = node->ss.ss_ScanTupleSlot;
    ExecClearTuple(slot);
    ExecStoreTuple(file_tuple, slot, InvalidBuffer, false);
    (void)MemoryContextSwitchTo(saved_cxt);
    return slot;
}
/*
 * append_explain_string
 *     Append the text-format EXPLAIN details of a log foreign scan to str,
 *     indented by indent spaces. tmpstr is a scratch buffer that receives the
 *     list of unreachable nodes.
 * NOTE(review): in master-only mode the "need load log data" flag is not
 * printed here, while pglog_explain_fs() always prints it for non-text
 * formats — confirm this difference is intended.
 */
static void append_explain_string(StringInfo str, StringInfo tmpstr, int indent, bool master_only, bool need_log_data)
{
    appendStringInfoSpaces(str, indent);
    if (master_only) {
        appendStringInfo(str, "unreachable nodes: none (master only)\n");
        return;
    }
    resetStringInfo(tmpstr);
    explain_unreachable_nodes(tmpstr);
    if (tmpstr->len == 0) {
        appendStringInfo(str, "(unreachable nodes: none)");
    } else {
        appendStringInfo(str, "(unreachable nodes: %s)", tmpstr->data);
    }
    const char* need_str = need_log_data ? "yes" : "no";
    appendStringInfo(str, "(need load log data: %s)\n", need_str);
}
/*
 * pglog_explain_fs
 *     ExplainForeignScan routine: report unreachable nodes and whether log
 *     data has to be loaded. In non-normal explain_perf_mode the text form is
 *     also appended to the plan's static info.
 */
static void pglog_explain_fs(ForeignScanState* node, ExplainState* es)
{
    const bool master_only = relation_getopt_masteronly(node->ss.ss_currentRelation);
    const bool need_log_data = target_list_need_log_data(node);
    StringInfoData scratch;
    initStringInfo(&scratch);

    if (t_thrd.explain_cxt.explain_perf_mode != EXPLAIN_NORMAL && es->planinfo->m_staticInfo) {
        es->planinfo->m_staticInfo->set_plan_name<true, true>();
        append_explain_string(es->planinfo->m_staticInfo->info_str, &scratch, 0, master_only, need_log_data);
    }

    if (es->format == EXPLAIN_FORMAT_TEXT) {
        append_explain_string(es->str, &scratch, es->indent * 2, master_only, need_log_data);
    } else {
        const char* unreachable = "none (master only)";
        if (!master_only) {
            resetStringInfo(&scratch);
            explain_unreachable_nodes(&scratch);
            unreachable = (scratch.len > 0) ? scratch.data : "none";
        }
        ExplainPropertyText("unreachable nodes", unreachable, es);
        ExplainPropertyText("need load log data", (need_log_data ? "yes" : "no"), es);
    }
    pfree_ext(scratch.data);
}
/*
 * pglog_end_fs
 *     EndForeignScan routine for pg_log tables: finish and free the directory
 *     scanner. All other memory goes away when the scan's memory context is
 *     destroyed.
 */
static void pglog_end_fs(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    if (festate == NULL || festate->dir_scan == NULL) {
        return;
    }
    end_scan_dirs(festate->dir_scan);
    free_logdir_scanner(festate->dir_scan);
    festate->dir_scan = NULL;
}
/*
 * pglog_rescan_fs
 *     Rescan table, possibly with new parameters.
 *
 * Coordinators scan nothing. On other nodes the directory scan is restarted;
 * nothing pg_log specific in festate->log_state needs resetting. A NULL
 * dir_scan (no scanner was set up) is skipped, matching the NULL check that
 * log_rescan_foreign_scan() performs before begin_rescan_dirs().
 */
static void pglog_rescan_fs(ForeignScanState* node)
{
    if (IS_PGXC_COORDINATOR) {
        return;
    }
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    if (NULL == festate->dir_scan) {
        return;
    }
    /* prepare for next scan */
    begin_scan_dirs(festate->dir_scan, node);
}
/*
 * log_begin_foreign_scan
 *     BeginForeignScan callback shared by every kind of log foreign table.
 *
 * Allocates the scan state with a private memory context and a per-tuple
 * child context. Reads the table options and installs the callbacks matching
 * the table's log type, then delegates to the type-specific begin routine.
 *
 * Raises an ERROR for a log type that has no handler here. Without that check
 * every callback pointer stays NULL and the call below would crash.
 */
static void log_begin_foreign_scan(ForeignScanState* node, int eflags)
{
    /* festate lives outside memcnxt: log_end_foreign_scan() still reads it after deleting memcnxt */
    logFdwPlanState* festate = (logFdwPlanState*)palloc0(sizeof(logFdwPlanState));
    MemoryContext memcnxt = AllocSetContextCreate(CurrentMemoryContext,
        "Log Data Foreign Scan",
        ALLOCSET_DEFAULT_MINSIZE,
        ALLOCSET_DEFAULT_INITSIZE,
        ALLOCSET_DEFAULT_MAXSIZE);
    MemoryContext old_memcnxt = MemoryContextSwitchTo(memcnxt);
    node->fdw_state = (void*)festate;
    festate->m_memcnxt = memcnxt;
    /* make m_pertup_memcnxt under my top m_memcnxt */
    festate->m_pertup_memcnxt = AllocSetContextCreate(memcnxt,
        "Per Tuple of Log Data Foreign Scan",
        ALLOCSET_DEFAULT_MINSIZE,
        ALLOCSET_DEFAULT_INITSIZE,
        ALLOCSET_DEFAULT_MAXSIZE);
    /* values / null flags of the tuple being built, one slot per column */
    int ncols = node->ss.ss_currentRelation->rd_att->natts;
    festate->m_isnull = (bool*)palloc0(ncols * sizeof(bool));
    festate->m_values = (Datum*)palloc0(ncols * sizeof(Datum));
    festate->m_master_only = relation_getopt_masteronly(node->ss.ss_currentRelation);
    festate->m_latest_files = relation_getopt_latest_files_num(node->ss.ss_currentRelation);
    festate->m_need_read_logdata = target_list_need_log_data(node);
    festate->m_tmp_hostname_text = get_hostname_text();
    FdwLogType logtype = relation_getopt_logtype(node->ss.ss_currentRelation);
    fillup_need_check_flags(node, logtype);
    if (festate->m_need_check_hostname) {
        festate->m_refuted_by_hostname = whether_refuted_by_hostname(node);
    }
    if (IS_PGXC_DATANODE) {
        /* data buffer for log file */
        logdata_buf::init(&festate->m_log_buf, LOGFDW_BUFFER_SZ);
    }
    festate->m_plan_node_id = node->ss.ps.plan->plan_node_id;
    switch (logtype) {
        case FLT_PG_LOG:
            festate->begin_foreign_scan = pglog_begin_fs;
            festate->iterate_foreign_scan =
                festate->m_need_read_logdata ? pglog_iterate_fs : iterate_fs_without_logdata;
            festate->end_foreign_scan = pglog_end_fs;
            festate->explain_foreign_scan = pglog_explain_fs;
            festate->rescan_foreign_scan = pglog_rescan_fs;
            break;
        case FLT_GS_PROFILE:
            festate->begin_foreign_scan = profilelog_begin_fs;
            festate->iterate_foreign_scan =
                festate->m_need_read_logdata ? profilelog_iterate_fs : iterate_fs_without_logdata;
            festate->end_foreign_scan = profilelog_end_fs;
            festate->explain_foreign_scan = profilelog_explain_fs;
            festate->rescan_foreign_scan = profilelog_rescan_fs;
            break;
        default:
            /* no callbacks for this log type: fail cleanly instead of calling a NULL pointer */
            ereport(ERROR,
                (errcode(ERRCODE_SYSTEM_ERROR),
                    errmsg("unrecognized log type(%d) of log foreign table", (int)logtype)));
            break;
    }
    festate->begin_foreign_scan(node, eflags);
    (void)MemoryContextSwitchTo(old_memcnxt);
}
/*
 * log_iterate_foreign_scan
 *     IterateForeignScan callback: record whether timing is requested, then
 *     run the type-specific iterate routine inside the scan's memory context.
 */
static TupleTableSlot* log_iterate_foreign_scan(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    Instrumentation* instr = node->ss.ps.instrument;
    festate->m_need_timing = (instr != NULL && instr->need_timer);

    MemoryContext saved_cxt = MemoryContextSwitchTo(festate->m_memcnxt);
    TupleTableSlot* result = festate->iterate_foreign_scan(node);
    (void)MemoryContextSwitchTo(saved_cxt);
    return result;
}
/*
 * log_explain_foreign_scan
 *     ExplainForeignScan callback: forward to the routine installed for this
 *     table's log type.
 */
static void log_explain_foreign_scan(ForeignScanState* node, ExplainState* es)
{
    ((logFdwPlanState*)node->fdw_state)->explain_foreign_scan(node, es);
}
/*
 * log_end_foreign_scan
 *     Finish scanning foreign table and dispose objects used for this scan.
 *
 * Steps, in order:
 *   1) close a log file left open by a scan that stopped early;
 *   2) run the type-specific end routine;
 *   3) release the data buffer and the private memory context;
 *   4) if instrumented, report pruning statistics through the
 *      instrumentation fields;
 *   5) free the scan state itself.
 */
static void log_end_foreign_scan(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    /*
     * The executor may stop pulling tuples before every file is consumed
     * (e.g. under LIMIT), leaving the current log file open; close it the same
     * way log_rescan_foreign_scan() does.
     */
    if (NULL != festate->m_vfd) {
        festate->m_close(festate->m_vfd);
        festate->m_vfd = NULL;
    }
    /* finish this foreign scan */
    festate->end_foreign_scan(node);
    logdata_buf::deinit(&festate->m_log_buf);
    /* destroy private memory context */
    MemoryContextDelete(festate->m_memcnxt);
    Instrumentation* instr = node->ss.ps.instrument;
    if (NULL != instr) {
        /* explain type */
        instr->dfsType = TYPE_LOG_FT;
        /* host name info */
        instr->minmaxCheckStripe = 1;
        instr->minmaxFilterStripe = festate->m_refuted_by_hostname ? 1 : 0;
        /* dirs info */
        instr->minmaxCheckStride = festate->m_total_dirname_num;
        instr->minmaxFilterStride = festate->m_refuted_dirname_num;
        /* files info */
        instr->minmaxCheckFiles = festate->m_total_logfile_num;
        instr->minmaxFilterFiles = festate->m_refuted_logfile_num_by_name + festate->m_refuted_logfile_num_by_time;
        instr->dynamicPrunFiles = festate->m_latest_files;
        instr->staticPruneFiles = festate->m_incompleted_files;
    }
    /* destroy logFdwPlanState object */
    pfree_ext(festate);
    node->fdw_state = NULL;
}
/*
 * log_rescan_foreign_scan
 *     ReScanForeignScan callback: reset the EXPLAIN counters, close the
 *     current log file, drop cached tuples and buffered data, restart the
 *     directory scan and finally run the type-specific rescan routine.
 */
static void log_rescan_foreign_scan(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    MemoryContext saved_cxt = MemoryContextSwitchTo(festate->m_memcnxt);

    /* counters reported by EXPLAIN */
    festate->m_total_logfile_num = 0;
    festate->m_refuted_logfile_num_by_name = 0;
    festate->m_refuted_logfile_num_by_time = 0;
    festate->m_incompleted_files = 0;

    /* the log file being read, if any, and its names */
    if (festate->m_vfd != NULL) {
        festate->m_close(festate->m_vfd);
        festate->m_vfd = NULL;
    }
    if (festate->dname != NULL) {
        pfree_ext(festate->dname);
    }
    if (festate->filename != NULL) {
        pfree_ext(festate->filename);
    }
    festate->logfile_info = NULL;

    /* cached tuples, their memory and the raw data buffer */
    festate->m_tuple_cur = 0;
    festate->m_tuple_num = 0;
    MemoryContextReset(festate->m_pertup_memcnxt);
    festate->m_log_buf.reset_data_buf();

    if (festate->dir_scan != NULL) {
        begin_rescan_dirs(festate->dir_scan);
    }
    festate->rescan_foreign_scan(node);
    (void)MemoryContextSwitchTo(saved_cxt);
}
/*
 * log_get_foreign_plan
 *     GetForeignPlan callback: build the ForeignScan node executed on datanodes.
 *
 * On a coordinator: warn when not in stream mode; append the ip/datanode-name
 * pairs as option logft_opt_masters_info unless the table is master-only; and
 * report unreachable hosts through an INFO message.
 */
static ForeignScan* log_get_foreign_plan(
    PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid, ForeignPath* best_path, List* tlist, List* scan_clauses)
{
    List* fdw_data = best_path->fdw_private;

    if (IS_PGXC_COORDINATOR) {
        if (!IS_STREAM) {
            ereport(WARNING, (errmsg("log_fdw enter non-stream mode when getting foreign plan")));
        }
        Relation ft_rel = heap_open(foreigntableid, AccessShareLock);
        const bool master_only = relation_getopt_masteronly(ft_rel);
        heap_close(ft_rel, AccessShareLock);
        if (!master_only) {
            /* ip <-> datanode name pairs read from pgxc_node */
            List* ip_dnname_pairs = get_ip_nodename_pairs_from_pgxc_node();
            DefElem* masters_info = makeDefElem(pstrdup(logft_opt_masters_info), (Node*)ip_dnname_pairs);
            fdw_data = lappend(fdw_data, masters_info);
        }
        List* unreach_host = get_unreachable_hosts();
        if (unreach_host != NULL) {
            StringInfoData details;
            initStringInfo(&details);
            descript_unreachable_nodes(&details, unreach_host);
            ereport(INFO, (errmsg("some hosts/nodes unreachalbe"), errdetail("%s", details.data)));
            pfree_ext(details.data);
            free_unreachable_hosts(unreach_host);
        }
    }

    /*
     * Restriction clauses cannot be evaluated by this FDW, so every plain
     * clause goes into the plan's qual list for the executor to check;
     * pseudoconstants are dropped here (they are handled elsewhere).
     */
    List* quals = extract_actual_clauses(scan_clauses, false);
    best_path->fdw_private = fdw_data;
    return make_foreignscan(tlist,
        quals,
        baserel->relid,
        NIL,               /* no expressions to evaluate */
        fdw_data,          /* private state passed to the executor */
        EXEC_ON_DATANODES);
}
/*
 * log_get_foreign_paths
 *     GetForeignPaths callback: offer a single unordered ForeignPath with the
 *     fixed costs from estimate_costs(). If the data were known to be sorted,
 *     pathkeys could be supplied here to tell the planner.
 */
static void log_get_foreign_paths(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid)
{
    Cost startup_cost = 0;
    Cost total_cost = 0;
    estimate_costs(root, baserel, (logFdwPlanState*)baserel->fdw_private, &startup_cost, &total_cost);

    Path* scan_path = (Path*)create_foreignscan_path(root,
        baserel,
        startup_cost,
        total_cost,
        NIL,  /* no pathkeys */
        NULL, /* no outer rel either */
        NIL,  /* no fdw_private data */
        1);   /* NOTE(review): presumably the degree of parallelism — confirm */
    add_path(root, baserel, scan_path);
}
/*
 * estimate_size
 *     Estimate the size of a log foreign table.
 *
 * No statistics are available, so a fixed estimate is stored in
 * baserel->rows; fdw_private is not used.
 */
static void estimate_size(PlannerInfo* root, RelOptInfo* baserel, logFdwPlanState* fdw_private)
{
    const double fixed_row_estimate = 100;
    baserel->rows = fixed_row_estimate;
}
/*
 * log_get_foreign_relsize
 *     GetForeignRelSize callback: no per-table planning state is built, so
 *     estimate_size() is called without one.
 */
static void log_get_foreign_relsize(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid)
{
    estimate_size(root, baserel, NULL);
}
/*
 * estimate_costs
 *     Estimate the costs of scanning a log foreign table.
 *
 * A fixed cost is used for both *startup_cost and *total_cost; the other
 * arguments are not consulted.
 */
static void estimate_costs(
    PlannerInfo* root, RelOptInfo* baserel, logFdwPlanState* fdw_private, Cost* startup_cost, Cost* total_cost)
{
    const Cost fixed_cost = 100;
    *startup_cost = fixed_cost;
    *total_cost = fixed_cost;
}
/*
 * match_dir_name
 *     Return true for sub-directory names starting with "cn_" or "dn_".
 */
static bool match_dir_name(const char* subdir_name)
{
    static const char* const dir_prefixes[] = {"cn_", "dn_"};
    for (size_t i = 0; i < sizeof(dir_prefixes) / sizeof(dir_prefixes[0]); ++i) {
        if (strncmp(subdir_name, dir_prefixes[i], 3) == 0) {
            return true;
        }
    }
    return false;
}
/*
 * all_digits
 *     Return true when each of the len characters starting at start is a
 *     decimal digit (trivially true for len == 0).
 *
 * Each byte is converted to unsigned char before isdigit(): passing a
 * negative plain char (a byte >= 0x80 where char is signed) is undefined.
 */
static bool all_digits(const char* start, size_t len)
{
    for (size_t i = 0; i < len; ++i) {
        if (!isdigit((unsigned char)start[i])) {
            return false;
        }
    }
    return true;
}
/*
 * log_fname_matched_with_pattern
 *     Check that fname starts with "postgresql-YYYY-MM-DD_HHMMSS" (digits
 *     where letters stand) and is at least as long as
 *     "postgresql-YYYY-MM-DD_HHMMSS.log". The extension is checked by callers.
 */
static bool log_fname_matched_with_pattern(const char* fname)
{
    /* offset and width of each numeric field in the file name */
    static const struct {
        size_t offset;
        size_t width;
    } digit_fields[] = {
        {11, 4}, /* year */
        {16, 2}, /* month */
        {19, 2}, /* day */
        {22, 6}, /* hour, minute, second */
    };

    if (strlen(fname) < strlen("postgresql-YYYY-MM-DD_HHMMSS.log")) {
        return false;
    }
    if (strncmp("postgresql-", fname, 11) != 0 || fname[15] != '-' || fname[18] != '-' || fname[21] != '_') {
        return false;
    }
    for (size_t i = 0; i < sizeof(digit_fields) / sizeof(digit_fields[0]); ++i) {
        if (!all_digits(fname + digit_fields[i].offset, digit_fields[i].width)) {
            return false;
        }
    }
    return true;
}
/*
 * pglog_fname_match
 *     Accept pg_log file names of the forms
 *       postgresql-YYYY-MM-DD_HHMMSS.log
 *       postgresql-YYYY-MM-DD_HHMMSS.zip
 *       postgresql-YYYY-MM-DD_HHMMSS.log.gz
 */
static bool pglog_fname_match(const char* log_name)
{
    const size_t plain_len = strlen("postgresql-YYYY-MM-DD_HHMMSS.log");
    const size_t gzip_len = strlen("postgresql-YYYY-MM-DD_HHMMSS.log.gz");
    const size_t name_len = strlen(log_name);

    if (name_len != plain_len && name_len != gzip_len) {
        return false;
    }
    if (!log_fname_matched_with_pattern(log_name)) {
        return false;
    }
    if (name_len == plain_len) {
        const char* ext = log_name + (name_len - 4);
        return strncmp(ext, ".log", 4) == 0 || strncmp(ext, ".zip", 4) == 0;
    }
    return strncmp(log_name + (name_len - 7), ".log.gz", 7) == 0;
}
/*
 * profilelog_fname_match
 *     Accept gs_profile file names of the forms
 *       postgresql-YYYY-MM-DD_HHMMSS.prf
 *       postgresql-YYYY-MM-DD_HHMMSS.zip
 */
static bool profilelog_fname_match(const char* log_name)
{
    const size_t name_len = strlen(log_name);
    if (name_len != strlen("postgresql-YYYY-MM-DD_HHMMSS.prf")) {
        return false;
    }
    if (!log_fname_matched_with_pattern(log_name)) {
        return false;
    }
    const char* ext = log_name + (name_len - 4);
    return strncmp(ext, ".prf", 4) == 0 || strncmp(ext, ".zip", 4) == 0;
}
/*
 * profilelog_flush_buffered_data
 *     Make buffered profile log data visible before a query scans the files.
 *
 * Profile log data is buffered and flushed only when the buffer is full, so
 * a flush is requested explicitly, in two steps:
 *   1) set the JUST-TO-FLUSH request flag (the log file is not rotated);
 *   2) signal the postmaster through pg_rotate_logfile(), which asks the
 *      syslogger thread to act. PMSIGNAL_ROTATE_LOGFILE is shared by both the
 *      ROTATION and FLUSH requests.
 * The result of pg_rotate_logfile() is ignored.
 */
static void profilelog_flush_buffered_data(void)
{
    set_flag_to_flush_buffer();
    (void)pg_rotate_logfile(NULL);
}
/*
 * profilelog_begin_fs
 *     Begin routine for gs_profile log foreign tables.
 *
 * Coordinators scan nothing. Elsewhere:
 *   - buffered profile data is flushed;
 *   - a directory scanner is set up under $GAUSSLOG/gs_profile, restricted to
 *     this node's own sub-directory in master-only mode;
 *   - a profile log parser is allocated.
 * Raises an ERROR when GAUSSLOG is unset or empty.
 */
static void profilelog_begin_fs(ForeignScanState* node, int eflags)
{
    if (IS_PGXC_COORDINATOR) {
        return;
    }
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    char* loghome = gs_getenv_r("GAUSSLOG");
    check_backend_env(loghome);
    if (loghome == NULL || loghome[0] == '\0') {
        ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("GAUSSLOG not set")));
    }

    /* request to flush buffered data before scan files */
    profilelog_flush_buffered_data();

    char top_dir[MAXPGPATH] = {0};
    logdir_scanner dir_scan = NULL;
    int ret = 0;
    if (festate->m_master_only) {
        /* only this node's own directory */
        ret = snprintf_s(top_dir,
            MAXPGPATH,
            MAXPGPATH - 1,
            "%s/gs_profile/%s",
            loghome,
            g_instance.attr.attr_common.PGXCNodeName);
        securec_check_ss(ret, "", "");
        dir_scan = festate->dir_scan = new_logdir_scanner(top_dir, NULL, profilelog_fname_match);
        set_latest_files_num(dir_scan, festate->m_latest_files);
        master_only_set_dirlist(dir_scan);
    } else {
        /* sub-directories accepted by match_dir_name() */
        ret = snprintf_s(top_dir, MAXPGPATH, MAXPGPATH - 1, "%s/gs_profile", loghome);
        securec_check_ss(ret, "", "");
        dir_scan = festate->dir_scan = new_logdir_scanner(top_dir, match_dir_name, profilelog_fname_match);
        set_latest_files_num(dir_scan, festate->m_latest_files);
        assign_scanned_directorys(node, dir_scan);
    }
    festate->m_total_dirname_num = list_length(dir_scan->dir_list);
    if (festate->m_need_check_dirname) {
        dir_scan->dir_list = filter_dirname_list(node, dir_scan->dir_list);
    }
    copy_dirlist_for_rescan(dir_scan);
    begin_scan_dirs(dir_scan, node);

    /* data parser object for profile log */
    prflog_parse* parser = (prflog_parse*)palloc0(sizeof(prflog_parse));
    parser->init();
    festate->log_state.profile_log.parser = parser;
}
/*
 * profilelog_iterate_fs
 *     IterateForeignScan routine for gs_profile tables whose query reads log
 *     data.
 *
 * Tuples are formed in batches of up to LOGFDW_TUPLE_CACHE_SIZE from the
 * records produced by get_next_log_record(), inside the per-tuple memory
 * context, and cached in festate->m_tuple_buf. Each call hands out the next
 * cached tuple. Returns NULL when this node has nothing (more) to return.
 */
static TupleTableSlot* profilelog_iterate_fs(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    if (finish_scan_fast_path(festate)) {
        return NULL;
    }
    TupleTableSlot* slot = node->ss.ss_ScanTupleSlot;
    if (festate->m_tuple_cur < festate->m_tuple_num) {
        /* serve the next tuple from the cache */
        ExecStoreTuple(festate->m_tuple_buf[festate->m_tuple_cur++], slot, InvalidBuffer, false);
        return slot;
    }

    /* cache drained: forget it and release the memory of the previous batch */
    festate->m_tuple_cur = 0;
    festate->m_tuple_num = 0;
    MemoryContextReset(festate->m_pertup_memcnxt);

    TupleDesc tupdesc = node->ss.ss_currentRelation->rd_att;
    int filled = 0;
    PERF_TRACE(TRACK_START, GET_GS_PROFILE_TUPLES);
    while (filled < LOGFDW_TUPLE_CACHE_SIZE && get_next_log_record(node)) {
        MemoryContext saved_cxt = MemoryContextSwitchTo(festate->m_pertup_memcnxt);
        festate->m_tuple_buf[filled++] = heap_form_tuple(tupdesc, festate->m_values, festate->m_isnull);
        (void)MemoryContextSwitchTo(saved_cxt);
    }
    PERF_TRACE(TRACK_END, GET_GS_PROFILE_TUPLES);

    if (filled == 0) {
        return NULL;
    }
    festate->m_tuple_num = filled;
    festate->m_tuple_cur = 1;
    ExecStoreTuple(festate->m_tuple_buf[0], slot, InvalidBuffer, false);
    return slot;
}
/*
 * profilelog_end_fs
 *     EndForeignScan routine for gs_profile tables: free the directory
 *     scanner, the cached directory name and the profile log parser.
 */
static void profilelog_end_fs(ForeignScanState* node)
{
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    if (festate == NULL) {
        return;
    }
    logdir_scanner dir_scan = festate->dir_scan;
    if (dir_scan != NULL) {
        end_scan_dirs(dir_scan);
        free_logdir_scanner(dir_scan);
        festate->dir_scan = NULL;
    }
    festate->logfile_info = NULL;
    if (festate->dname != NULL) {
        pfree_ext(festate->dname);
    }
    prflog_parse* parser = festate->log_state.profile_log.parser;
    if (parser != NULL) {
        pfree_ext(parser);
        festate->log_state.profile_log.parser = NULL;
    }
}
/*
 * profilelog_rescan_fs
 *     Rescan routine for gs_profile tables: coordinators do nothing; other
 *     nodes stop the parser and restart the directory scan.
 */
static void profilelog_rescan_fs(ForeignScanState* node)
{
    if (IS_PGXC_COORDINATOR) {
        return;
    }
    logFdwPlanState* festate = (logFdwPlanState*)node->fdw_state;
    festate->log_state.profile_log.parser->iter_end();
    begin_scan_dirs(festate->dir_scan, node);
}
/*
 * profilelog_explain_fs
 *     EXPLAIN routine for gs_profile tables; the output is the same as for
 *     pg_log tables, so pglog_explain_fs() does the work.
 */
static void profilelog_explain_fs(ForeignScanState* node, ExplainState* es)
{
    pglog_explain_fs(node, es);
}
/*
 * log_unsupport_analyze
 *     AnalyzeForeignTable callback: ANALYZE is not supported for log foreign
 *     tables, so this returns false without touching its output arguments.
 */
static bool log_unsupport_analyze(Relation relation, AcquireSampleRowsFunc* func, BlockNumber* totalPageCount,
    void* additionalData, bool estimate_table_rownum)
{
    const bool analyze_supported = false;
    return analyze_supported;
}
/*
 * find_logft_option
 *     Return true if opt_list (a list of DefElem) holds an option whose name
 *     equals opt_name, compared case-insensitively.
 */
static inline bool find_logft_option(List* opt_list, const char* opt_name)
{
    bool found = false;
    ListCell* lc = NULL;
    foreach (lc, opt_list) {
        const DefElem* def = (const DefElem*)lfirst(lc);
        if (pg_strcasecmp(def->defname, opt_name) == 0) {
            found = true;
            break;
        }
    }
    return found;
}
/*
 * log_validate_table_def
 *     ValidateTableDef callback: reject DDL that log foreign tables do not
 *     support.
 *
 * ALTER TABLE may not:
 *   - add, drop or retype columns, or set NOT NULL;
 *   - change the owner to a non-superuser;
 *   - change the log type option.
 * CREATE FOREIGN TABLE:
 *   - requires a superuser;
 *   - rejects WRITE ONLY;
 *   - requires the log type option.
 * Any other statement type is reported as an error.
 */
static void log_validate_table_def(Node* obj)
{
    Assert(NULL != obj);
    switch (nodeTag(obj)) {
        case T_AlterTableStmt: {
            List* cmds = ((AlterTableStmt*)obj)->cmds;
            ListCell* lcmd = NULL;
            foreach (lcmd, cmds) {
                AlterTableCmd* cmd = (AlterTableCmd*)lfirst(lcmd);
                switch (cmd->subtype) {
                    case AT_AddColumn:
                    case AT_SetNotNull:
                    case AT_DropColumn:
                    case AT_AlterColumnType:
                        ereport(ERROR,
                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                errmsg("Un-support feature for log foreign table")));
                        break;
                    case AT_ChangeOwner:
                        /* the new owner must be a superuser */
                        if (!superuser_arg(get_role_oid(cmd->name, false))) {
                            ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                    errmsg("Cannot change owner to others but superuser for log foreign table")));
                        }
                        break;
                    case AT_GenericOptions:
                        if (find_logft_option((List*)cmd->def, OPTION_LOGTYPE)) {
                            ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                    errmsg("Cannot change option \"%s\" of log foreign table", OPTION_LOGTYPE)));
                        }
                        break;
                    default:
                        break;
                }
            }
            break;
        }
        case T_CreateForeignTableStmt: {
            if (!isRelSuperuser()) {
                ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("Must be superuser to create log foreign table")));
            }
            CreateForeignTableStmt* log_ft = (CreateForeignTableStmt*)obj;
            if (log_ft->write_only) {
                ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Unsupport write only for log foreign table.")));
            }
            if (log_ft->options == NIL || !find_logft_option(log_ft->options, OPTION_LOGTYPE)) {
                ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("Option \"%s\" must be specified for log foreign table.", OPTION_LOGTYPE)));
            }
            break;
        }
        default: {
            ereport(ERROR,
                (errcode(ERRCODE_SYSTEM_ERROR),
                    errmsg("unrecognized node type(%d) during validating log foreign table definition",
                        (int)nodeTag(obj))));
            break;
        }
    }
}
/*
 * log_fdw_validator
 *     Validator function of log_fdw. Argument 0 holds the options array and
 *     argument 1 the catalog OID they belong to. Only options attached to a
 *     foreign table are checked (by valid_ft_options()).
 */
Datum log_fdw_validator(PG_FUNCTION_ARGS)
{
    Datum options = PG_GETARG_DATUM(0);
    Oid catalog = PG_GETARG_OID(1);
    List* option_list = untransformRelOptions(options);
    if (catalog == ForeignTableRelationId) {
        valid_ft_options(option_list);
    }
    PG_RETURN_VOID();
}
/*
 * log_fdw_handler
 *     Handler function of log_fdw: return an FdwRoutine filled with the
 *     planner, executor and DDL callbacks of this wrapper.
 */
Datum log_fdw_handler(PG_FUNCTION_ARGS)
{
    /* makeNode() zero-fills the node, so unset callbacks stay NULL */
    FdwRoutine* routine = makeNode(FdwRoutine);

    /* planner */
    routine->GetForeignRelSize = log_get_foreign_relsize;
    routine->GetForeignPaths = log_get_foreign_paths;
    routine->GetForeignPlan = log_get_foreign_plan;

    /* executor */
    routine->BeginForeignScan = log_begin_foreign_scan;
    routine->IterateForeignScan = log_iterate_foreign_scan;
    routine->ReScanForeignScan = log_rescan_foreign_scan;
    routine->EndForeignScan = log_end_foreign_scan;
    routine->ExplainForeignScan = log_explain_foreign_scan;

    /* others */
    routine->AnalyzeForeignTable = log_unsupport_analyze;
    routine->ValidateTableDef = log_validate_table_def;

    PG_RETURN_POINTER(routine);
}
/*
 * Expected data type of every pg_log foreign table column, in column order;
 * check_pglog_ft_definition() compares CREATE FOREIGN TABLE against it.
 */
static const Oid pglog_column_type[PGLOG_ATTR_MAX] = {
    /*  0 -  3 */ TEXTOID, TEXTOID, TEXTOID, BOOLOID,
    /*  4 -  7 */ TIMESTAMPTZOID, TEXTOID, TEXTOID, TIMESTAMPTZOID,
    /*  8 - 11 */ TEXTOID, TEXTOID, TEXTOID, TEXTOID,
    /* 12 - 15 */ TEXTOID, TEXTOID, INT8OID, INT8OID,
    /* 16 - 19 */ INT8OID, INT8OID, TEXTOID, TEXTOID,
    /* 20 - 21 */ TEXTOID, TEXTOID};
/*
 * Expected data type of every gs_profile foreign table column, in column
 * order; check_prflog_definition() compares CREATE FOREIGN TABLE against it.
 */
static const Oid prflog_column_type[PROFILELOG_ATTR_MAX] = {
    /*  0 -  3 */ TEXTOID, TEXTOID, TEXTOID, TIMESTAMPTZOID,
    /*  4 -  7 */ TEXTOID, INT8OID, INT8OID, INT8OID,
    /*  8 - 11 */ TEXTOID, TEXTOID, INT4OID, INT8OID,
    /* 12 - 13 */ INT8OID, INT8OID};
static inline bool the_same_type_with(const TypeName* coltype, const Oid expected_type)
{
Oid atttypid = 0;
int32 atttypmod = 0;
typenameTypeIdAndMod(NULL, coltype, &atttypid, &atttypmod);
return (expected_type == atttypid);
}
/*
 * check_logft_schema
 *     Return true when schema (a list of ColumnDef) has exactly attrs_num
 *     columns and the i-th column's type is columns_type[i].
 */
static bool check_logft_schema(List* schema, const Oid* columns_type, const int attrs_num)
{
    if (list_length(schema) != attrs_num) {
        return false; /* wrong columns' number */
    }
    int col_idx = 0;
    ListCell* cell = NULL;
    foreach (cell, schema) {
        const ColumnDef* coldef = (const ColumnDef*)lfirst(cell);
        if (!the_same_type_with(coldef->typname, columns_type[col_idx++])) {
            return false; /* wrong columns' data type */
        }
    }
    return true;
}
/* check_pglog_ft_definition: columns must match pglog_column_type exactly */
static bool check_pglog_ft_definition(CreateForeignTableStmt* stmt)
{
    List* columns = stmt->base.tableElts;
    return check_logft_schema(columns, pglog_column_type, PGLOG_ATTR_MAX);
}
/* check_prflog_definition: columns must match prflog_column_type exactly */
static bool check_prflog_definition(CreateForeignTableStmt* stmt)
{
    List* columns = stmt->base.tableElts;
    return check_logft_schema(columns, prflog_column_type, PROFILELOG_ATTR_MAX);
}
/*
 * check_log_ft_definition
 *     For a foreign table created on server LOG_SRV, verify that its columns
 *     match the layout required by its log type. Raises an ERROR on mismatch
 *     or when the log type is neither pg_log nor gs_profile. Tables on other
 *     servers are ignored.
 */
void check_log_ft_definition(CreateForeignTableStmt* stmt)
{
    if (strcmp(stmt->servername, LOG_SRV) != 0) {
        return; /* not a log foreign table */
    }
    bool check_ok = false;
    switch (get_logtype_from_options(stmt->options)) {
        case FLT_PG_LOG:
            check_ok = check_pglog_ft_definition(stmt);
            break;
        case FLT_GS_PROFILE:
            check_ok = check_prflog_definition(stmt);
            break;
        default:
            break;
    }
    if (!check_ok) {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("incorrect definition of log foreign table.")));
    }
}