openGauss-server/contrib/file_fdw/file_fdw.cpp

917 lines
30 KiB
C++

/*-------------------------------------------------------------------------
*
* file_fdw.c
* foreign-data wrapper for server-side flat files.
*
* Copyright (c) 2010-2012, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/file_fdw/file_fdw.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/reloptions.h"
#include "catalog/pg_foreign_table.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "commands/tablecmds.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodes.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "catalog/pg_user_mapping.h"
/*
 * Describes one option accepted by objects that use this wrapper: the option
 * name together with the catalog (i.e. kind of object) on which it may be
 * specified.
 */
struct FileFdwOption {
const char* optname; /* option name; NULL marks the end of valid_options[] */
Oid optcontext; /* Oid of catalog in which option may appear */
};
/*
 * Valid options for file_fdw.
 *
 * These options are based on the options for the COPY FROM command.
 * Note, however, that force_not_null is handled as a boolean option attached
 * to each column (AttributeRelationId), not as a table option.
 *
 * The table is terminated by an entry whose optname is NULL; the loops in
 * is_valid_option() and file_fdw_validator() rely on that sentinel.
 *
 * Note: if you add a new option for user mappings, you need to modify
 * fileGetOptions(), which currently doesn't bother to look at user mappings,
 * and file_fdw_validator(), which rejects the user-mapping catalog outright.
 */
static const struct FileFdwOption valid_options[] = {
/* File options */
{"filename", ForeignTableRelationId},
/* Format options */
/* oids option is not supported */
{"format", ForeignTableRelationId},
{"header", ForeignTableRelationId},
{"delimiter", ForeignTableRelationId},
{"quote", ForeignTableRelationId},
{"escape", ForeignTableRelationId},
{"null", ForeignTableRelationId},
{"encoding", ForeignTableRelationId},
/* Per-column option */
{"force_not_null", AttributeRelationId},
/*
 * force_quote is not supported by file_fdw because it's for COPY TO.
 */
/* Sentinel */
{NULL, InvalidOid}};
/*
 * FDW-specific information for RelOptInfo.fdw_private.
 *
 * Allocated and given its filename/options in fileGetForeignRelSize(); pages
 * and ntuples are filled in by estimate_size() and read back by
 * estimate_costs().
 */
typedef struct FileFdwPlanState {
char* filename; /* file to read */
List* options; /* merged COPY options, excluding filename */
BlockNumber pages; /* estimate of file's physical size */
double ntuples; /* estimate of number of rows in file */
} FileFdwPlanState;
/*
 * FDW-specific information for ForeignScanState.fdw_state.
 *
 * Built by fileBeginForeignScan().  filename and options are kept alongside
 * the CopyState so that fileReScanForeignScan() can call BeginCopyFrom()
 * again with the same arguments.
 */
typedef struct FileFdwExecutionState {
char* filename; /* file to read */
List* options; /* merged COPY options, excluding filename */
CopyState cstate; /* state of reading file */
} FileFdwExecutionState;
/*
 * SQL functions
 *
 * The handler and validator are declared with C linkage and registered with
 * PG_FUNCTION_INFO_V1.
 */
extern "C" Datum file_fdw_handler(PG_FUNCTION_ARGS);
extern "C" Datum file_fdw_validator(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(file_fdw_handler);
PG_FUNCTION_INFO_V1(file_fdw_validator);
/*
 * FDW callback routines
 *
 * Forward declarations of the callbacks that file_fdw_handler() installs into
 * the FdwRoutine node.  The trailing parameters of fileAnalyzeForeignTable
 * carry default values here, so the definition further down must not repeat
 * them.
 */
static void fileGetForeignRelSize(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid);
static void fileGetForeignPaths(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid);
static ForeignScan* fileGetForeignPlan(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid,
ForeignPath* best_path, List* tlist, List* scan_clauses);
static void fileExplainForeignScan(ForeignScanState* node, ExplainState* es);
static void fileBeginForeignScan(ForeignScanState* node, int eflags);
static TupleTableSlot* fileIterateForeignScan(ForeignScanState* node);
static void fileReScanForeignScan(ForeignScanState* node);
static void fileEndForeignScan(ForeignScanState* node);
static bool fileAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc* func, BlockNumber* totalpages,
void* additionalData = 0, bool estimate_table_rownum = false);
/*
 * Helper functions
 *
 * Forward declarations of file-local helpers: option lookup and merging,
 * planner size/cost estimation, ANALYZE row sampling and DDL validation.
 * file_acquire_sample_rows declares its default arguments here, so the
 * definition further down must not repeat them.
 */
