Merging main into 1.0.latest (#264)
* Refactor seed macros, clearer sql param logging (#250)

* Try refactoring seed macros

* Add changelog entry

* 187: Adding apache hudi support to dbt (#210)

* initial working version

* Rebased and resolved all the merge conflicts.

* Rebased and resolved merge conflicts.

* Removed hudi dep jar and used the released version via packages option

* Added insert overwrite unit tests for hudi

* Used unique_key as default value for hudi primaryKey option

* Updated changelog.md with this new update.

* Final round of testing and a few minor fixes

* Fixed lint issues

* Fixed the integration tests

* Fixed the circle ci env to add hudi packages

* Updated hudi spark bundle to use scala 2.11

* Fixed Hudi incremental strategy integration tests and other integration tests

* Fixed the hudi hive sync hms integration test issues

* Added sql HMS config to fix the integration tests.

* Added hudi hive sync mode conf to CI

* Set the hms schema verification to false

* Removed the merge update columns since it's not supported.

* Passed the correct hiveconf to the circle ci build script

* Disabled a few incremental tests for Spark 2 and reverted to the Spark 2 config

* Added hudi configs to the circle ci build script

* Commented out the Hudi integration test until we have the hudi 0.10.0 version

* Fixed the macro which checks the table type.

* Disabled this model since Hudi is not supported in the Databricks runtime; it will be added later

* Update profile_template.yml for v1 (#247)

* Update profile_template.yml for v1

* PR feedback, fix indentation issues

* It was my intention to remove the square brackets

* Fixup changelog entry

* Merge main, update changelog

* Bump version to 1.0.0rc2 (#259)

* bumpversion 1.0.0rc2

* Update changelog

* Use pytest-dbt-adapter==0.6.0

* Corrected definition for set full_refresh_mode (#262)

* Replaced definition for set full_refresh_mode

* Updated changelog

* Edit changelog

Co-authored-by: Jeremy Cohen <[email protected]>

* `get_response` -> `AdapterResponse` (#265)

* Return AdapterResponse from get_response

* fix flake

Co-authored-by: Jeremy Cohen <[email protected]>
Co-authored-by: Vinoth Govindarajan <[email protected]>
Co-authored-by: Sindre Grindheim <[email protected]>
4 people authored Dec 3, 2021
1 parent e239dfb commit fd2b699
Showing 27 changed files with 372 additions and 117 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.0.0rc1
current_version = 1.0.0rc2
parse = (?P<major>\d+)
\.(?P<minor>\d+)
\.(?P<patch>\d+)
12 changes: 12 additions & 0 deletions .circleci/config.yml
@@ -25,6 +25,18 @@ jobs:
--conf spark.hadoop.javax.jdo.option.ConnectionUserName=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionPassword=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.jars.packages=org.apache.hudi:hudi-spark-bundle_2.11:0.9.0
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
--conf spark.driver.userClassPathFirst=true
--conf spark.hadoop.datanucleus.autoCreateTables=true
--conf spark.hadoop.datanucleus.schema.autoCreateTables=true
--conf spark.hadoop.datanucleus.fixedDatastore=false
--conf spark.sql.hive.convertMetastoreParquet=false
--hiveconf hoodie.datasource.hive_sync.use_jdbc=false
--hiveconf hoodie.datasource.hive_sync.mode=hms
--hiveconf datanucleus.schema.autoCreateAll=true
--hiveconf hive.metastore.schema.verification=false
- image: postgres:9.6.17-alpine
environment:
21 changes: 18 additions & 3 deletions CHANGELOG.md
@@ -1,13 +1,28 @@
## dbt-spark 1.0.0 (Release TBD)

## dbt-spark 1.0.0rc1 (November 10, 2021)
### Fixes
- Incremental materialization corrected to respect `full_refresh` config, by using `should_full_refresh()` macro ([#260](https:/dbt-labs/dbt-spark/issues/260), [#262](https:/dbt-labs/dbt-spark/pull/262/))

### Contributors
- [@grindheim](https:/grindheim) ([#262](https:/dbt-labs/dbt-spark/pull/262/))

## dbt-spark 1.0.0rc2 (November 24, 2021)

### Features
- Add support for Apache Hudi (hudi file format) which supports incremental merge strategies ([#187](https:/dbt-labs/dbt-spark/issues/187), [#210](https:/dbt-labs/dbt-spark/pull/210))

### Under the hood
- Refactor seed macros: remove duplicated code from dbt-core, and provide clearer logging of SQL parameters that differ by connection method ([#249](https:/dbt-labs/dbt-spark/issues/249), [#250](https:/dbt-labs/dbt-snowflake/pull/250))
- Replace `sample_profiles.yml` with `profile_template.yml`, for use with new `dbt init` ([#247](https:/dbt-labs/dbt-spark/pull/247))

- Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https:/dbt-labs/dbt-core/issues/4134), [#253](https:/dbt-labs/dbt-snowflake/pull/253))
### Contributors
- [@vingov](https:/vingov) ([#210](https:/dbt-labs/dbt-spark/pull/210))

## dbt-spark 1.0.0rc1 (November 10, 2021)

### Under the hood
- Add support for structured logging [#251](https:/dbt-labs/dbt-spark/pull/251)
- Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https:/dbt-labs/dbt-core/issues/4134), [#253](https:/dbt-labs/dbt-snowflake/pull/253))
- Add support for structured logging ([#251](https:/dbt-labs/dbt-spark/pull/251))

## dbt-spark 0.21.1 (Release TBD)

2 changes: 1 addition & 1 deletion dbt/adapters/spark/__version__.py
@@ -1 +1 @@
version = "1.0.0rc1"
version = "1.0.0rc2"
10 changes: 7 additions & 3 deletions dbt/adapters/spark/connections.py
@@ -3,7 +3,7 @@
import dbt.exceptions
from dbt.adapters.base import Credentials
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import ConnectionState
from dbt.contracts.connection import ConnectionState, AdapterResponse
from dbt.events import AdapterLogger
from dbt.utils import DECIMALS
from dbt.adapters.spark import __version__
@@ -304,8 +304,12 @@ def cancel(self, connection):
connection.handle.cancel()

@classmethod
def get_response(cls, cursor):
return 'OK'
def get_response(cls, cursor) -> AdapterResponse:
# https:/dbt-labs/dbt-spark/issues/142
message = 'OK'
return AdapterResponse(
_message=message
)

# No transactions on Spark....
def add_begin_query(self, *args, **kwargs):
13 changes: 13 additions & 0 deletions dbt/adapters/spark/impl.py
@@ -70,6 +70,13 @@ class SparkAdapter(SQLAdapter):
INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
INFORMATION_STATISTICS_REGEX = re.compile(
r"^Statistics: (.*)$", re.MULTILINE)
HUDI_METADATA_COLUMNS = [
'_hoodie_commit_time',
'_hoodie_commit_seqno',
'_hoodie_record_key',
'_hoodie_partition_path',
'_hoodie_file_name'
]

Relation = SparkRelation
Column = SparkColumn
@@ -145,12 +152,14 @@ def list_relations_without_caching(
rel_type = RelationType.View \
if 'Type: VIEW' in information else RelationType.Table
is_delta = 'Provider: delta' in information
is_hudi = 'Provider: hudi' in information
relation = self.Relation.create(
schema=_schema,
identifier=name,
type=rel_type,
information=information,
is_delta=is_delta,
is_hudi=is_hudi,
)
relations.append(relation)

@@ -224,6 +233,10 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
# which would execute 'describe extended tablename' query
rows: List[agate.Row] = super().get_columns_in_relation(relation)
columns = self.parse_describe_extended(relation, rows)

# strip hudi metadata columns.
columns = [x for x in columns
if x.name not in self.HUDI_METADATA_COLUMNS]
return columns

def parse_columns_from_information(
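
Because `get_columns_in_relation` now strips the `_hoodie_*` bookkeeping columns, macros that enumerate columns only see the model's own fields. A hedged sketch of how a project macro might rely on this (the model name `my_hudi_model` is hypothetical, not part of this change):

{%- set cols = adapter.get_columns_in_relation(ref('my_hudi_model')) -%}
-- Hudi metadata columns such as _hoodie_commit_time are already filtered out
select
  {% for col in cols %}{{ col.name }}{{ ", " if not loop.last }}{% endfor %}
from {{ ref('my_hudi_model') }}
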
1 change: 1 addition & 0 deletions dbt/adapters/spark/relation.py
@@ -26,6 +26,7 @@ class SparkRelation(BaseRelation):
include_policy: SparkIncludePolicy = SparkIncludePolicy()
quote_character: str = '`'
is_delta: Optional[bool] = None
is_hudi: Optional[bool] = None
information: str = None

def __post_init__(self):
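
The new `is_hudi` flag sits next to `is_delta` on `SparkRelation`, so format checks can branch on either. A minimal sketch of a user-space macro using these flags (the macro name and warning text are illustrative, not part of this change):

{% macro warn_if_not_acid(relation) %}
  {#- relations returned by dbt-spark now expose is_delta and is_hudi -#}
  {% if not relation.is_delta and not relation.is_hudi %}
    {% do exceptions.warn("Relation " ~ relation ~ " is neither Delta nor Hudi; merge and snapshot features will not work.") %}
  {% endif %}
{% endmacro %}
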
13 changes: 12 additions & 1 deletion dbt/include/spark/macros/adapters.sql
@@ -15,6 +15,17 @@

{% macro options_clause() -%}
{%- set options = config.get('options') -%}
{%- if config.get('file_format') == 'hudi' -%}
{%- set unique_key = config.get('unique_key') -%}
{%- if unique_key is not none and options is none -%}
{%- set options = {'primaryKey': config.get('unique_key')} -%}
{%- elif unique_key is not none and options is not none and 'primaryKey' not in options -%}
{%- set _ = options.update({'primaryKey': config.get('unique_key')}) -%}
{%- elif options is not none and 'primaryKey' in options and options['primaryKey'] != unique_key -%}
{{ exceptions.raise_compiler_error("unique_key and options('primaryKey') should be the same column(s).") }}
{%- endif %}
{%- endif %}

{%- if options is not none %}
options (
{%- for option in options -%}
@@ -181,7 +192,7 @@
{% endmacro %}

{% macro spark__alter_column_comment(relation, column_dict) %}
{% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
{% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi'] %}
{% for column_name in column_dict %}
{% set comment = column_dict[column_name]['description'] %}
{% set escaped_comment = comment | replace('\'', '\\\'') %}
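
With this change, `options_clause()` copies `unique_key` into the Hudi `primaryKey` table option, and column-level `persist_docs` is honored for Hudi as well as Delta. A hedged example of a model config exercising both paths (the model body and column names are illustrative):

{#- unique_key becomes the Hudi primaryKey option; an explicit
    options={'primaryKey': ...} naming a different column raises
    a compiler error -#}
{{ config(
    materialized='table',
    file_format='hudi',
    unique_key='id',
    persist_docs={'relation': true, 'columns': true}
) }}

select 1 as id, 'example record' as name
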
@@ -10,7 +10,7 @@
{%- set unique_key = config.get('unique_key', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

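
Switching to `should_full_refresh()` means the incremental materialization respects a model-level `full_refresh` config in addition to the `--full-refresh` CLI flag. A sketch of a model that opts out of full refreshes (the source reference and column names are hypothetical):

{{ config(
    materialized='incremental',
    file_format='delta',
    full_refresh=false
) }}

-- even `dbt run --full-refresh` will not drop and rebuild this model,
-- because should_full_refresh() honors the model-level override
select * from {{ source('app', 'events') }}

{% if is_incremental() %}
  where event_time > (select max(event_time) from {{ this }})
{% endif %}
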
@@ -1,7 +1,7 @@
{% macro dbt_spark_validate_get_file_format(raw_file_format) %}
{#-- Validate the file format #}

{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %}
{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}

{% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
@@ -26,7 +26,7 @@

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta'
You can only choose this strategy when file_format is set to 'delta' or 'hudi'
{%- endset %}

{% set invalid_insert_overwrite_delta_msg -%}
@@ -44,7 +44,7 @@
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format != 'delta' %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
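
The relaxed validation accepts the `merge` strategy for both Delta and Hudi. A minimal incremental model targeting Hudi, as permitted by the checks above (table and column names are illustrative):

{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='hudi',
    unique_key='id'
) }}

select id, payload, updated_at
from {{ source('app', 'raw_events') }}

{% if is_incremental() %}
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
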
109 changes: 42 additions & 67 deletions dbt/include/spark/macros/materializations/seed.sql
@@ -1,40 +1,8 @@
{% macro spark__load_csv_rows(model, agate_table) %}
{% set batch_size = 1000 %}
{% set column_override = model['config'].get('column_types', {}) %}

{% set statements = [] %}

{% for chunk in agate_table.rows | batch(batch_size) %}
{% set bindings = [] %}

{% for row in chunk %}
{% do bindings.extend(row) %}
{% endfor %}

{% set sql %}
insert into {{ this.render() }} values
{% for row in chunk -%}
({%- for col_name in agate_table.column_names -%}
{%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
{%- set type = column_override.get(col_name, inferred_type) -%}
cast(%s as {{type}})
{%- if not loop.last%},{%- endif %}
{%- endfor -%})
{%- if not loop.last%},{%- endif %}
{%- endfor %}
{% endset %}

{% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}

{% if loop.index0 == 0 %}
{% do statements.append(sql) %}
{% endif %}
{% endfor %}

{# Return SQL so we can render it out into the compiled files #}
{{ return(statements[0]) }}
{% macro spark__get_binding_char() %}
{{ return('?' if target.method == 'odbc' else '%s') }}
{% endmacro %}


{% macro spark__reset_csv_table(model, full_refresh, old_relation, agate_table) %}
{% if old_relation %}
{{ adapter.drop_relation(old_relation) }}
@@ -44,6 +12,45 @@
{% endmacro %}


{% macro spark__load_csv_rows(model, agate_table) %}

{% set batch_size = get_batch_size() %}
{% set column_override = model['config'].get('column_types', {}) %}

{% set statements = [] %}

{% for chunk in agate_table.rows | batch(batch_size) %}
{% set bindings = [] %}

{% for row in chunk %}
{% do bindings.extend(row) %}
{% endfor %}

{% set sql %}
insert into {{ this.render() }} values
{% for row in chunk -%}
({%- for col_name in agate_table.column_names -%}
{%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
{%- set type = column_override.get(col_name, inferred_type) -%}
cast({{ get_binding_char() }} as {{type}})
{%- if not loop.last%},{%- endif %}
{%- endfor -%})
{%- if not loop.last%},{%- endif %}
{%- endfor %}
{% endset %}

{% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}

{% if loop.index0 == 0 %}
{% do statements.append(sql) %}
{% endif %}
{% endfor %}

{# Return SQL so we can render it out into the compiled files #}
{{ return(statements[0]) }}
{% endmacro %}


{% macro spark__create_csv_table(model, agate_table) %}
{%- set column_override = model['config'].get('column_types', {}) -%}
{%- set quote_seed_column = model['config'].get('quote_columns', None) -%}
@@ -70,35 +77,3 @@

{{ return(sql) }}
{% endmacro %}


{% materialization seed, adapter='spark' %}

{%- set identifier = model['alias'] -%}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(database=database, schema=schema, identifier=identifier,
type='table') -%}
{%- set agate_table = load_agate_table() -%}
{%- do store_result('agate_table', response='OK', agate_table=agate_table) -%}

{{ run_hooks(pre_hooks) }}

-- build model
{% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %}
{% set status = 'CREATE' %}
{% set num_rows = (agate_table.rows | length) %}
{% set sql = load_csv_rows(model, agate_table) %}

{% call noop_statement('main', status ~ ' ' ~ num_rows) %}
{{ create_table_sql }};
-- dbt seed --
{{ sql }}
{% endcall %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
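
The refactor delegates the seed materialization to dbt-core and routes bind parameters through `spark__get_binding_char()`: `?` for the `odbc` connection method, `%s` otherwise. Purely as an illustration, the SQL rendered for a hypothetical two-row, two-column seed over ODBC would look roughly like:

insert into analytics.country_codes values
    (cast(? as string), cast(? as bigint)),
    (cast(? as string), cast(? as bigint))
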
8 changes: 4 additions & 4 deletions dbt/include/spark/macros/materializations/snapshot.sql
@@ -82,18 +82,18 @@
identifier=target_table,
type='table') -%}

{%- if file_format != 'delta' -%}
{%- if file_format not in ['delta', 'hudi'] -%}
{% set invalid_format_msg -%}
Invalid file format: {{ file_format }}
Snapshot functionality requires file_format be set to 'delta'
Snapshot functionality requires file_format be set to 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}

{%- if target_relation_exists -%}
{%- if not target_relation.is_delta -%}
{%- if not target_relation.is_delta and not target_relation.is_hudi -%}
{% set invalid_format_msg -%}
The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta'
The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}
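
Snapshots now accept `file_format: hudi` alongside `delta`. A hedged sketch of a snapshot block using Hudi (the source table, target schema, and timestamp column are illustrative):

{% snapshot orders_snapshot %}

{{ config(
    target_schema='snapshots',
    unique_key='id',
    strategy='timestamp',
    updated_at='updated_at',
    file_format='hudi'
) }}

select * from {{ source('app', 'orders') }}

{% endsnapshot %}
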
42 changes: 42 additions & 0 deletions dbt/include/spark/profile_template.yml
@@ -0,0 +1,42 @@
fixed:
type: spark
prompts:
host:
hint: yourorg.sparkhost.com
_choose_authentication_method:
odbc:
_fixed_method: odbc
driver:
hint: 'path/to/driver'
_choose_endpoint_or_cluster:
endpoint:
endpoint:
hint: 'endpoint ID'
cluster:
cluster:
hint: 'cluster ID'
token:
hint: 'abc123'
hide_input: true
http:
_fixed_method: http
token:
hint: 'abc123'
hide_input: true
connect_timeout:
default: 10
type: 'int'
connect_retries:
default: 0
type: 'int'
thrift:
_fixed_method: thrift
port:
default: 443
type: 'int'
schema:
hint: 'default schema that dbt will build objects in'
threads:
hint: '1 or more'
type: 'int'
default: 1