Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

converted to use new struct logging #55

Merged
merged 4 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ N/A
- Capping `google-api-core` to version `1.31.3` due to `protobuf` dependency conflict ([#53](https:/dbt-labs/dbt-bigquery/pull/53))
- Bump `google-cloud-core` and `google-api-core` upper bounds to `<3`, thereby removing `<1.31.3` limit on the latter. Remove explicit dependency on `six` ([#57](https:/dbt-labs/dbt-bigquery/pull/57))
- Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https:/dbt-labs/dbt-core/issues/4134), [#59](https:/dbt-labs/dbt-bigquery/pull/59))
- Add support for structured logging [#55](https:/dbt-labs/dbt-bigquery/pull/55)

### Contributors
- [@imartynetz](https:/imartynetz) ([#48](https:/dbt-labs/dbt-bigquery/pull/48))
Expand Down
14 changes: 9 additions & 5 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
FailedToConnectException, RuntimeException, DatabaseException, DbtProfileError
)
from dbt.adapters.base import BaseConnectionManager, Credentials
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
from dbt.events.types import SQLQuery
from dbt.version import __version__ as dbt_version

from dbt.dataclass_schema import StrEnum

logger = AdapterLogger("BigQuery")

BQ_QUERY_JOB_SPLIT = '-----Query Job SQL Follows-----'

Expand Down Expand Up @@ -344,7 +347,7 @@ def raw_execute(self, sql, fetch=False, *, use_legacy_sql=False):
conn = self.get_thread_connection()
client = conn.handle

logger.debug('On {}: {}', conn.name, sql)
fire_event(SQLQuery(conn_name=conn.name, sql=sql))

if self.profile.query_comment and self.profile.query_comment.job_label:
query_comment = self.query_header.comment.query_comment
Expand Down Expand Up @@ -534,7 +537,7 @@ def _retry_and_handle(self, msg, conn, fn):
"""retry a function call within the context of exception_handler."""
def reopen_conn_on_error(error):
if isinstance(error, REOPENABLE_ERRORS):
logger.warning('Reopening connection after {!r}', error)
logger.warning('Reopening connection after {!r}'.format(error))
self.close(conn)
self.open(conn)
return
Expand Down Expand Up @@ -577,8 +580,9 @@ def count_error(self, error):
self.error_count += 1
if _is_retryable(error) and self.error_count <= self.retries:
logger.debug(
'Retry attempt {} of {} after error: {}',
self.error_count, self.retries, repr(error))
'Retry attempt {} of {} after error: {}'.format(
self.error_count, self.retries, repr(error)
))
return True
else:
return False
Expand Down
4 changes: 3 additions & 1 deletion dbt/adapters/bigquery/gcloud.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events import AdapterLogger
import dbt.exceptions
from dbt.clients.system import run_cmd

Expand All @@ -9,6 +9,8 @@
https://cloud.google.com/sdk/
"""

logger = AdapterLogger("BigQuery")


def gcloud_installed():
try:
Expand Down
9 changes: 5 additions & 4 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
from dbt.contracts.graph.manifest import Manifest
from dbt.logger import GLOBAL_LOGGER as logger, print_timestamped_line
from dbt.events import AdapterLogger
from dbt.utils import filter_null_values

import google.auth
Expand All @@ -29,6 +29,8 @@
import agate
import json

logger = AdapterLogger("BigQuery")

# Write dispositions for bigquery.
WRITE_APPEND = google.cloud.bigquery.job.WriteDisposition.WRITE_APPEND
WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE
Expand Down Expand Up @@ -481,9 +483,8 @@ def _bq_table_to_relation(self, bq_table):
@classmethod
def warning_on_hooks(hook_type):
msg = "{} is not supported in bigquery and will be ignored"
print_timestamped_line(
msg.format(hook_type), ui.COLOR_FG_YELLOW
)
warn_msg = dbt.ui.color(msg, ui.COLOR_FG_YELLOW)
logger.info(warn_msg)

@available
def add_query(self, sql, auto_begin=True, bindings=None,
Expand Down
16 changes: 9 additions & 7 deletions tests/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
from dbt.clients.jinja import template_cache
from dbt.config import RuntimeConfig
from dbt.context import providers
from dbt.logger import GLOBAL_LOGGER as logger, log_manager
from dbt.logger import log_manager
from dbt.events.functions import (
capture_stdout_logs, fire_event, setup_event_logger, stop_capture_stdout_logs
)
from dbt.events import AdapterLogger
from dbt.contracts.graph.manifest import Manifest


logger = AdapterLogger("Bigquery")
INITIAL_ROOT = os.getcwd()


Expand Down Expand Up @@ -231,6 +236,7 @@ def setUp(self):
os.chdir(self.initial_dir)
# before we go anywhere, collect the initial path info
self._logs_dir = os.path.join(self.initial_dir, 'logs', self.prefix)
setup_event_logger(self._logs_dir)
_really_makedirs(self._logs_dir)
self.test_original_source_path = _pytest_get_test_root()
self.test_root_dir = self._generate_test_root_dir()
Expand Down Expand Up @@ -403,16 +409,12 @@ def run_dbt(self, args=None, expect_pass=True, profiles_dir=True):

def run_dbt_and_capture(self, *args, **kwargs):
try:
initial_stdout = log_manager.stdout
initial_stderr = log_manager.stderr
stringbuf = io.StringIO()
log_manager.set_output_stream(stringbuf)

stringbuf = capture_stdout_logs()
res = self.run_dbt(*args, **kwargs)
stdout = stringbuf.getvalue()

finally:
log_manager.set_output_stream(initial_stdout, initial_stderr)
stop_capture_stdout_logs()

return res, stdout

Expand Down
24 changes: 13 additions & 11 deletions tests/integration/query_comments_test/test_query_comments.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import io
import json
import os
import re

import dbt.exceptions
from dbt.version import __version__ as dbt_version
Expand Down Expand Up @@ -52,28 +53,29 @@ def tearDown(self):
super().tearDown()

def run_get_json(self, expect_pass=True):
self.run_dbt(
res, raw_logs = self.run_dbt_and_capture(
['--debug', '--log-format=json', 'run'],
expect_pass=expect_pass
)
logs = []
for line in self.stringbuf.getvalue().split('\n'):
parsed_logs = []
for line in raw_logs.split('\n'):
try:
log = json.loads(line)
except ValueError:
continue

if log['extra'].get('run_state') != 'running':
continue
logs.append(log)
self.assertGreater(len(logs), 0)
return logs
parsed_logs.append(log)

# empty lists evaluate as False
self.assertTrue(parsed_logs)
return parsed_logs

def query_comment(self, model_name, log):
log_msg = re.sub("(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", log['msg'])
prefix = 'On {}: '.format(model_name)

if log['message'].startswith(prefix):
msg = log['message'][len(prefix):]
if log_msg.startswith(prefix):
msg = log_msg[len(prefix):]
if msg in {'COMMIT', 'BEGIN', 'ROLLBACK'}:
return None
return msg
Expand All @@ -88,7 +90,7 @@ def run_assert_comments(self):
if msg is not None and self.matches_comment(msg):
seen = True

self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['message'] for l in logs)))
self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['msg'] for l in logs)))

@use_profile('bigquery')
def test_bigquery_comments(self):
Expand Down