static bool is_valid_option(const char* option, Oid context);
static void fileGetOptions(Oid foreigntableid, char** filename, List** other_options);
static List* get_file_fdw_attribute_options(Oid relid);
static void estimate_size(PlannerInfo* root, RelOptInfo* baserel, FileFdwPlanState* fdw_private);
static void estimate_costs(
PlannerInfo* root, RelOptInfo* baserel, FileFdwPlanState* fdw_private, Cost* startup_cost, Cost* total_cost);
static int file_acquire_sample_rows(Relation onerel, int elevel, HeapTuple* rows, int targrows, double* totalrows,
double* totaldeadrows, void* additionalData = NULL, bool estimate_table_rownum = false);
static void fileValidateTableDef(Node* Obj);
/*
 * file_fdw_handler
 *      Handler entry point of the wrapper: allocate an FdwRoutine node, fill
 *      it with pointers to file_fdw's callbacks and return it.
 *
 * Callbacks that only hdfs_fdw implements are explicitly set to NULL.
 */
Datum file_fdw_handler(PG_FUNCTION_ARGS)
{
    FdwRoutine* routine = makeNode(FdwRoutine);

    /* Planner callbacks */
    routine->GetForeignRelSize = fileGetForeignRelSize;
    routine->GetForeignPaths = fileGetForeignPaths;
    routine->GetForeignPlan = fileGetForeignPlan;

    /* EXPLAIN support */
    routine->ExplainForeignScan = fileExplainForeignScan;

    /* Scan execution callbacks */
    routine->BeginForeignScan = fileBeginForeignScan;
    routine->IterateForeignScan = fileIterateForeignScan;
    routine->ReScanForeignScan = fileReScanForeignScan;
    routine->EndForeignScan = fileEndForeignScan;

    /* ANALYZE support and table-definition validation */
    routine->AnalyzeForeignTable = fileAnalyzeForeignTable;
    routine->ValidateTableDef = fileValidateTableDef;

    /*
     * @hdfs
     * BuildRuntimePredicate and PartitionTblProcess are only used by
     * hdfs_fdw for now, so file_fdw leaves them unset.
     */
    routine->BuildRuntimePredicate = NULL;
    routine->PartitionTblProcess = NULL;

    PG_RETURN_POINTER(routine);
}
/*
 * file_fdw_validator
 *      Check the generic options given to a FOREIGN DATA WRAPPER, SERVER,
 *      USER MAPPING or FOREIGN TABLE that uses file_fdw.
 *
 * Argument 0 is the option array, argument 1 the Oid of the catalog that the
 * options are being attached to.  An ERROR is raised if an option name or
 * value is not acceptable; otherwise the function returns void.
 */
