提交 f669acf7 编写于 作者: H Hubert Zhang

Refactor the resource management in INITPLAN function

We introduce function which runs on INITPLAN in commit a21ff2
INITPLAN function is designed to support "CTAS select * from udf();"
Since udf() is run on EntryDB, but EntryDB is always read gang which
cannot do dispatch work, the query would fail if function contains DDL
statement etc.

The idea of INITPLAN function is to run the function on INITPLAN, which
is QD in fact and store the result into a tuplestore. Later the FunctionScan
on EntryDB just read tuple from tuplestore instead of running the real function.

But the life cycle management is a little tricky. In the original commit, we
hack to close the tuplestore in INITPLAN without deleting the file, and let
EntryDB reader to delete the file after finishing the tuple fetch. This will
introduce file leak if the transaction abort before the entryDB runs.

This commit add a postprocess_initplans in ExecutorEnd() of the main plan to
clean the tuplestore createed in preprocess_initplans in ExecutorStart() of
the main plan. Note that postprocess_initplans must be place after the dispatch
work are finished i.e. mppExecutorFinishup().
Upstream don't need this function since it always use scalar PARAM to communicate
between INITPLAN and main plan.
上级 b371e592
......@@ -22,6 +22,7 @@
#include "cdb/cdbllize.h"
#include "cdb/cdbsubplan.h"
#include "cdb/cdbvars.h" /* currentSliceId */
#include "utils/tuplestorenew.h"
static bool isParamExecutableNow(SubPlanState *spstate, ParamExecData *prmList);
......@@ -148,6 +149,38 @@ preprocess_initplans(QueryDesc *queryDesc)
}
}
/*
* CDB: Post processing INITPLAN to clean up resource with long life cycle
*
* INITPLAN usually communicate with main plan through scalar PARAM, but in some case,
* the main plan need to get more data from INITPLAN which long life cycle resource like
* temp file will be used.
* Take INITPLAN function case as an example, INITPLAN will store its result into
* tuplestore, which will be read by entryDB in main plan. Tuplestore and corresponding
* files should not be cleaned before the main plan finished.
*
* postprocess_initplans is used to clean these resources in ExecutorEnd of main plan.
*/
void
postprocess_initplans(QueryDesc *queryDesc)
{
EState *estate = queryDesc->estate;
ParamExecData *prm;
SubPlanState *sps;
int i;
/* clean ntuplestore used by INITPLAN function */
for (i = 0; i < queryDesc->plannedstmt->nParamExec; i++)
{
prm = &estate->es_param_exec_vals[i];
sps = (SubPlanState *) prm->execPlan;
if(sps && sps->ts_pos)
ntuplestore_destroy_accessor(sps->ts_pos);
if(sps && sps->ts_state)
ntuplestore_destroy(sps->ts_state);
}
}
static bool
isParamExecutableNow(SubPlanState *spstate, ParamExecData *prmList)
{
......
......@@ -1106,6 +1106,17 @@ standard_ExecutorEnd(QueryDesc *queryDesc)
*/
RemoveMotionLayer(estate->motionlayer_context);
/*
* GPDB specific
* Clean the special resources created by INITPLAN.
* The resources have long life cycle and are used by the main plan.
* It's too early to clean them in preprocess_initplans.
*/
if (queryDesc->plannedstmt->nParamExec > 0)
{
postprocess_initplans(queryDesc);
}
/*
* Release EState and per-query memory context.
*/
......@@ -1115,6 +1126,17 @@ standard_ExecutorEnd(QueryDesc *queryDesc)
}
PG_END_TRY();
/*
* GPDB specific
* Clean the special resources created by INITPLAN.
* The resources have long life cycle and are used by the main plan.
* It's too early to clean them in preprocess_initplans.
*/
if (queryDesc->plannedstmt->nParamExec > 0)
{
postprocess_initplans(queryDesc);
}
/*
* If normal termination, let each operator clean itself up.
* Otherwise don't risk it... an error might have left some
......
......@@ -104,14 +104,7 @@ FunctionNext_guts(FunctionScanState *node)
char rwfile_prefix[100];
function_scan_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix));
node->ts_state = ntuplestore_create_readerwriter(rwfile_prefix, 0, false, false);
/*
* delete file when close tuplestore reader
* tuplestore writer is created in initplan, so it needs to keep
* the file even if initplan ended.
* we should let the reader to delete it when reader's job finished.
*/
ntuplestore_set_is_temp_file(node->ts_state, true);
node->ts_state = ntuplestore_create_readerwriter(rwfile_prefix, 0, false);
node->ts_pos = ntuplestore_create_accessor(node->ts_state, false);
ntuplestore_acc_seek_bof(node->ts_pos);
......
......@@ -209,7 +209,7 @@ init_tuplestore_state(ShareInputScanState *node)
elog(DEBUG1, "SISC writer (shareid=%d, slice=%d): No tuplestore yet, creating tuplestore",
sisc->share_id, currentSliceId);
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id);
ts = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)node) * 1024, true, true);
ts = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)node) * 1024, true);
tsa = ntuplestore_create_accessor(ts, true);
}
else
......@@ -246,7 +246,7 @@ init_tuplestore_state(ShareInputScanState *node)
shareinput_reader_waitready(node->ref);
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id);
ts = ntuplestore_create_readerwriter(rwfile_prefix, 0, false, false);
ts = ntuplestore_create_readerwriter(rwfile_prefix, 0, false);
tsa = ntuplestore_create_accessor(ts, false);
}
local_state->ts_state = ts;
......
......@@ -1123,8 +1123,7 @@ PG_TRY();
function_scan_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix));
node->ts_state = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)(node->planstate)) * 1024, true, false);
ntuplestore_set_is_temp_file(node->ts_state, false);
node->ts_state = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)(node->planstate)) * 1024, true);
node->ts_pos = (void *)ntuplestore_create_accessor(node->ts_state, true);
}
......@@ -1142,6 +1141,7 @@ PG_TRY();
if (subLinkType == INITPLAN_FUNC_SUBLINK)
{
ntuplestore_acc_put_tupleslot((NTupleStoreAccessor *) node->ts_pos, slot);
found = true;
continue;
}
......@@ -1212,20 +1212,13 @@ PG_TRY();
}
/*
* Flush and cleanup the tuplestore writer
*
* Note that the file of tuplestore will not be deleted at here.
* This is due to the tuplestore reader is outside initplan, and
* reader will delete the file when it finished.
* Flush the tuplestore writer
*
*/
if (subLinkType == INITPLAN_FUNC_SUBLINK && node->ts_state)
{
ntuplestore_acc_seek_bof(node->ts_pos);
ntuplestore_flush(node->ts_state);
ntuplestore_destroy_accessor(node->ts_pos);
ntuplestore_destroy(node->ts_state);
}
if (!found)
......
......@@ -747,7 +747,7 @@ ntuplestore_create_common(int64 maxBytes, char *operation_name)
* useWorkFile specify whether to use workfile for tuplestore
*/
NTupleStore *
ntuplestore_create_readerwriter(const char *filename, int64 maxBytes, bool isWriter, bool useWorkFile)
ntuplestore_create_readerwriter(const char *filename, int64 maxBytes, bool isWriter)
{
NTupleStore* store = NULL;
char filenamelob[MAXPGPATH];
......@@ -760,8 +760,7 @@ ntuplestore_create_readerwriter(const char *filename, int64 maxBytes, bool isWri
store->rwflag = NTS_IS_WRITER;
store->lobbytes = 0;
store->work_set = NULL;
if (useWorkFile)
store->work_set = workfile_mgr_create_set(store->operation_name, filename);
store->work_set = workfile_mgr_create_set(store->operation_name, filename);
store->pfile = BufFileCreateNamedTemp(filename,
false /* interXact */,
store->work_set);
......@@ -1364,14 +1363,4 @@ ntuplestore_create_spill_files(NTupleStore *nts)
nts->instrument->workfileCreated = true;
}
/*
* Specify the BufFiles used by tuplestore are temp files or not
*/
void
ntuplestore_set_is_temp_file(NTupleStore *ts, bool isTempFile)
{
BufFileSetIsTempFile(ts->pfile, isTempFile);
BufFileSetIsTempFile(ts->plobfile, isTempFile);
}
/* EOF */
......@@ -22,5 +22,6 @@
#include "nodes/plannodes.h"
extern void preprocess_initplans(QueryDesc *queryDesc);
extern void postprocess_initplans(QueryDesc *queryDesc);
#endif /* CDBSUBPLAN_H */
......@@ -26,7 +26,7 @@ void ntuplestore_setinstrument(NTupleStore* ts, struct Instrumentation *ins);
/* Tuple store method */
extern NTupleStore *ntuplestore_create(int64 maxBytes, char *operation_name);
extern NTupleStore *ntuplestore_create_readerwriter(const char* filename, int64 maxBytes, bool isWriter, bool useWorkFile);
extern NTupleStore *ntuplestore_create_readerwriter(const char* filename, int64 maxBytes, bool isWriter);
extern bool ntuplestore_is_readerwriter_reader(NTupleStore* nts);
extern void ntuplestore_flush(NTupleStore *ts);
extern void ntuplestore_destroy(NTupleStore *ts);
......@@ -65,6 +65,5 @@ extern bool ntuplestore_acc_seek_first(NTupleStoreAccessor *tsa);
extern bool ntuplestore_acc_seek_last(NTupleStoreAccessor *tsa);
extern void ntuplestore_acc_seek_bof(NTupleStoreAccessor *tsa);
extern void ntuplestore_acc_seek_eof(NTupleStoreAccessor *tsa);
extern void ntuplestore_set_is_temp_file(NTupleStore *ts, bool isTempFile);
#endif /* TUPSTORE_NEW_H */
......@@ -408,6 +408,12 @@ NOTICE: unique_violation
-- But EntryDB and QEs cannot run DDLs which needs to do dispatch.
-- We introduce new function location 'EXECUTE ON INITPLAN' to run
-- the function on initplan to overcome the above issue.
CREATE or replace FUNCTION get_temp_file_num() returns text as
$$
import os
fileNum = len([name for name in os.listdir('base/pgsql_tmp')])
return fileNum
$$ language plpythonu;
CREATE OR REPLACE FUNCTION get_country()
RETURNS TABLE (
country_id integer,
......@@ -431,6 +437,13 @@ AS $$
public.country c order by country_id;
end; $$
LANGUAGE 'plpgsql' EXECUTE ON INITPLAN;
-- Temp file number before running INITPLAN function
SELECT get_temp_file_num();
get_temp_file_num
-------------------
0
(1 row)
SELECT * FROM get_country();
NOTICE: table "country" does not exist, skipping
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'country_id' as the Greenplum Database data distribution key for this table.
......@@ -569,3 +582,17 @@ SELECT count(*) FROM t3_function_scan;
100000
(1 row)
-- abort case 1: abort before entrydb run the function scan
DROP TABLE IF EXISTS t4_function_scan;
NOTICE: table "t4_function_scan" does not exist, skipping
CREATE TABLE t4_function_scan AS SELECT 444, (1 / (0* random()))::text UNION ALL SELECT * FROM get_country();
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named '?column?' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
ERROR: division by zero (entry db 10.146.0.4:7000 pid=20360)
-- Temp file number after running INITPLAN function, number should not changed.
SELECT get_temp_file_num();
get_temp_file_num
-------------------
0
(1 row)
......@@ -412,6 +412,12 @@ NOTICE: unique_violation
-- But EntryDB and QEs cannot run DDLs which needs to do dispatch.
-- We introduce new function location 'EXECUTE ON INITPLAN' to run
-- the function on initplan to overcome the above issue.
CREATE or replace FUNCTION get_temp_file_num() returns text as
$$
import os
fileNum = len([name for name in os.listdir('base/pgsql_tmp')])
return fileNum
$$ language plpythonu;
CREATE OR REPLACE FUNCTION get_country()
RETURNS TABLE (
country_id integer,
......@@ -435,6 +441,13 @@ AS $$
public.country c order by country_id;
end; $$
LANGUAGE 'plpgsql' EXECUTE ON INITPLAN;
-- Temp file number before running INITPLAN function
SELECT get_temp_file_num();
get_temp_file_num
-------------------
0
(1 row)
SELECT * FROM get_country();
NOTICE: table "country" does not exist, skipping
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'country_id' as the Greenplum Database data distribution key for this table.
......@@ -573,3 +586,17 @@ SELECT count(*) FROM t3_function_scan;
100000
(1 row)
-- abort case 1: abort before entrydb run the function scan
DROP TABLE IF EXISTS t4_function_scan;
NOTICE: table "t4_function_scan" does not exist, skipping
CREATE TABLE t4_function_scan AS SELECT 444, (1 / (0* random()))::text UNION ALL SELECT * FROM get_country();
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named '?column?' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
ERROR: division by zero (entry db 10.146.0.4:7000 pid=20360)
-- Temp file number after running INITPLAN function, number should not changed.
SELECT get_temp_file_num();
get_temp_file_num
-------------------
0
(1 row)
......@@ -239,6 +239,12 @@ SELECT trigger_unique();
-- But EntryDB and QEs cannot run DDLs which needs to do dispatch.
-- We introduce new function location 'EXECUTE ON INITPLAN' to run
-- the function on initplan to overcome the above issue.
CREATE or replace FUNCTION get_temp_file_num() returns text as
$$
import os
fileNum = len([name for name in os.listdir('base/pgsql_tmp')])
return fileNum
$$ language plpythonu;
CREATE OR REPLACE FUNCTION get_country()
RETURNS TABLE (
......@@ -265,6 +271,8 @@ AS $$
end; $$
LANGUAGE 'plpgsql' EXECUTE ON INITPLAN;
-- Temp file number before running INITPLAN function
SELECT get_temp_file_num();
SELECT * FROM get_country();
SELECT get_country();
......@@ -316,3 +324,9 @@ DROP TABLE IF EXISTS t3_function_scan;
CREATE TABLE t3_function_scan AS SELECT * FROM get_id();
SELECT count(*) FROM t3_function_scan;
-- abort case 1: abort before entrydb run the function scan
DROP TABLE IF EXISTS t4_function_scan;
CREATE TABLE t4_function_scan AS SELECT 444, (1 / (0* random()))::text UNION ALL SELECT * FROM get_country();
-- Temp file number after running INITPLAN function, number should not changed.
SELECT get_temp_file_num();
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册