diff --git a/contrib/pxf_fdw/pxf_fdw.c b/contrib/pxf_fdw/pxf_fdw.c index 4549d0288495c5681b1d113d6404d45639c597a4..8a73fc0808da32ff5cffd7b229b517fb13329e82 100644 --- a/contrib/pxf_fdw/pxf_fdw.c +++ b/contrib/pxf_fdw/pxf_fdw.c @@ -13,12 +13,7 @@ #include "pxf_filter.h" #include "pxf_fragment.h" -#include "access/sysattr.h" #include "access/reloptions.h" -#include "catalog/pg_foreign_server.h" -#include "catalog/pg_foreign_table.h" -#include "catalog/pg_user_mapping.h" -#include "catalog/pg_type.h" #include "cdb/cdbsreh.h" #include "cdb/cdbvars.h" #include "commands/copy.h" @@ -27,7 +22,6 @@ #include "foreign/fdwapi.h" #include "foreign/foreign.h" #include "nodes/pg_list.h" -#include "nodes/makefuncs.h" #include "optimizer/paths.h" #include "optimizer/pathnode.h" #include "optimizer/planmain.h" @@ -176,7 +170,9 @@ enum FdwScanPrivateIndex /* WHERE clauses to be sent to PXF (as a String node) */ FdwScanPrivateWhereClauses, /* Integer list of attribute numbers retrieved by the SELECT */ - FdwScanPrivateRetrievedAttrs + FdwScanPrivateRetrievedAttrs, + /* List of fragments to be processed by the segments */ + FdwScanPrivateFragmentList }; /* @@ -319,7 +315,6 @@ pxfGetForeignPlan(PlannerInfo *root, * Items in the list must match enum FdwScanPrivateIndex, above. */ - /* here we serialize the WHERE clauses */ char *where_clauses_str = SerializePxfFilterQuals(fpinfo->remote_conds); @@ -372,11 +367,61 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags) if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; - List *quals = node->ss.ps.qual; + List* serializedFragmentList; + List* fragments; + ForeignTable *rel = GetForeignTable(RelationGetRelid(node->ss.ss_currentRelation)); + + List *quals = node->ss.ps.qual; Oid foreigntableid = RelationGetRelid(node->ss.ss_currentRelation); - PxfFdwScanState *pxfsstate = NULL; - Relation relation = node->ss.ss_currentRelation; - ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan; + PxfFdwScanState *pxfsstate = NULL; + Relation relation = node->ss.ss_currentRelation; + ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan; + PxfOptions *options = PxfGetOptions(foreigntableid); + + /* retrieve fdw-private information from pxfGetForeignPlan() */ + char *filter_str = strVal(list_nth(foreignScan->fdw_private, FdwScanPrivateWhereClauses)); + List *retrieved_attrs = (List *) list_nth(foreignScan->fdw_private, FdwScanPrivateRetrievedAttrs); + + /* + * When running queries on all segments, master makes the fragmenter call + * and segments receive a List of FragmentData from master. When the query + * only runs on master or any, the fragment list is retrieved by the + * executing process. + */ + if (rel->exec_location != FTEXECLOCATION_ALL_SEGMENTS || Gp_role == GP_ROLE_DISPATCH) + { + fragments = GetFragmentList(options, + relation, + filter_str, + retrieved_attrs); + + if (Gp_role == GP_ROLE_DISPATCH) + { + /* + * serialize fragment list and pass it down to segments + * by appending it to foreignScan->fdw_private + */ + serializedFragmentList = SerializeFragmentList(fragments); + foreignScan->fdw_private = lappend(foreignScan->fdw_private, serializedFragmentList); + + /* master does not process any fragments */ + return; + } + } + else + { + serializedFragmentList = (List *) list_nth(foreignScan->fdw_private, FdwScanPrivateFragmentList); + fragments = DeserializeFragmentList(serializedFragmentList); + + /* + * Call the work allocation algorithm when execution happens on all + * segments + */ + fragments = FilterFragmentsForSegment(fragments); + } + + /* Assign PXF location for the allocated fragments */ + AssignPxfLocationToFragments(options, fragments); /* * Save state in node->fdw_state. We must save enough information to call @@ -385,18 +430,12 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags) pxfsstate = (PxfFdwScanState *) palloc(sizeof(PxfFdwScanState)); initStringInfo(&pxfsstate->uri); - pxfsstate->options = PxfGetOptions(foreigntableid); - - /* retrieve fdw-private stuff from pxfGetForeignPlan() */ - pxfsstate->filter_str = strVal(list_nth(foreignScan->fdw_private, FdwScanPrivateWhereClauses)); - pxfsstate->retrieved_attrs = (List *) list_nth(foreignScan->fdw_private, FdwScanPrivateRetrievedAttrs); - + pxfsstate->filter_str = filter_str; + pxfsstate->fragments = fragments; + pxfsstate->options = options; pxfsstate->quals = quals; pxfsstate->relation = relation; - pxfsstate->fragments = GetFragmentList(pxfsstate->options, - pxfsstate->relation, - pxfsstate->filter_str, - pxfsstate->retrieved_attrs); + pxfsstate->retrieved_attrs = retrieved_attrs; InitCopyState(pxfsstate); node->fdw_state = (void *) pxfsstate; @@ -422,7 +461,6 @@ pxfIterateForeignScan(ForeignScanState *node) ErrorContextCallback errcallback; bool found; - /* Set up callback to identify error line number. */ errcallback.callback = CopyFromErrorCallback; errcallback.arg = (void *) pxfsstate->cstate; diff --git a/contrib/pxf_fdw/pxf_filter.c b/contrib/pxf_fdw/pxf_filter.c index 0849efed9aa1570c1ce7cacd9b91347ea0a55108..8ce49d27aad88ee280d22a66b60d67dd5a612dbf 100644 --- a/contrib/pxf_fdw/pxf_filter.c +++ b/contrib/pxf_fdw/pxf_filter.c @@ -1401,9 +1401,7 @@ SerializePxfFilterQuals(List *quals) char *result = NULL; if (quals == NULL) - { return result; - } List *clauses = NULL; ListCell *lc; diff --git a/contrib/pxf_fdw/pxf_fragment.c b/contrib/pxf_fdw/pxf_fragment.c index 115923fed92f312b87a024f2d55c16035313bacd..60645744c136f923c9e8a285b87ff6b1ffbce1b5 100644 --- a/contrib/pxf_fdw/pxf_fragment.c +++ b/contrib/pxf_fdw/pxf_fragment.c @@ -13,19 +13,17 @@ #define LOG_DEBUG (FRAGDEBUG >= log_min_messages) || (FRAGDEBUG >= client_min_messages) static List *GetDataFragmentList(PxfOptions *options, ClientContext * client_context); -static void rest_request(PxfOptions *options, ClientContext * client_context, char *rest_msg); -static List *parse_get_fragments_response(List *fragments, StringInfo rest_buf); -static List *FilterFragmentsForSegment(List *list); -static void init(ClientContext * client_context); -static void logFragmentList(const char *debugHeader, List *fragments); -static void init_client_context(ClientContext * client_context); -static void AssignPxfLocationToFragments(PxfOptions *options, List *fragments); -static void call_rest(PxfOptions *options, ClientContext * client_context, char *rest_msg); -static void process_request(ClientContext * client_context, char *uri); -static void pxf_fragment_scalar(void *state, char *token, JsonTokenType type); -static void pxf_fragment_object_start(void *state, char *name, bool isnull); -static void pxf_array_element_start(void *state, bool isnull); -static void pxf_array_element_end(void *state, bool isnull); +static void RestRequest(PxfOptions *options, ClientContext * client_context, char *rest_msg); +static List *ParseGetFragmentsResponse(StringInfo rest_buf); +static void Init(ClientContext * client_context); +static void LogFragmentList(const char *debugHeader, List *fragments); +static void InitClientContext(ClientContext * client_context); +static void CallRest(PxfOptions *options, ClientContext * client_context, char *rest_msg); +static void ProcessRequest(ClientContext * client_context, char *uri); +static void PxfFragmentScalar(void *state, char *token, JsonTokenType type); +static void PxfFragmentObjectStart(void *state, char *name, bool isnull); +static void PxfArrayElementStart(void *state, bool isnull); +static void PxfArrayElementEnd(void *state, bool isnull); /* Get List of fragments using PXF * Returns selected fragments that have been allocated to the current segment @@ -44,7 +42,7 @@ GetFragmentList(PxfOptions *options, Assert(options != NULL); /* 1. Initialize curl headers */ - init(&client_context); + Init(&client_context); /* Enrich the curl HTTP header */ BuildHttpHeaders(client_context.http_headers, @@ -61,24 +59,7 @@ GetFragmentList(PxfOptions *options, return NIL; if (LOG_DEBUG) - logFragmentList("Available Data fragments", data_fragments); - - if (options->exec_location == FTEXECLOCATION_ALL_SEGMENTS) - { - /* - * Call the work allocation algorithm when execution happens on all - * segments - */ - data_fragments = FilterFragmentsForSegment(data_fragments); - if (data_fragments == NIL) - return NIL; - } - - /* Assign PXF location for the allocated fragments */ - AssignPxfLocationToFragments(options, data_fragments); - - if (LOG_DEBUG) - logFragmentList("Allocated Data fragments", data_fragments); + LogFragmentList("Available Data fragments", data_fragments); return data_fragments; } @@ -87,7 +68,7 @@ GetFragmentList(PxfOptions *options, * Assign PXF Host for each Data Fragment * Will use the same host as the existing segment as the PXF host. */ -static void +void AssignPxfLocationToFragments(PxfOptions *options, List *fragments) { ListCell *frag_c = NULL; @@ -95,7 +76,6 @@ AssignPxfLocationToFragments(PxfOptions *options, List *fragments) foreach(frag_c, fragments) { FragmentData *fragment = (FragmentData *) lfirst(frag_c); - fragment->authority = psprintf("%s:%d", options->pxf_host, options->pxf_port); } } @@ -112,13 +92,13 @@ static List * GetDataFragmentList(PxfOptions *options, ClientContext * client_context) { - List *data_fragments = NIL; + List *data_fragments; char *restMsg = "http://%s:%d/%s/%s/Fragmenter/getFragments"; - rest_request(options, client_context, restMsg); + RestRequest(options, client_context, restMsg); /* parse the JSON response and form a fragments list to return */ - data_fragments = parse_get_fragments_response(data_fragments, &(client_context->the_rest_buf)); + data_fragments = ParseGetFragmentsResponse(&(client_context->the_rest_buf)); return data_fragments; } @@ -129,12 +109,12 @@ GetDataFragmentList(PxfOptions *options, * to failover with a retry for the HA HDFS scenario. */ static void -rest_request(PxfOptions *options, ClientContext * client_context, char *rest_msg) +RestRequest(PxfOptions *options, ClientContext * client_context, char *rest_msg) { Assert(options->pxf_host != NULL && options->pxf_port > 0); /* construct the request */ - call_rest(options, client_context, rest_msg); + CallRest(options, client_context, rest_msg); } /* @@ -168,7 +148,7 @@ typedef struct FragmentState } FragmentState; static void -pxf_fragment_object_start(void *state, char *name, bool isnull) +PxfFragmentObjectStart(void *state, char *name, bool isnull) { FragmentState *s = (FragmentState *) state; @@ -216,8 +196,8 @@ pxf_fragment_object_start(void *state, char *name, bool isnull) } static void -check_and_assign(char **field, JsonTokenType type, char *token, - JsonTokenType expected_type, bool nullable) +CheckAndAssign(char **field, JsonTokenType type, char *token, + JsonTokenType expected_type, bool nullable) { if (type == JSON_TOKEN_NULL && nullable) return; @@ -231,7 +211,7 @@ check_and_assign(char **field, JsonTokenType type, char *token, } static void -pxf_fragment_scalar(void *state, char *token, JsonTokenType type) +PxfFragmentScalar(void *state, char *token, JsonTokenType type) { FragmentState *s = (FragmentState *) state; FragmentData *d = (FragmentData *) llast(s->fragments); @@ -243,19 +223,19 @@ pxf_fragment_scalar(void *state, char *token, JsonTokenType type) switch (s->object) { case PXF_PARSE_USERDATA: - check_and_assign(&(d->user_data), type, token, JSON_TOKEN_STRING, true); + CheckAndAssign(&(d->user_data), type, token, JSON_TOKEN_STRING, true); break; case PXF_PARSE_METADATA: - check_and_assign(&(d->fragment_md), type, token, JSON_TOKEN_STRING, true); + CheckAndAssign(&(d->fragment_md), type, token, JSON_TOKEN_STRING, true); break; case PXF_PARSE_PROFILE: - check_and_assign(&(d->profile), type, token, JSON_TOKEN_STRING, true); + CheckAndAssign(&(d->profile), type, token, JSON_TOKEN_STRING, true); break; case PXF_PARSE_SOURCENAME: - check_and_assign(&(d->source_name), type, token, JSON_TOKEN_STRING, false); + CheckAndAssign(&(d->source_name), type, token, JSON_TOKEN_STRING, false); break; case PXF_PARSE_INDEX: - check_and_assign(&(d->index), type, token, JSON_TOKEN_NUMBER, true); + CheckAndAssign(&(d->index), type, token, JSON_TOKEN_NUMBER, true); break; case PXF_PARSE_REPLICAS: if (type == JSON_TOKEN_STRING) @@ -270,7 +250,7 @@ pxf_fragment_scalar(void *state, char *token, JsonTokenType type) } static void -pxf_array_element_start(void *state, bool isnull) +PxfArrayElementStart(void *state, bool isnull) { FragmentState *s = (FragmentState *) state; FragmentData *data; @@ -294,7 +274,7 @@ pxf_array_element_start(void *state, bool isnull) } static void -pxf_array_element_end(void *state, bool isnull) +PxfArrayElementEnd(void *state, bool isnull) { FragmentState *s = (FragmentState *) state; @@ -307,7 +287,7 @@ pxf_array_element_end(void *state, bool isnull) } static List * -parse_get_fragments_response(List *fragments, StringInfo rest_buf) +ParseGetFragmentsResponse(StringInfo rest_buf) { JsonSemAction *sem; FragmentState *state; @@ -322,10 +302,10 @@ parse_get_fragments_response(List *fragments, StringInfo rest_buf) state->has_replicas = false; sem->semstate = state; - sem->scalar = pxf_fragment_scalar; - sem->object_field_start = pxf_fragment_object_start; - sem->array_element_start = pxf_array_element_start; - sem->array_element_end = pxf_array_element_end; + sem->scalar = PxfFragmentScalar; + sem->object_field_start = PxfFragmentObjectStart; + sem->array_element_start = PxfArrayElementStart; + sem->array_element_end = PxfArrayElementEnd; pg_parse_json(state->lex, sem); @@ -339,7 +319,7 @@ parse_get_fragments_response(List *fragments, StringInfo rest_buf) * Removes the elements which will not be processed from the list and frees up their memory. * Returns the resulting list, or NIL if no elements satisfy the condition. */ -static List * +List * FilterFragmentsForSegment(List *list) { if (!list) @@ -394,7 +374,7 @@ FilterFragmentsForSegment(List *list) ListCell *to_delete = current; if (to_delete->data.ptr_value) - free_fragment((FragmentData *) to_delete->data.ptr_value); + FreeFragment((FragmentData *) to_delete->data.ptr_value); current = lnext(to_delete); result = list_delete_cell(list, to_delete, previous); } @@ -402,14 +382,71 @@ FilterFragmentsForSegment(List *list) return result; } +/* + * Serializes a List of FragmentData such that it can be transmitted + * via the interconnect, and the list can be passed from master to + * segments + */ +List * +SerializeFragmentList(List *fragments) +{ + ListCell *frag_c = NULL; + List *serializedFragmentList = NIL; + List *serializedFragment; + + foreach(frag_c, fragments) + { + FragmentData *fragment = (FragmentData *) lfirst(frag_c); + + serializedFragment = NIL; + serializedFragment = lappend(serializedFragment, makeString(fragment->index)); + serializedFragment = lappend(serializedFragment, makeString(fragment->source_name)); + serializedFragment = lappend(serializedFragment, makeString(fragment->fragment_md)); + serializedFragment = lappend(serializedFragment, makeString(fragment->user_data)); + serializedFragment = lappend(serializedFragment, makeString(fragment->profile)); + serializedFragment = lappend(serializedFragment, makeString(psprintf("%d", fragment->fragment_idx))); + + serializedFragmentList = lappend(serializedFragmentList, serializedFragment); + } + + return serializedFragmentList; +} + +/* + * Deserializes a List of FragmentData received from the interconnect + */ +List * +DeserializeFragmentList(List *serializedFragmentList) +{ + ListCell *frag_c = NULL; + List *fragments = NIL; + + foreach(frag_c, serializedFragmentList) + { + List *serializedFragment = (List *) lfirst(frag_c); + + FragmentData *fragment = (FragmentData *) palloc0(sizeof(FragmentData)); + fragment->index = strVal(list_nth(serializedFragment, 0)); + fragment->source_name = strVal(list_nth(serializedFragment, 1)); + fragment->fragment_md = strVal(list_nth(serializedFragment, 2)); + fragment->user_data = strVal(list_nth(serializedFragment, 3)); + fragment->profile = strVal(list_nth(serializedFragment, 4)); + fragment->fragment_idx = atoi(strVal(list_nth(serializedFragment, 5))); + + fragments = lappend(fragments, fragment); + } + + return fragments; +} + /* * Preliminary curl initializations for the REST communication */ static void -init(ClientContext * client_context) +Init(ClientContext * client_context) { + InitClientContext(client_context); /* Communication with back-end, initialize churl client context and header */ - init_client_context(client_context); client_context->http_headers = churl_headers_init(); /* set HTTP header that guarantees response in JSON format */ @@ -420,7 +457,7 @@ init(ClientContext * client_context) * Free fragment data */ void -free_fragment(FragmentData *data) +FreeFragment(FragmentData *data) { if (data->authority) pfree(data->authority); @@ -442,7 +479,7 @@ free_fragment(FragmentData *data) * response to request */ static void -logFragmentList(const char *debugHeader, List *fragments) +LogFragmentList(const char *debugHeader, List *fragments) { ListCell *fragment_cell = NULL; StringInfoData log_str; @@ -473,7 +510,7 @@ logFragmentList(const char *debugHeader, List *fragments) * Initializes the client context */ static void -init_client_context(ClientContext * client_context) +InitClientContext(ClientContext * client_context) { client_context->http_headers = NULL; client_context->handle = NULL; @@ -488,7 +525,7 @@ init_client_context(ClientContext * client_context) * host>:port> */ static void -call_rest(PxfOptions *options, ClientContext * client_context, char *rest_msg) +CallRest(PxfOptions *options, ClientContext * client_context, char *rest_msg) { StringInfoData request; @@ -501,7 +538,7 @@ call_rest(PxfOptions *options, ClientContext * client_context, char *rest_msg) PXF_VERSION); /* send the request. The response will exist in rest_buf.data */ - process_request(client_context, request.data); + ProcessRequest(client_context, request.data); pfree(request.data); } @@ -509,7 +546,7 @@ call_rest(PxfOptions *options, ClientContext * client_context, char *rest_msg) * Reads from churl in chunks of 64K and copies data to the context's buffer */ static void -process_request(ClientContext * client_context, char *uri) +ProcessRequest(ClientContext * client_context, char *uri) { size_t n = 0; char buffer[RAW_BUF_SIZE]; diff --git a/contrib/pxf_fdw/pxf_fragment.h b/contrib/pxf_fdw/pxf_fragment.h index 329b5fcbb01062b73a6070e5f4589957648aff1d..642806b98556702dd5a8179997d7b7f57f7fb0b5 100644 --- a/contrib/pxf_fdw/pxf_fragment.h +++ b/contrib/pxf_fdw/pxf_fragment.h @@ -74,9 +74,35 @@ extern List *GetFragmentList(PxfOptions *options, char *filter_string, List *retrieved_attrs); +/* + * Takes a list of fragments and determines which ones need to be processed + * by the given segment based on MOD function. Removes the elements which will + * not be processed from the list and frees up their memory. + * Returns the resulting list, or NIL if no elements satisfy the condition. + */ +extern List *FilterFragmentsForSegment(List *list); + +/* + * Serializes a List of FragmentData such that it can be transmitted + * via the interconnect, and the list can be passed from master to + * segments + */ +extern List * SerializeFragmentList(List *fragments); + +/* + * Deserializes a List of FragmentData received from the interconnect + */ +extern List * DeserializeFragmentList(List *serializedFragmentList); + +/* + * Assign PXF Host for each Data Fragment + * Will use the same host as the existing segment as the PXF host. + */ +extern void AssignPxfLocationToFragments(PxfOptions *options, List *fragments); + /* * Frees the given fragment */ -extern void free_fragment(FragmentData *data); +extern void FreeFragment(FragmentData *data); #endif /* GPDB_PXFFRAGMENT_H */ diff --git a/contrib/pxf_fdw/pxf_header.c b/contrib/pxf_fdw/pxf_header.c index cd235f62bb2e7950efb20b1abd35efddc90ca236..676275bc7ac1e051356e9f8d0dccb96cabbe4b1a 100644 --- a/contrib/pxf_fdw/pxf_header.c +++ b/contrib/pxf_fdw/pxf_header.c @@ -103,7 +103,7 @@ BuildHttpHeaders(CHURL_HEADERS headers, AddOptionsToHttpHeader(headers, options->copy_options); /* filters */ - if (filter_string != NULL) + if (filter_string && strcmp(filter_string, "") != 0) { churl_headers_append(headers, "X-GP-FILTER", filter_string); churl_headers_append(headers, "X-GP-HAS-FILTER", "1");