Datum file_fdw_validator(PG_FUNCTION_ARGS)
{
    List* given_options = untransformRelOptions(PG_GETARG_DATUM(0));
    Oid target_catalog = PG_GETARG_OID(1);
    char* file_path = NULL;           /* value of "filename", once seen */
    bool seen_force_not_null = false; /* used to detect a repeated option */
    List* copy_options = NIL;         /* options handed to the COPY code */
    ListCell* lc = NULL;

    /* Options on a USER MAPPING are rejected outright. */
    if (target_catalog == UserMappingRelationId) {
        ereport(
            ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("file_fdw doesn't support in USER MAPPING.")));
    }

    /*
     * Setting options of a file_fdw foreign table is reserved for
     * superusers: one of those options is the filename, and ordinary users
     * must not be able to choose which server-side file gets read.
     *
     * A validator is an odd place for a permissions check, but no other
     * spot can enforce it more cleanly.
     *
     * valid_options[] does not allow filename at any level other than the
     * foreign table; without that restriction this check would leave a
     * security hole.
     */
    if (target_catalog == ForeignTableRelationId && !superuser()) {
        ereport(ERROR,
            (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                errmsg("only superuser can change options of a file_fdw foreign table")));
    }

    /*
     * Walk the options: each one must be supported by file_fdw and allowed
     * for the kind of object at hand.
     */
    foreach (lc, given_options) {
        DefElem* def = (DefElem*)lfirst(lc);
        const char* name = def->defname;

        if (!is_valid_option(name, target_catalog)) {
            /*
             * Unknown option.  Complain, and list the options that would
             * have been acceptable for this object as a hint.
             */
            StringInfoData allowed;

            initStringInfo(&allowed);
            for (const struct FileFdwOption* entry = valid_options; entry->optname; entry++) {
                if (entry->optcontext != target_catalog)
                    continue;
                appendStringInfo(&allowed, "%s%s", (allowed.len > 0) ? ", " : "", entry->optname);
            }
            ereport(ERROR,
                (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
                    errmsg("invalid option \"%s\"", def->defname),
                    allowed.len > 0 ? errhint("Valid options in this context are: %s", allowed.data)
                                    : errhint("There are no valid options in this context.")));
        }

        /*
         * filename and force_not_null are kept away from
         * ProcessCopyOptions, which would not accept them.  (Here
         * force_not_null only exists as a per-column boolean.)
         */
        if (strcmp(name, "filename") == 0) {
            if (file_path != NULL)
                ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
            file_path = defGetString(def);
            continue;
        }

        if (strcmp(name, "force_not_null") == 0) {
            if (seen_force_not_null)
                ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
            seen_force_not_null = true;
            /* The value itself is irrelevant, it just has to be a legal boolean. */
            (void)defGetBoolean(def);
            continue;
        }

        /* The "fixed" format is refused; any other format goes on to COPY. */
        if (strcmp(name, "format") == 0 && strcasecmp(defGetString(def), "fixed") == 0) {
            ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("file_fdw doesn't support fixed option in format")));
        }

        copy_options = lappend(copy_options, def);
    }

    /* Let the core COPY code run its own checks on the remaining options. */
    ProcessCopyOptions(NULL, true, copy_options);

    /* A file_fdw foreign table is useless without a filename. */
    if (target_catalog == ForeignTableRelationId && file_path == NULL) {
        ereport(ERROR,
            (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
                errmsg("filename is required for file_fdw foreign tables")));
    }

    PG_RETURN_VOID();
}
/*
 * is_valid_option
 *      Report whether "option" appears in valid_options[] for the given
 *      context.
 *
 * context is the Oid of the catalog holding the object the option is for.
 * Returns true on an exact (case-sensitive) name match in that context,
 * false otherwise.
 */
static bool is_valid_option(const char* option, Oid context)
{
    /* The table ends at the entry whose optname is NULL. */
    for (int i = 0; valid_options[i].optname != NULL; i++) {
        const struct FileFdwOption* entry = &valid_options[i];

        if (entry->optcontext != context)
            continue;
        if (strcmp(entry->optname, option) == 0)
            return true;
    }

    return false;
}
/*
 * fileGetOptions
 *      Collect the options that apply to a file_fdw foreign table.
 *
 * foreigntableid: Oid of the foreign table.
 * filename:       out; receives the value of the "filename" option.
 * other_options:  out; receives all remaining options.
 *
 * "filename" is returned separately because it must not be part of the
 * option list handed to the core COPY code.  Raises an ERROR if no filename
 * option is found.
 */
