未验证 提交 bad6cebc 编写于 作者: J Jinbao Chen 提交者: GitHub

Support 'copy (select statement) to file on segment' (#6077)

In 'copy (select statement) to file', we generate a query plan, set
its dest receiver to copy_dest_receive, and run the dest receiver on the QD.
In 'copy (select statement) to file on segment', we modify the query plan,
delete the gather motion, and let the dest receiver run on the QEs.

Change 'isCtas' in Query to 'parentStmtType' to be able to mark the type of
the enclosing utility statement. Add a CopyIntoClause node to store the copy
information. Add copyIntoClause to PlannedStmt.

In postgres, we don't need to make a different query plan for the
query in a utility statement. But in greenplum, we need to.
So we use a field to indicate whether the query is contained in a utility
statement, and the type of that utility statement.

Actually the behavior of 'copy (select statement) to file on segment'
is very similar to 'SELECT ... INTO ...' and 'CREATE TABLE ... AS SELECT ...'.
We use the distribution policy inherent in the query result as the final data
distribution policy. If there is none, we use the first column in the target
list as the key and redistribute. The only difference is that we use
'copy_dest_receiver' instead of 'intorel_dest_receiver'.
上级 02213a73
......@@ -180,7 +180,7 @@ cdbparallelize(PlannerInfo *root,
{
case CMD_SELECT:
/* SELECT INTO / CREATE TABLE AS always created partitioned tables. */
if (query->isCTAS)
if (query->parentStmtType != PARENTSTMTTYPE_NONE)
context->resultSegments = true;
break;
......
......@@ -278,7 +278,7 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
{
case CMD_SELECT:
/* If the query comes from 'CREAT TABLE AS' or 'SELECT INTO' */
if (query->isCTAS)
if (query->parentStmtType != PARENTSTMTTYPE_NONE)
{
List *hashExpr;
ListCell *exp1;
......@@ -443,14 +443,14 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
strcat(columns, "???");
}
ereport(NOTICE,
(errcode(ERRCODE_SUCCESSFUL_COMPLETION),
errmsg("Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) "
"named '%s' as the Greenplum Database data distribution key for this "
"table. ", columns),
errhint("The 'DISTRIBUTED BY' clause determines the distribution of data."
" Make sure column(s) chosen are the optimal data distribution key to minimize skew.")));
if (query->parentStmtType == PARENTSTMTTYPE_CTAS)
ereport(NOTICE,
(errcode(ERRCODE_SUCCESSFUL_COMPLETION),
errmsg("Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) "
"named '%s' as the Greenplum Database data distribution key for this "
"table. ", columns),
errhint("The 'DISTRIBUTED BY' clause determines the distribution of data."
" Make sure column(s) chosen are the optimal data distribution key to minimize skew.")));
}
}
......
......@@ -25,6 +25,7 @@
#include <sys/wait.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <commands/copy.h>
#include "access/heapam.h"
#include "access/htup_details.h"
......@@ -76,13 +77,7 @@
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
/* DestReceiver for COPY (SELECT) TO */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
CopyState cstate; /* CopyStateData for the command */
uint64 processed; /* # of tuples processed */
} DR_copy;
/*
......@@ -155,17 +150,19 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* non-export function prototypes */
static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
const char *queryString, List *attnamelist, List *options);
const char *queryString, List *attnamelist, List *options,
TupleDesc tupDesc);
static void EndCopy(CopyState cstate);
static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
const char *filename, bool is_program, List *attnamelist,
List *options, bool skip_ext_partition);
static void EndCopyTo(CopyState cstate);
const char *filename, bool is_program, List *attnamelist,
List *options, bool skip_ext_partition);
static void EndCopyTo(CopyState cstate, uint64 *processed);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyToDispatch(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static uint64 CopyFrom(CopyState cstate);
static uint64 CopyDispatchOnSegment(CopyState cstate, const CopyStmt *stmt);
static uint64 CopyToQueryOnSegment(CopyState cstate);
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
CommandId mycid, int hi_options,
ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
......@@ -247,6 +244,8 @@ static void cdbFlushInsertBatches(List *resultRels,
int hi_options,
TupleTableSlot *baseSlot,
int firstBufferedLineNo);
CopyIntoClause*
MakeCopyIntoClause(CopyStmt *stmt);
/* ==========================================================================
* The following macros aid in major refactoring of data processing code (in
......@@ -1130,8 +1129,18 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
cstate->partitions = stmt->partitions;
/*
* "copy t to file on segment" CopyDispatchOnSegment
* "copy (select * from t) to file on segment" CopyToQueryOnSegment
* "copy t/(select * from t) to file" DoCopyTo
*/
if (Gp_role == GP_ROLE_DISPATCH && cstate->on_segment)
*processed = CopyDispatchOnSegment(cstate, stmt);
{
if (cstate->rel)
*processed = CopyDispatchOnSegment(cstate, stmt);
else
*processed = CopyToQueryOnSegment(cstate);
}
else
*processed = DoCopyTo(cstate); /* copy from database to file */
}
......@@ -1146,9 +1155,8 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
}
PG_END_TRY();
EndCopyTo(cstate);
EndCopyTo(cstate, processed);
}
/*
* Close the relation. If reading, we can release the AccessShareLock we
* got; if writing, we should hold the lock until end of transaction to
......@@ -1729,10 +1737,10 @@ BeginCopy(bool is_from,
Node *raw_query,
const char *queryString,
List *attnamelist,
List *options)
List *options,
TupleDesc tupDesc)
{
CopyState cstate;
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
......@@ -1758,6 +1766,7 @@ BeginCopy(bool is_from,
0, /* pass correct value when COPY supports no delim */
true);
/* Process the source/target relation or query */
if (rel)
{
......@@ -1774,7 +1783,7 @@ BeginCopy(bool is_from,
errmsg("table \"%s\" does not have OIDs",
RelationGetRelationName(cstate->rel))));
}
else
else if(raw_query)
{
List *rewritten;
Query *query;
......@@ -1809,6 +1818,11 @@ BeginCopy(bool is_from,
query = (Query *) linitial(rewritten);
if (cstate->on_segment && IsA(query, Query))
{
query->parentStmtType = PARENTSTMTTYPE_COPY;
}
/* Query mustn't use INTO, either */
if (query->utilityStmt != NULL &&
IsA(query->utilityStmt, CreateTableAsStmt))
......@@ -1839,6 +1853,9 @@ BeginCopy(bool is_from,
InvalidSnapshot,
dest, NULL,
GP_INSTRUMENT_OPTS);
if (cstate->on_segment)
cstate->queryDesc->plannedstmt->copyIntoClause =
MakeCopyIntoClause(glob_copystmt);
if (gp_enable_gpperfmon && Gp_role == GP_ROLE_DISPATCH)
{
......@@ -2152,6 +2169,175 @@ EndCopy(CopyState cstate)
pfree(cstate);
}
/*
 * MakeCopyIntoClause
 *		Build a CopyIntoClause node from the fields of a CopyStmt.
 *
 * The new node shares the statement's lists and filename pointer; nothing
 * is deep-copied here.
 */
CopyIntoClause *
MakeCopyIntoClause(CopyStmt *stmt)
{
	CopyIntoClause *clause = makeNode(CopyIntoClause);

	/* Column list and COPY options */
	clause->attlist = stmt->attlist;
	clause->options = stmt->options;

	/* Output target: file name or program */
	clause->filename = stmt->filename;
	clause->is_program = stmt->is_program;

	/* AO segno map */
	clause->ao_segnos = stmt->ao_segnos;

	return clause;
}
/*
 * BeginCopyToOnSegment
 *		Set up a CopyState for writing the result of a query to a file or
 *		program on a segment.
 *
 * Must be called with Gp_role == GP_ROLE_EXECUTE (asserted).  The column
 * list, options, file name and is_program flag are taken from
 * queryDesc->plannedstmt->copyIntoClause, and the output tuple layout from
 * queryDesc->tupDesc.
 *
 * The function opens the output (a program pipe or an absolute-path file),
 * looks up the output function for every attribute in cstate->attnumlist,
 * creates the per-row memory context, and emits the binary header or the
 * header line when one was requested.
 *
 * Returns the initialized CopyState.  Errors are reported with
 * ereport(ERROR).
 */
CopyState
BeginCopyToOnSegment(QueryDesc *queryDesc)
{
	CopyState	cstate;
	MemoryContext oldcontext;
	ListCell   *cur;
	TupleDesc	tupDesc;
	int			num_phys_attrs;
	Form_pg_attribute *attr;
	char	   *filename;
	CopyIntoClause *copyIntoClause;

	Assert(Gp_role == GP_ROLE_EXECUTE);

	copyIntoClause = queryDesc->plannedstmt->copyIntoClause;
	tupDesc = queryDesc->tupDesc;

	/* No relation and no raw query: the tuple descriptor is passed directly */
	cstate = BeginCopy(false, NULL, NULL, NULL, copyIntoClause->attlist,
					   copyIntoClause->options, tupDesc);
	oldcontext = MemoryContextSwitchTo(cstate->copycontext);

	cstate->null_print_client = cstate->null_print; /* default */

	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
	cstate->fe_msgbuf = makeStringInfo();

	cstate->filename = pstrdup(copyIntoClause->filename);
	cstate->is_program = copyIntoClause->is_program;

	/*
	 * NOTE(review): MangleCopyFileName presumably rewrites cstate->filename
	 * into a per-segment name -- confirm against its definition.
	 */
	if (cstate->on_segment)
		MangleCopyFileName(cstate);
	filename = cstate->filename;

	if (cstate->is_program)
	{
		cstate->program_pipes = open_program_pipes(cstate->filename, true);
		cstate->copy_file = fdopen(cstate->program_pipes->pipes[0], PG_BINARY_W);

		if (cstate->copy_file == NULL)
			ereport(ERROR,
					(errmsg("could not execute command \"%s\": %m",
							cstate->filename)));
	}
	else
	{
		mode_t		oumask;		/* Pre-existing umask value */
		struct stat st;

		/*
		 * Prevent write to relative path ... too easy to shoot oneself in
		 * the foot by overwriting a database file ...
		 */
		if (!is_absolute_path(filename))
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_NAME),
					 errmsg("relative path not allowed for COPY to file")));

		oumask = umask(S_IWGRP | S_IWOTH);
		cstate->copy_file = AllocateFile(filename, PG_BINARY_W);
		umask(oumask);
		if (cstate->copy_file == NULL)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not open file \"%s\" for writing: %m", filename)));

		/* Increase buffer size to improve performance (cmcdevitt) */
		setvbuf(cstate->copy_file, NULL, _IOFBF, 393216);	/* 384 Kbytes */

		/*
		 * Check the fstat() result: on failure 'st' is left uninitialized
		 * and must not be inspected.
		 */
		if (fstat(fileno(cstate->copy_file), &st) != 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not stat file \"%s\": %m", filename)));

		if (S_ISDIR(st.st_mode))
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("\"%s\" is a directory", filename)));
	}

	attr = tupDesc->attrs;
	num_phys_attrs = tupDesc->natts;

	/* Get info about the columns we need to process. */
	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
	foreach(cur, cstate->attnumlist)
	{
		int			attnum = lfirst_int(cur);
		Oid			out_func_oid;
		bool		isvarlena;

		if (cstate->binary)
			getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
									&out_func_oid,
									&isvarlena);
		else
			getTypeOutputInfo(attr[attnum - 1]->atttypid,
							  &out_func_oid,
							  &isvarlena);
		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
	}

	/*
	 * Create a temporary memory context that we can reset once per row to
	 * recover palloc'd memory.  This avoids any problems with leaks inside
	 * datatype output routines, and should be faster than retail pfree's
	 * anyway.  (We don't need a whole econtext as CopyFrom does.)
	 */
	cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
											   "COPY TO",
											   ALLOCSET_DEFAULT_MINSIZE,
											   ALLOCSET_DEFAULT_INITSIZE,
											   ALLOCSET_DEFAULT_MAXSIZE);

	if (cstate->binary)
	{
		/* Generate header for a binary copy */
		int32		tmp;

		/* Signature */
		CopySendData(cstate, BinarySignature, 11);
		/* Flags field */
		tmp = 0;
		if (cstate->oids)
			tmp |= (1 << 16);
		CopySendInt32(cstate, tmp);
		/* No header extension */
		tmp = 0;
		CopySendInt32(cstate, tmp);
	}
	else
	{
		/* if a header has been requested send the line */
		if (cstate->header_line)
		{
			bool		hdr_delim = false;

			foreach(cur, cstate->attnumlist)
			{
				int			attnum = lfirst_int(cur);
				char	   *colname;

				/* Delimiter goes between column names, not before the first */
				if (hdr_delim)
					CopySendChar(cstate, cstate->delim[0]);
				hdr_delim = true;

				colname = NameStr(attr[attnum - 1]->attname);

				CopyAttributeOutCSV(cstate, colname, false,
									list_length(cstate->attnumlist) == 1);
			}

			CopySendEndOfRow(cstate);
		}
	}

	MemoryContextSwitchTo(oldcontext);
	return cstate;
}
/*
* Setup CopyState to read tuples from a table or a query for COPY TO.
*/
......@@ -2209,7 +2395,7 @@ BeginCopyTo(Relation rel,
errhint("Try the COPY (SELECT ...) TO variant.")));
}
cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
cstate = BeginCopy(false, rel, query, queryString, attnamelist, options, NULL);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
cstate->skip_ext_partition = skip_ext_partition;
......@@ -2241,17 +2427,6 @@ BeginCopyTo(Relation rel,
}
}
if (rel == NULL && cstate->on_segment)
{
/*
* Report error because COPY ON SEGMENT doesn't know the data
* location of the result of SELECT query.
*/
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("'COPY (SELECT ...) TO' doesn't support 'ON SEGMENT'.")));
}
bool pipe = (filename == NULL || (Gp_role == GP_ROLE_EXECUTE && !cstate->on_segment));
if (cstate->on_segment && Gp_role == GP_ROLE_DISPATCH)
......@@ -2336,7 +2511,7 @@ BeginCopyToForExternalTable(Relation extrel, List *options)
Assert(RelationIsExternal(extrel));
cstate = BeginCopy(false, extrel, NULL, NULL, NIL, options);
cstate = BeginCopy(false, extrel, NULL, NULL, NIL, options, NULL);
cstate->dispatch_mode = COPY_DIRECT;
/*
......@@ -2418,17 +2593,37 @@ DoCopyTo(CopyState cstate)
return processed;
}
/*
 * EndCopyToOnSegment
 *		Finish a COPY (SELECT ...) TO ... ON SEGMENT on an executor.
 *
 * Writes the binary-format trailer when the copy is binary, then drops the
 * per-row memory context and hands the CopyState to EndCopy().
 */
