
[CT-137] [Bug] Snapshot job creates duplicate valid records for unique_key #4661

Open
1 task done
Tracked by #10151
rdeese opened this issue Feb 2, 2022 · 14 comments
Labels
bug Something isn't working snapshots Issues related to dbt's snapshot functionality

Comments

@rdeese

rdeese commented Feb 2, 2022

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

One of my running snapshots is amassing multiple valid records per unique_key. The competing valid records are produced regardless of whether a value in one of the check_cols has changed. Read on for details.

I have this snapshot:

{% snapshot billing_totals_snapshot %}

{{
  config(
    tags="billing",
    unique_key="org_id || '-' || billed_month",
    strategy='check',
    check_cols=[
      'source',
      'normalized_billing_type',
      'total_foo_charges',
      'total_bar_charges',
      'total_baz_charges',
      'total_other_charges',
      'total_charges'
    ]
  )
}}

SELECT * FROM {{ ref('billing_totals') }}

{% endsnapshot %}

The underlying table, billing_totals, is unique on the unique key. I wrote a query to check whether any unique keys have more than one valid record in the snapshot table:

	select 
		org_id,
		billed_month,
		count(1) as competing_valid_records
	from dbt_snapshots.billing_totals_snapshot
	where dbt_valid_to is null
	group by 1, 2
	having competing_valid_records > 1

Running it, I get many results (336,755 to be precise). I would expect zero.

My first guess was that the snapshot was recording real changes to the data, but not setting dbt_valid_to on the invalidated records. So I wrote a query to group competing valid records by the combined values of their check columns.

with non_unique_records as (
	select 
		org_id,
		billed_month,
		count(1) as competing_valid_records
	from dbt_snapshots.billing_totals_snapshot
	where dbt_valid_to is null
	group by 1, 2
	having competing_valid_records > 1
)
select 
	non_unique_records.org_id,
	non_unique_records.billed_month,
	non_unique_records.competing_valid_records,
	(snap."source" || '-' ||
	snap.normalized_billing_type || '-' ||
	snap.total_foo_charges || '-' ||
	snap.total_bar_charges || '-' ||
	snap.total_baz_charges || '-' ||
	snap.total_other_charges || '-' ||
	snap.total_charges) as snapshot_check_cols,
	count(1) as identical_valid_records
from non_unique_records
left join dbt_snapshots.billing_totals_snapshot as snap
	on non_unique_records.org_id = snap.org_id
	and non_unique_records.billed_month = snap.billed_month
where snap.dbt_valid_to is null
group by 1, 2, 3, 4

If every competing record represented an actual change, we would get identical_valid_records = 1 in all cases. Instead, I found that in most cases (330K out of 336K), identical_valid_records = competing_valid_records, i.e. all of the competing valid records are identical in the check columns.

Another thing that I find perplexing about this is that the number of competing valid records is not the same for all affected records. For a snapshot that has been run ~50 times (twice daily), the largest numbers of competing valid records are 21, 20, and 2, but 3-19 are also represented.

Expected Behavior

I expect that a snapshot table will have a single valid (i.e. dbt_valid_to is NULL) record for each unique_key.

Steps To Reproduce

Unfortunately I don't have steps to reproduce this, yet.

Relevant log output

No response

Environment

- OS: Docker, python:3.7.9-slim image
- Python: 3.7.9
- dbt: 0.21.0


What database are you using dbt with?

redshift

Additional Context

I have other snapshots running that are unaffected by this issue.

There is a previous issue that appears to be similar to this one, #2607 . I decided to make a new issue because that one is fairly stale (last substantive discussion was in 2020), has a confusing history (the ticket at first claimed the opposite problem, but was later renamed), and a lot of the discussion involves post-hooks which don't apply to my case.

Hopefully this new issue is an opportunity to have a more focused discussion of the problem -- but I'm happy to move conversation over to that ticket if the maintainers prefer.

@rdeese rdeese added bug Something isn't working triage labels Feb 2, 2022
@github-actions github-actions bot changed the title [Bug] Snapshot job creates duplicate valid records for unique_key [CT-137] [Bug] Snapshot job creates duplicate valid records for unique_key Feb 2, 2022
@jtcohen6 jtcohen6 added snapshots Issues related to dbt's snapshot functionality Team:Execution labels Feb 2, 2022
@codigo-ergo-sum

Do you have any automated tests to catch this? We've added some tests on some of our snapshots just to catch odd behavior or weirdness in our source data or human error. Something like this:

      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - account_id
            - dbt_valid_from
      - dbt_utils.mutually_exclusive_ranges:
          lower_bound_column: dbt_valid_from
          upper_bound_column: dbt_valid_to
          partition_by: account_id
          gaps: not_allowed

We also add an is_current flag on a layer of models that we have on top of our snapshots (we call them "filters") and the logic for is_current is this:

WITH SUBQUERY AS (
    SELECT
        * EXCEPT (dbt_valid_to),  -- assuming BigQuery; avoids duplicating the column recast below
        CAST(COALESCE(dbt_valid_to, {{ var('future_eternity_ts') }}) AS TIMESTAMP) AS dbt_valid_to
    FROM {{ ref('my_snapshot') }}
)
SELECT
    *,
    (
        CASE WHEN
            CURRENT_TIMESTAMP() BETWEEN dbt_valid_from AND dbt_valid_to
            THEN TRUE
            ELSE FALSE
        END
    ) AS is_current
    FROM SUBQUERY

Then we have this test on top of the filter to make sure that every entry has one and only one is_current row:

      - name: is_current
        tests:
          - not_null
          - count_value_by_id:
              id: vendor_id
              value: TRUE
              operand: "="
              count: 1

And then this is the macro for our custom test (there's probably something better now in dbt_expectations; we wrote this about 18 months ago):

/*
Custom schema test that checks a column to test for the count of a particular value.
Example usage:
count_value_by_id:
  id: id
  value: NULL
  operand: <
  count: 25
The test will pass if the count of NULL values is less than 25 for any given id, and will fail if the count of NULL values is greater than or equal to 25.
*/

{% macro test_count_value_by_id(model, column_name, id, value, operand, count) %}

with all_ids_values as (

    select
        {{ id }} as id,
        {% if value|lower == "null" -%}
        if({{ column_name }} is null,TRUE,FALSE) as field_value
        {%- else -%}
        if({{ column_name }} = {{ value }},TRUE,FALSE) as field_value
        {%- endif %}
        from {{ model }}

), distinct_ids as (

    select distinct
        id
        from all_ids_values

), ids_with_value as (

    select
        id,
        count(*) as count
        from all_ids_values
        where field_value = true
        group by id

), all_ids_with_counts as (

    select 
        a.id,
        coalesce(count,0) as count 
        from distinct_ids a
        left join ids_with_value b
            on a.id = b.id

)
select 
    *
    from all_ids_with_counts 
    where not count {{ operand }} {{ count }}

{% endmacro %}

@codigo-ergo-sum

I know the above won't solve your problem, but maybe it helps you sleep a little better knowing that you'll be warned immediately if the problem does happen again...

@rdeese
Author

rdeese commented Feb 9, 2022

Thanks for sharing your approach. We don't currently have anything like this in place, but I think we'll certainly add something using your tests as a jumping off point.

@rdeese
Author

rdeese commented Feb 9, 2022

The way I've patched this up in the meantime is to create a shim table that imputes what dbt_valid_to should be by:

  1. ranking rows according to dbt_valid_from
  2. joining consecutive ranks
  3. using the subsequent rank's dbt_valid_from as the synthetic_dbt_valid_to for each rank.

The SQL is below. I've validated the code on a properly functioning snapshot and it gives an identical result to the correct dbt behavior.

with
    ranked_entries as (
        select
            org_id,
            billed_month,
            dbt_valid_from,
            rank() over (
                partition by org_id, billed_month order by dbt_valid_from
            ) as valid_from_rank,
            dbt_valid_to
        from {{ ref('billing_totals_snapshot') }}
    ),
    synthetic_valid_to as (
        select
            ranked_entries.org_id,
            ranked_entries.billed_month,
            ranked_entries.dbt_valid_from,
            next_entries.dbt_valid_from as synthetic_dbt_valid_to
        from ranked_entries
        left join
            ranked_entries as next_entries
            on ranked_entries.org_id = next_entries.org_id
            and ranked_entries.billed_month = next_entries.billed_month
            and ranked_entries.valid_from_rank + 1 = next_entries.valid_from_rank
    )
select
    billing_totals_snapshot.dbt_valid_from,
    synthetic_valid_to.synthetic_dbt_valid_to,
    billing_totals_snapshot.org_id,
    billing_totals_snapshot.billed_month,
    ...
from {{ ref('billing_totals_snapshot') }}
inner join
    synthetic_valid_to
    on billing_totals_snapshot.org_id = synthentic_valid_to.org_id
    and billing_totals_snapshot.billed_month = synthentic_valid_to.billed_month
    and billing_totals_snapshot.dbt_valid_from = synthentic_valid_to.dbt_valid_from
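
A more compact way to get the same result (a sketch with the same logic, untested) is to replace the rank-and-self-join with a single window function:

select
    *,
    lead(dbt_valid_from) over (
        partition by org_id, billed_month
        order by dbt_valid_from
    ) as synthetic_dbt_valid_to
from {{ ref('billing_totals_snapshot') }}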

@ChenyuLInx
Contributor

Hey @rdeese, thanks for opening this. If this is a critical part of your data pipeline, we highly recommend trying to get the real event stream from some kind of change data capture system, since that's going to be the most accurate. But if that's not the case, a few areas you can look into:

  • are multiple snapshots running at the same time?
  • is the unique key in the table truly unique all the time? (a quick check is sketched below)

Under the hood, dbt compares your table between the current snapshot run and the previous snapshot run, so there may also be missing states between snapshots depending on how frequently you run them.
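
For the second point, a quick check against the table from the original post (a minimal sketch reusing its column names):

select
    org_id || '-' || billed_month as unique_key,
    count(*) as occurrences
from billing_totals
group by 1
having count(*) > 1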

@ChenyuLInx ChenyuLInx removed the triage label Feb 14, 2022
@alicefinidori

I am having the same issue: almost daily, some (not all) of the snapshots contain duplicated rows per unique key among the valid records (i.e. dbt_valid_to is NULL).

  • there is only one simultaneous snapshot run
  • the unique key in the table is truly unique and the valid record is a true duplicate (all fields in the record are exactly the same, including dbt_scd_id)
  • it is happening almost every day

We are also using the check strategy.
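
Since dbt_scd_id is supposed to be unique per row version, a minimal check for exact duplicates like these (a sketch; my_snapshot stands in for the affected snapshot table):

select
    dbt_scd_id,
    count(*) as copies
from my_snapshot
group by 1
having count(*) > 1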

@github-actions
Contributor

github-actions bot commented Oct 5, 2022

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

@github-actions github-actions bot added stale Issues that have gone stale and removed stale Issues that have gone stale labels Oct 5, 2022
@kjstultz

kjstultz commented Oct 17, 2022

  • OS: Windows 10 Pro 19043.2130
  • Python: 3.8
  • dbt: 1.0.1

I ran into this as well, and it was happening for me when two merge statements tried to run at the same time. The sinister part is that, for me, the duplicate-record bug didn't surface until at least two snapshots after the concurrent run. Steps to reproduce:

1. Table created with one value:

create table sandbox.public.table1(pk int,new_value int);
insert into sandbox.public.table1 values(1,1001);

2. Snapshotting code: DBT_TEST_SNAP.sql

{% snapshot DBT_TEST_SNAP %}

{{
    config(
      unique_key='PK',
      strategy='check',
      check_cols = 'all',
      invalidate_hard_deletes=True
    )
}}

select * from SANDBOX.PUBLIC.TABLE1

{% endsnapshot %}

3. Update value:
update sandbox.public.table1 set new_value = 1002 where pk = 1;

4. Run snapshot twice at the same time:
I did this in PowerShell, but it's dealer's choice. In the resulting table, two active records are created, near-duplicates:
After_async_snap.csv

5. Update Record again:
update sandbox.public.table1 set new_value = 1003 where pk = 1;
This closes both of the existing records and creates two exact duplicates.
After_Update.csv

6. Update Record again:
update sandbox.public.table1 set new_value = 1004 where pk = 1;

20:06:25  1 of 1 START snapshot ....DBT_TEST_SNAP........................... [RUN]
20:06:30  1 of 1 ERROR snapshotting ....DBT_TEST_SNAP....................... [ERROR in 5.77s]        
20:06:30
20:06:30  Finished running 1 snapshot in 13.47s.
20:06:30
20:06:30  Completed with 1 error and 0 warnings:
20:06:30
20:06:30  Database Error in snapshot DBT_TEST_SNAP (snapshots\DBT_TEST_SNAP.sql)
20:06:30    100090 (42P18): Duplicate row detected during DML action
20:06:30    Row Values: [1, 1003, "a98b516393854d620735c2e1b9e1a8dc", 1666037081091000000, 1666037081091000000, NULL]
20:06:30    compiled SQL at target\run\groups_dbt\snapshots\DBT_TEST_SNAP.sql
20:06:30
20:06:30  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

When they don't run at the same time, everything is fine, but ideally there'd be some parameterizable logic to put a lock on a table when a snapshot is already running.

@kjstultz

Digging into the Snowflake documentation a bit, it sounds like Snowflake should block these simultaneous merge statements by locking the table within a transaction.

But it also sounds like they've seen some wonky behavior when statements share the same connection. The session IDs of the two simultaneous merge statements were different, but it's unclear to me whether that makes a difference and whether these transactions could bump into each other.

One thought I had for testing the theory is to name the "begin" transaction block when the snowflake_dml_explicit_transaction macro is called from the Snowflake adapter.

If the transactions are named, I imagine it should eliminate any transaction melding that might be happening, if that's what's causing the issue. But at this point, it's just a theory.
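
For illustration, a sketch of such an override (assuming the snowflake_dml_explicit_transaction signature in dbt-snowflake at the time; txn_name is an invented identifier):

{% macro snowflake_dml_explicit_transaction(dml) %}
  {# Name the transaction after dbt's invocation_id so concurrent runs get distinct names #}
  {% set txn_name = 'dbt_txn_' ~ invocation_id | replace('-', '_') %}
  {% set dml_transaction -%}
    begin name {{ txn_name }};
    {{ dml }};
    commit;
  {%- endset %}
  {% do return(dml_transaction) %}
{% endmacro %}

Placing a macro with this name in the project's own macros/ directory should shadow the adapter's version, since root-project macros take precedence in dbt's macro resolution.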

@razmik

razmik commented Mar 10, 2023

Faced the same original issue. It seems dbt snapshotting doesn't read data types consistently, so the issue can be resolved by explicitly casting every column to its intended type (e.g., cast all numeric columns to numeric; snapshotting sometimes reads the same column interchangeably as float and numeric). Make sure to cast when you read from the source.
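
Applied to the snapshot from the original post, that suggestion looks something like this (a sketch; the numeric(18, 2) precision is only an example):

select
    org_id,
    billed_month,
    source,
    normalized_billing_type,
    -- pin the type explicitly so it cannot flip between float and numeric across runs
    cast(total_charges as numeric(18, 2)) as total_charges
from {{ ref('billing_totals') }}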

@aaronleming

  • dbt cloud 1.3
  • snowflake

We are encountering this issue as well, although I am only seeing duplicates where exactly two records per unique key have a null dbt_valid_to.

Having looked through the debug logs, I believe this is the result of two colliding jobs running snapshot commands for the same snapshot at (nearly) the same time. I'm now trying to figure out a way to avoid these collisions without having to manually adjust schedules or add --exclude flags to all of our prod jobs.

@gunnava

gunnava commented Nov 15, 2023

Another user running into this issue: duplicate records are created when the snapshot job has two colliding runs.
Please find the details and debug logs of both runs below.

Sample data, where unique_key='cv_transaction_sid', check_cols=['md_version_hash']
Duplicate_snapshot_records.csv


 

| CV_TRANSACTION_SID | MD_VERSION_HASH | DBT_SCD_ID | DBT_UPDATED_AT | DBT_VALID_FROM | DBT_VALID_TO | MD_ADF_LOG_ENTRY_TIMESTAMP | MD_LOG_ID | MD_STAGE_LOG_ID |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 368af339ace909b00d6a7fdeaed17017 | 8025b86b4de9237c32cdb807252b87f9 | 0621901c6d6ff749b883732d994d077c | 2023-10-19 07:11:19.905 | 2023-10-19 07:11:19.905 | 2023-10-25 03:35:38.917 | 2023-10-19 08:18:55.472 +0200 | ce23820b-8f20-4015-bab9-0da6d0d7c09a | dbdb1655-f987-4a53-a864-e9f1d38d6583\|\|\|2023/10/19/03/00 |
| 368af339ace909b00d6a7fdeaed17017 | e905fc8af2fd3c424d1da79cf048eb1e | cc948d7ae2e177a306e2670baeb3538f | 2023-10-25 03:35:38.917 | 2023-10-25 03:35:38.917 | 2023-10-27 03:43:34.504 | 2023-10-25 05:16:04.732 +0200 | 39ff636b-0aa4-4fa6-8f73-891081ed44d6 | f754cc5a-4c7a-4f8c-b78d-df2ce9483d73\|\|\|2023/10/25/03/00 |
| 368af339ace909b00d6a7fdeaed17017 | e905fc8af2fd3c424d1da79cf048eb1e | 6662200c24fa4ae4f358fff1a684d380 | 2023-10-25 07:22:33.821 | 2023-10-25 07:22:33.821 | 2023-10-27 03:43:34.504 | 2023-10-25 05:16:04.732 +0200 | 92d4f6ee-2fde-42a2-91ed-5e52446b13c6 | f754cc5a-4c7a-4f8c-b78d-df2ce9483d73\|\|\|2023/10/25/03/00 |
| 368af339ace909b00d6a7fdeaed17017 | 3c67fa1acf619610b8183a635abb1513 | dbbd26c4f526ca742c44ffde7bcf763c | 2023-10-27 03:43:34.504 | 2023-10-27 03:43:34.504 |  | 2023-10-27 05:27:56.232 +0200 | 875e371e-153d-42a6-98d6-ef6e1f103643 | 43ea984f-9614-46b7-a8e4-65d2e50e2a29\|\|\|2023/10/27/03/00 |
| 368af339ace909b00d6a7fdeaed17017 | 3c67fa1acf619610b8183a635abb1513 | dbbd26c4f526ca742c44ffde7bcf763c | 2023-10-27 03:43:34.504 | 2023-10-27 03:43:34.504 |  | 2023-10-27 05:27:56.232 +0200 | 875e371e-153d-42a6-98d6-ef6e1f103643 | 43ea984f-9614-46b7-a8e4-65d2e50e2a29\|\|\|2023/10/27/03/00 |

 

We can see that row 1 is fine; rows 2 and 3 have the same MD_VERSION_HASH but different DBT_SCD_ID, while rows 4 and 5 have the same MD_VERSION_HASH and the same DBT_SCD_ID.

How it happened:

  1. At 2023-10-25 03:35:38.917, first create or replace temporary table "PROD"."PBW"."PBW_CV_TRANSACTION_VER__dbt_tmp" statement with query_id = 01afe037-0302-ef55-0000-815d22cc0912 was triggered.
  2. At 2023-10-25 07:14:54.863, first merge statement with query_id = 01afe112-0302-ee58-0000-815d22cfcaa2 was triggered.
  3. At 2023-10-25 07:22:33.821, second create or replace temporary table "PROD"."PBW"."PBW_CV_TRANSACTION_VER__dbt_tmp" statement with query_id = 01afe11a-0302-ee58-0000-815d22d00422 was triggered, while first merge was still running.
  4. At 2023-10-25 07:30:38.033, first merge statement finished, inserting row 2.
  5. At 2023-10-25 11:15:41.083, the second merge finished, inserting row 3.
  6. At 2023-10-27 03:43:34.504, the third create or replace temporary table "PROD"."PBW"."PBW_CV_TRANSACTION_VER__dbt_tmp" statement produced the duplicated rows 4 and 5.

Attached are the debug logs for colliding runs of the same job on dbt cloud.
Deploy - Run #2035649
Deploy - Run #2037538
debug (50).log
debug (51).log

@elsander

elsander commented May 3, 2024

I also experienced this issue. Steps to replicate:

  1. Create a table with no duplicates on the unique key (which we'll call unique_id)
  2. Snapshot the table with check_cols='all'
  3. Manually add two rows to the table that share a new unique_id value
  4. Snapshot the table again.

What I have observed is that if a duplicate appears for a unique_id value that is already in the snapshot, the snapshot will error as expected. However, if a duplicate appears for a unique_id value which is NOT already in the snapshot, the snapshot will add the duplicate rows without error, and will error in all subsequent snapshots due to having non-unique values in the snapshot itself.
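
One way to catch the bad load before it poisons the snapshot is a uniqueness test on the source model, so a failing source blocks the snapshot run (a sketch in the same schema-yml style as the tests above; unique_id is the key column from the steps):

      - name: unique_id
        tests:
          - not_null
          - unique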

@krspinney

krspinney commented Sep 18, 2024

I've also encountered this behavior occurring intermittently, and I suspect that it's caused by some sort of network hiccup in our environment (AWS Airflow, EKS, Snowflake). We have a number of tables using the check all cols approach since the source doesn't have timestamps we can use, and for now this is the only way we can get CDC. Both our dev and prod environments are pointing to the same source, but occasionally one environment will produce duplicates while the other does not.

I confirmed that Airflow only triggered the snapshots once, then dug through the logs in Snowflake. What I eventually found was that the temp table for the merge was created only once, but the merge statement was executed twice for some reason, about 2 hours apart. As a workaround I'm thinking about creating a post-hook for these snapshots to de-dupe them.

(Edit - and I should mention, we're not using dbt retry on our snapshot runs)
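
For what it's worth, a sketch of that de-dupe post-hook, assuming Snowflake (insert overwrite replaces the table's contents in one statement, and select distinct only drops rows that are exact copies, dbt_scd_id included):

{{
  config(
    post_hook="insert overwrite into {{ this }} select distinct * from {{ this }}"
  )
}}

Note this rewrites the whole table on every run, so it is only reasonable for modestly sized snapshots.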
