Merge update+insert truncates a delta table #2320

Closed
t1g0rz opened this issue Mar 23, 2024 · 2 comments · Fixed by #2324
Labels
binding/python Issues for the Python package binding/rust Issues for the Rust crate bug Something isn't working

Comments

@t1g0rz
Contributor

t1g0rz commented Mar 23, 2024

Environment

Delta-rs version: 0.16.2

Binding: python

Environment:

  • OS: Ubuntu 22.04.1

Bug

What happened:
I attempted to limit the scan of the target table by adding a target-side condition to the merge predicate, and noticed that when I do so, the merge simply removes the target rows for which that condition evaluates to false.

What you expected to happen:
I expected matching rows to be updated and non-matching source rows to be inserted according to the predicate, with all other target rows left untouched.

How to reproduce it:
The code below reproduces the issue. I'm not entirely sure whether it is indeed a bug or whether I am simply doing something wrong:

import pandas as pd
import numpy as np
import pyarrow as pa
from deltalake import DeltaTable

dt = DeltaTable.create(
    "./test1",
    schema=pa.schema(
        [
            pa.field("ts", pa.timestamp("us"), nullable=False),
            pa.field("some_data", pa.float64(), nullable=True),
        ]
    ),
)


df = pd.DataFrame(
    {
        "ts": pd.date_range("2023-01-01", freq="1h", periods=5),
        "some_data": np.random.random(5),
    }
)
dt = DeltaTable("./test1")
dt.merge(
    df,
    predicate=f"t.ts::Timestamp >= '{df.ts.min()}'::Timestamp and s.ts = t.ts",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().when_not_matched_insert_all().execute()

"""
{'num_source_rows': 5,
 'num_target_rows_inserted': 5,
 'num_target_rows_updated': 0,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 5,
 'num_target_files_added': 1,
 'num_target_files_removed': 0,
 'execution_time_ms': 8,
 'scan_time_ms': 0,
 'rewrite_time_ms': 1}
"""

dt = DeltaTable('./test1')
dt.to_pandas().sort_values('ts', ignore_index=True)

"""
                   ts  some_data
0 2023-01-01 00:00:00   0.791746
1 2023-01-01 01:00:00   0.119636
2 2023-01-01 02:00:00   0.389704
3 2023-01-01 03:00:00   0.205776
4 2023-01-01 04:00:00   0.865292
"""

dt = DeltaTable("./test1")
df = pd.DataFrame(
    {
        "ts": pd.date_range("2023-01-01 1:00:00", freq="1h", periods=5),
        "some_data": np.random.random(5),
    }
)
dt.merge(
    df,
    predicate=f"t.ts::Timestamp >= '{df.ts.min()}'::Timestamp and s.ts = t.ts",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().when_not_matched_insert_all().execute()

"""
{'num_source_rows': 5,
 'num_target_rows_inserted': 1,
 'num_target_rows_updated': 4,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 5,
 'num_target_files_added': 2,
 'num_target_files_removed': 1,
 'execution_time_ms': 9,
 'scan_time_ms': 0,
 'rewrite_time_ms': 1}
"""

dt = DeltaTable('./test1')
dt.to_pandas().sort_values('ts', ignore_index=True)

"""
                   ts  some_data
0 2023-01-01 01:00:00   0.835136
1 2023-01-01 02:00:00   0.064837
2 2023-01-01 03:00:00   0.997990
3 2023-01-01 04:00:00   0.088445
4 2023-01-01 05:00:00   0.024112
"""

dt = DeltaTable("./test1")
df = pd.DataFrame(
    {
        "ts": pd.date_range("2023-01-01 3:00:00", freq="1h", periods=5),
        "some_data": np.random.random(5),
    }
)
dt.merge(
    df,
    predicate=f"t.ts::Timestamp >= '{df.ts.min()}'::Timestamp and s.ts = t.ts",
    source_alias="s",
    target_alias="t",
).when_matched_update_all().when_not_matched_insert_all().execute()
"""
{'num_source_rows': 5,
 'num_target_rows_inserted': 2,
 'num_target_rows_updated': 3,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 5,
 'num_target_files_added': 3,
 'num_target_files_removed': 2,
 'execution_time_ms': 14,
 'scan_time_ms': 0,
 'rewrite_time_ms': 1}
"""

dt = DeltaTable('./test1')
dt.to_pandas().sort_values('ts', ignore_index=True)
"""
                   ts  some_data
0 2023-01-01 03:00:00   0.515337
1 2023-01-01 04:00:00   0.692165
2 2023-01-01 05:00:00   0.432456
3 2023-01-01 06:00:00   0.498319
4 2023-01-01 07:00:00   0.778521
"""
t1g0rz added the bug label Mar 23, 2024
@ion-elgreco
Collaborator

@t1g0rz Thanks for the issue. This seems to have been introduced in the latest version; 0.16.1 was still working fine.

ion-elgreco added the binding/python and binding/rust labels Mar 23, 2024
@t1g0rz
Contributor Author

t1g0rz commented Mar 31, 2024

@ion-elgreco
This bug still persists in 0.16.3, but it has become more subtle. I suspect delta-rs starts having problems once the table directory contains several Parquet files.

Here is my updated code to reproduce this bug:

import numpy as np
import pandas as pd
from deltalake import DeltaTable
import pyarrow as pa
import polars as pl
import string

def create_mock_df(st_idx, end_idx, sets_of_data):
    diff = end_idx - st_idx
    res = []
    for i in range(sets_of_data):
        # Five random columns c1..c5, cast to strings to match the schema below
        mock_df = pd.DataFrame(np.random.random((diff, 5)), columns=[f"c{j}" for j in range(1, 6)], dtype=str)
        mock_df.insert(0, 'iii', range(st_idx, end_idx))
        mock_df.insert(1, 'name', np.random.choice(list(string.ascii_uppercase), size=diff))
        res.append(mock_df)
    
    return pd.concat(res, ignore_index=True).drop_duplicates(['iii', 'name'])

settings_to_merge = [
    (0, 1_400_000, 10),
    (1_040_000, 1_045_000, 10),
    (1_450_000, 1_500_000, 10),
    (1_139_800, 1_600_000, 10),
]

path = 'test'
storage_options=None

DeltaTable.create(path, 
                  storage_options=storage_options, 
                  schema=pa.schema(
        [
            pa.field('iii', type=pa.int64(), nullable=False),
            pa.field('name', type=pa.string(), nullable=False),
            pa.field('c1', type=pa.string()),
            pa.field('c2', type=pa.string()),
            pa.field('c3', type=pa.string()),
            pa.field('c4', type=pa.string()),
            pa.field('c5', type=pa.string()),
        ]
    )
)

for st_idx, end_idx, sets_of_data in settings_to_merge:
    mock_df = create_mock_df(st_idx, end_idx, sets_of_data)
    dt = DeltaTable(path, storage_options=storage_options)
    es = (
        dt.merge(mock_df, predicate=f't.iii > {mock_df.iii.min()} and s.iii = t.iii and s.name = t.name', source_alias='s', target_alias='t')
          .when_not_matched_insert_all()
          .when_matched_update_all()
          .execute()
    )
    print(es)
    print('init df shape:', len(mock_df))
    print('delta shape after merge:', pl.scan_delta(path, storage_options=storage_options).select('name').collect().shape)
    print('----')

Here is the output. Note that the last merge shrinks the table from 12,243,602 rows to 6,225,228 rows:

{'num_source_rows': 11810600, 'num_target_rows_inserted': 11810600, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 11810600, 'num_target_files_added': 10, 'num_target_files_removed': 0, 'execution_time_ms': 15032, 'scan_time_ms': 0, 'rewrite_time_ms': 15018}
init df shape: 11810600
delta shape after merge: (11_810_600, 1)
----
{'num_source_rows': 42210, 'num_target_rows_inserted': 28587, 'num_target_rows_updated': 13623, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 11779569, 'num_output_rows': 11821779, 'num_target_files_added': 14, 'num_target_files_removed': 10, 'execution_time_ms': 6020, 'scan_time_ms': 0, 'rewrite_time_ms': 6005}
init df shape: 42210
delta shape after merge: (11_821_779, 1)
----
{'num_source_rows': 421823, 'num_target_rows_inserted': 421823, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 421823, 'num_target_files_added': 1, 'num_target_files_removed': 0, 'execution_time_ms': 576, 'scan_time_ms': 0, 'rewrite_time_ms': 561}
init df shape: 421823
delta shape after merge: (12_243_602, 1)
----
{'num_source_rows': 3880878, 'num_target_rows_inserted': 3032571, 'num_target_rows_updated': 848307, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2344350, 'num_output_rows': 6225228, 'num_target_files_added': 10, 'num_target_files_removed': 15, 'execution_time_ms': 7357, 'scan_time_ms': 0, 'rewrite_time_ms': 7341}
init df shape: 3880878
delta shape after merge: (6_225_228, 1)
----
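A quick way to confirm the truncation from the printed metrics alone: with only update and insert clauses and `num_target_rows_deleted` equal to 0, every merge should grow the table by exactly `num_target_rows_inserted`. The sketch below is plain Python; `merges` and `replay` are hypothetical names, the numbers are copied from the output above, and the file count assumes the table started empty and that no other operation added or removed files. It flags the fourth merge and, less obviously, the second one as well, which comes up 17,408 rows short; the file counts before each merge work out to 0, 10, 14 and 15.

```python
# Subset of the metrics printed above, one dict per merge, in order.
# "rows_after" is the "delta shape after merge" value from the same run.
merges = [
    {"inserted": 11_810_600, "files_added": 10, "files_removed": 0, "rows_after": 11_810_600},
    {"inserted": 28_587, "files_added": 14, "files_removed": 10, "rows_after": 11_821_779},
    {"inserted": 421_823, "files_added": 1, "files_removed": 0, "rows_after": 12_243_602},
    {"inserted": 3_032_571, "files_added": 10, "files_removed": 15, "rows_after": 6_225_228},
]


def replay(merges):
    """Return (merge_no, files_before, expected_rows, observed_rows) for each merge."""
    report = []
    rows, files = 0, 0  # assumption: the table starts empty
    for no, m in enumerate(merges, start=1):
        # Only update/insert clauses and no deletes: updates rewrite existing rows,
        # inserts add new ones, so the row count should grow by exactly "inserted".
        expected = rows + m["inserted"]
        report.append((no, files, expected, m["rows_after"]))
        rows = m["rows_after"]  # continue from what the table actually holds
        files += m["files_added"] - m["files_removed"]
    return report


for no, files_before, expected, observed in replay(merges):
    status = "OK" if expected == observed else f"lost {expected - observed:,} rows"
    print(f"merge {no}: {files_before} files before, expected {expected:,}, got {observed:,} ({status})")
```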

UPD: I realized that I cannot reopen this issue, and I'm not sure anyone will notice my update, so I created a new one: #2362