void
EndCopyToOnSegment(CopyState cstate)
{
	Assert(Gp_role == GP_ROLE_EXECUTE);

	if (cstate->binary)
	{
		/* A binary copy ends with a -1 trailer ... */
		CopySendInt16(cstate, -1);

		/* ... which has to be flushed out explicitly. */
		CopySendEndOfRow(cstate);
	}

	/* Release the per-row context before tearing down the copy state. */
	MemoryContextDelete(cstate->rowcontext);

	EndCopy(cstate);
}
/*
* Clean up storage and release resources for COPY TO.
*/
static void
EndCopyTo(CopyState cstate)
EndCopyTo(CopyState cstate, uint64 *processed)
{
if (cstate->queryDesc != NULL)
{
/* Close down the query and free resources. */
ExecutorFinish(cstate->queryDesc);
ExecutorEnd(cstate->queryDesc);
if (cstate->queryDesc->es_processed > 0)
*processed = cstate->queryDesc->es_processed;
FreeQueryDesc(cstate->queryDesc);
PopActiveSnapshot();
}
......@@ -2600,6 +2795,16 @@ CopyToDispatch(CopyState cstate)
return processed;
}
/*
 * CopyToQueryOnSegment
 *		Run the query of a COPY (SELECT ...) TO ... ON SEGMENT.
 *
 * Must not be called when Gp_role is GP_ROLE_EXECUTE (asserted).  The plan
 * is simply executed; the dest receiver is what sends the tuples.  This
 * function itself always returns 0.
 */
