Skip to content

Commit

Permalink
Move SQL previously embedded into adapters into macros
Browse files Browse the repository at this point in the history
Adapters now store an internal manifest that only has the dbt internal projects
Adapters use that manifest if none is provided to execute_manifest
The internal manifest is lazy-loaded to avoid recursion issues
Moved declared plugin paths down one level
Connection management changes to accomadate calling macro -> adapter -> macro
Split up precision and scale when describing number columns so agate doesn't eat commas
Manifest building now happens in the RunManager instead of the compiler

Now macros:
  create/drop schema
  get_columns_in_relation
  alter column type
  rename/drop/truncate
  list_schemas/check_schema_exists
  list_relations_without_caching
  • Loading branch information
Jacob Beck committed Jan 4, 2019
1 parent 44fa678 commit 0f9c4a8
Show file tree
Hide file tree
Showing 41 changed files with 896 additions and 612 deletions.
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ def commit_if_has_connection(self, name):
:param str name: The name of the connection to use.
"""
connection = self.get_if_exists(name)
connection = self.in_use.get(name)
if connection:
self.commit(connection)

Expand Down
81 changes: 62 additions & 19 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from dbt.compat import abstractclassmethod, classmethod
from dbt.contracts.connection import Connection
from dbt.loader import GraphLoader
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.schema import Column
from dbt.utils import filter_null_values, translate_aliases
Expand All @@ -21,6 +22,7 @@
from dbt.adapters.base import BaseRelation
from dbt.adapters.cache import RelationsCache


GET_CATALOG_MACRO_NAME = 'get_catalog'


Expand Down Expand Up @@ -68,11 +70,14 @@ def test(row):
class BaseAdapter(object):
"""The BaseAdapter provides an abstract base class for adapters.
Adapters must implement the following methods. Some of these methods can be
safely overridden as a noop, where it makes sense (transactions on
databases that don't support them, for instance). Those methods are marked
with a (passable) in their docstrings. Check docstrings for type
information, etc.
Adapters must implement the following methods and macros. Some of the
methods can be safely overridden as a noop, where it makes sense
(transactions on databases that don't support them, for instance). Those
methods are marked with a (passable) in their docstrings. Check docstrings
for type information, etc.
To implement a macro, implement "${adapter_type}__${macro_name}". in the
adapter's internal project.
Methods:
- exception_handler
Expand All @@ -94,6 +99,9 @@ class BaseAdapter(object):
- convert_datetime_type
- convert_date_type
- convert_time_type
Macros:
- get_catalog
"""
requires = {}

Expand All @@ -106,6 +114,7 @@ def __init__(self, config):
self.config = config
self.cache = RelationsCache()
self.connections = self.ConnectionManager(config)
self._internal_manifest_lazy = None

###
# Methods that pass through to the connection manager
Expand Down Expand Up @@ -159,6 +168,19 @@ def type(cls):
"""
return cls.ConnectionManager.TYPE

@property
def _internal_manifest(self):
if self._internal_manifest_lazy is None:
manifest = GraphLoader.load_internal(self.config)
self._internal_manifest_lazy = manifest
return self._internal_manifest_lazy

def check_internal_manifest(self):
"""Return the internal manifest (used for executing macros) if it's
been initialized, otherwise return None.
"""
return self._internal_manifest_lazy

###
# Caching methods
###
Expand Down Expand Up @@ -667,21 +689,38 @@ def convert_agate_type(cls, agate_table, col_idx):
###
# Operations involving the manifest
###
def execute_macro(self, manifest, macro_name, project=None,
context_override=None):
def execute_macro(self, macro_name, manifest=None, project=None,
context_override=None, kwargs=None, release=False,
connection_name=None):
"""Look macro_name up in the manifest and execute its results.
:param Manifest manifest: The manifest to use for generating the base
macro execution context.
:param str macro_name: The name of the macro to execute.
:param Optional[Manifest] manifest: The manifest to use for generating
the base macro execution context. If none is provided, use the
internal manifest.
:param Optional[str] project: The name of the project to search in, or
None for the first match.
:param Optional[dict] context_override: An optional dict to update()
the macro execution context.
:param Optional[dict] kwargs: An optional dict of keyword args used to
pass to the macro.
:param bool release: If True, release the connection after executing.
:param Optional[str] connection_name: The connection name to use, or
use the macro name.
Return an an AttrDict with three attributes: 'table', 'data', and
'status'. 'table' is an agate.Table.
"""
if kwargs is None:
kwargs = {}
if context_override is None:
context_override = {}
if connection_name is None:
connection_name = macro_name

if manifest is None:
manifest = self._internal_manifest

macro = manifest.find_macro_by_name(macro_name, project)
if macro is None:
raise dbt.exceptions.RuntimeException(
Expand All @@ -692,15 +731,21 @@ def execute_macro(self, manifest, macro_name, project=None,
# This causes a reference cycle, as dbt.context.runtime.generate()
# ends up calling get_adapter, so the import has to be here.
import dbt.context.runtime
macro_context = dbt.context.runtime.generate(
macro_context = dbt.context.runtime.generate_macro(
macro,
self.config,
manifest
manifest,
connection_name
)
if context_override:
macro_context.update(context_override)
macro_context.update(context_override)

macro_function = macro.generator(macro_context)

result = macro.generator(macro_context)()
try:
result = macro_function(**kwargs)
finally:
if release:
self.release_connection(connection_name)
return result

@classmethod
Expand All @@ -716,11 +761,9 @@ def get_catalog(self, manifest):
"""
# make it a list so macros can index into it.
context = {'databases': list(manifest.get_used_databases())}
try:
table = self.execute_macro(manifest, GET_CATALOG_MACRO_NAME,
context_override=context)
finally:
self.release_connection(GET_CATALOG_MACRO_NAME)
table = self.execute_macro(GET_CATALOG_MACRO_NAME,
context_override=context,
release=True)

results = self._catalog_filter_table(table, manifest)
return results
Expand Down
3 changes: 1 addition & 2 deletions core/dbt/adapters/base/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def __init__(self, adapter, credentials, include_path, dependencies=None):
self.adapter = adapter
self.credentials = credentials
self.include_path = include_path
project_path = os.path.join(self.include_path, adapter.type())
project = Project.from_project_root(project_path, {})
project = Project.from_project_root(include_path, {})
self.project_name = project.project_name
if dependencies is None:
dependencies = []
Expand Down
3 changes: 2 additions & 1 deletion core/dbt/adapters/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,6 @@ def reset_adapters():
"""Clear the adapters. This is useful for tests, which change configs.
"""
with _ADAPTER_LOCK:
for adapter in _ADAPTERS.values():
adapter.cleanup_connections()
_ADAPTERS.clear()
ADAPTER_TYPES.clear()
Loading

0 comments on commit 0f9c4a8

Please sign in to comment.