From 562f3d0bb7b7b5cb762bd2037b590be204d8076d Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Thu, 27 Feb 2020 17:57:08 -0700
Subject: [PATCH 1/4] use show schemas for snowflake list_schemas

Also fix up the changelog to better reflect releases
---
 CHANGELOG.md                                  | 25 +++++++++----
 core/dbt/adapters/base/impl.py                | 21 +++++++----
 core/dbt/deprecations.py                      | 17 ---------
 core/dbt/task/runnable.py                     | 37 ++++++++++++++-----
 .../snowflake/dbt/adapters/snowflake/impl.py  | 13 ++++++-
 .../dbt/include/snowflake/macros/adapters.sql | 18 +++++++++
 6 files changed, 90 insertions(+), 41 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e1d7c731bc2..e64556cd850 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,21 @@
+## dbt 0.16.next (Release TBD)
+
+### Breaking changes
+- When overriding the snowflake__list_schemas macro, you must now return a result with a column named 'name' instead of the first column ([#2171](https://github.com/fishtown-analytics/dbt/pull/2171))
+- dbt no longer supports databases with more than 10,000 schemas ([#2171](https://github.com/fishtown-analytics/dbt/pull/2171))
+
+### Features
+- Remove the requirement to have a passphrase when using Snowflake key pair authentication ([#1805](https://github.com/fishtown-analytics/dbt/issues/1805), [#2164](https://github.com/fishtown-analytics/dbt/pull/2164))
+- Adding optional "sslmode" parameter for postgres ([#2152](https://github.com/fishtown-analytics/dbt/issues/2152), [#2154](https://github.com/fishtown-analytics/dbt/pull/2154))
+
+### Under the hood
+- Use `show terse schemas in database` (chosen based on data collected by Michael Weinberg) instead of `select ... from information_schema.schemata` when collecting the list of schemas in a database ([#2166](https://github.com/fishtown-analytics/dbt/issues/2166), [#2171](https://github.com/fishtown-analytics/dbt/pull/2171))
+- Parallelize filling the cache and listing schemas in each database during startup ([#2127](https://github.com/fishtown-analytics/dbt/issues/2127), [#2157](https://github.com/fishtown-analytics/dbt/pull/2157))
+
+Contributors:
+ - [@mhmcdonald](https://github.com/mhmcdonald) ([#2164](https://github.com/fishtown-analytics/dbt/pull/2164))
+ - [@dholleran-lendico](https://github.com/dholleran-lendico) ([#2154](https://github.com/fishtown-analytics/dbt/pull/2154))
+
 ## dbt 0.16.0b3 (February 26, 2020)
 
 ### Breaking changes
@@ -7,10 +25,8 @@
 ### Features
 - Add a "docs" field to models, with a "show" subfield ([#1671](https://github.com/fishtown-analytics/dbt/issues/1671), [#2107](https://github.com/fishtown-analytics/dbt/pull/2107))
 - Add a dbt-{dbt_version} user agent field to the bigquery connector ([#2121](https://github.com/fishtown-analytics/dbt/issues/2121), [#2146](https://github.com/fishtown-analytics/dbt/pull/2146))
-- Adding optional "sslmode" parameter for postgres ([#2152](https://github.com/fishtown-analytics/dbt/issues/2152), [#2154](https://github.com/fishtown-analytics/dbt/pull/2154))
 - Add support for `generate_database_name` macro ([#1695](https://github.com/fishtown-analytics/dbt/issues/1695), [#2143](https://github.com/fishtown-analytics/dbt/pull/2143))
 - Expand the search path for schema.yml (and by extension, the default docs path) to include macro-paths and analysis-paths (in addition to source-paths, data-paths, and snapshot-paths) ([#2155](https://github.com/fishtown-analytics/dbt/issues/2155), [#2160](https://github.com/fishtown-analytics/dbt/pull/2160))
-- Remove the requirement to have a passphrase when using Snowflake key pair authentication ([#1805](https://github.com/fishtown-analytics/dbt/issues/1805), [#2164](https://github.com/fishtown-analytics/dbt/pull/2164))
 
 ### Fixes
 - Fix issue where dbt did not give an error in the presence of duplicate doc names ([#2054](https://github.com/fishtown-analytics/dbt/issues/2054), [#2080](https://github.com/fishtown-analytics/dbt/pull/2080))
@@ -19,14 +35,9 @@
 - Fix an issue where dbt rendered source test args, fix issue where dbt ran an extra compile pass over the wrapped SQL. ([#2114](https://github.com/fishtown-analytics/dbt/issues/2114), [#2150](https://github.com/fishtown-analytics/dbt/pull/2150))
 - Set more upper bounds for jinja2,requests, and idna dependencies, upgrade snowflake-connector-python ([#2147](https://github.com/fishtown-analytics/dbt/issues/2147), [#2151](https://github.com/fishtown-analytics/dbt/pull/2151))
 
-### Under the hood
-- Parallelize filling the cache and listing schemas in each database during startup ([#2127](https://github.com/fishtown-analytics/dbt/issues/2127), [#2157](https://github.com/fishtown-analytics/dbt/pull/2157))
-
 Contributors:
  - [@bubbomb](https://github.com/bubbomb) ([#2080](https://github.com/fishtown-analytics/dbt/pull/2080))
  - [@sonac](https://github.com/sonac) ([#2078](https://github.com/fishtown-analytics/dbt/pull/2078))
- - [@mhmcdonald](https://github.com/mhmcdonald) ([#2164](https://github.com/fishtown-analytics/dbt/pull/2164))
- - [@dholleran-lendico](https://github.com/dholleran-lendico) ([#2154](https://github.com/fishtown-analytics/dbt/pull/2154))
 
 ## dbt 0.16.0b1 (February 11, 2020)
 
diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 76908738328..2c8fe5c147b 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -124,23 +124,30 @@ class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]):
     """A utility class to keep track of what information_schema tables to
     search for what schemas
     """
-    def add(self, relation: BaseRelation):
+    def add(self, relation: BaseRelation, preserve_case=False):
         key = relation.information_schema_only()
         if key not in self:
             self[key] = set()
-        lowered: Optional[str] = None
+        schema: Optional[str] = None
         if relation.schema is not None:
-            lowered = relation.schema.lower()
-        self[key].add(lowered)
+            if preserve_case:
+                schema = relation.schema
+            else:
+                schema = relation.schema.lower()
+        self[key].add(schema)
 
-    def search(self):
+    def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
         for information_schema_name, schemas in self.items():
             for schema in schemas:
                 yield information_schema_name, schema
 
-    def schemas_searched(self):
-        result: Set[Tuple[str, str]] = set()
+    def schemas_searched(self) -> Set[Tuple[str, Optional[str]]]:
+        result: Set[Tuple[str, Optional[str]]] = set()
         for information_schema_name, schemas in self.items():
+            if information_schema_name.database is None:
+                raise InternalException(
+                    'Got a None database in an information schema!'
+                )
             result.update(
                 (information_schema_name.database, schema)
                 for schema in schemas
diff --git a/core/dbt/deprecations.py b/core/dbt/deprecations.py
index 3cfeea14491..20df3919de7 100644
--- a/core/dbt/deprecations.py
+++ b/core/dbt/deprecations.py
@@ -36,22 +36,6 @@ def show(self, *args, **kwargs) -> None:
         active_deprecations.add(self.name)
 
 
-class GenerateSchemaNameSingleArgDeprecated(DBTDeprecation):
-    _name = 'generate-schema-name-single-arg'
-
-    _description = '''\
-    As of dbt v0.14.0, the `generate_schema_name` macro accepts a second "node"
-    argument. The one-argument form of `generate_schema_name` is deprecated,
-    and will become unsupported in a future release.
-
-
-
-    For more information, see:
-
-    https://docs.getdbt.com/v0.14/docs/upgrading-to-014
-    '''
-
-
 class MaterializationReturnDeprecation(DBTDeprecation):
     _name = 'materialization-return'
 
@@ -166,7 +150,6 @@ def warn(name, *args, **kwargs):
 active_deprecations: Set[str] = set()
 
 deprecations_list: List[DBTDeprecation] = [
-    GenerateSchemaNameSingleArgDeprecated(),
     MaterializationReturnDeprecation(),
     NotADictionaryDeprecation(),
     ColumnQuotingDeprecation(),
diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py
index a7b4b06aefe..65f95e7dfb9 100644
--- a/core/dbt/task/runnable.py
+++ b/core/dbt/task/runnable.py
@@ -6,6 +6,8 @@
 from typing import Optional, Dict, List, Set, Tuple, Iterable
 
 from dbt.task.base import ConfiguredTask
+# TODO: move this...
+from dbt.adapters.base.impl import SchemaSearchMap
 from dbt.adapters.factory import get_adapter
 from dbt.logger import (
     GLOBAL_LOGGER as logger,
@@ -376,23 +378,36 @@ def interpret_results(self, results):
         return len(failures) == 0
 
     def get_model_schemas(
-        self, selected_uids: Iterable[str]
-    ) -> Set[Tuple[str, str]]:
+        self, adapter, selected_uids: Iterable[str]
+    ) -> SchemaSearchMap:
         if self.manifest is None:
             raise InternalException('manifest was None in get_model_schemas')
+        search_map = SchemaSearchMap()
 
-        schemas: Set[Tuple[str, str]] = set()
         for node in self.manifest.nodes.values():
             if node.unique_id not in selected_uids:
                 continue
             if node.is_refable and not node.is_ephemeral:
-                schemas.add((node.database, node.schema))
+                relation = adapter.Relation.create_from(self.config, node)
+                # we're going to be creating these schemas, so preserve the
+                # case.
+                search_map.add(relation, preserve_case=True)
 
-        return schemas
+        return search_map
 
     def create_schemas(self, adapter, selected_uids: Iterable[str]):
-        required_schemas = self.get_model_schemas(selected_uids)
-        required_databases = set(db for db, _ in required_schemas)
+        required_schemas = self.get_model_schemas(adapter, selected_uids)
+        # we want the string form of the information schema database
+        required_databases: List[str] = []
+        for info in required_schemas:
+            include_policy = info.include_policy.replace(
+                schema=False, identifier=False, database=True
+            )
+            db_only = info.replace(
+                include_policy=include_policy,
+                information_schema_view=None,
+            )
+            required_databases.append(str(db_only))
 
         existing_schemas_lowered: Set[Tuple[str, str]] = set()
 
@@ -418,8 +433,12 @@ def create_schema(db: str, schema: str) -> None:
         for ls_future in as_completed(list_futures):
             existing_schemas_lowered.update(ls_future.result())
 
-        for db, schema in required_schemas:
-            db_schema = (db.lower(), schema.lower())
+        for info, schema in required_schemas.search():
+            db = info.database
+            lower_schema: Optional[str] = None
+            if schema is not None:
+                lower_schema = schema.lower()
+            db_schema = (db.lower(), lower_schema)
             if db_schema not in existing_schemas_lowered:
                 existing_schemas_lowered.add(db_schema)
                 create_futures.append(
diff --git a/plugins/snowflake/dbt/adapters/snowflake/impl.py b/plugins/snowflake/dbt/adapters/snowflake/impl.py
index 86c96abbb55..267cebf6136 100644
--- a/plugins/snowflake/dbt/adapters/snowflake/impl.py
+++ b/plugins/snowflake/dbt/adapters/snowflake/impl.py
@@ -1,8 +1,9 @@
-from typing import Mapping, Any, Optional
+from typing import Mapping, Any, Optional, List
 
 import agate
 
 from dbt.adapters.sql import SQLAdapter
+from dbt.adapters.sql.impl import LIST_SCHEMAS_MACRO_NAME
 from dbt.adapters.snowflake import SnowflakeConnectionManager
 from dbt.adapters.snowflake import SnowflakeRelation
 from dbt.adapters.snowflake import SnowflakeColumn
@@ -81,3 +82,13 @@ def post_model_hook(
     ) -> None:
         if context is not None:
             self._use_warehouse(context)
+
+    def list_schemas(self, database: str) -> List[str]:
+        results = self.execute_macro(
+            LIST_SCHEMAS_MACRO_NAME,
+            kwargs={'database': database}
+        )
+        # this uses 'show terse schemas in database', and the column name we
+        # want is 'name'
+
+        return [row['name'] for row in results]
diff --git a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
index ffa7d6bae62..9d051c4e39c 100644
--- a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
+++ b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
@@ -79,6 +79,24 @@
 {% endmacro %}
 
+{% macro snowflake__list_schemas(database) -%}
+  {# 10k limit from here: https://docs.snowflake.net/manuals/sql-reference/sql/show-schemas.html#usage-notes #}
+  {% set maximum = 10000 %}
+  {% set sql -%}
+    show terse schemas in database {{ database }}
+    limit {{ maximum }}
+  {%- endset %}
+  {% set result = run_query(sql) %}
+  {% if (result | length) >= maximum %}
+    {% set msg %}
+      Too many schemas in database {{ database }}! dbt can only get
+      information about databases with fewer than {{ maximum }} schemas.
+    {% endset %}
+    {% do exceptions.raise_compiler_error(msg) %}
+  {% endif %}
+  {{ return(result) }}
+{% endmacro %}
+
 {% macro snowflake__list_relations_without_caching(information_schema, schema) %}
   {% call statement('list_relations_without_caching', fetch_result=True) -%}

From cc3ba20ec9cf3bc2a5047f4d2e0fafe0e86e7f63 Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Fri, 28 Feb 2020 11:26:15 -0700
Subject: [PATCH 2/4] refactor, fix flake8/mypy

---
 core/dbt/adapters/base/__init__.py         |  6 +-
 core/dbt/adapters/base/impl.py             | 58 +-----------------
 core/dbt/adapters/base/relation.py         | 59 ++++++++++++++++++-
 core/dbt/task/runnable.py                  | 12 ++--
 .../bigquery/dbt/adapters/bigquery/impl.py |  5 +-
 5 files changed, 75 insertions(+), 65 deletions(-)

diff --git a/core/dbt/adapters/base/__init__.py b/core/dbt/adapters/base/__init__.py
index 39461477c69..b4ddf791159 100644
--- a/core/dbt/adapters/base/__init__.py
+++ b/core/dbt/adapters/base/__init__.py
@@ -4,7 +4,11 @@
 from dbt.contracts.connection import Credentials  # noqa
 from dbt.adapters.base.meta import available  # noqa
 from dbt.adapters.base.connections import BaseConnectionManager  # noqa
-from dbt.adapters.base.relation import BaseRelation, RelationType  # noqa
+from dbt.adapters.base.relation import (  # noqa
+    BaseRelation,
+    RelationType,
+    SchemaSearchMap,
+)
 from dbt.adapters.base.column import Column  # noqa
 from dbt.adapters.base.impl import BaseAdapter  # noqa
 from dbt.adapters.base.plugin import AdapterPlugin  # noqa
diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 2c8fe5c147b..d8e638443fd 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -31,7 +31,7 @@
 from dbt.adapters.base.connections import BaseConnectionManager, Connection
 from dbt.adapters.base.meta import AdapterMeta, available
 from dbt.adapters.base.relation import (
-    ComponentName, BaseRelation, InformationSchema
+    ComponentName, BaseRelation, InformationSchema, SchemaSearchMap
 )
 from dbt.adapters.base import Column as BaseColumn
 from dbt.adapters.cache import RelationsCache
@@ -120,62 +120,6 @@ def _relation_name(rel: Optional[BaseRelation]) -> str:
         return str(rel)
 
 
-class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]):
-    """A utility class to keep track of what information_schema tables to
-    search for what schemas
-    """
-    def add(self, relation: BaseRelation, preserve_case=False):
-        key = relation.information_schema_only()
-        if key not in self:
-            self[key] = set()
-        schema: Optional[str] = None
-        if relation.schema is not None:
-            if preserve_case:
-                schema = relation.schema
-            else:
-                schema = relation.schema.lower()
-        self[key].add(schema)
-
-    def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
-        for information_schema_name, schemas in self.items():
-            for schema in schemas:
-                yield information_schema_name, schema
-
-    def schemas_searched(self) -> Set[Tuple[str, Optional[str]]]:
-        result: Set[Tuple[str, Optional[str]]] = set()
-        for information_schema_name, schemas in self.items():
-            if information_schema_name.database is None:
-                raise InternalException(
-                    'Got a None database in an information schema!'
-                )
-            result.update(
-                (information_schema_name.database, schema)
-                for schema in schemas
-            )
-        return result
-
-    def flatten(self):
-        new = self.__class__()
-
-        # make sure we don't have duplicates
-        seen = {r.database.lower() for r in self if r.database}
-        if len(seen) > 1:
-            raise_compiler_error(str(seen))
-
-        for information_schema_name, schema in self.search():
-            path = {
-                'database': information_schema_name.database,
-                'schema': schema
-            }
-            new.add(information_schema_name.incorporate(
-                path=path,
-                quote_policy={'database': False},
-                include_policy={'database': False},
-            ))
-
-        return new
-
-
 class BaseAdapter(metaclass=AdapterMeta):
     """The BaseAdapter provides an abstract base class for adapters.
 
diff --git a/core/dbt/adapters/base/relation.py b/core/dbt/adapters/base/relation.py
index 6d1c6c41783..d223741de01 100644
--- a/core/dbt/adapters/base/relation.py
+++ b/core/dbt/adapters/base/relation.py
@@ -6,7 +6,8 @@
 from collections.abc import Mapping, Hashable
 from dataclasses import dataclass, fields
 from typing import (
-    Optional, TypeVar, Generic, Any, Type, Dict, Union, List, Iterator, Tuple
+    Optional, TypeVar, Generic, Any, Type, Dict, Union, List, Iterator, Tuple,
+    Set
 )
 from typing_extensions import Protocol
 
@@ -496,3 +497,59 @@ def _render_iterator(self):
         for k, v in super()._render_iterator():
             yield k, v
         yield None, self.information_schema_view
+
+
+class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]):
+    """A utility class to keep track of what information_schema tables to
+    search for what schemas
+    """
+    def add(self, relation: BaseRelation, preserve_case=False):
+        key = relation.information_schema_only()
+        if key not in self:
+            self[key] = set()
+        schema: Optional[str] = None
+        if relation.schema is not None:
+            if preserve_case:
+                schema = relation.schema
+            else:
+                schema = relation.schema.lower()
+        self[key].add(schema)
+
+    def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
+        for information_schema_name, schemas in self.items():
+            for schema in schemas:
+                yield information_schema_name, schema
+
+    def schemas_searched(self) -> Set[Tuple[str, Optional[str]]]:
+        result: Set[Tuple[str, Optional[str]]] = set()
+        for information_schema_name, schemas in self.items():
+            if information_schema_name.database is None:
+                raise InternalException(
+                    'Got a None database in an information schema!'
+                )
+            result.update(
+                (information_schema_name.database, schema)
+                for schema in schemas
+            )
+        return result
+
+    def flatten(self):
+        new = self.__class__()
+
+        # make sure we don't have duplicates
+        seen = {r.database.lower() for r in self if r.database}
+        if len(seen) > 1:
+            dbt.exceptions.raise_compiler_error(str(seen))
+
+        for information_schema_name, schema in self.search():
+            path = {
+                'database': information_schema_name.database,
+                'schema': schema
+            }
+            new.add(information_schema_name.incorporate(
+                path=path,
+                quote_policy={'database': False},
+                include_policy={'database': False},
+            ))
+
+        return new
diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py
index 65f95e7dfb9..33e6a78ff58 100644
--- a/core/dbt/task/runnable.py
+++ b/core/dbt/task/runnable.py
@@ -6,8 +6,7 @@
 from typing import Optional, Dict, List, Set, Tuple, Iterable
 
 from dbt.task.base import ConfiguredTask
-# TODO: move this...
-from dbt.adapters.base.impl import SchemaSearchMap
+from dbt.adapters.base import SchemaSearchMap
 from dbt.adapters.factory import get_adapter
 from dbt.logger import (
     GLOBAL_LOGGER as logger,
@@ -409,7 +408,7 @@ def create_schemas(self, adapter, selected_uids: Iterable[str]):
             )
             required_databases.append(str(db_only))
 
-        existing_schemas_lowered: Set[Tuple[str, str]] = set()
+        existing_schemas_lowered: Set[Tuple[str, Optional[str]]] = set()
 
         def list_schemas(db: str) -> List[Tuple[str, str]]:
             with adapter.connection_named(f'list_{db}'):
@@ -434,10 +433,15 @@ def create_schema(db: str, schema: str) -> None:
             existing_schemas_lowered.update(ls_future.result())
 
         for info, schema in required_schemas.search():
-            db = info.database
+            if info.database is None:
+                raise InternalException(
+                    'Got an information schema with no database!'
+                )
+            db: str = info.database
             lower_schema: Optional[str] = None
             if schema is not None:
                 lower_schema = schema.lower()
+
             db_schema = (db.lower(), lower_schema)
             if db_schema not in existing_schemas_lowered:
                 existing_schemas_lowered.add(db_schema)
diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py
index f7f6581426a..d277715ae37 100644
--- a/plugins/bigquery/dbt/adapters/bigquery/impl.py
+++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -9,8 +9,9 @@
 import dbt.clients.agate_helper
 import dbt.links
 
-from dbt.adapters.base import BaseAdapter, available, RelationType
-from dbt.adapters.base.impl import SchemaSearchMap
+from dbt.adapters.base import (
+    BaseAdapter, available, RelationType, SchemaSearchMap
+)
 from dbt.adapters.bigquery.relation import (
     BigQueryRelation, BigQueryInformationSchema
 )

From c59adc33698264e73fc00467b16b98ea9c3e306d Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Fri, 28 Feb 2020 12:04:31 -0700
Subject: [PATCH 3/4] strip out the database quotes on bigquery for the API

---
 plugins/bigquery/dbt/adapters/bigquery/impl.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py
index d277715ae37..3ae6f40a59d 100644
--- a/plugins/bigquery/dbt/adapters/bigquery/impl.py
+++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -180,6 +180,9 @@ def rename_relation(
 
     @available
     def list_schemas(self, database: str) -> List[str]:
+        # the database string we get here is potentially quoted. Strip that off
+        # for the API call.
+ database = database.strip('`') conn = self.connections.get_thread_connection() client = conn.handle From 47cef1d9078cf2383290d0f914cdeb07596510da Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Sun, 1 Mar 2020 20:32:23 -0700 Subject: [PATCH 4/4] PR feedback: Add exception handler around macro execution, add a message about what is going on when list_schemas fails --- core/dbt/adapters/base/impl.py | 11 ++++++----- .../snowflake/dbt/adapters/snowflake/impl.py | 17 ++++++++++++----- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index d8e638443fd..8600560b1d7 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -961,11 +961,12 @@ def execute_macro( macro_function = MacroGenerator(macro, macro_context) - try: - result = macro_function(**kwargs) - finally: - if release: - self.release_connection() + with self.connections.exception_handler(f'macro {macro_name}'): + try: + result = macro_function(**kwargs) + finally: + if release: + self.release_connection() return result @classmethod diff --git a/plugins/snowflake/dbt/adapters/snowflake/impl.py b/plugins/snowflake/dbt/adapters/snowflake/impl.py index 267cebf6136..37e27dcb2de 100644 --- a/plugins/snowflake/dbt/adapters/snowflake/impl.py +++ b/plugins/snowflake/dbt/adapters/snowflake/impl.py @@ -8,7 +8,7 @@ from dbt.adapters.snowflake import SnowflakeRelation from dbt.adapters.snowflake import SnowflakeColumn from dbt.contracts.graph.manifest import Manifest -from dbt.exceptions import RuntimeException +from dbt.exceptions import RuntimeException, DatabaseException from dbt.utils import filter_null_values @@ -84,10 +84,17 @@ def post_model_hook( self._use_warehouse(context) def list_schemas(self, database: str) -> List[str]: - results = self.execute_macro( - LIST_SCHEMAS_MACRO_NAME, - kwargs={'database': database} - ) + try: + results = self.execute_macro( + LIST_SCHEMAS_MACRO_NAME, + kwargs={'database': database} + ) + except DatabaseException as exc: + msg = ( + f'Database error while listing schemas in database ' + f'"{database}"\n{exc}' + ) + raise RuntimeException(msg) # this uses 'show terse schemas in database', and the column name we # want is 'name'
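
Note for projects that override snowflake__list_schemas (illustrative only, not part of the patches above): after this change the override can run whatever query it likes, but the result it returns must expose a column named 'name' instead of relying on column position. A minimal sketch of a hypothetical information_schema-based override, assuming the standard run_query helper, would alias the schema column accordingly:

    {% macro snowflake__list_schemas(database) -%}
      {# hypothetical override: the returned result must include a 'name' column #}
      {% set sql -%}
        select schema_name as name
        from {{ database }}.information_schema.schemata
      {%- endset %}
      {{ return(run_query(sql)) }}
    {% endmacro %}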