Skip to content

Commit

Permalink
Initial structured logging changes (#5954)
Browse files Browse the repository at this point in the history
Co-authored-by: Peter Allen Webb <[email protected]>
Co-authored-by: Emily Rockman <[email protected]>
  • Loading branch information
3 people authored Oct 14, 2022
1 parent 0959979 commit 9b84b6e
Show file tree
Hide file tree
Showing 54 changed files with 6,283 additions and 2,447 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220817-154857.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# changie changelog entry: records the proto-based structured logging feature
# (issue 5610, PR 5643) for inclusion in the next release's changelog.
kind: Features
body: Proto logging messages
time: 2022-08-17T15:48:57.225267-04:00
custom:
Author: gshank
Issue: "5610"
PR: "5643"
13 changes: 0 additions & 13 deletions .github/workflows/structured-logging-schema-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ jobs:
with:
python-version: "3.8"

- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true

- name: Install python dependencies
run: |
pip install --user --upgrade pip
Expand All @@ -69,10 +63,3 @@ jobs:
# we actually care if these pass, because the normal test run doesn't usually include many json log outputs
- name: Run integration tests
run: tox -e integration -- -nauto

# apply our schema tests to every log event from the previous step
# skips any output that isn't valid json
- uses: actions-rs/cargo@v1
with:
command: run
args: --manifest-path test/interop/log_parsing/Cargo.toml
20 changes: 13 additions & 7 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
from time import sleep
import sys
import traceback

# multiprocessing.RLock is a function returning this type
from multiprocessing.synchronize import RLock
Expand Down Expand Up @@ -48,6 +49,7 @@
RollbackFailed,
)
from dbt import flags
from dbt.utils import cast_to_str

SleepTime = Union[int, float] # As taken by time.sleep.
AdapterHandle = Any # Adapter connection handle objects can be any class.
Expand Down Expand Up @@ -304,9 +306,9 @@ def cleanup_all(self) -> None:
with self.lock:
for connection in self.thread_connections.values():
if connection.state not in {"closed", "init"}:
fire_event(ConnectionLeftOpen(conn_name=connection.name))
fire_event(ConnectionLeftOpen(conn_name=cast_to_str(connection.name)))
else:
fire_event(ConnectionClosed(conn_name=connection.name))
fire_event(ConnectionClosed(conn_name=cast_to_str(connection.name)))
self.close(connection)

# garbage collect these connections
Expand All @@ -332,17 +334,21 @@ def _rollback_handle(cls, connection: Connection) -> None:
try:
connection.handle.rollback()
except Exception:
fire_event(RollbackFailed(conn_name=connection.name))
fire_event(
RollbackFailed(
conn_name=cast_to_str(connection.name), exc_info=traceback.format_exc()
)
)

@classmethod
def _close_handle(cls, connection: Connection) -> None:
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, "close"):
fire_event(ConnectionClosed2(conn_name=connection.name))
fire_event(ConnectionClosed2(conn_name=cast_to_str(connection.name)))
connection.handle.close()
else:
fire_event(ConnectionLeftOpen2(conn_name=connection.name))
fire_event(ConnectionLeftOpen2(conn_name=cast_to_str(connection.name)))

@classmethod
def _rollback(cls, connection: Connection) -> None:
Expand All @@ -353,7 +359,7 @@ def _rollback(cls, connection: Connection) -> None:
f'"{connection.name}", but it does not have one open!'
)

fire_event(Rollback(conn_name=connection.name))
fire_event(Rollback(conn_name=cast_to_str(connection.name)))
cls._rollback_handle(connection)

connection.transaction_open = False
Expand All @@ -365,7 +371,7 @@ def close(cls, connection: Connection) -> Connection:
return connection

if connection.transaction_open and connection.handle:
fire_event(Rollback(conn_name=connection.name))
fire_event(Rollback(conn_name=cast_to_str(connection.name)))
cls._rollback_handle(connection)
connection.transaction_open = False

Expand Down
10 changes: 5 additions & 5 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
CodeExecution,
CodeExecutionStatus,
)
from dbt.utils import filter_null_values, executor
from dbt.utils import filter_null_values, executor, cast_to_str

from dbt.adapters.base.connections import Connection, AdapterResponse
from dbt.adapters.base.meta import AdapterMeta, available
Expand All @@ -61,7 +61,7 @@
)
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.base import Credentials
from dbt.adapters.cache import RelationsCache, _make_key
from dbt.adapters.cache import RelationsCache, _make_ref_key_msg