static uint64
CopyToQueryOnSegment(CopyState cstate)
{
	uint64		processed = 0;

	Assert(Gp_role != GP_ROLE_EXECUTE);

	/* Execute the plan to completion; tuples go to the dest receiver. */
	ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);

	return processed;
}
/*
* Copy from relation or query TO file.
*/
......@@ -4266,7 +4471,7 @@ BeginCopyFrom(Relation rel,
MemoryContext oldcontext;
bool volatile_defexprs;
cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options, NULL);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
/*
......@@ -6858,7 +7063,10 @@ truncateEolStr(char *str, EolType eol_type)
static void
copy_dest_startup(DestReceiver *self __attribute__((unused)), int operation __attribute__((unused)), TupleDesc typeinfo __attribute__((unused)))
{
/*
 * Nothing to do on the dispatcher.  In any other role, build the CopyState
 * for this receiver from its QueryDesc via BeginCopyToOnSegment().
 *
 * NOTE(review): 'self' is still marked unused in the signature although it
 * is dereferenced below.
 */
if (Gp_role == GP_ROLE_DISPATCH)
return;
DR_copy *myState = (DR_copy *) self;
myState->cstate = BeginCopyToOnSegment(myState->queryDesc);
}
/*
......@@ -6884,7 +7092,10 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
static void
copy_dest_shutdown(DestReceiver *self __attribute__((unused)))
{
/*
 * Nothing to do on the dispatcher.  In any other role, finish the copy by
 * passing this receiver's CopyState to EndCopyToOnSegment().
 *
 * NOTE(review): 'self' is still marked unused in the signature although it
 * is dereferenced below.
 */
if (Gp_role == GP_ROLE_DISPATCH)
return;
DR_copy *myState = (DR_copy *) self;
EndCopyToOnSegment(myState->cstate);
}
/*
......
......@@ -56,6 +56,7 @@
#include "catalog/pg_attribute_encoding.h"
#include "catalog/pg_type.h"
#include "cdb/cdbpartition.h"
#include "commands/copy.h"
#include "commands/createas.h"
#include "commands/matview.h"
#include "commands/tablecmds.h" /* XXX: temp for get_parts() */
......@@ -2062,6 +2063,11 @@ InitPlan(QueryDesc *queryDesc, int eflags)
*/
if (queryDesc->plannedstmt->intoClause != NULL)
intorel_initplan(queryDesc, eflags);
else if(queryDesc->plannedstmt->copyIntoClause != NULL)
{
queryDesc->dest = CreateCopyDestReceiver();
((DR_copy*)queryDesc->dest)->queryDesc = queryDesc;
}
if (DEBUG1 >= log_min_messages)
{
......@@ -4680,7 +4686,7 @@ FillSliceTable(EState *estate, PlannedStmt *stmt)
cxt.estate = estate;
cxt.currentSliceId = 0;
if (stmt->intoClause != NULL)
if (stmt->intoClause != NULL || stmt->copyIntoClause != NULL)
{
Slice *currentSlice = (Slice *) linitial(sliceTable->slices);
int numsegments;
......
......@@ -1459,8 +1459,8 @@ CQueryMutators::ConvertToDerivedTable
new_query->rtable = gpdb::LAppend(new_query->rtable, rte);
// new_query->intoClause = origIntoClause;
new_query->intoPolicy = into_policy;
new_query->isCTAS = derived_table_query->isCTAS;
derived_table_query->isCTAS = false;
new_query->parentStmtType = derived_table_query->parentStmtType;
derived_table_query->parentStmtType = PARENTSTMTTYPE_NONE;
FromExpr *fromexpr = MakeNode(FromExpr);
fromexpr->quals = NULL;
......
......@@ -429,10 +429,14 @@ CTranslatorQueryToDXL::CheckSupportedCmdType
// refactoring commit 9dbf2b7d . We are temporarily *always* falling
// back. Detect CTAS harder when we get back to it.
if (!optimizer_enable_ctas && query->isCTAS)
if (!optimizer_enable_ctas && query->parentStmtType == PARENTSTMTTYPE_CTAS)
{
GPOS_RAISE(gpdxl::ExmaDXL, gpdxl::ExmiQuery2DXLUnsupportedFeature, GPOS_WSZ_LIT("CTAS. Set optimizer_enable_ctas to on to enable CTAS with GPORCA"));
}
if (query->parentStmtType == PARENTSTMTTYPE_COPY)
{
GPOS_RAISE(gpdxl::ExmaDXL, gpdxl::ExmiQuery2DXLUnsupportedFeature, GPOS_WSZ_LIT("COPY. Copy select statement to file on segment is not supported with GPORCA"));
}
// supported: regular select or CTAS when it is enabled
return;
......@@ -681,7 +685,7 @@ CTranslatorQueryToDXL::TranslateQueryToDXL()
switch (m_query->commandType)
{
case CMD_SELECT:
if (!m_query->isCTAS)
if (m_query->parentStmtType == PARENTSTMTTYPE_NONE)
{
return TranslateSelectQueryToDXL();
}
......
......@@ -131,6 +131,7 @@ _copyPlannedStmt(const PlannedStmt *from)
COPY_SCALAR_FIELD(query_mem);
COPY_NODE_FIELD(intoClause);
COPY_NODE_FIELD(copyIntoClause);
return newnode;
}
......@@ -1513,6 +1514,23 @@ _copyIntoClause(const IntoClause *from)
return newnode;
}
/*
 * _copyCopyIntoClause
 *
 * Copy a CopyIntoClause node: the attlist, options and ao_segnos lists are
 * copied as node fields, filename as a string and is_program as a scalar.
 */
static CopyIntoClause *
_copyCopyIntoClause(const CopyIntoClause *from)
{
CopyIntoClause *newnode = makeNode(CopyIntoClause);
COPY_NODE_FIELD(attlist);
COPY_SCALAR_FIELD(is_program);
COPY_STRING_FIELD(filename);
COPY_NODE_FIELD(options);
COPY_NODE_FIELD(ao_segnos);
return newnode;
}
/*
* We don't need a _copyExpr because Expr is an abstract supertype which
* should never actually get instantiated. Also, since it has no common
......@@ -3167,7 +3185,7 @@ _copyQuery(const Query *from)
COPY_NODE_FIELD(setOperations);
COPY_NODE_FIELD(constraintDeps);
COPY_NODE_FIELD(intoPolicy);
COPY_SCALAR_FIELD(isCTAS);
COPY_SCALAR_FIELD(parentStmtType);
COPY_SCALAR_FIELD(needReshuffle);
return newnode;
......@@ -5294,6 +5312,9 @@ copyObject(const void *from)
case T_IntoClause:
retval = _copyIntoClause(from);
break;
case T_CopyIntoClause:
retval = _copyCopyIntoClause(from);
break;
case T_Var:
retval = _copyVar(from);
break;
......
......@@ -930,7 +930,7 @@ _equalQuery(const Query *a, const Query *b)
if (!GpPolicyEqual(a->intoPolicy, b->intoPolicy))
return false;
COMPARE_SCALAR_FIELD(isCTAS);
COMPARE_SCALAR_FIELD(parentStmtType);
COMPARE_SCALAR_FIELD(needReshuffle);
return true;
......
......@@ -360,6 +360,7 @@ _outPlannedStmt(StringInfo str, PlannedStmt *node)
WRITE_NODE_FIELD(intoPolicy);
WRITE_UINT64_FIELD(query_mem);
WRITE_NODE_FIELD(intoClause);
WRITE_NODE_FIELD(copyIntoClause);
}
static void
......@@ -869,7 +870,7 @@ _outQuery(StringInfo str, Query *node)
WRITE_NODE_FIELD(rowMarks);
WRITE_NODE_FIELD(setOperations);
WRITE_NODE_FIELD(constraintDeps);
WRITE_BOOL_FIELD(isCTAS);
WRITE_BOOL_FIELD(parentStmtType);
WRITE_BOOL_FIELD(needReshuffle);
/* Don't serialize policy */
......@@ -1479,6 +1480,9 @@ _outNode(StringInfo str, void *obj)
case T_IntoClause:
_outIntoClause(str, obj);
break;
case T_CopyIntoClause:
_outCopyIntoClause(str, obj);
break;
case T_Var:
_outVar(str, obj);
break;
......
......@@ -322,6 +322,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node)
WRITE_UINT64_FIELD(query_mem);
WRITE_NODE_FIELD(intoClause);
WRITE_NODE_FIELD(copyIntoClause);
}
#endif /* COMPILING_BINARY_FUNCS */
......@@ -1329,6 +1330,19 @@ _outIntoClause(StringInfo str, const IntoClause *node)
WRITE_NODE_FIELD(distributedBy);
}
/*
 * _outCopyIntoClause
 *
 * Serialize a CopyIntoClause node under the "COPYINTOCLAUSE" label.  The
 * fields are written in the order attlist, is_program, filename, options,
 * ao_segnos; the reader must consume them in the same order.
 */
