diff --git a/target_snowflake/__init__.py b/target_snowflake/__init__.py
index d3095559..822e5ab1 100644
--- a/target_snowflake/__init__.py
+++ b/target_snowflake/__init__.py
@@ -46,11 +46,6 @@ class InvalidValidationOperationException(Exception):
     pass
 
 
-class InvalidTableStructureException(Exception):
-    """Exception to raise when target table structure not compatible with singer messages"""
-    pass
-
-
 def float_to_decimal(value):
     """Walk the given data structure and turn all instances of float into double."""
     if isinstance(value, float):
@@ -113,17 +108,16 @@ def get_schema_names_from_config(config):
     return schema_names
 
 
-def load_information_schema_cache(config):
-    information_schema_cache = []
+def load_table_cache(config):
+    table_cache = []
     if not ('disable_table_cache' in config and config['disable_table_cache']):
-        LOGGER.info("Getting catalog objects from information_schema cache table...")
+        LOGGER.info("Getting catalog objects from table cache...")
 
         db = DbSync(config)
-        information_schema_cache = db.get_table_columns(
-            table_schemas=get_schema_names_from_config(config),
-            from_information_schema_cache_table=True)
+        table_cache = db.get_table_columns(
+            table_schemas=get_schema_names_from_config(config))
 
-    return information_schema_cache
+    return table_cache
 
 
 def adjust_timestamps_in_record(record: Dict, schema: Dict) -> None:
@@ -155,7 +149,7 @@ def reset_new_value(record: Dict, key: str, format: str):
 
 
 # pylint: disable=too-many-locals,too-many-branches,too-many-statements
-def persist_lines(config, lines, information_schema_cache=None) -> None:
+def persist_lines(config, lines, table_cache=None) -> None:
     state = None
     flushed_state = None
     schemas = {}
@@ -279,21 +273,12 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
             key_properties[stream] = o['key_properties']
 
             if config.get('add_metadata_columns') or config.get('hard_delete'):
-                stream_to_sync[stream] = DbSync(config, add_metadata_columns_to_schema(o), information_schema_cache)
+                stream_to_sync[stream] = DbSync(config, add_metadata_columns_to_schema(o), table_cache)
             else:
-                stream_to_sync[stream] = DbSync(config, o, information_schema_cache)
-
-            try:
-                stream_to_sync[stream].create_schema_if_not_exists()
-                stream_to_sync[stream].sync_table()
-            except Exception as ex:
-                LOGGER.exception(ex)
-                raise InvalidTableStructureException("""
-                    Cannot sync table structure in Snowflake schema: {} .
-                    Try to delete {}.COLUMNS table to reset information_schema cache. Maybe it's outdated.
-                    """.format(
-                    stream_to_sync[stream].schema_name,
-                    stream_to_sync[stream].pipelinewise_schema.upper()))
+                stream_to_sync[stream] = DbSync(config, o, table_cache)
+
+            stream_to_sync[stream].create_schema_if_not_exists()
+            stream_to_sync[stream].sync_table()
 
             row_count[stream] = 0
             total_row_count[stream] = 0
@@ -421,14 +406,7 @@ def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None):
             f.write(bytes(csv_line + '\n', 'UTF-8'))
 
     s3_key = db_sync.put_to_stage(csv_file, stream, row_count, temp_dir=temp_dir)