static void fileGetOptions(Oid foreigntableid, char** filename, List** other_options)
{
    /*
     * Look up the FDW objects.  User mappings are skipped because file_fdw
     * has no options that can be specified there.
     *
     * (XXX Given the current contents of valid_options[], only the foreign
     * table's own options can actually contain anything.  Simplify?)
     */
    ForeignTable* ftable = GetForeignTable(foreigntableid);
    Assert(NULL != ftable);
    ForeignServer* fserver = GetForeignServer(ftable->serverid);
    Assert(NULL != fserver);
    ForeignDataWrapper* fdw = GetForeignDataWrapper(fserver->fdwid);
    Assert(NULL != fdw);

    /* Merge in order: wrapper, server, table, then per-column options. */
    List* merged = NIL;
    merged = list_concat(merged, fdw->options);
    merged = list_concat(merged, fserver->options);
    merged = list_concat(merged, ftable->options);
    merged = list_concat(merged, get_file_fdw_attribute_options(foreigntableid));

    /*
     * Pull the filename out of the merged list.  Only the first occurrence
     * is looked at; the scan stops as soon as it has been removed.
     */
    ListCell* cur = NULL;
    ListCell* before = NULL; /* cell preceding "cur", needed for deletion */

    *filename = NULL;
    foreach (cur, merged) {
        DefElem* def = (DefElem*)lfirst(cur);

        if (strcmp(def->defname, "filename") != 0) {
            before = cur;
            continue;
        }

        *filename = defGetString(def);
        merged = list_delete_cell(merged, cur, before);
        break;
    }

    /*
     * The validator should already have insisted on a filename, but check
     * again, just in case.
     */
    if (*filename == NULL)
        elog(ERROR, "filename is required for file_fdw foreign tables");

    *other_options = merged;
}
/*
 * get_file_fdw_attribute_options
 *      Read the per-column generic options of a foreign table and turn them
 *      into a list of DefElems.
 *
 * The only option handled at the moment is "force_not_null".  All columns
 * for which it is true are gathered into one DefElem that lists their names,
 * since that is the form COPY expects.  Returns NIL when no column has the
 * option switched on.
 */
static List* get_file_fdw_attribute_options(Oid relid)
{
    List* fnncolumns = NIL;
    Relation rel = heap_open(relid, AccessShareLock);
    TupleDesc tupleDesc = RelationGetDescr(rel);
    AttrNumber natts = tupleDesc->natts;

    /* Look at the FDW options of every user-defined attribute. */
    for (AttrNumber attnum = 1; attnum <= natts; attnum++) {
        Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
        ListCell* lc = NULL;

        /* Dropped attributes are of no interest. */
        if (attr->attisdropped)
            continue;

        List* column_options = GetForeignColumnOptions(relid, attnum);
        foreach (lc, column_options) {
            DefElem* def = (DefElem*)lfirst(lc);

            /* Other options might be handled here in the future. */
            if (strcmp(def->defname, "force_not_null") != 0)
                continue;
            if (!defGetBoolean(def))
                continue;

            fnncolumns = lappend(fnncolumns, makeString(pstrdup(NameStr(attr->attname))));
        }
    }

    heap_close(rel, AccessShareLock);

    /* A DefElem is only produced when at least one column asked for it. */
    if (fnncolumns == NIL)
        return NIL;

    return list_make1(makeDefElem("force_not_null", (Node*)fnncolumns));
}
/*
 * fileGetForeignRelSize
 *      Obtain relation size estimates for a foreign table.
 *
 * Allocates the planner state, stores it in baserel->fdw_private and lets
 * estimate_size() fill in the size figures.
 */
static void fileGetForeignRelSize(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid)
{
    FileFdwPlanState* plan_state = (FileFdwPlanState*)palloc(sizeof(FileFdwPlanState));

    /*
     * Only the filename is needed right now, but fetching all options here
     * saves looking them up again later during planning.
     */
    fileGetOptions(foreigntableid, &plan_state->filename, &plan_state->options);
    baserel->fdw_private = (void*)plan_state;

    /* Work out the relation size estimates. */
    estimate_size(root, baserel, plan_state);
}
/*
 * fileGetForeignPaths
 *      Create the possible access paths for a scan on the foreign table.
 *
 * No push-down of any kind is supported, so there is exactly one path: read
 * every record, in the order in which it appears in the data file.
 */
static void fileGetForeignPaths(PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid)
{
    FileFdwPlanState* plan_state = (FileFdwPlanState*)baserel->fdw_private;
    Cost startup;
    Cost total;

    /* Cost the scan. */
    estimate_costs(root, baserel, plan_state, &startup, &total);

    /*
     * Build the single ForeignPath.  If the data file were sorted, and we
     * had a way of knowing that, suitable pathkeys could be supplied here to
     * tell the planner.
     */
    Path* scan_path = (Path*)create_foreignscan_path(root,
        baserel,
        startup,
        total,
        NIL,  /* no pathkeys */
        NULL, /* no outer rel either */
        NIL); /* no fdw_private data */

    add_path(root, baserel, scan_path);
}
/*
 * fileGetForeignPlan
 *      Create a ForeignScan plan node for scanning the foreign table.
 */
