From 970049fde832f16570670417ec25aea9b0780e56 Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Fri, 24 Jul 2020 15:29:22 -0600
Subject: [PATCH 1/6] Always close connections in release()

---
 CHANGELOG.md                          | 1 +
 core/dbt/adapters/base/connections.py | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fedcda61b1e..4e2dd721070 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 ### Fixes
 - fast-fail option with adapters that don't support cancelling queries will now passthrough the original error messages ([#2644](https://github.com/fishtown-analytics/dbt/issues/2644), [#2646](https://github.com/fishtown-analytics/dbt/pull/2646))
 - `dbt clean` no longer requires a profile ([#2620](https://github.com/fishtown-analytics/dbt/issues/2620), [#2649](https://github.com/fishtown-analytics/dbt/pull/2649))
+- Close all connections so snowflake's keepalive thread will exit. ([#2645](https://github.com/fishtown-analytics/dbt/issues/2645), [#2650](https://github.com/fishtown-analytics/dbt/pull/2650))
 
 Contributors:
 - [@joshpeng-quibi](https://github.com/joshpeng-quibi) ([#2646](https://github.com/fishtown-analytics/dbt/pull/2646))

diff --git a/core/dbt/adapters/base/connections.py b/core/dbt/adapters/base/connections.py
index 1909d9627a3..4b280e50362 100644
--- a/core/dbt/adapters/base/connections.py
+++ b/core/dbt/adapters/base/connections.py
@@ -179,8 +179,8 @@ def release(self) -> None:
             if conn.state == 'open':
                 if conn.transaction_open is True:
                     self._rollback(conn)
-                else:
-                    self.close(conn)
+            # always close the connection
+            self.close(conn)
         except Exception:
             # if rollback or close failed, remove our busted connection
             self.clear_thread_connection()

From 3b917b9d79bce6f303249e8dbbd7690c48a75ad1 Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Mon, 27 Jul 2020 10:36:32 -0600
Subject: [PATCH 2/6] Handle the fallout of closing connections in release

- close() implies rollback, so do not call it
- make sure to not open new connections for executors in single-threaded mode
- logging cleanups
- fix a test case that never acquired connections
- to cancel other connections, one must first acquire a connection for the master thread
- change a number of release() calls to rollback

release vs rollback
---
 core/dbt/adapters/base/connections.py      | 24 ++++---
 core/dbt/adapters/base/impl.py             | 72 ++++++++++---------
 core/dbt/task/runnable.py                  | 48 +++++++------
 core/dbt/utils.py                          | 31 ++++++--
 .../dbt/adapters/bigquery/connections.py   |  3 +-
 .../dbt/adapters/postgres/connections.py   |  5 +-
 .../dbt/adapters/snowflake/connections.py  |  5 +-
 .../test_execute_fetch_and_serialize.py    |  3 +-
 .../test_query_comments.py                 | 39 ++++------
 9 files changed, 124 insertions(+), 106 deletions(-)

diff --git a/core/dbt/adapters/base/connections.py b/core/dbt/adapters/base/connections.py
index 4b280e50362..a7d51892d6d 100644
--- a/core/dbt/adapters/base/connections.py
+++ b/core/dbt/adapters/base/connections.py
@@ -88,6 +88,11 @@ def clear_transaction(self) -> None:
         self.begin()
         self.commit()
 
+    def rollback_if_open(self) -> None:
+        conn = self.get_if_exists()
+        if conn is not None and conn.handle and conn.transaction_open:
+            self._rollback(conn)
+
     @abc.abstractmethod
     def exception_handler(self, sql: str) -> ContextManager:
         """Create a context manager that handles exceptions caused by database
@@ -176,10 +181,8 @@ def release(self) -> None:
             return
 
         try:
-            if conn.state == 'open':
-                if conn.transaction_open is True:
-                    self._rollback(conn)
-            # always close the connection
+            # always close the connection. close() calls _rollback() if there
+            # is an open transaction
             self.close(conn)
         except Exception:
             # if rollback or close failed, remove our busted connection
             self.clear_thread_connection()
@@ -230,11 +233,10 @@ def _close_handle(cls, connection: Connection) -> None:
         """Perform the actual close operation."""
         # On windows, sometimes connection handles don't have a close() attr.
         if hasattr(connection.handle, 'close'):
-            logger.debug('On {}: Close'.format(connection.name))
+            logger.debug(f'On {connection.name}: Close')
             connection.handle.close()
         else:
-            logger.debug('On {}: No close available on handle'
-                         .format(connection.name))
+            logger.debug(f'On {connection.name}: No close available on handle')
 
     @classmethod
     def _rollback(cls, connection: Connection) -> None:
@@ -247,10 +249,11 @@ def _rollback(cls, connection: Connection) -> None:
 
         if connection.transaction_open is False:
             raise dbt.exceptions.InternalException(
-                'Tried to rollback transaction on connection "{}", but '
-                'it does not have one open!'.format(connection.name))
+                f'Tried to rollback transaction on connection '
+                f'"{connection.name}", but it does not have one open!'
+            )
 
-        logger.debug('On {}: ROLLBACK'.format(connection.name))
+        logger.debug(f'On {connection.name}: ROLLBACK')
         cls._rollback_handle(connection)
 
         connection.transaction_open = False
@@ -268,6 +271,7 @@ def close(cls, connection: Connection) -> Connection:
             return connection
 
         if connection.transaction_open and connection.handle:
+            logger.debug('On {}: ROLLBACK'.format(connection.name))
             cls._rollback_handle(connection)
         connection.transaction_open = False
 
diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 6af38b5264b..db4d6ce8254 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -312,13 +312,6 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
         # databases
         return info_schema_name_map
 
-    def _list_relations_get_connection(
-        self, schema_relation: BaseRelation
-    ) -> List[BaseRelation]:
-        name = f'list_{schema_relation.database}_{schema_relation.schema}'
-        with self.connection_named(name):
-            return self.list_relations_without_caching(schema_relation)
-
     def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
         """Populate the relations cache for the given schemas. Returns an
         iterable of the schemas populated, as strings.
@@ -328,10 +321,16 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
         cache_schemas = self._get_cache_schemas(manifest)
         with executor(self.config) as tpe:
-            futures: List[Future[List[BaseRelation]]] = [
-                tpe.submit(self._list_relations_get_connection, cache_schema)
-                for cache_schema in cache_schemas
-            ]
+            futures: List[Future[List[BaseRelation]]] = []
+            for cache_schema in cache_schemas:
+                fut = tpe.submit_connected(
+                    self,
+                    f'list_{cache_schema.database}_{cache_schema.schema}',
+                    self.list_relations_without_caching,
+                    cache_schema
+                )
+                futures.append(fut)
+
             for future in as_completed(futures):
                 # if we can't read the relations we need to just raise anyway,
                 # so just call future.result() and let that raise on failure
@@ -1001,24 +1000,18 @@ def _get_one_catalog(
         manifest: Manifest,
     ) -> agate.Table:
 
-        name = '.'.join([
-            str(information_schema.database),
-            'information_schema'
-        ])
-
-        with self.connection_named(name):
-            kwargs = {
-                'information_schema': information_schema,
-                'schemas': schemas
-            }
-            table = self.execute_macro(
-                GET_CATALOG_MACRO_NAME,
-                kwargs=kwargs,
-                release=True,
-                # pass in the full manifest so we get any local project
-                # overrides
-                manifest=manifest,
-            )
+        kwargs = {
+            'information_schema': information_schema,
+            'schemas': schemas
+        }
+        table = self.execute_macro(
+            GET_CATALOG_MACRO_NAME,
+            kwargs=kwargs,
+            release=False,
+            # pass in the full manifest so we get any local project
+            # overrides
+            manifest=manifest,
+        )
 
         results = self._catalog_filter_table(table, manifest)
         return results
@@ -1029,10 +1022,21 @@ def get_catalog(
         schema_map = self._get_catalog_schemas(manifest)
 
         with executor(self.config) as tpe:
-            futures: List[Future[agate.Table]] = [
-                tpe.submit(self._get_one_catalog, info, schemas, manifest)
-                for info, schemas in schema_map.items() if len(schemas) > 0
-            ]
+            futures: List[Future[agate.Table]] = []
+            for info, schemas in schema_map.items():
+                if len(schemas) == 0:
+                    continue
+                name = '.'.join([
+                    str(info.database),
+                    'information_schema'
+                ])
+
+                fut = tpe.submit_connected(
+                    self, name,
+                    self._get_one_catalog, info, schemas, manifest
+                )
+                futures.append(fut)
+
             catalogs, exceptions = catch_as_completed(futures)
 
         return catalogs, exceptions
@@ -1059,7 +1063,7 @@ def calculate_freshness(
         table = self.execute_macro(
             FRESHNESS_MACRO_NAME,
             kwargs=kwargs,
-            release=True,
+            release=False,
             manifest=manifest
         )
         # now we have a 1-row table of the maximum `loaded_at_field` value and

diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py
index e19e277ea7a..e4effd91aba 100644
--- a/core/dbt/task/runnable.py
+++ b/core/dbt/task/runnable.py
@@ -297,13 +297,15 @@ def _cancel_connections(self, pool):
             dbt.ui.printer.print_timestamped_line(msg, yellow)
 
         else:
-            for conn_name in adapter.cancel_open_connections():
-                if self.manifest is not None:
-                    node = self.manifest.nodes.get(conn_name)
-                    if node is not None and node.is_ephemeral_model:
-                        continue
-                # if we don't have a manifest/don't have a node, print anyway.
-                dbt.ui.printer.print_cancel_line(conn_name)
+            with adapter.connection_named('master'):
+                for conn_name in adapter.cancel_open_connections():
+                    if self.manifest is not None:
+                        node = self.manifest.nodes.get(conn_name)
+                        if node is not None and node.is_ephemeral_model:
+                            continue
+                    # if we don't have a manifest/don't have a node, print
+                    # anyway.
+                    dbt.ui.printer.print_cancel_line(conn_name)
 
         pool.join()
 
@@ -457,18 +459,15 @@ def list_schemas(
             db_lowercase = dbt.utils.lowercase(db_only.database)
             if db_only.database is None:
                 database_quoted = None
-                conn_name = 'list_schemas'
             else:
                 database_quoted = str(db_only)
-                conn_name = f'list_{db_only.database}'
 
-            with adapter.connection_named(conn_name):
-                # we should never create a null schema, so just filter them out
-                return [
-                    (db_lowercase, s.lower())
-                    for s in adapter.list_schemas(database_quoted)
-                    if s is not None
-                ]
+            # we should never create a null schema, so just filter them out
+            return [
+                (db_lowercase, s.lower())
+                for s in adapter.list_schemas(database_quoted)
+                if s is not None
+            ]
 
         def create_schema(relation: BaseRelation) -> None:
             db = relation.database or ''
@@ -480,9 +479,13 @@ def create_schema(relation: BaseRelation) -> None:
         create_futures = []
 
         with dbt.utils.executor(self.config) as tpe:
-            list_futures = [
-                tpe.submit(list_schemas, db) for db in required_databases
-            ]
+            for req in required_databases:
+                if req.database is None:
+                    name = 'list_schemas'
+                else:
+                    name = f'list_{req.database}'
+                fut = tpe.submit_connected(adapter, name, list_schemas, req)
+                list_futures.append(fut)
 
             for ls_future in as_completed(list_futures):
                 existing_schemas_lowered.update(ls_future.result())
@@ -499,9 +502,12 @@ def create_schema(relation: BaseRelation) -> None:
                     db_schema = (db_lower, schema.lower())
                     if db_schema not in existing_schemas_lowered:
                         existing_schemas_lowered.add(db_schema)
-                        create_futures.append(
-                            tpe.submit(create_schema, info)
+
+                        fut = tpe.submit_connected(
+                            adapter, f'create_{info.database or ""}_{info.schema}',
+                            create_schema, info
                         )
+                        create_futures.append(fut)
 
             for create_future in as_completed(create_futures):
                 # trigger/re-raise any excceptions while creating schemas

diff --git a/core/dbt/utils.py b/core/dbt/utils.py
index 989b93308d5..2fba9f669b4 100644
--- a/core/dbt/utils.py
+++ b/core/dbt/utils.py
@@ -8,6 +8,7 @@
 import itertools
 import json
 import os
+from contextlib import contextmanager
 from enum import Enum
 from typing_extensions import Protocol
 from typing import (
@@ -518,8 +519,16 @@ def format_bytes(num_bytes):
     return "> 1024 TB"
 
 
+class ConnectingExecutor(concurrent.futures.Executor):
+    def submit_connected(self, adapter, conn_name, func, *args, **kwargs):
+        def connected(conn_name, func, *args, **kwargs):
+            with self.connection_named(adapter, conn_name):
+                return func(*args, **kwargs)
+        return self.submit(connected, conn_name, func, *args, **kwargs)
+
+
 # a little concurrent.futures.Executor for single-threaded mode
-class SingleThreadedExecutor(concurrent.futures.Executor):
+class SingleThreadedExecutor(ConnectingExecutor):
     def submit(*args, **kwargs):
         # this basic pattern comes from concurrent.futures.Executor itself,
         # but without handling the `fn=` form.
@@ -544,6 +553,20 @@ def submit(*args, **kwargs): fut.set_result(result) return fut + @contextmanager + def connection_named(self, adapter, name): + yield + + +class MultiThreadedExecutor( + ConnectingExecutor, + concurrent.futures.ThreadPoolExecutor, +): + @contextmanager + def connection_named(self, adapter, name): + with adapter.connection_named(name): + yield + class ThreadedArgs(Protocol): single_threaded: bool @@ -554,13 +577,11 @@ class HasThreadingConfig(Protocol): threads: Optional[int] -def executor(config: HasThreadingConfig) -> concurrent.futures.Executor: +def executor(config: HasThreadingConfig) -> ConnectingExecutor: if config.args.single_threaded: return SingleThreadedExecutor() else: - return concurrent.futures.ThreadPoolExecutor( - max_workers=config.threads - ) + return MultiThreadedExecutor(max_workers=config.threads) def fqn_search( diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index ec07565ed0f..7254d905845 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -10,6 +10,7 @@ from dbt.utils import format_bytes from dbt.clients import agate_helper, gcloud +from dbt.contracts.connection import ConnectionState from dbt.exceptions import ( FailedToConnectException, RuntimeException, DatabaseException ) @@ -111,7 +112,7 @@ def cancel_open(self) -> None: @classmethod def close(cls, connection): - connection.state = 'closed' + connection.state = ConnectionState.CLOSED return connection diff --git a/plugins/postgres/dbt/adapters/postgres/connections.py b/plugins/postgres/dbt/adapters/postgres/connections.py index d72fdb25427..04d4d5af4f0 100644 --- a/plugins/postgres/dbt/adapters/postgres/connections.py +++ b/plugins/postgres/dbt/adapters/postgres/connections.py @@ -49,8 +49,7 @@ def exception_handler(self, sql): logger.debug('Postgres error: {}'.format(str(e))) try: - # attempt to release the connection - self.release() + self.rollback_if_open() except psycopg2.Error: logger.debug("Failed to release connection!") pass @@ -60,7 +59,7 @@ def exception_handler(self, sql): except Exception as e: logger.debug("Error running SQL: {}", sql) logger.debug("Rolling back transaction.") - self.release() + self.rollback_if_open() if isinstance(e, dbt.exceptions.RuntimeException): # during a sql query, an internal to dbt exception was raised. 
# this sounds a lot like a signal handler and probably has diff --git a/plugins/snowflake/dbt/adapters/snowflake/connections.py b/plugins/snowflake/dbt/adapters/snowflake/connections.py index 4a64f3c3ee8..b829c0c140e 100644 --- a/plugins/snowflake/dbt/adapters/snowflake/connections.py +++ b/plugins/snowflake/dbt/adapters/snowflake/connections.py @@ -165,7 +165,6 @@ def exception_handler(self, sql): if 'Empty SQL statement' in msg: logger.debug("got empty sql statement, moving on") elif 'This session does not have a current database' in msg: - self.release() raise FailedToConnectException( ('{}\n\nThis error sometimes occurs when invalid ' 'credentials are provided, or when your default role ' @@ -173,7 +172,6 @@ def exception_handler(self, sql): 'Please double check your profile and try again.') .format(msg)) else: - self.release() raise DatabaseException(msg) except Exception as e: if isinstance(e, snowflake.connector.errors.Error): @@ -181,7 +179,7 @@ def exception_handler(self, sql): logger.debug("Error running SQL: {}", sql) logger.debug("Rolling back transaction.") - self.release() + self.rollback_if_open() if isinstance(e, RuntimeException): # during a sql query, an internal to dbt exception was raised. # this sounds a lot like a signal handler and probably has @@ -328,7 +326,6 @@ def _rollback_handle(cls, connection): """On snowflake, rolling back the handle of an aborted session raises an exception. """ - logger.debug('initiating rollback') try: connection.handle.rollback() except snowflake.connector.errors.ProgrammingError as e: diff --git a/test/integration/048_rpc_test/test_execute_fetch_and_serialize.py b/test/integration/048_rpc_test/test_execute_fetch_and_serialize.py index bdcceeb16ec..6ba315afa02 100644 --- a/test/integration/048_rpc_test/test_execute_fetch_and_serialize.py +++ b/test/integration/048_rpc_test/test_execute_fetch_and_serialize.py @@ -33,7 +33,8 @@ def do_test_file(self, filename): with open(file_path) as fh: query = fh.read() - status, table = self.adapter.execute(query, auto_begin=False, fetch=True) + with self.adapter.connection_named('master'): + status, table = self.adapter.execute(query, auto_begin=False, fetch=True) self.assertTrue(len(table.columns) > 0, "agate table had no columns") self.assertTrue(len(table.rows) > 0, "agate table had no rows") diff --git a/test/integration/051_query_comments_test/test_query_comments.py b/test/integration/051_query_comments_test/test_query_comments.py index ee1d761df32..54c211f2f63 100644 --- a/test/integration/051_query_comments_test/test_query_comments.py +++ b/test/integration/051_query_comments_test/test_query_comments.py @@ -9,15 +9,17 @@ class TestDefaultQueryComments(DBTIntegrationTest): - def matches_comment(self, msg): + def matches_comment(self, msg) -> bool: if not msg.startswith('/* '): return False # our blob is the first line of the query comments, minus the comment json_str = msg.split('\n')[0][3:-3] data = json.loads(json_str) - self.assertEqual(data['app'], 'dbt') - self.assertEqual(data['dbt_version'], dbt_version) - self.assertEqual(data['node_id'], 'model.test.x') + return ( + data['app'] == 'dbt' and + data['dbt_version'] == dbt_version and + data['node_id'] == 'model.test.x' + ) @property def project_config(self): @@ -26,7 +28,6 @@ def project_config(self): 'macro-paths': ['macros'] } - @property def schema(self): return 'dbt_query_comments_051' @@ -84,8 +85,7 @@ def run_assert_comments(self): seen = False for log in logs: msg = self.query_comment('model.test.x', log) - if msg is not None: - 
self.matches_comment(msg) + if msg is not None and self.matches_comment(msg): seen = True self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['message'] for l in logs))) @@ -115,10 +115,7 @@ def project_config(self): return cfg def matches_comment(self, msg) -> bool: - self.assertTrue( - msg.startswith('/* dbt\nrules! */\n'), - f'{msg} did not start with query comment' - ) + return msg.startswith('/* dbt\nrules! */\n') class TestMacroQueryComments(TestDefaultQueryComments): @@ -130,10 +127,7 @@ def project_config(self): def matches_comment(self, msg) -> bool: start_with = '/* dbt macros\nare pretty cool */\n' - self.assertTrue( - msg.startswith(start_with), - f'"{msg}" did not start with expected query comment "{start_with}"' - ) + return msg.startswith(start_with) class TestMacroArgsQueryComments(TestDefaultQueryComments): @@ -148,10 +142,7 @@ def project_config(self): def matches_comment(self, msg) -> bool: expected_dct = {'app': 'dbt++', 'dbt_version': dbt_version, 'macro_version': '0.1.0', 'message': 'blah: default2'} expected = '/* {} */\n'.format(json.dumps(expected_dct, sort_keys=True)) - self.assertTrue( - msg.startswith(expected), - f"'{msg}' did not start with query comment '{expected}'" - ) + return msg.startswith(expected) class TestMacroInvalidQueryComments(TestDefaultQueryComments): @@ -174,10 +165,7 @@ def project_config(self): return cfg def matches_comment(self, msg) -> bool: - self.assertFalse( - '/*' in msg or '*/' in msg, - f"'{msg}' contained a query comment" - ) + return not ('/*' in msg or '*/' in msg) class TestEmptyQueryComments(TestDefaultQueryComments): @@ -188,7 +176,4 @@ def project_config(self): return cfg def matches_comment(self, msg) -> bool: - self.assertFalse( - '/*' in msg or '*/' in msg, - f"'{msg}' contained a query comment" - ) + return not ('/*' in msg or '*/' in msg) From fd0b460391373661d3d54fc9534019704745283b Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Mon, 27 Jul 2020 15:14:56 -0600 Subject: [PATCH 3/6] alter the tests so we can run rpc tests on snowflake --- test/rpc/conftest.py | 92 +++++++++++++++++++++++++++++-- test/rpc/test_compile.py | 4 +- test/rpc/test_concurrency.py | 4 +- test/rpc/test_deps.py | 8 ++- test/rpc/test_management.py | 22 +++++--- test/rpc/test_run.py | 10 +++- test/rpc/test_run_operation.py | 8 ++- test/rpc/test_run_sql.py | 43 +++++++++++++++ test/rpc/test_seed.py | 7 ++- test/rpc/test_snapshots.py | 10 +++- test/rpc/test_source_freshness.py | 4 +- test/rpc/test_test.py | 4 +- test/rpc/util.py | 23 ++++++-- tox.ini | 6 +- 14 files changed, 209 insertions(+), 36 deletions(-) create mode 100644 test/rpc/test_run_sql.py diff --git a/test/rpc/conftest.py b/test/rpc/conftest.py index e8e44d21d16..f5790b565c0 100644 --- a/test/rpc/conftest.py +++ b/test/rpc/conftest.py @@ -2,11 +2,48 @@ import pytest import random import time -from typing import Dict, Any +from typing import Dict, Any, Set import yaml +def pytest_addoption(parser): + parser.addoption( + '--profile', default='postgres', help='Use the postgres profile', + ) + + +def _get_item_profiles(item) -> Set[str]: + supported = set() + for mark in item.iter_markers(name='supported'): + supported.update(mark.args) + return supported + + +def pytest_collection_modifyitems(config, items): + selected_profile = config.getoption('profile') + + to_remove = [] + + for item in items: + item_profiles = _get_item_profiles(item) + if selected_profile not in item_profiles and 'any' not in item_profiles: + to_remove.append(item) + + for item in 
to_remove: + items.remove(item) + + +def pytest_configure(config): + config.addinivalue_line('markers', 'supported(: Marks postgres-only tests') + config.addinivalue_line( + 'markers', 'snowflake: Mark snowflake-only tests' + ) + config.addinivalue_line( + 'markers', 'any: Mark ' + ) + + @pytest.fixture def unique_schema() -> str: return "test{}{:04}".format(int(time.time()), random.randint(0, 9999)) @@ -22,7 +59,6 @@ def project_root(tmpdir): return tmpdir.mkdir('project') -@pytest.fixture def postgres_profile_data(unique_schema): return { 'config': { @@ -46,9 +82,55 @@ def postgres_profile_data(unique_schema): } +def snowflake_profile_data(unique_schema): + return { + 'config': { + 'send_anonymous_usage_stats': False + }, + 'test': { + 'outputs': { + 'default': { + 'type': 'snowflake', + 'threads': 4, + 'account': os.getenv('SNOWFLAKE_TEST_ACCOUNT'), + 'user': os.getenv('SNOWFLAKE_TEST_USER'), + 'password': os.getenv('SNOWFLAKE_TEST_PASSWORD'), + 'database': os.getenv('SNOWFLAKE_TEST_DATABASE'), + 'schema': unique_schema, + 'warehouse': os.getenv('SNOWFLAKE_TEST_WAREHOUSE'), + }, + 'keepalives': { + 'type': 'snowflake', + 'threads': 4, + 'account': os.getenv('SNOWFLAKE_TEST_ACCOUNT'), + 'user': os.getenv('SNOWFLAKE_TEST_USER'), + 'password': os.getenv('SNOWFLAKE_TEST_PASSWORD'), + 'database': os.getenv('SNOWFLAKE_TEST_DATABASE'), + 'schema': unique_schema, + 'warehouse': os.getenv('SNOWFLAKE_TEST_WAREHOUSE'), + 'client_session_keep_alive': True, + }, + }, + 'target': 'default', + }, + } + + +@pytest.fixture +def dbt_profile_data(unique_schema, pytestconfig): + profile_name = pytestconfig.getoption('profile') + if profile_name == 'postgres': + return postgres_profile_data(unique_schema) + elif profile_name == 'snowflake': + return snowflake_profile_data(unique_schema) + else: + print(f'Bad profile name {profile_name}!') + return {} + + @pytest.fixture -def postgres_profile(profiles_root, postgres_profile_data) -> Dict[str, Any]: +def dbt_profile(profiles_root, dbt_profile_data) -> Dict[str, Any]: path = os.path.join(profiles_root, 'profiles.yml') with open(path, 'w') as fp: - fp.write(yaml.safe_dump(postgres_profile_data)) - return postgres_profile_data + fp.write(yaml.safe_dump(dbt_profile_data)) + return dbt_profile_data diff --git a/test/rpc/test_compile.py b/test/rpc/test_compile.py index d6e7e0e4a2a..ae4bbf8e67e 100644 --- a/test/rpc/test_compile.py +++ b/test/rpc/test_compile.py @@ -1,3 +1,4 @@ +import pytest from .util import ( assert_has_threads, get_querier, @@ -5,8 +6,9 @@ ) +@pytest.mark.supported('postgres') def test_rpc_compile_threads( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={'my_model.sql': 'select 1 as id'} diff --git a/test/rpc/test_concurrency.py b/test/rpc/test_concurrency.py index 31be49540ef..7155d5a8d8a 100644 --- a/test/rpc/test_concurrency.py +++ b/test/rpc/test_concurrency.py @@ -1,4 +1,5 @@ from concurrent.futures import ThreadPoolExecutor, as_completed +import pytest from .util import ( get_querier, @@ -15,8 +16,9 @@ def _compile_poll_for_result(querier, id: int): assert compile_sql_result['results'][0]['compiled_sql'] == sql +@pytest.mark.supported('postgres') def test_rpc_compile_sql_concurrency( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={'my_model.sql': 'select 1 as id'} diff --git a/test/rpc/test_deps.py 
b/test/rpc/test_deps.py index ea83cfb9ec8..dc8ca99f726 100644 --- a/test/rpc/test_deps.py +++ b/test/rpc/test_deps.py @@ -1,3 +1,5 @@ +import pytest + from .util import ( get_querier, ProjectDefinition, @@ -70,7 +72,8 @@ def deps_with_packages(packages, bad_packages, project_dir, profiles_dir, schema querier.is_result(querier.async_wait(tok1)) -def test_rpc_deps_packages(project_root, profiles_root, postgres_profile, unique_schema): +@pytest.mark.supported('postgres') +def test_rpc_deps_packages(project_root, profiles_root, dbt_profile, unique_schema): packages = [{ 'package': 'fishtown-analytics/dbt_utils', 'version': '0.2.1', @@ -82,7 +85,8 @@ def test_rpc_deps_packages(project_root, profiles_root, postgres_profile, unique deps_with_packages(packages, bad_packages, project_root, profiles_root, unique_schema) -def test_rpc_deps_git(project_root, profiles_root, postgres_profile, unique_schema): +@pytest.mark.supported('postgres') +def test_rpc_deps_git(project_root, profiles_root, dbt_profile, unique_schema): packages = [{ 'git': 'https://github.com/fishtown-analytics/dbt-utils.git', 'revision': '0.2.1' diff --git a/test/rpc/test_management.py b/test/rpc/test_management.py index 370f629441f..931117eaa22 100644 --- a/test/rpc/test_management.py +++ b/test/rpc/test_management.py @@ -1,3 +1,4 @@ +import pytest import time from .util import ( get_querier, @@ -5,8 +6,9 @@ ) +@pytest.mark.supported('postgres') def test_rpc_basics( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={'my_model.sql': 'select 1 as id'} @@ -62,7 +64,8 @@ def test_rpc_basics( ''' -def test_rpc_status_error(project_root, profiles_root, postgres_profile, unique_schema): +@pytest.mark.supported('postgres') +def test_rpc_status_error(project_root, profiles_root, dbt_profile, unique_schema): project = ProjectDefinition( models={ 'descendant_model.sql': 'select * from {{ source("test_source", "test_table") }}', @@ -130,7 +133,8 @@ def test_rpc_status_error(project_root, profiles_root, postgres_profile, unique_ querier.is_result(querier.compile_sql('select 1 as id')) -def test_gc_change_interval(project_root, profiles_root, postgres_profile, unique_schema): +@pytest.mark.supported('postgres') +def test_gc_change_interval(project_root, profiles_root, dbt_profile, unique_schema): project = ProjectDefinition( models={'my_model.sql': 'select 1 as id'} ) @@ -176,7 +180,8 @@ def test_gc_change_interval(project_root, profiles_root, postgres_profile, uniqu assert len(result['rows']) == 2 -def test_ps_poll_output_match(project_root, profiles_root, postgres_profile, unique_schema): +@pytest.mark.supported('postgres') +def test_ps_poll_output_match(project_root, profiles_root, dbt_profile, unique_schema): project = ProjectDefinition( models={'my_model.sql': 'select 1 as id'} ) @@ -248,8 +253,9 @@ def wait_for_log_ordering(querier, token, attempts, *messages) -> int: assert False, msg +@pytest.mark.supported('postgres') def test_get_status( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={'my_model.sql': 'select 1 as id'}, @@ -294,8 +300,9 @@ def test_get_status( assert len(result['logs']) == 0 +@pytest.mark.supported('postgres') def test_missing_tag_sighup( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={ @@ 
-333,8 +340,9 @@ def test_missing_tag_sighup( assert querier.wait_for_status('ready') is True +@pytest.mark.supported('postgres') def test_get_manifest( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={ diff --git a/test/rpc/test_run.py b/test/rpc/test_run.py index 7d1a44c8c01..8e9b76711a4 100644 --- a/test/rpc/test_run.py +++ b/test/rpc/test_run.py @@ -1,3 +1,4 @@ +import pytest from .util import ( assert_has_threads, get_querier, @@ -5,8 +6,9 @@ ) +@pytest.mark.supported('postgres') def test_rpc_run_threads( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={'my_model.sql': 'select 1 as id'} @@ -28,8 +30,9 @@ def test_rpc_run_threads( assert_has_threads(results, 7) +@pytest.mark.supported('postgres') def test_rpc_run_vars( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={ @@ -50,8 +53,9 @@ def test_rpc_run_vars( assert results['results'][0]['node']['compiled_sql'] == 'select 100 as id' +@pytest.mark.supported('postgres') def test_rpc_run_vars_compiled( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={ diff --git a/test/rpc/test_run_operation.py b/test/rpc/test_run_operation.py index f07f772d35e..ee72a3dc936 100644 --- a/test/rpc/test_run_operation.py +++ b/test/rpc/test_run_operation.py @@ -1,3 +1,5 @@ +import pytest + from .util import ( get_querier, ProjectDefinition, @@ -16,8 +18,9 @@ ''' +@pytest.mark.supported('postgres') def test_run_operation( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={'my_model.sql': 'select 1 as id'}, @@ -62,8 +65,9 @@ def test_run_operation( assert poll_result['success'] is True +@pytest.mark.supported('postgres') def test_run_operation_cli( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( models={'my_model.sql': 'select 1 as id'}, diff --git a/test/rpc/test_run_sql.py b/test/rpc/test_run_sql.py new file mode 100644 index 00000000000..a87ebd4a14c --- /dev/null +++ b/test/rpc/test_run_sql.py @@ -0,0 +1,43 @@ +import pytest + +from .util import ( + get_querier, + ProjectDefinition, +) + + +@pytest.mark.supported('any') +def test_rpc_run_sql_nohang( + project_root, profiles_root, dbt_profile, unique_schema +): + project = ProjectDefinition( + models={'my_model.sql': 'select 1 as id'} + ) + querier_ctx = get_querier( + project_def=project, + project_dir=project_root, + profiles_dir=profiles_root, + schema=unique_schema, + test_kwargs={}, + ) + with querier_ctx as querier: + querier.async_wait_for_result(querier.run_sql('select 1 as id')) + + +@pytest.mark.supported('snowflake') +def test_snowflake_rpc_run_sql_keepalive_nohang( + project_root, profiles_root, dbt_profile, unique_schema +): + project = ProjectDefinition( + models={'my_model.sql': 'select 1 as id'} + ) + querier_ctx = get_querier( + project_def=project, + project_dir=project_root, + profiles_dir=profiles_root, + schema=unique_schema, + test_kwargs={}, + target='keepalives', + ) + with querier_ctx as querier: + 
querier.async_wait_for_result(querier.run_sql('select 1 as id')) diff --git a/test/rpc/test_seed.py b/test/rpc/test_seed.py index 8b8c7dc28c8..1e288d94878 100644 --- a/test/rpc/test_seed.py +++ b/test/rpc/test_seed.py @@ -1,3 +1,4 @@ +import pytest from .util import ( assert_has_threads, get_querier, @@ -5,8 +6,9 @@ ) +@pytest.mark.supported('postgres') def test_rpc_seed_threads( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( project_data={'seeds': {'config': {'quote_columns': False}}}, @@ -30,8 +32,9 @@ def test_rpc_seed_threads( assert_has_threads(results, 7) +@pytest.mark.supported('postgres') def test_rpc_seed_include_exclude( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( project_data={'seeds': {'config': {'quote_columns': False}}}, diff --git a/test/rpc/test_snapshots.py b/test/rpc/test_snapshots.py index b05758dacfe..8d49f81902d 100644 --- a/test/rpc/test_snapshots.py +++ b/test/rpc/test_snapshots.py @@ -1,3 +1,4 @@ +import pytest from .util import ( assert_has_threads, get_querier, @@ -22,8 +23,9 @@ ''' +@pytest.mark.supported('postgres') def test_snapshots( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( snapshots={'my_snapshots.sql': snapshot_data}, @@ -50,8 +52,9 @@ def test_snapshots( assert len(results['results']) == 1 +@pytest.mark.supported('postgres') def test_snapshots_cli( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( snapshots={'my_snapshots.sql': snapshot_data}, @@ -81,8 +84,9 @@ def test_snapshots_cli( assert len(results['results']) == 1 +@pytest.mark.supported('postgres') def test_rpc_snapshot_threads( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): project = ProjectDefinition( snapshots={'my_snapshots.sql': snapshot_data}, diff --git a/test/rpc/test_source_freshness.py b/test/rpc/test_source_freshness.py index 323b1aeb45a..beced003d35 100644 --- a/test/rpc/test_source_freshness.py +++ b/test/rpc/test_source_freshness.py @@ -1,3 +1,4 @@ +import pytest from datetime import datetime, timedelta from .util import ( get_querier, @@ -21,8 +22,9 @@ ''' +@pytest.mark.supported('postgres') def test_source_freshness( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): start_time = datetime.utcnow() warn_me = start_time - timedelta(hours=18) diff --git a/test/rpc/test_test.py b/test/rpc/test_test.py index 5e2db6c6e18..888d4da1495 100644 --- a/test/rpc/test_test.py +++ b/test/rpc/test_test.py @@ -1,3 +1,4 @@ +import pytest import yaml from .util import ( assert_has_threads, @@ -6,8 +7,9 @@ ) +@pytest.mark.supported('postgres') def test_rpc_test_threads( - project_root, profiles_root, postgres_profile, unique_schema + project_root, profiles_root, dbt_profile, unique_schema ): schema_yaml = { 'version': 2, diff --git a/test/rpc/util.py b/test/rpc/util.py index d2caf7579ef..49a65bd8e31 100644 --- a/test/rpc/util.py +++ b/test/rpc/util.py @@ -30,7 +30,13 @@ class NoServerException(Exception): class ServerProcess(dbt.flags.MP_CONTEXT.Process): def __init__( - self, cwd, port, profiles_dir, cli_vars=None, criteria=('ready',) 
+ self, + cwd, + port, + profiles_dir, + cli_vars=None, + criteria=('ready',), + target=None, ): self.cwd = cwd self.port = port @@ -43,6 +49,9 @@ def __init__( ] if cli_vars: handle_and_check_args.extend(['--vars', cli_vars]) + if target is not None: + handle_and_check_args.extend(['--target', target]) + super().__init__( target=handle_and_check, args=(handle_and_check_args,), @@ -421,7 +430,7 @@ def async_wait_for_error(self, data: Dict[str, Any], state='success'): return self.is_error(self.async_wait(token, state=state)) -def _first_server(cwd, cli_vars, profiles_dir, criteria): +def _first_server(cwd, cli_vars, profiles_dir, criteria, target): stored = None for _ in range(5): port = random.randint(20000, 65535) @@ -432,6 +441,7 @@ def _first_server(cwd, cli_vars, profiles_dir, criteria): profiles_dir=str(profiles_dir), port=port, criteria=criteria, + target=target, ) try: proc.start() @@ -444,7 +454,9 @@ def _first_server(cwd, cli_vars, profiles_dir, criteria): @contextmanager -def rpc_server(project_dir, schema, profiles_dir, criteria='ready'): +def rpc_server( + project_dir, schema, profiles_dir, criteria='ready', target=None +): if isinstance(criteria, str): criteria = (criteria,) else: @@ -452,7 +464,7 @@ def rpc_server(project_dir, schema, profiles_dir, criteria='ready'): cli_vars = '{{test_run_schema: {}}}'.format(schema) - proc = _first_server(project_dir, cli_vars, profiles_dir, criteria) + proc = _first_server(project_dir, cli_vars, profiles_dir, criteria, target) yield proc if proc.is_alive(): os.kill(proc.pid, signal.SIGKILL) @@ -605,10 +617,11 @@ def get_querier( schema, test_kwargs, criteria='ready', + target=None, ): server_ctx = rpc_server( project_dir=project_dir, schema=schema, profiles_dir=profiles_dir, - criteria=criteria, + criteria=criteria, target=target, ) schema_ctx = built_schema( project_dir=project_dir, schema=schema, profiles_dir=profiles_dir, diff --git a/tox.ini b/tox.ini index b607cc96a84..627af812d71 100644 --- a/tox.ini +++ b/tox.ini @@ -41,7 +41,7 @@ passenv = * setenv = HOME=/home/tox commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_postgres {posargs} -n4 test/integration/*' - /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 test/rpc/*' + /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 --profile=postgres test/rpc' deps = ./core ./plugins/postgres @@ -116,7 +116,7 @@ passenv = * setenv = HOME=/home/tox commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_postgres {posargs} -n4 test/integration/*' && \ - /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 test/rpc/*' + /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 --profile=postgres test/rpc' deps = ./core ./plugins/postgres @@ -192,7 +192,7 @@ passenv = * setenv = HOME=/home/tox commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_postgres {posargs} -n4 test/integration/*' - /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 test/rpc/*' + /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 --profile=postgres test/rpc' deps = ./core ./plugins/postgres From 44e3c7eb6dfef37e0f218ec08c8d9c18ca596637 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Mon, 27 Jul 2020 15:49:34 -0600 Subject: [PATCH 4/6] only try to cancel open connections --- core/dbt/adapters/sql/connections.py | 7 +++++-- .../postgres/dbt/adapters/postgres/connections.py | 12 +++++++++++- test/unit/utils.py | 3 ++- 3 files changed, 18 insertions(+), 4 deletions(-) diff 
--git a/core/dbt/adapters/sql/connections.py b/core/dbt/adapters/sql/connections.py index 3fe1dad2ec7..e50125f701f 100644 --- a/core/dbt/adapters/sql/connections.py +++ b/core/dbt/adapters/sql/connections.py @@ -6,7 +6,7 @@ import dbt.clients.agate_helper import dbt.exceptions -from dbt.contracts.connection import Connection +from dbt.contracts.connection import Connection, ConnectionState from dbt.adapters.base import BaseConnectionManager from dbt.logger import GLOBAL_LOGGER as logger @@ -37,7 +37,10 @@ def cancel_open(self) -> List[str]: # if the connection failed, the handle will be None so we have # nothing to cancel. - if connection.handle is not None: + if ( + connection.handle is not None and + connection.state == ConnectionState.OPEN + ): self.cancel(connection) if connection.name is not None: names.append(connection.name) diff --git a/plugins/postgres/dbt/adapters/postgres/connections.py b/plugins/postgres/dbt/adapters/postgres/connections.py index 04d4d5af4f0..7b610127d1e 100644 --- a/plugins/postgres/dbt/adapters/postgres/connections.py +++ b/plugins/postgres/dbt/adapters/postgres/connections.py @@ -121,7 +121,17 @@ def open(cls, connection): def cancel(self, connection): connection_name = connection.name - pid = connection.handle.get_backend_pid() + try: + pid = connection.handle.get_backend_pid() + except psycopg2.InterfaceError as exc: + # if the connection is already closed, not much to cancel! + if 'already closed' in str(exc): + logger.debug( + f'Connection {connection_name} was already closed' + ) + return + # probably bad, re-raise it + raise sql = "select pg_terminate_backend({})".format(pid) diff --git a/test/unit/utils.py b/test/unit/utils.py index d27793afd87..c99ab596154 100644 --- a/test/unit/utils.py +++ b/test/unit/utils.py @@ -30,9 +30,10 @@ class Obj: single_threaded = False -def mock_connection(name): +def mock_connection(name, state='open'): conn = mock.MagicMock() conn.name = name + conn.state = state return conn From 46136198411bcb412b400b3beb3b03bfdd0ffaa9 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Tue, 28 Jul 2020 07:55:24 -0600 Subject: [PATCH 5/6] missed the snowflake rpc tests --- tox.ini | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tox.ini b/tox.ini index 627af812d71..52dd11e6498 100644 --- a/tox.ini +++ b/tox.ini @@ -54,6 +54,7 @@ passenv = * setenv = HOME=/home/tox commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_snowflake {posargs} -n4 test/integration/*' + /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 --profile=snowflake test/rpc' deps = ./core ./plugins/snowflake @@ -128,6 +129,7 @@ passenv = * setenv = HOME=/home/tox commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_snowflake {posargs} -n4 test/integration/*' + /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 --profile=snowflake test/rpc' deps = ./core ./plugins/snowflake @@ -204,6 +206,7 @@ passenv = * setenv = HOME=/home/tox commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_snowflake {posargs} -n4 test/integration/*' + /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 --profile=snowflake test/rpc' deps = ./core ./plugins/snowflake From 1bfe43ff76a87439a1430af94e83fc8332561ad6 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Tue, 28 Jul 2020 11:14:12 -0600 Subject: [PATCH 6/6] PR feedback: - fix conftest adainivalue_line calls - deprecate the release argument to execute_macro --- CHANGELOG.md | 6 ++++++ core/dbt/adapters/base/impl.py | 12 ++++-------- 
 core/dbt/deprecations.py       | 10 ++++++++++
 test/rpc/conftest.py           |  9 ++++-----
 4 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4e2dd721070..8c866d71b01 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,10 @@
 ## dbt 0.17.2 (Release TBD)
 
 
+### Breaking changes (for plugins)
+- The `release` argument to adapter.execute_macro no longer has any effect. It will be removed in a future release of dbt (likely 0.18.0) ([#2650](https://github.com/fishtown-analytics/dbt/pull/2650))
+
+
 ### Fixes
 - fast-fail option with adapters that don't support cancelling queries will now passthrough the original error messages ([#2644](https://github.com/fishtown-analytics/dbt/issues/2644), [#2646](https://github.com/fishtown-analytics/dbt/pull/2646))
 - `dbt clean` no longer requires a profile ([#2620](https://github.com/fishtown-analytics/dbt/issues/2620), [#2649](https://github.com/fishtown-analytics/dbt/pull/2649))
@@ -8,6 +12,8 @@
 Contributors:
 - [@joshpeng-quibi](https://github.com/joshpeng-quibi) ([#2646](https://github.com/fishtown-analytics/dbt/pull/2646))
 
 
+
+
 ## dbt 0.17.2b1 (July 21, 2020)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index db4d6ce8254..3770625a4bb 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -934,8 +934,10 @@ def execute_macro(
             execution context.
         :param kwargs: An optional dict of keyword args used to pass to the
             macro.
-        :param release: If True, release the connection after executing.
+        :param release: Ignored.
         """
+        if release is not False:
+            deprecations.warn('execute-macro-release')
         if kwargs is None:
             kwargs = {}
         if context_override is None:
@@ -971,11 +973,7 @@ def execute_macro(
         macro_function = MacroGenerator(macro, macro_context)
 
         with self.connections.exception_handler(f'macro {macro_name}'):
-            try:
-                result = macro_function(**kwargs)
-            finally:
-                if release:
-                    self.release_connection()
+            result = macro_function(**kwargs)
         return result
 
     @classmethod
@@ -1007,7 +1005,6 @@ def _get_one_catalog(
         table = self.execute_macro(
             GET_CATALOG_MACRO_NAME,
             kwargs=kwargs,
-            release=False,
             # pass in the full manifest so we get any local project
             # overrides
             manifest=manifest,
@@ -1063,7 +1060,6 @@ def calculate_freshness(
         table = self.execute_macro(
             FRESHNESS_MACRO_NAME,
             kwargs=kwargs,
-            release=False,
             manifest=manifest
         )
         # now we have a 1-row table of the maximum `loaded_at_field` value and

diff --git a/core/dbt/deprecations.py b/core/dbt/deprecations.py
index b253e7518f7..d937c3af440 100644
--- a/core/dbt/deprecations.py
+++ b/core/dbt/deprecations.py
@@ -108,6 +108,15 @@ class DbtProjectYamlDeprecation(DBTDeprecation):
     '''
 
 
+class ExecuteMacrosReleaseDeprecation(DBTDeprecation):
+    _name = 'execute-macro-release'
+    _description = '''\
+    The "release" argument to execute_macro is now ignored, and will be removed
+    in a future release of dbt. At that time, providing a `release` argument
+    will result in an error.
+    '''
+
+
 _adapter_renamed_description = """\
 The adapter function `adapter.{old_name}` is deprecated and will be removed in
 a future release of dbt. Please use `adapter.{new_name}` instead.
@@ -151,6 +160,7 @@ def warn(name, *args, **kwargs):
     ColumnQuotingDeprecation(),
     ModelsKeyNonModelDeprecation(),
     DbtProjectYamlDeprecation(),
+    ExecuteMacrosReleaseDeprecation(),
 ]
 
 deprecations: Dict[str, DBTDeprecation] = {

diff --git a/test/rpc/conftest.py b/test/rpc/conftest.py
index f5790b565c0..712e446e2b7 100644
--- a/test/rpc/conftest.py
+++ b/test/rpc/conftest.py
@@ -35,12 +35,11 @@ def pytest_collection_modifyitems(config, items):
 
 
 def pytest_configure(config):
-    config.addinivalue_line('markers', 'supported(: Marks postgres-only tests')
+    # the '(plugin, ...)' part isn't really important: any positional arguments
+    # to `pytest.mark.supported` will be consumed as plugin names.
+    helptxt = 'Marks supported test types ("postgres", "snowflake", "any")'
     config.addinivalue_line(
-        'markers', 'snowflake: Mark snowflake-only tests'
-    )
-    config.addinivalue_line(
-        'markers', 'any: Mark '
+        'markers', f'supported(plugin, ...): {helptxt}'
     )