-    try:
-        db_sync.load_csv(s3_key, row_count)
-    except Exception as e:
-        LOGGER.error("""
-            Cannot load data from S3 into Snowflake schema: {} .
-            Try to delete {}.COLUMNS table to reset information_schema cache. Maybe it's outdated.
- """.format(db_sync.schema_name, db_sync.pipelinewise_schema.upper())) - raise e + db_sync.load_csv(s3_key, row_count) os.remove(csv_file) db_sync.delete_from_stage(s3_key) @@ -445,12 +423,12 @@ def main(): else: config = {} - # Init information schema cache - information_schema_cache = load_information_schema_cache(config) + # Init columns cache + table_cache = load_table_cache(config) # Consume singer messages singer_messages = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8') - persist_lines(config, singer_messages, information_schema_cache) + persist_lines(config, singer_messages, table_cache) LOGGER.debug("Exiting normally") diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index 38e2a1f9..0da37b2e 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -15,6 +15,11 @@ from snowflake.connector.remote_storage_util import SnowflakeFileEncryptionMaterial +class TooManyRecordsException(Exception): + """Exception to raise when query returns more records than max_records""" + pass + + def validate_config(config): errors = [] required_config_keys = [ @@ -187,7 +192,7 @@ def stream_name_to_dict(stream_name, separator='-'): # pylint: disable=too-many-public-methods,too-many-instance-attributes class DbSync: - def __init__(self, connection_config, stream_schema_message=None, information_schema_cache=None): + def __init__(self, connection_config, stream_schema_message=None, table_cache=None): """ connection_config: Snowflake connection details @@ -208,7 +213,7 @@ def __init__(self, connection_config, stream_schema_message=None, information_sc """ self.connection_config = connection_config self.stream_schema_message = stream_schema_message - self.information_schema_columns = information_schema_cache + self.table_cache = table_cache # logger to be used across the class's methods self.logger = get_logger('target_snowflake') @@ -221,11 +226,8 @@ def __init__(self, connection_config, stream_schema_message=None, information_sc self.logger.error("Invalid configuration:\n * {}".format('\n * '.join(config_errors))) sys.exit(1) - # Internal pipelinewise schema derived from the stage object in the config stage = stream_name_to_dict(self.connection_config['stage'], separator='.') - if stage['schema_name']: - self.pipelinewise_schema = stage['schema_name'] - else: + if not stage['schema_name']: self.logger.error( "The named external stage object in config has to use the . format.") sys.exit(1) @@ -303,7 +305,7 @@ def open_connection(self): autocommit=True ) - def query(self, query, params=None): + def query(self, query, params=None, max_records=0): result = [] with self.open_connection() as connection: with connection.cursor(snowflake.connector.DictCursor) as cur: @@ -320,6 +322,11 @@ def query(self, query, params=None): self.logger.debug("SNOWFLAKE - Running query: {}".format(q)) cur.execute(q, params) + # Raise exception if returned rows greater than max allowed records + if 0 < max_records < cur.rowcount: + raise TooManyRecordsException( + f"Query returned too many records. 
+
                 if cur.rowcount > 0:
                     result = cur.fetchall()
@@ -331,7 +338,7 @@ def table_name(self, stream_name, is_temporary, without_schema = False):
         sf_table_name = table_name.replace('.', '_').replace('-', '_').lower()
 
         if is_temporary:
-            sf_table_name =  '{}_temp'.format(sf_table_name)
+            sf_table_name = '{}_temp'.format(sf_table_name)
 
         if without_schema:
             return f'"{sf_table_name.upper()}"'
@@ -363,7 +370,7 @@ def record_to_csv_line(self, record):
 
     def put_to_stage(self, file, stream, count, temp_dir=None):
         self.logger.info("Uploading {} rows to external snowflake stage on S3".format(count))
 
-        # Generating key in S3 bucket 
+        # Generating key in S3 bucket
         bucket = self.connection_config['s3_bucket']
         s3_key_prefix = self.connection_config.get('s3_key_prefix', '')
         s3_key = "{}pipelinewise_{}_{}.csv".format(s3_key_prefix, stream, datetime.datetime.now().strftime("%Y%m%d-%H%M%S-%f"))
@@ -407,39 +414,6 @@ def delete_from_stage(self, s3_key):
         bucket = self.connection_config['s3_bucket']
         self.s3.delete_object(Bucket=bucket, Key=s3_key)
 