static ForeignScan* fileGetForeignPlan(
    PlannerInfo* root, RelOptInfo* baserel, Oid foreigntableid, ForeignPath* best_path, List* tlist, List* scan_clauses)
{
    /*
     * file_fdw cannot evaluate restriction clauses itself, so every scan
     * clause ends up in the plan node's qual list for the executor to check.
     * All that is needed here is to strip the RestrictInfo wrappers and
     * leave out pseudoconstants, which are dealt with elsewhere.
     */
    List* quals = extract_actual_clauses(scan_clauses, false);

    /* Build the ForeignScan node. */
    return make_foreignscan(tlist,
        quals,
        baserel->relid,
        NIL, /* no expressions to evaluate */
        NIL, /* no private state either */
        EXEC_ON_DATANODES);
}
/*
 * fileExplainForeignScan
 *      Produce extra output for EXPLAIN: the name of the foreign file and,
 *      when cost details are shown, its size.
 */
static void fileExplainForeignScan(ForeignScanState* node, ExplainState* es)
{
    char* path = NULL;
    List* unused_options = NIL;
    struct stat file_stat;

    /* Only the filename matters here; the other options are ignored. */
    fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), &path, &unused_options);
    ExplainPropertyText("Foreign File", path, es);

    /* The file size is left out when cost details are not being shown. */
    if (!es->costs)
        return;

    /* A file that cannot be stat'ed simply gets no size line. */
    if (stat(path, &file_stat) == 0)
        ExplainPropertyLong("Foreign File Size", (long)file_stat.st_size, es);
}
/*
 * fileBeginForeignScan
 *      Initiate access to the file by creating a CopyState.
 *
 * In the EXPLAIN (no ANALYZE) case nothing is done and node->fdw_state
 * stays NULL.
 */
static void fileBeginForeignScan(ForeignScanState* node, int eflags)
{
    if ((eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0)
        return;

    /* Look up the options of the foreign table. */
    char* path = NULL;
    List* copy_opts = NIL;
    fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), &path, &copy_opts);

    /*
     * Turn the FDW options into a CopyState.  All columns are always read,
     * so that the result matches the expected ScanTupleSlot signature.
     */
    CopyState reader = BeginCopyFrom(node->ss.ss_currentRelation, path, NIL, copy_opts, NULL, NULL);

    /*
     * Remember everything in node->fdw_state; this has to be enough to call
     * BeginCopyFrom() a second time.
     */
    FileFdwExecutionState* exec_state = (FileFdwExecutionState*)palloc(sizeof(FileFdwExecutionState));
    exec_state->filename = path;
    exec_state->options = copy_opts;
    exec_state->cstate = reader;
    node->fdw_state = (void*)exec_state;
}
/*
 * fileIterateForeignScan
 *      Read the next record from the data file and store it into the
 *      ScanTupleSlot as a virtual tuple.
 *
 * Returns the scan slot; it is left empty once the file has no more rows.
 */
static TupleTableSlot* fileIterateForeignScan(ForeignScanState* node)
{
    FileFdwExecutionState* exec_state = (FileFdwExecutionState*)node->fdw_state;
    TupleTableSlot* slot = node->ss.ss_ScanTupleSlot;
    ErrorContextCallback errcallback;

    /* Push a callback that identifies the line number in error reports. */
    errcallback.callback = CopyFromErrorCallback;
    errcallback.arg = (void*)exec_state->cstate;
    errcallback.previous = t_thrd.log_cxt.error_context_stack;
    t_thrd.log_cxt.error_context_stack = &errcallback;

    /*
     * Loading a virtual tuple into a slot goes: ExecClearTuple, fill the
     * values/isnull arrays, ExecStoreVirtualTuple.  When the file yields no
     * further row the last step is skipped, which leaves the slot empty as
     * required.
     *
     * ExprContext can be NULL because all columns come from the file, so no
     * default expressions need evaluating.  tupleOid can be NULL as well
     * because foreign tables are not allowed to have oids.
     */
    ExecClearTuple(slot);
    if (NextCopyFrom(exec_state->cstate, NULL, slot->tts_values, slot->tts_isnull, NULL))
        ExecStoreVirtualTuple(slot);

    /* Pop the error callback again. */
    t_thrd.log_cxt.error_context_stack = errcallback.previous;

    return slot;
}
/*
 * fileReScanForeignScan
 *      Rescan table, possibly with new parameters.
 *
 * The current CopyState is shut down and a new one is opened from the
 * filename and options saved in the execution state.
 */
