提交 de0b7bb7 编写于 作者: F Francisco Guerrero 提交者: Francisco Guerrero

PXF FDW: Run Fragmenter call from master only

Currently, every segment node retrieves metadata about the list of
fragments it's going to process. Then it filters out fragments assigned
to that segment, and then it processes each fragment, one at a time.
This operation can stress the external metadata servers when the
Greenplum cluster is large, because every segment will connect at the
same time to the external system to fetch metadata. An optimization was
introduced in PXF to cache the metadata at the PXF Server level, when
multiple segments were trying to access the same metadata, PXF would
only issue 1 query to the external system. This helped improved the
situation, but still, every segment host was getting the same metadata.

In Foreign Data Wrappers, this metadata query can be done in a single
place from master. And master can provide this information to the
segments.
上级 15779931
......@@ -13,12 +13,7 @@
#include "pxf_filter.h"
#include "pxf_fragment.h"
#include "access/sysattr.h"
#include "access/reloptions.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_foreign_table.h"
#include "catalog/pg_user_mapping.h"
#include "catalog/pg_type.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbvars.h"
#include "commands/copy.h"
......@@ -27,7 +22,6 @@
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "nodes/pg_list.h"
#include "nodes/makefuncs.h"
#include "optimizer/paths.h"
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
......@@ -176,7 +170,9 @@ enum FdwScanPrivateIndex
/* WHERE clauses to be sent to PXF (as a String node) */
FdwScanPrivateWhereClauses,
/* Integer list of attribute numbers retrieved by the SELECT */
FdwScanPrivateRetrievedAttrs
FdwScanPrivateRetrievedAttrs,
/* List of fragments to be processed by the segments */
FdwScanPrivateFragmentList
};
/*
......@@ -319,7 +315,6 @@ pxfGetForeignPlan(PlannerInfo *root,
* Items in the list must match enum FdwScanPrivateIndex, above.
*/
/* here we serialize the WHERE clauses */
char *where_clauses_str = SerializePxfFilterQuals(fpinfo->remote_conds);
......@@ -372,11 +367,61 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags)
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
return;
List *quals = node->ss.ps.qual;
List* serializedFragmentList;
List* fragments;
ForeignTable *rel = GetForeignTable(RelationGetRelid(node->ss.ss_currentRelation));
List *quals = node->ss.ps.qual;
Oid foreigntableid = RelationGetRelid(node->ss.ss_currentRelation);
PxfFdwScanState *pxfsstate = NULL;
Relation relation = node->ss.ss_currentRelation;
ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan;
PxfFdwScanState *pxfsstate = NULL;
Relation relation = node->ss.ss_currentRelation;
ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan;
PxfOptions *options = PxfGetOptions(foreigntableid);
/* retrieve fdw-private information from pxfGetForeignPlan() */
char *filter_str = strVal(list_nth(foreignScan->fdw_private, FdwScanPrivateWhereClauses));
List *retrieved_attrs = (List *) list_nth(foreignScan->fdw_private, FdwScanPrivateRetrievedAttrs);
/*
* When running queries on all segments, master makes the fragmenter call
* and segments receive a List of FragmentData from master. When the query
* only runs on master or any, the fragment list is retrieved by the
* executing process.
*/
if (rel->exec_location != FTEXECLOCATION_ALL_SEGMENTS || Gp_role == GP_ROLE_DISPATCH)
{
fragments = GetFragmentList(options,
relation,
filter_str,
retrieved_attrs);
if (Gp_role == GP_ROLE_DISPATCH)
{
/*
* serialize fragment list and pass it down to segments
* by appending it to foreignScan->fdw_private
*/
serializedFragmentList = SerializeFragmentList(fragments);
foreignScan->fdw_private = lappend(foreignScan->fdw_private, serializedFragmentList);
/* master does not process any fragments */
return;
}
}
else
{
serializedFragmentList = (List *) list_nth(foreignScan->fdw_private, FdwScanPrivateFragmentList);
fragments = DeserializeFragmentList(serializedFragmentList);
/*
* Call the work allocation algorithm when execution happens on all
* segments
*/
fragments = FilterFragmentsForSegment(fragments);
}
/* Assign PXF location for the allocated fragments */
AssignPxfLocationToFragments(options, fragments);
/*
* Save state in node->fdw_state. We must save enough information to call
......@@ -385,18 +430,12 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags)
pxfsstate = (PxfFdwScanState *) palloc(sizeof(PxfFdwScanState));
initStringInfo(&pxfsstate->uri);
pxfsstate->options = PxfGetOptions(foreigntableid);
/* retrieve fdw-private stuff from pxfGetForeignPlan() */
pxfsstate->filter_str = strVal(list_nth(foreignScan->fdw_private, FdwScanPrivateWhereClauses));
pxfsstate->retrieved_attrs = (List *) list_nth(foreignScan->fdw_private, FdwScanPrivateRetrievedAttrs);
pxfsstate->filter_str = filter_str;
pxfsstate->fragments = fragments;
pxfsstate->options = options;
pxfsstate->quals = quals;
pxfsstate->relation = relation;
pxfsstate->fragments = GetFragmentList(pxfsstate->options,
pxfsstate->relation,
pxfsstate->filter_str,
pxfsstate->retrieved_attrs);
pxfsstate->retrieved_attrs = retrieved_attrs;
InitCopyState(pxfsstate);
node->fdw_state = (void *) pxfsstate;
......@@ -422,7 +461,6 @@ pxfIterateForeignScan(ForeignScanState *node)
ErrorContextCallback errcallback;
bool found;
/* Set up callback to identify error line number. */
errcallback.callback = CopyFromErrorCallback;
errcallback.arg = (void *) pxfsstate->cstate;
......
......@@ -1401,9 +1401,7 @@ SerializePxfFilterQuals(List *quals)
char *result = NULL;
if (quals == NULL)
{
return result;
}
List *clauses = NULL;
ListCell *lc;
......
......@@ -13,19 +13,17 @@
#define LOG_DEBUG (FRAGDEBUG >= log_min_messages) || (FRAGDEBUG >= client_min_messages)
static List *GetDataFragmentList(PxfOptions *options, ClientContext * client_context);
static void rest_request(PxfOptions *options, ClientContext * client_context, char *rest_msg);
static List *parse_get_fragments_response(List *fragments, StringInfo rest_buf);
static List *FilterFragmentsForSegment(List *list);
static void init(ClientContext * client_context);
static void logFragmentList(const char *debugHeader, List *fragments);
static void init_client_context(ClientContext * client_context);
static void AssignPxfLocationToFragments(PxfOptions *options, List *fragments);
static void call_rest(PxfOptions *options, ClientContext * client_context, char *rest_msg);
static void process_request(ClientContext * client_context, char *uri);
static void pxf_fragment_scalar(void *state, char *token, JsonTokenType type);
static void pxf_fragment_object_start(void *state, char *name, bool isnull);
static void pxf_array_element_start(void *state, bool isnull);
static void pxf_array_element_end(void *state, bool isnull);
static void RestRequest(PxfOptions *options, ClientContext * client_context, char *rest_msg);
static List *ParseGetFragmentsResponse(StringInfo rest_buf);
static void Init(ClientContext * client_context);
static void LogFragmentList(const char *debugHeader, List *fragments);
static void InitClientContext(ClientContext * client_context);
static void CallRest(PxfOptions *options, ClientContext * client_context, char *rest_msg);
static void ProcessRequest(ClientContext * client_context, char *uri);
static void PxfFragmentScalar(void *state, char *token, JsonTokenType type);
static void PxfFragmentObjectStart(void *state, char *name, bool isnull);
static void PxfArrayElementStart(void *state, bool isnull);
static void PxfArrayElementEnd(void *state, bool isnull);
/* Get List of fragments using PXF
* Returns selected fragments that have been allocated to the current segment
......@@ -44,7 +42,7 @@ GetFragmentList(PxfOptions *options,
Assert(options != NULL);
/* 1. Initialize curl headers */
init(&client_context);
Init(&client_context);
/* Enrich the curl HTTP header */
BuildHttpHeaders(client_context.http_headers,
......@@ -61,24 +59,7 @@ GetFragmentList(PxfOptions *options,
return NIL;
if (LOG_DEBUG)
logFragmentList("Available Data fragments", data_fragments);
if (options->exec_location == FTEXECLOCATION_ALL_SEGMENTS)
{
/*
* Call the work allocation algorithm when execution happens on all
* segments
*/
data_fragments = FilterFragmentsForSegment(data_fragments);
if (data_fragments == NIL)
return NIL;
}
/* Assign PXF location for the allocated fragments */
AssignPxfLocationToFragments(options, data_fragments);
if (LOG_DEBUG)
logFragmentList("Allocated Data fragments", data_fragments);
LogFragmentList("Available Data fragments", data_fragments);
return data_fragments;
}
......@@ -87,7 +68,7 @@ GetFragmentList(PxfOptions *options,
* Assign PXF Host for each Data Fragment
* Will use the same host as the existing segment as the PXF host.
*/
static void
void
AssignPxfLocationToFragments(PxfOptions *options, List *fragments)
{
ListCell *frag_c = NULL;
......@@ -95,7 +76,6 @@ AssignPxfLocationToFragments(PxfOptions *options, List *fragments)
foreach(frag_c, fragments)
{
FragmentData *fragment = (FragmentData *) lfirst(frag_c);
fragment->authority = psprintf("%s:%d", options->pxf_host, options->pxf_port);
}
}
......@@ -112,13 +92,13 @@ static List *
GetDataFragmentList(PxfOptions *options,
ClientContext * client_context)
{
List *data_fragments = NIL;
List *data_fragments;
char *restMsg = "http://%s:%d/%s/%s/Fragmenter/getFragments";
rest_request(options, client_context, restMsg);
RestRequest(options, client_context, restMsg);
/* parse the JSON response and form a fragments list to return */
data_fragments = parse_get_fragments_response(data_fragments, &(client_context->the_rest_buf));
data_fragments = ParseGetFragmentsResponse(&(client_context->the_rest_buf));
return data_fragments;
}
......@@ -129,12 +109,12 @@ GetDataFragmentList(PxfOptions *options,
* to failover with a retry for the HA HDFS scenario.
*/
static void
rest_request(PxfOptions *options, ClientContext * client_context, char *rest_msg)
RestRequest(PxfOptions *options, ClientContext * client_context, char *rest_msg)
{
Assert(options->pxf_host != NULL && options->pxf_port > 0);
/* construct the request */
call_rest(options, client_context, rest_msg);
CallRest(options, client_context, rest_msg);
}
/*
......@@ -168,7 +148,7 @@ typedef struct FragmentState
} FragmentState;
static void
pxf_fragment_object_start(void *state, char *name, bool isnull)
PxfFragmentObjectStart(void *state, char *name, bool isnull)
{
FragmentState *s = (FragmentState *) state;
......@@ -216,8 +196,8 @@ pxf_fragment_object_start(void *state, char *name, bool isnull)
}
static void
check_and_assign(char **field, JsonTokenType type, char *token,
JsonTokenType expected_type, bool nullable)
CheckAndAssign(char **field, JsonTokenType type, char *token,
JsonTokenType expected_type, bool nullable)
{
if (type == JSON_TOKEN_NULL && nullable)
return;
......@@ -231,7 +211,7 @@ check_and_assign(char **field, JsonTokenType type, char *token,
}
static void
pxf_fragment_scalar(void *state, char *token, JsonTokenType type)
PxfFragmentScalar(void *state, char *token, JsonTokenType type)
{
FragmentState *s = (FragmentState *) state;
FragmentData *d = (FragmentData *) llast(s->fragments);
......@@ -243,19 +223,19 @@ pxf_fragment_scalar(void *state, char *token, JsonTokenType type)
switch (s->object)
{
case PXF_PARSE_USERDATA:
check_and_assign(&(d->user_data), type, token, JSON_TOKEN_STRING, true);
CheckAndAssign(&(d->user_data), type, token, JSON_TOKEN_STRING, true);
break;
case PXF_PARSE_METADATA:
check_and_assign(&(d->fragment_md), type, token, JSON_TOKEN_STRING, true);
CheckAndAssign(&(d->fragment_md), type, token, JSON_TOKEN_STRING, true);
break;
case PXF_PARSE_PROFILE:
check_and_assign(&(d->profile), type, token, JSON_TOKEN_STRING, true);
CheckAndAssign(&(d->profile), type, token, JSON_TOKEN_STRING, true);
break;
case PXF_PARSE_SOURCENAME:
check_and_assign(&(d->source_name), type, token, JSON_TOKEN_STRING, false);
CheckAndAssign(&(d->source_name), type, token, JSON_TOKEN_STRING, false);
break;
case PXF_PARSE_INDEX:
check_and_assign(&(d->index), type, token, JSON_TOKEN_NUMBER, true);
CheckAndAssign(&(d->index), type, token, JSON_TOKEN_NUMBER, true);
break;
case PXF_PARSE_REPLICAS:
if (type == JSON_TOKEN_STRING)
......@@ -270,7 +250,7 @@ pxf_fragment_scalar(void *state, char *token, JsonTokenType type)
}
static void
pxf_array_element_start(void *state, bool isnull)
PxfArrayElementStart(void *state, bool isnull)
{
FragmentState *s = (FragmentState *) state;
FragmentData *data;
......@@ -294,7 +274,7 @@ pxf_array_element_start(void *state, bool isnull)
}
static void
pxf_array_element_end(void *state, bool isnull)
PxfArrayElementEnd(void *state, bool isnull)
{
FragmentState *s = (FragmentState *) state;
......@@ -307,7 +287,7 @@ pxf_array_element_end(void *state, bool isnull)
}
static List *
parse_get_fragments_response(List *fragments, StringInfo rest_buf)
ParseGetFragmentsResponse(StringInfo rest_buf)
{
JsonSemAction *sem;
FragmentState *state;
......@@ -322,10 +302,10 @@ parse_get_fragments_response(List *fragments, StringInfo rest_buf)
state->has_replicas = false;
sem->semstate = state;
sem->scalar = pxf_fragment_scalar;
sem->object_field_start = pxf_fragment_object_start;
sem->array_element_start = pxf_array_element_start;
sem->array_element_end = pxf_array_element_end;
sem->scalar = PxfFragmentScalar;
sem->object_field_start = PxfFragmentObjectStart;
sem->array_element_start = PxfArrayElementStart;
sem->array_element_end = PxfArrayElementEnd;
pg_parse_json(state->lex, sem);
......@@ -339,7 +319,7 @@ parse_get_fragments_response(List *fragments, StringInfo rest_buf)
* Removes the elements which will not be processed from the list and frees up their memory.
* Returns the resulting list, or NIL if no elements satisfy the condition.
*/
static List *
List *
FilterFragmentsForSegment(List *list)
{
if (!list)
......@@ -394,7 +374,7 @@ FilterFragmentsForSegment(List *list)
ListCell *to_delete = current;
if (to_delete->data.ptr_value)
free_fragment((FragmentData *) to_delete->data.ptr_value);
FreeFragment((FragmentData *) to_delete->data.ptr_value);
current = lnext(to_delete);
result = list_delete_cell(list, to_delete, previous);
}
......@@ -402,14 +382,71 @@ FilterFragmentsForSegment(List *list)
return result;
}
/*
* Serializes a List of FragmentData such that it can be transmitted
* via the interconnect, and the list can be passed from master to
* segments
*/
List *
SerializeFragmentList(List *fragments)
{
ListCell *frag_c = NULL;
List *serializedFragmentList = NIL;
List *serializedFragment;
foreach(frag_c, fragments)
{
FragmentData *fragment = (FragmentData *) lfirst(frag_c);
serializedFragment = NIL;
serializedFragment = lappend(serializedFragment, makeString(fragment->index));
serializedFragment = lappend(serializedFragment, makeString(fragment->source_name));
serializedFragment = lappend(serializedFragment, makeString(fragment->fragment_md));
serializedFragment = lappend(serializedFragment, makeString(fragment->user_data));
serializedFragment = lappend(serializedFragment, makeString(fragment->profile));
serializedFragment = lappend(serializedFragment, makeString(psprintf("%d", fragment->fragment_idx)));
serializedFragmentList = lappend(serializedFragmentList, serializedFragment);
}
return serializedFragmentList;
}
/*
* Deserializes a List of FragmentData received from the interconnect
*/
List *
DeserializeFragmentList(List *serializedFragmentList)
{
ListCell *frag_c = NULL;
List *fragments = NIL;
foreach(frag_c, serializedFragmentList)
{
List *serializedFragment = (List *) lfirst(frag_c);
FragmentData *fragment = (FragmentData *) palloc0(sizeof(FragmentData));
fragment->index = strVal(list_nth(serializedFragment, 0));
fragment->source_name = strVal(list_nth(serializedFragment, 1));
fragment->fragment_md = strVal(list_nth(serializedFragment, 2));
fragment->user_data = strVal(list_nth(serializedFragment, 3));
fragment->profile = strVal(list_nth(serializedFragment, 4));
fragment->fragment_idx = atoi(strVal(list_nth(serializedFragment, 5)));
fragments = lappend(fragments, fragment);
}
return fragments;
}
/*
* Preliminary curl initializations for the REST communication
*/
static void
init(ClientContext * client_context)
Init(ClientContext * client_context)
{
InitClientContext(client_context);
/* Communication with back-end, initialize churl client context and header */
init_client_context(client_context);
client_context->http_headers = churl_headers_init();
/* set HTTP header that guarantees response in JSON format */
......@@ -420,7 +457,7 @@ init(ClientContext * client_context)
* Free fragment data
*/
void
free_fragment(FragmentData *data)
FreeFragment(FragmentData *data)
{
if (data->authority)
pfree(data->authority);
......@@ -442,7 +479,7 @@ free_fragment(FragmentData *data)
* response to <GET_BLOCK_LOCATIONS> request
*/
static void
logFragmentList(const char *debugHeader, List *fragments)
LogFragmentList(const char *debugHeader, List *fragments)
{
ListCell *fragment_cell = NULL;
StringInfoData log_str;
......@@ -473,7 +510,7 @@ logFragmentList(const char *debugHeader, List *fragments)
* Initializes the client context
*/
static void
init_client_context(ClientContext * client_context)
InitClientContext(ClientContext * client_context)
{
client_context->http_headers = NULL;
client_context->handle = NULL;
......@@ -488,7 +525,7 @@ init_client_context(ClientContext * client_context)
* <hadoop_uri->host>:<hadoop_uri->port>
*/
static void
call_rest(PxfOptions *options, ClientContext * client_context, char *rest_msg)
CallRest(PxfOptions *options, ClientContext * client_context, char *rest_msg)
{
StringInfoData request;
......@@ -501,7 +538,7 @@ call_rest(PxfOptions *options, ClientContext * client_context, char *rest_msg)
PXF_VERSION);
/* send the request. The response will exist in rest_buf.data */
process_request(client_context, request.data);
ProcessRequest(client_context, request.data);
pfree(request.data);
}
......@@ -509,7 +546,7 @@ call_rest(PxfOptions *options, ClientContext * client_context, char *rest_msg)
* Reads from churl in chunks of 64K and copies data to the context's buffer
*/
static void
process_request(ClientContext * client_context, char *uri)
ProcessRequest(ClientContext * client_context, char *uri)
{
size_t n = 0;
char buffer[RAW_BUF_SIZE];
......
......@@ -74,9 +74,35 @@ extern List *GetFragmentList(PxfOptions *options,
char *filter_string,
List *retrieved_attrs);
/*
* Takes a list of fragments and determines which ones need to be processed
* by the given segment based on MOD function. Removes the elements which will
* not be processed from the list and frees up their memory.
* Returns the resulting list, or NIL if no elements satisfy the condition.
*/
extern List *FilterFragmentsForSegment(List *list);
/*
* Serializes a List of FragmentData such that it can be transmitted
* via the interconnect, and the list can be passed from master to
* segments
*/
extern List * SerializeFragmentList(List *fragments);
/*
* Deserializes a List of FragmentData received from the interconnect
*/
extern List * DeserializeFragmentList(List *serializedFragmentList);
/*
* Assign PXF Host for each Data Fragment
* Will use the same host as the existing segment as the PXF host.
*/
extern void AssignPxfLocationToFragments(PxfOptions *options, List *fragments);
/*
* Frees the given fragment
*/
extern void free_fragment(FragmentData *data);
extern void FreeFragment(FragmentData *data);
#endif /* GPDB_PXFFRAGMENT_H */
......@@ -103,7 +103,7 @@ BuildHttpHeaders(CHURL_HEADERS headers,
AddOptionsToHttpHeader(headers, options->copy_options);
/* filters */
if (filter_string != NULL)
if (filter_string && strcmp(filter_string, "") != 0)
{
churl_headers_append(headers, "X-GP-FILTER", filter_string);
churl_headers_append(headers, "X-GP-HAS-FILTER", "1");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册