349 lines
11 KiB
C
349 lines
11 KiB
C
/*
|
|
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
|
|
*
|
|
* openGauss is licensed under Mulan PSL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PSL v2.
|
|
* You may obtain a copy of Mulan PSL v2 at:
|
|
*
|
|
* http://license.coscl.org.cn/MulanPSL2
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PSL v2 for more details.
|
|
* ---------------------------------------------------------------------------------------
|
|
*
|
|
* log_fdw_private.h
|
|
*
|
|
*
|
|
*
|
|
* IDENTIFICATION
|
|
* contrib/log_fdw/log_fdw_private.h
|
|
*
|
|
* ---------------------------------------------------------------------------------------
|
|
*/
|
|
#ifndef _LOG_FDW_PRIVATE_H_
|
|
#define _LOG_FDW_PRIVATE_H_
|
|
|
|
#include "postgres.h"
|
|
#include "knl/knl_variable.h"
|
|
|
|
#include <string.h>
|
|
#include <time.h>
|
|
|
|
#include "log_fdw.h"
|
|
#include "prflog_dump.h"
|
|
#include "funcapi.h"
|
|
#include "access/reloptions.h"
|
|
#include "catalog/pg_foreign_server.h"
|
|
#include "catalog/pg_foreign_table.h"
|
|
#include "catalog/pg_user_mapping.h"
|
|
#include "catalog/pg_type.h"
|
|
#include "commands/defrem.h"
|
|
#include "commands/explain.h"
|
|
#include "foreign/fdwapi.h"
|
|
#include "foreign/foreign.h"
|
|
#include "optimizer/cost.h"
|
|
#include "optimizer/pathnode.h"
|
|
#include "optimizer/planmain.h"
|
|
#include "optimizer/restrictinfo.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/palloc.h"
|
|
#include "utils/builtins.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/relcache.h"
|
|
#include "utils/syscache.h"
|
|
#include "storage/lock/lock.h"
|
|
#include "miscadmin.h"
|
|
#include "zlib.h"
|
|
#include "unzip.h"
|
|
#include "ioapi.h"
|
|
#include "pgtime.h"
|
|
|
|
/* attr position for pg_log attribute, starting from 0 */
|
|
#define PGLOG_ATTR_DIRNAME 0
|
|
#define PGLOG_ATTR_FILENAME 1
|
|
#define PGLOG_ATTR_HOSTNAME 2
|
|
#define PGLOG_ATTR_MATCHED 3
|
|
#define PGLOG_ATTR_LOGTIME 4
|
|
#define PGLOG_ATTR_NODENAME 5
|
|
#define PGLOG_ATTR_APPNAME 6
|
|
#define PGLOG_ATTR_SESSION_START 7
|
|
#define PGLOG_ATTR_SESSION_ID 8
|
|
#define PGLOG_ATTR_DBNAME 9
|
|
#define PGLOG_ATTR_REMOTE 10
|
|
#define PGLOG_ATTR_CMDTAG 11
|
|
#define PGLOG_ATTR_USERNAME 12
|
|
#define PGLOG_ATTR_VIRTUAL_XID 13
|
|
#define PGLOG_ATTR_PID 14
|
|
#define PGLOG_ATTR_LINENO 15
|
|
#define PGLOG_ATTR_XID 16
|
|
#define PGLOG_ATTR_QID 17
|
|
#define PGLOG_ATTR_ECODE 18
|
|
#define PGLOG_ATTR_MODNAME 19
|
|
#define PGLOG_ATTR_LEVEL 20
|
|
#define PGLOG_ATTR_MESSAGE 21
|
|
#define PGLOG_ATTR_MAX 22
|
|
|
|
#define PGLOG_ATTRNAME_DIRNAME "dirname"
|
|
#define PGLOG_ATTRNAME_FILENAME "filename"
|
|
#define PGLOG_ATTRNAME_HOSTNAME "hostname"
|
|
#define PGLOG_ATTRNAME_MATCHED "match"
|
|
#define PGLOG_ATTRNAME_LOGTIME "logtime"
|
|
#define PGLOG_ATTRNAME_NODENAME "nodename"
|
|
#define PGLOG_ATTRNAME_APPNAME "app"
|
|
#define PGLOG_ATTRNAME_SESSION_START "session_start"
|
|
#define PGLOG_ATTRNAME_SESSION_ID "session_id"
|
|
#define PGLOG_ATTRNAME_DBNAME "db"
|
|
#define PGLOG_ATTRNAME_REMOTE "remote"
|
|
#define PGLOG_ATTRNAME_CMDTAG "cmdtag"
|
|
#define PGLOG_ATTRNAME_USERNAME "username"
|
|
#define PGLOG_ATTRNAME_VIRTUAL_XID "vxid"
|
|
#define PGLOG_ATTRNAME_PID "pid"
|
|
#define PGLOG_ATTRNAME_LINENO "lineno"
|
|
#define PGLOG_ATTRNAME_XID "xid"
|
|
#define PGLOG_ATTRNAME_QID "qid"
|
|
#define PGLOG_ATTRNAME_ECODE "ecode"
|
|
#define PGLOG_ATTRNAME_MODNAME "mod"
|
|
#define PGLOG_ATTRNAME_LEVEL "level"
|
|
#define PGLOG_ATTRNAME_MESSAGE "msg"
|
|
|
|
#define PROFILELOG_ATTR_DIRNAME PGLOG_ATTR_DIRNAME
|
|
#define PROFILELOG_ATTR_FILENAME PGLOG_ATTR_FILENAME
|
|
#define PROFILELOG_ATTR_HOSTNAME PGLOG_ATTR_HOSTNAME
|
|
#define PROFILELOG_ATTR_LOGTIME 3
|
|
#define PROFILELOG_ATTR_NODENAME 4
|
|
#define PROFILELOG_ATTR_THREAD 5
|
|
#define PROFILELOG_ATTR_XID 6
|
|
#define PROFILELOG_ATTR_QID 7
|
|
#define PROFILELOG_ATTR_REQSRC 8
|
|
#define PROFILELOG_ATTR_REQTYPE 9
|
|
#define PROFILELOG_ATTR_REQOK 10
|
|
#define PROFILELOG_ATTR_REQCOUNT 11
|
|
#define PROFILELOG_ATTR_REQSIZE 12
|
|
#define PROFILELOG_ATTR_REQUSEC 13
|
|
#define PROFILELOG_ATTR_MAX 14
|
|
|
|
#define LOGFDW_MAX_INT32 (2147483600)
|
|
#define LOGFDW_LOCAL_IP_LEN 20
|
|
#define LOGFDW_HOSTNAME_MAXLEN 256
|
|
|
|
#define LOGFDW_TUPLE_CACHE_SIZE (int)(128)
|
|
|
|
/* dirname and filename before */
|
|
#define PRFLOG_SYSATTR_NUM 2
|
|
|
|
/* max size of one log line */
|
|
#define MAX_LINE_LEN 8192
|
|
|
|
enum FdwLogType { FLT_PG_LOG = 0, FLT_GS_PROFILE, FLT_UNKNOWN };
|
|
|
|
/* reading buffer size for log data */
|
|
#define LOGFDW_BUFFER_SZ (1024 * 1024)
|
|
|
|
/* info tag transformed from CN to datanodes */
|
|
#define logft_opt_masters_info "masters_info"
|
|
|
|
/* options for log foreign tables */
|
|
#define OPTION_LOGTYPE "logtype"
|
|
#define OPTION_MASTER_ONLY "master_only"
|
|
#define OPTION_LATEST_FILES "latest_files"
|
|
|
|
typedef struct log_file_info {
|
|
char* name; /* log file name */
|
|
time_t tm_create; /* create time */
|
|
time_t tm_last_modify; /* last modify time */
|
|
} log_file_info;
|
|
|
|
typedef struct {
|
|
NameData ip; /* ip adress */
|
|
} hkey_ip;
|
|
|
|
typedef enum unreachable_host_type {
|
|
UHT_OK, /* reachable */
|
|
UHT_DEPLOY_LIMIT, /* because deploy limitation */
|
|
UHT_NO_MASTER, /* because master switch-over */
|
|
UHT_UNKNOWN
|
|
} unr_host_type;
|
|
|
|
typedef struct {
|
|
hkey_ip host_ip;
|
|
List* nodenames;
|
|
unr_host_type reachable; /* whether this host is accessed by master datanode ? */
|
|
bool has_datanode; /* is there at least one datanode on this host ? */
|
|
} hentry_reachable_host;
|
|
|
|
typedef bool (*dirname_match)(const char*);
|
|
typedef dirname_match fname_match;
|
|
typedef int (*cmp_func)(const void*, const void*);
|
|
|
|
struct ld_scanner {
|
|
MemoryContext m_memcnxt;
|
|
char* top_dir; /* len < 1024, top directory */
|
|
List* dir_list; /* total directories to scan */
|
|
List* dir_list_copy; /* for rescan support */
|
|
char* cur_dir; /* len < 1024, sub-directory under top_dir */
|
|
log_file_info* log_file; /* len < 1024, log file under cur_dir */
|
|
List* log_file_list; /* list of log files */
|
|
dirname_match dir_cb; /* callback for dir name */
|
|
fname_match fname_cb; /* callback for file name */
|
|
int m_latest_files_num; /* apply for each directory */
|
|
};
|
|
typedef struct ld_scanner* logdir_scanner;
|
|
|
|
typedef struct {
|
|
hkey_ip host_ip;
|
|
List* nodenames;
|
|
} hvalue_nodes;
|
|
|
|
typedef void* (*logfile_open)(const char* logfile);
|
|
typedef int (*logfile_read)(void* reader, char* buf, int bufsize);
|
|
typedef void (*logfile_close)(void* reader);
|
|
|
|
typedef struct pglogPlanState {
|
|
Datum regrex;
|
|
|
|
/*
|
|
* in the order of column definations of log foreign table
|
|
* see file Code/contrib/log_fdw/log_fdw--1.0.sql
|
|
*/
|
|
FmgrInfo allattr_fmgrinfo[PGLOG_ATTR_MAX];
|
|
int32 allattr_typmod[PGLOG_ATTR_MAX];
|
|
Oid allattr_typioparam[PGLOG_ATTR_MAX];
|
|
|
|
/* attribute map from real log data to log relation */
|
|
int attmap[PGLOG_ATTR_MAX];
|
|
|
|
/*
|
|
* the real number of attributes in real log data
|
|
* now it's determinated by GUC Log_line_prefix.
|
|
*/
|
|
int nattrs;
|
|
Datum real_values[PGLOG_ATTR_MAX];
|
|
bool real_isnull[PGLOG_ATTR_MAX];
|
|
} pglogPlanState;
|
|
|
|
typedef struct gsprofilePlanState {
|
|
prflog_parse* parser; /* parse profile log */
|
|
} gsprofilePlanState;
|
|
|
|
typedef struct logFdwPlanState {
|
|
/*
|
|
* private memory manager.
|
|
*/
|
|
|
|
/* all the members should be under this private memory context. */
|
|
MemoryContext m_memcnxt;
|
|
|
|
/* per tuple memory context */
|
|
MemoryContext m_pertup_memcnxt;
|
|
|
|
/* functions for different log type */
|
|
GetForeignRelSize_function get_ftrel_size;
|
|
GetForeignPaths_function get_foreign_paths;
|
|
GetForeignPlan_function get_foreign_plan;
|
|
BeginForeignScan_function begin_foreign_scan;
|
|
IterateForeignScan_function iterate_foreign_scan;
|
|
ReScanForeignScan_function rescan_foreign_scan;
|
|
EndForeignScan_function end_foreign_scan;
|
|
ExplainForeignScan_function explain_foreign_scan;
|
|
|
|
/* common data type */
|
|
|
|
/* datum to form a tuple */
|
|
bool* m_isnull;
|
|
Datum* m_values;
|
|
|
|
union {
|
|
pglogPlanState pg_log;
|
|
gsprofilePlanState profile_log;
|
|
} log_state;
|
|
|
|
/* members for log dir */
|
|
text* dname; /* log directory name */
|
|
logdir_scanner dir_scan;
|
|
|
|
/* members for log file */
|
|
log_file_info* logfile_info; /* file name with absolute path */
|
|
text* filename; /* only file name */
|
|
void* m_vfd;
|
|
logfile_open m_open;
|
|
logfile_read m_read;
|
|
logfile_close m_close;
|
|
|
|
/* file data buffer */
|
|
logdata_buf m_log_buf;
|
|
int64 m_lineno_of_file; /* line no in each file */
|
|
|
|
/* only care N latest files, not the total set */
|
|
int m_latest_files;
|
|
|
|
/* needn't to read data from log file */
|
|
bool m_need_read_logdata;
|
|
|
|
/* just care log data of master DN */
|
|
bool m_master_only;
|
|
|
|
/* whether to check host name/dir name/log name/log time and do filter */
|
|
bool m_need_check_hostname;
|
|
bool m_need_check_dirname;
|
|
bool m_need_check_logname;
|
|
bool m_need_check_logtime;
|
|
|
|
/* EXPLAIN analyze/performance */
|
|
bool m_refuted_by_hostname; /* whether to ignore my host */
|
|
bool m_need_timing; /* need timing */
|
|
uint32 m_total_dirname_num; /* dirname info */
|
|
uint32 m_refuted_dirname_num;
|
|
uint32 m_total_logfile_num; /* log file info */
|
|
uint32 m_refuted_logfile_num_by_name;
|
|
uint32 m_refuted_logfile_num_by_time;
|
|
uint32 m_incompleted_files;
|
|
time_t m_last_modify_tm_maxval; /* always set now */
|
|
int m_plan_node_id; /* plan node id */
|
|
|
|
Var m_tmp_hostname_var;
|
|
Var m_tmp_dirname_var;
|
|
Var m_tmp_logname_var;
|
|
Var m_tmp_logtime_var;
|
|
Datum m_tmp_hostname_text;
|
|
|
|
/* tuples cache */
|
|
HeapTuple m_tuple_buf[LOGFDW_TUPLE_CACHE_SIZE];
|
|
int m_tuple_num;
|
|
int m_tuple_cur;
|
|
|
|
inline char* get_next_line(void)
|
|
{
|
|
Assert(m_log_buf.has_unhandled_data());
|
|
char* newline =
|
|
(char*)memchr((m_log_buf.m_buf + m_log_buf.m_buf_cur), '\n', (m_log_buf.m_buf_len - m_log_buf.m_buf_cur));
|
|
if (NULL != newline) {
|
|
*newline = '\0'; /* c string */
|
|
char* line = (m_log_buf.m_buf + m_log_buf.m_buf_cur); /* this line */
|
|
m_log_buf.m_buf_cur += (newline - line + 1); /* move to next line position */
|
|
return line;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
inline bool continue_to_load_logdata(void)
|
|
{
|
|
Assert(m_log_buf.m_buf_len < m_log_buf.m_buf_maxlen);
|
|
int read_size =
|
|
m_read(m_vfd, (m_log_buf.m_buf + m_log_buf.m_buf_len), m_log_buf.m_buf_maxlen - m_log_buf.m_buf_len);
|
|
if (read_size > 0) {
|
|
m_log_buf.m_buf_len += read_size;
|
|
return true;
|
|
}
|
|
return false; /* end of file */
|
|
}
|
|
} logFdwPlanState;
|
|
|
|
struct logFdwOption {
|
|
const char* optname;
|
|
Oid optcontext; /* Oid of catalog in which option may appear */
|
|
};
|
|
|
|
#endif /* _LOG_FDW_PRIVATE_H_ */
|