static void fileReScanForeignScan(ForeignScanState* node)
{
    FileFdwExecutionState* exec_state = (FileFdwExecutionState*)node->fdw_state;

    EndCopyFrom(exec_state->cstate);

    exec_state->cstate =
        BeginCopyFrom(node->ss.ss_currentRelation, exec_state->filename, NIL, exec_state->options, NULL, NULL);
}
/*
 * fileEndForeignScan
 *      Finish scanning the foreign table and dispose of the objects used
 *      for this scan.
 */
static void fileEndForeignScan(ForeignScanState* node)
{
    FileFdwExecutionState* exec_state = (FileFdwExecutionState*)node->fdw_state;

    /* No execution state means we are in EXPLAIN; there is nothing to do. */
    if (exec_state == NULL)
        return;

    EndCopyFrom(exec_state->cstate);
}
/*
 * fileAnalyzeForeignTable
 *      Test whether analyzing this foreign table is supported.
 *
 * On return *func points at the row-sampling routine and *totalpages holds
 * the file size in pages (never less than 1); the result is always true.
 * An ERROR is raised if the file cannot be stat'ed.
 *
 * @hdfs
 * additionalData was added to match a change in the AnalyzeForeignTable
 * callback signature.  Neither it nor estimate_table_rownum is used here.
 */
static bool fileAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc* func, BlockNumber* totalpages,
    void* additionalData, bool estimate_table_rownum)
{
    char* path = NULL;
    List* unused_options = NIL;
    struct stat file_stat;
    BlockNumber npages;

    /* Look up the options of the foreign table. */
    fileGetOptions(RelationGetRelid(relation), &path, &unused_options);

    /*
     * Find out how big the file is.  (XXX on failure, would it be better to
     * return false and skip analyzing the table?)
     */
    if (stat(path, &file_stat) < 0)
        ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", path)));

    /*
     * Express the size in pages.  At least 1 has to be reported so that it
     * can be told later on that pg_class.relpages is not the default.
     */
    npages = (file_stat.st_size + (BLCKSZ - 1)) / BLCKSZ;
    *totalpages = (npages < 1) ? 1 : npages;

    *func = file_acquire_sample_rows;

    return true;
}
/*
 * estimate_size
 *      Estimate the size of a foreign table.
 *
 * The main result goes into baserel->rows.  fdw_private->pages and
 * fdw_private->ntuples are set as well, for use by the cost calculation
 * later.  When pg_class holds no page count, baserel->tuples is also
 * overwritten with the faked tuple estimate.
 */
