From fd2b6998dbd00c5a6fb73ebf8e6215bc279aa339 Mon Sep 17 00:00:00 2001 From: leahwicz <60146280+leahwicz@users.noreply.github.com> Date: Fri, 3 Dec 2021 15:07:04 -0500 Subject: [PATCH] Merging `main` into `1.0.latest` (#264) * Refactor seed macros, clearer sql param logging (#250) * Try refactoring seed macros * Add changelog entry * 187: Adding apache hudi support to dbt (#210) * initial working version * Rebased and resolve all the merge conflicts. * Rebased and resolved merge conflicts. * Removed hudi dep jar and used the released version via packages option * Added insert overwrite unit tests for hudi * Used unique_key as default value for hudi primaryKey option * Updated changelog.md with this new update. * Final round of testing and few minor fixes * Fixed lint issues * Fixed the integration tests * Fixed the circle ci env to add hudi packages * Updated hudi spark bundle to use scala 2.11 * Fixed Hudi incremental strategy integration tests and other integration tests * Fixed the hudi hive sync hms integration test issues * Added sql HMS config to fix the integration tests. * Added hudi hive sync mode conf to CI * Set the hms schema verification to false * Removed the merge update columns hence its not supported. * Passed the correct hiveconf to the circle ci build script * Disabled few incremental tests for spark2 and reverted to spark2 config * Added hudi configs to the circle ci build script * Commented out the Hudi integration test until we have the hudi 0.10.0 version * Fixed the macro which checks the table type. * Disabled this model since hudi is not supported in databricks runtime, will be added later * Update profile_template.yml for v1 (#247) * Update profile_template.yml for v1 * PR feedback, fix indentation issues * It was my intention to remove the square brackets * Fixup changelog entry * Merge main, update changelog * Bump version to 1.0.0rc2 (#259) * bumpversion 1.0.0rc2 * Update changelog * Use pytest-dbt-adapter==0.6.0 * Corrected definition for set full_refresh_mode (#262) * Replaced definition for set full_refresh_mode * Updated changelog * Edit changelog Co-authored-by: Jeremy Cohen * `get_response` -> `AdapterResponse` (#265) * Return AdapterResponse from get_response * fix flake Co-authored-by: Jeremy Cohen Co-authored-by: Vinoth Govindarajan Co-authored-by: Sindre Grindheim --- .bumpversion.cfg | 2 +- .circleci/config.yml | 12 ++ CHANGELOG.md | 21 +++- dbt/adapters/spark/__version__.py | 2 +- dbt/adapters/spark/connections.py | 10 +- dbt/adapters/spark/impl.py | 13 +++ dbt/adapters/spark/relation.py | 1 + dbt/include/spark/macros/adapters.sql | 13 ++- .../incremental/incremental.sql | 2 +- .../materializations/incremental/validate.sql | 6 +- .../spark/macros/materializations/seed.sql | 109 +++++++----------- .../macros/materializations/snapshot.sql | 8 +- dbt/include/spark/profile_template.yml | 42 +++++++ dbt/include/spark/sample_profiles.yml | 31 ----- dev_requirements.txt | 3 +- docker-compose.yml | 1 + docker/hive-site.xml | 4 + docker/spark-defaults.conf | 7 ++ .../models_hudi/append.sql | 19 +++ .../insert_overwrite_no_partitions.sql | 19 +++ .../insert_overwrite_partitions.sql | 20 ++++ .../models_hudi/merge_no_key.sql | 19 +++ .../models_hudi/merge_unique_key.sql | 20 ++++ .../models_hudi/merge_update_columns.sql | 22 ++++ .../test_incremental_strategies.py | 24 ++++ .../persist_docs/models/schema.yml | 24 ++++ tests/unit/test_macros.py | 35 ++++++ 27 files changed, 372 insertions(+), 117 deletions(-) create mode 100644 
dbt/include/spark/profile_template.yml delete mode 100644 dbt/include/spark/sample_profiles.yml create mode 100644 docker/spark-defaults.conf create mode 100644 tests/integration/incremental_strategies/models_hudi/append.sql create mode 100644 tests/integration/incremental_strategies/models_hudi/insert_overwrite_no_partitions.sql create mode 100644 tests/integration/incremental_strategies/models_hudi/insert_overwrite_partitions.sql create mode 100644 tests/integration/incremental_strategies/models_hudi/merge_no_key.sql create mode 100644 tests/integration/incremental_strategies/models_hudi/merge_unique_key.sql create mode 100644 tests/integration/incremental_strategies/models_hudi/merge_update_columns.sql diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 26af54b05..b41ae92c5 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.0.0rc1 +current_version = 1.0.0rc2 parse = (?P\d+) \.(?P\d+) \.(?P\d+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 99154fb64..4921fac98 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -25,6 +25,18 @@ jobs: --conf spark.hadoop.javax.jdo.option.ConnectionUserName=dbt --conf spark.hadoop.javax.jdo.option.ConnectionPassword=dbt --conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver + --conf spark.serializer=org.apache.spark.serializer.KryoSerializer + --conf spark.jars.packages=org.apache.hudi:hudi-spark-bundle_2.11:0.9.0 + --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension + --conf spark.driver.userClassPathFirst=true + --conf spark.hadoop.datanucleus.autoCreateTables=true + --conf spark.hadoop.datanucleus.schema.autoCreateTables=true + --conf spark.hadoop.datanucleus.fixedDatastore=false + --conf spark.sql.hive.convertMetastoreParquet=false + --hiveconf hoodie.datasource.hive_sync.use_jdbc=false + --hiveconf hoodie.datasource.hive_sync.mode=hms + --hiveconf datanucleus.schema.autoCreateAll=true + --hiveconf hive.metastore.schema.verification=false - image: postgres:9.6.17-alpine environment: diff --git a/CHANGELOG.md b/CHANGELOG.md index ff6f49082..63d245797 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,13 +1,28 @@ ## dbt-spark 1.0.0 (Release TBD) -## dbt-spark 1.0.0rc1 (November 10, 2021) +### Fixes +- Incremental materialization corrected to respect `full_refresh` config, by using `should_full_refresh()` macro ([#260](https://github.com/dbt-labs/dbt-spark/issues/260), [#262](https://github.com/dbt-labs/dbt-spark/pull/262/)) + +### Contributors +- [@grindheim](https://github.com/grindheim) ([#262](https://github.com/dbt-labs/dbt-spark/pull/262/)) + +## dbt-spark 1.0.0rc2 (November 24, 2021) + +### Features +- Add support for Apache Hudi (hudi file format) which supports incremental merge strategies ([#187](https://github.com/dbt-labs/dbt-spark/issues/187), [#210](https://github.com/dbt-labs/dbt-spark/pull/210)) ### Under the hood +- Refactor seed macros: remove duplicated code from dbt-core, and provide clearer logging of SQL parameters that differ by connection method ([#249](https://github.com/dbt-labs/dbt-spark/issues/249), [#250](https://github.com/dbt-labs/dbt-snowflake/pull/250)) +- Replace `sample_profiles.yml` with `profile_template.yml`, for use with new `dbt init` ([#247](https://github.com/dbt-labs/dbt-spark/pull/247)) -- Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https://github.com/dbt-labs/dbt-core/issues/4134), 
[#253](https://github.com/dbt-labs/dbt-snowflake/pull/253)) +### Contributors +- [@vingov](https://github.com/vingov) ([#210](https://github.com/dbt-labs/dbt-spark/pull/210)) + +## dbt-spark 1.0.0rc1 (November 10, 2021) ### Under the hood -- Add support for structured logging [#251](https://github.com/dbt-labs/dbt-spark/pull/251) +- Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https://github.com/dbt-labs/dbt-core/issues/4134), [#253](https://github.com/dbt-labs/dbt-snowflake/pull/253)) +- Add support for structured logging ([#251](https://github.com/dbt-labs/dbt-spark/pull/251)) ## dbt-spark 0.21.1 (Release TBD) diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py index b25b76735..5f5b4bad1 100644 --- a/dbt/adapters/spark/__version__.py +++ b/dbt/adapters/spark/__version__.py @@ -1 +1 @@ -version = "1.0.0rc1" +version = "1.0.0rc2" diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 1848a72a8..2a055bf27 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -3,7 +3,7 @@ import dbt.exceptions from dbt.adapters.base import Credentials from dbt.adapters.sql import SQLConnectionManager -from dbt.contracts.connection import ConnectionState +from dbt.contracts.connection import ConnectionState, AdapterResponse from dbt.events import AdapterLogger from dbt.utils import DECIMALS from dbt.adapters.spark import __version__ @@ -304,8 +304,12 @@ def cancel(self, connection): connection.handle.cancel() @classmethod - def get_response(cls, cursor): - return 'OK' + def get_response(cls, cursor) -> AdapterResponse: + # https://github.com/dbt-labs/dbt-spark/issues/142 + message = 'OK' + return AdapterResponse( + _message=message + ) # No transactions on Spark.... def add_begin_query(self, *args, **kwargs): diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 0a8e4c49d..74845422b 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -70,6 +70,13 @@ class SparkAdapter(SQLAdapter): INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE) INFORMATION_STATISTICS_REGEX = re.compile( r"^Statistics: (.*)$", re.MULTILINE) + HUDI_METADATA_COLUMNS = [ + '_hoodie_commit_time', + '_hoodie_commit_seqno', + '_hoodie_record_key', + '_hoodie_partition_path', + '_hoodie_file_name' + ] Relation = SparkRelation Column = SparkColumn @@ -145,12 +152,14 @@ def list_relations_without_caching( rel_type = RelationType.View \ if 'Type: VIEW' in information else RelationType.Table is_delta = 'Provider: delta' in information + is_hudi = 'Provider: hudi' in information relation = self.Relation.create( schema=_schema, identifier=name, type=rel_type, information=information, is_delta=is_delta, + is_hudi=is_hudi, ) relations.append(relation) @@ -224,6 +233,10 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: # which would execute 'describe extended tablename' query rows: List[agate.Row] = super().get_columns_in_relation(relation) columns = self.parse_describe_extended(relation, rows) + + # strip hudi metadata columns. 
+ columns = [x for x in columns + if x.name not in self.HUDI_METADATA_COLUMNS] return columns def parse_columns_from_information( diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py index 5fc096550..043cabfa0 100644 --- a/dbt/adapters/spark/relation.py +++ b/dbt/adapters/spark/relation.py @@ -26,6 +26,7 @@ class SparkRelation(BaseRelation): include_policy: SparkIncludePolicy = SparkIncludePolicy() quote_character: str = '`' is_delta: Optional[bool] = None + is_hudi: Optional[bool] = None information: str = None def __post_init__(self): diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index ee59b8131..2542af811 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -15,6 +15,17 @@ {% macro options_clause() -%} {%- set options = config.get('options') -%} + {%- if config.get('file_format') == 'hudi' -%} + {%- set unique_key = config.get('unique_key') -%} + {%- if unique_key is not none and options is none -%} + {%- set options = {'primaryKey': config.get('unique_key')} -%} + {%- elif unique_key is not none and options is not none and 'primaryKey' not in options -%} + {%- set _ = options.update({'primaryKey': config.get('unique_key')}) -%} + {%- elif options is not none and 'primaryKey' in options and options['primaryKey'] != unique_key -%} + {{ exceptions.raise_compiler_error("unique_key and options('primaryKey') should be the same column(s).") }} + {%- endif %} + {%- endif %} + {%- if options is not none %} options ( {%- for option in options -%} @@ -181,7 +192,7 @@ {% endmacro %} {% macro spark__alter_column_comment(relation, column_dict) %} - {% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %} + {% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi'] %} {% for column_name in column_dict %} {% set comment = column_dict[column_name]['description'] %} {% set escaped_comment = comment | replace('\'', '\\\'') %} diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql index 72b4d2516..d0b6e89ba 100644 --- a/dbt/include/spark/macros/materializations/incremental/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql @@ -10,7 +10,7 @@ {%- set unique_key = config.get('unique_key', none) -%} {%- set partition_by = config.get('partition_by', none) -%} - {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} + {%- set full_refresh_mode = (should_full_refresh()) -%} {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} diff --git a/dbt/include/spark/macros/materializations/incremental/validate.sql b/dbt/include/spark/macros/materializations/incremental/validate.sql index 400a2eee5..3e9de359b 100644 --- a/dbt/include/spark/macros/materializations/incremental/validate.sql +++ b/dbt/include/spark/macros/materializations/incremental/validate.sql @@ -1,7 +1,7 @@ {% macro dbt_spark_validate_get_file_format(raw_file_format) %} {#-- Validate the file format #} - {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %} + {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %} {% set invalid_file_format_msg -%} Invalid file format provided: {{ raw_file_format }} @@ -26,7 +26,7 @@ {% set invalid_merge_msg -%} Invalid incremental 
strategy provided: {{ raw_strategy }} - You can only choose this strategy when file_format is set to 'delta' + You can only choose this strategy when file_format is set to 'delta' or 'hudi' {%- endset %} {% set invalid_insert_overwrite_delta_msg -%} @@ -44,7 +44,7 @@ {% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %} {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} {%-else %} - {% if raw_strategy == 'merge' and file_format != 'delta' %} + {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %} {% do exceptions.raise_compiler_error(invalid_merge_msg) %} {% endif %} {% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %} diff --git a/dbt/include/spark/macros/materializations/seed.sql b/dbt/include/spark/macros/materializations/seed.sql index 536e6447b..196479cb0 100644 --- a/dbt/include/spark/macros/materializations/seed.sql +++ b/dbt/include/spark/macros/materializations/seed.sql @@ -1,40 +1,8 @@ -{% macro spark__load_csv_rows(model, agate_table) %} - {% set batch_size = 1000 %} - {% set column_override = model['config'].get('column_types', {}) %} - - {% set statements = [] %} - - {% for chunk in agate_table.rows | batch(batch_size) %} - {% set bindings = [] %} - - {% for row in chunk %} - {% do bindings.extend(row) %} - {% endfor %} - - {% set sql %} - insert into {{ this.render() }} values - {% for row in chunk -%} - ({%- for col_name in agate_table.column_names -%} - {%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%} - {%- set type = column_override.get(col_name, inferred_type) -%} - cast(%s as {{type}}) - {%- if not loop.last%},{%- endif %} - {%- endfor -%}) - {%- if not loop.last%},{%- endif %} - {%- endfor %} - {% endset %} - - {% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %} - - {% if loop.index0 == 0 %} - {% do statements.append(sql) %} - {% endif %} - {% endfor %} - - {# Return SQL so we can render it out into the compiled files #} - {{ return(statements[0]) }} +{% macro spark__get_binding_char() %} + {{ return('?' 
if target.method == 'odbc' else '%s') }} {% endmacro %} + {% macro spark__reset_csv_table(model, full_refresh, old_relation, agate_table) %} {% if old_relation %} {{ adapter.drop_relation(old_relation) }} @@ -44,6 +12,45 @@ {% endmacro %} +{% macro spark__load_csv_rows(model, agate_table) %} + + {% set batch_size = get_batch_size() %} + {% set column_override = model['config'].get('column_types', {}) %} + + {% set statements = [] %} + + {% for chunk in agate_table.rows | batch(batch_size) %} + {% set bindings = [] %} + + {% for row in chunk %} + {% do bindings.extend(row) %} + {% endfor %} + + {% set sql %} + insert into {{ this.render() }} values + {% for row in chunk -%} + ({%- for col_name in agate_table.column_names -%} + {%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%} + {%- set type = column_override.get(col_name, inferred_type) -%} + cast({{ get_binding_char() }} as {{type}}) + {%- if not loop.last%},{%- endif %} + {%- endfor -%}) + {%- if not loop.last%},{%- endif %} + {%- endfor %} + {% endset %} + + {% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %} + + {% if loop.index0 == 0 %} + {% do statements.append(sql) %} + {% endif %} + {% endfor %} + + {# Return SQL so we can render it out into the compiled files #} + {{ return(statements[0]) }} +{% endmacro %} + + {% macro spark__create_csv_table(model, agate_table) %} {%- set column_override = model['config'].get('column_types', {}) -%} {%- set quote_seed_column = model['config'].get('quote_columns', None) -%} @@ -70,35 +77,3 @@ {{ return(sql) }} {% endmacro %} - - -{% materialization seed, adapter='spark' %} - - {%- set identifier = model['alias'] -%} - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set target_relation = api.Relation.create(database=database, schema=schema, identifier=identifier, - type='table') -%} - {%- set agate_table = load_agate_table() -%} - {%- do store_result('agate_table', response='OK', agate_table=agate_table) -%} - - {{ run_hooks(pre_hooks) }} - - -- build model - {% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %} - {% set status = 'CREATE' %} - {% set num_rows = (agate_table.rows | length) %} - {% set sql = load_csv_rows(model, agate_table) %} - - {% call noop_statement('main', status ~ ' ' ~ num_rows) %} - {{ create_table_sql }}; - -- dbt seed -- - {{ sql }} - {% endcall %} - - {% do persist_docs(target_relation, model) %} - - {{ run_hooks(post_hooks) }} - - {{ return({'relations': [target_relation]}) }} - -{% endmaterialization %} diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql index 6dad51a02..82d186ce2 100644 --- a/dbt/include/spark/macros/materializations/snapshot.sql +++ b/dbt/include/spark/macros/materializations/snapshot.sql @@ -82,18 +82,18 @@ identifier=target_table, type='table') -%} - {%- if file_format != 'delta' -%} + {%- if file_format not in ['delta', 'hudi'] -%} {% set invalid_format_msg -%} Invalid file format: {{ file_format }} - Snapshot functionality requires file_format be set to 'delta' + Snapshot functionality requires file_format be set to 'delta' or 'hudi' {%- endset %} {% do exceptions.raise_compiler_error(invalid_format_msg) %} {% endif %} {%- if target_relation_exists -%} - {%- if not target_relation.is_delta -%} + {%- if not target_relation.is_delta and not target_relation.is_hudi -%} {% set invalid_format_msg -%} - The existing table {{ 
model.schema }}.{{ target_table }} is in another format than 'delta' + The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'hudi' {%- endset %} {% do exceptions.raise_compiler_error(invalid_format_msg) %} {% endif %} diff --git a/dbt/include/spark/profile_template.yml b/dbt/include/spark/profile_template.yml new file mode 100644 index 000000000..192b9385f --- /dev/null +++ b/dbt/include/spark/profile_template.yml @@ -0,0 +1,42 @@ +fixed: + type: spark +prompts: + host: + hint: yourorg.sparkhost.com + _choose_authentication_method: + odbc: + _fixed_method: odbc + driver: + hint: 'path/to/driver' + _choose_endpoint_or_cluster: + endpoint: + endpoint: + hint: 'endpoint ID' + cluster: + cluster: + hint: 'cluster ID' + token: + hint: 'abc123' + hide_input: true + http: + _fixed_method: http + token: + hint: 'abc123' + hide_input: true + connect_timeout: + default: 10 + type: 'int' + connect_retries: + default: 0 + type: 'int' + thrift: + _fixed_method: thrift + port: + default: 443 + type: 'int' + schema: + hint: 'default schema that dbt will build objects in' + threads: + hint: '1 or more' + type: 'int' + default: 1 diff --git a/dbt/include/spark/sample_profiles.yml b/dbt/include/spark/sample_profiles.yml deleted file mode 100644 index b1cff2731..000000000 --- a/dbt/include/spark/sample_profiles.yml +++ /dev/null @@ -1,31 +0,0 @@ -default: - outputs: - - # Use this if connecting to a hosted spark (e.g. Databricks) - dev: - type: spark - method: odbc - driver: [path/to/driver] - schema: [schema_name] - host: [yourorg.sparkhost.com] - organization: [organization id] # Azure Databricks only - token: [abc123] - - # one of: - endpoint: [endpoint id] - cluster: [cluster id] - - # optional - port: [port] # default 443 - user: [user] - - # Use this if connecting to Dockerized spark - prod: - type: spark - method: thrift - schema: [dev_schema] - host: [host] - port: [port] - user: [prod_user] - - target: dev diff --git a/dev_requirements.txt b/dev_requirements.txt index a44d72a4e..9b371f9c6 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -15,7 +15,6 @@ flaky>=3.5.3,<4 pytest-csv # Test requirements -#pytest-dbt-adapter==0.5.1 -git+https://github.com/dbt-labs/dbt-adapter-tests.git#egg=pytest-dbt-adapter +pytest-dbt-adapter==0.6.0 sasl==0.2.1 thrift_sasl==0.4.1 diff --git a/docker-compose.yml b/docker-compose.yml index 869e4ecd2..8054dfd75 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,7 @@ services: volumes: - ./.spark-warehouse/:/spark-warehouse/ - ./docker/hive-site.xml:/usr/spark/conf/hive-site.xml + - ./docker/spark-defaults.conf:/usr/spark/conf/spark-defaults.conf environment: - WAIT_FOR=dbt-hive-metastore:5432 diff --git a/docker/hive-site.xml b/docker/hive-site.xml index a92e87b76..457d04f31 100644 --- a/docker/hive-site.xml +++ b/docker/hive-site.xml @@ -39,4 +39,8 @@ dbt + + hive.metastore.schema.verification + false + diff --git a/docker/spark-defaults.conf b/docker/spark-defaults.conf new file mode 100644 index 000000000..48a0501c2 --- /dev/null +++ b/docker/spark-defaults.conf @@ -0,0 +1,7 @@ +spark.hadoop.datanucleus.autoCreateTables true +spark.hadoop.datanucleus.schema.autoCreateTables true +spark.hadoop.datanucleus.fixedDatastore false +spark.serializer org.apache.spark.serializer.KryoSerializer +spark.jars.packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0 +spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension +spark.driver.userClassPathFirst true diff --git 
a/tests/integration/incremental_strategies/models_hudi/append.sql b/tests/integration/incremental_strategies/models_hudi/append.sql new file mode 100644 index 000000000..9be27bec3 --- /dev/null +++ b/tests/integration/incremental_strategies/models_hudi/append.sql @@ -0,0 +1,19 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'append', + file_format = 'hudi', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_hudi/insert_overwrite_no_partitions.sql b/tests/integration/incremental_strategies/models_hudi/insert_overwrite_no_partitions.sql new file mode 100644 index 000000000..081374089 --- /dev/null +++ b/tests/integration/incremental_strategies/models_hudi/insert_overwrite_no_partitions.sql @@ -0,0 +1,19 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'insert_overwrite', + file_format = 'hudi', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_hudi/insert_overwrite_partitions.sql b/tests/integration/incremental_strategies/models_hudi/insert_overwrite_partitions.sql new file mode 100644 index 000000000..0f74cfdb3 --- /dev/null +++ b/tests/integration/incremental_strategies/models_hudi/insert_overwrite_partitions.sql @@ -0,0 +1,20 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'insert_overwrite', + partition_by = 'id', + file_format = 'hudi', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_hudi/merge_no_key.sql b/tests/integration/incremental_strategies/models_hudi/merge_no_key.sql new file mode 100644 index 000000000..8def11ddf --- /dev/null +++ b/tests/integration/incremental_strategies/models_hudi/merge_no_key.sql @@ -0,0 +1,19 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', + file_format = 'hudi', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_hudi/merge_unique_key.sql b/tests/integration/incremental_strategies/models_hudi/merge_unique_key.sql new file mode 100644 index 000000000..ee72860d2 --- /dev/null +++ b/tests/integration/incremental_strategies/models_hudi/merge_unique_key.sql @@ -0,0 +1,20 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', + file_format = 'hudi', + unique_key = 'id', +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +select cast(2 as bigint) as id, 
'yo' as msg +union all +select cast(3 as bigint) as id, 'anyway' as msg + +{% endif %} diff --git a/tests/integration/incremental_strategies/models_hudi/merge_update_columns.sql b/tests/integration/incremental_strategies/models_hudi/merge_update_columns.sql new file mode 100644 index 000000000..99f0d0b73 --- /dev/null +++ b/tests/integration/incremental_strategies/models_hudi/merge_update_columns.sql @@ -0,0 +1,22 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', + file_format = 'hudi', + unique_key = 'id', + merge_update_columns = ['msg'], +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +-- msg will be updated, color will be ignored +select cast(2 as bigint) as id, 'yo' as msg, 'green' as color +union all +select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} diff --git a/tests/integration/incremental_strategies/test_incremental_strategies.py b/tests/integration/incremental_strategies/test_incremental_strategies.py index ef253fc5b..839f167e6 100644 --- a/tests/integration/incremental_strategies/test_incremental_strategies.py +++ b/tests/integration/incremental_strategies/test_incremental_strategies.py @@ -16,6 +16,10 @@ def project_config(self): }, } + def seed_and_run_once(self): + self.run_dbt(["seed"]) + self.run_dbt(["run"]) + def seed_and_run_twice(self): self.run_dbt(["seed"]) self.run_dbt(["run"]) @@ -77,6 +81,26 @@ def run_and_test(self): def test_delta_strategies_databricks_cluster(self): self.run_and_test() +# Uncomment this hudi integration test after the hudi 0.10.0 release to make it work. +# class TestHudiStrategies(TestIncrementalStrategies): +# @property +# def models(self): +# return "models_hudi" +# +# def run_and_test(self): +# self.seed_and_run_once() +# self.assertTablesEqual("append", "expected_append") +# self.assertTablesEqual("merge_no_key", "expected_append") +# self.assertTablesEqual("merge_unique_key", "expected_upsert") +# self.assertTablesEqual( +# "insert_overwrite_no_partitions", "expected_overwrite") +# self.assertTablesEqual( +# "insert_overwrite_partitions", "expected_upsert") +# +# @use_profile("apache_spark") +# def test_hudi_strategies_apache_spark(self): +# self.run_and_test() + class TestBadStrategies(TestIncrementalStrategies): @property diff --git a/tests/integration/persist_docs/models/schema.yml b/tests/integration/persist_docs/models/schema.yml index 2639037ba..6680f392e 100644 --- a/tests/integration/persist_docs/models/schema.yml +++ b/tests/integration/persist_docs/models/schema.yml @@ -49,6 +49,30 @@ models: description: | Some stuff here and then a call to {{ doc('my_fun_doc')}} + + - name: table_hudi_model + description: | + Table model description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + columns: + - name: id + description: | + id Column description "with double quotes" + and with 'single quotes' as welll as other; + '''abc123''' + reserved -- characters + -- + /* comment */ + Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting + - name: name + description: | + Some stuff here and then a call to + {{ doc('my_fun_doc')}} - name: view_model description: | diff --git a/tests/unit/test_macros.py b/tests/unit/test_macros.py index 151631e08..06ce202a7 100644 --- 
a/tests/unit/test_macros.py +++ b/tests/unit/test_macros.py @@ -43,6 +43,10 @@ def test_macros_create_table_as_file_format(self): sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip() self.assertEqual(sql, "create or replace table my_table using delta as select 1") + self.config['file_format'] = 'hudi' + sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip() + self.assertEqual(sql, "create table my_table using hudi as select 1") + def test_macros_create_table_as_options(self): template = self.__get_template('adapters.sql') @@ -51,6 +55,30 @@ def test_macros_create_table_as_options(self): sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip() self.assertEqual(sql, 'create or replace table my_table using delta options (compression "gzip" ) as select 1') + self.config['file_format'] = 'hudi' + sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip() + self.assertEqual(sql, 'create table my_table using hudi options (compression "gzip" ) as select 1') + + def test_macros_create_table_as_hudi_options(self): + template = self.__get_template('adapters.sql') + + self.config['file_format'] = 'hudi' + self.config['unique_key'] = 'id' + sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1 as id').strip() + self.assertEqual(sql, 'create table my_table using hudi options (primaryKey "id" ) as select 1 as id') + + self.config['file_format'] = 'hudi' + self.config['unique_key'] = 'id' + self.config['options'] = {'primaryKey': 'id'} + sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1 as id').strip() + self.assertEqual(sql, 'create table my_table using hudi options (primaryKey "id" ) as select 1 as id') + + self.config['file_format'] = 'hudi' + self.config['unique_key'] = 'uuid' + self.config['options'] = {'primaryKey': 'id'} + sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1 as id') + self.assertIn('mock.raise_compiler_error()', sql) + def test_macros_create_table_as_partition(self): template = self.__get_template('adapters.sql') @@ -113,3 +141,10 @@ def test_macros_create_table_as_all(self): sql, "create or replace table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1" ) + + self.config['file_format'] = 'hudi' + sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip() + self.assertEqual( + sql, + "create table my_table using hudi partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1" + )
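
A note on the `get_response` change above (this and the sketches that follow are illustrations only, not part of the diff): Spark cursors expose no row counts or status codes, so the connection manager now wraps its fixed 'OK' message in dbt's AdapterResponse contract instead of returning a bare string. A minimal sketch, assuming only the `_message` field that the patch actually sets:

    from dbt.contracts.connection import AdapterResponse

    def get_response_sketch(cursor=None) -> AdapterResponse:
        # Spark reports neither rows_affected nor a status code,
        # so only the message is populated (see dbt-spark issue #142).
        return AdapterResponse(_message='OK')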
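
The impl.py changes flag a relation as Hudi by scanning the `describe table extended` output for its provider line, and then hide Hudi's bookkeeping columns from `get_columns_in_relation`. A standalone sketch of those two steps (helper names here are illustrative; in the patch this logic lives inside SparkAdapter):

    HUDI_METADATA_COLUMNS = [
        '_hoodie_commit_time',
        '_hoodie_commit_seqno',
        '_hoodie_record_key',
        '_hoodie_partition_path',
        '_hoodie_file_name',
    ]

    def detect_provider(information: str) -> dict:
        # `information` is the raw text returned by `describe table extended`,
        # the same string list_relations_without_caching inspects in the patch.
        return {
            'is_delta': 'Provider: delta' in information,
            'is_hudi': 'Provider: hudi' in information,
        }

    def strip_hudi_metadata(column_names: list) -> list:
        # Drop Hudi's internal bookkeeping columns so catalog/docs output
        # only shows user-defined columns.
        return [name for name in column_names if name not in HUDI_METADATA_COLUMNS]

    # Example:
    # strip_hudi_metadata(['_hoodie_commit_time', 'id', 'msg']) == ['id', 'msg']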
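
The new `options_clause` branches in adapters.sql reconcile the model's `unique_key` config with an explicit `primaryKey` option when `file_format` is `hudi`. The same resolution rules, written as a plain-Python sketch (a hypothetical helper, not the macro itself):

    def resolve_hudi_options(unique_key=None, options=None):
        # Mirrors the Jinja branches taken when file_format == 'hudi'.
        if unique_key is not None and options is None:
            return {'primaryKey': unique_key}
        elif unique_key is not None and options is not None and 'primaryKey' not in options:
            return {**options, 'primaryKey': unique_key}
        elif options is not None and 'primaryKey' in options and options['primaryKey'] != unique_key:
            raise ValueError(
                "unique_key and options('primaryKey') should be the same column(s)."
            )
        return options

    # Mirrors the new unit tests:
    # resolve_hudi_options(unique_key='id')                                  -> {'primaryKey': 'id'}
    # resolve_hudi_options(unique_key='id', options={'primaryKey': 'id'})    -> {'primaryKey': 'id'}
    # resolve_hudi_options(unique_key='uuid', options={'primaryKey': 'id'})  -> raises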
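
The incremental materialization fix swaps `flags.FULL_REFRESH` for dbt-core's `should_full_refresh()` macro, so a model-level `full_refresh` config can override the command-line flag. Roughly, and only as a simplified Python rendering of that macro's behavior (not dbt-core's actual code):

    def should_full_refresh(cli_full_refresh_flag: bool, model_full_refresh_config=None) -> bool:
        # A boolean `full_refresh` set in the model config wins;
        # otherwise fall back to the --full-refresh CLI flag.
        if isinstance(model_full_refresh_config, bool):
            return model_full_refresh_config
        return cli_full_refresh_flag

    # e.g. `dbt run --full-refresh` with `full_refresh: false` on the model -> False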
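
With the validate.sql changes, the `merge` strategy is accepted for both the `delta` and `hudi` file formats, while `insert_overwrite` remains unsupported on `delta`. The combined checks, condensed into a Python sketch (error messages abbreviated relative to the macros):

    ACCEPTED_FILE_FORMATS = [
        'text', 'csv', 'json', 'jdbc', 'parquet',
        'orc', 'hive', 'delta', 'libsvm', 'hudi',
    ]

    def validate_incremental_config(file_format: str, strategy: str) -> None:
        if file_format not in ACCEPTED_FILE_FORMATS:
            raise ValueError(f"Invalid file format provided: {file_format}")
        if strategy not in ('append', 'merge', 'insert_overwrite'):
            raise ValueError(f"Invalid incremental strategy provided: {strategy}")
        if strategy == 'merge' and file_format not in ('delta', 'hudi'):
            raise ValueError("merge requires file_format set to 'delta' or 'hudi'")
        if strategy == 'insert_overwrite' and file_format == 'delta':
            raise ValueError("insert_overwrite cannot be combined with file_format 'delta'")

    # validate_incremental_config('hudi', 'merge')    -> passes
    # validate_incremental_config('parquet', 'merge') -> raises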
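
Finally, the refactored seed macros insert rows through parameterized SQL, and `spark__get_binding_char` picks the placeholder per connection method: `?` for the pyodbc path, `%s` for the PyHive (thrift/http) path. A rough sketch of how one batch of seed rows becomes SQL plus a flat bindings list (function and argument names are illustrative, not the macro API):

    def build_seed_insert(relation, column_names, column_types, rows, binding_char='%s'):
        # One placeholder per cell, cast to the inferred or overridden column type;
        # bindings are flattened row-major, matching adapter.add_query(sql, bindings=...).
        bindings = [value for row in rows for value in row]
        row_template = '(' + ', '.join(
            f'cast({binding_char} as {column_types[name]})' for name in column_names
        ) + ')'
        sql = f'insert into {relation} values\n' + ',\n'.join([row_template] * len(rows))
        return sql, bindings

    # sql, bindings = build_seed_insert(
    #     'my_schema.my_seed',
    #     ['id', 'msg'],
    #     {'id': 'bigint', 'msg': 'string'},
    #     [(1, 'hello'), (2, 'goodbye')],
    #     binding_char='?',   # odbc method
    # )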