-    def cache_information_schema_columns(self, table_schemas=[], create_only=False):
-        """Information_schema_columns cache is a copy of snowflake INFORMATION_SCHAME.COLUMNS table to avoid the error of
-        'Information schema query returned too much data. Please repeat query with more selective predicates.'.
-
-        Snowflake gives the above error message when running multiple taps in parallel (approx. >10 taps) and
-        when these taps selecting from information_schema at the same time. To avoid this problem we maintain a
-        local copy of the INFORMATION_SCHAME.COLUMNS table and it's keep updating automatically whenever it's needed.
-        """
-
-        # Create empty columns cache table if not exists
-        self.query("""
-            CREATE SCHEMA IF NOT EXISTS {}
-        """.format(self.pipelinewise_schema))
-        self.query("""
-            CREATE TABLE IF NOT EXISTS {}.columns (table_schema VARCHAR, table_name VARCHAR, column_name VARCHAR, data_type VARCHAR)
-        """.format(self.pipelinewise_schema))
-
-        if not create_only and table_schemas:
-            # Delete existing data about the current schema
-            delete = """
-                DELETE FROM {}.columns
-            """.format(self.pipelinewise_schema)
-            delete = delete + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
-
-            # Insert the latest data from information_schema into the cache table
-            insert = """
-                INSERT INTO {}.columns
-                SELECT table_schema, table_name, column_name, data_type
-                  FROM information_schema.columns
-            """.format(self.pipelinewise_schema)
-            insert = insert + " WHERE LOWER(table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
-            self.query([delete, insert])
-
     def load_csv(self, s3_key, count):
         stream_schema_message = self.stream_schema_message
         stream = stream_schema_message['stream']
@@ -557,15 +531,12 @@ def create_schema_if_not_exists(self):
         schema_name = self.schema_name
         schema_rows = 0
 
-        # information_schema_columns is an optional pre-collected list of available objects in snowflake
-        if self.information_schema_columns:
-            schema_rows = list(filter(lambda x: x['TABLE_SCHEMA'] == schema_name, self.information_schema_columns))
+        # table_cache is an optional pre-collected list of available objects in snowflake
+        if self.table_cache:
+            schema_rows = list(filter(lambda x: x['SCHEMA_NAME'] == schema_name.upper(), self.table_cache))
 
         # Query realtime if not pre-collected
         else:
-            schema_rows = self.query(
-                'SELECT LOWER(schema_name) schema_name FROM information_schema.schemata WHERE LOWER(schema_name) = %s',
-                (schema_name.lower(),)
-            )
+            schema_rows = self.query(f"SHOW SCHEMAS LIKE '{schema_name.upper()}'")
 
         if len(schema_rows) == 0:
             query = "CREATE SCHEMA IF NOT EXISTS {}".format(schema_name)
@@ -574,52 +545,99 @@ def create_schema_if_not_exists(self):
 
             self.grant_privilege(schema_name, self.grantees, self.grant_usage_on_schema)
 
-    def get_tables(self, table_schema=None):
-        return self.query("""SELECT LOWER(table_schema) table_schema, LOWER(table_name) table_name
-          FROM information_schema.tables
-         WHERE LOWER(table_schema) = {}""".format(
-            "LOWER(table_schema)" if table_schema is None else "'{}'".format(table_schema.lower())
-        ))
+            # Refresh columns cache if required
+            if self.table_cache:
+                self.table_cache = self.get_table_columns(table_schemas=[self.schema_name])
+
+    def get_tables(self, table_schemas=[]):
+        tables = []
+        if table_schemas:
+            for schema in table_schemas:
+                queries = []
+
+                # Get the list of tables by SHOW TERSE TABLES
+                show_tables = f"SHOW TERSE TABLES IN SCHEMA {self.connection_config['dbname']}.{schema}"
+
+                # Convert output of SHOW TERSE TABLES to a queryable result set
+                select = f"""
+                    SELECT "schema_name" AS schema_name
+                          ,"name"        AS table_name
+                      FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
+                """
+                queries.extend([show_tables, select])
+
+                # Run everything in one transaction
+                tables.extend(self.query(queries, max_records=9999))
+        else:
+            raise Exception("Cannot get table columns. List of table schemas empty")
 
-    def get_table_columns(self, table_schemas=[], table_name=None, from_information_schema_cache_table=False):
-        if from_information_schema_cache_table:
-            self.cache_information_schema_columns(create_only=True)
+        return tables
 