static void estimate_size(PlannerInfo* root, RelOptInfo* baserel, FileFdwPlanState* fdw_private)
{
    struct stat file_stat;
    BlockNumber file_pages;
    double file_tuples;

    /*
     * Ask the file system for the file size.  The file need not exist at
     * plan time; in that case assume a default of ten pages.
     */
    if (stat(fdw_private->filename, &file_stat) < 0)
        file_stat.st_size = 10 * BLCKSZ;

    /* Size in pages, used for the I/O cost estimate later on. */
    file_pages = (file_stat.st_size + (BLCKSZ - 1)) / BLCKSZ;
    if (file_pages < 1)
        file_pages = 1;
    fdw_private->pages = file_pages;

    /* Now guess how many tuples the file holds. */
    if (baserel->pages > 0) {
        /*
         * pg_class carries page and tuple counts from an earlier ANALYZE:
         * derive tuples-per-page from them and scale by the current file
         * size.
         */
        double tuples_per_page = baserel->tuples / (double)baserel->pages;

        file_tuples = clamp_row_est(tuples_per_page * (double)file_pages);
    } else {
        /*
         * No statistics, so fake it from the planner's notion of the
         * relation width.  That is bogus if not all columns are read, and
         * the text form of a row is probably not the same size as its
         * internal form anyway.  Something better might be possible, but
         * the real answer to anyone who complains is "ANALYZE" ...
         */
        int row_width = MAXALIGN(baserel->width) + MAXALIGN(sizeof(HeapTupleHeaderData));

        file_tuples = clamp_row_est((double)file_stat.st_size / (double)row_width);
        baserel->tuples = file_tuples;
    }
    fdw_private->ntuples = file_tuples;

    /*
     * Rows actually returned by the scan: apply the selectivity of the
     * baserestrictinfo quals and hand the result to the planner.
     */
    baserel->rows =
        clamp_row_est(file_tuples * clauselist_selectivity(root, baserel->baserestrictinfo, 0, JOIN_INNER, NULL));
}
/*
 * estimate_costs
 *      Estimate the costs of scanning a foreign table.
 *
 * The results are stored in *startup_cost and *total_cost.  The page and
 * tuple counts are taken from fdw_private.
 */
static void estimate_costs(
    PlannerInfo* root, RelOptInfo* baserel, FileFdwPlanState* fdw_private, Cost* startup_cost, Cost* total_cost)
{
    Cost startup = baserel->baserestrictcost.startup;
    Cost run_cost = 0;

    /*
     * Costing follows cost_seqscan() closely: I/O is charged as for a
     * regular table file of the same size.  The per-tuple CPU charge,
     * however, is ten times that of a seqscan, to account for the cost of
     * parsing records.
     */
    Cost per_tuple = u_sess->attr.attr_sql.cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;

    run_cost += u_sess->attr.attr_sql.seq_page_cost * fdw_private->pages;
    run_cost += per_tuple * fdw_private->ntuples;

    *startup_cost = startup;
    *total_cost = startup + run_cost;
}
/*
 * file_acquire_sample_rows -- acquire a random sample of rows from the table
 *
 * Selected rows are returned in the caller-allocated array rows[],
 * which must have at least targrows entries.
 * The actual number of rows selected is returned as the function result.
 * We also count the total number of rows in the file and return it into
 * *totalrows. Note that *totaldeadrows is always set to 0.
 *
 * elevel is the message level used for the summary line emitted at the end.
 * additionalData and estimate_table_rownum are not referenced in this
 * function.
 *
 * Note that the returned list of rows is not always in order by physical
 * position in the file. Therefore, correlation estimates derived later
 * may be meaningless, but it's OK because we don't use the estimates
 * currently (the planner only pays attention to correlation for indexscans).
 */
