diff --git a/CHANGELOG.md b/CHANGELOG.md
index c1abb68a5bf..b3ed1a5b1e2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## dbt 0.6.1 (unreleased)
+
+#### Changes
+
+- add `--debug` flag, replace calls to `print()` with a global logger ([#256](https://github.com/analyst-collective/dbt/pull/256))
+
 ## dbt release 0.6.0
 
 ### tl;dr
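Every file below follows one convention: import the shared logger once at module scope, keep user-facing output at INFO where print() used to be, and demote diagnostics to DEBUG so they only appear under --debug. A minimal sketch of the pattern, using a hypothetical module for illustration (not dbt code):

    # hypothetical module following the convention this PR applies
    from dbt.logger import GLOBAL_LOGGER as logger

    def report(count):
        logger.info("Compiled {} models".format(count))   # user-facing, replaces print()
        logger.debug("compile finished")                  # hidden unless --debug is passed
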
diff --git a/dbt/compilation.py b/dbt/compilation.py
index ce7d556801a..653cdbf55d9 100644
--- a/dbt/compilation.py
+++ b/dbt/compilation.py
@@ -1,17 +1,19 @@
-
 import os
 import fnmatch
 import jinja2
 from collections import defaultdict
+import time
+import sqlparse
+
 import dbt.project
-from dbt.source import Source
-from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error, to_string
-from dbt.linker import Linker
-from dbt.runtime import RuntimeContext
 import dbt.targets
 import dbt.templates
-import time
-import sqlparse
+
+from dbt.linker import Linker
+from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.runtime import RuntimeContext
+from dbt.source import Source
+from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error, to_string
 
 CompilableEntities = ["models", "data tests", "schema tests", "archives", "analyses"]
@@ -174,10 +176,10 @@ def wrapped_do_ref(*args):
         except RuntimeError as e:
             root = os.path.relpath(model.root_dir, model.project['project-root'])
             filepath = os.path.join(root, model.rel_filepath)
-            print("Compiler error in {}".format(filepath))
-            print("Enabled models:")
+            logger.info("Compiler error in {}".format(filepath))
+            logger.info("Enabled models:")
             for m in all_models:
-                print(" - {}".format(".".join(m.fqn)))
+                logger.info(" - {}".format(".".join(m.fqn)))
             raise e
 
         return wrapped_do_ref
diff --git a/dbt/deprecations.py b/dbt/deprecations.py
index f6d2adcb9eb..e6a26fc6341 100644
--- a/dbt/deprecations.py
+++ b/dbt/deprecations.py
@@ -1,4 +1,4 @@
-
+from dbt.logger import GLOBAL_LOGGER as logger
 
 class DBTDeprecation(object):
     name = None
@@ -7,12 +7,12 @@ class DBTDeprecation(object):
     def show(self, *args, **kwargs):
         if self.name not in active_deprecations:
             desc = self.description.format(**kwargs)
-            print("* Deprecation Warning: {}\n".format(desc))
+            logger.info("* Deprecation Warning: {}\n".format(desc))
             active_deprecations.add(self.name)
 
 class DBTRunTargetDeprecation(DBTDeprecation):
     name = 'run-target'
-    description = """profiles.yml configuration option 'run-target' is deprecated. Please use 'target' instead. 
+    description = """profiles.yml configuration option 'run-target' is deprecated. Please use 'target' instead.
 The 'run-target' option will be removed (in favor of 'target') in DBT version 0.7.0"""
 
 class DBTInvalidPackageName(DBTDeprecation):
diff --git a/dbt/logger.py b/dbt/logger.py
index 38137d6705a..9fabc93ef98 100644
--- a/dbt/logger.py
+++ b/dbt/logger.py
@@ -1,43 +1,24 @@
 import logging
-import logging.config
-import os
+import sys
 
-def make_log_dir_if_missing(log_dir):
-    if not os.path.exists(log_dir):
-        os.makedirs(log_dir)
+# disable logs from other modules, excepting ERROR logs
+logging.getLogger('contracts').setLevel(logging.ERROR)
+logging.getLogger('requests').setLevel(logging.ERROR)
+logging.getLogger('urllib3').setLevel(logging.ERROR)
 
-def getLogger(log_dir, name):
-    make_log_dir_if_missing(log_dir)
-    filename = "dbt.log"
-    base_log_path = os.path.join(log_dir, filename)
-    dictLogConfig = {
-        "version":1,
-        "handlers": {
-            "fileHandler":{
-                "class":"logging.handlers.TimedRotatingFileHandler",
-                "formatter":"fileFormatter",
-                "when": "d", # rotate daily
-                "interval": 1,
-                "backupCount": 7,
-                "filename": base_log_path
-            },
-        },
-        "loggers":{
-            "dbt":{
-                "handlers":["fileHandler"],
-                "level":"DEBUG",
-                "propagate": False
-            }
-        },
+# create a global console logger for dbt
+handler = logging.StreamHandler(sys.stdout)
+handler.setFormatter(logging.Formatter('%(message)s'))
 
-        "formatters":{
-            "fileFormatter":{
-                "format":"%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(message)s"
-            }
-        }
-    }
-    logging.config.dictConfig(dictLogConfig)
-    logger = logging.getLogger(name)
-    return logger
+logger = logging.getLogger()
+logger.addHandler(handler)
+logger.setLevel(logging.INFO)
+
+def initialize_logger(debug_mode=False):
+    if debug_mode:
+        handler.setFormatter(logging.Formatter('%(asctime)-18s: %(message)s'))
+        logger.setLevel(logging.DEBUG)
+
+GLOBAL_LOGGER = logger
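The new dbt/logger.py above is the whole mechanism: a root logger writing to stdout at INFO, which initialize_logger can switch to DEBUG with a timestamped format. A small sketch of the two modes (the output shown in comments is illustrative, not captured from a real run):

    from dbt.logger import initialize_logger, GLOBAL_LOGGER as logger

    logger.debug("connecting")    # suppressed: default level is INFO
    logger.info("Running!")       # -> Running!

    initialize_logger(debug_mode=True)
    logger.debug("connecting")    # -> e.g. "2016-12-05 09:30:02,114: connecting"
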
Using: {}".format(parsed.profile)) + + logger.info("Valid profiles:") + all_profiles = project.read_profiles(parsed.profiles_dir).keys() - profiles_string = "\n".join([" - " + key for key in all_profiles]) - print("Valid profiles:\n{}".format(profiles_string)) + for profile in all_profiles: + logger.info(" - {}".format(profile)) + dbt.tracking.track_invalid_invocation(project=proj, args=parsed, result_type="invalid_profile", result=str(e)) return None @@ -118,16 +123,15 @@ def invoke_dbt(parsed): if parsed.target in targets: proj.cfg['target'] = parsed.target else: - print("Encountered an error while reading the project:") - print(" ERROR Specified target {} is not a valid option for profile {}".format(parsed.target, proj.profile_to_load)) - print("Valid targets are: {}".format(targets)) + logger.info("Encountered an error while reading the project:") + logger.info(" ERROR Specified target {} is not a valid option for profile {}".format(parsed.target, proj.profile_to_load)) + logger.info("Valid targets are: {}".format(targets)) dbt.tracking.track_invalid_invocation(project=proj, args=parsed, result_type="invalid_target", result="target not found") return None log_dir = proj.get('log-path', 'logs') - logger = getLogger(log_dir, __name__) - logger.info("running dbt with arguments %s", parsed) + logger.debug("running dbt with arguments %s", parsed) task = parsed.cls(args=parsed, project=proj) @@ -136,6 +140,8 @@ def invoke_dbt(parsed): def parse_args(args): p = argparse.ArgumentParser(prog='dbt: data build tool', formatter_class=argparse.RawTextHelpFormatter) p.add_argument('--version', action='version', version=dbt.version.get_version_information(), help="Show version information") + p.add_argument('-d', '--debug', action='store_true', help='Display debug logging during dbt execution. Useful for debugging and making bug reports.') + subs = p.add_subparsers() base_subparser = argparse.ArgumentParser(add_help=False) @@ -184,7 +190,10 @@ def parse_args(args): sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while executing tests. 
diff --git a/dbt/runner.py b/dbt/runner.py
index 569ee77a2c4..ec6ffb2ffa2 100644
--- a/dbt/runner.py
+++ b/dbt/runner.py
@@ -10,6 +10,7 @@
 import yaml
 
 from datetime import datetime
+from dbt.logger import GLOBAL_LOGGER as logger
 from dbt.compilation import compile_string
 from dbt.linker import Linker
 from dbt.templates import BaseCreateTemplate
@@ -231,7 +232,7 @@ def post_run_all(self, models, results, context):
                 relation_type = 'table' if model.materialization == 'incremental' else 'view'
                 self.schema_helper.drop(schema_name, relation_type, model.name)
                 count_dropped += 1
-        print("Dropped {} dry-run models".format(count_dropped))
+        logger.info("Dropped {} dry-run models".format(count_dropped))
 
 class TestRunner(ModelRunner):
     run_type = 'test'
@@ -333,7 +334,6 @@ def execute(self, target, model):
 
 class RunManager(object):
     def __init__(self, project, target_path, graph_type, args):
-        self.logger = logging.getLogger(__name__)
         self.project = project
         self.target_path = target_path
         self.graph_type = graph_type
@@ -342,10 +342,10 @@ def __init__(self, project, target_path, graph_type, args):
         self.target = dbt.targets.get_target(self.project.run_environment(), self.args.threads)
 
         if self.target.should_open_tunnel():
-            print("Opening ssh tunnel to host {}... ".format(self.target.ssh_host), end="")
+            logger.info("Opening ssh tunnel to host {}... ".format(self.target.ssh_host))
             sys.stdout.flush()
             self.target.open_tunnel_if_needed()
-            print("Connected")
+            logger.info("Connected")
 
         self.schema = dbt.schema.Schema(self.project, self.target)
@@ -369,7 +369,7 @@ def deserialize_graph(self):
         return linker
 
     def execute_model(self, runner, model):
-        self.logger.info("executing model %s", model)
+        logger.debug("executing model %s", model)
 
         result = runner.execute(self.target, model)
         return result
@@ -385,12 +385,12 @@ def safe_execute_model(self, data):
         except (RuntimeError, psycopg2.ProgrammingError, psycopg2.InternalError) as e:
             error = "Error executing {filepath}\n{error}".format(filepath=model['build_path'], error=str(e).strip())
             status = "ERROR"
-            self.logger.exception(error)
+            logger.exception(error)
             if type(e) == psycopg2.InternalError and ABORTED_TRANSACTION_STRING == e.diag.message_primary:
                 return RunModelResult(model, error=ABORTED_TRANSACTION_STRING, status="SKIP")
         except Exception as e:
             error = "Unhandled error while executing {filepath}\n{error}".format(filepath=model['build_path'], error=str(e).strip())
-            self.logger.exception(error)
+            logger.exception(error)
             raise e
 
         execution_time = time.time() - start_time
@@ -428,27 +428,27 @@ def print_fancy_output_line(self, message, status, index, total, execution_time=None):
         if execution_time is None:
             status_time = ""
         else:
-            status_time = " in {execution_time:0.2f}s".format(execution_time=execution_time) 
+            status_time = " in {execution_time:0.2f}s".format(execution_time=execution_time)
 
         output = "{justified} [{status}{status_time}]".format(justified=justified, status=status, status_time=status_time)
-        print(output)
+        logger.info(output)
 
     def execute_models(self, runner, model_dependency_list, on_failure):
         flat_models = list(itertools.chain.from_iterable(model_dependency_list))
 
         num_models = len(flat_models)
         if num_models == 0:
-            print("WARNING: Nothing to do. Try checking your model configs and running `dbt compile`".format(self.target_path))
+            logger.info("WARNING: Nothing to do. Try checking your model configs and running `dbt compile`".format(self.target_path))
             return []
 
         num_threads = self.target.threads
-        print("Concurrency: {} threads (target='{}')".format(num_threads, self.project.get_target().get('name')))
-        print("Running!")
+        logger.info("Concurrency: {} threads (target='{}')".format(num_threads, self.project.get_target().get('name')))
+        logger.info("Running!")
 
         pool = ThreadPool(num_threads)
 
-        print()
-        print(runner.pre_run_all_msg(flat_models))
+        logger.info("")
+        logger.info(runner.pre_run_all_msg(flat_models))
         runner.pre_run_all(flat_models, self.context)
 
         fqn_to_id_map = {model.fqn: i + 1 for (i, model) in enumerate(flat_models)}
@@ -494,7 +494,7 @@ def on_complete(run_model_results):
 
                 if run_model_result.errored:
                     on_failure(run_model_result.model)
-                    print(run_model_result.error)
+                    logger.info(run_model_result.error)
 
         while model_index < num_models_this_batch:
             local_models = []
@@ -514,14 +514,14 @@ def on_complete(run_model_results):
         pool.close()
         pool.join()
 
-        print()
-        print(runner.post_run_all_msg(model_results))
+        logger.info("")
+        logger.info(runner.post_run_all_msg(model_results))
         runner.post_run_all(flat_models, model_results, self.context)
 
         return model_results
 
     def run_from_graph(self, runner, limit_to):
-        print("Loading dependency graph file")
+        logger.info("Loading dependency graph file")
         linker = self.deserialize_graph()
         compiled_models = [make_compiled_model(fqn, linker.get_node(fqn)) for fqn in linker.nodes()]
         relevant_compiled_models = [m for m in compiled_models if m.is_type(runner.run_type)]
@@ -535,12 +535,12 @@ def run_from_graph(self, runner, limit_to):
 
         schema_name = self.target.schema
 
-        print("Connecting to redshift")
+        logger.info("Connecting to redshift")
         try:
             self.schema.create_schema_if_not_exists(schema_name)
         except psycopg2.OperationalError as e:
-            print("ERROR: Could not connect to the target database. Try `dbt debug` for more information")
-            print(str(e))
+            logger.info("ERROR: Could not connect to the target database. Try `dbt debug` for more information")
+            logger.info(str(e))
             sys.exit(1)
 
         existing = self.schema.query_for_existing(schema_name);
@@ -563,10 +563,10 @@ def safe_run_from_graph(self, *args, **kwargs):
             raise
         finally:
             if self.target.should_open_tunnel():
-                print("Closing SSH tunnel... ", end="")
+                logger.info("Closing SSH tunnel... ")
                 sys.stdout.flush()
                 self.target.cleanup()
-                print("Done")
+                logger.info("Done")
 
     def run_tests_from_graph(self, test_schemas, test_data):
@@ -575,12 +575,12 @@ def run_tests_from_graph(self, test_schemas, test_data):
 
         schema_name = self.target.schema
 
-        print("Connecting to redshift")
+        logger.info("Connecting to redshift")
         try:
             self.schema.create_schema_if_not_exists(schema_name)
         except psycopg2.OperationalError as e:
-            print("ERROR: Could not connect to the target database. Try `dbt debug` for more information")
-            print(str(e))
+            logger.info("ERROR: Could not connect to the target database. Try `dbt debug` for more information")
+            logger.info(str(e))
             sys.exit(1)
 
         test_runner = TestRunner(self.project, self.schema)
@@ -626,5 +626,3 @@ def dry_run(self, limit_to=None):
 
     def run_archive(self):
         runner = ArchiveRunner(self.project, self.schema)
         return self.safe_run_from_graph(runner, None)
-
-
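One consequence of routing runner output through the logging module: execute_models fans work out to a ThreadPool, and stdlib logging handlers emit each record while holding an internal lock, so lines from concurrent workers can no longer interleave mid-line the way raw print() output could. A self-contained illustration of that property (not dbt code):

    import logging
    import sys
    from multiprocessing.pool import ThreadPool

    logger = logging.getLogger()
    logger.addHandler(logging.StreamHandler(sys.stdout))
    logger.setLevel(logging.INFO)

    # each record is emitted under the handler's lock,
    # so the eight lines print whole, never spliced together
    ThreadPool(4).map(lambda i: logger.info("model %s done", i), range(8))
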
diff --git a/dbt/schema.py b/dbt/schema.py
index 644f954dfeb..26dc0840447 100644
--- a/dbt/schema.py
+++ b/dbt/schema.py
@@ -1,4 +1,6 @@
+from dbt.logger import GLOBAL_LOGGER as logger
+
 import psycopg2
 import logging
 import time
@@ -74,7 +76,6 @@ class Schema(object):
     def __init__(self, project, target):
         self.project = project
         self.target = target
-        self.logger = logging.getLogger(__name__)
 
         self.schema_cache = {}
         self.runtime_existing = self.query_for_existing(self.target.schema)
@@ -124,34 +125,34 @@ def execute(self, sql):
         with self.target.get_handle() as handle:
             with handle.cursor() as cursor:
                 try:
-                    self.logger.debug("SQL: %s", sql)
+                    logger.debug("SQL: %s", sql)
                     pre = time.time()
                     cursor.execute(sql)
                     post = time.time()
-                    self.logger.debug("SQL status: %s in %0.2f seconds", cursor.statusmessage, post-pre)
+                    logger.debug("SQL status: %s in %0.2f seconds", cursor.statusmessage, post-pre)
                     return cursor.statusmessage
                 except Exception as e:
                     self.target.rollback()
-                    self.logger.exception("Error running SQL: %s", sql)
-                    self.logger.debug("rolling back connection")
+                    logger.exception("Error running SQL: %s", sql)
+                    logger.debug("rolling back connection")
                     raise e
 
     def execute_and_fetch(self, sql):
         with self.target.get_handle() as handle:
             with handle.cursor() as cursor:
                 try:
-                    self.logger.debug("SQL: %s", sql)
+                    logger.debug("SQL: %s", sql)
                     pre = time.time()
                     cursor.execute(sql)
                     post = time.time()
-                    self.logger.debug("SQL status: %s in %0.2f seconds", cursor.statusmessage, post-pre)
+                    logger.debug("SQL status: %s in %0.2f seconds", cursor.statusmessage, post-pre)
                     data = cursor.fetchall()
-                    self.logger.debug("SQL response: %s", data)
+                    logger.debug("SQL response: %s", data)
                     return data
                 except Exception as e:
                     self.target.rollback()
-                    self.logger.exception("Error running SQL: %s", sql)
-                    self.logger.debug("rolling back connection")
+                    logger.exception("Error running SQL: %s", sql)
+                    logger.debug("rolling back connection")
                     raise e
 
     def execute_and_handle_permissions(self, query, model_name):
@@ -173,31 +174,31 @@ def execute_without_auto_commit(self, sql, handle=None):
         cursor = handle.cursor()
 
         try:
-            self.logger.debug("SQL: %s", sql)
+            logger.debug("SQL: %s", sql)
             pre = time.time()
             cursor.execute(sql)
             post = time.time()
-            self.logger.debug("SQL status: %s in %0.2f seconds", cursor.statusmessage, post-pre)
+            logger.debug("SQL status: %s in %0.2f seconds", cursor.statusmessage, post-pre)
             return handle, cursor.statusmessage
         except Exception as e:
             self.target.rollback()
-            self.logger.exception("Error running SQL: %s", sql)
-            self.logger.debug("rolling back connection")
+            logger.exception("Error running SQL: %s", sql)
+            logger.debug("rolling back connection")
             raise e
         finally:
             cursor.close()
 
     def truncate(self, schema, relation):
         sql = 'truncate table "{schema}"."{relation}"'.format(schema=schema, relation=relation)
-        self.logger.info("dropping table %s.%s", schema, relation)
+        logger.debug("truncating table %s.%s", schema, relation)
         self.execute_and_handle_permissions(sql, relation)
-        self.logger.info("dropped %s.%s", schema, relation)
+        logger.debug("truncated %s.%s", schema, relation)
 
     def drop(self, schema, relation_type, relation):
         sql = 'drop {relation_type} if exists "{schema}"."{relation}" cascade'.format(schema=schema, relation_type=relation_type, relation=relation)
-        self.logger.info("dropping %s %s.%s", relation_type, schema, relation)
+        logger.debug("dropping %s %s.%s", relation_type, schema, relation)
         self.execute_and_handle_permissions(sql, relation)
-        self.logger.info("dropped %s %s.%s", relation_type, schema, relation)
+        logger.debug("dropped %s %s.%s", relation_type, schema, relation)
 
     def sql_columns_in_table(self, schema_name, table_name):
         sql = """
@@ -211,11 +212,11 @@ def sql_columns_in_table(self, schema_name, table_name):
         return sql
 
     def get_columns_in_table(self, schema_name, table_name, use_cached=True):
-        self.logger.debug("getting columns in table %s.%s", schema_name, table_name)
+        logger.debug("getting columns in table %s.%s", schema_name, table_name)
 
         columns = self.get_table_columns_if_cached(schema_name, table_name)
         if columns is not None and use_cached:
-            self.logger.debug("Found columns (in cache): %s", columns)
+            logger.debug("Found columns (in cache): %s", columns)
             return columns
 
         sql = self.sql_columns_in_table(schema_name, table_name)
@@ -229,14 +230,14 @@ def get_columns_in_table(self, schema_name, table_name, use_cached=True):
 
         self.cache_table_columns(schema_name, table_name, columns)
 
-        self.logger.debug("Found columns: %s", columns)
+        logger.debug("Found columns: %s", columns)
         return columns
 
     def rename(self, schema, from_name, to_name):
         rename_query = 'alter table "{schema}"."{from_name}" rename to "{to_name}"'.format(schema=schema, from_name=from_name, to_name=to_name)
-        self.logger.info("renaming model %s.%s --> %s.%s", schema, from_name, schema, to_name)
+        logger.debug("renaming model %s.%s --> %s.%s", schema, from_name, schema, to_name)
         self.execute_and_handle_permissions(rename_query, from_name)
-        self.logger.info("renamed model %s.%s --> %s.%s", schema, from_name, schema, to_name)
+        logger.debug("renamed model %s.%s --> %s.%s", schema, from_name, schema, to_name)
 
     def get_missing_columns(self, from_schema, from_table, to_schema, to_table):
         "Returns dict of {column:type} for columns in from_table that are missing from to_table"
@@ -253,7 +254,7 @@ def create_table(self, schema, table, columns, sort, dist):
         dist = self.target.dist_qualifier(dist)
         sort = self.target.sort_qualifier('compound', sort)
         sql = 'create table if not exists "{schema}"."{table}" (\n  {fields}\n) {dist} {sort};'.format(schema=schema, table=table, fields=fields_csv, sort=sort, dist=dist)
-        self.logger.info('creating table "%s"."%s"'.format(schema, table))
+        logger.debug('creating table "%s"."%s"', schema, table)
         self.execute_and_handle_permissions(sql, table)
 
     def create_schema_if_not_exists(self, schema_name):
@@ -297,7 +298,7 @@ def expand_column_types_if_needed(self, temp_table, to_schema, to_table):
             if dest_column is not None and dest_column.can_expand_to(source_column):
                 new_type = Column.string_type(source_column.string_size())
-                self.logger.debug("Changing col type from %s to %s in table %s.%s", dest_column.data_type, new_type, to_schema, to_table)
+                logger.debug("Changing col type from %s to %s in table %s.%s", dest_column.data_type, new_type, to_schema, to_table)
 
                 self.alter_column_type(to_schema, to_table, column_name, new_type)
 
         # update these cols in the cache! This is a hack to fix broken incremental models for type expansion. TODO
@@ -311,4 +312,3 @@ def table_exists(self, schema, table):
         tables = self.query_for_existing(schema)
         exists = tables.get(table) is not None
         return exists
-
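A detail that matters now that the SQL logging is DEBUG-level: logger.debug("SQL: %s", sql) defers interpolation until a handler actually emits the record, while .format() builds the string every time, enabled or not. Mixing the two styles is also what broke the old create_table message, which the hunk above cleans up. A quick contrast (the variable values are illustrative):

    import logging
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger()

    sql, schema, table = "select 1", "analytics", "users"

    logger.debug("SQL: %s", sql)             # lazy: interpolated only if DEBUG is enabled
    logger.debug("SQL: {}".format(sql))      # eager: string built even when DEBUG is off
    # the bug fixed above: str.format() ignores %-style placeholders, so this
    # logs the literal text 'creating table "%s"."%s"'
    logger.debug('creating table "%s"."%s"'.format(schema, table))
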
diff --git a/dbt/schema_tester.py b/dbt/schema_tester.py
index ba1b77f31d3..0c2bde4cc72 100644
--- a/dbt/schema_tester.py
+++ b/dbt/schema_tester.py
@@ -1,5 +1,6 @@
 import os
 
+from dbt.logger import GLOBAL_LOGGER as logger
 import dbt.targets
 
 import psycopg2
@@ -65,7 +66,6 @@ class SchemaTester(object):
     def __init__(self, project):
-        self.logger = logging.getLogger(__name__)
         self.project = project
 
         self.test_started_at = datetime.datetime.now()
@@ -80,23 +80,23 @@ def execute_query(self, model, sql):
         with target.get_handle() as handle:
             with handle.cursor() as cursor:
                 try:
-                    self.logger.debug("SQL: %s", sql)
+                    logger.debug("SQL: %s", sql)
                     pre = time.time()
                     cursor.execute(sql)
                     post = time.time()
-                    self.logger.debug("SQL status: %s in %d seconds", cursor.statusmessage, post-pre)
+                    logger.debug("SQL status: %s in %d seconds", cursor.statusmessage, post-pre)
                 except psycopg2.ProgrammingError as e:
-                    self.logger.exception('programming error: %s', sql)
+                    logger.exception('programming error: %s', sql)
                     return e.diag.message_primary
                 except Exception as e:
-                    self.logger.exception('encountered exception while running: %s', sql)
+                    logger.exception('encountered exception while running: %s', sql)
                     e.model = model
                     raise e
 
         result = cursor.fetchone()
         if len(result) != 1:
-            self.logger.error("SQL: %s", sql)
-            self.logger.error("RESULT: %s", result)
+            logger.error("SQL: %s", sql)
+            logger.error("RESULT: %s", result)
             raise RuntimeError("Unexpected validation result. Expected 1 record, got {}".format(len(result)))
         else:
             return result[0]
@@ -105,10 +105,8 @@ def validate_schema(self, schema_test):
             sql = schema_test.render()
             num_rows = self.execute_query(model, sql)
             if num_rows == 0:
-                print("  OK")
+                logger.info("  OK")
                 yield True
             else:
-                print("  FAILED ({})".format(num_rows))
+                logger.info("  FAILED ({})".format(num_rows))
                 yield False
-
-
diff --git a/dbt/seeder.py b/dbt/seeder.py
index 3ceb92b364f..52d81b66f39 100644
--- a/dbt/seeder.py
+++ b/dbt/seeder.py
@@ -5,8 +5,9 @@
 from sqlalchemy.dialects import postgresql as postgresql_dialect
 import psycopg2
 
-from dbt.source import Source
 import dbt.targets
+from dbt.source import Source
+from dbt.logger import GLOBAL_LOGGER as logger
 
 class Seeder:
     def __init__(self, project):
@@ -19,18 +20,18 @@ def find_csvs(self):
 
     def drop_table(self, cursor, schema, table):
         sql = 'drop table if exists "{schema}"."{table}" cascade'.format(schema=schema, table=table)
-        print("Dropping table {}.{}".format(schema, table))
+        logger.info("Dropping table {}.{}".format(schema, table))
         cursor.execute(sql)
 
     def truncate_table(self, cursor, schema, table):
         sql = 'truncate table "{schema}"."{table}"'.format(schema=schema, table=table)
-        print("Truncating table {}.{}".format(schema, table))
+        logger.info("Truncating table {}.{}".format(schema, table))
         cursor.execute(sql)
 
     def create_table(self, cursor, schema, table, virtual_table):
         sql_table = csv_sql.make_table(virtual_table, db_schema=schema)
         create_table_sql = csv_sql.make_create_table_statement(sql_table, dialect='postgresql')
-        print("Creating table {}.{}".format(schema, table))
+        logger.info("Creating table {}.{}".format(schema, table))
         cursor.execute(create_table_sql)
 
     def insert_into_table(self, cursor, schema, table, virtual_table):
@@ -51,7 +52,7 @@ def quote_or_null(s):
             record_csv_wrapped = "({})".format(record_csv)
             records.append(record_csv_wrapped)
         insert_sql = "{} {}".format(base_insert, ",\n".join(records))
-        print("Inserting {} records into table {}.{}".format(len(virtual_table.to_rows()), schema, table))
+        logger.info("Inserting {} records into table {}.{}".format(len(virtual_table.to_rows()), schema, table))
         cursor.execute(insert_sql)
 
@@ -60,7 +61,7 @@ def existing_tables(self, cursor, schema):
         cursor.execute(sql)
         existing = set([row[0] for row in cursor.fetchall()])
         return existing
-
+
     def do_seed(self, schema, cursor, drop_existing):
         existing_tables = self.existing_tables(cursor, schema)
 
@@ -83,10 +84,10 @@ def do_seed(self, schema, cursor, drop_existing):
             try:
                 self.insert_into_table(cursor, schema, table_name, virtual_table)
             except psycopg2.ProgrammingError as e:
-                print('Encountered an error while inserting into table "{}"."{}"'.format(schema, table_name))
-                print('Check for formatting errors in {}'.format(csv.filepath))
-                print('Try --drop-existing to delete and recreate the table instead')
-                print(str(e))
+                logger.info('Encountered an error while inserting into table "{}"."{}"'.format(schema, table_name))
+                logger.info('Check for formatting errors in {}'.format(csv.filepath))
+                logger.info('Try --drop-existing to delete and recreate the table instead')
+                logger.info(str(e))
 
 
     def seed(self, drop_existing=False):
{}".format(base_insert, ",\n".join(records)) - print("Inserting {} records into table {}.{}".format(len(virtual_table.to_rows()), schema, table)) + logger.info("Inserting {} records into table {}.{}".format(len(virtual_table.to_rows()), schema, table)) cursor.execute(insert_sql) def existing_tables(self, cursor, schema): @@ -60,7 +61,7 @@ def existing_tables(self, cursor, schema): cursor.execute(sql) existing = set([row[0] for row in cursor.fetchall()]) return existing - + def do_seed(self, schema, cursor, drop_existing): existing_tables = self.existing_tables(cursor, schema) @@ -83,10 +84,10 @@ def do_seed(self, schema, cursor, drop_existing): try: self.insert_into_table(cursor, schema, table_name, virtual_table) except psycopg2.ProgrammingError as e: - print('Encountered an error while inserting into table "{}"."{}"'.format(schema, table_name)) - print('Check for formatting errors in {}'.format(csv.filepath)) - print('Try --drop-existing to delete and recreate the table instead') - print(str(e)) + logger.info('Encountered an error while inserting into table "{}"."{}"'.format(schema, table_name)) + logger.info('Check for formatting errors in {}'.format(csv.filepath)) + logger.info('Try --drop-existing to delete and recreate the table instead') + logger.info(str(e)) def seed(self, drop_existing=False): diff --git a/dbt/task/archive.py b/dbt/task/archive.py index fde6748f564..fdbd6d75bb8 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -2,6 +2,7 @@ from dbt.runner import RunManager from dbt.templates import ArchiveInsertTemplate from dbt.compilation import Compiler +from dbt.logger import GLOBAL_LOGGER as logger class ArchiveTask: def __init__(self, args, project): @@ -13,11 +14,10 @@ def compile(self): compiler = Compiler(self.project, self.create_template, self.args) compiler.initialize() compiled = compiler.compile_archives() - print("Compiled {} archives".format(len(compiled))) + logger.info("Compiled {} archives".format(len(compiled))) def run(self): self.compile() runner = RunManager(self.project, self.project['target-path'], self.create_template.label, self.args) results = runner.run_archive() - diff --git a/dbt/task/compile.py b/dbt/task/compile.py index 5bcf62118a5..f173cb33e6d 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -1,6 +1,7 @@ from dbt.compilation import Compiler, CompilableEntities from dbt.templates import BaseCreateTemplate, DryCreateTemplate +from dbt.logger import GLOBAL_LOGGER as logger class CompileTask: @@ -19,4 +20,4 @@ def run(self): results = compiler.compile(limit_to=CompilableEntities) stat_line = ", ".join(["{} {}".format(results[k], k) for k in CompilableEntities]) - print("Compiled {}".format(stat_line)) + logger.info("Compiled {}".format(stat_line)) diff --git a/dbt/task/debug.py b/dbt/task/debug.py index 45f5b56117f..482bfdbbd3a 100644 --- a/dbt/task/debug.py +++ b/dbt/task/debug.py @@ -1,4 +1,5 @@ import pprint +from dbt.logger import GLOBAL_LOGGER as logger class DebugTask: @@ -7,6 +8,8 @@ def __init__(self, args, project): self.project = project def run(self): - print("args: {}".format(self.args)) - print("project: ") + logger.info("args: {}".format(self.args)) + logger.info("project: ") + + # TODO: switch this out for a log statement pprint.pprint(self.project) diff --git a/dbt/task/deps.py b/dbt/task/deps.py index 1556213f3dd..d5c31ecaf4c 100644 --- a/dbt/task/deps.py +++ b/dbt/task/deps.py @@ -6,6 +6,8 @@ import subprocess import dbt.project as project +from dbt.logger import GLOBAL_LOGGER as logger + def 
diff --git a/dbt/task/deps.py b/dbt/task/deps.py
index 1556213f3dd..d5c31ecaf4c 100644
--- a/dbt/task/deps.py
+++ b/dbt/task/deps.py
@@ -6,6 +6,8 @@
 import subprocess
 import dbt.project as project
 
+from dbt.logger import GLOBAL_LOGGER as logger
+
 def folder_from_git_remote(remote_spec):
     start = remote_spec.rfind('/') + 1
     end = len(remote_spec) - (4 if remote_spec.endswith('.git') else 0)
@@ -17,7 +19,7 @@ def __init__(self, args, project):
         self.project = project
 
     def __checkout_branch(self, branch, full_path):
-        print("  checking out branch {}".format(branch))
+        logger.info("  checking out branch {}".format(branch))
         proc = subprocess.Popen(
             ['git', 'checkout', branch],
             cwd=full_path,
@@ -38,7 +40,7 @@ def __pull_repo(self, repo, branch=None):
         folder = None
         if exists:
             folder = exists.group(1)
-            print("updating existing dependency {}".format(folder))
+            logger.info("updating existing dependency {}".format(folder))
             full_path = os.path.join(self.project['modules-path'], folder)
             proc = subprocess.Popen(
                 ['git', 'fetch', '--all'],
@@ -59,7 +61,7 @@ def __pull_repo(self, repo, branch=None):
             matches = re.match("Cloning into '(.+)'", err.decode('utf-8'))
             folder = matches.group(1)
             full_path = os.path.join(self.project['modules-path'], folder)
-            print("pulled new dependency {}".format(folder))
+            logger.info("pulled new dependency {}".format(folder))
 
         if branch is not None:
             self.__checkout_branch(branch, full_path)
@@ -97,7 +99,7 @@ def __pull_deps_recursive(self, repos, processed_repos = None, i=0):
             try:
                 if repo_folder in processed_repos:
-                    print("skipping already processed dependency {}".format(repo_folder))
+                    logger.info("skipping already processed dependency {}".format(repo_folder))
                 else:
                     dep_folder = self.__pull_repo(repo, branch)
                     dep_project = project.read_project(
@@ -109,7 +111,7 @@ def __pull_deps_recursive(self, repos, processed_repos = None, i=0):
                     self.__pull_deps_recursive(dep_project['repositories'], processed_repos, i+1)
             except IOError as e:
                 if e.errno == errno.ENOENT:
-                    print("'{}' is not a valid dbt project - dbt_project.yml not found".format(repo))
+                    logger.info("'{}' is not a valid dbt project - dbt_project.yml not found".format(repo))
                     exit(1)
                 else:
                     raise e
diff --git a/dbt/task/run.py b/dbt/task/run.py
index 8e4cd56e0bf..79ca53361fd 100644
--- a/dbt/task/run.py
+++ b/dbt/task/run.py
@@ -1,10 +1,11 @@
-
 from __future__ import print_function
 
 import os
-from dbt.templates import DryCreateTemplate, BaseCreateTemplate
-from dbt.runner import RunManager
+
 from dbt.compilation import Compiler, CompilableEntities
+from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.runner import RunManager
+from dbt.templates import DryCreateTemplate, BaseCreateTemplate
 
 THREAD_LIMIT = 9
@@ -20,7 +21,7 @@ def compile(self):
         results = compiler.compile(limit_to=['models'])
 
         stat_line = ", ".join(["{} {}".format(results[k], k) for k in CompilableEntities])
-        print("Compiled {}".format(stat_line))
+        logger.info("Compiled {}".format(stat_line))
 
         return create_template.label
@@ -39,6 +40,4 @@ def run(self):
         errored = len([r for r in results if r.errored])
         skipped = len([r for r in results if r.skipped])
 
-
-        print()
-        print("Done. PASS={passed} ERROR={errored} SKIP={skipped} TOTAL={total}".format(total=total, passed=passed, errored=errored, skipped=skipped))
+        logger.info("Done. PASS={passed} ERROR={errored} SKIP={skipped} TOTAL={total}".format(total=total, passed=passed, errored=errored, skipped=skipped))
diff --git a/dbt/task/test.py b/dbt/task/test.py
index f2516cf435f..db27ca48ce1 100644
--- a/dbt/task/test.py
+++ b/dbt/task/test.py
@@ -7,6 +7,7 @@
 from dbt.templates import DryCreateTemplate, BaseCreateTemplate
 from dbt.runner import RunManager
 from dbt.schema_tester import SchemaTester
+from dbt.logger import GLOBAL_LOGGER as logger
 
 class TestTask:
@@ -29,7 +30,7 @@ def compile(self):
         results = compiler.compile(limit_to=['tests'])
 
         stat_line = ", ".join(["{} {}".format(results[k], k) for k in CompilableEntities])
-        print("Compiled {}".format(stat_line))
+        logger.info("Compiled {}".format(stat_line))
 
         return compiler
@@ -46,5 +47,5 @@ def run(self):
         else:
             raise RuntimeError("unexpected")
 
-        print("Done!")
+        logger.info("Done!")
         return res
diff --git a/dbt/tracking.py b/dbt/tracking.py
index 01c63929655..843d43e449d 100644
--- a/dbt/tracking.py
+++ b/dbt/tracking.py
@@ -1,7 +1,8 @@
-
+from dbt.logger import GLOBAL_LOGGER as logger
 from dbt import version as dbt_version
 from snowplow_tracker import Subject, Tracker, Emitter, logger as sp_logger
 from snowplow_tracker import SelfDescribingJson, disable_contracts
+
 disable_contracts()
 
 import platform
@@ -11,8 +12,6 @@
 import json
 import logging
 
-logger = logging.getLogger(__name__)
-
 sp_logger.setLevel(100)
 
 COLLECTOR_URL = "events.fivetran.com/snowplow/forgiving_ain"
@@ -155,7 +154,7 @@ def track(*args, **kwargs):
     if __is_do_not_track:
         return
     else:
-        #logger.debug("Sending event: {}".format(kwargs))
+        logger.debug("Sending event: {}".format(kwargs))
         try:
             tracker.track_struct_event(*args, **kwargs)
         except Exception as e:
@@ -182,9 +181,10 @@ def track_invalid_invocation(project=None, args=None, result_type=None, result=None):
     track(category="dbt", action='invocation', label='invalid', context=context)
 
 def flush():
+    logger.debug("Flushing usage events")
     tracker.flush()
 
 def do_not_track():
     global __is_do_not_track
-    logger.info("Not sending anonymous usage events")
+    logger.debug("Not sending anonymous usage events")
     __is_do_not_track = True
diff --git a/dbt/utils.py b/dbt/utils.py
index 4ecb56c51e8..ee4cf827cb1 100644
--- a/dbt/utils.py
+++ b/dbt/utils.py
@@ -4,6 +4,7 @@
 import pprint
 import json
 import dbt.project
+from dbt.logger import GLOBAL_LOGGER as logger
 
 DBTConfigKeys = [
     'enabled',
@@ -39,7 +40,7 @@ def compiler_error(model, msg):
     raise RuntimeError("! Compilation error while compiling model {}:\n! {}".format(name, msg))
 
 def compiler_warning(model, msg):
-    print("* Compilation warning while compiling model {}:\n* {}".format(model.nice_name, msg))
+    logger.info("* Compilation warning while compiling model {}:\n* {}".format(model.nice_name, msg))
 
 class Var(object):
     UndefinedVarError = "Required var '{}' not found in config:\nVars supplied to {} = {}"
@@ -96,8 +97,8 @@ def dependency_projects(project):
         try:
             yield dbt.project.read_project(os.path.join(full_obj, 'dbt_project.yml'), project.profiles_dir, profile_to_load=project.profile_to_load)
         except dbt.project.DbtProjectError as e:
-            print("Error reading dependency project at {}".format(full_obj))
-            print(str(e))
+            logger.info("Error reading dependency project at {}".format(full_obj))
+            logger.info(str(e))
 
 def split_path(path):
     norm = os.path.normpath(path)