提交 2e4d99fa 编写于 作者: H Heikki Linnakangas

Change Material back to using upstream tuplestore.

Now that ShareInputScan manages its own tuplestore, Material doesn't need
the extra features that tuplestorenew.c provides.
Reviewed-by: NGeorgios Kokolatos <gkokolatos@pivotal.io>
上级 b93fcd13
......@@ -89,6 +89,7 @@ Finalize Aggregate (cost=35148.64..35148.65 rows=1 width=8) (actual rows=1 loop
-> Seq Scan on auto_explain_test.t1 (cost=0.00..13.01 rows=334 width=0) (actual rows=340 loops=1)
Output: t1.a
-> Materialize (cost=0.00..68.06 rows=1001 width=0) (actual rows=1001 loops=340)
work_mem: 142kB Segments: 3 Max: 48kB (segment 0) Workfile: (0 spilling)
-> Broadcast Motion 3:3 (slice2; segments: 3) (cost=0.00..53.05 rows=1001 width=0) (actual rows=1001 loops=1)
-> Seq Scan on auto_explain_test.t2 (cost=0.00..13.01 rows=334 width=0) (actual rows=340 loops=1)
(slice0) Executor memory: 131K bytes.
......
......@@ -25,16 +25,13 @@
#include "executor/executor.h"
#include "executor/nodeMaterial.h"
#include "executor/instrument.h" /* Instrumentation */
#include "utils/tuplestorenew.h"
#include "miscadmin.h"
#include "cdb/cdbvars.h"
#include "executor/instrument.h" /* Instrumentation */
static void ExecMaterialExplainEnd(PlanState *planstate, struct StringInfoData *buf);
static void ExecChildRescan(MaterialState *node);
static void DestroyTupleStore(MaterialState *node);
static void ExecEagerFreeMaterial(MaterialState *node);
......@@ -54,13 +51,9 @@ ExecMaterial(MaterialState *node)
EState *estate;
ScanDirection dir;
bool forward;
NTupleStore *ts;
NTupleStoreAccessor *tsa;
Tuplestorestate *tuplestorestate;
bool eof_tuplestore;
TupleTableSlot *slot;
Material *ma;
/*
* get state info from node
......@@ -68,31 +61,34 @@ ExecMaterial(MaterialState *node)
estate = node->ss.ps.state;
dir = estate->es_direction;
forward = ScanDirectionIsForward(dir);
ts = node->ts_state;
tsa = (NTupleStoreAccessor *) node->ts_pos;
ma = (Material *) node->ss.ps.plan;
Assert(IsA(ma, Material));
tuplestorestate = node->tuplestorestate;
/*
* If first time through, and we need a tuplestore, initialize it.
*/
if (ts == NULL && node->eflags != 0)
if (tuplestorestate == NULL && node->eflags != 0)
{
/* Non-shared Materialize node */
ts = ntuplestore_create(PlanStateOperatorMemKB((PlanState *) node) * 1024, "Materialize");
tsa = ntuplestore_create_accessor(ts, true /* isWriter */);
tuplestorestate = tuplestore_begin_heap(true, false, PlanStateOperatorMemKB(&node->ss.ps));
tuplestore_set_eflags(tuplestorestate, node->eflags);
if (node->eflags & EXEC_FLAG_MARK)
{
/*
* Allocate a second read pointer to serve as the mark. We know it
* must have index 1, so needn't store that.
*/
int ptrno PG_USED_FOR_ASSERTS_ONLY;
Assert(ts && tsa);
node->ts_state = ts;
node->ts_pos = (void *) tsa;
ptrno = tuplestore_alloc_read_pointer(tuplestorestate,
node->eflags);
Assert(ptrno == 1);
}
node->tuplestorestate = tuplestorestate;
/* CDB: Offer extra info for EXPLAIN ANALYZE. */
if (node->ss.ps.instrument && node->ss.ps.instrument->need_cdb)
{
/* Let the tuplestore share our Instrumentation object. */
ntuplestore_setinstrument(ts, node->ss.ps.instrument);
tuplestore_set_instrument(tuplestorestate, node->ss.ps.instrument);
/* Request a callback at end of query. */
node->ss.ps.cdbexplainfun = ExecMaterialExplainEnd;
......@@ -105,46 +101,56 @@ ExecMaterial(MaterialState *node)
* MPP TODO: Remove when a better solution is implemented.
*
* See motion_sanity_walker() for details on how a deadlock may occur.
*
* ShareInput: if the material node
* is used to share input, we will need to fetch all rows and put
* them in tuple store
*/
while (((Material *) node->ss.ps.plan)->cdb_strict)
if (((Material *) node->ss.ps.plan)->cdb_strict)
{
TupleTableSlot *outerslot = ExecProcNode(outerPlanState(node));
if (TupIsNull(outerslot))
for (;;)
{
node->eof_underlying = true;
ntuplestore_acc_seek_bof(tsa);
TupleTableSlot *outerslot = ExecProcNode(outerPlanState(node));
break;
}
if (TupIsNull(outerslot))
break;
ntuplestore_acc_put_tupleslot(tsa, outerslot);
tuplestore_puttupleslot(tuplestorestate, outerslot);
}
node->eof_underlying = true;
tuplestore_rescan(tuplestorestate);
}
}
if(forward)
ntuplestore_acc_seek_bof(tsa);
else
ntuplestore_acc_seek_eof(tsa);
/*
* If we are not at the end of the tuplestore, or are going backwards, try
* to fetch a tuple from tuplestore.
*/
eof_tuplestore = (tuplestorestate == NULL) ||
tuplestore_ateof(tuplestorestate);
if (!forward && eof_tuplestore)
{
if (!node->eof_underlying)
{
/*
* When reversing direction at tuplestore EOF, the first
* gettupleslot call will fetch the last-added tuple; but we want
* to return the one before that, if possible. So do an extra
* fetch.
*/
if (!tuplestore_advance(tuplestorestate, forward))
return NULL; /* the tuplestore must be empty */
}
eof_tuplestore = false;
}
/*
* If we can fetch another tuple from the tuplestore, return it.
*/
slot = node->ss.ps.ps_ResultTupleSlot;
if(forward)
eof_tuplestore = (tsa == NULL) || !ntuplestore_acc_advance(tsa, 1);
else
eof_tuplestore = (tsa == NULL) || !ntuplestore_acc_advance(tsa, -1);
if(tsa != NULL && ntuplestore_acc_tell(tsa, NULL))
if (!eof_tuplestore)
{
ntuplestore_acc_current_tupleslot(tsa, slot);
return slot;
if (tuplestore_gettupleslot(tuplestorestate, forward, false, slot))
return slot;
if (forward)
eof_tuplestore = true;
}
/*
......@@ -154,7 +160,9 @@ ExecMaterial(MaterialState *node)
* subplan calls. It's not optional, unfortunately, because some plan
* node types are not robust about being called again when they've already
* returned NULL.
* If reusing cached workfiles, there is no need to execute subplan at all.
*
* GPDB: If reusing cached workfiles, there is no need to execute subplan
* at all.
*/
if (eof_tuplestore && !node->eof_underlying)
{
......@@ -178,8 +186,13 @@ ExecMaterial(MaterialState *node)
return NULL;
}
if (tsa)
ntuplestore_acc_put_tupleslot(tsa, outerslot);
/*
* Append a copy of the returned tuple to tuplestore. NOTE: because
* the tuplestore is certainly in EOF state, its read position will
* move forward over the added tuple. This is what we want.
*/
if (tuplestorestate)
tuplestore_puttupleslot(tuplestorestate, outerslot);
/*
* We can just return the subplan's returned tuple, without copying.
......@@ -253,9 +266,7 @@ ExecInitMaterial(Material *node, EState *estate, int eflags)
matstate->eflags |= EXEC_FLAG_REWIND;
matstate->eof_underlying = false;
matstate->ts_state = NULL;
matstate->ts_pos = NULL;
matstate->ts_markpos = NULL;
matstate->tuplestorestate = NULL;
matstate->ts_destroyed = false;
/*
......@@ -352,11 +363,12 @@ ExecEndMaterial(MaterialState *node)
/*
* Release tuplestore resources for cases where EagerFree doesn't do it
*/
if (node->ts_state != NULL)
if (node->tuplestorestate != NULL)
{
Assert(node->ts_pos);
DestroyTupleStore(node);
tuplestore_end(node->tuplestorestate);
node->ts_destroyed = true;
}
node->tuplestorestate = NULL;
/*
* shut down the subplan
......@@ -375,30 +387,21 @@ ExecMaterialMarkPos(MaterialState *node)
{
Assert(node->eflags & EXEC_FLAG_MARK);
#ifdef DEBUG
{
/* share input should never call this */
Material *ma = (Material *) node->ss.ps.plan;
Assert(ma->share_type == SHARE_NOTSHARED);
}
#endif
/*
* if we haven't materialized yet, just return.
*/
if (NULL == node->ts_state)
{
if (!node->tuplestorestate)
return;
}
Assert(node->ts_pos);
if(node->ts_markpos == NULL)
{
node->ts_markpos = palloc(sizeof(NTupleStorePos));
}
/*
* copy the active read pointer to the mark.
*/
tuplestore_copy_read_pointer(node->tuplestorestate, 0, 1);
ntuplestore_acc_tell((NTupleStoreAccessor *) node->ts_pos, (NTupleStorePos *) node->ts_markpos);
/*
* since we may have advanced the mark, try to truncate the tuplestore.
*/
tuplestore_trim(node->tuplestorestate);
}
/* ----------------------------------------------------------------
......@@ -412,48 +415,16 @@ ExecMaterialRestrPos(MaterialState *node)
{
Assert(node->eflags & EXEC_FLAG_MARK);
#ifdef DEBUG
{
/* share input should never call this */
Material *ma = (Material *) node->ss.ps.plan;
Assert(ma->share_type == SHARE_NOTSHARED);
}
#endif
/*
* if we haven't materialized yet, just return.
*/
if (NULL == node->ts_state)
{
if (!node->tuplestorestate)
return;
}
Assert(node->ts_pos && node->ts_markpos);
ntuplestore_acc_seek((NTupleStoreAccessor *) node->ts_pos, (NTupleStorePos *) node->ts_markpos);
}
/*
* DestroyTupleStore
* Helper function for destroying tuple store
*/
void
DestroyTupleStore(MaterialState *node)
{
Assert(NULL != node);
Assert(NULL != node->ts_state);
ntuplestore_destroy_accessor((NTupleStoreAccessor *) node->ts_pos);
ntuplestore_destroy(node->ts_state);
if(node->ts_markpos)
{
pfree(node->ts_markpos);
}
node->ts_state = NULL;
node->ts_pos = NULL;
node->ts_markpos = NULL;
node->eof_underlying = false;
node->ts_destroyed = true;
/*
* copy the mark to the active read pointer.
*/
tuplestore_copy_read_pointer(node->tuplestorestate, 1, 0);
}
/*
......@@ -493,7 +464,7 @@ ExecReScanMaterial(MaterialState *node)
* If tuple store is empty, then either we have not materialized yet
* or tuple store was destroyed after a previous execution of materialize.
*/
if (NULL == node->ts_state)
if (!node->tuplestorestate)
{
/*
* If tuple store was destroyed before, then materialize is part of subquery
......@@ -519,14 +490,15 @@ ExecReScanMaterial(MaterialState *node)
if (outerPlan->chgParam != NULL ||
(node->eflags & EXEC_FLAG_REWIND) == 0)
{
DestroyTupleStore(node);
tuplestore_end(node->tuplestorestate);
node->tuplestorestate = NULL;
node->ts_destroyed = true;
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
node->eof_underlying = false;
}
else
{
ntuplestore_acc_seek_bof((NTupleStoreAccessor *) node->ts_pos);
}
tuplestore_rescan(node->tuplestorestate);
}
else
{
......@@ -541,11 +513,11 @@ ExecEagerFreeMaterial(MaterialState *node)
/*
* Release tuplestore resources
*/
if (NULL != node->ts_state)
if (node->tuplestorestate)
{
Assert(node->ts_pos);
DestroyTupleStore(node);
tuplestore_end(node->tuplestorestate);
node->tuplestorestate = NULL;
node->ts_destroyed = true;
}
}
......
......@@ -499,6 +499,9 @@ tuplestore_end(Tuplestorestate *state)
state->instrument->workmemwanted =
Max(state->instrument->workmemwanted, nbytes);
}
if (state->myfile)
state->instrument->workfileCreated = true;
}
if (state->myfile)
......
......@@ -2455,14 +2455,12 @@ typedef struct MaterialState
ScanState ss; /* its first field is NodeTag */
int eflags; /* capability flags to pass to tuplestore */
bool eof_underlying; /* reached end of underlying plan? */
Tuplestorestate *tuplestorestate;
bool ts_destroyed; /* called destroy tuple store? */
bool delayEagerFree; /* is is safe to free memory used by this node,
* when this node has outputted its last row? */
struct NTupleStore *ts_state; /* private state of tuplestore.c */
void *ts_pos;
void *ts_markpos;
} MaterialState;
/* ----------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册