From f71749667c94415b9ef2cf761ef365ed00937a3a Mon Sep 17 00:00:00 2001 From: xiaoxiao <53000479+xiaoxiaoHe-E@users.noreply.github.com> Date: Mon, 9 Nov 2020 15:47:26 +0800 Subject: [PATCH] compatible gpload (#11103) * refactor gpload test file TEST.py 1. migrate gpload test to pytest 2. new function to form config file through yaml package and make it more reasonable 3. add a case to cover gpload update_condition arggument * migrate gpload and TEST.py to python3.6 new test case 43 to test gpload behavior when column name has capital letters and without data type change some ans file since psql react different * change sql to find reuseable external table to make gpload compatible in gp7 and gp6 better TEST.py to write config file with ruamel.yaml moudle Co-authored-by: XiaoxiaoHe --- gpMgmt/bin/gpload.py | 256 ++-- gpMgmt/bin/gpload_test/.gitignore | 1 + gpMgmt/bin/gpload_test/gpload2/TEST.py | 1112 +++++++++-------- gpMgmt/bin/gpload_test/gpload2/conftest.py | 15 + .../gpload2/data/external_file_03.txt | 2 +- gpMgmt/bin/gpload_test/gpload2/query37.ans | 46 +- gpMgmt/bin/gpload_test/gpload2/query42.ans | 21 + gpMgmt/bin/gpload_test/gpload2/query43.ans | 18 + 8 files changed, 853 insertions(+), 618 deletions(-) create mode 100644 gpMgmt/bin/gpload_test/gpload2/conftest.py create mode 100644 gpMgmt/bin/gpload_test/gpload2/query42.ans create mode 100644 gpMgmt/bin/gpload_test/gpload2/query43.ans diff --git a/gpMgmt/bin/gpload.py b/gpMgmt/bin/gpload.py index f268172825..3025ef4629 100755 --- a/gpMgmt/bin/gpload.py +++ b/gpMgmt/bin/gpload.py @@ -22,8 +22,8 @@ Options: --version: print version number and exit -?: help ''' - import sys +import yaml if sys.hexversion<0x2040400: sys.stderr.write("gpload needs python 2.4.4 or higher\n") sys.exit(2) @@ -35,9 +35,16 @@ except ImportError: sys.exit(2) import platform + try: import pg +except ImportError: + try: + from pygresql import pg + except Exception as e: + pass except Exception as e: + print(repr(e)) errorMsg = "gpload was unable to import The PyGreSQL Python module (pg.py) - %s\n" % str(e) sys.stderr.write(str(errorMsg)) errorMsg = "Please check if you have the correct Visual Studio redistributable package installed.\n" @@ -66,6 +73,12 @@ else: if windowsPlatform == False: import select +from sys import version_info +if version_info.major == 2 : + import __builtin__ + long = __builtin__.long +else: + long = int EXECNAME = 'gpload' @@ -135,11 +148,11 @@ valid_tokens = { "schema": {'parse_children': False, 'parent': 'external'}} _abbrevs = [ - (1<<50, ' PB'), - (1<<40, ' TB'), - (1<<30, ' GB'), - (1<<20, ' MB'), - (1<<10, ' kB'), + (long(1<<50), ' PB'), + (long(1<<40), ' TB'), + (long(1<<30), ' GB'), + (long(1<<20), ' MB'), + (long(1<<10), ' kB'), (1, ' bytes') ] @@ -739,7 +752,7 @@ def bytestr(size, precision=1): if size >= factor: break - float_string_split = repr(size/float(factor)).split('.') + float_string_split = "size/float(factor)".split('.') integer_part = float_string_split[0] decimal_part = float_string_split[1] if int(decimal_part[0:precision]): @@ -890,7 +903,7 @@ def cli_help(): #============================================================ def usage(error = None): - print((cli_help() or __doc__)) + print (cli_help() or __doc__) sys.stdout.flush() if error: sys.stderr.write('ERROR: ' + error + '\n') @@ -1332,6 +1345,7 @@ class gpload: [datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'), self.elevel2str(level), a]) + '\n' + #str = str.encode('utf-8') except Exception as e: # log even if contains non-utf8 data and pass this exception self.logfile.write("\nWarning: Log() threw an exception: %s \n" % (e)) @@ -1464,9 +1478,11 @@ class gpload: if not self.options.U: self.options.U = os.environ.get('PGUSER') if not self.options.U: - self.options.U = os.environ.get('USER') or \ - os.environ.get('LOGNAME') or \ - os.environ.get('USERNAME') + self.options.U = getpass.getuser() + self.log(self.INFO, "no user supplied, defaulting to "+self.options.U) + #self.options.U = os.environ.get('USER') or \ + # os.environ.get('LOGNAME') or \ + # os.environ.get('USERNAME') if not self.options.U or len(self.options.U) == 0: self.log(self.ERROR, @@ -1647,8 +1663,6 @@ class gpload: availablePorts = set(range(1,65535)) found_source = False - self.getconfig('gpload:input', list) - while 1: sourceIndex += 1 name = 'gpload:input:source(%d)'%sourceIndex @@ -1856,6 +1870,9 @@ class gpload: self.options.p)) def read_columns(self): + ''' + get from columns + ''' columns = self.getconfig('gpload:input:columns',list,None, returnOriginal=True) if columns != None: self.from_cols_from_user = True # user specified from columns @@ -1868,7 +1885,7 @@ class gpload: """ remove leading or trailing spaces """ d = { tempkey.strip() : value } key = list(d.keys())[0] - if d[key] is None: + if d[key] is None or not d[key]: self.log(self.DEBUG, 'getting source column data type from target') for name, typ, mapto, hasseq in self.into_columns: @@ -1884,7 +1901,7 @@ class gpload: # Mark this column as having no mapping, which is important # for do_insert() - self.from_columns.append([quote_ident(key),d[key].lower(),None, False]) + self.from_columns.append([key,d[key].lower(),None, False]) else: self.from_columns = self.into_columns self.from_cols_from_user = False @@ -1903,6 +1920,9 @@ class gpload: def read_table_metadata(self): + ''' + get into columns list like: [column name, column data type, mapping target, has_sequence(bool)] + ''' # KAS Note to self. If schema is specified, then probably should use PostgreSQL rules for defining it. # find the shema name for this table (according to search_path) @@ -1956,7 +1976,12 @@ class gpload: name = row['column_name'] name = quote_ident(name) has_seq = row['has_sequence'] - i = [name,ct,None, has_seq] + if has_seq == str('f') or has_seq==False: + has_seq_bool = False + if has_seq == str('t') or has_seq==True: + has_sql_bool = True + i = [name,ct,None, has_seq_bool] + # i: [column name, column data type, mapping target, has_sequence] self.into_columns.append(i) self.into_columns_dict[name] = i self.log(self.DEBUG, "found input column: " + str(i)) @@ -1976,6 +2001,10 @@ class gpload: self.log(self.ERROR, 'table %s.%s does not exist in database %s'% (tableSchema, tableName, self.options.d)) def read_mapping(self): + ''' + get mapping for into_colums and record the mapping at into_columns[2]. + if no mapping in cofig file, this function will get mapping from from_columns + ''' mapping = self.getconfig('gpload:output:mapping',dict,None, returnOriginal=True) if mapping: @@ -2011,15 +2040,18 @@ class gpload: for name,typ,mapto,seq in self.into_columns: self.log(self.DEBUG,'%s: %s = %s'%(name,typ,mapto)) - # In order to find out whether we have an existing external table in the - # catalog which could be reused for this operation we need to make sure - # that it has the same column names and types, the same data format, and - # location specification, and single row error handling specs. - # - # This function will return the SQL to run in order to find out whether - # such a table exists. - # + def get_reuse_exttable_query(self, formatType, formatOpts, limitStr, from_cols, schemaName, log_errors, encodingCode): + ''' + In order to find out whether we have an existing external table in the + catalog which could be reused for this operation we need to make sure + that it has the same column names and types, the same data format, and + location specification, and single row error handling specs. + + Return: + SQL to run in order to find out whether such a table exists. + ''' + sqlFormat = """select attrelid::regclass from ( select @@ -2035,7 +2067,7 @@ class gpload: on (pg_class.oid = attrelid) %s where - relkind = 'f' and + relkind = '%s' and relname like 'ext_gpload_reusable_%%' and attnum > 0 and not attisdropped and %s @@ -2045,6 +2077,7 @@ class gpload: on(pgattr.attrelid = pgext.reloid) """ joinStr = "" + relkind = "" conditionStr = "" # if schemaName is None, find the resuable ext table which is visible to @@ -2059,8 +2092,11 @@ class gpload: on(pg_class.relnamespace = pgns.oid) """ conditionStr = "pgns.nspname = '%s'" % schemaName - - sql = sqlFormat % (joinStr, conditionStr) + if noGpVersion or self.gpdb_version < "7.0.0": + relkind='r' + else: + relkind='f' + sql = sqlFormat % (joinStr, relkind, conditionStr) if noGpVersion or self.gpdb_version < "6.0.0": if log_errors: @@ -2106,28 +2142,30 @@ class gpload: self.log(self.DEBUG, "query used to identify reusable external relations: %s" % sql) return sql - # Fast path to find out whether we have an existing external table in the - # catalog which could be reused for this operation. we only make sure the - # location, data format and error limit are same. we don't check column - # names and types - # - # This function will return the SQL to run in order to find out whether - # such a table exists. The results of this SQl are table names without schema - # - def get_fast_match_exttable_query(self, formatType, formatOpts, limitStr, schemaName, log_errors, encodingCode): + def get_fast_match_exttable_query(self, formatType, formatOpts, limitStr, schemaName, log_errors, encodingCode): + ''' + Fast path to find out whether we have an existing external table in the + catalog which could be reused for this operation. we only make sure the + location, data format and error limit are same. we don't check column + names and types + + Return: SQL to run in order to find out whether + such a table exists. The results of this SQl are table names without schema + ''' sqlFormat = """select relname from pg_class join pg_exttable pgext on(pg_class.oid = pgext.reloid) %s where - relkind = 'f' and + relkind = '%s' and relname like 'ext_gpload_reusable_%%' and %s """ joinStr = "" + relkind = "" conditionStr = "" # if schemaName is None, find the resuable ext table which is visible to @@ -2141,8 +2179,11 @@ class gpload: pg_namespace pgns on(pg_class.relnamespace = pgns.oid)""" conditionStr = "pgns.nspname = '%s'" % schemaName - - sql = sqlFormat % (joinStr, conditionStr) + if noGpVersion or self.gpdb_version < "7.0.0": + relkind='r' + else: + relkind='f' + sql = sqlFormat % (joinStr, relkind, conditionStr) if noGpVersion or self.gpdb_version < "6.0.0": if log_errors: @@ -2175,15 +2216,19 @@ class gpload: self.log(self.DEBUG, "query used to fast match external relations:\n %s" % sql) return sql - # - # Create a string from the following conditions to reuse staging table: - # 1. same target table - # 2. same number of columns - # 3. same names and types, in the same order - # 4. same distribution key (according to columns' names and their order) - # + def get_staging_conditions_string(self, target_table_name, staging_cols, distribution_cols): - + ''' + Create a string from the following conditions to reuse staging table: + 1. same target table + 2. same number of columns + 3. same names and types, in the same order + 4. same distribution key (according to columns' names and their order) + + Return: + string (target_table_name:columns_num:staging_cols_str:distribution_cols_str) + ''' + columns_num = len(staging_cols) staging_cols_str = '-'.join(['%s-%s' % (quote(quote_unident(col[0])), quote(col[1])) for col in staging_cols]) @@ -2193,13 +2238,16 @@ class gpload: return '%s:%s:%s:%s' % (target_table_name, columns_num, staging_cols_str, distribution_cols_str) - # - # This function will return the SQL to run in order to find out whether - # we have an existing staging table in the catalog which could be reused for this - # operation, according to the method and the encoding conditions. - # + def get_reuse_staging_table_query(self, encoding_conditions): - + ''' + This function will return the SQL to run in order to find out whether + we have an existing staging table in the catalog which could be reused for this + operation, according to the method and the encoding conditions. + + return: + sql(string) + ''' sql = """SELECT oid::regclass FROM pg_class WHERE relname = 'staging_gpload_reusable_%s';""" % (encoding_conditions) @@ -2207,10 +2255,9 @@ class gpload: self.log(self.DEBUG, "query used to identify reusable temporary relations: %s" % sql) return sql - # - # get oid for table from pg_class, None if not exist - # + def get_table_oid(self, tableName): + '''get oid for table from pg_class, None if not exist''' if tableName: sql = "select %s::regclass::oid" % quote(quote_unident(tableName)) try: @@ -2221,6 +2268,9 @@ class gpload: return None def get_ext_schematable(self, schemaName, tableName): + ''' + return formated table name + ''' if schemaName is None: return tableName else: @@ -2228,6 +2278,9 @@ class gpload: return schemaTable def get_external_table_formatOpts(self, option, specify=''): + ''' + add option, specify to self.formatOpts for creating external table + ''' formatType = self.getconfig('gpload:input:format', str, 'text').lower() if formatType == 'text': @@ -2272,19 +2325,16 @@ class gpload: else: self.control_file_warning(option +''' must be single ASCII character, you can also use unprintable characters(for example: '\\x1c' / E'\\x1c' or '\\u001c' / E'\\u001c' ''') self.control_file_error("Invalid option, gpload quit immediately") - sys.exit(2); + sys.exit(2) else: self.formatOpts += "%s '%s' " % (specify_str, val) - # - # Create a new external table or find a reusable external table to use for this operation - # def create_external_table(self): - - # extract all control file information and transform it accordingly - # in order to construct a CREATE EXTERNAL TABLE statement if will be - # needed later on + ''' + extract all control file information and transform it accordingly, + create a new external table or find a reusable external table to use for this operation or later + ''' formatType = self.getconfig('gpload:input:format', str, 'text').lower() locationStr = ','.join(map(quote,self.locations)) @@ -2298,7 +2348,7 @@ class gpload: elif formatType=='csv': self.formatOpts += "null '' " else: - self.formatOpts += "null %s " % quote_no_slash(r"\N") + self.formatOpts += "null %s " % quote_no_slash("\\N") esc = self.getconfig('gpload:input:escape', None, None) @@ -2323,6 +2373,7 @@ class gpload: if self.getconfig('gpload:input:header',bool,False): self.formatOpts += "header " + ### should be true or false force_not_null_columns = self.getconfig('gpload:input:force_not_null',list,[]) if force_not_null_columns: for i in force_not_null_columns: @@ -2390,6 +2441,7 @@ class gpload: self.extSchemaTable = self.get_ext_schematable(quote_unident(self.extSchemaName), self.extTableName) self.log(self.INFO, "reusing external staging table %s" % self.extSchemaTable) return + # staging table is not specified, we need to find it manually else: # process the single quotes in order to successfully find an existing external table to reuse. self.formatOpts = self.formatOpts.replace("E'\\''","'\''") @@ -2442,9 +2494,9 @@ class gpload: sql += "segment reject limit %s "%limitStr try: - self.db.query(sql) + self.db.query(sql.encode('utf-8')) except Exception as e: - self.log(self.ERROR, 'could not run SQL "%s": %s' % (sql, str(e))) + self.log(self.ERROR, 'could not run SQL "%s": %s' % (sql, unicode(e))) # set up to drop the external table at the end of operation, unless user # specified the 'reuse_tables' option, in which case we don't drop @@ -2452,11 +2504,11 @@ class gpload: self.cleanupSql.append('drop external table if exists %s'%self.extSchemaTable) - # - # Create a new staging table or find a reusable staging table to use for this operation - # (only valid for update/merge operations). - # def create_staging_table(self): + ''' + Create a new staging table or find a reusable staging table to use for this operation + (only valid for update/merge operations). + ''' # make sure we set the correct distribution policy distcols = self.getconfig('gpload:output:match_columns', list) @@ -2521,7 +2573,10 @@ class gpload: def count_errors(self): - self.db.set_notice_receiver(notice_processor) + if self.gpdb_version < "7.0.0": # for gpdb6 + notice_processor(self.db.notices()) + else: + self.db.set_notice_receiver(notice_processor) if self.log_errors and not self.options.D: # make sure we only get errors for our own instance if not self.reuse_tables: @@ -2554,12 +2609,16 @@ class gpload: def do_insert(self, dest): """ Handle the INSERT case + insert data into dest table from self external table """ self.log(self.DEBUG, "into columns " + str(self.into_columns)) + # a[2] is mapping target + #cols = filter(lambda a:a[2]!=None, self.into_columns) cols = [a for a in self.into_columns if a[2]!=None] # only insert non-serial columns, unless the user told us to # insert the serials explicitly + # a[3] is has_sequence (bool) if not self.from_cols_from_user: cols = [a for a in cols if a[3] == False] @@ -2575,7 +2634,7 @@ class gpload: self.log(self.LOG, sql) if not self.options.D: try: - self.rowsInserted = self.db.query(sql) + self.rowsInserted = self.db.query(sql.encode('utf-8')) except Exception as e: # We need to be a bit careful about the error since it may contain non-unicode characters strE = e.__str__().encode().decode('unicode-escape') @@ -2592,7 +2651,14 @@ class gpload: self.create_external_table() self.do_insert(self.get_qualified_tablename()) - def map_stuff(self,config,format,index): + def map_stuff(self,config,configFormat,index): + ''' + get the config and find it in into_columns_dict, + report error if no column finded in into_columns_dict or no mapping for it. + + Return + list: [ configFormat(into_clomuns[0], into_clomuns[index]) ] + ''' lis = [] theList = self.getconfig(config,list) theList = convertListToDelimited(theList) @@ -2604,7 +2670,9 @@ class gpload: self.log(self.ERROR,'column %s in %s does not exist'%(i,config)) if not j[index]: self.log(self.ERROR,'there is no mapping from the column %s in %s'%(i,config)) - lis.append(format(j[0],j[index])) + # append ( j[0] = from_table.j[index]) + # column_name = from_table.column_name + lis.append(configFormat(j[0],j[index])) return lis def fix_update_cond(self, match): @@ -2613,7 +2681,8 @@ class gpload: def do_update(self,fromname,index): """ - UPDATE case + UPDATE case. Update into_table from staging_table + form the update sql from update_columns, match_columns and update_condition """ sql = 'update %s into_table ' % self.get_qualified_tablename() sql += 'set %s '%','.join(self.map_stuff('gpload:output:update_columns',(lambda x,y:'%s=from_table.%s' % (x, y)),index)) @@ -2626,6 +2695,7 @@ class gpload: update_condition = self.getconfig('gpload:output:update_condition', str, None) if update_condition: + ### need to optimize # # Place the table alias infront of column references. # @@ -2635,17 +2705,17 @@ class gpload: # Better lexing and parsing needs to be done here to fix all cases. # update_condition = ' ' + update_condition + ' ' - for name, type, mapto, seq in self.into_columns: + for name, colType, mapto, seq in self.into_columns: regexp = '(?<=[^\w])%s(?=[^\w])' % name self.log(self.DEBUG, 'update_condition re: ' + regexp) temp_update_condition = update_condition updateConditionList = splitIntoLiteralsAndNonLiterals(update_condition) skip = False - update_condition = '' + update_condition = """""" for uc in updateConditionList: if skip == False: - uc = re.sub(regexp, self.fix_update_cond, uc) - skip = True + uc = re.sub(regexp, self.fix_update_cond, uc) + skip = True update_condition = update_condition + uc if update_condition == temp_update_condition: # see if column can be undelimited, and try again. @@ -2663,7 +2733,7 @@ class gpload: self.log(self.LOG, sql) if not self.options.D: try: - self.rowsUpdated = self.db.query(sql) + self.rowsUpdated = self.db.query(sql.encode('utf-8')) except Exception as e: # We need to be a bit careful about the error since it may contain non-unicode characters strE = str(str(e), errors = 'ignore') @@ -2671,11 +2741,15 @@ class gpload: self.log(self.ERROR, strE + ' encountered while running ' + strF) def get_qualified_tablename(self): - + ''' + return a qualified table name from self.schema and self.table + ''' tblname = "%s.%s" % (self.schema, self.table) return tblname def get_table_dist_key(self): + ''' + ''' # NOTE: this query should be re-written better. the problem is that it is # not possible to perform a cast on a table name with spaces... if noGpVersion or self.gpdb_version < "6.0.0": @@ -2703,7 +2777,7 @@ class gpload: return attrs def table_supports_update(self): - """Columns being updated cannot appear in the distribution key.""" + """ Check wether columns being updated are distribution key.""" distKeyList = self.get_table_dist_key() distkey = set() for dk in distKeyList: @@ -2762,7 +2836,7 @@ class gpload: self.log(self.LOG, sql) if not self.options.D: try: - self.rowsInserted = self.db.query(sql) + self.rowsInserted = self.db.query(sql.encode('utf-8')) except Exception as e: # We need to be a bit careful about the error since it may contain non-unicode characters strE = str(str(e), errors = 'ignore') @@ -2775,11 +2849,19 @@ class gpload: if not self.options.D: try: truncateSQLtext = "truncate %s" % tblname - self.db.query(truncateSQLtext) + self.db.query(truncateSQLtext.encode('utf-8')) except Exception as e: self.log(self.ERROR, 'could not execute truncate target %s: %s' % (tblname, str(e))) def do_method(self): + ''' + setup gpload config, + start a transaction + execute the 'before sql', + do method (insert upade, merge) accordingly, + execute the 'after sql' + ''' + # Is the table to be truncated before the load? preload = self.getconfig('gpload:preload', list, default=None) method = self.getconfig('gpload:output:mode', str, 'insert').lower() @@ -2820,7 +2902,7 @@ class gpload: self.log(self.LOG, "Pre-SQL from user: %s" % before) if not self.options.D: try: - self.db.query(before) + self.db.query(before.encode('utf-8')) except Exception as e: self.log(self.ERROR, 'could not execute SQL in sql:before "%s": %s' % (before, str(e))) @@ -2843,7 +2925,7 @@ class gpload: self.log(self.LOG, "Post-SQL from user: %s" % after) if not self.options.D: try: - self.db.query(after) + self.db.query(after.encode('utf-8')) except Exception as e: self.log(self.ERROR, 'could not execute SQL in sql:after "%s": %s' % (after, str(e))) diff --git a/gpMgmt/bin/gpload_test/.gitignore b/gpMgmt/bin/gpload_test/.gitignore index 97f8f218f1..c25d2e9ca2 100644 --- a/gpMgmt/bin/gpload_test/.gitignore +++ b/gpMgmt/bin/gpload_test/.gitignore @@ -9,3 +9,4 @@ gpstringsubs.pl gpdiff.pl atmsort.pm explain.pm +data/large_file.csv diff --git a/gpMgmt/bin/gpload_test/gpload2/TEST.py b/gpMgmt/bin/gpload_test/gpload2/TEST.py index 9419469e63..460f8540a8 100755 --- a/gpMgmt/bin/gpload_test/gpload2/TEST.py +++ b/gpMgmt/bin/gpload_test/gpload2/TEST.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3 +#!/usr/bin/env pytest import unittest import sys @@ -9,17 +9,37 @@ import socket import fileinput import platform import re -import subprocess -import pg +#import yaml +import ruamel.yaml +import pytest + +try: + import subprocess32 as subprocess +except: + import subprocess +try: + import pg +except ImportError: + try: + from pygresql import pg + except Exception as e: + pass +except Exception as e: + print(repr(e)) + errorMsg = "gpload was unable to import The PyGreSQL Python module (pg.py) - %s\n" % str(e) + sys.stderr.write(str(errorMsg)) + errorMsg = "Please check if you have the correct Visual Studio redistributable package installed.\n" + sys.stderr.write(str(errorMsg)) + sys.exit(2) def get_port_from_conf(): file = os.environ.get('MASTER_DATA_DIRECTORY')+'/postgresql.conf' if os.path.isfile(file): with open(file) as f: - for line in f: - match = re.search('port=\d+',line) + for line in f.xreadlines(): + match = re.search(r'port=\d+',line) if match: - match1 = re.search('\d+', match.group()) + match1 = re.search(r'\d+', match.group()) if match1: return match1.group() @@ -48,7 +68,7 @@ def getPortMasterOnly(host = 'localhost',master_value = None, user = os.environ.get('USER'),gphome = os.environ['GPHOME'], mdd=os.environ['MASTER_DATA_DIRECTORY'],port = os.environ['PGPORT']): - master_pattern = "Context:\s*-1\s*Value:\s*\d+" + master_pattern = r"Context:\s*-1\s*Value:\s*\d+" command = "gpconfig -s %s" % ( "port" ) cmd = "source %s/greenplum_path.sh; export MASTER_DATA_DIRECTORY=%s; export PGPORT=%s; %s" \ @@ -56,7 +76,10 @@ def getPortMasterOnly(host = 'localhost',master_value = None, (ok,out) = run(cmd) if not ok: - raise Exception("Unable to connect to segment server %s as user %s" % (host, user)) + cmd = "python %s/bin/gpconfig -s port"%(gphome) + (ok,out) = run(cmd) + if not ok: + raise Exception("Unable to connect to segment server %s as user %s" % (host, user)) for line in out: out = line.decode().split('\n') @@ -95,111 +118,122 @@ d = mkpath('config') if not os.path.exists(d): os.mkdir(d) -def write_config_file(mode='insert', reuse_flag='',columns_flag='0',mapping='0',portNum='8081',database='reuse_gptest',host='localhost',formatOpts='text',file='data/external_file_01.txt',table='texttable',format='text',delimiter="'|'",escape='',quote='',truncate='False',log_errors=None, error_limit='0',error_table=None,externalSchema=None,staging_table=None,fast_match='false', encoding=None, preload=True, fill=False, config='config/config_file', match_columns='true', update_columns='n2'): +def write_config_file(config='config/config_file',file='data/external_file_01.txt',input_port='8081',columns=None, format='text', log_errors=None, error_limit=None, delimiter='|', + encoding=None, escape=None, null_as=None, fill_missing_fields=None, quote=None, table='texttable', mode='insert', update_columns=['n2'],update_condition=None, match_columns=['n1','s1','s2'], staging_table=None, + mapping=None, externalSchema=None, preload=True, truncate=False, reuse_tables=True, fast_match=None,sql=False, before=None, after=None + , error_table=None): + ''' - f = open(mkpath(config),'w') - f.write("VERSION: 1.0.0.1") - if database: - f.write("\nDATABASE: "+database) - f.write("\nUSER: "+os.environ.get('USER')) - f.write("\nHOST: "+hostNameAddrs) - f.write("\nPORT: "+masterPort) - f.write("\nGPLOAD:") - f.write("\n INPUT:") - f.write("\n - SOURCE:") - f.write("\n LOCAL_HOSTNAME:") - f.write("\n - "+hostNameAddrs) - if portNum: - f.write("\n PORT: "+portNum) - f.write("\n FILE:") - f.write("\n - "+mkpath(file)) - if columns_flag=='1': - f.write("\n - COLUMNS:") - f.write("\n - s_s1: text") - f.write("\n - s_s2: text") - f.write("\n - s_dt: timestamp") - f.write("\n - s_s3: text") - f.write("\n - s_n1: smallint") - f.write("\n - s_n2: integer") - f.write("\n - s_n3: bigint") - f.write("\n - s_n4: decimal") - f.write("\n - s_n5: numeric") - f.write("\n - s_n6: real") - f.write("\n - s_n7: double precision") - f.write("\n - s_n8: text") - f.write("\n - s_n9: text") - if columns_flag == '2': - f.write("\n - COLUMNS:") - f.write("\n - 'Field1': bigint") - f.write("\n - 'Field#2': text") + write gpload config file according to pramarters, please see gpload document for detail information of parameters. + + Args: + config (string): config file path + columns (list): columns in source file [{"col1":"type1"},{"col2":"type2"}...] + log_errors (bool): + error_limit (int): + update_columns (list): target column names to update ['col1','col2'...], add single quottion to columns spontaneously + match_columns (list): target column name to match when update or merge ['col1','col2'...], add single quottion to columns spontaneously + mapping (dict): target_column_name: source_column_name {"tar1":"sour1","tar2":"sou2"...}, add single quottion to columns spontaneously + sql (bool): wether to execute sql, False for default + external (bool): False for default + preload (bool): True for default + reuse_flag (bool): reuse table or not + fast_match (bool): + staging_table (sting): external_table_name + delimiter (string): add single quotation spontaneously + null_as (string): add single quotation spontaneously + + Returns: + none + ''' + S = ruamel.yaml.scalarstring.SingleQuotedScalarString # use S if you want str to be single quoted + conf = {} + conf['VERSION'] = '1.0.0.1' + conf['DATABASE'] = 'reuse_gptest' + conf['USER'] = os.environ.get('USER') #only for linux + conf['HOST'] = hostNameAddrs + conf['PORT'] = int(masterPort) + conf['GPLOAD'] = {} + + gpload_input = [] + input_source = {'LOCAL_HOSTNAME':[hostNameAddrs], + 'port':int(input_port), + 'FILE': [mkpath(file)]} + gpload_input.append({'SOURCE':input_source}) + if columns: + gpload_input.append({"COLUMNS":columns}) if format: - f.write("\n - FORMAT: "+format) + gpload_input.append({'FORMAT':format}) if log_errors: - f.write("\n - LOG_ERRORS: true") - f.write("\n - ERROR_LIMIT: " + error_limit) + gpload_input.append({'LOG_ERRORS':log_errors}) + if error_limit: + gpload_input.append({'ERROR_LIMIT':error_limit}) if error_table: - f.write("\n - ERROR_TABLE: " + error_table) - f.write("\n - ERROR_LIMIT: " + error_limit) + gpload_input.append({'ERROR_TABLE':error_table}) if delimiter: - f.write("\n - DELIMITER: "+delimiter) + gpload_input.append({'DELIMITER':S(delimiter)}) if encoding: - f.write("\n - ENCODING: "+encoding) + gpload_input.append({'ENCODING': encoding}) if escape: - f.write("\n - ESCAPE: "+escape) + gpload_input.append({'ESCAPE':escape}) + if null_as: + gpload_input.append({'NULL_AS':null_as}) + if fill_missing_fields: + gpload_input.append({'FILL_MISSING_FIELDS':fill_missing_fields}) if quote: - f.write("\n - QUOTE: "+quote) - if fill: - f.write("\n - FILL_MISSING_FIELDS: true") - f.write("\n OUTPUT:") - f.write("\n - TABLE: "+table) - if mode: - if mode == 'insert': - f.write("\n - MODE: "+'insert') - if mode == 'update': - f.write("\n - MODE: "+'update') - if mode == 'merge': - f.write("\n - MODE: "+'merge') - f.write("\n - UPDATE_COLUMNS:") - f.write("\n - "+update_columns) - if match_columns=='true': - f.write("\n - MATCH_COLUMNS:") - f.write("\n - n1") - f.write("\n - s1") - f.write("\n - s2") - if match_columns=='2': - f.write("\n - MATCH_COLUMNS:") - f.write("\n - '\"Field1\"'") - f.write("\n - '\"Field#2\"'") - if mapping=='1': - f.write("\n - MAPPING:") - f.write("\n s1: s_s1") - f.write("\n s2: s_s2") - f.write("\n dt: s_dt") - f.write("\n s3: s_s3") - f.write("\n n1: s_n1") - f.write("\n n2: s_n2") - f.write("\n n3: s_n3") - f.write("\n n4: s_n4") - f.write("\n n5: s_n5") - f.write("\n n6: s_n6") - f.write("\n n7: s_n7") - f.write("\n n8: s_n8") - f.write("\n n9: s_n9") + gpload_input.append({'QUOTE':quote}) + + conf['GPLOAD']['INPUT'] = gpload_input + if externalSchema: - f.write("\n EXTERNAL:") - f.write("\n - SCHEMA: "+externalSchema) + conf['GPLOAD']['EXTERNAL'] = [{'SCHEMA':externalSchema}] + + gpload_output = [] + gpload_output.append({'TABLE':table}) + gpload_output.append({'MODE':mode}) + if match_columns: + gpload_output.append({'MATCH_COLUMNS':match_columns}) + if update_columns: + gpload_output.append({'UPDATE_COLUMNS':update_columns}) + if update_condition: + gpload_output.append({'UPDATE_CONDITION':update_condition}) + if mapping: + mapping_quoted = {} + for key, val in mapping.items(): + mapping_quoted[S(key)]= S(val) + gpload_output.append({'MAPPING':mapping_quoted}) + conf['GPLOAD']['OUTPUT'] = gpload_output + if preload: - f.write("\n PRELOAD:") - f.write("\n - REUSE_TABLES: "+reuse_flag) - f.write("\n - FAST_MATCH: "+fast_match) + gpload_preload = [] + if truncate: + gpload_preload.append({'TRUNCATE':truncate}) + if reuse_tables: + gpload_preload.append({'REUSE_TABLES':reuse_tables}) + if fast_match: + gpload_preload.append({'FAST_MATCH':fast_match}) if staging_table: - f.write("\n - STAGING_TABLE: "+staging_table) - f.write("\n") - f.close() + gpload_preload.append({'STAGING_TABLE':staging_table}) + conf['GPLOAD']['PRELOAD'] = gpload_preload + + if sql: + gpload_sql=[] + if before: + gpload_sql.append({'BEFORE':before}) + if after: + gpload_sql.append({'AFTER':after}) + conf['GPLOAD']['SQL'] = gpload_sql + + f = open(mkpath(config),'w') + yaml = ruamel.yaml.YAML() + yaml.dump(conf, f) def runfile(ifile, flag='', dbname=None, outputPath="", outputFile="", username=None, PGOPTIONS=None, host = None, port = None): + ''' + run a file in psql + ''' if len(outputFile) == 0: (ok, out) = psql_run(ifile = ifile,ofile = outFile(ifile, outputPath),flag = flag, @@ -308,7 +342,6 @@ def gpdbAnsFile(fname): return os.path.splitext(fname)[0] + ext def isFileEqual( f1, f2, optionalFlags = "", outputPath = "", myinitfile = ""): - LMYD = os.path.abspath(os.path.dirname(__file__)) if not os.access( f1, os.R_OK ): raise Exception( 'Error: cannot find file %s' % f1 ) @@ -317,19 +350,20 @@ def isFileEqual( f1, f2, optionalFlags = "", outputPath = "", myinitfile = ""): dfile = diffFile( f1, outputPath = outputPath ) # Gets the suitePath name to add init_file suitePath = f1[0:f1.rindex( "/" )] + gphome = os.environ['GPHOME'] if os.path.exists(suitePath + "/init_file"): - (ok, out) = run('../gpdiff.pl -w ' + optionalFlags + \ - ' -I NOTICE: -I HINT: -I CONTEXT: -I GP_IGNORE: --gp_init_file=%s/global_init_file --gp_init_file=%s/init_file ' + (ok, out) = run(gphome+'/lib/postgresql/pgxs/src/test/regress/gpdiff.pl -w ' + optionalFlags + \ + ' -I NOTICE: -I HINT: -I CONTEXT: -I GP_IGNORE: -I DROP --gp_init_file=%s/global_init_file --gp_init_file=%s/init_file ' '%s %s > %s 2>&1' % (LMYD, suitePath, f1, f2, dfile)) else: if os.path.exists(myinitfile): - (ok, out) = run('../gpdiff.pl -w ' + optionalFlags + \ - ' -I NOTICE: -I HINT: -I CONTEXT: -I GP_IGNORE: --gp_init_file=%s/global_init_file --gp_init_file=%s ' + (ok, out) = run(gphome+'/lib/postgresql/pgxs/src/test/regress/gpdiff.pl -w ' + optionalFlags + \ + ' -I NOTICE: -I HINT: -I CONTEXT: -I GP_IGNORE: -I DROP --gp_init_file=%s/global_init_file --gp_init_file=%s ' '%s %s > %s 2>&1' % (LMYD, myinitfile, f1, f2, dfile)) else: - (ok, out) = run( '../gpdiff.pl -w ' + optionalFlags + \ - ' -I NOTICE: -I HINT: -I CONTEXT: -I GP_IGNORE: --gp_init_file=%s/global_init_file ' + (ok, out) = run( gphome+'/lib/postgresql/pgxs/src/test/regress/gpdiff.pl -w ' + optionalFlags + \ + ' -I NOTICE: -I HINT: -I CONTEXT: -I GP_IGNORE: -I DROP --gp_init_file=%s/global_init_file ' '%s %s > %s 2>&1' % ( LMYD, f1, f2, dfile ) ) @@ -354,7 +388,7 @@ def modify_sql_file(num): if os.path.isfile(file): for line in fileinput.FileInput(file,inplace=1): line = line.replace("gpload.py ","gpload ") - print((str(re.sub('\n','',line)))) + print (str(re.sub('\n','',line))) def copy_data(source='',target=''): cmd = 'cp '+ mkpath('data/' + source) + ' ' + mkpath(target) @@ -372,7 +406,7 @@ def get_table_name(): ) except Exception as e: errorMessage = str(e) - print(('could not connect to database: ' + errorMessage)) + print ('could not connect to database: ' + errorMessage) queryString = """SELECT relname from pg_class WHERE relname @@ -383,6 +417,7 @@ def get_table_name(): return resultList def drop_tables(): + '''drop external and staging tables''' try: db = pg.DB(dbname='reuse_gptest' ,host='localhost' @@ -390,10 +425,10 @@ def drop_tables(): ) except Exception as e: errorMessage = str(e) - print(('could not connect to database: ' + errorMessage)) + print ('could not connect to database: ' + errorMessage) - list = get_table_name() - for i in list: + tableList = get_table_name() + for i in tableList: name = i[0] match = re.search('ext_gpload',name) if match: @@ -401,7 +436,7 @@ def drop_tables(): db.query(queryString.encode('utf-8')) else: - queryString = "DROP TABLE %s" % name + queryString = "DROP TABLE %s;" % name db.query(queryString.encode('utf-8')) class PSQLError(Exception): @@ -415,401 +450,462 @@ class PSQLError(Exception): ''' pass -class GPLoad_FormatOpts_TestCase(unittest.TestCase): - - def check_result(self,ifile, optionalFlags = "-U3", outputPath = ""): - """ - PURPOSE: compare the actual and expected output files and report an - error if they don't match. - PARAMETERS: - ifile: the name of the .sql file whose actual and expected outputs - we want to compare. You may include the path as well as the - filename. This function will process this file name to - figure out the proper names of the .out and .ans files. - optionalFlags: command-line options (if any) for diff. - For example, pass " -B " (with the blank spaces) to ignore - blank lines. By default, diffs are unified with 3 lines of - context (i.e. optionalFlags is "-U3"). - """ - f1 = gpdbAnsFile(ifile) - f2 = outFile(ifile, outputPath=outputPath) - - result = isFileEqual(f1, f2, optionalFlags, outputPath=outputPath) - diff = None if result else read_diff(ifile, outputPath) - self.assertTrue(result, "query resulted in diff:\n{}".format(diff)) - - return True - - def doTest(self, num): - file = mkpath('query%d.diff' % num) - if os.path.isfile(file): - run("rm -f" + " " + file) - modify_sql_file(num) - file = mkpath('query%d.sql' % num) - runfile(file) - self.check_result(file) - - def test_00_gpload_formatOpts_setup(self): - "0 gpload setup" - for num in range(1,42): - f = open(mkpath('query%d.sql' % num),'w') - f.write("\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n"+"\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n") - f.close() - file = mkpath('setup.sql') - runfile(file) - self.check_result(file) - - def test_01_gpload_formatOpts_delimiter(self): - "1 gpload formatOpts delimiter '|' with reuse " - copy_data('external_file_01.txt','data_file.txt') - write_config_file(reuse_flag='true',formatOpts='text',file='data_file.txt',table='texttable',delimiter="'|'") - self.doTest(1) - - def test_02_gpload_formatOpts_delimiter(self): - "2 gpload formatOpts delimiter '\t' with reuse" - copy_data('external_file_02.txt','data_file.txt') - write_config_file(reuse_flag='true',formatOpts='text',file='data_file.txt',table='texttable',delimiter="'\t'") - self.doTest(2) - - def test_03_gpload_formatOpts_delimiter(self): - "3 gpload formatOpts delimiter E'\t' with reuse" - copy_data('external_file_02.txt','data_file.txt') - write_config_file(reuse_flag='true',formatOpts='text',file='data_file.txt',table='texttable',delimiter="E'\\t'") - self.doTest(3) - - def test_04_gpload_formatOpts_delimiter(self): - "4 gpload formatOpts delimiter E'\\u0009' with reuse" - copy_data('external_file_02.txt','data_file.txt') - write_config_file(reuse_flag='true',formatOpts='text',file='data_file.txt',table='texttable',delimiter="E'\\u0009'") - self.doTest(4) - - def test_05_gpload_formatOpts_delimiter(self): - "5 gpload formatOpts delimiter E'\\'' with reuse" - copy_data('external_file_03.txt','data_file.txt') - write_config_file(reuse_flag='true',formatOpts='text',file='data_file.txt',table='texttable',delimiter="E'\''") - self.doTest(5) - - def test_06_gpload_formatOpts_delimiter(self): - "6 gpload formatOpts delimiter \"'\" with reuse" - copy_data('external_file_03.txt','data_file.txt') - write_config_file(reuse_flag='true',formatOpts='text',file='data_file.txt',table='texttable',delimiter="\"'\"") - self.doTest(6) - - def test_07_gpload_reuse_table_insert_mode_without_reuse(self): - "7 gpload insert mode without reuse" - runfile(mkpath('setup.sql')) - f = open(mkpath('query7.sql'),'a') - f.write("\! psql -d reuse_gptest -c 'select count(*) from texttable;'") - f.close() - write_config_file(mode='insert',reuse_flag='false') - self.doTest(7) - - def test_08_gpload_reuse_table_update_mode_with_reuse(self): - "8 gpload update mode with reuse" - drop_tables() - copy_data('external_file_04.txt','data_file.txt') - write_config_file(mode='update',reuse_flag='true',file='data_file.txt') - self.doTest(8) - - def test_09_gpload_reuse_table_update_mode_without_reuse(self): - "9 gpload update mode without reuse" - f = open(mkpath('query9.sql'),'a') - f.write("\! psql -d reuse_gptest -c 'select count(*) from texttable;'\n"+"\! psql -d reuse_gptest -c 'select * from texttable where n2=222;'") - f.close() - copy_data('external_file_05.txt','data_file.txt') - write_config_file(mode='update',reuse_flag='false',file='data_file.txt') - self.doTest(9) - - def test_10_gpload_reuse_table_merge_mode_with_reuse(self): - "10 gpload merge mode with reuse " - drop_tables() - copy_data('external_file_06.txt','data_file.txt') - write_config_file('merge','true',file='data_file.txt') - self.doTest(10) - - def test_11_gpload_reuse_table_merge_mode_without_reuse(self): - "11 gpload merge mode without reuse " - copy_data('external_file_07.txt','data_file.txt') - write_config_file('merge','false',file='data_file.txt') - self.doTest(11) - - def test_12_gpload_reuse_table_merge_mode_with_different_columns_number_in_file(self): - "12 gpload merge mode with reuse (RERUN with different columns number in file) " - psql_run(cmd="ALTER TABLE texttable ADD column n8 text",dbname='reuse_gptest') - copy_data('external_file_08.txt','data_file.txt') - write_config_file('merge','true',file='data_file.txt') - self.doTest(12) - - def test_13_gpload_reuse_table_merge_mode_with_different_columns_number_in_DB(self): - "13 gpload merge mode with reuse (RERUN with different columns number in DB table) " - preTest = mkpath('pre_test_13.sql') - psql_run(preTest, dbname='reuse_gptest') - copy_data('external_file_09.txt','data_file.txt') - write_config_file('merge','true',file='data_file.txt') - self.doTest(13) - - def test_14_gpload_reuse_table_update_mode_with_reuse_RERUN(self): - "14 gpload update mode with reuse (RERUN) " - write_config_file('update','true',file='data_file.txt') - self.doTest(14) - - def test_15_gpload_reuse_table_merge_mode_with_different_columns_order(self): - "15 gpload merge mode with different columns' order " - copy_data('external_file_10.txt','data/data_file.tbl') - write_config_file('merge','true',file='data/data_file.tbl',columns_flag='1',mapping='1') - self.doTest(15) - - def test_16_gpload_formatOpts_quote(self): - "16 gpload formatOpts quote unspecified in CSV with reuse " - copy_data('external_file_11.csv','data_file.csv') - write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='csvtable',format='csv',delimiter="','") - self.doTest(16) - - def test_17_gpload_formatOpts_quote(self): - "17 gpload formatOpts quote '\\x26'(&) with reuse" - copy_data('external_file_12.csv','data_file.csv') - write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='csvtable',format='csv',delimiter="','",quote="'\x26'") - self.doTest(17) - - def test_18_gpload_formatOpts_quote(self): - "18 gpload formatOpts quote E'\\x26'(&) with reuse" - copy_data('external_file_12.csv','data_file.csv') - write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='csvtable',format='csv',delimiter="','",quote="E'\x26'") - self.doTest(18) - - def test_19_gpload_formatOpts_escape(self): - "19 gpload formatOpts escape '\\' with reuse" - copy_data('external_file_01.txt','data_file.txt') - file = mkpath('setup.sql') - runfile(file) - write_config_file(reuse_flag='true',formatOpts='text',file='data_file.txt',table='texttable',escape='\\') - self.doTest(19) - - def test_20_gpload_formatOpts_escape(self): - "20 gpload formatOpts escape '\\' with reuse" - copy_data('external_file_01.txt','data_file.txt') - write_config_file(reuse_flag='true',formatOpts='text',file='data_file.txt',table='texttable',escape= '\x5C') - self.doTest(20) - - def test_21_gpload_formatOpts_escape(self): - "21 gpload formatOpts escape E'\\\\' with reuse" - copy_data('external_file_01.txt','data_file.txt') - write_config_file(reuse_flag='true',formatOpts='text',file='data_file.txt',table='texttable',escape="E'\\\\'") - self.doTest(21) - # case 22 is flaky on concourse. It may report: Fatal Python error: GC object already tracked during testing. - # This is seldom issue. we can't reproduce it locally, so we disable it, in order to not blocking others - #def test_22_gpload_error_count(self): - # "22 gpload error count" - # f = open(mkpath('query22.sql'),'a') - # f.write("\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") - # f.close() - # f = open(mkpath('data/large_file.csv'),'w') - # for i in range(0, 10000): - # if i % 2 == 0: - # f.write('1997,Ford,E350,"ac, abs, moon",3000.00,a\n') - # else: - # f.write('1997,Ford,E350,"ac, abs, moon",3000.00\n') - # f.close() - # copy_data('large_file.csv','data_file.csv') - # write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='csvtable',format='csv',delimiter="','",log_errors=True,error_limit='90000000') - # self.doTest(22) - def test_23_gpload_error_count(self): - "23 gpload error_table" - file = mkpath('setup.sql') - runfile(file) - f = open(mkpath('query23.sql'),'a') - f.write("\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") - f.close() - f = open(mkpath('data/large_file.csv'),'w') - for i in range(0, 10000): - if i % 2 == 0: - f.write('1997,Ford,E350,"ac, abs, moon",3000.00,a\n') - else: - f.write('1997,Ford,E350,"ac, abs, moon",3000.00\n') - f.close() - copy_data('large_file.csv','data_file.csv') - write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='csvtable',format='csv',delimiter="','",error_table="err_table",error_limit='90000000') - self.doTest(23) - def test_24_gpload_error_count(self): - "24 gpload error count with ext schema" - file = mkpath('setup.sql') - runfile(file) - f = open(mkpath('query24.sql'),'a') - f.write("\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") - f.close() - f = open(mkpath('data/large_file.csv'),'w') - for i in range(0, 10000): - if i % 2 == 0: - f.write('1997,Ford,E350,"ac, abs, moon",3000.00,a\n') - else: - f.write('1997,Ford,E350,"ac, abs, moon",3000.00\n') - f.close() - copy_data('large_file.csv','data_file.csv') - write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='csvtable',format='csv',delimiter="','",log_errors=True,error_limit='90000000',externalSchema='test') - self.doTest(24) - def test_25_gpload_ext_staging_table(self): - "25 gpload reuse ext_staging_table if it is configured" - file = mkpath('setup.sql') - runfile(file) - f = open(mkpath('query25.sql'),'a') - f.write("\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") - f.close() - copy_data('external_file_13.csv','data_file.csv') - write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='csvtable',format='csv',delimiter="','",log_errors=True,error_limit='10',staging_table='staging_table') - self.doTest(25) - def test_26_gpload_ext_staging_table_with_externalschema(self): - "26 gpload reuse ext_staging_table if it is configured with externalschema" - file = mkpath('setup.sql') - runfile(file) - f = open(mkpath('query26.sql'),'a') - f.write("\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") - f.close() - copy_data('external_file_13.csv','data_file.csv') - write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='csvtable',format='csv',delimiter="','",log_errors=True,error_limit='10',staging_table='staging_table',externalSchema='test') - self.doTest(26) - def test_27_gpload_ext_staging_table_with_externalschema(self): - "27 gpload reuse ext_staging_table if it is configured with externalschema" - file = mkpath('setup.sql') - runfile(file) - f = open(mkpath('query27.sql'),'a') - f.write("\! psql -d reuse_gptest -c 'select count(*) from test.csvtable;'") - f.close() - copy_data('external_file_13.csv','data_file.csv') - write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='test.csvtable',format='csv',delimiter="','",log_errors=True,error_limit='10',staging_table='staging_table',externalSchema="'%'") - self.doTest(27) - def test_28_gpload_ext_staging_table_with_dot(self): - "28 gpload reuse ext_staging_table if it is configured with dot" - file = mkpath('setup.sql') - runfile(file) - f = open(mkpath('query28.sql'),'a') - f.write("\! psql -d reuse_gptest -c 'select count(*) from test.csvtable;'") - f.close() - copy_data('external_file_13.csv','data_file.csv') - write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='test.csvtable',format='csv',delimiter="','",log_errors=True,error_limit='10',staging_table='t.staging_table') - self.doTest(28) - def test_29_gpload_reuse_table_insert_mode_with_reuse_and_null(self): - "29 gpload insert mode with reuse and null" - runfile(mkpath('setup.sql')) - f = open(mkpath('query29.sql'),'a') - f.write("\! psql -d reuse_gptest -c 'select count(*) from texttable where n2 is null;'") - f.close() - copy_data('external_file_14.txt','data_file.txt') - write_config_file(mode='insert',reuse_flag='true',file='data_file.txt',log_errors=True, error_limit='100') - self.doTest(29) - - def test_30_gpload_reuse_table_update_mode_with_fast_match(self): - "30 gpload update mode with fast match" - drop_tables() - copy_data('external_file_04.txt','data_file.txt') - write_config_file(mode='update',reuse_flag='true',fast_match='true',file='data_file.txt') - self.doTest(30) - - def test_31_gpload_reuse_table_update_mode_with_fast_match_and_different_columns_number(self): - "31 gpload update mode with fast match and differenct columns number) " - psql_run(cmd="ALTER TABLE texttable ADD column n8 text",dbname='reuse_gptest') - copy_data('external_file_08.txt','data_file.txt') - write_config_file(mode='update',reuse_flag='true',fast_match='true',file='data_file.txt') - self.doTest(31) - - def test_32_gpload_update_mode_without_reuse_table_with_fast_match(self): - "32 gpload update mode when reuse table is false and fast match is true" - drop_tables() - copy_data('external_file_08.txt','data_file.txt') - write_config_file(mode='update',reuse_flag='false',fast_match='true',file='data_file.txt') - self.doTest(32) - def test_33_gpload_reuse_table_merge_mode_with_fast_match_and_external_schema(self): - "33 gpload update mode with fast match and external schema" - file = mkpath('setup.sql') - runfile(file) - copy_data('external_file_04.txt','data_file.txt') - write_config_file(mode='merge',reuse_flag='true',fast_match='true',file='data_file.txt',externalSchema='test') - self.doTest(33) - def test_34_gpload_reuse_table_merge_mode_with_fast_match_and_encoding(self): - "34 gpload merge mode with fast match and encoding GBK" - file = mkpath('setup.sql') - runfile(file) - copy_data('external_file_04.txt','data_file.txt') - write_config_file(mode='merge',reuse_flag='true',fast_match='true',file='data_file.txt',encoding='GBK') - self.doTest(34) - - def test_35_gpload_reuse_table_merge_mode_with_fast_match_default_encoding(self): - "35 gpload does not reuse table when encoding is setted from GBK to empty" - write_config_file(mode='merge',reuse_flag='true',fast_match='true',file='data_file.txt') - self.doTest(35) - - def test_36_gpload_reuse_table_merge_mode_default_encoding(self): - "36 gpload merge mode with encoding GBK" - file = mkpath('setup.sql') - runfile(file) - copy_data('external_file_04.txt','data_file.txt') - write_config_file(mode='merge',reuse_flag='true',fast_match='false',file='data_file.txt',encoding='GBK') - self.doTest(36) - - def test_37_gpload_reuse_table_merge_mode_invalid_encoding(self): - "37 gpload merge mode with invalid encoding" - file = mkpath('setup.sql') - runfile(file) - copy_data('external_file_04.txt','data_file.txt') - write_config_file(mode='merge',reuse_flag='true',fast_match='false',file='data_file.txt',encoding='xxxx') - self.doTest(37) - - def test_38_gpload_without_preload(self): - "38 gpload insert mode without preload" - file = mkpath('setup.sql') - runfile(file) - copy_data('external_file_04.txt','data_file.txt') - write_config_file(mode='insert',reuse_flag='true',fast_match='false',file='data_file.txt',error_table="err_table",error_limit='1000',preload=False) - self.doTest(38) - - def test_39_gpload_fill_missing_fields(self): - "39 gpload fill missing fields" - file = mkpath('setup.sql') - runfile(file) - copy_data('external_file_04.txt','data_file.txt') - write_config_file(mode='insert',reuse_flag='false',fast_match='false',file='data_file.txt',table='texttable1', error_limit='1000', fill=True) - self.doTest(39) - - def test_40_gpload_merge_mode_with_multi_pk(self): - "40 gpload merge mode with multiple pk" - file = mkpath('setup.sql') - runfile(file) - copy_data('external_file_pk.txt','data_file.txt') - write_config_file(mode='merge',reuse_flag='true',fast_match='false',file='data_file.txt',table='testpk') - copy_data('external_file_pk2.txt','data_file2.txt') - write_config_file(mode='merge',reuse_flag='true',fast_match='false',file='data_file2.txt',table='testpk',config='config/config_file2') - f = open(mkpath('query40.sql'),'w') - f.write("""\! psql -d reuse_gptest -c "create table testpk (n1 integer, s1 integer, s2 varchar(128), n2 integer, primary key(n1,s1,s2))\ - partition by range (s1)\ - subpartition by list(s2)\ - SUBPARTITION TEMPLATE\ - ( SUBPARTITION usa VALUES ('usa'),\ - SUBPARTITION asia VALUES ('asia'),\ - SUBPARTITION europe VALUES ('europe'),\ - DEFAULT SUBPARTITION other_regions)\ - (start (1) end (13) every (1),\ - default partition others)\ - ;"\n""") - f.write("\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n") - f.write("\! gpload -f "+mkpath('config/config_file2')+ " -d reuse_gptest\n") - f.write("\! psql -d reuse_gptest -c 'drop table testpk;'\n") - f.close() - self.doTest(40) - - def test_41_gpload_special_char(self): - "41 gpload special char" - file = mkpath('setup.sql') - runfile(file) - copy_data('external_file_15.txt','data_file.txt') - write_config_file(mode='insert',reuse_flag='true',fast_match='false', file='data_file.txt',table='testSpecialChar',columns_flag='2', delimiter=";") - copy_data('external_file_16.txt','data_file2.txt') - write_config_file(update_columns='\'"Field#2"\'',config='config/config_file2', mode='merge',reuse_flag='true',fast_match='false', file='data_file2.txt',table='testSpecialChar',columns_flag='2', delimiter=";",match_columns='2') - f = open(mkpath('query41.sql'),'a') - f.write("\! gpload -f "+mkpath('config/config_file2')+ " -d reuse_gptest\n") - f.close() - self.doTest(41) +class AnsFile(): + def __init__(self, path): + self.path = path + def __eq__(self, other): + return isFileEqual(self.path, other.path, '-U3', outputPath="") -if __name__ == '__main__': - suite = unittest.TestLoader().loadTestsFromTestCase(GPLoad_FormatOpts_TestCase) - runner = unittest.TextTestRunner(verbosity=2) - ret = not runner.run(suite).wasSuccessful() - sys.exit(ret) +def check_result(ifile, optionalFlags = "-U3", outputPath = ""): + """ + PURPOSE: compare the actual and expected output files and report an + error if they don't match. + PARAMETERS: + ifile: the name of the .sql file whose actual and expected outputs + we want to compare. You may include the path as well as the + filename. This function will process this file name to + figure out the proper names of the .out and .ans files. + optionalFlags: command-line options (if any) for diff. + For example, pass " -B " (with the blank spaces) to ignore + blank lines. By default, diffs are unified with 3 lines of + context (i.e. optionalFlags is "-U3"). + """ + f1 = gpdbAnsFile(ifile) + f1 = AnsFile(f1) + f2 = outFile(ifile, outputPath=outputPath) + f2 = AnsFile(f2) + assert f1 == f2 #, read_diff(ifile, "") + return True + +def doTest(num): + file = mkpath('query%d.diff' % num) + if os.path.isfile(file): + run("rm -f" + " " + file) + modify_sql_file(num) + file = mkpath('query%d.sql' % num) + runfile(file) + check_result(file) + +def prepare_test_file(num): + """ + initialize specific query#.sql for test case num + """ + f = open(mkpath('query%d.sql' % num),'w') + f.write("\\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n"+"\\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n") + f.close() + +def test_00_gpload_formatOpts_setup(): + "0 gpload setup" + """setup query.sql for all cases""" + for num in range(1,44): + f = open(mkpath('query%d.sql' % num),'w') + f.write("\\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n"+"\\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n") + f.close() + file = mkpath('setup.sql') + runfile(file) + check_result(file) + +def test_01_gpload_formatOpts_delimiter(): + "1 gpload formatOpts delimiter '|' with reuse " + copy_data('external_file_01.txt','data_file.txt') + write_config_file(reuse_tables=True,format='text',file='data_file.txt',table='texttable',delimiter='|') + doTest(1) + +def test_02_gpload_formatOpts_delimiter(): + "2 gpload formatOpts delimiter '\t' with reuse" + copy_data('external_file_02.txt','data_file.txt') + write_config_file(reuse_tables=True, format='text', file='data_file.txt',delimiter='\t') + doTest(2) + +def test_03_gpload_formatOpts_delimiter(): + "3 gpload formatOpts delimiter E'\t' with reuse" + copy_data('external_file_02.txt','data_file.txt') + write_config_file(reuse_tables=True,format='text',file='data_file.txt',table='texttable',delimiter="E'\\t'") + doTest(3) + +def test_04_gpload_formatOpts_delimiter(): + "4 gpload formatOpts delimiter E'\u0009' with reuse" + copy_data('external_file_02.txt','data_file.txt') + write_config_file(reuse_tables=True,format='text',file='data_file.txt',table='texttable',delimiter="E'\u0009'") + doTest(4) + +def test_05_gpload_formatOpts_delimiter(): + "5 gpload formatOpts delimiter E'\\'' with reuse" + copy_data('external_file_03.txt','data_file.txt') + write_config_file(reuse_tables=True,format='text',file='data_file.txt',table='texttable',delimiter="E'\''") + doTest(5) + +def test_06_gpload_formatOpts_delimiter(): + "6 gpload formatOpts delimiter \"'\" with reuse" + copy_data('external_file_03.txt','data_file.txt') + write_config_file(reuse_tables=True,format='text',file='data_file.txt',table='texttable',delimiter="'") + doTest(6) + +def test_07_gpload_reuse_table_insert_mode_without_reuse(): + "7 gpload insert mode without reuse" + runfile(mkpath('setup.sql')) + f = open(mkpath('query7.sql'),'a') + f.write("\\! psql -d reuse_gptest -c 'select count(*) from texttable;'") + f.close() + write_config_file(mode='insert',reuse_tables=False) + doTest(7) + +def test_08_gpload_reuse_table_update_mode_with_reuse(): + "8 gpload update mode with reuse" + drop_tables() + copy_data('external_file_04.txt','data_file.txt') + write_config_file(mode='update',reuse_tables=True,file='data_file.txt') + doTest(8) + +def test_09_gpload_reuse_table_update_mode_without_reuse(): + "9 gpload update mode without reuse" + f = open(mkpath('query9.sql'),'a') + f.write("\\! psql -d reuse_gptest -c 'select count(*) from texttable;'\n"+"\\! psql -d reuse_gptest -c 'select * from texttable where n2=222;'") + f.close() + copy_data('external_file_05.txt','data_file.txt') + write_config_file(mode='update',reuse_tables=False,file='data_file.txt') + doTest(9) + +def test_10_gpload_reuse_table_merge_mode_with_reuse(): + "10 gpload merge mode with reuse " + drop_tables() + copy_data('external_file_06.txt','data_file.txt') + write_config_file(mode='merge', reuse_tables=True, file='data_file.txt') + doTest(10) + +def test_11_gpload_reuse_table_merge_mode_without_reuse(): + "11 gpload merge mode without reuse " + copy_data('external_file_07.txt','data_file.txt') + write_config_file(mode='merge', reuse_tables=False, file='data_file.txt') + doTest(11) + +def test_12_gpload_reuse_table_merge_mode_with_different_columns_number_in_file(): + "12 gpload merge mode with reuse (RERUN with different columns number in file) " + psql_run(cmd="ALTER TABLE texttable ADD column n8 text",dbname='reuse_gptest') + copy_data('external_file_08.txt','data_file.txt') + write_config_file(mode='merge', reuse_tables=True, file='data_file.txt') + doTest(12) + +def test_13_gpload_reuse_table_merge_mode_with_different_columns_number_in_DB(): + "13 gpload merge mode with reuse (RERUN with different columns number in DB table) " + preTest = mkpath('pre_test_13.sql') + psql_run(preTest, dbname='reuse_gptest') + copy_data('external_file_09.txt','data_file.txt') + write_config_file(mode='merge', reuse_tables=True, file='data_file.txt') + doTest(13) + +def test_14_gpload_reuse_table_update_mode_with_reuse_RERUN(): + "14 gpload update mode with reuse (RERUN) " + write_config_file(mode='update', reuse_tables=True, file='data_file.txt') + doTest(14) + +def test_15_gpload_reuse_table_merge_mode_with_different_columns_order(): + "15 gpload merge mode with different columns' order " + copy_data('external_file_10.txt','data/data_file.tbl') + input_columns = [{'s_s1':'text'}, + {'s_s2':'text'}, + {'s_dt':'timestamp'}, + {'s_s3':'text'}, + {'s_n1':'smallint'}, + {'s_n2':'integer'}, + {'s_n3':'bigint'}, + {'s_n4':'decimal'}, + {'s_n5':'numeric'}, + {'s_n6':'real'}, + {'s_n7':'double precision'}, + {'s_n8':'text'}, + {'s_n9':'text'}] + output_mapping = {"s1": "s_s1", "s2": "s_s2", "dt": "s_dt", "s3": "s_s3", "n1": "s_n1", + "n2": "s_n2", "n3": "s_n3", "n4": "s_n4", "n5": "s_n5", "n6": "s_n6", "n7": "s_n7", "n8": "s_n8", "n9": "s_n9"} + write_config_file(mode='merge', reuse_tables=True, file='data/data_file.tbl',columns=input_columns,mapping=output_mapping) + doTest(15) + +def test_16_gpload_formatOpts_quote(): + "16 gpload formatOpts quote unspecified in CSV with reuse " + copy_data('external_file_11.csv','data_file.csv') + write_config_file(reuse_tables=True, format='csv', file='data_file.csv', table='csvtable',delimiter=',') + doTest(16) + +def test_17_gpload_formatOpts_quote(): + "17 gpload formatOpts quote '\\x26'(&) with reuse" + copy_data('external_file_12.csv','data_file.csv') + write_config_file(reuse_tables=True, format='csv', file='data_file.csv', table='csvtable', delimiter=',', quote='\x26') + doTest(17) + +def test_18_gpload_formatOpts_quote(): + "18 gpload formatOpts quote E'\\x26'(&) with reuse" + copy_data('external_file_12.csv','data_file.csv') + write_config_file(reuse_tables=True, format='csv', file='data_file.csv', table='csvtable', delimiter=',', quote="E'\x26'") + doTest(18) + +def test_19_gpload_formatOpts_escape(): + "19 gpload formatOpts escape '\\' with reuse" + copy_data('external_file_01.txt','data_file.txt') + file = mkpath('setup.sql') + runfile(file) + write_config_file(reuse_tables=True, format='text', file='data_file.txt',table='texttable',escape='\\') + doTest(19) + +def test_20_gpload_formatOpts_escape(): + "20 gpload formatOpts escape '\\' with reuse" + copy_data('external_file_01.txt','data_file.txt') + write_config_file(reuse_tables=True, format='text', file='data_file.txt',table='texttable',escape='\x5C') + doTest(20) + +def test_21_gpload_formatOpts_escape(): + "21 gpload formatOpts escape E'\\\\' with reuse" + copy_data('external_file_01.txt','data_file.txt') + write_config_file(reuse_tables=True, format='text', file='data_file.txt',table='texttable',escape="E'\\\\'") + doTest(21) + +# case 22 is flaky on concourse. It may report: Fatal Python error: GC object already tracked during testing. +# This is seldom issue. we can't reproduce it locally, so we disable it, in order to not blocking others +#def test_22_gpload_error_count(self): +# "22 gpload error count" +# f = open(mkpath('query22.sql'),'a') +# f.write("\\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") +# f.close() +# f = open(mkpath('data/large_file.csv'),'w') +# for i in range(0, 10000): +# if i % 2 == 0: +# f.write('1997,Ford,E350,"ac, abs, moon",3000.00,a\n') +# else: +# f.write('1997,Ford,E350,"ac, abs, moon",3000.00\n') +# f.close() +# copy_data('large_file.csv','data_file.csv') +# write_config_file(reuse_flag='true',formatOpts='csv',file='data_file.csv',table='csvtable',format='csv',delimiter="','",log_errors=True,error_limit='90000000') +# self.doTest(22) + +def test_23_gpload_error_count(): + "23 gpload error_table" + file = mkpath('setup.sql') + runfile(file) + f = open(mkpath('query23.sql'),'a') + f.write("\\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") + f.close() + f = open(mkpath('data/large_file.csv'),'w') + for i in range(0, 10000): + if i % 2 == 0: + f.write('1997,Ford,E350,"ac, abs, moon",3000.00,a\n') + else: + f.write('1997,Ford,E350,"ac, abs, moon",3000.00\n') + f.close() + copy_data('large_file.csv','data_file.csv') + write_config_file(reuse_tables=True, format='csv', file='data_file.csv',table='csvtable', delimiter=',', error_table="err_table", error_limit=90000000) + doTest(23) + +def test_24_gpload_error_count(): + "24 gpload error count with ext schema" + file = mkpath('setup.sql') + runfile(file) + f = open(mkpath('query24.sql'),'a') + f.write("\\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") + f.close() + f = open(mkpath('data/large_file.csv'),'w') + for i in range(0, 10000): + if i % 2 == 0: + f.write('1997,Ford,E350,"ac, abs, moon",3000.00,a\n') + else: + f.write('1997,Ford,E350,"ac, abs, moon",3000.00\n') + f.close() + copy_data('large_file.csv','data_file.csv') + write_config_file(reuse_tables=True, format='csv', file='data_file.csv',table='csvtable', delimiter=',', log_errors=True, error_limit=90000000, externalSchema='test') + doTest(24) + +def test_25_gpload_ext_staging_table(): + "25 gpload reuse ext_staging_table if it is configured" + file = mkpath('setup.sql') + runfile(file) + f = open(mkpath('query25.sql'),'a') + f.write("\\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") + f.close() + copy_data('external_file_13.csv','data_file.csv') + write_config_file(reuse_tables=True, format='csv', file='data_file.csv', table='csvtable', delimiter=',', log_errors=True,error_limit=10,staging_table='staging_table') + doTest(25) + +def test_26_gpload_ext_staging_table_with_externalschema(): + "26 gpload reuse ext_staging_table if it is configured with externalschema" + file = mkpath('setup.sql') + runfile(file) + f = open(mkpath('query26.sql'),'a') + f.write("\\! psql -d reuse_gptest -c 'select count(*) from csvtable;'") + f.close() + copy_data('external_file_13.csv','data_file.csv') + write_config_file(reuse_tables=True, format='csv', file='data_file.csv', table='csvtable', delimiter=',',log_errors=True,error_limit=10, staging_table='staging_table',externalSchema='test') + doTest(26) + +def test_27_gpload_ext_staging_table_with_externalschema(): + "27 gpload reuse ext_staging_table if it is configured with externalschema" + file = mkpath('setup.sql') + runfile(file) + f = open(mkpath('query27.sql'),'a') + f.write("\\! psql -d reuse_gptest -c 'select count(*) from test.csvtable;'") + f.close() + copy_data('external_file_13.csv','data_file.csv') + write_config_file(reuse_tables=True, format='csv', file='data_file.csv',table='test.csvtable',delimiter=',',log_errors=True,error_limit=10,staging_table='staging_table',externalSchema='%') + doTest(27) + +def test_28_gpload_ext_staging_table_with_dot(): + "28 gpload reuse ext_staging_table if it is configured with dot" + file = mkpath('setup.sql') + runfile(file) + f = open(mkpath('query28.sql'),'a') + f.write("\\! psql -d reuse_gptest -c 'select count(*) from test.csvtable;'") + f.close() + copy_data('external_file_13.csv','data_file.csv') + write_config_file(reuse_tables=True, file='data_file.csv',table='test.csvtable',format='csv',delimiter=',',log_errors=True,error_limit=10,staging_table='t.staging_table') + doTest(28) + +def test_29_gpload_reuse_table_insert_mode_with_reuse_and_null(): + "29 gpload insert mode with reuse and null" + runfile(mkpath('setup.sql')) + f = open(mkpath('query29.sql'),'a') + f.write("\\! psql -d reuse_gptest -c 'select count(*) from texttable where n2 is null;'") + f.close() + copy_data('external_file_14.txt','data_file.txt') + write_config_file(mode='insert', reuse_tables=True, file='data_file.txt',log_errors=True, error_limit=100) + doTest(29) + +def test_30_gpload_reuse_table_update_mode_with_fast_match(): + "30 gpload update mode with fast match" + drop_tables() + copy_data('external_file_04.txt','data_file.txt') + write_config_file(mode='update',reuse_tables=True,fast_match=True,file='data_file.txt') + doTest(30) + +def test_31_gpload_reuse_table_update_mode_with_fast_match_and_different_columns_number(): + "31 gpload update mode with fast match and differenct columns number) " + psql_run(cmd="ALTER TABLE texttable ADD column n8 text",dbname='reuse_gptest') + copy_data('external_file_08.txt','data_file.txt') + write_config_file(mode='update',reuse_tables=True,fast_match=True,file='data_file.txt') + doTest(31) + +def test_32_gpload_update_mode_without_reuse_table_with_fast_match(): + "32 gpload update mode when reuse table is false and fast match is true" + drop_tables() + copy_data('external_file_08.txt','data_file.txt') + write_config_file(mode='update',reuse_tables=False,fast_match=True,file='data_file.txt') + doTest(32) + +def test_33_gpload_reuse_table_merge_mode_with_fast_match_and_external_schema(): + "33 gpload update mode with fast match and external schema" + file = mkpath('setup.sql') + runfile(file) + copy_data('external_file_04.txt','data_file.txt') + write_config_file(mode='merge',reuse_tables=True,fast_match=True,file='data_file.txt',externalSchema='test') + doTest(33) + +def test_34_gpload_reuse_table_merge_mode_with_fast_match_and_encoding(): + "34 gpload merge mode with fast match and encoding GBK" + file = mkpath('setup.sql') + runfile(file) + copy_data('external_file_04.txt','data_file.txt') + write_config_file(mode='merge',reuse_tables=True,fast_match=True,file='data_file.txt',encoding='GBK') + doTest(34) + +def test_35_gpload_reuse_table_merge_mode_with_fast_match_default_encoding(): + "35 gpload does not reuse table when encoding is setted from GBK to empty" + write_config_file(mode='merge',reuse_tables=True,fast_match=True,file='data_file.txt') + doTest(35) + +def test_36_gpload_reuse_table_merge_mode_default_encoding(): + "36 gpload merge mode with encoding GBK" + file = mkpath('setup.sql') + runfile(file) + copy_data('external_file_04.txt','data_file.txt') + write_config_file(mode='merge',reuse_tables=True,fast_match=False,file='data_file.txt',encoding='GBK') + doTest(36) + +def test_37_gpload_reuse_table_merge_mode_invalid_encoding(): + "37 gpload merge mode with invalid encoding" + file = mkpath('setup.sql') + runfile(file) + copy_data('external_file_04.txt','data_file.txt') + write_config_file(mode='merge',reuse_tables=True,fast_match=False,file='data_file.txt',encoding='xxxx') + doTest(37) + +def test_38_gpload_without_preload(): + "38 gpload insert mode without preload" + file = mkpath('setup.sql') + runfile(file) + copy_data('external_file_04.txt','data_file.txt') + write_config_file(mode='insert',reuse_tables=True,fast_match=False,file='data_file.txt',error_table="err_table",error_limit=1000,preload=False) + doTest(38) + +def test_39_gpload_fill_missing_fields(): + "39 gpload fill missing fields" + file = mkpath('setup.sql') + runfile(file) + copy_data('external_file_04.txt','data_file.txt') + write_config_file(mode='insert',reuse_tables=False,fast_match=False,file='data_file.txt',table='texttable1', error_limit=1000, fill_missing_fields=True) + doTest(39) + +def test_40_gpload_merge_mode_with_multi_pk(): + "40 gpload merge mode with multiple pk" + file = mkpath('setup.sql') + runfile(file) + copy_data('external_file_pk.txt','data_file.txt') + write_config_file(mode='merge',reuse_tables=True,fast_match=False,file='data_file.txt',table='testpk') + #write_config_file(mode='merge',reuse_flag='true',fast_match='false',file='data_file.txt',table='testpk') + copy_data('external_file_pk2.txt','data_file2.txt') + write_config_file(mode='merge',reuse_tables=True,fast_match=False,file='data_file2.txt',table='testpk',config='config/config_file2') + # write_config_file(mode='merge',reuse_flag='true',fast_match='false',file='data_file2.txt',table='testpk',config='config/config_file2') + f = open(mkpath('query40.sql'),'w') + f.write("""\\! psql -d reuse_gptest -c "create table testpk (n1 integer, s1 integer, s2 varchar(128), n2 integer, primary key(n1,s1,s2))\ + partition by range (s1)\ + subpartition by list(s2)\ + SUBPARTITION TEMPLATE\ + ( SUBPARTITION usa VALUES ('usa'),\ + SUBPARTITION asia VALUES ('asia'),\ + SUBPARTITION europe VALUES ('europe'),\ + DEFAULT SUBPARTITION other_regions)\ + (start (1) end (13) every (1),\ + default partition others)\ + ;"\n""") + f.write("\\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n") + f.write("\\! gpload -f "+mkpath('config/config_file2')+ " -d reuse_gptest\n") + f.write("\\! psql -d reuse_gptest -c 'drop table testpk;'\n") + f.close() + doTest(40) + +def test_41_gpload_special_char(): + "41 gpload special char" + file = mkpath('setup.sql') + runfile(file) + copy_data('external_file_15.txt','data_file.txt') + columns = [{'"Field1"': 'bigint'},{'"Field#2"': 'text'}] + write_config_file(mode='insert',reuse_tables=True,fast_match=False, file='data_file.txt',table='testSpecialChar', columns=columns,delimiter=";") + copy_data('external_file_16.txt','data_file2.txt') + update_columns=['"Field#2"'] + match_columns = ['"Field1"', '"Field#2"'] + write_config_file(update_columns=update_columns ,config='config/config_file2', mode='merge',reuse_tables=True,fast_match=False, file='data_file2.txt',table='testSpecialChar',columns=columns, delimiter=';',match_columns=match_columns) + f = open(mkpath('query41.sql'),'a') + f.write("\\! gpload -f "+mkpath('config/config_file2')+ " -d reuse_gptest\n") + f.close() + doTest(41) + +def test_42_gpload_update_condition(): + "42 gpload update condition" + file = mkpath('setup.sql') + runfile(file) + copy_data('external_file_01.txt','data_file.txt') + copy_data('external_file_03.txt','data_file2.txt') + f = open(mkpath('query42.sql'),'w') + f.write("\\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n") + f.write("\\! gpload -f "+mkpath('config/config_file2')+ " -d reuse_gptest\n") + f.close() + write_config_file(mode='insert', format='text',file='data_file.txt',table='texttable',delimiter="|") + update_columns = ['s2'] + match_columns = ['n1'] + write_config_file(update_columns=update_columns,match_columns=match_columns,config='config/config_file2',mode='update', update_condition="s3='shpits'", format='text',file='data_file2.txt',table='texttable',delimiter="'") + doTest(42) + +def test_43_gpload_column_without_data_type(): + "43 gpload column name has capital letters and without data type" + file = mkpath('setup.sql') + prepare_test_file(43) + runfile(file) + copy_data('external_file_15.txt','data_file.txt') + columns = [{'"Field1"': ''},{'"Field#2"': ''}] + write_config_file(mode='insert',reuse_tables=True,fast_match=False, file='data_file.txt',table='testSpecialChar',columns=columns, delimiter=";") + doTest(43) diff --git a/gpMgmt/bin/gpload_test/gpload2/conftest.py b/gpMgmt/bin/gpload_test/gpload2/conftest.py new file mode 100644 index 0000000000..e955cfefd4 --- /dev/null +++ b/gpMgmt/bin/gpload_test/gpload2/conftest.py @@ -0,0 +1,15 @@ +import pytest +pytest.register_assert_rewrite('TEST') +from TEST import AnsFile +from TEST import read_diff +import os + +def pytest_assertrepr_compare(config,op, left, right): + + #first: fname + #second: output path + if op == '==': + diff = read_diff(os.path.splitext(left.path)[0], "") + print(diff) + output = ["query resulted in diff:"] + return output \ No newline at end of file diff --git a/gpMgmt/bin/gpload_test/gpload2/data/external_file_03.txt b/gpMgmt/bin/gpload_test/gpload2/data/external_file_03.txt index 5af560d441..2b6ca9b9a6 100644 --- a/gpMgmt/bin/gpload_test/gpload2/data/external_file_03.txt +++ b/gpMgmt/bin/gpload_test/gpload2/data/external_file_03.txt @@ -1,6 +1,6 @@ aaa'qwer'shjhjg'2012-06-01 15:30:30'1'732'834567'45.67'789.123'7.12345'123.456789 bbb'twob'shpits'2011-06-01 12:30:30'2'732'834567'45.67'789.123'7.12345'987.654321 -fff'twof'shpits'2011-06-01 12:30:30'3'732'834567'45.67'789.123'7.12345'654.321987 +fff'twoff'shpits'2011-06-01 12:30:30'3'732'834567'45.67'789.123'7.12345'654.321987 eee'twoe'shpits'2011-06-01 12:30:30'4'732'834567'45.67'789.123'7.12345'145.456789 ggg'twog'shpits'2011-06-01 12:30:30'5'732'834567'45.67'789.123'7.12345'123.222289 iii'twoi'shpits'2011-06-01 12:30:30'6'732'834567'45.67'789.123'7.12345'122.444789 diff --git a/gpMgmt/bin/gpload_test/gpload2/query37.ans b/gpMgmt/bin/gpload_test/gpload2/query37.ans index 312f4ff652..417b545cc2 100644 --- a/gpMgmt/bin/gpload_test/gpload2/query37.ans +++ b/gpMgmt/bin/gpload_test/gpload2/query37.ans @@ -1,22 +1,24 @@ -2018-11-05 22:52:11|INFO|gpload session started 2018-11-05 22:52:11 -2018-11-05 22:52:11|INFO|setting schema 'public' for table 'texttable' -2018-11-05 22:52:11|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30 -2018-11-05 22:52:11|INFO|did not find a staging table to reuse. creating staging_gpload_reusable_afbaac0da7ced19791c9ab9c537f41d3 -2018-11-05 22:52:11|INFO|did not find an external table to reuse. creating ext_gpload_reusable_601e34fe_e10a_11e8_b2e8_00505698a2d7 -2018-11-05 22:52:11|ERROR|could not run SQL "create external table ext_gpload_reusable_601e34fe_e10a_11e8_b2e8_00505698a2d7("s1" text,"s2" text,"s3" text,"dt" timestamp without time zone,"n1" smallint,"n2" integer,"n3" bigint,"n4" numeric,"n5" numeric,"n6" real,"n7" double precision)location('gpfdist://127.0.0.1:8081//home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt') format'text' (delimiter '|' null '\N' escape '\') encoding'xxxx' ": ERROR: xxxx is not a valid encoding name - -2018-11-05 22:52:11|INFO|rows Inserted = 0 -2018-11-05 22:52:11|INFO|rows Updated = 0 -2018-11-05 22:52:11|INFO|data formatting errors = 0 -2018-11-05 22:52:11|INFO|gpload failed -2018-11-05 22:52:11|INFO|gpload session started 2018-11-05 22:52:11 -2018-11-05 22:52:11|INFO|setting schema 'public' for table 'texttable' -2018-11-05 22:52:11|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30 -2018-11-05 22:52:11|INFO|did not find a staging table to reuse. creating staging_gpload_reusable_afbaac0da7ced19791c9ab9c537f41d3 -2018-11-05 22:52:12|INFO|did not find an external table to reuse. creating ext_gpload_reusable_6067ad3c_e10a_11e8_b378_00505698a2d7 -2018-11-05 22:52:12|ERROR|could not run SQL "create external table ext_gpload_reusable_6067ad3c_e10a_11e8_b378_00505698a2d7("s1" text,"s2" text,"s3" text,"dt" timestamp without time zone,"n1" smallint,"n2" integer,"n3" bigint,"n4" numeric,"n5" numeric,"n6" real,"n7" double precision)location('gpfdist://127.0.0.1:8081//home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt') format'text' (delimiter '|' null '\N' escape '\') encoding'xxxx' ": ERROR: xxxx is not a valid encoding name - -2018-11-05 22:52:12|INFO|rows Inserted = 0 -2018-11-05 22:52:12|INFO|rows Updated = 0 -2018-11-05 22:52:12|INFO|data formatting errors = 0 -2018-11-05 22:52:12|INFO|gpload failed +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 's1' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +2020-10-19 11:09:21|INFO|gpload session started 2020-10-19 11:09:21 +2020-10-19 11:09:21|INFO|setting schema 'public' for table 'texttable' +2020-10-19 11:09:21|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30 +2020-10-19 11:09:21|INFO|did not find a staging table to reuse. creating staging_gpload_reusable_afbaac0da7ced19791c9ab9c537f41d3 +2020-10-19 11:09:21|INFO|did not find an external table to reuse. creating ext_gpload_reusable_7bf26200_11b8_11eb_a45c_00505698707d +2020-10-19 11:09:21|ERROR|unexpected error -- backtrace written to log file +2020-10-19 11:09:21|INFO|rows Inserted = 0 +2020-10-19 11:09:21|INFO|rows Updated = 0 +2020-10-19 11:09:21|INFO|data formatting errors = 0 +2020-10-19 11:09:21|INFO|gpload failed +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 's1' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +2020-10-19 11:09:22|INFO|gpload session started 2020-10-19 11:09:22 +2020-10-19 11:09:22|INFO|setting schema 'public' for table 'texttable' +2020-10-19 11:09:22|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30 +2020-10-19 11:09:22|INFO|did not find a staging table to reuse. creating staging_gpload_reusable_afbaac0da7ced19791c9ab9c537f41d3 +2020-10-19 11:09:22|INFO|did not find an external table to reuse. creating ext_gpload_reusable_7c2d9dde_11b8_11eb_af8a_00505698707d +2020-10-19 11:09:22|ERROR|unexpected error -- backtrace written to log file +2020-10-19 11:09:22|INFO|rows Inserted = 0 +2020-10-19 11:09:22|INFO|rows Updated = 0 +2020-10-19 11:09:22|INFO|data formatting errors = 0 +2020-10-19 11:09:22|INFO|gpload failed diff --git a/gpMgmt/bin/gpload_test/gpload2/query42.ans b/gpMgmt/bin/gpload_test/gpload2/query42.ans new file mode 100644 index 0000000000..ea07f0f4e5 --- /dev/null +++ b/gpMgmt/bin/gpload_test/gpload2/query42.ans @@ -0,0 +1,21 @@ +2020-10-19 10:58:33|INFO|gpload session started 2020-10-19 10:58:33 +2020-10-19 10:58:33|INFO|setting schema 'public' for table 'texttable' +2020-10-19 10:58:33|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30 +2020-10-19 10:58:33|INFO|did not find an external table to reuse. creating ext_gpload_reusable_f96a472c_11b6_11eb_9b79_00505698707d +2020-10-19 10:58:33|INFO|running time: 0.10 seconds +2020-10-19 10:58:33|INFO|rows Inserted = 16 +2020-10-19 10:58:33|INFO|rows Updated = 0 +2020-10-19 10:58:33|INFO|data formatting errors = 0 +2020-10-19 10:58:33|INFO|gpload succeeded +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 's1' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +2020-10-19 10:58:33|INFO|gpload session started 2020-10-19 10:58:33 +2020-10-19 10:58:33|INFO|setting schema 'public' for table 'texttable' +2020-10-19 10:58:33|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file2.txt" -t 30 +2020-10-19 10:58:33|INFO|did not find a staging table to reuse. creating staging_gpload_reusable_bf2513bc4e5c7466f3cd5abecf21f8f4 +2020-10-19 10:58:33|INFO|did not find an external table to reuse. creating ext_gpload_reusable_f99cbec8_11b6_11eb_9e57_00505698707d +2020-10-19 10:58:33|INFO|running time: 0.11 seconds +2020-10-19 10:58:33|INFO|rows Inserted = 0 +2020-10-19 10:58:33|INFO|rows Updated = 15 +2020-10-19 10:58:33|INFO|data formatting errors = 0 +2020-10-19 10:58:33|INFO|gpload succeeded diff --git a/gpMgmt/bin/gpload_test/gpload2/query43.ans b/gpMgmt/bin/gpload_test/gpload2/query43.ans new file mode 100644 index 0000000000..898b37de61 --- /dev/null +++ b/gpMgmt/bin/gpload_test/gpload2/query43.ans @@ -0,0 +1,18 @@ +2020-10-19 14:21:35|INFO|gpload session started 2020-10-19 14:21:35 +2020-10-19 14:21:35|INFO|setting schema 'public' for table 'testspecialchar' +2020-10-19 14:21:35|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30 +2020-10-19 14:21:35|INFO|did not find an external table to reuse. creating ext_gpload_reusable_5682442a_11d3_11eb_912a_00505698707d +2020-10-19 14:21:35|INFO|running time: 0.10 seconds +2020-10-19 14:21:35|INFO|rows Inserted = 8 +2020-10-19 14:21:35|INFO|rows Updated = 0 +2020-10-19 14:21:35|INFO|data formatting errors = 0 +2020-10-19 14:21:35|INFO|gpload succeeded +2020-10-19 14:21:35|INFO|gpload session started 2020-10-19 14:21:35 +2020-10-19 14:21:35|INFO|setting schema 'public' for table 'testspecialchar' +2020-10-19 14:21:35|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30 +2020-10-19 14:21:35|INFO|reusing external table ext_gpload_reusable_5682442a_11d3_11eb_912a_00505698707d +2020-10-19 14:21:35|INFO|running time: 0.08 seconds +2020-10-19 14:21:35|INFO|rows Inserted = 8 +2020-10-19 14:21:35|INFO|rows Updated = 0 +2020-10-19 14:21:35|INFO|data formatting errors = 0 +2020-10-19 14:21:35|INFO|gpload succeeded -- GitLab