Feature/archive #183

Merged: 10 commits, Oct 20, 2016
67 changes: 67 additions & 0 deletions dbt/archival.py
@@ -0,0 +1,67 @@

from __future__ import print_function
import dbt.targets
import dbt.schema
import dbt.templates
import jinja2


class Archival(object):

    def __init__(self, project, archive_model):
        self.archive_model = archive_model
        self.project = project

        self.target = dbt.targets.get_target(self.project.run_environment())
        self.schema = dbt.schema.Schema(self.project, self.target)

    def compile(self):
        source_schema = self.archive_model.source_schema
        target_schema = self.archive_model.target_schema
        source_table = self.archive_model.source_table
        target_table = self.archive_model.target_table
        unique_key = self.archive_model.unique_key
        updated_at = self.archive_model.updated_at

        self.schema.create_schema(target_schema)

        source_columns = self.schema.get_columns_in_table(source_schema, source_table)

        if len(source_columns) == 0:
            raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema, source_table))

        # create archive table if not exists! TODO: Sort & Dist keys! Hmmmm

        extra_cols = [
            ("valid_from", "timestamp"),
            ("valid_to", "timestamp"),
            ("scd_id", "text"),
            ("dbt_updated_at", "timestamp")
        ]

        dest_columns = source_columns + extra_cols
        self.schema.create_table(target_schema, target_table, dest_columns, sort=updated_at, dist=unique_key)

        env = jinja2.Environment()

        ctx = {
            "columns": source_columns,
            "updated_at": updated_at,
            "unique_key": unique_key,
            "source_schema": source_schema,
            "source_table": source_table,
            "target_schema": target_schema,
            "target_table": target_table
        }

        base_query = dbt.templates.SCDArchiveTemplate
        template = env.from_string(base_query, globals=ctx)
        rendered = template.render(ctx)

        return rendered

    def runtime_compile(self, compiled_model):
        # merge the runtime context into the compiled model's own context
        context = self.context.copy()
        context.update(compiled_model.context())
        compiled_model.compile(context)
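For orientation, a minimal sketch of how this class is driven (the `project` and `archive_model` objects stand in for dbt's real ones; see ArchiveModel in dbt/model.py below):

# Hypothetical driver code, not part of this diff.
archival = Archival(project, archive_model)
sql = archival.compile()  # the rendered SCD archive query, as a string
print(sql)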

40 changes: 35 additions & 5 deletions dbt/compilation.py
@@ -8,9 +8,12 @@
from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error
from dbt.linker import Linker
import dbt.targets
import dbt.templates
import time
import sqlparse

CompilableEntities = ["models", "tests", "archives", "analyses"]

class Compiler(object):
    def __init__(self, project, create_template_class):
        self.project = project
@@ -38,6 +41,8 @@ def model_sources(self, this_project, own_project=None):
            return Source(this_project, own_project=own_project).get_models(paths, self.create_template)
        elif self.create_template.label == 'test':
            return Source(this_project, own_project=own_project).get_test_models(paths, self.create_template)
        elif self.create_template.label == 'archive':
            return []
        else:
            raise RuntimeError("unexpected create template type: '{}'".format(self.create_template.label))

@@ -47,6 +52,10 @@ def get_macros(self, this_project, own_project=None):
        paths = own_project.get('macro-paths', [])
        return Source(this_project, own_project=own_project).get_macros(paths)

    def get_archives(self, project):
        archive_template = dbt.templates.ArchiveInsertTemplate()
        return Source(project, own_project=project).get_archives(archive_template)

    def project_schemas(self):
        source_paths = self.project.get('source-paths', [])
        return Source(self.project).get_schemas(source_paths)
@@ -192,8 +201,8 @@ def compile_model(self, linker, model, models):

        return rendered

    def write_graph_file(self, linker):
        filename = 'graph-{}.yml'.format(self.create_template.label)
    def write_graph_file(self, linker, label):
        filename = 'graph-{}.yml'.format(label)
        graph_path = os.path.join(self.project['target-path'], filename)
        linker.write_graph(graph_path)

@@ -329,6 +338,19 @@ def do_gen(ctx):
            return macros
        return do_gen

    def compile_archives(self):
        linker = Linker()
        all_archives = self.get_archives(self.project)

        for archive in all_archives:
            sql = archive.compile()
            fqn = tuple(archive.fqn)
            linker.update_node_data(fqn, archive.serialize())
            self.__write(archive.build_path(), sql)

        self.write_graph_file(linker, 'archive')
        return all_archives

    def compile(self, dry=False):
        linker = Linker()

@@ -350,11 +372,19 @@ def compile(self, dry=False):

        self.validate_models_unique(compiled_models)
        self.validate_models_unique(written_schema_tests)
        self.write_graph_file(linker)
        self.write_graph_file(linker, self.create_template.label)

        if self.create_template.label != 'test':
        if self.create_template.label not in ['test', 'archive']:
            written_analyses = self.compile_analyses(linker, compiled_models)
        else:
            written_analyses = []

        return len(written_models), len(written_schema_tests), len(written_analyses)

        compiled_archives = self.compile_archives()

        return {
            "models": len(written_models),
            "tests": len(written_schema_tests),
            "archives": len(compiled_archives),
            "analyses": len(written_analyses)
        }
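Note the interface change: compile() now returns a dict of per-entity counts rather than the old 3-tuple, so existing callers must be updated. A sketch of the new calling convention (the caller code is illustrative):

# Illustrative caller; the keys match CompilableEntities above.
results = compiler.compile()
for entity in CompilableEntities:
    print("compiled {} {}".format(results[entity], entity))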
22 changes: 22 additions & 0 deletions dbt/compiled_model.py
@@ -6,6 +6,7 @@ class CompiledModel(object):
    def __init__(self, fqn, data):
        self.fqn = fqn
        self.data = data
        self.nice_name = ".".join(fqn)

        # these are set just before the models are executed
        self.tmp_drop_type = None
@@ -23,6 +24,9 @@ def hashed_name(self):
        fqn_string = ".".join(self.fqn)
        return hashlib.md5(fqn_string.encode('utf-8')).hexdigest()

    def context(self):
        return self.data

    def hashed_contents(self):
        return hashlib.md5(self.contents.encode('utf-8')).hexdigest()

@@ -110,13 +114,31 @@ def prepare(self, existing, target):
    def __repr__(self):
        return "<CompiledModel {}.{}: {}>".format(self.data['project_name'], self.name, self.data['build_path'])

class CompiledArchive(CompiledModel):
    def __init__(self, fqn, data):
        super(CompiledArchive, self).__init__(fqn, data)

    def should_rename(self):
        return False

    def should_execute(self):
        return True

    def prepare(self, existing, target):
        self.target = target

    def __repr__(self):
        return "<CompiledArchive {}.{}: {}>".format(self.data['project_name'], self.name, self.data['build_path'])

def make_compiled_model(fqn, data):
    run_type = data['dbt_run_type']

    if run_type in ['run', 'dry-run']:
        return CompiledModel(fqn, data)
    elif run_type == 'test':
        return CompiledTest(fqn, data)
    elif run_type == 'archive':
        return CompiledArchive(fqn, data)
    else:
        raise RuntimeError("invalid run_type given: {}".format(run_type))

4 changes: 4 additions & 0 deletions dbt/main.py
Expand Up @@ -16,6 +16,7 @@
import dbt.task.init as init_task
import dbt.task.seed as seed_task
import dbt.task.test as test_task
import dbt.task.archive as archive_task
import dbt.tracking


@@ -71,6 +72,9 @@ def handle(args):
    sub = subs.add_parser('deps', parents=[base_subparser])
    sub.set_defaults(cls=deps_task.DepsTask, which='deps')

    sub = subs.add_parser('archive', parents=[base_subparser])
    sub.set_defaults(cls=archive_task.ArchiveTask, which='archive')

    sub = subs.add_parser('run', parents=[base_subparser])
    sub.add_argument('--dry', action='store_true', help="'dry run' models")
    sub.add_argument('--models', required=False, nargs='+', help="Specify the models to run. All models depending on these models will also be run")
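The new subcommand rides the same set_defaults dispatch as its siblings; roughly (the task construction below is an assumption, since handle()'s body is elided here):

# Sketch only: parsing 'dbt archive' selects ArchiveTask.
parsed = p.parse_args(['archive'])  # assumes the top-level parser is named p
assert parsed.which == 'archive'
task = parsed.cls(parsed)           # hypothetical constructor call
task.run()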
66 changes: 66 additions & 0 deletions dbt/model.py
Expand Up @@ -7,6 +7,7 @@
from dbt.utils import split_path
import dbt.schema_tester
import dbt.project
import dbt.archival
from dbt.utils import This, deep_merge, DBTConfigKeys, compiler_error

class SourceConfig(object):
@@ -578,3 +579,68 @@ def __repr__(self):
        return "<Macro {}.{}: {}>".format(self.project['name'], self.name, self.filepath)


class ArchiveModel(DBTSource):
    dbt_run_type = 'archive'

    def __init__(self, project, create_template, archive_data):
        self.create_template = create_template

        self.validate(archive_data)

        self.source_schema = archive_data['source_schema']
        self.target_schema = archive_data['target_schema']
        self.source_table = archive_data['source_table']
        self.target_table = archive_data['target_table']
        self.unique_key = archive_data['unique_key']
        self.updated_at = archive_data['updated_at']

        target_dir = self.create_template.label
        rel_filepath = os.path.join(self.target_schema, self.target_table)

        super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project)

    def validate(self, data):
        required = [
            'source_schema',
            'target_schema',
            'source_table',
            'target_table',
            'unique_key',
            'updated_at',
        ]

        for key in required:
            if data.get(key, None) is None:
                raise RuntimeError("Invalid archive config: missing required field '{}'".format(key))

    def serialize(self):
        data = DBTSource.serialize(self).copy()

        serialized = {
            "source_schema": self.source_schema,
            "target_schema": self.target_schema,
            "source_table": self.source_table,
            "target_table": self.target_table,
            "unique_key": self.unique_key,
            "updated_at": self.updated_at
        }

        data.update(serialized)
        return data

    def compile(self):
        archival = dbt.archival.Archival(self.project, self)
        query = archival.compile()

        sql = self.create_template.wrap(self.target_schema, self.target_table, query, self.unique_key)
        return sql

    def build_path(self):
        build_dir = self.create_template.label
        filename = "{}.sql".format(self.name)
        path_parts = [build_dir] + self.fqn[:-1] + [filename]
        return os.path.join(*path_parts)

    def __repr__(self):
        return "<ArchiveModel {} --> {} unique:{} updated_at:{}>".format(self.source_table, self.target_table, self.unique_key, self.updated_at)
50 changes: 47 additions & 3 deletions dbt/runner.py
@@ -198,6 +198,42 @@ def execute(self, schema, target, model):

        return row[0]

class ArchiveRunner(BaseRunner):
    run_type = 'archive'

    def pre_run_msg(self, model):
        print_vars = {
            "schema": model.target.schema,
            "model_name": model.name,
        }

        output = "START archive table {schema}.{model_name} ".format(**print_vars)
        return output

    def post_run_msg(self, result):
        model = result.model
        print_vars = {
            "schema": model.target.schema,
            "model_name": model.name,
            "info": "ERROR archiving" if result.errored else "OK created"
        }

        output = "{info} table {schema}.{model_name} ".format(**print_vars)
        return output

    def pre_run_all_msg(self, models):
        return "Archiving {} tables".format(len(models))

    def post_run_all_msg(self, results):
        return "Finished archiving {} tables".format(len(results))

    def status(self, result):
        return result.status

    def execute(self, schema, target, model):
        status = schema.execute_and_handle_permissions(model.compiled_contents, model.name)
        return status

class RunManager(object):
    def __init__(self, project, target_path, graph_type, threads):
        self.logger = logging.getLogger(__name__)
@@ -218,7 +254,9 @@ def __init__(self, project, target_path, graph_type, threads):

        self.context = {
            "run_started_at": datetime.now(),
            "invocation_id": dbt.tracking.invocation_id,
            "get_columns_in_table": self.schema.get_columns_in_table,
            "get_missing_columns": self.schema.get_missing_columns,
        }


@@ -301,7 +339,7 @@ def execute_models(self, runner, model_dependency_list, on_failure):

        num_models = len(flat_models)
        if num_models == 0:
            print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path))
            print("WARNING: Nothing to do. Try checking your model configs and running `dbt compile`")
            return []

        num_threads = self.target.threads
@@ -391,7 +429,9 @@ def run_from_graph(self, runner, limit_to):

        for m in relevant_compiled_models:
            if m.should_execute():
                m.compile(self.context)
                context = self.context.copy()
                context.update(m.context())
                m.compile(context)

        schema_name = self.target.schema

@@ -443,3 +483,7 @@ def dry_run(self, limit_to=None):
        runner = DryRunner()
        return self.safe_run_from_graph(runner, limit_to)

    def run_archive(self):
        runner = ArchiveRunner()
        return self.safe_run_from_graph(runner, None)
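End to end, the archive entry point mirrors run() and dry_run(); a hedged sketch (the graph_type and argument values are assumptions):

# Hypothetical invocation; argument values are assumptions.
run_manager = RunManager(project, 'target', 'archive', 1)
results = run_manager.run_archive()
print("archived {} tables".format(len(results)))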