SeedModel = Union[ParsedSeedNode, CompiledSeedNode]
Expand Down Expand Up @@ -343,7 +343,7 @@ def _schema_is_cached(self, database: Optional[str], schema: str) -> bool:
fire_event(
CacheMiss(
conn_name=self.nice_connection_name(),
database=database,
database=cast_to_str(database),
schema=schema,
)
)
Expand Down Expand Up @@ -726,9 +726,9 @@ def list_relations(self, database: Optional[str], schema: str) -> List[BaseRelat
relations = self.list_relations_without_caching(schema_relation)
fire_event(
ListRelations(
database=database,
database=cast_to_str(database),
schema=schema,
relations=[_make_key(x) for x in relations],
relations=[_make_ref_key_msg(x) for x in relations],
)
)

Expand Down
92 changes: 57 additions & 35 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from dbt.adapters.reference_keys import _make_key, _ReferenceKey
from dbt.adapters.reference_keys import (
_make_ref_key,
_make_ref_key_msg,
_make_msg_from_ref_key,
_ReferenceKey,
)
import dbt.exceptions
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, fire_event_if
from dbt.events.types import (
AddLink,
AddRelation,
Expand All @@ -21,8 +26,8 @@
UncachedRelation,
UpdateReference,
)
import dbt.flags as flags
from dbt.utils import lowercase
from dbt.helper_types import Lazy


def dot_separated(key: _ReferenceKey) -> str:
Expand Down Expand Up @@ -82,7 +87,7 @@ def key(self):
:return _ReferenceKey: A key for this relation.
"""
return _make_key(self)
return _make_ref_key(self)

def add_reference(self, referrer: "_CachedRelation"):
"""Add a reference from referrer to self, indicating that if this node
Expand Down Expand Up @@ -294,13 +299,18 @@ def add_link(self, referenced, dependent):
:param BaseRelation dependent: The dependent model.
:raises InternalError: If either entry does not exist.
"""
ref_key = _make_key(referenced)
dep_key = _make_key(dependent)
ref_key = _make_ref_key(referenced)
dep_key = _make_ref_key(dependent)
if (ref_key.database, ref_key.schema) not in self:
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
fire_event(UncachedRelation(dep_key=dep_key, ref_key=ref_key))
fire_event(
UncachedRelation(
dep_key=_make_msg_from_ref_key(dep_key),
ref_key=_make_msg_from_ref_key(ref_key),
)
)
return
if ref_key not in self.relations:
# Insert a dummy "external" relation.
Expand All @@ -310,7 +320,11 @@ def add_link(self, referenced, dependent):
# Insert a dummy "external" relation.
dependent = dependent.replace(type=referenced.External)
self.add(dependent)
fire_event(AddLink(dep_key=dep_key, ref_key=ref_key))
fire_event(
AddLink(
dep_key=_make_msg_from_ref_key(dep_key), ref_key=_make_msg_from_ref_key(ref_key)
)
)
with self.lock:
self._add_link(ref_key, dep_key)

Expand All @@ -321,12 +335,12 @@ def add(self, relation):
:param BaseRelation relation: The underlying relation.
"""
cached = _CachedRelation(relation)
fire_event(AddRelation(relation=_make_key(cached)))
fire_event(DumpBeforeAddGraph(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event(AddRelation(relation=_make_ref_key_msg(cached)))
fire_event_if(flags.LOG_CACHE_EVENTS, lambda: DumpBeforeAddGraph(dump=self.dump_graph()))

with self.lock:
self._setdefault(cached)
fire_event(DumpAfterAddGraph(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event_if(flags.LOG_CACHE_EVENTS, lambda: DumpAfterAddGraph(dump=self.dump_graph()))

def _remove_refs(self, keys):
"""Removes all references to all entries in keys. This does not
Expand All @@ -341,19 +355,6 @@ def _remove_refs(self, keys):
for cached in self.relations.values():
cached.release_references(keys)

def _drop_cascade_relation(self, dropped_key):
    """Drop the relation identified by ``dropped_key`` and cascade the drop
    to every cached relation that (transitively) depends on it.

    If the key is not present in the cache, a DropMissingRelation event is
    emitted and nothing else happens.

    :param _ReferenceKey dropped_key: Key of the cached relation to drop.
    """
    if dropped_key in self.relations:
        # Gather the full transitive closure of dependents, announce the
        # cascade, then purge every reference to those keys.
        consequences = self.relations[dropped_key].collect_consequences()
        fire_event(DropCascade(dropped=dropped_key, consequences=consequences))
        self._remove_refs(consequences)
    else:
        fire_event(DropMissingRelation(relation=dropped_key))

def drop(self, relation):
"""Drop the named relation and cascade it appropriately to all
dependent relations.
Expand All @@ -365,10 +366,19 @@ def drop(self, relation):
:param str schema: The schema of the relation to drop.
:param str identifier: The identifier of the relation to drop.
"""
dropped_key = _make_key(relation)
fire_event(DropRelation(dropped=dropped_key))
dropped_key = _make_ref_key(relation)
dropped_key_msg = _make_ref_key_msg(relation)
fire_event(DropRelation(dropped=dropped_key_msg))
with self.lock:
self._drop_cascade_relation(dropped_key)
if dropped_key not in self.relations:
fire_event(DropMissingRelation(relation=dropped_key_msg))
return
consequences = self.relations[dropped_key].collect_consequences()
# convert from a list of _ReferenceKeys to a list of ReferenceKeyMsgs
consequence_msgs = [_make_msg_from_ref_key(key) for key in consequences]

fire_event(DropCascade(dropped=dropped_key_msg, consequences=consequence_msgs))
self._remove_refs(consequences)

def _rename_relation(self, old_key, new_relation):
"""Rename a relation named old_key to new_key, updating references.
Expand All @@ -390,7 +400,11 @@ def _rename_relation(self, old_key, new_relation):
for cached in self.relations.values():
if cached.is_referenced_by(old_key):
fire_event(
UpdateReference(old_key=old_key, new_key=new_key, cached_key=cached.key())
UpdateReference(
old_key=_make_ref_key_msg(old_key),
new_key=_make_ref_key_msg(new_key),
cached_key=_make_ref_key_msg(cached.key()),
)
)
cached.rename_key(old_key, new_key)

Expand Down Expand Up @@ -436,7 +450,7 @@ def _check_rename_constraints(self, old_key, new_key):
)

if old_key not in self.relations:
fire_event(TemporaryRelation(key=old_key))
fire_event(TemporaryRelation(key=_make_msg_from_ref_key(old_key)))
return False
return True

Expand All @@ -452,19 +466,27 @@ def rename(self, old, new):
:param BaseRelation new: The new relation name information.
:raises InternalError: If the new key is already present.
"""
old_key = _make_key(old)
new_key = _make_key(new)
fire_event(RenameSchema(old_key=old_key, new_key=new_key))
old_key = _make_ref_key(old)
new_key = _make_ref_key(new)
fire_event(
RenameSchema(
old_key=_make_msg_from_ref_key(old_key), new_key=_make_msg_from_ref_key(new)
)
)

fire_event(DumpBeforeRenameSchema(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event_if(
flags.LOG_CACHE_EVENTS, lambda: DumpBeforeRenameSchema(dump=self.dump_graph())
)

with self.lock:
if self._check_rename_constraints(old_key, new_key):
self._rename_relation(old_key, _CachedRelation(new))
else:
self._setdefault(_CachedRelation(new))

fire_event(DumpAfterRenameSchema(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event_if(
flags.LOG_CACHE_EVENTS, lambda: DumpAfterRenameSchema(dump=self.dump_graph())
)

def get_relations(self, database: Optional[str], schema: Optional[str]) -> List[Any]:
"""Case-insensitively yield all relations matching the given schema.
Expand Down Expand Up @@ -512,6 +534,6 @@ def _remove_all(self, to_remove: List[_CachedRelation]):
"""
for relation in to_remove:
# it may have been cascaded out already
drop_key = _make_key(relation)
drop_key = _make_ref_key(relation)
if drop_key in self.relations:
self.drop(drop_key)
3 changes: 2 additions & 1 deletion core/dbt/adapters/factory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import threading
import traceback
from contextlib import contextmanager
from importlib import import_module
from pathlib import Path
Expand Down Expand Up @@ -63,7 +64,7 @@ def load_plugin(self, name: str) -> Type[Credentials]:
# otherwise, the error had to have come from some underlying
# library. Log the stack trace.

fire_event(PluginLoadError())
fire_event(PluginLoadError(exc_info=traceback.format_exc()))
raise
plugin: AdapterPlugin = mod.Plugin
plugin_type = plugin.adapter.type()
Expand Down
16 changes: 16 additions & 0 deletions core/dbt/adapters/reference_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from collections import namedtuple
from typing import Any, Optional
from dbt.events.proto_types import ReferenceKeyMsg


_ReferenceKey = namedtuple("_ReferenceKey", "database schema identifier")
Expand All @@ -14,11 +15,26 @@ def lowercase(value: Optional[str]) -> Optional[str]:
return value.lower()


def _make_key(relation: Any) -> _ReferenceKey:
    """Deprecated alias kept for backwards compatibility.

    New code should call :func:`_make_ref_key` directly.
    """
    return _make_ref_key(relation)


def _make_ref_key(relation: Any) -> _ReferenceKey:
    """Build a cache key for *relation*.

    Each component is lowercased so the cache never has to track quoting
    differences; database and schema may each be None.
    """
    database = lowercase(relation.database)
    schema = lowercase(relation.schema)
    identifier = lowercase(relation.identifier)
    return _ReferenceKey(database, schema, identifier)


def _make_ref_key_msg(relation: Any) -> ReferenceKeyMsg:
    """Build a lowercased cache key for *relation* and convert it to the
    proto ReferenceKeyMsg used by structured logging events.

    Return annotation added for consistency with _make_msg_from_ref_key.
    """
    return _make_msg_from_ref_key(_make_ref_key(relation))


def _make_msg_from_ref_key(ref_key: _ReferenceKey) -> ReferenceKeyMsg:
    """Translate an internal _ReferenceKey into the equivalent proto message."""
    # _ReferenceKey is a namedtuple ordered (database, schema, identifier).
    database, schema, identifier = ref_key
    return ReferenceKeyMsg(database=database, schema=schema, identifier=identifier)
Loading

0 comments on commit 9b84b6e

Please sign in to comment.