static int file_acquire_sample_rows(Relation onerel, int elevel, HeapTuple* rows, int targrows, double* totalrows,
double* totaldeadrows, void* additionalData, bool estimate_table_rownum)
{
int numrows = 0;
double rowstoskip = -1; /* -1 means not set yet */
double rstate;
TupleDesc tupDesc;
Datum* values = NULL;
bool* nulls = NULL;
bool found = false;
char* filename = NULL;
List* options = NIL;
CopyState cstate;
ErrorContextCallback errcontext;
/* Context that is current on entry; sample tuples are formed while it is current. */
MemoryContext oldcontext = CurrentMemoryContext;
MemoryContext tupcontext;
Assert(onerel);
Assert(targrows > 0);
/* Scratch arrays that receive one parsed row at a time. */
tupDesc = RelationGetDescr(onerel);
values = (Datum*)palloc(tupDesc->natts * sizeof(Datum));
nulls = (bool*)palloc(tupDesc->natts * sizeof(bool));
/* Fetch options of foreign table */
fileGetOptions(RelationGetRelid(onerel), &filename, &options);
/*
 * Create CopyState from FDW options.
 */
cstate = BeginCopyFrom(onerel, filename, NIL, options, NULL, NULL);
/*
 * Use per-tuple memory context to prevent leak of memory used to read
 * rows from the file with Copy routines. It is reset before every row
 * and deleted once the whole file has been read.
 */
tupcontext = AllocSetContextCreate(CurrentMemoryContext,
"file_fdw temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* Prepare for sampling rows */
rstate = anl_init_selection_state(targrows);
/* Set up callback to identify error line number. */
errcontext.callback = CopyFromErrorCallback;
errcontext.arg = (void*)cstate;
errcontext.previous = t_thrd.log_cxt.error_context_stack;
t_thrd.log_cxt.error_context_stack = &errcontext;
*totalrows = 0;
*totaldeadrows = 0;
for (;;) {
/* Check for user-requested abort or sleep */
vacuum_delay_point();
/*
 * Fetch next row. The row is parsed inside tupcontext; we switch back to
 * oldcontext before doing anything else, so the tuples formed below are
 * not affected by the next MemoryContextReset.
 */
MemoryContextReset(tupcontext);
MemoryContextSwitchTo(tupcontext);
found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
MemoryContextSwitchTo(oldcontext);
/* End of file: stop sampling. */
if (!found)
break;
/*
 * The first targrows sample rows are simply copied into the
 * reservoir. Then we start replacing tuples in the sample until we
 * reach the end of the relation. This algorithm is from Jeff Vitter's
 * paper (see more info in commands/analyze.c).
 */
if (numrows < targrows) {
rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
} else {
/*
 * t in Vitter's paper is the number of records already processed.
 * If we need to compute a new S value, we must use the
 * not-yet-incremented value of totalrows as t.
 */
if (rowstoskip < 0)
rowstoskip = anl_get_next_S(*totalrows, targrows, &rstate);
if (rowstoskip <= 0) {
/*
 * Found a suitable tuple, so save it, replacing one old tuple
 * at random. The replaced tuple is freed first.
 */
int k = (int)(targrows * anl_random_fract());
Assert(k >= 0 && k < targrows);
heap_freetuple(rows[k]);
rows[k] = heap_form_tuple(tupDesc, values, nulls);
}
/*
 * One more row consumed from the current skip count; once this drops
 * below zero a new skip count is drawn on the next iteration.
 */
rowstoskip -= 1;
}
*totalrows += 1;
}
/* Remove error callback. */
t_thrd.log_cxt.error_context_stack = errcontext.previous;
/* Clean up. */
MemoryContextDelete(tupcontext);
EndCopyFrom(cstate);
pfree(values);
pfree(nulls);
/*
 * Emit some interesting relation info
 */
ereport(elevel,
(errmsg("\"%s\": file contains %.0f rows; "
"%d rows in sample",
RelationGetRelationName(onerel),
*totalrows,
numrows)));
return numrows;
}
/*
 * @hdfs
 * fileValidateTableDef
 *      Validate a table definition statement for a file_fdw foreign table.
 *
 * Obj: the statement node to validate, an ALTER TABLE or CREATE FOREIGN
 *      TABLE statement.  NULL is accepted and ignored.
 *
 * For ALTER TABLE, every sub-command must satisfy
 * FOREIGNTABLE_SUPPORT_AT_CMD; otherwise an ERROR with SQLSTATE
 * feature_not_supported is raised.  CREATE FOREIGN TABLE needs no additional
 * checks here.  Any other node type is reported as an internal error.
 */
static void fileValidateTableDef(Node* Obj)
{
    if (Obj == NULL)
        return;

    switch (nodeTag(Obj)) {
        case T_AlterTableStmt: {
            ListCell* lc = NULL;
            AlterTableStmt* stmt = (AlterTableStmt*)Obj;

            foreach (lc, stmt->cmds) {
                AlterTableCmd* cmd = (AlterTableCmd*)lfirst(lc);

                if (!FOREIGNTABLE_SUPPORT_AT_CMD(cmd->subtype)) {
                    /*
                     * This is a user-facing "not supported" condition, so
                     * give it a proper SQLSTATE instead of letting ereport
                     * fall back to its default internal-error code.
                     */
                    ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                            errmsg("Un-support feature"),
                            errdetail("target table is a foreign table")));
                }
            }
            break;
        }
        case T_CreateForeignTableStmt:
            /* Nothing to validate for CREATE FOREIGN TABLE. */
            break;
        default:
            elog(ERROR, "unrecognized node type: %d", (int)nodeTag(Obj));
            break;
    }
}