static void
_outCopyIntoClause(StringInfo str, const CopyIntoClause *node)
{
WRITE_NODE_TYPE("COPYINTOCLAUSE");
WRITE_NODE_FIELD(attlist);
WRITE_BOOL_FIELD(is_program);
WRITE_STRING_FIELD(filename);
WRITE_NODE_FIELD(options);
WRITE_NODE_FIELD(ao_segnos);
}
static void
_outVar(StringInfo str, const Var *node)
{
......@@ -3774,7 +3788,7 @@ _outQuery(StringInfo str, const Query *node)
WRITE_NODE_FIELD(rowMarks);
WRITE_NODE_FIELD(setOperations);
WRITE_NODE_FIELD(constraintDeps);
WRITE_BOOL_FIELD(isCTAS);
WRITE_BOOL_FIELD(parentStmtType);
WRITE_BOOL_FIELD(needReshuffle);
/* Don't serialize policy */
......@@ -4837,6 +4851,9 @@ _outNode(StringInfo str, const void *obj)
case T_IntoClause:
_outIntoClause(str, obj);
break;
case T_CopyIntoClause:
_outCopyIntoClause(str, obj);
break;
case T_Var:
_outVar(str, obj);
break;
......
......@@ -254,7 +254,7 @@ _readQuery(void)
READ_NODE_FIELD(rowMarks);
READ_NODE_FIELD(setOperations);
READ_NODE_FIELD(constraintDeps);
READ_BOOL_FIELD(isCTAS);
READ_BOOL_FIELD(parentStmtType);
READ_BOOL_FIELD(needReshuffle);
/* policy not serialized */
......@@ -1421,6 +1421,7 @@ _readPlannedStmt(void)
READ_UINT64_FIELD(query_mem);
READ_NODE_FIELD(intoClause);
READ_NODE_FIELD(copyIntoClause);
READ_DONE();
}
......@@ -3239,6 +3240,9 @@ readNodeBinary(void)
case T_IntoClause:
return_value = _readIntoClause();
break;
case T_CopyIntoClause:
return_value = _readCopyIntoClause();
break;
case T_Var:
return_value = _readVar();
break;
......
......@@ -402,7 +402,7 @@ _readQuery(void)
READ_NODE_FIELD(rowMarks);
READ_NODE_FIELD(setOperations);
READ_NODE_FIELD(constraintDeps);
READ_BOOL_FIELD(isCTAS);
READ_BOOL_FIELD(parentStmtType);
READ_BOOL_FIELD(needReshuffle);
local_node->intoPolicy = NULL;
......@@ -690,6 +690,20 @@ _readIntoClause(void)
READ_DONE();
}
/*
 * _readCopyIntoClause
 *
 * Deserialize a CopyIntoClause node.  The fields are read in the order
 * attlist, is_program, filename, options, ao_segnos, which must match the
 * order in which they were written.
 */
static CopyIntoClause *
_readCopyIntoClause(void)
{
READ_LOCALS(CopyIntoClause);
READ_NODE_FIELD(attlist);
READ_BOOL_FIELD(is_program);
READ_STRING_FIELD(filename);
READ_NODE_FIELD(options);
READ_NODE_FIELD(ao_segnos);
READ_DONE();
}
/*
* _readVar
*/
......@@ -2963,6 +2977,8 @@ parseNodeString(void)
return_value = _readRangeVar();
else if (MATCH("INTOCLAUSE", 10))
return_value = _readIntoClause();
/*
 * The length argument must be the exact length of the token, as for the
 * sibling entries ("INTOCLAUSE" -> 10, "VAR" -> 3, "CONST" -> 5);
 * "COPYINTOCLAUSE" is 14 characters long.
 * NOTE(review): upstream MATCH compares the token length first, so a wrong
 * length would prevent this branch from ever matching -- confirm against
 * the macro definition in this file.
 */
else if (MATCH("COPYINTOCLAUSE", 14))
	return_value = _readCopyIntoClause();