-        # Select columns
+    def get_table_columns(self, table_schemas=[], table_name=None):
         table_columns = []
         if table_schemas or table_name:
-            sql = """SELECT LOWER(c.table_schema) table_schema, LOWER(c.table_name) table_name, c.column_name, c.data_type
-                       FROM {}.columns c
-            """.format("information_schema" if not from_information_schema_cache_table else self.pipelinewise_schema)
-            if table_schemas:
-                sql = sql + " WHERE LOWER(c.table_schema) IN ({})".format(', '.join("'{}'".format(s).lower() for s in table_schemas))
-            elif table_name:
-                sql = sql + " WHERE LOWER(c.table_name) = '{}'".format(table_name.lower())
-            table_columns = self.query(sql)
+            for schema in table_schemas:
+                queries = []
+
+                # Get column data types by SHOW COLUMNS
+                show_columns = f"SHOW COLUMNS IN SCHEMA {self.connection_config['dbname']}.{schema}"
+
+                # Convert output of SHOW COLUMNS to table and insert results into the cache COLUMNS table
+                select = f"""
+                    SELECT "schema_name" AS schema_name
+                          ,"table_name"  AS table_name
+                          ,"column_name" AS column_name
+                          -- ----------------------------------------------------------------------------------------
+                          -- Character and numeric columns display their generic data type rather than their defined
+                          -- data type (i.e. TEXT for all character types, FIXED for all fixed-point numeric types,
+                          -- and REAL for all floating-point numeric types).
+                          --
+                          -- Further info at https://docs.snowflake.net/manuals/sql-reference/sql/show-columns.html
+                          -- ----------------------------------------------------------------------------------------
+                          ,CASE PARSE_JSON("data_type"):type::varchar
+                               WHEN 'FIXED' THEN 'NUMBER'
+                               WHEN 'REAL' THEN 'FLOAT'
+                               ELSE PARSE_JSON("data_type"):type::varchar
+                           END data_type
+                      FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
+                """
+                queries.extend([show_columns, select])
+
+                # Run everything in one transaction
+                try:
+                    columns = self.query(queries, max_records=9999)
+                    table_columns.extend(columns)
+
+                # Catch exception when schema not exists and SHOW COLUMNS throws a ProgrammingError
+                # Regexp to extract snowflake error code and message from the exception message
+                # Do nothing if schema not exists
+                except snowflake.connector.errors.ProgrammingError as exc:
+                    if re.match('002003 \(02000\):.*\n.*does not exist or not authorized.*', str(sys.exc_info()[1])):
+                        pass
+                    else:
+                        raise exc
+
         else:
             raise Exception("Cannot get table columns. List of table schemas empty")
 
-        # Refresh cached information_schema if no results
-        if from_information_schema_cache_table and len(table_columns) == 0:
-            self.cache_information_schema_columns(table_schemas=table_schemas)
-            table_columns = self.query(sql)
-
-        # Get columns from cache or information_schema and return results
         return table_columns
 
+    def refresh_table_cache(self):
+        self.table_cache = self.get_table_columns([self.schema_name])
+
     def update_columns(self):
         stream_schema_message = self.stream_schema_message
         stream = stream_schema_message['stream']
         table_name = self.table_name(stream, False, True)
 
-        if self.information_schema_columns:
-            columns = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and
+        if self.table_cache:
+            columns = list(filter(lambda x: x['SCHEMA_NAME'] == self.schema_name.upper() and
                                             f'"{x["TABLE_NAME"].upper()}"' == table_name,
-                                  self.information_schema_columns))
+                                  self.table_cache))
         else:
             columns = self.get_table_columns(table_schemas=[self.schema_name], table_name=table_name)
-        columns_dict = {column['COLUMN_NAME'].lower(): column for column in columns}
+        columns_dict = {column['COLUMN_NAME'].upper(): column for column in columns}
 
         columns_to_add = [
             column_clause(
@@ -627,7 +645,7 @@ def update_columns(self):
                 properties_schema
             )
             for (name, properties_schema) in self.flatten_schema.items()
-            if name.lower() not in columns_dict
+            if name.upper() not in columns_dict
         ]
 
         for column in columns_to_add:
