#include <stdio.h>
|
|
#include <sys/stat.h>
|
|
#include <netinet/in.h>
|
|
#include <arpa/inet.h>
|
|
#include <stdlib.h>
|
|
#include "access/dfs/dfs_common.h"
|
|
#include "access/dfs/dfs_query.h"
|
|
#ifdef ENABLE_MULTIPLE_NODES
|
|
#include "access/dfs/carbondata_index_reader.h"
|
|
#endif
|
|
#include "access/dfs/dfs_stream.h"
|
|
#include "access/dfs/dfs_stream_factory.h"
|
|
#include "hdfs_fdw.h"
|
|
#include "scheduler.h"
|
|
#include "access/hash.h"
|
|
#include "access/relscan.h"
|
|
#include "catalog/pgxc_node.h"
|
|
#include "catalog/pg_partition_fn.h"
|
|
#include "commands/defrem.h"
|
|
#include "foreign/foreign.h"
|
|
#include "nodes/nodes.h"
|
|
#include "optimizer/cost.h"
|
|
#include "optimizer/predtest.h"
|
|
#include "pgxc/pgxcnode.h"
|
|
#include "pgxc/pgxc.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "access/heapam.h"
|
|
#include "utils/syscache.h"
|
|
#include "dfs_adaptor.h"
|
|
|
|
typedef struct PartitionInfoCacheKey {
|
|
Oid relOid;
|
|
uint32 scanClauseHashValue;
|
|
} PartitionInfoCacheKey;
|
|
|
|
typedef struct PartitionInfoCacheEntry {
|
|
PartitionInfoCacheKey key;
|
|
int64 lastModifyTime;
|
|
List* splitList;
|
|
List* prunningResult;
|
|
} PartitionInfoCacheEntry;
|
|
|
|
typedef struct partition_context {
/* It is used to store the restrictions on partition columns. */
List* partClauseList;
/* It is used to store the varattno list of partition columns. */
List* partColList;
} partition_context;

typedef struct partition_string_context {
/* It is used to store the built partition paths for partition restrictions. */
List* partColStrList;
/*
* It is used to store the varattno of partition columns; the listcells of
* partColStrList and partColNoList correspond one to one.
*/
List* partColNoList;
Oid foreignTblOid;
} partition_string_context;
|
|
|
|
extern char* tcp_link_addr;
|
|
|
|
static void AssignReplicaNode(HTAB* htab, const Oid* dnOids, const uint32_t nodeNum, const List* fileList);
|
|
static bool AssignRemoteNode(HTAB* htab, int nodeNum, Oid* dnOids, SplitInfo* currentFile, bool isAnalyze);
|
|
static dnWork* AssignRequestFilesToDn(HTAB* htab, List* fileList, int filesNum, dfs::DFSConnector* conn);
|
|
static char* parseMultiFileNames(char** fileNames, bool checkRootDir, char delimiter);
|
|
static int getAnalyzeFilesNum(int dataNodeNum, int totalFilesNum);
|
|
static bool isNodeLocalToFile(Form_pgxc_node nodeForm, const char* blLocation);
|
|
static List* GetAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, ServerTypeOption srvType, List* columnList = NIL,
|
|
List* scanClauseList = NIL);
|
|
static List* GetObsAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, List* columnList, List*& prunningResult,
|
|
List*& partList, List* scanClauses);
|
|
static List* GetHdfsAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, List* columnList, List*& prunningResult,
|
|
List*& partList, List* scanClauses);
|
|
static List* GetSubFiles(dfs::DFSConnector* conn, SplitInfo* split, int colNo);
|
|
static List* DigFiles(dfs::DFSConnector* conn, SplitInfo* split);
|
|
static List* PartitionPruneProcess(dfs::DFSConnector* conn, List* partitionRelatedList, List* scanClauses,
|
|
Oid foreignTableId, List*& prunningResult, List*& partList, ServerTypeOption srvType);
|
|
static void CheckPartitionColNumber(
|
|
dfs::DFSConnector* conn, List* partList, List* fileList, Oid foreignTableId, ServerTypeOption srvType);
|
|
static SplitInfo* CheckOneSubSplit(dfs::DFSConnector* conn, SplitInfo* split, bool isLastPartition, Oid foreignTableId);
|
|
static bool PartitionFilterClause(SplitInfo* split, List* scanClauses, Var* value, Expr* equalExpr);
|
|
static void CollectPartPruneInfo(List*& prunningResult, int sum, int notprunning, int colno, Oid relOid);
|
|
static List* DrillDown(dfs::DFSConnector* conn, List* fileList);
|
|
static bool AssignLocalNode(
|
|
HTAB* htab, uint64* Locations, const char* pChached, uint32 LocationSize, SplitInfo* currentFile, bool isAnalyze);
|
|
static int GetDnIpAddrByOid(Oid* DnOid, uint32 OidSize, uint64* OidIp, uint32 OidIpSize);
|
|
static int StatDn(uint64* dnInfo, uint32 dnCnt, dnInfoStat* statDnInfo, uint32 statDnCnt);
|
|
static int CompareByLowerInt32(const void* Elem1, const void* Elem2);
|
|
static int CompareByIp(const void* Elem1, const void* Elem2);
|
|
static Value* getPartitionValue(dfs::DFSConnector* conn, char* partitionStr, char* ObjectStr);
|
|
static void obsFileScheduling(HTAB* htab, List* FileList, Oid* dnOids, int numOfNodes, char locatorType);
|
|
static void hdfsFileScheduling(
|
|
dfs::DFSConnector* conn, HTAB* htab, List* FileList, Oid* dnOids, int numOfNodes, char locatorType, bool isAnalyze);
|
|
static void SpillToDisk(Index relId, List* allTask, dfs::DFSConnector* conn);
|
|
static char* getPrefixPath(dfs::DFSConnector* conn);
|
|
static void flushToRemote(SplitMap* dnTask, const char* buffer, dfs::DFSConnector* conn);
|
|
static void loadDiskSplits(SplitMap* dnTask, dfs::DFSConnector* conn);
|
|
void scan_expression_tree_walker(Node* node, void (*walker)(), void* context);
|
|
void getPartitionClause(Node* node, partition_context* context);
|
|
void getPartitionString(Node* node, partition_string_context* context);
|
|
bool isEquivalentExpression(Oid opno);
|
|
|
|
extern List* CNSchedulingForDistOBSFt(Oid foreignTableId);
|
|
|
|
#ifdef ENABLE_MULTIPLE_NODES
|
|
static List* ExtractNonParamRestriction(List* opExpressionList);
|
|
List* CarbonDataFile(dfs::DFSConnector* conn, List* fileList, List* allColumnList, List* restrictColumnList,
|
|
List* scanClauses, int16 attrNum);
|
|
#endif
|
|
|
|
uint32 best_effort_use_cahce = 0; // not used
|
|
|
|
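/**
 * @Description: main scheduling entry on the coordinator. Collect all files/objects of the
 * DFS foreign table, apply partition pruning (and carbondata filtering when compiled with
 * ENABLE_MULTIPLE_NODES), assign the remaining splits to the datanodes and return one
 * SplitMap per datanode. When the number of splits reaches schedule_splits_threshold, the
 * per-dn split lists are spilled to the DFS by SpillToDisk.
 * @return the list of SplitMap nodes, or NIL if no file remains after pruning.
 */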
List* CNScheduling(Oid foreignTableId, Index relId, List* columnList, List* scanClauses, List*& prunningResult,
|
|
List*& partList, char locatorType, bool isAnalyze, List* allColumnList, int16 attrNum, int64* fileNum)
|
|
{
|
|
int numOfNodes = 0;
|
|
|
|
Oid* dnOids = NULL;
|
|
HTAB* HTab = NULL;
|
|
HASHCTL HashCtl;
|
|
|
|
List* FileList = NIL;
|
|
List* PartitionRelatedList = NIL;
|
|
errno_t rc = EOK;
|
|
|
|
ServerTypeOption srvType = T_INVALID;
|
|
|
|
if (IS_PGXC_DATANODE) {
|
|
QUERY_NOT_SUPPORT(foreignTableId,
|
|
"Query on datanode is not "
|
|
"supported currently for the foreign table:%s.");
|
|
}
|
|
|
|
numOfNodes = get_pgxc_classnodes(foreignTableId, &dnOids);
|
|
Assert(NULL != dnOids && numOfNodes > 0);
|
|
|
|
srvType = getServerType(foreignTableId);
|
|
|
|
/* initialize the hash table which is used for storing dn's assigned files */
|
|
rc = memset_s(&HashCtl, sizeof(HashCtl), 0, sizeof(HashCtl));
|
|
securec_check(rc, "\0", "\0");
|
|
HashCtl.keysize = sizeof(Oid);
|
|
HashCtl.entrysize = sizeof(dnWork);
|
|
HashCtl.hash = oid_hash;
|
|
HashCtl.hcxt = CurrentMemoryContext;
|
|
HTab = hash_create("SchedulerHashTable", 128, &HashCtl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
|
|
|
|
dfs::DFSConnector* conn = dfs::createConnector(CurrentMemoryContext, foreignTableId);
|
|
|
|
/* get file list */
|
|
switch (srvType) {
|
|
case T_OBS_SERVER: {
|
|
/* get all the obs files that need to be scheduled */
|
|
FileList = GetObsAllFiles(conn, foreignTableId, columnList, prunningResult, partList, scanClauses);
|
|
break;
|
|
}
|
|
case T_HDFS_SERVER: {
|
|
/* get all the hdfs files that need to be scheduled */
|
|
FileList = GetHdfsAllFiles(conn, foreignTableId, columnList, prunningResult, partList, scanClauses);
|
|
break;
|
|
}
|
|
default: {
|
|
Assert(0);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (0 == list_length(FileList)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
return NIL;
|
|
}
|
|
|
|
/* Start to process partition info */
|
|
PartitionRelatedList = list_make2(FileList, columnList);
|
|
|
|
/* Process the directories of each partition layer in order. */
|
|
FileList = PartitionPruneProcess(
|
|
conn, PartitionRelatedList, scanClauses, foreignTableId, prunningResult, partList, srvType);
|
|
|
|
#ifdef ENABLE_MULTIPLE_NODES
|
|
char* format = HdfsGetOptionValue(foreignTableId, OPTION_NAME_FORMAT);
|
|
/* If the data format is carbondata, analyze and filter the file list. */
|
|
if (0 == pg_strcasecmp(format, DFS_FORMAT_CARBONDATA)) {
|
|
FileList = CarbonDataFile(conn, FileList, allColumnList, columnList, scanClauses, attrNum);
|
|
}
|
|
#endif
|
|
|
|
if (0 == list_length(FileList)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
return NIL;
|
|
}
|
|
|
|
if (NULL != fileNum) {
|
|
*fileNum = list_length(FileList);
|
|
}
|
|
|
|
/* file schedule */
|
|
switch (srvType) {
|
|
case T_OBS_SERVER: {
|
|
/* Check if the file list is empty again after the partition pruning. */
|
|
if (0 == list_length(FileList)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
return NIL;
|
|
}
|
|
|
|
obsFileScheduling(HTab, FileList, dnOids, numOfNodes, locatorType);
|
|
break;
|
|
}
|
|
case T_HDFS_SERVER: {
|
|
/*
* Sometimes the hive partition has more layers than our partition definition,
* so here we need to dig down to find all the files.
*/
|
|
FileList = DrillDown(conn, FileList);
|
|
|
|
/* Check if the file list is empty again after the partition pruning and drilling. */
|
|
if (0 == list_length(FileList)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
return NIL;
|
|
}
|
|
|
|
hdfsFileScheduling(conn, HTab, FileList, dnOids, numOfNodes, locatorType, isAnalyze);
|
|
break;
|
|
}
|
|
default: {
|
|
Assert(0);
|
|
break;
|
|
}
|
|
}
|
|
|
|
List* allTask = NIL;
|
|
dnWork* Item = NULL;
|
|
HASH_SEQ_STATUS ScanStatus;
|
|
|
|
hash_seq_init(&ScanStatus, HTab);
|
|
ereport(LOG, (errmodule(MOD_HDFS), errmsg("Total %d files, %d datanodes", list_length(FileList), numOfNodes)));
|
|
|
|
while ((Item = (dnWork*)hash_seq_search(&ScanStatus)) != NULL) {
|
|
ListCell* lc = NULL;
|
|
SplitMap* dnTask = makeNode(SplitMap);
|
|
|
|
dnTask->nodeId = PGXCNodeGetNodeId(Item->nodeOid, PGXC_NODE_DATANODE);
|
|
dnTask->locatorType = locatorType;
|
|
dnTask->splits = Item->toDoList;
|
|
dnTask->fileNums = list_length(dnTask->splits);
|
|
foreach (lc, dnTask->splits) {
|
|
SplitInfo* split = (SplitInfo*)lfirst(lc);
|
|
dnTask->totalSize += split->ObjectSize;
|
|
}
|
|
|
|
allTask = lappend(allTask, dnTask);
|
|
ereport(DEBUG1,
|
|
(errmodule(MOD_HDFS),
|
|
errmsg(
|
|
"Datanode %s, assigned %d files", get_pgxc_nodename(Item->nodeOid), list_length(Item->toDoList))));
|
|
}
|
|
|
|
hash_destroy(HTab);
|
|
HTab = NULL;
|
|
|
|
Assert(allTask != NIL);
|
|
|
|
/* Check allTask: if the number of splits reaches the threshold, spill the split lists to disk. */
|
|
if ((!t_thrd.postgres_cxt.mark_explain_only && !isAnalyze) &&
|
|
list_length(FileList) >= u_sess->attr.attr_sql.schedule_splits_threshold) {
|
|
SpillToDisk(relId, allTask, conn);
|
|
}
|
|
|
|
delete (conn);
|
|
conn = NULL;
|
|
|
|
/* Freeing the file list any earlier would cause problems. */
|
|
list_free(FileList);
|
|
FileList = NIL;
|
|
|
|
return allTask;
|
|
}
|
|
|
|
/**
* @Description: schedule OBS objects to the datanodes.
* @in htab, the hash table.
* @in FileList, the objects to be scheduled.
* @in dnOids, the datanode array.
* @in numOfNodes, the number of datanodes.
* @in locatorType, distribution type of the given table.
* @return none.
*/
|
|
void obsFileScheduling(HTAB* htab, List* FileList, Oid* dnOids, int numOfNodes, char locatorType)
|
|
{
|
|
if (LOCATOR_TYPE_REPLICATED == locatorType) {
|
|
AssignReplicaNode(htab, dnOids, numOfNodes, FileList);
|
|
} else {
|
|
ListCell* FileCell = NULL;
|
|
int num_processed = 0;
|
|
int taskCnt = 0;
|
|
int fileCnt = list_length(FileList);
|
|
|
|
taskCnt = MIN(fileCnt, numOfNodes);
|
|
|
|
if (taskCnt == 0)
|
|
taskCnt = 1;
|
|
|
|
foreach (FileCell, FileList) {
|
|
// filter object size is zero
|
|
SplitInfo* splitinfo = (SplitInfo*)lfirst(FileCell);
|
|
bool found = false;
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
Oid nodeOid = dnOids[num_processed % taskCnt];
|
|
dnWork* item = (dnWork*)hash_search(htab, &nodeOid, HASH_ENTER, &found);
|
|
if (!found) {
|
|
item->toDoList = NIL;
|
|
}
|
|
|
|
item->toDoList = lappend(item->toDoList, splitinfo);
|
|
|
|
num_processed++;
|
|
ereport(DEBUG1,
|
|
(errmodule(MOD_OBS),
|
|
errmsg("Assign object %s to datanode:%s",
|
|
((SplitInfo*)lfirst(FileCell))->filePath,
|
|
get_pgxc_nodename(nodeOid))));
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
* @Description: schedule hdfs files to the datanodes.
* @in htab, the hash table.
* @in FileList, the files to be scheduled.
* @in dnOids, the datanode array.
* @in numOfNodes, the number of datanodes.
* @in locatorType, distribution type of the given table.
* @in isAnalyze, true when we are executing an analyze command.
* @return none.
*/
|
|
static void hdfsFileScheduling(
|
|
dfs::DFSConnector* conn, HTAB* htab, List* FileList, Oid* dnOids, int numOfNodes, char locatorType, bool isAnalyze)
|
|
{
|
|
ListCell* FileCell = NULL;
|
|
SplitInfo* Split = NULL;
|
|
char szIp[32] = {0};
|
|
uint64* dnInfo = NULL;
|
|
dnInfoStat* statDnInfo = NULL;
|
|
|
|
/* get the dn node IPs and combine them with the oids */
|
|
dnInfo = (uint64*)palloc0(numOfNodes * sizeof(uint64));
|
|
statDnInfo = (dnInfoStat*)palloc0(numOfNodes * sizeof(dnInfoStat));
|
|
int dnCnt = GetDnIpAddrByOid(dnOids, numOfNodes, dnInfo, numOfNodes);
|
|
|
|
/* sort by the lower int32 (the IP) so that bsearch can be used */
|
|
::qsort(dnInfo, dnCnt, sizeof(uint64), CompareByLowerInt32);
|
|
|
|
/* group the datanodes by IP */
|
|
int statDnCnt = StatDn(dnInfo, dnCnt, statDnInfo, numOfNodes);
|
|
|
|
/* used to generate a random start position in AssignLocalNode */
|
|
::srand((unsigned)time(NULL));
|
|
|
|
if (LOCATOR_TYPE_REPLICATED == locatorType) {
|
|
AssignReplicaNode(htab, dnOids, numOfNodes, FileList);
|
|
} else {
|
|
uint64* pLocal = (uint64*)palloc0(sizeof(uint64) * MAX_ROUNDROBIN_AVAILABLE_DN_NUM);
|
|
char* pCached = (char*)palloc0(sizeof(char) * MAX_ROUNDROBIN_AVAILABLE_DN_NUM);
|
|
bool needPredicate = true;
|
|
int fileCount = 0;
|
|
int localFileCount = 0;
|
|
errno_t Ret = EOK;
|
|
|
|
Ret = memset_s(pCached, MAX_ROUNDROBIN_AVAILABLE_DN_NUM, 0, MAX_ROUNDROBIN_AVAILABLE_DN_NUM);
|
|
securec_check(Ret, "", "");
|
|
foreach (FileCell, FileList) {
|
|
CHECK_FOR_INTERRUPTS();
|
|
Split = (SplitInfo*)lfirst(FileCell);
|
|
char* CurrentFileName = Split->filePath;
|
|
uint32 LocalCnt = 0;
|
|
|
|
if (needPredicate) {
|
|
dfs::DFSBlockInfo* BlInf = conn->getBlockLocations(CurrentFileName);
|
|
Assert(BlInf != NULL);
|
|
int ReplNum = BlInf->getNumOfReplica();
|
|
const char* pName = NULL;
|
|
|
|
for (int Loop = 0; Loop < ReplNum; Loop++) {
|
|
pName = BlInf->getNames(0, Loop);
|
|
Assert(pName != NULL);
|
|
Ret = strcpy_s(szIp, (sizeof(szIp) - 1), pName);
|
|
securec_check(Ret, "", "");
|
|
/*remove port info*/
|
|
char* pStr = ::strrchr(szIp, ':');
|
|
if (pStr != NULL) {
|
|
*pStr = '\0';
|
|
}
|
|
|
|
uint64 TmpVal = (uint64)inet_addr(szIp);
|
|
dnInfoStat* pdnInfoStat =
|
|
(dnInfoStat*)::bsearch(&TmpVal, statDnInfo, statDnCnt, sizeof(dnInfoStat), CompareByIp);
|
|
|
|
if (pdnInfoStat != NULL) /*is in dn*/
|
|
{
|
|
/* save all local */
|
|
Ret = memcpy_s(&pLocal[LocalCnt],
|
|
((MAX_ROUNDROBIN_AVAILABLE_DN_NUM - LocalCnt) * sizeof(uint64)),
|
|
&dnInfo[pdnInfoStat->Start],
|
|
pdnInfoStat->Cnt * sizeof(uint64));
|
|
securec_check(Ret, "", "");
|
|
LocalCnt += pdnInfoStat->Cnt;
|
|
|
|
if (BlInf->isCached(0, Loop)) {
|
|
Ret = memset_s(&pCached[LocalCnt], pdnInfoStat->Cnt, 1, pdnInfoStat->Cnt);
|
|
securec_check(Ret, "", "");
|
|
}
|
|
}
|
|
}
|
|
delete (BlInf);
|
|
BlInf = NULL;
|
|
}
|
|
|
|
/*check whether all locations are remote*/
|
|
if (LocalCnt != 0) {
|
|
(void)AssignLocalNode(htab, pLocal, pCached, LocalCnt, Split, isAnalyze);
|
|
localFileCount++;
|
|
} else {
|
|
/* choose one mpp dn to handle the split */
|
|
if (!AssignRemoteNode(htab, numOfNodes, dnOids, Split, isAnalyze)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_NO_DATA_FOUND),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("No datanode is assigned for this split: %s", CurrentFileName)));
|
|
}
|
|
}
|
|
fileCount++;
|
|
|
|
/* If the local ratio is less than 1/10 in the first 256 files, we take all the others as remote scan. */
|
|
if (fileCount == (int)MAX_ROUNDROBIN_AVAILABLE_DN_NUM &&
|
|
localFileCount < (int)(MAX_ROUNDROBIN_AVAILABLE_DN_NUM / 10)) {
|
|
needPredicate = false;
|
|
}
|
|
}
|
|
|
|
pfree_ext(pLocal);
|
|
pfree_ext(pCached);
|
|
}
|
|
|
|
pfree_ext(dnInfo);
|
|
pfree_ext(statDnInfo);
|
|
}
|
|
|
|
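/**
 * @Description: for each datanode task whose split list is long enough (more than
 * schedule_splits_threshold / 1000 entries), serialize the split list with nodeToString,
 * write it to a temporary file on the DFS (see getPrefixPath), record the file path and
 * buffer length in the SplitMap, and free the in-memory splits.
 * @in relId, the relation index used in the temporary file name.
 * @in allTask, the list of SplitMap nodes built by CNScheduling.
 * @in conn, the DFS connection handle.
 * @return none.
 */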
static void SpillToDisk(Index relId, List* allTask, dfs::DFSConnector* conn)
|
|
{
|
|
ListCell* lc = NULL;
|
|
foreach (lc, allTask) {
|
|
SplitMap* dnTask = (SplitMap*)lfirst(lc);
|
|
|
|
/* when the number of files is less than u_sess->attr.attr_sql.schedule_splits_threshold / 1000, don't spill to
|
|
* disk. */
|
|
if (list_length(dnTask->splits) <= (int)(u_sess->attr.attr_sql.schedule_splits_threshold / 1000))
|
|
continue;
|
|
|
|
/* serialize the splits */
|
|
char* tmpBuffer = nodeToString(dnTask->splits);
|
|
dnTask->lengths = lappend_int(dnTask->lengths, strlen(tmpBuffer));
|
|
|
|
/* construct the tmp file name */
|
|
StringInfoData tmpFileName;
|
|
initStringInfo(&tmpFileName);
|
|
char* prefix = getPrefixPath(conn);
|
|
uint32 queryHashID = ((t_thrd.postgres_cxt.debug_query_string == NULL)
|
|
? 0
|
|
: hash_any((unsigned char*)t_thrd.postgres_cxt.debug_query_string,
|
|
strlen(t_thrd.postgres_cxt.debug_query_string)));
|
|
appendStringInfo(&tmpFileName,
|
|
"%s/.%lu_%u_%u_%ld_%d_%ld",
|
|
prefix,
|
|
gs_thread_self(),
|
|
queryHashID,
|
|
relId,
|
|
(int64)allTask,
|
|
dnTask->nodeId,
|
|
gs_random());
|
|
|
|
dnTask->downDiskFilePath = tmpFileName.data;
|
|
|
|
/* flush the serial buffer to share file system. */
|
|
flushToRemote(dnTask, tmpBuffer, conn);
|
|
ereport(DEBUG1,
|
|
(errmodule(MOD_HDFS),
|
|
errmsg("Coordinate %s, spill %d splits to dfs %s.",
|
|
get_pgxc_nodename(dnTask->nodeId),
|
|
list_length(dnTask->splits),
|
|
dnTask->downDiskFilePath)));
|
|
|
|
/* clean tmp memory */
|
|
pfree_ext(tmpBuffer);
|
|
if (prefix != NULL)
|
|
pfree_ext(prefix);
|
|
list_free_deep(dnTask->splits);
|
|
dnTask->splits = NIL;
|
|
}
|
|
}
|
|
|
|
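/**
 * @Description: return the directory prefix used for spilled split files:
 * "/tmp" for an HDFS connector, or the bucket name for an OBS connector.
 * @in conn, the DFS connection handle.
 * @return the prefix string.
 */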
static char* getPrefixPath(dfs::DFSConnector* conn)
|
|
{
|
|
if (conn->getType() == HDFS_CONNECTOR) {
|
|
/* hdfs */
|
|
StringInfoData prefix;
|
|
initStringInfo(&prefix);
|
|
appendStringInfo(&prefix, "/tmp");
|
|
return prefix.data;
|
|
} else {
|
|
/* obs */
|
|
const char* bucket = conn->getValue("bucket", NULL);
|
|
StringInfoData prefix;
|
|
initStringInfo(&prefix);
|
|
appendStringInfo(&prefix, "%s", bucket);
|
|
return prefix.data;
|
|
}
|
|
}
|
|
|
|
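/**
 * @Description: write the serialized split buffer into the spill file recorded in
 * dnTask->downDiskFilePath and flush it; report an error if any DFS operation fails.
 * @in dnTask, the SplitMap carrying the spill file path and buffer length.
 * @in buffer, the serialized splits.
 * @in conn, the DFS connection handle.
 * @return none.
 */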
static void flushToRemote(SplitMap* dnTask, const char* buffer, dfs::DFSConnector* conn)
|
|
{
|
|
if (conn->openFile(dnTask->downDiskFilePath, O_WRONLY) == -1 ||
|
|
conn->writeCurrentFile(buffer, linitial_int(dnTask->lengths)) == -1 || conn->flushCurrentFile() == -1) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FDW_ERROR),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("Failed to flush the splits to disk when the count is "
"too large, details can be found in the log of %s",
|
|
g_instance.attr.attr_common.PGXCNodeName)));
|
|
}
|
|
}
|
|
|
|
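/**
 * @Description: counterpart of SpillToDisk executed on the datanode: read the serialized
 * split list back from the spill file, deserialize it into dnTask->splits, then delete the
 * spill file and reset the spill bookkeeping fields.
 * @in dnTask, the SplitMap whose splits were spilled to the DFS.
 * @in conn, the DFS connection handle.
 * @return none.
 */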
static void loadDiskSplits(SplitMap* dnTask, dfs::DFSConnector* conn)
|
|
{
|
|
MemoryContext oldCtx = MemoryContextSwitchTo(u_sess->cache_mem_cxt);
|
|
int length = linitial_int(dnTask->lengths);
|
|
char* buffer = (char*)palloc0(length + 1);
|
|
if (conn->openFile(dnTask->downDiskFilePath, O_RDONLY) == -1 ||
|
|
conn->readCurrentFileFully(buffer, length, 0) == -1) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
(void)MemoryContextSwitchTo(oldCtx);
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FDW_ERROR),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("Failed to load the splits from disk when the count is "
"too large, details can be found in the log of %s",
|
|
g_instance.attr.attr_common.PGXCNodeName)));
|
|
}
|
|
ereport(DEBUG1,
|
|
(errmodule(MOD_HDFS),
|
|
errmsg("Datanode %s, load %d splits from dfs %s.",
|
|
get_pgxc_nodename(dnTask->nodeId),
|
|
list_length(dnTask->splits),
|
|
dnTask->downDiskFilePath)));
|
|
|
|
/* deserialize the splits */
|
|
dnTask->splits = (List*)stringToNode(buffer);
|
|
pfree_ext(buffer);
|
|
(void)MemoryContextSwitchTo(oldCtx);
|
|
|
|
/* clear the down disk objects */
|
|
conn->closeCurrentFile();
|
|
(void)conn->deleteFile(dnTask->downDiskFilePath, 0);
|
|
pfree(dnTask->downDiskFilePath);
|
|
dnTask->downDiskFilePath = NULL;
|
|
list_free(dnTask->lengths);
|
|
dnTask->lengths = NIL;
|
|
}
|
|
|
|
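/**
 * @Description: for every datanode oid, look up its host in pgxc_node and pack the oid and
 * the IPv4 address into one uint64: the oid in the high 32 bits and the inet_addr() value in
 * the low 32 bits, i.e. OidIp = ((uint64)oid << 32) + (uint64)ip; GETOID/GETIP extract the
 * two halves. Localhost entries are replaced with tcp_link_addr.
 * @in DnOid/OidSize, the datanode oid array and its size.
 * @out OidIp/OidIpSize, the output array and its capacity.
 * @return the number of entries filled in OidIp.
 */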
static int GetDnIpAddrByOid(Oid* DnOid, uint32 OidSize, uint64* OidIp, uint32 OidIpSize)
|
|
{
|
|
uint32 Loop = 0;
|
|
uint32 OidIpIdx = 0;
|
|
HeapTuple tuple = NULL;
|
|
|
|
Assert(OidIpSize >= OidSize);
|
|
|
|
for (Loop = 0; (Loop < OidSize) && (OidIpIdx < OidIpSize); Loop++) {
|
|
tuple = SearchSysCache1(PGXCNODEOID, ObjectIdGetDatum(DnOid[Loop]));
|
|
|
|
if (!HeapTupleIsValid(tuple))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_CACHE_LOOKUP_FAILED),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("cache lookup failed for node %u", DnOid[Loop])));
|
|
|
|
Form_pgxc_node NodeForm = (Form_pgxc_node)GETSTRUCT(tuple);
|
|
in_addr_t TmpVal = 0;
|
|
|
|
/*handle local host and local ip*/
|
|
if ((strncmp(NodeForm->node_host.data, LOCAL_IP, ::strlen(LOCAL_IP)) == 0) ||
|
|
(strncmp(NodeForm->node_host.data, LOCAL_HOST, ::strlen(LOCAL_HOST)) == 0)) {
|
|
|
|
TmpVal = inet_addr(tcp_link_addr);
|
|
} else {
|
|
TmpVal = inet_addr(NodeForm->node_host.data);
|
|
}
|
|
|
|
OidIp[OidIpIdx] = (((uint64)DnOid[Loop]) << 32) + ((uint64)TmpVal);
|
|
OidIpIdx += 1;
|
|
|
|
ReleaseSysCache(tuple);
|
|
}
|
|
|
|
return OidIpIdx;
|
|
}
|
|
|
|
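/**
 * @Description: given the packed oid/ip array sorted by IP, group consecutive entries with
 * the same IP and record, for each distinct IP, its start index and count in statDnInfo.
 * @in dnInfo/dnCnt, the packed oid/ip array (sorted by CompareByLowerInt32) and its length.
 * @out statDnInfo/statDnCnt, the output array and its capacity.
 * @return the number of groups written to statDnInfo.
 */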
static int StatDn(uint64* dnInfo, uint32 dnCnt, dnInfoStat* statDnInfo, uint32 statDnCnt)
|
|
{
|
|
uint64 Tmp = (dnCnt > 0) ? dnInfo[0] : 0;
|
|
uint32 Cnt = (dnCnt > 0) ? 1 : 0;
|
|
uint32 Start = 0;
|
|
uint32 Loop = (dnCnt > 0) ? 1 : 0;
|
|
uint32 dnInfoStatIdx = 0;
|
|
|
|
Assert((dnCnt <= statDnCnt) && (dnInfo != NULL) && (statDnInfo != NULL));
|
|
|
|
for (; (Loop < dnCnt) && (dnInfoStatIdx < statDnCnt); Loop++) {
|
|
if (GETIP(Tmp) == GETIP(dnInfo[Loop])) {
|
|
Cnt += 1;
|
|
} else {
|
|
statDnInfo[dnInfoStatIdx].ipAddr = GETIP(Tmp);
|
|
statDnInfo[dnInfoStatIdx].Start = Start;
|
|
statDnInfo[dnInfoStatIdx].Cnt = Cnt;
|
|
dnInfoStatIdx += 1;
|
|
|
|
Tmp = dnInfo[Loop];
|
|
Start = Loop;
|
|
Cnt = 1;
|
|
}
|
|
}
|
|
|
|
if (Cnt > 0) {
|
|
statDnInfo[dnInfoStatIdx].ipAddr = GETIP(Tmp);
|
|
statDnInfo[dnInfoStatIdx].Start = Start;
|
|
statDnInfo[dnInfoStatIdx].Cnt = Cnt;
|
|
}
|
|
|
|
return int(dnInfoStatIdx + 1);
|
|
}
|
|
|
|
/* comparison callbacks used by qsort and bsearch */
|
|
static int CompareByLowerInt32(const void* Elem1, const void* Elem2)
|
|
{
|
|
int Ret = 0;
|
|
const uint64* P1 = (const uint64*)Elem1;
|
|
const uint64* P2 = (const uint64*)Elem2;
|
|
uint32 Ip1 = GETIP(*P1);
|
|
uint32 Ip2 = GETIP(*P2);
|
|
|
|
Ret = (Ip1 > Ip2) ? 1 : ((Ip1 < Ip2) ? -1 : 0);
|
|
|
|
return Ret;
|
|
}
|
|
|
|
static int CompareByIp(const void* Elem1, const void* Elem2)
|
|
{
|
|
int Ret = 0;
|
|
const uint64* P1 = (const uint64*)Elem1;
|
|
uint32 Ip1 = GETIP(*P1);
|
|
const dnInfoStat* P2 = (dnInfoStat*)Elem2;
|
|
uint32 Ip2 = P2->ipAddr;
|
|
|
|
Ret = (Ip1 > Ip2) ? 1 : ((Ip1 < Ip2) ? -1 : 0);
|
|
|
|
return Ret;
|
|
}
|
|
|
|
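/**
 * @Description: schedule files for ANALYZE. For global stats it delegates to CNScheduling
 * (or CNSchedulingForDistOBSFt for a dist OBS csv/text foreign table); otherwise it picks a
 * single datanode, preferring one with enough local-read files (AssignRequestFilesToDn), and
 * returns a one-element SplitMap list for it.
 * @out totalFilesNum, the total number of files after pruning and drilling.
 * @out numOfDns, the number of datanodes of the table.
 * @in foreignTableId, the foreign table oid.
 * @in isglbstats, true when collecting global stats.
 * @return the list of SplitMap nodes, or NIL if no file is found.
 */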
List* CNSchedulingForAnalyze(unsigned int* totalFilesNum, unsigned int* numOfDns, Oid foreignTableId, bool isglbstats)
|
|
{
|
|
errno_t rc = EOK;
|
|
int filesToRead = 0;
|
|
HTAB* htab = NULL;
|
|
HASHCTL hash_ctl;
|
|
dnWork* item = NULL;
|
|
List* fileList = NIL;
|
|
HASH_SEQ_STATUS scan_status;
|
|
List* partitionRelatedList = NIL;
|
|
List* columnList = NIL;
|
|
Relation relation = RelationIdGetRelation(foreignTableId);
|
|
TupleDesc tupleDescriptor = NULL;
|
|
List* prunningResult = NIL;
|
|
List* partList = NIL;
|
|
RelationLocInfo* rel_loc_info = GetRelationLocInfo(foreignTableId);
|
|
List* allTask = NIL;
|
|
ServerTypeOption srvType = T_INVALID;
|
|
|
|
if (!RelationIsValid(relation)) {
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_CACHE_LOOKUP_FAILED),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("could not open relation with OID %u", foreignTableId)));
|
|
}
|
|
|
|
srvType = getServerType(foreignTableId);
|
|
tupleDescriptor = RelationGetDescr(relation);
|
|
columnList = CreateColList((Form_pg_attribute*)tupleDescriptor->attrs, tupleDescriptor->natts);
|
|
|
|
RelationClose(relation);
|
|
|
|
*numOfDns = get_pgxc_classnodes(foreignTableId, NULL);
|
|
Assert(*numOfDns > 0);
|
|
|
|
/* We should get all dn tasks for global stats. */
|
|
if (isglbstats) {
|
|
if (IS_OBS_CSV_TXT_FOREIGN_TABLE(foreignTableId)) {
|
|
/* for dist obs foreign table.*/
|
|
allTask = CNSchedulingForDistOBSFt(foreignTableId);
|
|
} else {
|
|
if (rel_loc_info == NULL) {
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_CACHE_LOOKUP_FAILED),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("could not get locator information for relation with OID %u", foreignTableId)));
|
|
}
|
|
allTask = CNScheduling(foreignTableId,
|
|
0,
|
|
columnList,
|
|
NULL,
|
|
prunningResult,
|
|
partList,
|
|
rel_loc_info->locatorType,
|
|
true,
|
|
columnList,
|
|
tupleDescriptor->natts,
|
|
NULL);
|
|
}
|
|
pfree_ext(rel_loc_info);
|
|
return allTask;
|
|
}
|
|
|
|
/* used to generate a random start position in AssignLocalNode */
|
|
srand((unsigned)time(NULL));
|
|
|
|
dfs::DFSConnector* conn = dfs::createConnector(CurrentMemoryContext, foreignTableId);
|
|
|
|
rc = memset_s(&hash_ctl, sizeof(hash_ctl), 0, sizeof(hash_ctl));
|
|
securec_check(rc, "\0", "\0");
|
|
hash_ctl.keysize = sizeof(Oid);
|
|
hash_ctl.entrysize = sizeof(dnWork);
|
|
hash_ctl.hash = oid_hash;
|
|
hash_ctl.hcxt = CurrentMemoryContext;
|
|
htab = hash_create("SchedulerHashTable", 128, &hash_ctl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
|
|
|
|
/* Get the string of file names into allFiles */
|
|
fileList = GetAllFiles(conn, foreignTableId, srvType);
|
|
|
|
if (0 == list_length(fileList)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
list_free_deep(columnList);
|
|
return NIL;
|
|
}
|
|
|
|
/* Start to process partition info */
|
|
partitionRelatedList = list_make2(fileList, columnList);
|
|
|
|
/* Process the directories of each partition layer in order. */
|
|
fileList =
|
|
PartitionPruneProcess(conn, partitionRelatedList, NIL, foreignTableId, prunningResult, partList, srvType);
|
|
|
|
/*
* Sometimes the hive partition has more layers than our partition definition,
* so here we need to dig down to find all the files.
*/
|
|
fileList = DrillDown(conn, fileList);
|
|
|
|
/* Check if the file list is empty again after the partition pruning and drilling. */
|
|
if (0 == list_length(fileList)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
return NIL;
|
|
}
|
|
|
|
*totalFilesNum = list_length(fileList);
|
|
|
|
/*
* Acquire the number of files that need to be analyzed;
* at least one file is analyzed.
*/
|
|
if (IsLocatorReplicated(GetLocatorType(foreignTableId))) {
|
|
filesToRead = *totalFilesNum;
|
|
} else {
|
|
filesToRead = getAnalyzeFilesNum(*numOfDns, *totalFilesNum);
|
|
}
|
|
Assert(filesToRead > 0);
|
|
|
|
/*
* Find a suitable dn which has the requested number of files.
* Local-read files are chosen preferentially; if none of the dns has
* enough local-read files, fill the rest with remote-read files.
*/
|
|
item = AssignRequestFilesToDn(htab, fileList, filesToRead, conn);
|
|
Assert(NULL != item);
|
|
if (item == NULL) {
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_CACHE_LOOKUP_FAILED),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("could not assign request files to dn")));
|
|
}
|
|
delete (conn);
|
|
conn = NULL;
|
|
|
|
SplitMap* taskMap = makeNode(SplitMap);
|
|
taskMap->nodeId = PGXCNodeGetNodeId(item->nodeOid, PGXC_NODE_DATANODE);
|
|
taskMap->locatorType = LOCATOR_TYPE_NONE;
|
|
taskMap->splits = list_copy(item->toDoList);
|
|
allTask = lappend(allTask, taskMap);
|
|
|
|
hash_seq_init(&scan_status, htab);
|
|
while ((item = (dnWork*)hash_seq_search(&scan_status)) != NULL) {
|
|
list_free(item->toDoList);
|
|
item->toDoList = NIL;
|
|
}
|
|
|
|
hash_destroy(htab);
|
|
list_free(fileList);
|
|
fileList = NIL;
|
|
|
|
return allTask;
|
|
}
|
|
|
|
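/**
 * @Description: executed on each datanode: pick the SplitMap addressed to this node (or any
 * map for a replicated table), load the splits from the spill file if they were spilled to
 * the DFS, and copy them into readerState->splitList.
 * @in splitToDnMap, the list of SplitMap nodes built on the coordinator.
 * @out readerState, the reader state that receives the split list.
 * @in conn, the DFS connection handle used for loading spilled splits.
 * @return none.
 */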
void AssignSplits(List* splitToDnMap, dfs::reader::ReaderState* readerState, dfs::DFSConnector* conn)
|
|
{
|
|
ListCell* splitToDnMapCell = NULL;
|
|
|
|
List* fileList = NIL;
|
|
|
|
foreach (splitToDnMapCell, splitToDnMap) {
|
|
SplitMap* dnTask = (SplitMap*)lfirst(splitToDnMapCell);
|
|
|
|
if (u_sess->pgxc_cxt.PGXCNodeId == dnTask->nodeId || LOCATOR_TYPE_REPLICATED == dnTask->locatorType) {
|
|
/* If the splits is spilled to disk ,then load it here. */
|
|
if ((dnTask->splits == NULL && dnTask->downDiskFilePath != NULL) && conn != NULL) {
|
|
loadDiskSplits(dnTask, conn);
|
|
}
|
|
|
|
if (NIL != dnTask->splits) {
|
|
fileList = (List*)copyObject(dnTask->splits);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
readerState->splitList = fileList;
|
|
}
|
|
|
|
/*
* Find a dn which has enough local-read files to fulfill the requested number of files;
* if there are not enough local files, take some remote-read files as well.
*/
|
|
static dnWork* AssignRequestFilesToDn(HTAB* htab, List* fileList, int filesNum, dfs::DFSConnector* conn)
|
|
{
|
|
HeapTuple tuple = NULL;
|
|
int maxFilesAssigned = 0;
|
|
Form_pgxc_node dataNodeForm;
|
|
dnWork* recordDn = NULL;
|
|
bool found = false;
|
|
ListCell* fileCell = NULL;
|
|
dnWork* item = NULL;
|
|
int dnIdx = 0;
|
|
Oid* dnOids = NULL;
|
|
int totalDnNum;
|
|
int numOfDns;
|
|
int replIdx;
|
|
int startNode;
|
|
|
|
PgxcNodeGetOids(NULL, &dnOids, NULL, &numOfDns, false);
|
|
Assert(numOfDns > 0 && NULL != dnOids);
|
|
|
|
totalDnNum = numOfDns;
|
|
startNode = (int)gs_random() % totalDnNum;
|
|
dnIdx = startNode;
|
|
|
|
/* Traverse all the dns to assign files, and return the first dn that satisfies the request. */
|
|
while (numOfDns--) {
|
|
bool assigned = false;
|
|
|
|
Oid nodeOid = dnOids[dnIdx];
|
|
tuple = SearchSysCache1(PGXCNODEOID, ObjectIdGetDatum(nodeOid));
|
|
|
|
Form_pgxc_node nodeForm = (Form_pgxc_node)GETSTRUCT(tuple);
|
|
|
|
/* initialize the number of files that is assigned to zero */
|
|
int filesAssgnied = 0;
|
|
|
|
/* traverse all the files */
|
|
foreach (fileCell, fileList) {
|
|
SplitInfo* split = (SplitInfo*)lfirst(fileCell);
|
|
char* currentFileName = split->filePath;
|
|
int numOfReplica;
|
|
|
|
dfs::DFSBlockInfo* bl = conn->getBlockLocations(currentFileName);
|
|
numOfReplica = bl->getNumOfReplica();
|
|
|
|
/* Traverse the replicas; if any replica allows a local read on the current datanode,
* add the file to its split map. */
|
|
for (replIdx = 0; replIdx < numOfReplica; replIdx++) {
|
|
if (isNodeLocalToFile(nodeForm, bl->getNames(0, replIdx))) {
|
|
if (!assigned) {
|
|
assigned = true;
|
|
|
|
item = (dnWork*)hash_search(htab, &nodeOid, HASH_ENTER, &found);
|
|
item->toDoList = NIL;
|
|
|
|
Assert(!found);
|
|
}
|
|
|
|
filesAssgnied++;
|
|
item->toDoList = lappend(item->toDoList, split);
|
|
|
|
if (filesAssgnied == filesNum) {
|
|
ReleaseSysCache(tuple);
|
|
delete (bl);
|
|
bl = NULL;
|
|
return item;
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
delete (bl);
|
|
bl = NULL;
|
|
}
|
|
|
|
/* record the dn which has the most local read files */
|
|
if (filesAssgnied > maxFilesAssigned) {
|
|
maxFilesAssigned = filesAssgnied;
|
|
recordDn = item;
|
|
}
|
|
|
|
ReleaseSysCache(tuple);
|
|
|
|
dnIdx++;
|
|
if (dnIdx == totalDnNum)
|
|
dnIdx = 0;
|
|
}
|
|
|
|
/*
* After traversing all files we still do not have enough local files,
* so choose the dn which has the most local files and add some remote files to it.
*/
|
|
if (NULL == recordDn) {
|
|
recordDn = (dnWork*)hash_search(htab, &dnOids[startNode], HASH_ENTER, &found);
|
|
recordDn->toDoList = NIL;
|
|
}
|
|
tuple = SearchSysCache1(PGXCNODEOID, ObjectIdGetDatum(recordDn->nodeOid));
|
|
dataNodeForm = (Form_pgxc_node)GETSTRUCT(tuple);
|
|
|
|
foreach (fileCell, fileList) {
|
|
SplitInfo* split = (SplitInfo*)lfirst(fileCell);
|
|
char* currentFileName = split->filePath;
|
|
int numOfReplica;
|
|
|
|
dfs::DFSBlockInfo* bl = conn->getBlockLocations(currentFileName);
|
|
numOfReplica = bl->getNumOfReplica();
|
|
|
|
for (replIdx = 0; replIdx < numOfReplica; replIdx++) {
|
|
if (isNodeLocalToFile(dataNodeForm, bl->getNames(0, replIdx))) {
|
|
/* it is already in the toDo list */
|
|
break;
|
|
}
|
|
}
|
|
delete (bl);
|
|
bl = NULL;
|
|
|
|
if (replIdx >= numOfReplica) {
|
|
maxFilesAssigned++;
|
|
recordDn->toDoList = lappend(recordDn->toDoList, split);
|
|
|
|
if (maxFilesAssigned == filesNum) {
|
|
ReleaseSysCache(tuple);
|
|
return recordDn;
|
|
}
|
|
}
|
|
}
|
|
|
|
ReleaseSysCache(tuple);
|
|
|
|
if (dnOids != NULL)
|
|
pfree_ext(dnOids);
|
|
return NULL;
|
|
}
|
|
|
|
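/**
 * @Description: compute how many files one datanode should read for ANALYZE: roughly
 * totalFilesNum / dataNodeNum rounded to the nearest integer, and at least one file.
 * @in dataNodeNum, the number of datanodes (treated as 1 if it is 0).
 * @in totalFilesNum, the total number of files.
 * @return the number of files to analyze.
 */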
static int getAnalyzeFilesNum(int dataNodeNum, int totalFilesNum)
|
|
{
|
|
double accurateAssignedFiles;
|
|
double rawAssignedFiles;
|
|
int filesToRead;
|
|
|
|
if (dataNodeNum == 0)
|
|
dataNodeNum = 1;
|
|
|
|
accurateAssignedFiles = (double)totalFilesNum / dataNodeNum;
/* rawAssignedFiles keeps the integer (floor) part, used below to round to the nearest integer */
rawAssignedFiles = (double)(totalFilesNum / dataNodeNum);
|
|
|
|
if (totalFilesNum <= dataNodeNum)
|
|
filesToRead = 1;
|
|
else if (accurateAssignedFiles - rawAssignedFiles > 0.5)
|
|
filesToRead = (int)rawAssignedFiles + 1;
|
|
else
|
|
filesToRead = (int)rawAssignedFiles;
|
|
|
|
Assert(filesToRead >= 1 && filesToRead <= totalFilesNum / dataNodeNum + 1 &&
|
|
filesToRead >= totalFilesNum / dataNodeNum);
|
|
|
|
return filesToRead;
|
|
}
|
|
|
|
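/**
 * @Description: check whether a block location is local to the given datanode: compare the
 * node host recorded in pgxc_node with the block location host, treating localhost entries
 * as tcp_link_addr.
 * @in nodeForm, the pgxc_node tuple of the datanode.
 * @in blLocation, the block location host reported by the DFS.
 * @return true if the block is local to the datanode.
 */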
static bool isNodeLocalToFile(Form_pgxc_node nodeForm, const char* blLocation)
|
|
{
|
|
|
|
/*
|
|
* if node ip is not local address, compare directly
|
|
*/
|
|
if (strncmp(nodeForm->node_host.data, blLocation, strlen(nodeForm->node_host.data)) == 0) {
|
|
return true;
|
|
}
|
|
|
|
/*
* if node ip is a local address, compare tcp_link_addr
*/
|
|
if (((strncmp(nodeForm->node_host.data, LOCAL_IP, strlen(LOCAL_IP)) == 0) ||
|
|
(strncmp(nodeForm->node_host.data, LOCAL_HOST, strlen(LOCAL_HOST)) == 0)) &&
|
|
(strncmp(tcp_link_addr, blLocation, strlen(blLocation)) == 0)) {
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
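/**
 * @Description: scheduling for a replicated table: create a hash entry for every datanode,
 * but attach a copy of the whole file list only to the first one; with a replicated locator
 * each datanode later takes that same list in AssignSplits.
 * @in htab, the hash table of per-datanode work lists.
 * @in dnOids/nodeNum, the datanode oid array and its size.
 * @in fileList, the files/objects to be scheduled.
 * @return none.
 */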
static void AssignReplicaNode(HTAB* htab, const Oid* dnOids, const uint32_t nodeNum, const List* fileList)
|
|
{
|
|
bool found = false;
|
|
for (uint32_t dnIdx = 0; dnIdx < nodeNum; dnIdx++) {
|
|
Oid nodeOid = dnOids[dnIdx];
|
|
dnWork* item = (dnWork*)hash_search(htab, &nodeOid, HASH_ENTER, &found);
|
|
if (0 == dnIdx) {
|
|
item->toDoList = (List*)copyObject(fileList);
|
|
} else {
|
|
item->toDoList = NIL;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
* select the lowest-workload dn among the (cached & local) or (uncached & local) dns
*/
|
|
static bool AssignLocalNode(
|
|
HTAB* htab, uint64* Locations, const char* pChached, uint32 LocationSize, SplitInfo* currentFile, bool isAnalyze)
|
|
{
|
|
bool bFound = false;
|
|
Oid NodeOid = 0;
|
|
uint32 nodeTaskLength = 0;
|
|
uint64 TempU64 = 0;
|
|
uint32 MinWl = MAX_UINT32;
|
|
uint32 NextIdx = 0;
|
|
uint32 UseCachedFactor = 1;
|
|
uint32 Sel;
|
|
|
|
Assert((Locations != NULL) && (LocationSize > 0));
|
|
|
|
dnWork* pMinWorkload[MAX_ROUNDROBIN_AVAILABLE_DN_NUM] = {NULL};
|
|
|
|
for (uint32 Loop = 0; Loop < LocationSize; Loop++) {
|
|
TempU64 = Locations[Loop];
|
|
UseCachedFactor = ((pChached[Loop] != 0) && (best_effort_use_cahce == 1)) ? 2 : 1;
|
|
NodeOid = GETOID(TempU64);
|
|
|
|
/* Using the oid of each node in pgxc_node is efficient and convenient
* for looking up node information by oid.
*/
|
|
dnWork* Item = (dnWork*)hash_search(htab, &NodeOid, HASH_ENTER, &bFound);
|
|
|
|
if (bFound) {
|
|
nodeTaskLength = (list_length(Item->toDoList) + (UseCachedFactor - 1)) / UseCachedFactor;
|
|
|
|
} else /* the mpp datanode does not have any work yet */
|
|
{
|
|
nodeTaskLength = 0;
|
|
Item->toDoList = NIL;
|
|
}
|
|
|
|
if (nodeTaskLength < MinWl) {
|
|
pMinWorkload[0] = Item;
|
|
MinWl = nodeTaskLength;
|
|
NextIdx = 1;
|
|
} else if (nodeTaskLength == MinWl) {
|
|
pMinWorkload[NextIdx] = Item;
|
|
NextIdx += 1;
|
|
} else {
|
|
/*nothing to do*/
|
|
}
|
|
}
|
|
|
|
Assert((NextIdx > 0) && (NextIdx < MAX_ROUNDROBIN_AVAILABLE_DN_NUM));
|
|
if (NextIdx == 0)
|
|
NextIdx = 1;
|
|
|
|
if (!isAnalyze) {
|
|
/* select one randomly from all low workload dns */
|
|
Sel = gs_random() % NextIdx;
|
|
} else /* get a determined dn to get single stats for global stats. */
|
|
Sel = (NextIdx - 1) % MAX_ROUNDROBIN_AVAILABLE_DN_NUM;
|
|
|
|
pMinWorkload[Sel]->toDoList = lappend(pMinWorkload[Sel]->toDoList, currentFile);
|
|
|
|
return true;
|
|
}
|
|
|
|
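/**
 * @Description: pick a datanode for a split that has no local replica: starting from a
 * random datanode (or datanode 0 for ANALYZE), use the first datanode whose toDo list is
 * empty, otherwise the datanode with the shortest toDo list.
 * @in htab, the hash table of per-datanode work lists.
 * @in nodeNum/dnOids, the number of datanodes and their oids.
 * @in currentFile, the split to assign.
 * @in isAnalyze, true when executing an analyze command.
 * @return true if a datanode was assigned, false otherwise.
 */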
static bool AssignRemoteNode(HTAB* htab, int nodeNum, Oid* dnOids, SplitInfo* currentFile, bool isAnalyze)
|
|
{
|
|
HeapTuple tuple = NULL;
|
|
int nodeTaskLength = 0;
|
|
bool found = false;
|
|
dnWork* recordDn = NULL;
|
|
Oid nodeOid;
|
|
int dnIdx;
|
|
int totalDnNum = nodeNum;
|
|
|
|
if (!isAnalyze)
|
|
dnIdx = (int)gs_random() % totalDnNum;
|
|
else /* The dn id often changes during analyze for global stats, which makes
totalRowCnts wrong, so set the dn id to 0. */
|
|
dnIdx = 0;
|
|
|
|
while (nodeNum-- > 0) {
|
|
nodeOid = dnOids[dnIdx];
|
|
tuple = SearchSysCache1(PGXCNODEOID, ObjectIdGetDatum(nodeOid));
|
|
|
|
if (!HeapTupleIsValid(tuple))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_CACHE_LOOKUP_FAILED),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("cache lookup failed for node %u", nodeOid)));
|
|
|
|
Form_pgxc_node nodeForm = (Form_pgxc_node)GETSTRUCT(tuple);
|
|
|
|
/* Take definition for given node type */
|
|
if (nodeForm->node_type != PGXC_NODE_COORDINATOR) {
|
|
/*
* Using the oid of each node in pgxc_node is efficient and convenient
* for looking up node information by oid.
*/
|
|
dnWork* item = (dnWork*)hash_search(htab, &nodeOid, HASH_ENTER, &found);
|
|
|
|
// and it's not in the map yet
|
|
if (!found || 0 == list_length(item->toDoList)) {
|
|
item->toDoList = NIL;
|
|
item->toDoList = lappend(item->toDoList, currentFile);
|
|
|
|
ReleaseSysCache(tuple);
|
|
|
|
return true;
|
|
} else {
|
|
Assert(list_length(item->toDoList) > 0);
|
|
if (0 == nodeTaskLength || nodeTaskLength > list_length(item->toDoList)) {
|
|
nodeTaskLength = list_length(item->toDoList);
|
|
recordDn = item;
|
|
}
|
|
}
|
|
}
|
|
ReleaseSysCache(tuple);
|
|
|
|
dnIdx++;
|
|
|
|
if (dnIdx == totalDnNum)
|
|
dnIdx = 0;
|
|
}
|
|
|
|
if (0 != nodeTaskLength) {
|
|
recordDn->toDoList = lappend(recordDn->toDoList, currentFile);
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
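/**
 * @Description: pop the next path from a delimiter-separated list of file names: trim the
 * surrounding spaces, copy the path into freshly palloc'd memory and advance *fileNames past
 * the delimiter (or set it to NULL when the last entry has been consumed).
 * @in/out fileNames, the remaining file name string.
 * @in checkRootDir, if true the path must start with '/'.
 * @in delimiter, the separator character.
 * @return the extracted path.
 */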
static char* parseMultiFileNames(char** fileNames, bool checkRootDir, char delimiter)
|
|
{
|
|
char* currentFileName = NULL;
|
|
char* semicolon = strchr(*fileNames, delimiter);
|
|
|
|
if (semicolon == NULL) {
|
|
/* NOT FOUND */
|
|
char* tmp = *fileNames;
|
|
char* fileNameBegin = NULL;
|
|
char* fileNameEnd = NULL;
|
|
|
|
/* delete ' ' before the path */
|
|
while (' ' == *tmp)
|
|
tmp++;
|
|
fileNameBegin = tmp;
|
|
|
|
/* delete ' ' after the path */
|
|
tmp++;
|
|
while (' ' != *tmp && '\0' != *tmp)
|
|
tmp++;
|
|
fileNameEnd = tmp;
|
|
|
|
int indexOfSemicolon = (int)(fileNameEnd - fileNameBegin);
|
|
currentFileName = (char*)palloc0(indexOfSemicolon + 1);
|
|
errno_t rc = memcpy_s(currentFileName, (indexOfSemicolon + 1), fileNameBegin, indexOfSemicolon);
|
|
securec_check(rc, "", "");
|
|
currentFileName[indexOfSemicolon] = '\0';
|
|
*fileNames = NULL; /* reset to NULL as an end scan indicator */
|
|
} else {
|
|
/* delete ' ' */
|
|
char* tmp = *fileNames;
|
|
char* fileNameBegin = 0;
|
|
char* fileNameEnd = 0;
|
|
|
|
/* delete ' ' before the path */
|
|
while (' ' == *tmp)
|
|
tmp++;
|
|
fileNameBegin = tmp;
|
|
|
|
/* delete ' ' after the path */
|
|
tmp++;
|
|
while (' ' != *tmp && tmp < semicolon)
|
|
tmp++;
|
|
fileNameEnd = tmp;
|
|
|
|
int indexOfSemicolon = (int)(fileNameEnd - fileNameBegin);
|
|
int indexOfFile = (int)(semicolon - *fileNames);
|
|
currentFileName = (char*)palloc0(indexOfSemicolon + 1);
|
|
errno_t rc = memcpy_s(currentFileName, (indexOfSemicolon + 1), fileNameBegin, indexOfSemicolon);
|
|
securec_check(rc, "", "");
|
|
currentFileName[indexOfSemicolon] = '\0';
|
|
*fileNames += (long)indexOfFile + 1;
|
|
}
|
|
|
|
if (checkRootDir && currentFileName[0] != '/') {
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FDW_INVALID_STRING_FORMAT),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("file path need to start with root '/', but it is: %s", currentFileName)));
|
|
}
|
|
|
|
return currentFileName;
|
|
}
|
|
|
|
/**
* @Description: traverse the scan expression tree. It is a base function used
* when we search for a specified expression (for example, a var or a partition
* column restriction).
* @in node, the given expression.
* @in walker, the implementation function.
* @out context, the stateless struct; the caller builds this structure.
* @return none.
*/
|
|
void scan_expression_tree_walker(Node* node, void (*walker)(), void* context)
|
|
{
|
|
bool (*p2walker)(void*, void*) = (bool (*)(void*, void*))walker;
|
|
/* Guard against stack overflow due to overly complex expressions. */
|
|
check_stack_depth();
|
|
|
|
if (NULL == node) {
|
|
return;
|
|
}
|
|
switch (nodeTag(node)) {
|
|
case T_BoolExpr: {
|
|
BoolExpr* expr = (BoolExpr*)node;
|
|
scan_expression_tree_walker((Node*)expr->args, walker, context);
|
|
break;
|
|
}
|
|
case T_OpExpr:
|
|
case T_NullTest: {
|
|
p2walker(node, context);
|
|
break;
|
|
}
|
|
case T_List: {
|
|
ListCell* temp = NULL;
|
|
foreach (temp, (List*)node) {
|
|
scan_expression_tree_walker((Node*)lfirst(temp), walker, context);
|
|
}
|
|
break;
|
|
}
|
|
|
|
default: {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
* @Description: get the restrictions on the given partition column.
* @in node, we get the restriction from this node, which is a restriction
* expression list.
* @in/out partition_context, the element partColList of the context stores
* all partition column numbers. The element partClauseList will store the
* restriction clauses.
* @return none.
*/
|
|
static void GetPartitionClauseOpExpr(Node* node, partition_context* context)
|
|
{
|
|
OpExpr* op_clause = (OpExpr*)node;
|
|
Node* leftop = NULL;
|
|
Node* rightop = NULL;
|
|
Var* var = NULL;
|
|
if (list_length(op_clause->args) != 2) {
|
|
return;
|
|
}
|
|
leftop = get_leftop((Expr*)op_clause);
|
|
rightop = get_rightop((Expr*)op_clause);
|
|
|
|
Assert(NULL != rightop);
|
|
Assert(NULL != leftop);
|
|
|
|
if (rightop == NULL) {
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FDW_ERROR),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("The right operate expression of partition column cannot be NULL.")));
|
|
return;
|
|
}
|
|
|
|
if (leftop == NULL) {
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FDW_ERROR),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("The left operate expression of partition column cannot be NULL.")));
|
|
return;
|
|
}
|
|
|
|
if (IsVarNode(rightop) && IsA(leftop, Const)) {
|
|
if (IsA(rightop, RelabelType)) {
|
|
rightop = (Node*)((RelabelType*)rightop)->arg;
|
|
}
|
|
var = (Var*)rightop;
|
|
} else if (IsVarNode(leftop) && IsA(rightop, Const)) {
|
|
if (IsA(leftop, RelabelType)) {
|
|
leftop = (Node*)((RelabelType*)leftop)->arg;
|
|
}
|
|
var = (Var*)leftop;
|
|
}
|
|
|
|
if (NULL != var) {
|
|
ListCell* cell = NULL;
|
|
foreach (cell, context->partColList) {
|
|
if (lfirst_int(cell) == var->varattno) {
|
|
/* We found one partition restriction clause; put it into partClauseList. */
|
|
context->partClauseList = lappend(context->partClauseList, node);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static void GetPartitionClauseNullTest(Node* node, partition_context* context)
|
|
{
|
|
NullTest* nullExpr = (NullTest*)node;
|
|
if (IS_NOT_NULL == nullExpr->nulltesttype) {
|
|
return;
|
|
}
|
|
if (!IsA(nullExpr->arg, Var)) {
|
|
return;
|
|
}
|
|
Var* var = (Var*)nullExpr->arg;
|
|
if (NULL != var) {
|
|
ListCell* cell = NULL;
|
|
foreach (cell, context->partColList) {
|
|
if (lfirst_int(cell) == var->varattno) {
|
|
context->partClauseList = lappend(context->partClauseList, node);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
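/**
 * @Description: walk a restriction expression tree and collect, into context->partClauseList,
 * the OpExpr and "IS NULL" NullTest clauses that reference one of the partition columns listed
 * in context->partColList. BoolExpr nodes are not expected here; the caller filters them out
 * beforehand with hasBoolExpr.
 * @in node, the restriction expression (list).
 * @in/out context, the partition_context described above.
 * @return none.
 */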
void getPartitionClause(Node* node, partition_context* context)
|
|
{
|
|
if (NULL == node) {
|
|
return;
|
|
}
|
|
switch (nodeTag(node)) {
|
|
case T_BoolExpr: {
|
|
ereport(
|
|
ERROR, (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), errmodule(MOD_HDFS), errmsg("can not reach here.")));
|
|
|
|
break;
|
|
}
|
|
case T_OpExpr: {
|
|
GetPartitionClauseOpExpr(node, context);
|
|
break;
|
|
}
|
|
case T_NullTest: {
|
|
/* Only optimize the "column Is NULL" NullTest expression. */
|
|
GetPartitionClauseNullTest(node, context);
|
|
break;
|
|
}
|
|
default: {
|
|
scan_expression_tree_walker(node, (void (*)())getPartitionClause, context);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
* @Description: build the partition directory for a partition restriction,
* for example, /b=123/c=456/.
* @in node, the restriction.
* @in partition_string_context, it includes the partition columns;
* the partition restrictions that are obtained will be stored in the
* partColStrList of the context.
* @return none.
*/
|
|
static void getPartitionStringOpExpr(Node* node, partition_string_context* context)
|
|
{
|
|
OpExpr* op_clause = (OpExpr*)node;
|
|
bool equalExpr = isEquivalentExpression(op_clause->opno);
|
|
Var* var = NULL;
|
|
Const* constant = NULL;
|
|
Node* leftop = get_leftop((Expr*)op_clause);
|
|
Node* rightop = get_rightop((Expr*)op_clause);
|
|
if (equalExpr) {
|
|
if (rightop && IsVarNode(rightop) && leftop && IsA(leftop, Const)) {
|
|
if (IsA(rightop, RelabelType)) {
|
|
rightop = (Node*)((RelabelType*)rightop)->arg;
|
|
}
|
|
var = (Var*)rightop;
|
|
constant = (Const*)leftop;
|
|
} else if (leftop && IsVarNode(leftop) && rightop && IsA(rightop, Const)) {
|
|
if (IsA(leftop, RelabelType)) {
|
|
leftop = (Node*)((RelabelType*)leftop)->arg;
|
|
}
|
|
var = (Var*)leftop;
|
|
constant = (Const*)rightop;
|
|
}
|
|
if (NULL == var) {
|
|
return;
|
|
}
|
|
|
|
char* relName = get_relid_attribute_name(context->foreignTblOid, var->varattno);
|
|
StringInfo partitionDir = makeStringInfo();
|
|
appendStringInfo(partitionDir, "%s=", relName);
|
|
GetStringFromDatum(constant->consttype, constant->consttypmod, constant->constvalue, partitionDir);
|
|
appendStringInfo(partitionDir, "/");
|
|
context->partColNoList = lappend_int(context->partColNoList, var->varattno);
|
|
context->partColStrList = lappend(context->partColStrList, partitionDir);
|
|
}
|
|
}
|
|
|
|
static void getPartitionStringNullTest(Node* node, partition_string_context* context)
|
|
{
|
|
NullTest* nullExpr = (NullTest*)node;
|
|
if (IS_NOT_NULL == nullExpr->nulltesttype) {
|
|
return;
|
|
}
|
|
if (!IsA(nullExpr->arg, Var)) {
|
|
return;
|
|
}
|
|
|
|
Var* var = (Var*)nullExpr->arg;
|
|
if (NULL != var) {
|
|
char* relName = get_relid_attribute_name(context->foreignTblOid, var->varattno);
|
|
StringInfo partitionDir = makeStringInfo();
|
|
appendStringInfo(partitionDir, "%s=%s", relName, DEFAULT_HIVE_NULL);
|
|
appendStringInfo(partitionDir, "/");
|
|
context->partColNoList = lappend_int(context->partColNoList, var->varattno);
|
|
context->partColStrList = lappend(context->partColStrList, partitionDir);
|
|
}
|
|
}
|
|
|
|
void getPartitionString(Node* node, partition_string_context* context)
|
|
{
|
|
if (NULL == node) {
|
|
return;
|
|
}
|
|
switch (nodeTag(node)) {
|
|
case T_OpExpr: {
|
|
getPartitionStringOpExpr(node, context);
|
|
break;
|
|
}
|
|
case T_NullTest: {
|
|
getPartitionStringNullTest(node, context);
|
|
break;
|
|
}
|
|
default: {
|
|
scan_expression_tree_walker(node, (void (*)())getPartitionString, context);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
* @Description: check whether the given expression includes a bool expression.
* @in node, the given expression.
* @return true if a bool expression is found, otherwise false.
*/
|
|
bool hasBoolExpr(Node* node)
|
|
{
|
|
check_stack_depth();
|
|
bool returnValue = false;
|
|
|
|
if (node == NULL)
|
|
return false;
|
|
|
|
switch (nodeTag(node)) {
|
|
case T_BoolExpr: {
|
|
returnValue = true;
|
|
break;
|
|
}
|
|
case T_OpExpr: {
|
|
ListCell* temp = NULL;
|
|
OpExpr* expr = (OpExpr*)node;
|
|
foreach (temp, expr->args) {
|
|
if (hasBoolExpr((Node*)lfirst(temp))) {
|
|
returnValue = true;
|
|
break;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
case T_List: {
|
|
ListCell* temp = NULL;
|
|
foreach (temp, (List*)node) {
|
|
if (hasBoolExpr((Node*)lfirst(temp))) {
|
|
returnValue = true;
|
|
break;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
default: {
|
|
break;
|
|
}
|
|
}
|
|
|
|
return returnValue;
|
|
}
|
|
|
|
/**
* @Description: add partition directory paths built from the given restrictions.
* @in prefix, the foldername option value.
* @in foreignTableId, the given foreign table oid.
* @in scanClauseList, the given restrictions.
* @return the list of partition paths built from the prefix.
*/
|
|
List* addPartitionPath(Oid foreignTableId, List* scanClauseList, char* prefix)
|
|
{
|
|
List* partList = GetPartitionList(foreignTableId);
|
|
bool hasBExpr = false;
|
|
List* partStrPathList = NIL;
|
|
partition_string_context context;
|
|
StringInfo str = NULL;
|
|
|
|
hasBExpr = hasBoolExpr((Node*)scanClauseList);
|
|
|
|
/* For now, only OpExpr is optimized. */
|
|
if (hasBExpr) {
|
|
return NIL;
|
|
}
|
|
|
|
context.partColNoList = NIL;
|
|
context.partColStrList = NIL;
|
|
context.foreignTblOid = foreignTableId;
|
|
|
|
partition_context part_context;
|
|
part_context.partClauseList = NIL;
|
|
part_context.partColList = partList;
|
|
|
|
getPartitionClause((Node*)scanClauseList, &part_context);
|
|
|
|
/* get the partition restriction. */
|
|
getPartitionString((Node*)part_context.partClauseList, &context);
|
|
|
|
/* Build the partition path. */
|
|
for (int i = 0; i < list_length(partList); i++) {
|
|
AttrNumber varattno = list_nth_int(partList, i);
|
|
bool findPartCol = false;
|
|
if (hasBExpr && i >= 1) {
|
|
break;
|
|
}
|
|
for (int partIndex = 0; partIndex < list_length(context.partColNoList); partIndex++) {
|
|
int partColNo = list_nth_int(context.partColNoList, partIndex);
|
|
|
|
if (partColNo == varattno) {
|
|
findPartCol = true;
|
|
StringInfo partStringDir = (StringInfo)list_nth(context.partColStrList, partIndex);
|
|
|
|
/* We need to palloc memory for the partition path string. */
|
|
|
|
if (hasBExpr) {
|
|
str = makeStringInfo();
|
|
appendStringInfo(str, "%s", prefix);
|
|
appendStringInfo(str, "%s", partStringDir->data);
|
|
partStrPathList = lappend(partStrPathList, str);
|
|
ereport(LOG, (errmodule(MOD_DFS), errmsg("active pruning remain file: %s", str->data)));
|
|
|
|
/* As for BoolExpr, more than one restriction may exist for one column. */
|
|
continue;
|
|
} else {
|
|
if (0 == i) {
|
|
str = makeStringInfo();
|
|
appendStringInfo(str, "%s", prefix);
|
|
}
|
|
appendStringInfo(str, "%s", partStringDir->data);
|
|
ereport(LOG, (errmodule(MOD_DFS), errmsg("active pruning remain file: %s", str->data)));
|
|
/* only one restriction for one column. */
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!findPartCol) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!hasBExpr && NULL != str) {
|
|
partStrPathList = lappend(partStrPathList, str);
|
|
}
|
|
return partStrPathList;
|
|
}
|
|
|
|
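/**
 * @Description: collect every file/object of the foreign table. For an OBS server, walk the
 * buckets given by the foldername or region option (optionally narrowed by the partition
 * paths built from scanClauseList); for an HDFS server, list the foldername directory or each
 * entry of the filenames option. Used by CNSchedulingForAnalyze.
 * @return the list of SplitInfo entries, or NIL if nothing is found.
 */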
static List* GetAllFiles(
|
|
dfs::DFSConnector* conn, Oid foreignTableId, ServerTypeOption srvType, List* columnList, List* scanClauseList)
|
|
{
|
|
List* fileList = NIL;
|
|
char* currentFile = NULL;
|
|
List* entryList = NIL;
|
|
|
|
switch (srvType) {
|
|
case T_OBS_SERVER: {
|
|
char* multiBucketsFolder = HdfsGetOptionValue(foreignTableId, OPTION_NAME_FOLDERNAME);
|
|
char* multiBucketsRegion = HdfsGetOptionValue(foreignTableId, OPTION_NAME_LOCATION);
|
|
|
|
while (NULL != multiBucketsFolder || NULL != multiBucketsRegion) {
|
|
if (NULL != multiBucketsFolder) {
|
|
currentFile = parseMultiFileNames(&multiBucketsFolder, false, ',');
|
|
} else {
|
|
/* As for region option, each region path will stat from "obs://". So we add strlen(obs:/) length
|
|
* for multiBuckets. */
|
|
multiBucketsRegion = multiBucketsRegion + strlen("obs:/");
|
|
currentFile = parseMultiFileNames(&multiBucketsRegion, false, '|');
|
|
}
|
|
|
|
List* fixedPathList = NIL;
|
|
if (u_sess->attr.attr_sql.enable_valuepartition_pruning) {
|
|
fixedPathList = addPartitionPath(foreignTableId, scanClauseList, currentFile);
|
|
}
|
|
|
|
if (0 == list_length(fixedPathList)) {
|
|
fileList = list_concat(fileList, conn->listObjectsStat(currentFile, currentFile));
|
|
} else {
|
|
ListCell* cell = NULL;
|
|
foreach (cell, fixedPathList) {
|
|
StringInfo fixedPath = (StringInfo)lfirst(cell);
|
|
fileList = list_concat(fileList, conn->listObjectsStat(fixedPath->data, currentFile));
|
|
}
|
|
}
|
|
list_free_ext(fixedPathList);
|
|
}
|
|
|
|
break;
|
|
}
|
|
case T_HDFS_SERVER: {
|
|
HdfsFdwOptions* options = HdfsGetOptions(foreignTableId);
|
|
if (options->foldername) {
|
|
/*
* If the foldername is a file path, the function hdfsListDirectory does not do a
* validity check, so we call isDfsFile to check that the foldername is not a file path.
*/
|
|
if (conn->isDfsFile(options->foldername)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FDW_INVALID_OPTOIN_DATA),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("The foldername option cannot be a file path.")));
|
|
}
|
|
|
|
fileList = conn->listObjectsStat(options->foldername);
|
|
} else {
|
|
while (NULL != options->filename) {
|
|
currentFile = parseMultiFileNames(&options->filename, true, ',');
|
|
|
|
/* If the filenames option is used, then all the entries defined must be files. */
|
|
if (!conn->isDfsFile(currentFile)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FDW_INVALID_OPTOIN_DATA),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("The entries in the options fileNames must be file!")));
|
|
}
|
|
|
|
fileList = list_concat(fileList, conn->listObjectsStat(currentFile));
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
default: {
|
|
Assert(0);
|
|
break;
|
|
}
|
|
}
|
|
|
|
list_free(entryList);
|
|
entryList = NIL;
|
|
return fileList;
|
|
}
|
|
|
|
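/**
 * @Description: list all OBS objects under the buckets given by the foldername or region
 * option; when enable_valuepartition_pruning is on, restrict the listing to the partition
 * directories derived from the scan clauses (addPartitionPath).
 * @return the list of SplitInfo entries, or NIL if nothing is found.
 */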
static List* GetObsAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, List* columnList, List*& prunningResult,
|
|
List*& partList, List* scanClauses)
|
|
{
|
|
List* fileList = NIL;
|
|
char* currentFile = NULL;
|
|
|
|
char* multiBucketsFolder = HdfsGetOptionValue(foreignTableId, OPTION_NAME_FOLDERNAME);
|
|
char* multiBucketsRegion = HdfsGetOptionValue(foreignTableId, OPTION_NAME_LOCATION);
|
|
|
|
while (NULL != multiBucketsFolder || NULL != multiBucketsRegion) {
|
|
if (NULL != multiBucketsFolder) {
|
|
currentFile = parseMultiFileNames(&multiBucketsFolder, false, ',');
|
|
} else {
|
|
/* As for region option, each region path will stat from "obs://". So we add strlen(obs:/) length for
|
|
* multiBuckets. */
|
|
multiBucketsRegion = multiBucketsRegion + strlen("obs:/");
|
|
currentFile = parseMultiFileNames(&multiBucketsRegion, false, '|');
|
|
}
|
|
|
|
List* fixedPathList = NIL;
|
|
if (u_sess->attr.attr_sql.enable_valuepartition_pruning) {
|
|
fixedPathList = addPartitionPath(foreignTableId, scanClauses, currentFile);
|
|
}
|
|
|
|
if (0 == list_length(fixedPathList)) {
|
|
fileList = list_concat(fileList, conn->listObjectsStat(currentFile, currentFile));
|
|
} else {
|
|
ListCell* cell = NULL;
|
|
foreach (cell, fixedPathList) {
|
|
StringInfo fixedPath = (StringInfo)lfirst(cell);
|
|
fileList = list_concat(fileList, conn->listObjectsStat(fixedPath->data, currentFile));
|
|
}
|
|
}
|
|
list_free_ext(fixedPathList);
|
|
}
|
|
|
|
if (0 == list_length(fileList)) {
|
|
return NIL;
|
|
}
|
|
return fileList;
|
|
}
|
|
|
|
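/**
 * @Description: list all HDFS files under the foldername option (which must be a directory)
 * or under each entry of the filenames option (each of which must be a file).
 * @return the list of SplitInfo entries, or NIL if nothing is found.
 */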
static List* GetHdfsAllFiles(dfs::DFSConnector* conn, Oid foreignTableId, List* columnList, List*& prunningResult,
|
|
List*& partList, List* scanClauses)
|
|
{
|
|
List* fileList = NIL;
|
|
char* currentFile = NULL;
|
|
|
|
HdfsFdwOptions* options = HdfsGetOptions(foreignTableId);
|
|
if (options->foldername) {
|
|
/*
* If the foldername is a file path, the function hdfsListDirectory does not do a
* validity check, so we call isDfsFile to check that the foldername is not a file path.
*/
|
|
if (conn->isDfsFile(options->foldername)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FDW_INVALID_OPTOIN_DATA),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("The foldername option cannot be a file path.")));
|
|
}
|
|
|
|
fileList = conn->listObjectsStat(options->foldername);
|
|
} else {
|
|
while (NULL != options->filename) {
|
|
currentFile = parseMultiFileNames(&options->filename, true, ',');
|
|
|
|
/* If the filenames option is used, then all the entries defined must be files. */
|
|
if (!conn->isDfsFile(currentFile)) {
|
|
delete (conn);
|
|
conn = NULL;
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FDW_INVALID_OPTOIN_DATA),
|
|
errmodule(MOD_HDFS),
|
|
errmsg("The entries in the options fileNames must be file!")));
|
|
}
|
|
|
|
fileList = list_concat(fileList, conn->listObjectsStat(currentFile));
|
|
}
|
|
}
|
|
|
|
if (0 == list_length(fileList)) {
|
|
return NIL;
|
|
}
|
|
return fileList;
|
|
}
|
|
|
|
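/**
 * @Description: extract the value part of a "column=value" partition directory name,
 * URI-decode it and return it as a Value string; report an error if no '=' is found.
 * @in conn, the DFS connection handle (released before reporting an error).
 * @in partitionStr, the partition directory name.
 * @in ObjectStr, the file path used in the error message.
 * @return the decoded partition value.
 */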
static Value* getPartitionValue(dfs::DFSConnector* conn, char* partitionStr, char* ObjectStr)
{
    const char* partContent = strchr(partitionStr, '=');

    if (NULL == partContent) {
        delete (conn);
        conn = NULL;
        ereport(ERROR,
            (errcode(ERRCODE_FDW_ERROR),
                errmodule(MOD_HDFS),
                errmsg("Something is wrong with the partition directory name of file %s.", ObjectStr)));
    }

    Value* partValue = makeString(UriDecode(partContent + 1));
    return partValue;
}

/*
 * Get all the files in the current directory and store the partition column value in the new split if needed.
 * This is only used for partitioned tables and is different from DigFiles, although both functions search the
 * sub files of a given path.
 *
 * @_in param conn: The handler of the hdfs connection.
 * @_in param split: The split from whose path we get sub files.
 * @_in param colNo: The partition column index of the current split.
 * @return Return a list of sub files, or NIL for an empty directory.
 */
static List* GetSubFiles(dfs::DFSConnector* conn, SplitInfo* split, int colNo)
{
    List* fileList = NIL;
    char* fileName = split->fileName;
    char* folderName = split->filePath;
    List* partContentList = split->partContentList;
    List* entryList = NIL;
    SplitInfo* newsplit = NULL;

    entryList = conn->listObjectsStat(folderName);

    if (entryList == NIL) {
        return NIL;
    }

    List* newPartContentList = list_copy(partContentList);
    Value* partValue = getPartitionValue(conn, fileName, folderName);
    newPartContentList = lappend(newPartContentList, partValue);

    for (int i = 0; i < list_length(entryList); i++) {
        SplitInfo* splitInfo = (SplitInfo*)list_nth(entryList, i);
        newsplit = InitFolderSplit(splitInfo->filePath, newPartContentList, splitInfo->ObjectSize);
        fileList = lappend(fileList, newsplit);
    }

    pfree_ext(split->fileName);
    pfree_ext(split->filePath);
    list_free(newPartContentList);
    list_free(entryList);
    entryList = NIL;

    return fileList;
}

/*
 * Dig into the file split input. This is a general function to get the list of files/directories for a given
 * file path. (GetSubFiles is only for partition pruning.) If the path is a file, add itself to the list and
 * return; if the path is a directory with no files under it, return NIL; otherwise add all the sub
 * files/directories into the list and return.
 *
 * @_in param conn: The handler of the hdfs connection.
 * @_in param split: The split file/directory to dig in.
 * @return Return a list of one or more files. NIL means it is an empty directory.
 */
static List* DigFiles(dfs::DFSConnector* conn, SplitInfo* split)
{
    List* fileList = NIL;
    char* filePath = split->filePath;
    SplitInfo* newsplit = NULL;
    List* partContent = split->partContentList;
    List* entryList = conn->listObjectsStat(filePath);

    if (entryList == NIL) {
        return NIL;
    }

    for (int i = 0; i < list_length(entryList); i++) {
        SplitInfo* splitInfo = (SplitInfo*)list_nth(entryList, i);
        newsplit = InitFolderSplit(splitInfo->filePath, partContent, splitInfo->ObjectSize);
        fileList = lappend(fileList, newsplit);
    }

    list_free(entryList);
    entryList = NIL;
    return fileList;
}

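/*
 * Build the list of partition column numbers of a value-partitioned foreign table by reading its
 * pg_partition entry.
 *
 * @_in param relid: The oid of the foreign table in catalog.
 * @return Return the list of partition column attribute numbers, or NIL if the table is not partitioned.
 */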
List* GetPartitionList(Oid relid)
{
    HeapTuple partTuple = NULL;
    int2vector* partVec = NULL;
    List* partList = NULL;
    Datum datum;
    bool isnull = false;

    /* Check if the current foreign table is partitioned. */
    if (!isPartitionedObject(relid, RELKIND_FOREIGN_TABLE, true))
        return NIL;

    /* Search the tuple related to the current foreign table in pg_partition. */
    partTuple = searchPgPartitionByParentIdCopy(PART_OBJ_TYPE_PARTED_TABLE, relid);
    if (!HeapTupleIsValid(partTuple))
        ereport(ERROR,
            (errcode(ERRCODE_CACHE_LOOKUP_FAILED),
                errmodule(MOD_HDFS),
                errmsg("cache lookup failed for relid %u", relid)));

    datum = SysCacheGetAttr(PARTRELID, partTuple, Anum_pg_partition_partkey, &isnull);
    if (isnull) {
        ereport(ERROR,
            (errcode(ERRCODE_CACHE_LOOKUP_FAILED),
                errmodule(MOD_HDFS),
                errmsg("Error happened when searching the record in pg_partition for a partitioned table.")));
    } else {
        partVec = (int2vector*)DatumGetPointer(datum);
    }

    /* Build the partition list from the partVec stored in the tuple. */
    for (int i = 0; i < partVec->dim1; i++) {
        partList = lappend_int(partList, partVec->values[i]);
    }

    heap_freetuple(partTuple);
    partTuple = NULL;
    return partList;
}

/**
 * @Description: fill the partition value into partContentList so that it can be
 * read later by the setPartinfoAndDesc function.
 * @in conn, the DFS connector.
 * @in splitObject, the split from whose object path we extract the partition value.
 * @in partColNum, the ordinal (counted in slashes) of the partition directory within the object path.
 * @return none.
 */
void fillPartitionValueInSplitInfo(dfs::DFSConnector* conn, SplitInfo* splitObject, int partColNum)
{
    int ibegin = find_Nth(splitObject->filePath, partColNum, "/");
    int iend = find_Nth(splitObject->filePath, partColNum + 1, "/");
    char* partitionStr = (char*)palloc0(iend - ibegin);
    error_t rc = EOK;
    rc = memcpy_s(partitionStr, iend - ibegin, splitObject->filePath + ibegin + 1, iend - ibegin - 1);
    securec_check(rc, "\0", "\0");

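    /* Record the "col=value" directory segment and append its decoded value to the split. */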
    splitObject->fileName = partitionStr;
    Value* partitionValue = getPartitionValue(conn, partitionStr, splitObject->filePath);
    splitObject->partContentList = lappend(splitObject->partContentList, partitionValue);
}

/*
 * The function handles the whole process of partition pruning based on the partition column list,
 * scanClauses and so on.
 *
 * @_in param conn: The handler of the hdfs connection.
 * @_in param partitionRelatedList: Includes the partition list, file list and column list.
 * @_in param scanClauses: The expression clauses of the foreign scan used for pruning.
 * @_in param foreignTableId: The relation oid of the current foreign table.
 * @_out param prunningResult: Statistic of the partition pruning information for each layer.
 * @_out param partList: The list of the partition columns.
 * @_in param srvType: The server type (HDFS or OBS) of the foreign table.
 * @return Return the file list after the partition pruning.
 */
static List* PartitionPruneProcess(dfs::DFSConnector* conn, List* partitionRelatedList, List* scanClauses,
    Oid foreignTableId, List*& prunningResult, List*& partList, ServerTypeOption srvType)
{
    partList = GetPartitionList(foreignTableId);

    ListCell* fileCell = NULL;
    SplitInfo* split = NULL;
    bool partitionSkipped = false;
    List* fileList = (List*)linitial(partitionRelatedList);
    List* columnList = (List*)lsecond(partitionRelatedList);

    CheckPartitionColNumber(conn, partList, fileList, foreignTableId, srvType);

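    /* Walk the partition layers one by one; each iteration prunes fileList and descends one directory level. */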
    for (int i = 0; i < list_length(partList); i++) {
        List* newFileList = NIL;
        int partCol = list_nth_int(partList, i);
        int sum = list_length(fileList);

        /*
         * If the fileList is empty, then all the files have been pruned and we return immediately.
         * In this case, the list of partition info in the split is not complete, but that is fine:
         * if we return NIL here, there will be no task for the scheduler and the scan will return
         * at the beginning (no ORC file is read).
         */
        if (0 == sum)
            return NIL;

        int notprunning = 0;

        /* Fetch the current partition column var, which includes the type and the column no. */
        Var* value = GetVarFromColumnList(columnList, partCol);

        /* The flag that controls whether we need to call PartitionFilterClause; true means we do not. */
        bool skipPartitionFilter = ((NULL == value) || (0 == list_length(scanClauses)));

        /*
         * If the partition column is not required (value == NULL), or i exceeds the max number of
         * partition layers we can prune, or the scanClauses list is empty, we just fetch the partition
         * column value without pruning.
         */
        if (skipPartitionFilter) {
            /* Process all the directories in the fileList (they must be directories rather than files). */
            foreach (fileCell, fileList) {
                split = (SplitInfo*)lfirst(fileCell);
                if (T_HDFS_SERVER == srvType) {
                    newFileList = list_concat(newFileList, GetSubFiles(conn, split, partCol));
                } else {
                    fillPartitionValueInSplitInfo(conn, split, i + split->prefixSlashNum);
                    newFileList = lappend(newFileList, split);
                }
            }
        } else {
            Expr* equalExpr = (Expr*)MakeOperatorExpression(value, BTEqualStrategyNumber);

            /* Process all the directories in the fileList (they must be directories rather than files). */
            ListCell* prev = NULL;
            ListCell* next = NULL;
            for (fileCell = list_head(fileList); fileCell != NULL; fileCell = next) {
                CHECK_FOR_INTERRUPTS();
                split = (SplitInfo*)lfirst(fileCell);

                /* next cell */
                next = lnext(fileCell);

                if (T_OBS_SERVER == srvType) {
                    fillPartitionValueInSplitInfo(conn, split, i + split->prefixSlashNum);
                }

                /* Partition pruning by scanClauses. */
                partitionSkipped = false;

                partitionSkipped = PartitionFilterClause(split, scanClauses, value, equalExpr);

                /*
                 * If we cannot skip the current one, we need to add the files under the directory to the
                 * new file list, which becomes the fileList when we reach the next partition layer.
                 */
                if (!partitionSkipped) {
                    if (T_HDFS_SERVER == srvType) {
                        newFileList = list_concat(newFileList, GetSubFiles(conn, split, partCol));
                    } else {
                        newFileList = lappend(newFileList, split);
                    }
                    notprunning++;

                    /* prev cell */
                    prev = fileCell;
                } else {
                    if (T_OBS_SERVER == srvType) {
                        pfree_ext(split->fileName);
                        pfree_ext(split->filePath);
                        pfree_ext(split);

                        /* remove from fileList */
                        fileList = list_delete_cell(fileList, fileCell, prev);
                    } else {
                        /* prev cell */
                        prev = fileCell;
                    }
                }
            }

            /* Collect the partition pruning statistics for the current partition layer. */
            CollectPartPruneInfo(prunningResult, sum, notprunning, partCol, foreignTableId);
        }

        /* Clean the list if needed. */
        if (T_HDFS_SERVER == srvType) {
            list_free_deep(fileList);
        } else {
            list_free(fileList);
        }
        fileList = newFileList;
    }

    return fileList;
}

/*
 * Check if the number of partition columns defined is larger than the number in hdfs. The number of
 * partition columns defined in MPPDB can be smaller, but can never be larger.
 *
 * e.g.
 * If we create a foreign table like: create foreign table hdfs_tab ~ partitioned by
 * (c1, c2), then we will check that all the file paths include c1 and c2, because we must
 * ensure that the tree formed by the paths is balanced, which means all the paths have the
 * same length. However, there can be no files in the last partition directory, so we need
 * to handle that condition specially.
 *
 * @_in param conn: The handler of the hdfs connection.
 * @_in param partList: The list of partition columns.
 * @_in param fileList: The file list defined when we create the foreign table.
 * @_in param foreignTableId: The oid of the foreign table in catalog.
 * @_in param srvType: The server type (HDFS or OBS) of the foreign table.
 */
static void CheckPartitionColNumber(
    dfs::DFSConnector* conn, List* partList, List* fileList, Oid foreignTableId, ServerTypeOption srvType)
{
    int length = list_length(partList);
    if (0 == length) {
        return;
    }
    if (T_HDFS_SERVER == srvType) {
        SplitInfo* split = (SplitInfo*)linitial(fileList);
        SplitInfo* curSplit = (SplitInfo*)copyObject(split);

        for (int i = 0; i < length; i++) {
            curSplit = CheckOneSubSplit(conn, curSplit, (i == length - 1), foreignTableId);
        }
        DestroySplit(curSplit);
    } else {
        ListCell* cell = NULL;
        foreach (cell, fileList) {
            SplitInfo* curSplit = (SplitInfo*)lfirst(cell);
            if (find_Nth(curSplit->filePath, length + curSplit->prefixSlashNum, "/") == -1) {
                QUERY_NOT_SUPPORT(foreignTableId,
                    "The number of partition columns "
                    "defined for foreign table %s is larger than it should be.");
            }
        }
    }
}

/*
 * We only check one split for each layer here when checking whether the number of partition
 * columns defined is larger than the number in hdfs.
 *
 * @_in param conn: The handler of the hdfs connection.
 * @_in param split: The split to be checked.
 * @_in param isLastPartition: Indicates whether the split belongs to the last partition layer.
 * @_in param foreignTableId: The oid of the foreign table in catalog.
 * @return Return the first split under the current folder.
 */
static SplitInfo* CheckOneSubSplit(dfs::DFSConnector* conn, SplitInfo* split, bool isLastPartition, Oid foreignTableId)
{
    char* folderName = split->filePath;
    SplitInfo* newsplit = NULL;
    List* entryList = NIL;

    if (conn->isDfsFile(folderName)) {
        delete (conn);
        conn = NULL;
        QUERY_NOT_SUPPORT(foreignTableId,
            "The number of partition columns "
            "defined for foreign table %s is larger than it should be.");
    }

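    /* List the folder; every partition layer except possibly the last one must contain at least one entry. */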
    entryList = conn->listDirectory(folderName);

    /*
     * If the number of entries equals zero, we may need to log an error: if we define N
     * partition columns, then there must be N layers of folders.
     */
    if (0 == list_length(entryList)) {
        if (!isLastPartition) {
            delete (conn);
            conn = NULL;
            ereport(ERROR,
                (errcode(ERRCODE_FDW_ERROR),
                    errmodule(MOD_HDFS),
                    errmsg("Error occurred when opening partition folder: %s, "
                           "it is empty.",
                        folderName)));
        }
    } else {
        newsplit = InitFolderSplit((char*)list_nth(entryList, 0), NIL, 0);
    }

    /* Clean the former split. */
    DestroySplit(split);
    list_free(entryList);
    entryList = NIL;

    return newsplit;
}

/*
 * Check whether we can skip the current split based on the scanClauses.
 * @_in param split: The split to check against the scanClauses.
 * @_in param scanClauses: The clauses generated by the optimizer.
 * @_in param value: The var of the partition column.
 * @_in param equalExpr: The expression of the restriction to be built.
 * @return Return true: we can skip the current split; false: we need to keep the split.
 */
static bool PartitionFilterClause(SplitInfo* split, List* scanClauses, Var* value, Expr* equalExpr)
{
    Node* baseRestriction = NULL;
    char* partValue = NULL;
    List* partRestriction = NIL;
    bool partSkipped = false;
    char* fileName = split->fileName;
    Assert(fileName != NULL);
    Datum datumValue;

    /* Fetch the value of the current partition column. */
    partValue = strchr(fileName, '=');
    if (NULL == partValue) {
        ereport(ERROR,
            (errcode(ERRCODE_FDW_INVALID_OPTOIN_DATA),
                errmodule(MOD_HDFS),
                errmsg("Something is wrong with the partition directory name of file %s.", split->filePath)));
    }

    if (0 == strncmp(partValue + 1, DEFAULT_HIVE_NULL, 26)) {
        /* __HIVE_DEFAULT_PARTITION__ means the current value is NULL. */
        baseRestriction = BuildNullTestConstraint(value, IS_NULL);
    } else {
        /* Convert the string value to a datum value. */
        datumValue = GetDatumFromString(value->vartype, value->vartypmod, UriDecode(partValue + 1));
        BuildConstraintConst(equalExpr, datumValue, false);
        baseRestriction = (Node*)equalExpr;
    }

    partRestriction = lappend(partRestriction, baseRestriction);

    /*
     * Compare strings using the "C" collation in the coarse filter.
     */
    List* tempScanClauses = (List*)copyObject(scanClauses);
    List* opExprList = pull_opExpr((Node*)tempScanClauses);
    ListCell* lc = NULL;
    foreach (lc, opExprList) {
        OpExpr* opExpr = (OpExpr*)lfirst(lc);
        opExpr->inputcollid = C_COLLATION_OID;
    }

    /*
     * Call the function provided by PG to check whether we can refute the predicate. If partSkipped
     * is true, we can skip the current split (file); otherwise we need it.
     */
    partSkipped = predicate_refuted_by(partRestriction, tempScanClauses, true);
    list_free_ext(opExprList);
    list_free_ext(partRestriction);
    list_free_deep(tempScanClauses);
    tempScanClauses = NIL;

    return partSkipped;
}

/*
 * Collect the partition pruning result, which includes the total number of files and the number of
 * files left for each layer.
 *
 * @_out param prunningResult: The result list of partition pruning.
 * @_in param sum: The total number of files before pruning.
 * @_in param notprunning: The number of files left after pruning.
 * @_in param colno: The attribute number of the partition column.
 * @_in param relOid: The oid of the partitioned foreign table.
 */
static void CollectPartPruneInfo(List*& prunningResult, int sum, int notprunning, int colno, Oid relOid)
{
    Relation rel = heap_open(relOid, AccessShareLock);
    char* attName = NameStr(rel->rd_att->attrs[colno - 1]->attname);

    /*
     * Add 16 here because we need to add some description words, three separator characters
     * (';', '(', ')') and the terminating '\0'.
     */
    int length = strlen(attName) + GetDigitOfInt(sum) + GetDigitOfInt(notprunning) + 16;
    char* tmp = (char*)palloc0(sizeof(char) * length);
    int ret = 0;
    ret = snprintf_s(
        tmp, sizeof(char) * length, sizeof(char) * length - 1, "%s(total %d; left %d)", attName, sum, notprunning);
    securec_check_ss(ret, "\0", "\0");
    heap_close(rel, AccessShareLock);

    Value* v = makeString(tmp);
    prunningResult = lappend(prunningResult, v);
}

/*
 * Drill down until there are only files in the file list (no directories).
 *
 * @_in param conn: The handler of the hdfs connection.
 * @_in param fileList: The list of files before drilling.
 * @return Return the list of files after drilling.
 */
static List* DrillDown(dfs::DFSConnector* conn, List* fileList)
{
    ListCell* fileCell = NULL;
    SplitInfo* split = NULL;
    List* nextFileList = NIL;
    List* fileListList = NIL;

    while (0 != list_length(fileList)) {
        bool isAllFile = true;

        /* Dig into each file in the file list. */
        foreach (fileCell, fileList) {
            CHECK_FOR_INTERRUPTS();
            split = (SplitInfo*)lfirst(fileCell);
            if (split->ObjectSize > 0) {
                fileListList = lappend(fileListList, split);
                continue;
            } else {
                isAllFile = false;
                nextFileList = list_concat(nextFileList, DigFiles(conn, split));
            }
        }

        /*
         * If all the splits are files, we have dug down to the bottom
         * and it is time to break the loop and return.
         */
        if (isAllFile) {
            break;
        } else {
            list_free(fileList);
            fileList = nextFileList;
            nextFileList = NIL;
        }
    }

    return fileListList;
}

#ifdef ENABLE_MULTIPLE_NODES
/*
 * @Description: filter *.carbondata files from fileList
 * @IN conn: dfs connection
 * @IN fileList: the full file list to be filtered
 * @IN allColumnList: column list for the query
 * @IN restrictColumnList: restrict column list
 * @IN scanClauses: scan clauses
 * @IN attrNum: the number of attributes
 * @Return: the *.carbondata file list for the query
 */
List* CarbonDataFile(dfs::DFSConnector* conn, List* fileList, List* allColumnList, List* restrictColumnList,
    List* scanClauses, int16 attrNum)
{
    Assert(NIL != fileList);

    if (0 == list_length(fileList)) {
        return NIL;
    }

    /* If there is no where condition, do not read the *.carbonindex file. */
    /* If enable_indexscan is false, do not read the *.carbonindex file. */
    if ((0 == list_length(restrictColumnList)) || (u_sess->attr.attr_sql.enable_indexscan == false)) {
        List* dataFileList = dfs::CarbonFileFilter(fileList, CARBONDATA_DATA);
        list_free(fileList);

        ereport(DEBUG1, (errmodule(MOD_CARBONDATA), errmsg("Ignore *.carbonindex file.")));

        return dataFileList;
    }

    /* get the .carbonindex files from the obs file list */
    List* indexFileList = dfs::CarbonFileFilter(fileList, CARBONDATA_INDEX);

    /* init readerState for the input stream */
    dfs::reader::ReaderState* readerState = (dfs::reader::ReaderState*)palloc0(sizeof(dfs::reader::ReaderState));

    readerState->persistCtx = AllocSetContextCreate(CurrentMemoryContext,
        "carbon index reader context",
        ALLOCSET_DEFAULT_MINSIZE,
        ALLOCSET_DEFAULT_INITSIZE,
        ALLOCSET_DEFAULT_MAXSIZE);

    /* switch MemoryContext */
    MemoryContext oldcontext = MemoryContextSwitchTo(readerState->persistCtx);

    /* init restrictRequired and readRequired */
    bool* restrictRequired = (bool*)palloc0(sizeof(bool) * attrNum);
    bool* readRequired = (bool*)palloc0(sizeof(bool) * attrNum);

    ListCell* lc = NULL;
    Var* variable = NULL;

    foreach (lc, restrictColumnList) {
        variable = (Var*)lfirst(lc);
        Assert(variable->varattno <= attrNum);
        restrictRequired[variable->varattno - 1] = true;
    }

    foreach (lc, allColumnList) {
        variable = (Var*)lfirst(lc);
        Assert(variable->varattno <= attrNum);
        readRequired[variable->varattno - 1] = true;
    }

    /* init queryRestrictionList */
    List* queryRestrictionList = ExtractNonParamRestriction((List*)copyObject(scanClauses));

    /* get the .carbondata files from the .carbonindex files */
    List* dataFileList = NIL;
    ListCell* cell = NULL;

    for (cell = list_head(indexFileList); cell != NULL; cell = lnext(cell)) {
        void* data = lfirst(cell);
        if (IsA(data, SplitInfo)) {
            SplitInfo* splitinfo = (SplitInfo*)data;
            char* filePath = splitinfo->filePath;

            readerState->currentSplit = splitinfo;
            readerState->currentFileSize = splitinfo->ObjectSize;
            readerState->currentFileID = -1;

            std::unique_ptr<dfs::GSInputStream> gsInputStream =
                dfs::InputStreamFactory(conn, filePath, readerState, false);

            dfs::reader::CarbondataIndexReader indexFileReader(
                allColumnList, queryRestrictionList, restrictRequired, readRequired, attrNum);

            indexFileReader.init(std::move(gsInputStream));

            indexFileReader.readIndex();

            /* Deduplicate the data files referenced by the index file. */
            dataFileList = list_concat(dataFileList, indexFileReader.getDataFileDeduplication());
        }
    }

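    /* Keep only the *.carbondata files that are referenced by the index files we just read. */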
    fileList = dfs::CarbonDataFileMatch(fileList, dataFileList);

    /* release memory */
    (void)MemoryContextSwitchTo(oldcontext);
    if (NULL != readerState->persistCtx && readerState->persistCtx != CurrentMemoryContext) {
        MemoryContextDelete(readerState->persistCtx);
        pfree(readerState);
        readerState = NULL;
    }

    return fileList;
}

/*
 * @Description: extract the non-param restrictions
 * @IN opExpressionList: operator expression list
 * @Return: non-param restriction list
 */
static List* ExtractNonParamRestriction(List* opExpressionList)
{
    ListCell* lc = NULL;
    Expr* expr = NULL;
    List* retRestriction = NIL;

    foreach (lc, opExpressionList) {
        expr = (Expr*)lfirst(lc);
        if (IsA(expr, OpExpr)) {
            Node* leftop = get_leftop(expr);
            Node* rightop = get_rightop(expr);
            Assert(leftop != NULL);
            Assert(rightop != NULL);
            /* Skip operator expressions that compare a Var against a Param-style const (see IsParamConst). */
            if ((IsVarNode(leftop) && IsParamConst(rightop)) || (IsVarNode(rightop) && IsParamConst(leftop))) {
                continue;
            }
        }
        retRestriction = lappend(retRestriction, expr);
    }

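    /* Force the "C" collation on all operator expressions so the coarse filter compares strings byte-wise. */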
    List* opExprList = pull_opExpr((Node*)retRestriction);
    foreach (lc, opExprList) {
        OpExpr* opExpr = (OpExpr*)lfirst(lc);
        opExpr->inputcollid = C_COLLATION_OID;
    }
    list_free_ext(opExprList);
    return retRestriction;
}
#endif