This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

[AP-591] Use SHOW SCHEMAS|TABLES|COLUMNS instead of INFORMATION_SCHEMA #67

Merged 1 commit on Mar 12, 2020
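
In short: instead of maintaining an INFORMATION_SCHEMA-backed cache table and querying it for column metadata, the target now asks Snowflake directly via SHOW SCHEMAS, SHOW TABLES and SHOW COLUMNS. The statements are actually issued from db_sync.py, which is not part of this diff, so the snippet below is only an illustrative sketch of the before/after query styles; the connection parameters and object names are placeholders.

import snowflake.connector

# Placeholder credentials; the real target builds its connection from the config file.
conn = snowflake.connector.connect(account="...", user="...", password="...",
                                   database="MY_DB", warehouse="MY_WH")
cur = conn.cursor()

# Before: column metadata was read from an INFORMATION_SCHEMA-style cache table.
cur.execute("""
    SELECT table_schema, table_name, column_name, data_type
      FROM information_schema.columns
     WHERE table_schema = 'MY_SCHEMA'
""")

# After: metadata comes straight from SHOW commands, so there is no cache table
# that can go stale or need a manual reset.
cur.execute("SHOW SCHEMAS IN DATABASE MY_DB")
cur.execute("SHOW TABLES IN SCHEMA MY_DB.MY_SCHEMA")
cur.execute("SHOW COLUMNS IN SCHEMA MY_DB.MY_SCHEMA")
columns = cur.fetchall()  # one row per column, feeding the in-memory table cache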
target_snowflake/__init__.py: 54 changes (16 additions, 38 deletions)
@@ -46,11 +46,6 @@ class InvalidValidationOperationException(Exception):
     pass
 
 
-class InvalidTableStructureException(Exception):
-    """Exception to raise when target table structure not compatible with singer messages"""
-    pass
-
-
 def float_to_decimal(value):
     """Walk the given data structure and turn all instances of float into double."""
     if isinstance(value, float):
@@ -113,17 +108,16 @@ def get_schema_names_from_config(config):
     return schema_names
 
 
-def load_information_schema_cache(config):
-    information_schema_cache = []
+def load_table_cache(config):
+    table_cache = []
     if not ('disable_table_cache' in config and config['disable_table_cache']):
-        LOGGER.info("Getting catalog objects from information_schema cache table...")
+        LOGGER.info("Getting catalog objects from table cache...")
 
         db = DbSync(config)
-        information_schema_cache = db.get_table_columns(
-            table_schemas=get_schema_names_from_config(config),
-            from_information_schema_cache_table=True)
+        table_cache = db.get_table_columns(
+            table_schemas=get_schema_names_from_config(config))
 
-    return information_schema_cache
+    return table_cache
 
 
 def adjust_timestamps_in_record(record: Dict, schema: Dict) -> None:
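
For context, a minimal usage sketch of the refactored loader above; only disable_table_cache comes from this diff, the other key and its value are placeholders, and a real config also needs the Snowflake connection settings.

config = {
    "default_target_schema": "MY_SCHEMA",  # placeholder schema name
    "disable_table_cache": False,          # set True to skip the SHOW-based cache entirely
}

table_cache = load_table_cache(config)  # returns [] when the cache is disabled
# Every DbSync created later in persist_lines() receives this cache, so per-stream
# table syncs do not have to re-query Snowflake for column metadata.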
@@ -155,7 +149,7 @@ def reset_new_value(record: Dict, key: str, format: str):
 
 
 # pylint: disable=too-many-locals,too-many-branches,too-many-statements
-def persist_lines(config, lines, information_schema_cache=None) -> None:
+def persist_lines(config, lines, table_cache=None) -> None:
     state = None
     flushed_state = None
     schemas = {}
@@ -279,21 +273,12 @@ def persist_lines(config, lines, information_schema_cache=None) -> None:
             key_properties[stream] = o['key_properties']
 
             if config.get('add_metadata_columns') or config.get('hard_delete'):
-                stream_to_sync[stream] = DbSync(config, add_metadata_columns_to_schema(o), information_schema_cache)
+                stream_to_sync[stream] = DbSync(config, add_metadata_columns_to_schema(o), table_cache)
             else:
-                stream_to_sync[stream] = DbSync(config, o, information_schema_cache)
-
-            try:
-                stream_to_sync[stream].create_schema_if_not_exists()
-                stream_to_sync[stream].sync_table()
-            except Exception as ex:
-                LOGGER.exception(ex)
-                raise InvalidTableStructureException("""
-                    Cannot sync table structure in Snowflake schema: {} .
-                    Try to delete {}.COLUMNS table to reset information_schema cache. Maybe it's outdated.
-                    """.format(
-                    stream_to_sync[stream].schema_name,
-                    stream_to_sync[stream].pipelinewise_schema.upper()))
+                stream_to_sync[stream] = DbSync(config, o, table_cache)
+
+            stream_to_sync[stream].create_schema_if_not_exists()
+            stream_to_sync[stream].sync_table()
 
             row_count[stream] = 0
             total_row_count[stream] = 0
@@ -421,14 +406,7 @@ def flush_records(stream, records_to_load, row_count, db_sync, temp_dir=None):
             f.write(bytes(csv_line + '\n', 'UTF-8'))
 
     s3_key = db_sync.put_to_stage(csv_file, stream, row_count, temp_dir=temp_dir)
-    try:
-        db_sync.load_csv(s3_key, row_count)
-    except Exception as e:
-        LOGGER.error("""
-            Cannot load data from S3 into Snowflake schema: {} .
-            Try to delete {}.COLUMNS table to reset information_schema cache. Maybe it's outdated.
-            """.format(db_sync.schema_name, db_sync.pipelinewise_schema.upper()))
-        raise e
+    db_sync.load_csv(s3_key, row_count)
 
     os.remove(csv_file)
     db_sync.delete_from_stage(s3_key)
@@ -445,12 +423,12 @@ def main():
     else:
         config = {}
 
-    # Init information schema cache
-    information_schema_cache = load_information_schema_cache(config)
+    # Init columns cache
+    table_cache = load_table_cache(config)
 
     # Consume singer messages
     singer_messages = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
-    persist_lines(config, singer_messages, information_schema_cache)
+    persist_lines(config, singer_messages, table_cache)
 
     LOGGER.debug("Exiting normally")
 