@@ -639,8 +657,8 @@ def update_columns(self):
                 properties_schema
             ))
             for (name, properties_schema) in self.flatten_schema.items()
-            if name.lower() in columns_dict and
-               columns_dict[name.lower()]['DATA_TYPE'].lower() != column_type(properties_schema).lower() and
+            if name.upper() in columns_dict and
+               columns_dict[name.upper()]['DATA_TYPE'].upper() != column_type(properties_schema).upper() and
 
                # Don't alter table if TIMESTAMP_NTZ detected as the new required column type
                #
@@ -652,7 +670,7 @@ def update_columns(self):
               # TODO: Support both TIMESTAMP_TZ and TIMESTAMP_NTZ in target-snowflake
               #       when extracting data-time values from JSON
              #       (Check the column_type function for further details)
-               column_type(properties_schema).lower() != 'timestamp_ntz'
+               column_type(properties_schema).upper() != 'TIMESTAMP_NTZ'
         ]
 
         for (column_name, column) in columns_to_replace:
@@ -660,9 +678,9 @@ def update_columns(self):
             self.version_column(column_name, stream)
             self.add_column(column, stream)
 
-        # Refresh columns cache if required
-        if self.information_schema_columns and (len(columns_to_add) > 0 or len(columns_to_replace)):
-            self.cache_information_schema_columns(table_schemas=[self.schema_name])
+        # Refresh table cache if required
+        if self.table_cache and (len(columns_to_add) > 0 or len(columns_to_replace)):
+            self.table_cache = self.get_table_columns(table_schemas=[self.schema_name])
 
     def drop_column(self, column_name, stream):
         drop_column = "ALTER TABLE {} DROP COLUMN {}".format(self.table_name(stream, False), column_name)
@@ -685,12 +703,12 @@ def sync_table(self):
         table_name = self.table_name(stream, False, True)
         table_name_with_schema = self.table_name(stream, False)
 
-        if self.information_schema_columns:
-            found_tables = list(filter(lambda x: x['TABLE_SCHEMA'] == self.schema_name.lower() and
+        if self.table_cache:
+            found_tables = list(filter(lambda x: x['SCHEMA_NAME'] == self.schema_name.upper() and
                                                  f'"{x["TABLE_NAME"].upper()}"' == table_name,
-                                       self.information_schema_columns))
+                                       self.table_cache))
         else:
