diff --git a/CHANGELOG.md b/CHANGELOG.md index 567bb4041..1f4a030d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,13 @@ ## dbt-spark 1.0.0 (Release TBD) +### Under the hood +- Refactor seed macros: remove duplicated code from dbt-core, and provide clearer logging of SQL parameters that differ by connection method ([#249](https://github.com/dbt-labs/dbt-spark/issues/249), [#250](https://github.com/dbt-labs/dbt-spark/pull/250)) + ## dbt-spark 1.0.0rc1 (November 10, 2021) ### Under the hood - - Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https://github.com/dbt-labs/dbt-core/issues/4134), [#253](https://github.com/dbt-labs/dbt-snowflake/pull/253)) +- Add support for structured logging [#251](https://github.com/dbt-labs/dbt-spark/pull/251) ### Under the hood - Add support for structured logging [#251](https://github.com/dbt-labs/dbt-spark/pull/251) diff --git a/dbt/include/spark/macros/materializations/seed.sql b/dbt/include/spark/macros/materializations/seed.sql index 536e6447b..196479cb0 100644 --- a/dbt/include/spark/macros/materializations/seed.sql +++ b/dbt/include/spark/macros/materializations/seed.sql @@ -1,40 +1,8 @@ -{% macro spark__load_csv_rows(model, agate_table) %} - {% set batch_size = 1000 %} - {% set column_override = model['config'].get('column_types', {}) %} - - {% set statements = [] %} - - {% for chunk in agate_table.rows | batch(batch_size) %} - {% set bindings = [] %} - - {% for row in chunk %} - {% do bindings.extend(row) %} - {% endfor %} - - {% set sql %} - insert into {{ this.render() }} values - {% for row in chunk -%} - ({%- for col_name in agate_table.column_names -%} - {%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%} - {%- set type = column_override.get(col_name, inferred_type) -%} - cast(%s as {{type}}) - {%- if not loop.last%},{%- endif %} - {%- endfor -%}) - {%- if not loop.last%},{%- endif %} - {%- endfor %} - {% endset %} - - {% do 
adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %} - - {% if loop.index0 == 0 %} - {% do statements.append(sql) %} - {% endif %} - {% endfor %} - - {# Return SQL so we can render it out into the compiled files #} - {{ return(statements[0]) }} +{% macro spark__get_binding_char() %} + {{ return('?' if target.method == 'odbc' else '%s') }} {% endmacro %} + {% macro spark__reset_csv_table(model, full_refresh, old_relation, agate_table) %} {% if old_relation %} {{ adapter.drop_relation(old_relation) }} @@ -44,6 +12,45 @@ {% endmacro %} +{% macro spark__load_csv_rows(model, agate_table) %} + + {% set batch_size = get_batch_size() %} + {% set column_override = model['config'].get('column_types', {}) %} + + {% set statements = [] %} + + {% for chunk in agate_table.rows | batch(batch_size) %} + {% set bindings = [] %} + + {% for row in chunk %} + {% do bindings.extend(row) %} + {% endfor %} + + {% set sql %} + insert into {{ this.render() }} values + {% for row in chunk -%} + ({%- for col_name in agate_table.column_names -%} + {%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%} + {%- set type = column_override.get(col_name, inferred_type) -%} + cast({{ get_binding_char() }} as {{type}}) + {%- if not loop.last%},{%- endif %} + {%- endfor -%}) + {%- if not loop.last%},{%- endif %} + {%- endfor %} + {% endset %} + + {% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %} + + {% if loop.index0 == 0 %} + {% do statements.append(sql) %} + {% endif %} + {% endfor %} + + {# Return SQL so we can render it out into the compiled files #} + {{ return(statements[0]) }} +{% endmacro %} + + {% macro spark__create_csv_table(model, agate_table) %} {%- set column_override = model['config'].get('column_types', {}) -%} {%- set quote_seed_column = model['config'].get('quote_columns', None) -%} @@ -70,35 +77,3 @@ {{ return(sql) }} {% endmacro %} - - -{% materialization seed, adapter='spark' %} - - {%- set identifier = model['alias'] -%} - 
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set target_relation = api.Relation.create(database=database, schema=schema, identifier=identifier, - type='table') -%} - {%- set agate_table = load_agate_table() -%} - {%- do store_result('agate_table', response='OK', agate_table=agate_table) -%} - - {{ run_hooks(pre_hooks) }} - - -- build model - {% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %} - {% set status = 'CREATE' %} - {% set num_rows = (agate_table.rows | length) %} - {% set sql = load_csv_rows(model, agate_table) %} - - {% call noop_statement('main', status ~ ' ' ~ num_rows) %} - {{ create_table_sql }}; - -- dbt seed -- - {{ sql }} - {% endcall %} - - {% do persist_docs(target_relation, model) %} - - {{ run_hooks(post_hooks) }} - - {{ return({'relations': [target_relation]}) }} - -{% endmaterialization %}