else if (MATCH("VAR", 3))
return_value = _readVar();
else if (MATCH("CONST", 5))
......
......@@ -2680,7 +2680,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
*/
if ((parse->distinctClause || parse->sortClause) &&
(root->config->honor_order_by || !root->parent_root) &&
!parse->isCTAS &&
parse->parentStmtType == PARENTSTMTTYPE_NONE &&
/*
* GPDB_84_MERGE_FIXME: Does this do the right thing, if you have a
* SELECT DISTINCT query as argument to a table function?
......
......@@ -89,7 +89,7 @@ normalize_query(Query *query)
* queries like "SELECT function()", which would be executed on the QD
* anyway.
*/
if (res->commandType != CMD_SELECT || res->isCTAS)
if (res->commandType != CMD_SELECT || res->parentStmtType != PARENTSTMTTYPE_NONE)
{
if (safe_to_replace_sirvf_tle(res))
{
......
......@@ -3291,8 +3291,8 @@ transformCreateTableAsStmt(ParseState *pstate, CreateTableAsStmt *stmt)
result->commandType = CMD_UTILITY;
result->utilityStmt = (Node *) stmt;
/* GPDB: Set isCTAS to be true as we know this query is for CTAS */
((Query*)stmt->query)->isCTAS = true;
/* GPDB: Set parentStmtType to PARENTSTMTTYPE_CTAS as we know this query is for CTAS */
((Query*)stmt->query)->parentStmtType = PARENTSTMTTYPE_CTAS;
if (stmt->into->distributedBy && Gp_role == GP_ROLE_DISPATCH)
setQryDistributionPolicy(stmt->into, (Query *)stmt->query);
......
......@@ -204,6 +204,10 @@ autostats_get_cmdtype(QueryDesc *queryDesc, AutoStatsCmdType * pcmdType, Oid *pr
relationOid = GetIntoRelOid(queryDesc);
cmdType = AUTOSTATS_CMDTYPE_CTAS;
}
else if (stmt->copyIntoClause != NULL)
{
cmdType = AUTOSTATS_CMDTYPE_COPY;
}
break;
case CMD_INSERT:
......
......@@ -276,7 +276,7 @@ ProcessQuery(Portal portal,
*/
if (Gp_role == GP_ROLE_EXECUTE &&
queryDesc->plannedstmt &&
queryDesc->plannedstmt->intoClause)
queryDesc->plannedstmt->intoClause != NULL)
eflag = GetIntoRelEFlags(queryDesc->plannedstmt->intoClause);
ExecutorStart(queryDesc, eflag);
......@@ -385,7 +385,7 @@ ChoosePortalStrategy(List *stmts)
{
if (query->commandType == CMD_SELECT &&
query->utilityStmt == NULL &&
!query->isCTAS)
query->parentStmtType == PARENTSTMTTYPE_NONE)
{
if (query->hasModifyingCTE)
return PORTAL_ONE_MOD_WITH;
......@@ -410,7 +410,8 @@ ChoosePortalStrategy(List *stmts)
{
if (pstmt->commandType == CMD_SELECT &&
pstmt->utilityStmt == NULL &&
pstmt->intoClause == NULL)
pstmt->intoClause == NULL &&
pstmt->copyIntoClause == NULL)
{
if (pstmt->hasModifyingCTE)
return PORTAL_ONE_MOD_WITH;
......@@ -522,7 +523,7 @@ FetchStatementTargetList(Node *stmt)
{
if (query->commandType == CMD_SELECT &&
query->utilityStmt == NULL &&
!query->isCTAS)
query->parentStmtType == PARENTSTMTTYPE_NONE)
return query->targetList;
if (query->returningList)
return query->returningList;
......@@ -535,7 +536,8 @@ FetchStatementTargetList(Node *stmt)
if (pstmt->commandType == CMD_SELECT &&
pstmt->utilityStmt == NULL &&
pstmt->intoClause == NULL)
pstmt->intoClause == NULL &&
pstmt->copyIntoClause == NULL)
return pstmt->planTree->targetlist;
if (pstmt->hasReturning)
return pstmt->planTree->targetlist;
......
......@@ -686,7 +686,7 @@ RevalidateCachedQuery(CachedPlanSource *plansource, IntoClause *intoClause)
{
Assert(list_length(tlist) == 1);
Query *query = (Query *) linitial(tlist);
query->isCTAS = true;
query->parentStmtType = PARENTSTMTTYPE_CTAS;
}
/* Release snapshot if we got one */
......
......@@ -34,7 +34,6 @@ typedef enum CopyDest
} CopyDest;
/* CopyStateData is private in commands/copy.c */
typedef struct CopyStateData *CopyState;
typedef int (*copy_data_source_cb) (void *outbuf, int datasize, void *extra);
/*
......@@ -283,6 +282,17 @@ typedef struct CopyStateData
/* end Greenplum Database specific variables */
} CopyStateData;
typedef struct CopyStateData *CopyState;
/*
 * DestReceiver for COPY (SELECT) TO.
 *
 * 'pub' comes first so that a DR_copy pointer can be cast to and from a
 * DestReceiver pointer.  'queryDesc' is the QueryDesc from which the
 * receiver's startup routine builds 'cstate' when the copy runs on a
 * segment.
 */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
CopyState cstate; /* CopyStateData for the command */
QueryDesc *queryDesc; /* QueryDesc for the copy */
uint64 processed; /* # of tuples processed */
} DR_copy;
/*
* Some platforms like macOS (since Yosemite) already define 64 bit versions
* of htonl and nhohl so we need to guard against redefinition.
......@@ -303,6 +313,9 @@ extern CopyState BeginCopyFrom(Relation rel, const char *filename,
bool is_program, copy_data_source_cb data_source_cb,
void *data_source_cb_extra,
List *attnamelist, List *options, List *ao_segnos);
extern CopyState
BeginCopyToOnSegment(QueryDesc *queryDesc);
extern void EndCopyToOnSegment(CopyState cstate);
extern CopyState BeginCopyToForExternalTable(Relation extrel, List *options);
extern void EndCopyFrom(CopyState cstate);
extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
......
......@@ -232,6 +232,7 @@ typedef enum NodeTag
T_JoinExpr,
T_FromExpr,
T_IntoClause,
T_CopyIntoClause,
T_Flow,
T_Grouping,
T_GroupId,
......
......@@ -88,6 +88,30 @@ typedef uint32 AclMode; /* a bitmask of privilege bits */
* Query Tree
*****************************************************************************/
/*
 * ParentStmtType records whether a query is contained in a utility
 * statement and, if so, the type of that utility statement.
 *   PARENTSTMTTYPE_NONE   query is not contained in a utility statement.
 *   PARENTSTMTTYPE_CTAS   query is contained in a CreateTableAsStmt.
 *   PARENTSTMTTYPE_COPY   query is contained in a CopyStmt.
 *
 * Previously the isCTAS field of Query indicated that the query was
 * contained in a CreateTableAsStmt; such a query needs a different MPP
 * plan.  A COPY statement can also contain a query, which likewise
 * requires a different query plan.
 * In postgres there is no need to make a different plan for a query inside
 * a utility statement, but in greenplum there is.  So this field indicates
 * whether the query is contained in a utility statement, and the type of
 * that utility statement.
 */
typedef uint8 ParentStmtType;
#define PARENTSTMTTYPE_NONE 0
#define PARENTSTMTTYPE_CTAS 1
#define PARENTSTMTTYPE_COPY 2
/*
* Query -
* Parse analysis turns all statements into a Query tree
......@@ -184,10 +208,10 @@ typedef struct Query
struct GpPolicy *intoPolicy;
/*
* GPDB: Used to indicate this query is part of CTAS so that its plan would
* always be dispatched in parallel.
* GPDB: Used to indicate this query is part of CTAS or COPY so that its plan
* would always be dispatched in parallel.
*/
bool isCTAS;
ParentStmtType parentStmtType;
/*
* Do we need to reshuffle data, we use an UpdateStmt
......@@ -198,7 +222,6 @@ typedef struct Query
} Query;
/****************************************************************************
* Supporting data structures for Parse Trees
*
......
......@@ -20,6 +20,7 @@
#include "access/sdir.h"
#include "nodes/bitmapset.h"
#include "nodes/primnodes.h"
#include "parsenodes.h"
typedef struct DirectDispatchInfo
{
......@@ -143,6 +144,7 @@ typedef struct PlannedStmt
* to be dispatched to QEs.
*/
IntoClause *intoClause;
CopyIntoClause *copyIntoClause;
} PlannedStmt;
/*
......
......@@ -106,6 +106,18 @@ typedef struct IntoClause
Node *distributedBy; /* GPDB: columns to distribubte the data on. */
} IntoClause;
/*
 * CopyIntoClause - COPY target information carried in a PlannedStmt
 *
 * Holds the column list, program flag, file name, options and AO segno map
 * taken from a CopyStmt.
 */