-            found_tables = [table for table in (self.get_tables(self.schema_name.lower()))
+            found_tables = [table for table in (self.get_tables([self.schema_name.upper()]))
                             if f'"{table["TABLE_NAME"].upper()}"' == table_name]
 
         if len(found_tables) == 0:
@@ -701,8 +719,8 @@ def sync_table(self):
             self.grant_privilege(self.schema_name, self.grantees, self.grant_select_on_all_tables_in_schema)
 
             # Refresh columns cache if required
-            if self.information_schema_columns:
-                self.cache_information_schema_columns(table_schemas=[self.schema_name])
+            if self.table_cache:
+                self.table_cache = self.get_table_columns(table_schemas=[self.schema_name])
         else:
             self.logger.info("Table '{}' exists".format(table_name_with_schema))
             self.update_columns()
diff --git a/tests/integration/test_target_snowflake.py b/tests/integration/test_target_snowflake.py
index 9ade2b01..07e4d67e 100644
--- a/tests/integration/test_target_snowflake.py
+++ b/tests/integration/test_target_snowflake.py
@@ -38,11 +38,6 @@ def setUp(self):
         if self.config['default_target_schema']:
             snowflake.query("DROP SCHEMA IF EXISTS {}".format(self.config['default_target_schema']))
 
-        # Delete target schema entries from PIPELINEWISE.COLUMNS
-        if self.config['stage']:
-            snowflake.query("CREATE TABLE IF NOT EXISTS {}.columns (table_schema VARCHAR, table_name VARCHAR, column_name VARCHAR, data_type VARCHAR)".format(snowflake.pipelinewise_schema))
-            snowflake.query("DELETE FROM {}.columns WHERE TABLE_SCHEMA ilike '{}'".format(snowflake.pipelinewise_schema, self.config['default_target_schema']))
-
     def persist_lines_with_cache(self, lines):
         """Enables table caching option and loads singer messages into snowflake.
 
@@ -54,8 +49,8 @@ def persist_lines_with_cache(self, lines):
         Selecting from a real table instead of INFORMATION_SCHEMA and keeping it in memory while the
         target-snowflake is running results better load performance.
         """
-        information_schema_cache = target_snowflake.load_information_schema_cache(self.config)
-        target_snowflake.persist_lines(self.config, lines, information_schema_cache)
+        table_cache = target_snowflake.load_table_cache(self.config)
+        target_snowflake.persist_lines(self.config, lines, table_cache)
 
     def remove_metadata_columns_from_rows(self, rows):
         """Removes metadata columns from a list of rows"""
@@ -585,113 +580,6 @@ def test_logical_streams_from_pg_with_hard_delete_and_batch_size_of_5_and_no_rec
 
         self.assert_logical_streams_are_in_snowflake_and_are_empty()
 
-    def test_information_schema_cache_create_and_update(self):
-        """Newly created and altered tables must be cached automatically for later use.
-
-        Information_schema_columns cache is a copy of snowflake INFORMATION_SCHAME.COLUMNS table to avoid the error of
-        'Information schema query returned too much data. Please repeat query with more selective predicates.'.
-        """
-        tap_lines_before_column_name_change = test_utils.get_test_tap_lines('messages-with-three-streams.json')
-        tap_lines_after_column_name_change = test_utils.get_test_tap_lines(
-            'messages-with-three-streams-modified-column.json')
-
-        # Load with default settings
-        self.persist_lines_with_cache(tap_lines_before_column_name_change)
-        self.persist_lines_with_cache(tap_lines_after_column_name_change)
-
-        # Get data form information_schema cache table
-        snowflake = DbSync(self.config)
-        target_schema = self.config.get('default_target_schema', '')
-        information_schema_cache = snowflake.query("""
-            SELECT table_schema
-                  ,table_name
-                  ,column_name
-                  ,data_type
-              FROM {}.columns
-             WHERE table_schema = '{}'
-             ORDER BY table_schema, table_name, column_name
-            """.format(
-            snowflake.pipelinewise_schema,
-            target_schema.upper()))
-
-        # Get the previous column name from information schema in test_table_two
-        previous_column_name = snowflake.query("""
-            SELECT column_name
-              FROM information_schema.columns
-             WHERE table_catalog = '{}'
-               AND table_schema = '{}'
-               AND table_name = 'TEST_TABLE_TWO'
-               AND ordinal_position = 1
-            """.format(
-            self.config.get('dbname', '').upper(),
-            target_schema.upper()))[0]["COLUMN_NAME"]
-
-        # Every column has to be in the cached information_schema with the latest versions
-        self.assertEqual(
-            information_schema_cache,
-            [
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_ONE', 'COLUMN_NAME': 'C_INT',
-                 'DATA_TYPE': 'NUMBER'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_ONE', 'COLUMN_NAME': 'C_PK',
-                 'DATA_TYPE': 'NUMBER'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_ONE', 'COLUMN_NAME': 'C_VARCHAR',
-                 'DATA_TYPE': 'TEXT'},
-
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_THREE', 'COLUMN_NAME': 'C_INT',
-                 'DATA_TYPE': 'NUMBER'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_THREE', 'COLUMN_NAME': 'C_PK',
-                 'DATA_TYPE': 'NUMBER'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_THREE', 'COLUMN_NAME': 'C_TIME',
-                 'DATA_TYPE': 'TIME'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_THREE', 'COLUMN_NAME': 'C_TIME_RENAMED',
-                 'DATA_TYPE': 'TIME'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_THREE', 'COLUMN_NAME': 'C_VARCHAR',
-                 'DATA_TYPE': 'TEXT'},
-
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_TWO', 'COLUMN_NAME': 'C_DATE',
-                 'DATA_TYPE': 'TEXT'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_TWO', 'COLUMN_NAME': previous_column_name,
-                 'DATA_TYPE': 'TIMESTAMP_NTZ'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_TWO', 'COLUMN_NAME': 'C_INT',
-                 'DATA_TYPE': 'NUMBER'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_TWO', 'COLUMN_NAME': 'C_PK',
-                 'DATA_TYPE': 'NUMBER'},
-                {'TABLE_SCHEMA': target_schema.upper(), 'TABLE_NAME': 'TEST_TABLE_TWO', 'COLUMN_NAME': 'C_VARCHAR',
-                 'DATA_TYPE': 'TEXT'}
-            ])
-
-    def test_information_schema_cache_outdated(self):
-        """If informations schema cache is not up to date then it should fail"""
-        tap_lines_with_multi_streams = test_utils.get_test_tap_lines('messages-with-three-streams.json')
-
-        # 1) Simulate an out of data cache:
-        #    Table is in cache but not exists in database
-        snowflake = DbSync(self.config)
-        target_schema = self.config.get('default_target_schema', '').upper()
-        snowflake.query("""
-            CREATE TABLE IF NOT EXISTS {}.columns (table_schema VARCHAR, table_name VARCHAR, column_name VARCHAR, data_type VARCHAR)
-        """.format(snowflake.pipelinewise_schema))
-        snowflake.query("""
-            INSERT INTO {0}.columns (table_schema, table_name, column_name, data_type)
-            SELECT '{1}', 'TEST_TABLE_ONE', 'DUMMY_COLUMN_1', 'TEXT' UNION
-            SELECT '{1}', 'TEST_TABLE_ONE', 'DUMMY_COLUMN_2', 'TEXT' UNION
-            SELECT '{1}', 'TEST_TABLE_TWO', 'DUMMY_COLUMN_3', 'TEXT'
-        """.format(snowflake.pipelinewise_schema, target_schema))
-
-        # Loading into an outdated information_schema cache should fail with table not exists
-        with self.assertRaises(target_snowflake.InvalidTableStructureException):
-            self.persist_lines_with_cache(tap_lines_with_multi_streams)
-
-        # 2) Simulate an out of data cache:
-        #    Table is in cache structure is not in sync with the actual table in the database
-        snowflake.query("CREATE SCHEMA IF NOT EXISTS {}".format(target_schema))
-        snowflake.query("CREATE OR REPLACE TABLE {}.test_table_one (C_PK NUMBER, C_INT NUMBER, C_VARCHAR TEXT)".format(target_schema))
-
-        # Loading into an outdated information_schema cache should fail with columns exists
-        # It should try adding the new column based on the values in cache but the column already exists
-        with self.assertRaises(Exception):
-            self.persist_lines_with_cache(tap_lines_with_multi_streams)
-
     @mock.patch('target_snowflake.emit_state')
     def test_flush_streams_with_no_intermediate_flushes(self, mock_emit_state):
         """Test emitting states when no intermediate flush required"""
@@ -930,3 +818,15 @@ def test_using_aws_environment_variables(self):
         finally:
             del os.environ["AWS_ACCESS_KEY_ID"]
             del os.environ["AWS_SECRET_ACCESS_KEY"]
+
+    def test_too_many_records_exception(self):
+        """Test that the query function raises an exception if max_records is exceeded"""
+        snowflake = DbSync(self.config)
+
+        # No max_records limit by default
+        sample_rows = snowflake.query("SELECT seq4() FROM TABLE(GENERATOR(ROWCOUNT => 50000))")
+        self.assertEqual(len(sample_rows), 50000)
+
+        # Should raise exception when max_records exceeded
+        with assert_raises(target_snowflake.db_sync.TooManyRecordsException):
+            snowflake.query("SELECT seq4() FROM TABLE(GENERATOR(ROWCOUNT => 50000))", max_records=10000)