dbt snapshots to handle append tables #3878

Closed
Tracked by #10151
mpavillet opened this issue Sep 13, 2021 · 4 comments
Labels
enhancement (New feature or request) · snapshots (Issues related to dbt's snapshot functionality) · stale (Issues that have gone stale)

Comments

@mpavillet

Describe the feature

A dbt snapshot feature to handle SCD2 changes when a given id exists several times (data is appended to the source table, as opposed to overwritten). Currently, dbt snapshots assume that:

  • There is one row per unique id
  • In the source data, the original row gets overwritten when a record is modified (and not appended).
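
For reference, a standard timestamp-strategy snapshot looks roughly like the sketch below (source and column names are placeholders); it only behaves correctly when each id appears at most once per run:

{% snapshot customers_snapshot %}

{{ config(
    target_schema='snapshots',
    unique_key='id',
    strategy='timestamp',
    updated_at='updated_at'
) }}

-- placeholder source/table names
select * from {{ source('prod', 'customers') }}

{% endsnapshot %}

If the same id shows up more than once in the query results, the generated merge becomes nondeterministic, which is exactly the failure described in the additional context below.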

Describe alternatives you've considered

Our data lake and data warehouse are two distinct databases in Snowflake; dbt reads from the data lake and writes to the warehouse.
We get a FULL copy of the prod table every day, clustered by date, so we have a complete copy of the table for each day. The delta between day and day-1 is what we need for SCD2. The only way I've made it work so far is to have the snapshot pull from the latest version of the table in prod.
Problem: if we lose a day or two, we need to re-run for the missing days, which I've made work by passing a variable, e.g. dbt snapshot --vars 'ref_date: my date'.
Sadly, dbt snapshots don't allow iterating over an array of dates.
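
For context, that workaround wires the variable into the snapshot's query, something like the sketch below (load_date is a stand-in for whatever column marks each daily copy):

-- inside the snapshot block: only read the day being (re)processed
select * from {{ source('data_lake', 'prod_table') }}
where load_date = '{{ var("ref_date") }}'

and is then invoked once per missing day, e.g.:

dbt snapshot --vars '{ref_date: "<missing date>"}'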

Additional context

It's not database-specific, but Snowflake does throw a nondeterministic-merge error (governed by the ERROR_ON_NONDETERMINISTIC_MERGE parameter) if an id exists several times.
When that parameter is set to false, Snowflake doesn't throw an error, but the SCD2 changes are still not handled properly (see the dates below):

ID | Updated_at | Valid from | Valid to 
 1 | 2020-12-15 | 2020-12-15 | 2021-08-19
 1 | 2021-07-13 | 2021-07-13 | 2021-08-19
 1 | 2021-08-19 | 2021-08-19 | NULL
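
For reference, that behavior is controlled by a Snowflake session parameter; turning it off only silences the error and does not make the merge deterministic:

alter session set ERROR_ON_NONDETERMINISTIC_MERGE = false;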

Who will this benefit?

Anyone who wishes to have SCD2 snapshots derived from append-only tables, or to create an SCD2 table off of an incremental table where an id can exist several times.

Are you interested in contributing this feature?

Yes. I am happy to look into the SQL, figure out what needs to be done, and update the macro.

@mpavillet added the enhancement and triage labels Sep 13, 2021
@jtcohen6 added the snapshots label and removed the triage label Sep 20, 2021
@jtcohen6
Contributor

@mpavillet This is a really interesting issue. Thanks for the thoughtful & thorough write-up!

In some sense, you're actually in a better position than most folks using snapshots. Those folks need to use snapshots, because they don't have access to a full roster of historical values; you have them in spades. For me, this begs a question: which is the more important feature of snapshots?

  • The implementation of slowly changing dimension logic
  • The safeguards to ensure that the most essential extracts from raw data are never lost, since the history of a fully replaced source table cannot be reprocessed with idempotence

I think it's the latter. The logic underpinning the timestamp and check_cols strategies is cool and all, but really a lot of the snapshot-specific functionality exists to make them rock-solid reliable: every time a snapshot runs, it either does its thing, or historical data could be gone for good.

If you have guaranteed access to all historical values, with a full table copy from every single day, and you want to summarize those values into slowly changing dimensions—you can write the SQL for it, and wrap that SQL in an incremental model. That SQL is not trivial, but (I think) it's also less complicated than you'd expect.

Plus, that incremental model can include logic to dynamically grab only the newest days of data, dynamically determined based on a bookmark from the model's own last run—so there's no need to manually reprocess day by day. If at any time you need to change the logic, you can --full-refresh without fear of losing historical data. It may just cost some extra time and warehouse credits to scan over every date cluster in your data lake.

Here's some (untested) code I'm imagining for that model:

{{ config(
    materialized = 'incremental',
    unique_key = 'scd_id',
    merge_update_columns = ['valid_to']
) }}

with all_or_new as (

    select * from {{ source('my_date_clustered', 'source_table') }}

    {% if is_incremental() %}
    
        -- get the latest day of data from when this model was last run.
        -- always reprocess that day and the day before (just to be safe)
        -- plus anything newer, of course
    
        where date_day >= {{ dbt_utils.dateadd(
            datepart='day', interval=-1,
            from_date_or_timestamp='(select max(updated_at) from ' ~ this ~ ')'
        ) }}
    
    {% endif %}

),

hashed as (

    select *,
        {{ dbt_utils.surrogate_key(['id', 'updated_at']) }} as scd_id
        
    from all_or_new

),

windowed as (

    select *,
        min(date_day) over (partition by scd_id) as valid_from,
        
        -- valid_to should be null if this scd_id appears in the most recent day of data
        -- (i.e. the record is still valid)
        nullif(
            max(date_day) over (partition by scd_id),
            max(date_day) over ()
        ) as valid_to
        
    from windowed
    group by 1

)

select * from windowed
-- deduplicate: only take the latest day of data for each scd_id
qualify row_number() over (partition by scd_id order by date_day) = 1

There are a few significant things to call out here:

  • We manually hash an scd_id to use as the unique_key. That's what Snowflake will use to merge on, and update previous records with new values. If updated_at changes, the scd_id changes, and generates a new record in the resulting table. When valid_to changes from today to yesterday, because that value of updated_at no longer appears in today's data, then the merge will update the associated scd_id with the new (not-null) valid_to.
  • This is written using a timestamp-based strategy (updated_at), since that's what your example above includes. We could equally calculate the scd_id as a hash of specific column values instead (see the sketch after this list).
  • The merge_update_columns config (docs) is new in dbt v0.20, and it enables you to overwrite only a subset of columns for existing rows during incremental runs, instead of the entire row. This makes our job here a lot easier. When the latest version of a record keeps appearing day after day, we don't need to bend over backwards to keep grabbing its correct valid_from, since we only care about updating valid_to.
  • All of this logic should work equally well for an incremental run over one day of new data, over multiple days of new data, and for a full-refresh backfill of all days of data.
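
A check-columns variant of that hash could look like the following, where status and plan_type are placeholders for whichever columns you actually want to track:

hashed as (

    select *,
        -- hash the tracked columns instead of updated_at:
        -- any change in these columns yields a new scd_id
        {{ dbt_utils.surrogate_key(['id', 'status', 'plan_type']) }} as scd_id

    from all_or_new

),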

Curious to hear what you think! Does that logic feel unwieldy? Have I missed a few thorny details of your specific use case?

@github-actions
Contributor

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

@github-actions bot added the stale label May 26, 2022
@github-actions
Contributor

github-actions bot commented Jun 2, 2022

Although we are closing this issue as stale, it's not gone forever. Issues can be reopened if there is renewed community interest; add a comment to notify the maintainers.

@github-actions bot closed this as completed Jun 2, 2022
@jimmymaise

jimmymaise commented Jan 19, 2024

Can you revise your logic, @jtcohen6? I don't think it's correct.

We need to update the old row (set its valid_to) and insert the new one, and I don't see that logic here. You may be updating the old row, but what about inserting the new one? The way you create the surrogate key, {{ dbt_utils.surrogate_key(['id', 'updated_at']) }}, is also difficult to understand: every change-log record gets its own key, since each change is unique on id and updated_at. So why do we need to partition by this scd_id if there is only one record per key?

On the other hand, the windowed CTE selects from itself; it should be select * from hashed. And what is date_day in your query?
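
For reference, a corrected tail of that (still untested) sketch, assuming date_day is the date of each daily full copy from the data lake, would read from the previous CTE and order the dedupe window descending:

windowed as (

    select *,
        min(date_day) over (partition by scd_id) as valid_from,

        nullif(
            max(date_day) over (partition by scd_id),
            max(date_day) over ()
        ) as valid_to

    -- read from the previous CTE, not from itself; window functions need no group by
    from hashed

)

select * from windowed
-- keep one row per scd_id: the row from the latest day of data
qualify row_number() over (partition by scd_id order by date_day desc) = 1

With that, the incremental merge on scd_id inserts rows for new (id, updated_at) pairs and, via merge_update_columns, only updates valid_to on existing ones, which is how both the insert and the update happen in a single statement.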
