/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * ---------------------------------------------------------------------------------------
 *
 * prflog_dump.cpp
 *    Core interface and implementation of log dumping.
 *    Splits control flow, data processing and the output view.
 *
 * IDENTIFICATION
 *    contrib/log_fdw/prflog_dump.cpp
 *
 * ---------------------------------------------------------------------------------------
 */
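
/*
 * Usage sketch (illustration only, not compiled): how a caller is expected to
 * drive the parser defined below.  The code that fills the logdata_buf from the
 * profile log file, the output arrays (isnull/values/nvalues) and the concrete
 * profile log type are caller-side and assumed here.
 *
 *     prflog_parse parser;
 *     parser.init();
 *     parser.set(logtype);                 // any profile log type; LOG_TYPE_ELOG is rejected
 *     if (parser.iter_begin(&datbuf) == 0) {
 *         while (parser.iter_next(&datbuf, isnull, values, nvalues)) {
 *             // one (isnull, values) row per profile item; hand it to the output view
 *         }
 *         // a false return means: refill datbuf from the log file and try again
 *     }
 *     parser.iter_end();
 */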
#include "prflog_dump.h"
#include "prflog_private.h"
#include "utils/builtins.h"
#include "utils/plog.h"
#include "utils/elog.h"
#include "utils/timestamp.h"
#include "pgtz.h"

static int deform_line_meta(PLogEntryMeta* meta, bool* isnull, Datum* values, int nvalues)
{
    int idx = 0;
    char time_txt[FORMATTED_TS_LEN] = {0};

    /* host name */
    isnull[idx] = false;
    values[idx++] = t_thrd.contrib_cxt.g_log_hostname;

    /* time info without year and time zone */
    if (format_log_reqtime(time_txt, meta->req_time)) {
        isnull[idx] = false;
        values[idx++] = DirectFunctionCall3(timestamptz_in, CStringGetDatum(time_txt), (Datum)0, (Datum)-1);
    } else {
        isnull[idx] = true;
        values[idx++] = (Datum)0;
    }

    /* node name */
    isnull[idx] = false;
    values[idx++] = t_thrd.contrib_cxt.g_log_nodename;

    /* thread info */
    isnull[idx] = false;
    values[idx++] = Int64GetDatum(meta->tid);

    /* transaction id */
    isnull[idx] = false;
    values[idx++] = UInt32GetDatum(meta->gxid);

    /* query id */
    isnull[idx] = false;
    values[idx++] = Int64GetDatum(meta->gqid);

    Assert(nvalues >= idx);
    return idx;
}

static inline int deform_line_head(PLogEntryHead* head, bool* isnull, Datum* values, int nvalues)
{
    int idx = 0;

    /* profile head data */
    isnull[idx] = false;
    values[idx++] = PointerGetDatum(cstring_to_text(datasource_type[(int)head->data_src]));

    isnull[idx] = false;
    values[idx++] = PointerGetDatum(cstring_to_text(ds_require_type[(int)head->req_type]));

    isnull[idx] = false;
    values[idx++] = Int32GetDatum(head->ret_type);

    /* ignore the total number of items within this line */
    Assert(nvalues >= idx);
    return idx;
}

static inline int deform_line_item(PLogEntryItem* item, bool* isnull, Datum* values, int nvalues)
{
    int idx = 0;

    isnull[idx] = false;
    values[idx++] = UInt32GetDatum(item->sum_count);

    isnull[idx] = false;
    values[idx++] = UInt32GetDatum(item->sum_size);

    isnull[idx] = false;
    values[idx++] = UInt32GetDatum(item->sum_usec);

    Assert(nvalues >= idx);
    return idx;
}

static void deform_record_to_values_md(bool* isnull, Datum* values, int nvalues, int items_cur, void* entry)
{
    PLogBasicEntry* md = (PLogBasicEntry*)entry;

    /*
     * Multiple items may be written within a single entry.
     * Dump them into separate lines so that they can be
     * batch-inserted into the database.
     */
    int idx = deform_line_meta(&md->basic.meta, isnull, values, nvalues);
    idx += deform_line_head(&md->basic.head, isnull + idx, values + idx, (nvalues - idx));
    idx += deform_line_item(&md->basic.item[items_cur], isnull + idx, values + idx, (nvalues - idx));
    Assert(nvalues == idx);
}
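
/*
 * Column layout note (derived from the three deform_line_* helpers above; the
 * actual column names of the log_fdw foreign table are defined elsewhere and
 * only assumed here): each emitted row is
 *     meta -> host name, request time, node name, thread id, xid, query id   (6 values)
 *     head -> data source, request type, return type                         (3 values)
 *     item -> sum_count, sum_size, sum_usec                                  (3 values)
 * i.e. 12 values in total, which is what the Assert(nvalues == idx) above
 * expects of nvalues.
 */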

void prflog_parse::init(void)
{
    m_host_name = (Datum)0;
    m_node_name = (Datum)0;
    reset();
}

void prflog_parse::reset(void)
{
    m_log_version = 0;
    m_logtype = 0;
    m_error_no = 0;
    m_error_ds = -1;
    m_items_num = 0;
    m_items_cur = 0;
    m_file_offset = 0;
    m_discard_data = false;
}

void prflog_parse::set(const int logtype)
{
    /* set log type */
    Assert(LOG_TYPE_ELOG != logtype);
    m_logtype = logtype;
}

int prflog_parse::iter_begin(logdata_buf* datbuf)
{
    Assert(0 == datbuf->m_buf_cur);
    if ((0 == datbuf->m_buf_len) || (datbuf->m_buf_len < (int)sizeof(LogFileHeader))) {
        m_error_no = GSLOG_ERRNO(FILESIZE_TOO_SMALL);
        return m_error_no;
    }

    int off = 0;

    /* the first magic data */
    Assert(datbuf->m_buf_len >= (int)sizeof(LogFileHeader));
    unsigned long fst_magic = *(unsigned long*)(datbuf->m_buf + off);
    if (LOG_MAGICNUM != fst_magic) {
        m_error_no = GSLOG_ERRNO(FILEDATA_BAD_MAGIC1);
        return m_error_no;
    }
    off += sizeof(fst_magic);

    /* version info */
    uint16 version = *(uint16*)(datbuf->m_buf + off);
    if (version > 0 && version <= PROFILE_LOG_VERSION) {
        m_log_version = version - 1;
    } else {
        /* remember wrong version for error report */
        m_log_version = version;
        m_error_no = GSLOG_ERRNO(FILEDATA_INVALID_VER);
        return m_error_no;
    }
    off += sizeof(version);

    /* host name length */
    uint8 hostname_len = *(uint8*)(datbuf->m_buf + off);
    off += sizeof(hostname_len);

    /* node name length */
    uint8 nodename_len = *(uint8*)(datbuf->m_buf + off);
    off += sizeof(nodename_len);

    /* time zone length */
    uint16 timezone_len = *(uint16*)(datbuf->m_buf + off);
    off += sizeof(timezone_len);

    /* compute the total length of the file header */
    size_t hd_total_len = sizeof(LogFileHeader) + hostname_len + nodename_len + timezone_len;
    hd_total_len = MAXALIGN(hd_total_len);
    if (datbuf->m_buf_len < (int)hd_total_len) {
        m_error_no = GSLOG_ERRNO(FILEHEAD_TOO_BIG);
        return m_error_no;
    }

    /* host name */
    Assert('\0' == datbuf->m_buf[off + hostname_len - 1]);
    m_host_name = PointerGetDatum(cstring_to_text(datbuf->m_buf + off));
    off += hostname_len;
    /* set global host name */
    t_thrd.contrib_cxt.g_log_hostname = m_host_name;

    /* node name */
    Assert('\0' == datbuf->m_buf[off + nodename_len - 1]);
    m_node_name = PointerGetDatum(cstring_to_text(datbuf->m_buf + off));
    off += nodename_len;
    /* set global node name */
    t_thrd.contrib_cxt.g_log_nodename = m_node_name;

    /*
     * The time zone, which is a C string.
     * We don't copy or keep this timezone info, because the timezone of
     * this server backend will be used instead; this differs from the
     * gs_log tool.
     */
    off += timezone_len;
    Assert('\0' == *(datbuf->m_buf + off - 1));

    /* last magic data */
    Assert(hd_total_len - off >= (int)sizeof(fst_magic));
    unsigned long lst_magic = *(unsigned long*)(datbuf->m_buf + hd_total_len - sizeof(fst_magic));
    if (LOG_MAGICNUM != lst_magic) {
        /* remember the offset where the error happened */
        m_file_offset = hd_total_len - sizeof(unsigned long);
        m_error_no = GSLOG_ERRNO(FILEDATA_BAD_MAGIC2);
        return m_error_no;
    }

    /* update current file offset */
    m_file_offset = hd_total_len;

    /* update logdata_buf info */
    datbuf->m_buf_cur = hd_total_len;
    return 0;
}
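
/*
 * On-disk layout consumed by iter_begin() above, reconstructed from the parsing
 * code (field widths are those of the variables read there; the exact
 * definition of LogFileHeader lives in the shared headers and is assumed):
 *
 *     magic | version(2) | hostname_len(1) | nodename_len(1) | timezone_len(2)
 *     | host name (NUL-terminated) | node name (NUL-terminated)
 *     | time zone (NUL-terminated) | padding up to MAXALIGN | magic
 *
 * The trailing magic sits at MAXALIGN(total) - sizeof(magic), which is where
 * the second magic check above reads it.
 */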

bool prflog_parse::iter_next(logdata_buf* datbuf, bool* isnull, Datum* values, int nvalues)
{
    PLogBasicEntry* md = (PLogBasicEntry*)(datbuf->m_buf + datbuf->m_buf_cur);

    /*
     * Deal with the 64-bit debug query id in the log files.
     * If we encounter a 32-bit debug query id in an old-version log file, just skip it.
     */
    if (md->basic.meta.plog_magic != PLOG_ENTRY_MAGICNUM) {
        return false;
    }

    if (have_unhandle_items()) {
        /* ok, return the next item within this record */
        deform_record_to_values_md(isnull, values, nvalues, m_items_cur, (void*)md);
        advance_items_cursor(datbuf);
        return true;
    }
    Assert(0 == m_items_num && 0 == m_items_cur);

    if (datbuf->unhandled_data_size() >= (int)sizeof(PLogEntry)) {
        const int msglen = get_msg_len_with_check(datbuf);
        if (datbuf->unhandled_data_size() >= msglen) {
            /* there is a complete record */
            m_items_num = md->basic.head.item_num;
            Assert(m_items_num > 0);
            m_items_cur = 0;

            /* ok, return the first item of this complete record */
            deform_record_to_values_md(isnull, values, nvalues, m_items_cur, (void*)md);
            advance_items_cursor(datbuf);
            return true;
        }
    }
    return false; /* continue to read data from the log file */
}
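
/*
 * Record framing note (a summary of the logic above, not new behaviour):
 * each on-disk record is one PLogEntry holding a meta block, a head block with
 * head.item_num, and item_num PLogEntryItem entries.  iter_next() emits one row
 * per item; datbuf->m_buf_cur only advances once advance_items_cursor() has
 * consumed the last item of the record, using the per-data-source length
 * returned by get_msg_len_with_check().
 */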

void prflog_parse::iter_end(void)
{
    char* ptr = DatumGetPointer(m_host_name);
    if (PointerIsValid(ptr)) {
        pfree(ptr);
        m_host_name = (Datum)0;
    }

    ptr = DatumGetPointer(m_node_name);
    if (PointerIsValid(ptr)) {
        pfree(ptr);
        m_node_name = (Datum)0;
    }
    reset();
}

inline int prflog_parse::get_msg_len_with_check(logdata_buf* datbuf)
{
    char* start = datbuf->m_buf + datbuf->m_buf_cur;
    PLogEntry* entry = (PLogEntry*)start;
    char ds = entry->head.data_src;
    if (ds >= 0 && ds < DS_VALID_NUM) {
        return (*plog_msglen_tbl[(int)ds])(entry);
    }

    /* unsupported data source */
    m_file_offset += datbuf->m_buf_cur;
    elog(ERROR, "unexpected data source (%d) in log, offset %ld", ds, m_file_offset);
    return 0;
}

inline void prflog_parse::advance_items_cursor(logdata_buf* datbuf)
{
    Assert(m_items_cur < m_items_num);
    if ((++m_items_cur) == m_items_num) {
        /* skip this handled record */
        const int msglen = get_msg_len_with_check(datbuf);
        datbuf->m_buf_cur += msglen;

        /* reset items info */
        m_items_num = 0;
        m_items_cur = 0;
    }
}