typedef struct CopyIntoClause
{
NodeTag type;
List *attlist; /* List of column names (as Strings), or NIL
* for all columns */
bool is_program; /* is 'filename' a program to popen? */
char *filename; /* filename, or NULL for STDIN/STDOUT */
List *options; /* List of DefElem nodes */
List *ao_segnos; /* AO segno map */
} CopyIntoClause;
/* ----------------------------------------------------------------
* node types for executable expressions
......
......@@ -687,7 +687,7 @@ SELECT * FROM segment_reject_limit_from;
COPY segment_reject_limit_from to STDOUT on segment;
COPY segment_reject_limit_from to PROGRAM STDOUT;
-- 'COPY (SELECT ...) TO' doesn't support 'ON SEGMENT'
-- 'COPY (SELECT ...) TO' has supported 'ON SEGMENT'
COPY (SELECT * FROM segment_reject_limit_from) TO '/tmp/segment_reject_limit<SEGID>.csv' ON SEGMENT;
-- SREH is not supported by COPY TO.
......@@ -780,6 +780,10 @@ COPY test_copy_on_segment_nocol TO '/tmp/valid_filename_nocol<SEGID>.txt' ON SEG
COPY test_copy_on_segment_nocol FROM '/tmp/valid_filename_nocol<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_on_segment_nocol;
COPY (select * from test_copy_on_segment_nocol) TO '/tmp/valid_filename_nocol<SEGID>.txt' ON SEGMENT;
COPY test_copy_on_segment_nocol FROM '/tmp/valid_filename_nocol<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_on_segment_nocol;
CREATE TABLE test_copy_on_segment_array (a int[], b text) DISTRIBUTED BY (a);
INSERT INTO test_copy_on_segment_array VALUES ('{1,2,3}', 'sd');
INSERT INTO test_copy_on_segment_array VALUES ('{2,2,3}', 'fg');
......@@ -793,6 +797,11 @@ CREATE TABLE test_copy_on_segment_array_1 (a int[], b text) DISTRIBUTED BY (a);
COPY test_copy_on_segment_array_1 FROM '/tmp/valid_filename_array<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_on_segment_array EXCEPT SELECT * FROM test_copy_on_segment_array_1;
delete from test_copy_on_segment_array_1;
COPY (select * from test_copy_on_segment_array) TO '/tmp/valid_filename_array_select<SEGID>.txt' ON SEGMENT;
COPY test_copy_on_segment_array_1 FROM '/tmp/valid_filename_array_select<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_on_segment_array EXCEPT SELECT * FROM test_copy_on_segment_array_1;
CREATE TABLE test_copy_on_segment_2dim_array (a int[][]) DISTRIBUTED BY (a);
INSERT INTO test_copy_on_segment_2dim_array VALUES ('{{1,2,3},{2,5,9}}');
INSERT INTO test_copy_on_segment_2dim_array VALUES ('{{1,8,3},{2,5,9}}');
......@@ -806,6 +815,11 @@ CREATE TABLE test_copy_on_segment_2dim_array_1 (a int[][]) DISTRIBUTED BY (a);
COPY test_copy_on_segment_2dim_array_1 FROM '/tmp/valid_filename_2dim_array<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_on_segment_2dim_array EXCEPT SELECT * FROM test_copy_on_segment_2dim_array_1;
delete from test_copy_on_segment_2dim_array_1;
COPY (select * from test_copy_on_segment_2dim_array) TO '/tmp/valid_filename_2dim_array_select<SEGID>.txt' ON SEGMENT;
COPY test_copy_on_segment_2dim_array_1 FROM '/tmp/valid_filename_2dim_array_select<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_on_segment_2dim_array EXCEPT SELECT * FROM test_copy_on_segment_2dim_array_1;
CREATE TABLE test_copy_on_segment (a int, b text, c text) DISTRIBUTED BY (b);
INSERT INTO test_copy_on_segment VALUES (1, 's', 'd');
INSERT INTO test_copy_on_segment VALUES (2, 'f', 'g');
......@@ -841,6 +855,29 @@ CREATE TABLE test_copy_from_on_segment_withoids (LIKE test_copy_on_segment_witho
COPY test_copy_from_on_segment_withoids FROM '/tmp/withoids_valid_filename<SEGID>.csv' WITH ON SEGMENT OIDS CSV QUOTE '"' ESCAPE E'\\' NULL '\N' DELIMITER ',' IGNORE EXTERNAL PARTITIONS;
SELECT * FROM test_copy_from_on_segment_withoids ORDER BY a;
COPY (select * from test_copy_on_segment) TO '/tmp/invalid_filename_select.txt' ON SEGMENT;
COPY (select * from test_copy_on_segment) TO '/tmp/valid_filename_select<SEGID>.txt' ON SEGMENT;
COPY (select * from test_copy_on_segment) TO '/tmp/valid_filename_select<SEGID>.bin' ON SEGMENT BINARY;
COPY test_copy_on_segment TO '/tmp/valid_filename_select<SEGID>.csv' WITH ON SEGMENT CSV QUOTE '"' FORCE QUOTE a,b,c ESCAPE E'\\' NULL '\N' DELIMITER ',' HEADER;
COPY test_copy_on_segment_withoids TO '/tmp/withoids_valid_filename_select<SEGID>.csv' WITH ON SEGMENT OIDS CSV QUOTE '"' FORCE QUOTE a,b,c ESCAPE E'\\' NULL '\N' DELIMITER ',';
delete from test_copy_from_on_segment_txt;
COPY test_copy_from_on_segment_txt FROM '/tmp/invalid_filename_select.txt' ON SEGMENT;
COPY test_copy_from_on_segment_txt FROM '/tmp/valid_filename_select<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_from_on_segment_txt ORDER BY a;
delete from test_copy_from_on_segment_binary;
COPY test_copy_from_on_segment_binary FROM '/tmp/valid_filename_select<SEGID>.bin' ON SEGMENT BINARY;
SELECT * FROM test_copy_from_on_segment_binary ORDER BY a;
delete from test_copy_from_on_segment_csv;
COPY test_copy_from_on_segment_csv FROM '/tmp/valid_filename_select<SEGID>.csv' WITH ON SEGMENT CSV QUOTE '"' ESCAPE E'\\' NULL '\N' DELIMITER ',' HEADER;
SELECT * FROM test_copy_from_on_segment_csv ORDER BY a;
delete from test_copy_from_on_segment_withoids;
COPY test_copy_from_on_segment_withoids FROM '/tmp/withoids_valid_filename_select<SEGID>.csv' WITH ON SEGMENT OIDS CSV QUOTE '"' ESCAPE E'\\' NULL '\N' DELIMITER ',';
SELECT * FROM test_copy_from_on_segment_withoids ORDER BY a;
CREATE TABLE onek_copy_onsegment (
unique1 int4,
unique2 int4,
......@@ -868,6 +905,13 @@ COPY onek_copy_from_onsegment FROM '/tmp/valid_filename_onek_copy_onsegment<SEGI
SELECT * FROM onek_copy_onsegment EXCEPT SELECT * FROM onek_copy_from_onsegment;
SELECT count(*) FROM onek_copy_from_onsegment;
COPY (select * from onek_copy_onsegment) TO '/tmp/valid_filename_onek_copy_onsegment_select<SEGID>.txt' ON SEGMENT;
delete from onek_copy_from_onsegment;
COPY onek_copy_from_onsegment FROM '/tmp/valid_filename_onek_copy_onsegment_select<SEGID>.txt' ON SEGMENT;
SELECT * FROM onek_copy_onsegment EXCEPT SELECT * FROM onek_copy_from_onsegment;
SELECT count(*) FROM onek_copy_from_onsegment;
CREATE EXTERNAL WEB TABLE rm_copy_onsegment_files (a int)
EXECUTE E'(rm -rf /tmp/*valid_filename*.*)'
ON SEGMENT 0
......@@ -896,6 +940,9 @@ DROP TABLE IF EXISTS LINEITEM_2;
DROP TABLE IF EXISTS LINEITEM_3;
DROP TABLE IF EXISTS LINEITEM_4;
DROP TABLE IF EXISTS LINEITEM_5;
DROP TABLE IF EXISTS LINEITEM_6;
DROP TABLE IF EXISTS LINEITEM_7;
DROP TABLE IF EXISTS LINEITEM_8;
-- end_ignore
CREATE TABLE LINEITEM ( L_ORDERKEY INTEGER NOT NULL,
L_PARTKEY INTEGER NOT NULL,
......@@ -923,11 +970,15 @@ CREATE TABLE LINEITEM_2 (LIKE LINEITEM);
CREATE TABLE LINEITEM_3 (LIKE LINEITEM);
CREATE TABLE LINEITEM_4 (LIKE LINEITEM);
CREATE TABLE LINEITEM_5 (LIKE LINEITEM);
CREATE TABLE LINEITEM_6 (LIKE LINEITEM);
CREATE TABLE LINEITEM_7 (LIKE LINEITEM);
CREATE TABLE LINEITEM_8 (LIKE LINEITEM);
COPY LINEITEM FROM '@abs_srcdir@/data/lineitem.csv' WITH DELIMITER '|' CSV;
SELECT COUNT(*) FROM LINEITEM;
COPY LINEITEM TO '/tmp/lineitem.csv' CSV;
COPY LINEITEM TO '/tmp/lineitem_s<SEGID>.csv' ON SEGMENT CSV;
COPY (select * from LINEITEM) TO '/tmp/lineitem_qs<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM_1 FROM '/tmp/lineitem.csv' CSV;
SELECT COUNT(*) FROM LINEITEM_1;
......@@ -937,21 +988,35 @@ COPY LINEITEM_2 FROM '/tmp/lineitem_s<SEGID>.csv' ON SEGMENT CSV;
SELECT COUNT(*) FROM LINEITEM_2;
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_2;
COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program.csv' CSV;
COPY LINEITEM_3 FROM PROGRAM 'cat /tmp/lineitem_program.csv' CSV;
COPY LINEITEM_3 FROM '/tmp/lineitem_qs<SEGID>.csv' ON SEGMENT CSV;
SELECT COUNT(*) FROM LINEITEM_3;
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_3;
COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM_4 FROM PROGRAM 'cat /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program.csv' CSV;
COPY LINEITEM_4 FROM PROGRAM 'cat /tmp/lineitem_program.csv' CSV;
SELECT COUNT(*) FROM LINEITEM_4;
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_4;
\COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program_client.csv' CSV;
\COPY LINEITEM_5 FROM PROGRAM 'cat /tmp/lineitem_program_client.csv' CSV;
COPY (select * from LINEITEM) TO PROGRAM 'cat > /tmp/lineitem_program.csv' CSV;
COPY LINEITEM_5 FROM PROGRAM 'cat /tmp/lineitem_program.csv' CSV;
SELECT COUNT(*) FROM LINEITEM_5;
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_5;
COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM_6 FROM PROGRAM 'cat /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
SELECT COUNT(*) FROM LINEITEM_6;
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_6;
COPY (select * from LINEITEM) TO PROGRAM 'cat > /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM_7 FROM PROGRAM 'cat /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
SELECT COUNT(*) FROM LINEITEM_7;
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_7;
\COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program_client.csv' CSV;
\COPY LINEITEM_8 FROM PROGRAM 'cat /tmp/lineitem_program_client.csv' CSV;
SELECT COUNT(*) FROM LINEITEM_8;
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_8;
--Test for `COPY FROM ON SEGMENT` checking the distribution key restriction
-- start_matchsubs
-- m/^CONTEXT: COPY .*, line \d*: .*$/
......@@ -1063,6 +1128,7 @@ CREATE TABLE COPY_TO_PROGRAM_ERROR(dir text);
COPY COPY_TO_PROGRAM_ERROR TO PROGRAM 'echo && echo "error" >&2 && exit 255';
COPY COPY_TO_PROGRAM_ERROR TO PROGRAM 'echo <SEGID>&& echo "error" >&2 && exit 255' on segment;
COPY (SELECT * FROM COPY_TO_PROGRAM_ERROR) TO PROGRAM 'echo <SEGID>&& echo "error" >&2 && exit 255' on segment;
CREATE TABLE COPY_FROM_PROGRAM_ERROR(a int);
......
......@@ -722,9 +722,8 @@ COPY segment_reject_limit_from to PROGRAM STDOUT;
ERROR: STDIN/STDOUT not allowed with PROGRAM
LINE 1: COPY segment_reject_limit_from to PROGRAM STDOUT;
^
-- 'COPY (SELECT ...) TO' doesn't support 'ON SEGMENT'
-- 'COPY (SELECT ...) TO' has supported 'ON SEGMENT'
COPY (SELECT * FROM segment_reject_limit_from) TO '/tmp/segment_reject_limit<SEGID>.csv' ON SEGMENT;
ERROR: 'COPY (SELECT ...) TO' doesn't support 'ON SEGMENT'.
-- SREH is not supported by COPY TO.
COPY segment_reject_limit_from to STDOUT log errors segment reject limit 3 rows;
ERROR: COPY single row error handling only available using COPY FROM
......@@ -793,6 +792,12 @@ SELECT * FROM test_copy_on_segment_nocol;
--
(0 rows)
COPY (select * from test_copy_on_segment_nocol) TO '/tmp/valid_filename_nocol<SEGID>.txt' ON SEGMENT;
COPY test_copy_on_segment_nocol FROM '/tmp/valid_filename_nocol<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_on_segment_nocol;
--
(0 rows)
CREATE TABLE test_copy_on_segment_array (a int[], b text) DISTRIBUTED BY (a);
INSERT INTO test_copy_on_segment_array VALUES ('{1,2,3}', 'sd');
INSERT INTO test_copy_on_segment_array VALUES ('{2,2,3}', 'fg');
......@@ -808,6 +813,14 @@ SELECT * FROM test_copy_on_segment_array EXCEPT SELECT * FROM test_copy_on_segme
---+---
(0 rows)
delete from test_copy_on_segment_array_1;
COPY (select * from test_copy_on_segment_array) TO '/tmp/valid_filename_array_select<SEGID>.txt' ON SEGMENT;
COPY test_copy_on_segment_array_1 FROM '/tmp/valid_filename_array_select<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_on_segment_array EXCEPT SELECT * FROM test_copy_on_segment_array_1;
a | b
---+---
(0 rows)
CREATE TABLE test_copy_on_segment_2dim_array (a int[][]) DISTRIBUTED BY (a);
INSERT INTO test_copy_on_segment_2dim_array VALUES ('{{1,2,3},{2,5,9}}');
INSERT INTO test_copy_on_segment_2dim_array VALUES ('{{1,8,3},{2,5,9}}');
......@@ -823,6 +836,14 @@ SELECT * FROM test_copy_on_segment_2dim_array EXCEPT SELECT * FROM test_copy_on_
---
(0 rows)
delete from test_copy_on_segment_2dim_array_1;
COPY (select * from test_copy_on_segment_2dim_array) TO '/tmp/valid_filename_2dim_array_select<SEGID>.txt' ON SEGMENT;
COPY test_copy_on_segment_2dim_array_1 FROM '/tmp/valid_filename_2dim_array_select<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_on_segment_2dim_array EXCEPT SELECT * FROM test_copy_on_segment_2dim_array_1;
a
---
(0 rows)
CREATE TABLE test_copy_on_segment (a int, b text, c text) DISTRIBUTED BY (b);
INSERT INTO test_copy_on_segment VALUES (1, 's', 'd');
INSERT INTO test_copy_on_segment VALUES (2, 'f', 'g');
......@@ -889,6 +910,60 @@ SELECT * FROM test_copy_from_on_segment_withoids ORDER BY a;
3 | h | j
(3 rows)
COPY (select * from test_copy_on_segment) TO '/tmp/invalid_filename_select.txt' ON SEGMENT;
ERROR: <SEGID> is required for file name (seg0 127.0.0.1:25432 pid=22593)
COPY (select * from test_copy_on_segment) TO '/tmp/valid_filename_select<SEGID>.txt' ON SEGMENT;
COPY (select * from test_copy_on_segment) TO '/tmp/valid_filename_select<SEGID>.bin' ON SEGMENT BINARY;
COPY test_copy_on_segment TO '/tmp/valid_filename_select<SEGID>.csv' WITH ON SEGMENT CSV QUOTE '"' FORCE QUOTE a,b,c ESCAPE E'\\' NULL '\N' DELIMITER ',' HEADER;
COPY test_copy_on_segment_withoids TO '/tmp/withoids_valid_filename_select<SEGID>.csv' WITH ON SEGMENT OIDS CSV QUOTE '"' FORCE QUOTE a,b,c ESCAPE E'\\' NULL '\N' DELIMITER ',';
delete from test_copy_from_on_segment_txt;
COPY test_copy_from_on_segment_txt FROM '/tmp/invalid_filename_select.txt' ON SEGMENT;
ERROR: <SEGID> is required for file name (seg0 127.0.0.1:25432 pid=22593)
COPY test_copy_from_on_segment_txt FROM '/tmp/valid_filename_select<SEGID>.txt' ON SEGMENT;
SELECT * FROM test_copy_from_on_segment_txt ORDER BY a;
a | b | c
---+---+---
1 | s | d
2 | f | g
3 | h | j
4 | i | l
5 | q | w
(5 rows)
delete from test_copy_from_on_segment_binary;
COPY test_copy_from_on_segment_binary FROM '/tmp/valid_filename_select<SEGID>.bin' ON SEGMENT BINARY;
SELECT * FROM test_copy_from_on_segment_binary ORDER BY a;
a | b | c
---+---+---
1 | s | d
2 | f | g
3 | h | j
4 | i | l
5 | q | w
(5 rows)
delete from test_copy_from_on_segment_csv;
COPY test_copy_from_on_segment_csv FROM '/tmp/valid_filename_select<SEGID>.csv' WITH ON SEGMENT CSV QUOTE '"' ESCAPE E'\\' NULL '\N' DELIMITER ',' HEADER;
SELECT * FROM test_copy_from_on_segment_csv ORDER BY a;
a | b | c
---+---+---
1 | s | d
2 | f | g
3 | h | j
4 | i | l
5 | q | w
(5 rows)
delete from test_copy_from_on_segment_withoids;
COPY test_copy_from_on_segment_withoids FROM '/tmp/withoids_valid_filename_select<SEGID>.csv' WITH ON SEGMENT OIDS CSV QUOTE '"' ESCAPE E'\\' NULL '\N' DELIMITER ',';
SELECT * FROM test_copy_from_on_segment_withoids ORDER BY a;
a | b | c
---+---+---
1 | s | d
2 | f | g
3 | h | j
(3 rows)
CREATE TABLE onek_copy_onsegment (
unique1 int4,
unique2 int4,
......@@ -929,6 +1004,20 @@ SELECT count(*) FROM onek_copy_from_onsegment;
1000
(1 row)
COPY (select * from onek_copy_onsegment) TO '/tmp/valid_filename_onek_copy_onsegment_select<SEGID>.txt' ON SEGMENT;
delete from onek_copy_from_onsegment;
COPY onek_copy_from_onsegment FROM '/tmp/valid_filename_onek_copy_onsegment_select<SEGID>.txt' ON SEGMENT;
SELECT * FROM onek_copy_onsegment EXCEPT SELECT * FROM onek_copy_from_onsegment;
unique1 | unique2 | two | four | ten | twenty | hundred | thousand | twothousand | fivethous | tenthous | odd | even | stringu1 | stringu2 | string4
---------+---------+-----+------+-----+--------+---------+----------+-------------+-----------+----------+-----+------+----------+----------+---------
(0 rows)
SELECT count(*) FROM onek_copy_from_onsegment;
count
-------
1000
(1 row)
CREATE EXTERNAL WEB TABLE rm_copy_onsegment_files (a int)
EXECUTE E'(rm -rf /tmp/*valid_filename*.*)'
ON SEGMENT 0
......@@ -959,6 +1048,9 @@ DROP TABLE IF EXISTS LINEITEM_2;
DROP TABLE IF EXISTS LINEITEM_3;
DROP TABLE IF EXISTS LINEITEM_4;
DROP TABLE IF EXISTS LINEITEM_5;
DROP TABLE IF EXISTS LINEITEM_6;
DROP TABLE IF EXISTS LINEITEM_7;
DROP TABLE IF EXISTS LINEITEM_8;
-- end_ignore
CREATE TABLE LINEITEM ( L_ORDERKEY INTEGER NOT NULL,
L_PARTKEY INTEGER NOT NULL,
......@@ -986,6 +1078,9 @@ CREATE TABLE LINEITEM_2 (LIKE LINEITEM);
CREATE TABLE LINEITEM_3 (LIKE LINEITEM);
CREATE TABLE LINEITEM_4 (LIKE LINEITEM);
CREATE TABLE LINEITEM_5 (LIKE LINEITEM);
CREATE TABLE LINEITEM_6 (LIKE LINEITEM);
CREATE TABLE LINEITEM_7 (LIKE LINEITEM);
CREATE TABLE LINEITEM_8 (LIKE LINEITEM);
COPY LINEITEM FROM '@abs_srcdir@/data/lineitem.csv' WITH DELIMITER '|' CSV;
SELECT COUNT(*) FROM LINEITEM;
count
......@@ -995,6 +1090,7 @@ SELECT COUNT(*) FROM LINEITEM;
COPY LINEITEM TO '/tmp/lineitem.csv' CSV;
COPY LINEITEM TO '/tmp/lineitem_s<SEGID>.csv' ON SEGMENT CSV;
COPY (select * from LINEITEM) TO '/tmp/lineitem_qs<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM_1 FROM '/tmp/lineitem.csv' CSV;
SELECT COUNT(*) FROM LINEITEM_1;
count
......@@ -1019,8 +1115,7 @@ SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_2;
------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program.csv' CSV;
COPY LINEITEM_3 FROM PROGRAM 'cat /tmp/lineitem_program.csv' CSV;
COPY LINEITEM_3 FROM '/tmp/lineitem_qs<SEGID>.csv' ON SEGMENT CSV;
SELECT COUNT(*) FROM LINEITEM_3;
count
-------
......@@ -1032,8 +1127,8 @@ SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_3;
------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM_4 FROM PROGRAM 'cat /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program.csv' CSV;
COPY LINEITEM_4 FROM PROGRAM 'cat /tmp/lineitem_program.csv' CSV;
SELECT COUNT(*) FROM LINEITEM_4;
count
-------
......@@ -1045,8 +1140,8 @@ SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_4;
------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
\COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program_client.csv' CSV;
\COPY LINEITEM_5 FROM PROGRAM 'cat /tmp/lineitem_program_client.csv' CSV;
COPY (select * from LINEITEM) TO PROGRAM 'cat > /tmp/lineitem_program.csv' CSV;
COPY LINEITEM_5 FROM PROGRAM 'cat /tmp/lineitem_program.csv' CSV;
SELECT COUNT(*) FROM LINEITEM_5;
count
-------
......@@ -1058,6 +1153,45 @@ SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_5;
------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM_6 FROM PROGRAM 'cat /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
SELECT COUNT(*) FROM LINEITEM_6;
count
-------
57190
(1 row)
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_6;
l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
COPY (select * from LINEITEM) TO PROGRAM 'cat > /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
COPY LINEITEM_7 FROM PROGRAM 'cat /tmp/lineitem_program<SEGID>.csv' ON SEGMENT CSV;
SELECT COUNT(*) FROM LINEITEM_7;
count
-------
57190
(1 row)
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_7;
l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
\COPY LINEITEM TO PROGRAM 'cat > /tmp/lineitem_program_client.csv' CSV;
\COPY LINEITEM_8 FROM PROGRAM 'cat /tmp/lineitem_program_client.csv' CSV;
SELECT COUNT(*) FROM LINEITEM_8;
count
-------
57190
(1 row)
SELECT * FROM LINEITEM EXCEPT SELECT * FROM LINEITEM_8;
l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
--Test for `COPY FROM ON SEGMENT` checking the distribution key restriction
-- start_matchsubs
-- m/^CONTEXT: COPY .*, line \d*: .*$/
......@@ -1219,7 +1353,9 @@ HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sur
COPY COPY_TO_PROGRAM_ERROR TO PROGRAM 'echo && echo "error" >&2 && exit 255';
ERROR: command error message: error
COPY COPY_TO_PROGRAM_ERROR TO PROGRAM 'echo <SEGID>&& echo "error" >&2 && exit 255' on segment;
ERROR: command error message: error (seg1 127.0.0.1:40001 pid=23883)
ERROR: command error message: error (seg0 127.0.0.1:25432 pid=23338)
COPY (SELECT * FROM COPY_TO_PROGRAM_ERROR) TO PROGRAM 'echo <SEGID>&& echo "error" >&2 && exit 255' on segment;
ERROR: command error message: error (seg0 127.0.0.1:25432 pid=23338)
CREATE TABLE COPY_FROM_PROGRAM_ERROR(a int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册