Skip to content

Commit

Permalink
Refactor seed macros, clearer sql param logging (dbt-labs#250)
Browse files Browse the repository at this point in the history
* Try refactoring seed macros

* Add changelog entry
  • Loading branch information
jtcohen6 authored and Vinoth Govindarajan committed Nov 17, 2021
1 parent b3ac59a commit 1445d16
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 68 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
## dbt-spark 1.0.0 (Release TBD)

### Under the hood
- Refactor seed macros: remove duplicated code from dbt-core, and provide clearer logging of SQL parameters that differ by connection method ([#249](https://github.com/dbt-labs/dbt-spark/issues/249), [#250](https://github.com/dbt-labs/dbt-spark/pull/250))

## dbt-spark 1.0.0rc1 (November 10, 2021)

### Under the hood

- Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https://github.com/dbt-labs/dbt-core/issues/4134), [#253](https://github.com/dbt-labs/dbt-spark/pull/253))
- Add support for structured logging ([#251](https://github.com/dbt-labs/dbt-spark/pull/251))
Expand Down
109 changes: 42 additions & 67 deletions dbt/include/spark/macros/materializations/seed.sql
Original file line number Diff line number Diff line change
@@ -1,40 +1,8 @@
{% macro spark__load_csv_rows(model, agate_table) %}
{% set batch_size = 1000 %}
{% set column_override = model['config'].get('column_types', {}) %}

{% set statements = [] %}

{% for chunk in agate_table.rows | batch(batch_size) %}
{% set bindings = [] %}

{% for row in chunk %}
{% do bindings.extend(row) %}
{% endfor %}

{% set sql %}
insert into {{ this.render() }} values
{% for row in chunk -%}
({%- for col_name in agate_table.column_names -%}
{%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
{%- set type = column_override.get(col_name, inferred_type) -%}
cast(%s as {{type}})
{%- if not loop.last%},{%- endif %}
{%- endfor -%})
{%- if not loop.last%},{%- endif %}
{%- endfor %}
{% endset %}

{% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}

{% if loop.index0 == 0 %}
{% do statements.append(sql) %}
{% endif %}
{% endfor %}

{# Return SQL so we can render it out into the compiled files #}
{{ return(statements[0]) }}
{% macro spark__get_binding_char() %}
  {#- Placeholder character for bound SQL parameters: ODBC connections use
      qmark style ('?'), while thrift/http connections use pyformat ('%s'). -#}
  {%- if target.method == 'odbc' -%}
    {{ return('?') }}
  {%- else -%}
    {{ return('%s') }}
  {%- endif -%}
{% endmacro %}


{% macro spark__reset_csv_table(model, full_refresh, old_relation, agate_table) %}
{% if old_relation %}
{{ adapter.drop_relation(old_relation) }}
Expand All @@ -44,6 +12,45 @@
{% endmacro %}


{% macro spark__load_csv_rows(model, agate_table) %}
{#
    Load seed CSV rows into the (already-created) target table using batched,
    parameterized INSERT statements. The bind placeholder comes from
    get_binding_char() so it matches the active connection method.

    model       -- the seed node; column_types config may override inferred types
    agate_table -- parsed CSV contents as an agate table

    Returns the first rendered INSERT statement so dbt can write it to the
    compiled files.
#}

{% set batch_size = get_batch_size() %}
{% set column_override = model['config'].get('column_types', {}) %}

{# Only the first batch's SQL is kept, for the compiled-output return below. #}
{% set statements = [] %}

{% for chunk in agate_table.rows | batch(batch_size) %}
{% set bindings = [] %}

{# Flatten all cell values of this chunk into one ordered bindings list,
   matching the row-major order of the placeholders rendered below. #}
{% for row in chunk %}
{% do bindings.extend(row) %}
{% endfor %}

{% set sql %}
insert into {{ this.render() }} values
{% for row in chunk -%}
({%- for col_name in agate_table.column_names -%}
{%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
{%- set type = column_override.get(col_name, inferred_type) -%}
cast({{ get_binding_char() }} as {{type}})
{%- if not loop.last%},{%- endif %}
{%- endfor -%})
{%- if not loop.last%},{%- endif %}
{%- endfor %}
{% endset %}

{# abridge_sql_log trims the (potentially huge) multi-row INSERT in debug logs. #}
{% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}

{% if loop.index0 == 0 %}
{% do statements.append(sql) %}
{% endif %}
{% endfor %}

{# Return SQL so we can render it out into the compiled files #}
{{ return(statements[0]) }}
{% endmacro %}


{% macro spark__create_csv_table(model, agate_table) %}
{%- set column_override = model['config'].get('column_types', {}) -%}
{%- set quote_seed_column = model['config'].get('quote_columns', None) -%}
Expand All @@ -70,35 +77,3 @@

{{ return(sql) }}
{% endmacro %}


{% materialization seed, adapter='spark' %}
{#
    Spark seed materialization: (re)create the seed table from the CSV's agate
    representation, then insert its rows. Unlike the default implementation,
    there is no transaction/commit handling (Spark connections do not support
    transactions).
#}

{%- set identifier = model['alias'] -%}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(database=database, schema=schema, identifier=identifier,
type='table') -%}
{%- set agate_table = load_agate_table() -%}
{#- Stash the parsed CSV so later calls (e.g. column type lookups) can reuse it. -#}
{%- do store_result('agate_table', response='OK', agate_table=agate_table) -%}

{{ run_hooks(pre_hooks) }}

-- build model
{% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %}
{% set status = 'CREATE' %}
{% set num_rows = (agate_table.rows | length) %}
{% set sql = load_csv_rows(model, agate_table) %}

{# Rows were already inserted via adapter.add_query inside load_csv_rows;
   noop_statement just records the result status and compiled SQL. #}
{% call noop_statement('main', status ~ ' ' ~ num_rows) %}
{{ create_table_sql }};
-- dbt seed --
{{ sql }}
{% endcall %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

0 comments on commit 1445d16

Please sign in to comment.