import betterproto
from colorama import Style
from dbt.events.base_types import NoStdOut, BaseEvent, NoFile, Cache
from dbt.events.helpers import env_secrets, scrub_secrets
from dbt.events.types import (
EventBufferFull,
MainReportVersion,
EmptyLine,
)
import dbt.flags as flags
from dbt.constants import METADATA_ENV_PREFIX
from dbt.logger import make_log_dir_if_missing, GLOBAL_LOGGER
from datetime import datetime
import json
import io
from io import StringIO, TextIOWrapper
import logbook
import logging
from logging import Logger
import sys
from logging.handlers import RotatingFileHandler
import os
import uuid
import threading
from typing import Optional, Union, Callable, Dict
from collections import deque
LOG_VERSION = 3
EVENT_HISTORY: Optional[deque] = None
# create the global file logger with no configuration
FILE_LOG = logging.getLogger("default_file")
null_handler = logging.NullHandler()
FILE_LOG.addHandler(null_handler)
# set up logger to go to stdout with defaults
# setup_event_logger will be called once args have been parsed
STDOUT_LOG = logging.getLogger("default_stdout")
STDOUT_LOG.setLevel(logging.INFO)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.INFO)
STDOUT_LOG.addHandler(stdout_handler)
format_color = True
format_json = False
invocation_id: Optional[str] = None
metadata_vars: Optional[Dict[str, str]] = None
def setup_event_logger(log_path, level_override=None):
global format_json, format_color, STDOUT_LOG, FILE_LOG
make_log_dir_if_missing(log_path)
format_json = flags.LOG_FORMAT == "json"
    # USE_COLORS can be None if the app just started and the cli flags
    # haven't been applied yet
    format_color = bool(flags.USE_COLORS)
# TODO this default should live somewhere better
log_dest = os.path.join(log_path, "dbt.log")
level = level_override or (logging.DEBUG if flags.DEBUG else logging.INFO)
# overwrite the STDOUT_LOG logger with the configured one
STDOUT_LOG = logging.getLogger("configured_std_out")
STDOUT_LOG.setLevel(level)
FORMAT = "%(message)s"
stdout_passthrough_formatter = logging.Formatter(fmt=FORMAT)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(stdout_passthrough_formatter)
stdout_handler.setLevel(level)
# clear existing stdout TextIOWrapper stream handlers
STDOUT_LOG.handlers = [
h
for h in STDOUT_LOG.handlers
if not (hasattr(h, "stream") and isinstance(h.stream, TextIOWrapper)) # type: ignore
]
STDOUT_LOG.addHandler(stdout_handler)
# overwrite the FILE_LOG logger with the configured one
FILE_LOG = logging.getLogger("configured_file")
FILE_LOG.setLevel(logging.DEBUG) # always debug regardless of user input
file_passthrough_formatter = logging.Formatter(fmt=FORMAT)
file_handler = RotatingFileHandler(
filename=log_dest, encoding="utf8", maxBytes=10 * 1024 * 1024, backupCount=5 # 10 mb
)
file_handler.setFormatter(file_passthrough_formatter)
file_handler.setLevel(logging.DEBUG) # always debug regardless of user input
FILE_LOG.handlers.clear()
FILE_LOG.addHandler(file_handler)
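
# Illustrative usage (a sketch, not called from this module): once the CLI flags have
# been parsed, an entry point could configure both loggers in one call. The log path
# below is just an example value.
#
#   setup_event_logger(log_path="logs")                                  # INFO stdout, DEBUG file
#   setup_event_logger(log_path="logs", level_override=logging.DEBUG)    # force DEBUG on stdout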
# used for integration tests
def capture_stdout_logs() -> StringIO:
global STDOUT_LOG
capture_buf = io.StringIO()
stdout_capture_handler = logging.StreamHandler(capture_buf)
    # the capture handler should see everything, regardless of the stdout handler's level
    stdout_capture_handler.setLevel(logging.DEBUG)
STDOUT_LOG.addHandler(stdout_capture_handler)
return capture_buf
# used for integration tests
def stop_capture_stdout_logs() -> None:
global STDOUT_LOG
STDOUT_LOG.handlers = [
h
for h in STDOUT_LOG.handlers
if not (hasattr(h, "stream") and isinstance(h.stream, StringIO)) # type: ignore
]
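
# Illustrative integration-test sketch (the event and assertion are hypothetical):
# capture everything written to the stdout logger, then detach the capture handler.
#
#   buf = capture_stdout_logs()
#   fire_event(SomeEvent(...))               # hypothetical event type
#   assert "expected text" in buf.getvalue()
#   stop_capture_stdout_logs()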
# returns a json string representation of the event fields.
# the message may contain secrets which must be scrubbed at the usage site.
def event_to_json(event: BaseEvent) -> str:
event_dict = event_to_dict(event)
raw_log_line = json.dumps(event_dict, sort_keys=True)
return raw_log_line
def event_to_dict(event: BaseEvent) -> dict:
event_dict = dict()
try:
# We could use to_json here, but it wouldn't sort the keys.
# The 'to_json' method just does json.dumps on the dict anyway.
event_dict = event.to_dict(casing=betterproto.Casing.SNAKE, include_default_values=True) # type: ignore
except AttributeError as exc:
event_type = type(event).__name__
raise Exception(f"type {event_type} is not serializable. {str(exc)}")
return event_dict
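
# Illustrative sketch using an event this module already imports; the exact keys in
# the output depend on the EmptyLine proto definition.
#
#   event_to_dict(EmptyLine())   # -> dict with snake_case keys and default values included
#   event_to_json(EmptyLine())   # -> the same dict rendered as a sorted-key JSON string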
# translates an Event to a completely formatted text-based log line
# type hinting everything as strings so we don't get any unintentional string conversions via str()
def reset_color() -> str:
global format_color
return "" if not format_color else Style.RESET_ALL
def create_info_text_log_line(e: BaseEvent) -> str:
color_tag: str = reset_color()
ts: str = get_ts().strftime("%H:%M:%S") # TODO: get this from the event.ts?
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
log_line: str = f"{color_tag}{ts} {scrubbed_msg}"
return log_line
def create_debug_text_log_line(e: BaseEvent) -> str:
log_line: str = ""
# Create a separator if this is the beginning of an invocation
if type(e) == MainReportVersion:
separator = 30 * "="
log_line = f"\n\n{separator} {get_ts()} | {get_invocation_id()} {separator}\n"
color_tag: str = reset_color()
ts: str = get_ts().strftime("%H:%M:%S.%f")
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
# Make the levels all 5 characters so they line up
level: str = f"{e.level_tag():<5}"
thread = ""
if threading.current_thread().name:
thread_name = threading.current_thread().name
thread_name = thread_name[:10]
thread_name = thread_name.ljust(10, " ")
thread = f" [{thread_name}]:"
log_line = log_line + f"{color_tag}{ts} [{level}]{thread} {scrubbed_msg}"
return log_line
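
# Illustrative output shapes (the message is an example; timestamps and thread names vary):
#
#   create_info_text_log_line(e)   -> "21:32:46 Found 3 models"
#   create_debug_text_log_line(e)  -> "21:32:46.123456 [debug] [MainThread]: Found 3 models"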
# translates an Event to a completely formatted json log line
def create_json_log_line(e: BaseEvent) -> Optional[str]:
if type(e) == EmptyLine:
return None # will not be sent to logger
raw_log_line = event_to_json(e)
return scrub_secrets(raw_log_line, env_secrets())
# calls create_info_text_log_line(), create_debug_text_log_line(), or create_json_log_line()
# according to logger config
def create_log_line(e: BaseEvent, file_output=False) -> Optional[str]:
global format_json
if format_json:
return create_json_log_line(e) # json output, both console and file
elif file_output is True or flags.DEBUG:
return create_debug_text_log_line(e) # default file output
else:
return create_info_text_log_line(e) # console output
# allows for reuse of this obnoxious if else tree.
# do not use for exceptions, it doesn't pass along exc_info, stack_info, or extra
def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: str):
if not log_line:
return
if level_tag == "test":
        # TODO after implementing #3977 send to new test level
l.debug(log_line)
elif level_tag == "debug":
l.debug(log_line)
elif level_tag == "info":
l.info(log_line)
elif level_tag == "warn":
l.warning(log_line)
elif level_tag == "error":
l.error(log_line)
else:
raise AssertionError(
f"While attempting to log {log_line}, encountered the unhandled level: {level_tag}"
)
def warn_or_error(event, node=None):
if flags.WARN_ERROR:
from dbt.exceptions import raise_compiler_error
raise_compiler_error(scrub_secrets(event.info.msg, env_secrets()), node)
else:
fire_event(event)
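
# Illustrative sketch (the event name and node are hypothetical): a caller passes a
# warning-level event; with --warn-error the scrubbed message is raised as a compiler
# error instead of being logged.
#
#   warn_or_error(SomeDeprecationWarning(...), node=model_node)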
# an alternative to fire_event which only creates and logs the event value
# if the condition is met. Does nothing otherwise.
def fire_event_if(conditional: bool, lazy_e: Callable[[], BaseEvent]) -> None:
if conditional:
fire_event(lazy_e())
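
# Illustrative sketch: the callable is only evaluated when the condition holds, so an
# expensive-to-build event (hypothetical name) is never constructed otherwise.
#
#   fire_event_if(flags.DEBUG, lambda: ExpensiveDebugEvent(dump=build_large_dump()))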
# top-level method for accessing the new eventing system
# this is where all the side effects happen branched by event type
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: BaseEvent) -> None:
# skip logs when `--log-cache-events` is not passed
if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS:
return
add_to_event_history(e)
# backwards compatibility for plugins that require old logger (dbt-rpc)
if flags.ENABLE_LEGACY_LOGGER:
# using Event::message because the legacy logger didn't differentiate messages by
# destination
log_line = create_log_line(e)
if log_line:
send_to_logger(GLOBAL_LOGGER, e.level_tag(), log_line)
return # exit the function to avoid using the current logger as well
# always logs debug level regardless of user input
if not isinstance(e, NoFile):
log_line = create_log_line(e, file_output=True)
# doesn't send exceptions to exception logger
if log_line:
send_to_logger(FILE_LOG, level_tag=e.level_tag(), log_line=log_line)
if not isinstance(e, NoStdOut):
# explicitly checking the debug flag here so that potentially expensive-to-construct
# log messages are not constructed if debug messages are never shown.
if e.level_tag() == "debug" and not flags.DEBUG:
return # eat the message in case it was one of the expensive ones
if e.level_tag() != "error" and flags.QUIET:
return # eat all non-exception messages in quiet mode
log_line = create_log_line(e)
if log_line:
send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line)
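
# Illustrative call site: callers construct a typed event and hand it to fire_event,
# which then handles buffering, file logging, and stdout logging as above.
#
#   fire_event(EmptyLine())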
def get_metadata_vars() -> Dict[str, str]:
global metadata_vars
if metadata_vars is None:
metadata_vars = {
k[len(METADATA_ENV_PREFIX) :]: v
for k, v in os.environ.items()
if k.startswith(METADATA_ENV_PREFIX)
}
return metadata_vars
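
# Illustrative sketch, assuming METADATA_ENV_PREFIX is "DBT_ENV_CUSTOM_ENV_" (the actual
# value lives in dbt.constants): an environment containing
#
#   DBT_ENV_CUSTOM_ENV_run_id=abc123
#
# makes get_metadata_vars() return {"run_id": "abc123"}, cached after the first call.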
def reset_metadata_vars() -> None:
global metadata_vars
metadata_vars = None
def get_invocation_id() -> str:
global invocation_id
if invocation_id is None:
invocation_id = str(uuid.uuid4())
return invocation_id
def set_invocation_id() -> None:
# This is primarily for setting the invocation_id for separate
# commands in the dbt servers. It shouldn't be necessary for the CLI.
global invocation_id
invocation_id = str(uuid.uuid4())
# exactly one time stamp per concrete event
def get_ts() -> datetime:
ts = datetime.utcnow()
return ts
# preformatted time stamp
def get_ts_rfc3339() -> str:
ts = get_ts()
ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return ts_rfc3339
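
# Illustrative output: get_ts_rfc3339() yields a string like
# "2022-10-24T21:32:46.123456Z" (UTC, microsecond precision, trailing "Z").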
def add_to_event_history(event):
if flags.EVENT_BUFFER_SIZE == 0:
return
global EVENT_HISTORY
if EVENT_HISTORY is None:
reset_event_history()
EVENT_HISTORY.append(event)
    # We only fire the EventBufferFull event for buffers >= 10,000, when one slot remains
if flags.EVENT_BUFFER_SIZE >= 10000 and len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1):
fire_event(EventBufferFull())
def reset_event_history():
global EVENT_HISTORY
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE)
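
# Note on the buffer: a deque created with maxlen silently drops its oldest entries once
# full, so EVENT_HISTORY never grows past flags.EVENT_BUFFER_SIZE. For example,
# deque([1, 2, 3, 4, 5], maxlen=3) holds [3, 4, 5].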