Skip to content

Commit

Permalink
Merge pull request #2530 from alepuccetti/bigquery-processed-bytes-log
Browse files Browse the repository at this point in the history
Add data processed info into dbt run logs for all statement types
  • Loading branch information
jtcohen6 authored Jun 23, 2020
2 parents f758614 + 1da4bef commit 76f9f23
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@
- Added support for renaming BigQuery relations ([#2520](https:/fishtown-analytics/dbt/issues/2520), [#2521](https:/fishtown-analytics/dbt/pull/2521))
- Added support for BigQuery authorized views ([#1718](https:/fishtown-analytics/dbt/issues/1718), [#2517](https:/fishtown-analytics/dbt/pull/2517))
- Added support for altering BigQuery column types ([#2546](https:/fishtown-analytics/dbt/issues/2546), [#2547](https:/fishtown-analytics/dbt/pull/2547))
- Include row counts and bytes processed in log output for all BigQuery statement types ([#2526](https:/fishtown-analytics/dbt/issues/2526))

### Fixes
- Fixed an error in create_adapter_plugins.py script when -dependency arg not passed ([#2507](https:/fishtown-analytics/dbt/issues/2507), [#2508](https:/fishtown-analytics/dbt/pull/2508))
- Remove misleading "Opening a new connection" log message in set_connection_name. ([#2511](https:/fishtown-analytics/dbt/issues/2511))
- Now all the BigQuery statement types return the number of bytes processed ([#2526](https:/fishtown-analytics/dbt/issues/2526)).

Contributors:
- [@raalsky](https:/Raalsky) ([#2417](https:/fishtown-analytics/dbt/pull/2417), [#2485](https:/fishtown-analytics/dbt/pull/2485))
- [@alf-mindshift](https:/alf-mindshift) ([#2431](https:/fishtown-analytics/dbt/pull/2431))
- [@scarrucciu](https:/scarrucciu) ([#2508](https:/fishtown-analytics/dbt/pull/2508))
- [@southpolemonkey](https:/southpolemonkey) ([#2511](https:/fishtown-analytics/dbt/issues/2511))
- [@azhard](https:/azhard) ([#2517](https:/fishtown-analytics/dbt/pull/2517), ([#2521](https:/fishtown-analytics/dbt/pull/2521)), [#2547](https:/fishtown-analytics/dbt/pull/2547))
- [@alepuccetti](https:/alepuccetti) ([#2526](https:/fishtown-analytics/dbt/issues/2526))


## dbt 0.17.1 (Release TBD)
Expand Down
15 changes: 13 additions & 2 deletions core/dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,23 @@ def __get__(self, obj, objtype):


def format_bytes(num_bytes):
for unit in ['Bytes', 'KB', 'MB', 'GB', 'TB']:
for unit in ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB']:
if abs(num_bytes) < 1024.0:
return f"{num_bytes:3.1f} {unit}"
num_bytes /= 1024.0

return "> 1024 TB"
num_bytes *= 1024.0
return f"{num_bytes:3.1f} {unit}"


def format_rows_number(rows_number):
for unit in ['', 'k', 'm', 'b', 't']:
if abs(rows_number) < 1000.0:
return f"{rows_number:3.1f}{unit}".strip()
rows_number /= 1000.0

rows_number *= 1000.0
return f"{rows_number:3.1f}{unit}".strip()


# a little concurrent.futures.Executor for single-threaded mode
Expand Down
13 changes: 9 additions & 4 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from google.api_core import retry, client_info
from google.oauth2 import service_account

from dbt.utils import format_bytes
from dbt.utils import format_bytes, format_rows_number
from dbt.clients import agate_helper, gcloud
from dbt.exceptions import (
FailedToConnectException, RuntimeException, DatabaseException
Expand Down Expand Up @@ -245,16 +245,21 @@ def execute(self, sql, auto_begin=False, fetch=None):
conn = self.get_thread_connection()
client = conn.handle
table = client.get_table(query_job.destination)
status = 'CREATE TABLE ({})'.format(table.num_rows)
processed = format_bytes(query_job.total_bytes_processed)
status = 'CREATE TABLE ({} rows, {} processed)'.format(
format_rows_number(table.num_rows),
format_bytes(query_job.total_bytes_processed),
)

elif query_job.statement_type == 'SCRIPT':
processed = format_bytes(query_job.total_bytes_processed)
status = f'SCRIPT ({processed} processed)'

elif query_job.statement_type in ['INSERT', 'DELETE', 'MERGE']:
status = '{} ({})'.format(
status = '{} ({} rows, {} processed)'.format(
query_job.statement_type,
query_job.num_dml_affected_rows
format_rows_number(query_job.num_dml_affected_rows),
format_bytes(query_job.total_bytes_processed),
)

else:
Expand Down
21 changes: 19 additions & 2 deletions test/unit/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def test__simple_cases(self):
actual = dbt.utils.deep_map(self.intify_all, expected)
self.assertEqual(actual, expected)


@staticmethod
def special_keypath(value, keypath):

Expand Down Expand Up @@ -150,7 +149,25 @@ def test__simple_cases(self):
self.assertEqual(dbt.utils.format_bytes(1024**2*1.5), '1.5 MB')
self.assertEqual(dbt.utils.format_bytes(1024**3*52.6), '52.6 GB')
self.assertEqual(dbt.utils.format_bytes(1024**4*128), '128.0 TB')
self.assertEqual(dbt.utils.format_bytes(1024**5+1), '> 1024 TB')
self.assertEqual(dbt.utils.format_bytes(1024**5), '1.0 PB')
self.assertEqual(dbt.utils.format_bytes(1024**5*31.4), '31.4 PB')
self.assertEqual(dbt.utils.format_bytes(1024**6), '1024.0 PB')
self.assertEqual(dbt.utils.format_bytes(1024**6*42), '43008.0 PB')


class TestRowsNumberFormatting(unittest.TestCase):

def test__simple_cases(self):
self.assertEqual(dbt.utils.format_rows_number(-1), '-1.0')
self.assertEqual(dbt.utils.format_rows_number(0), '0.0')
self.assertEqual(dbt.utils.format_rows_number(20), '20.0')
self.assertEqual(dbt.utils.format_rows_number(1030), '1.0k')
self.assertEqual(dbt.utils.format_rows_number(1000**2*1.5), '1.5m')
self.assertEqual(dbt.utils.format_rows_number(1000**3*52.6), '52.6b')
self.assertEqual(dbt.utils.format_rows_number(1000**3*128), '128.0b')
self.assertEqual(dbt.utils.format_rows_number(1000**4), '1.0t')
self.assertEqual(dbt.utils.format_rows_number(1000**4*31.4), '31.4t')
self.assertEqual(dbt.utils.format_rows_number(1000**5*31.4), '31400.0t') # noqa: E501


class TestMultiDict(unittest.TestCase):
Expand Down

0 comments on commit 76f9f23

Please sign in to comment.