Skip to content

Commit

Permalink
Merge pull request #2041 from fishtown-analytics/fix/source-freshness-rpc
Browse files Browse the repository at this point in the history

Add source snapshot-freshness to dbt rpc (#2040)
  • Loading branch information
cmcarthur authored Jan 16, 2020
2 parents 7d3591f + 2ac2499 commit a0524af
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 3 deletions.
6 changes: 6 additions & 0 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ class RPCRunOperationParameters(RPCParameters):
args: Dict[str, Any] = field(default_factory=dict)


# Request parameters for the source-freshness RPC call; both fields are
# optional and default to None.
@dataclass
class RPCSourceFreshnessParameters(RPCParameters):
    # Thread count for the run. RemoteSourceFreshnessTask.set_args copies it
    # onto the task args only when it is not None.
    threads: Optional[int] = None
    # One selector string, a list of selector strings, or None.
    # RemoteSourceFreshnessTask.set_args passes it through `_listify` before
    # storing it as `args.selected`.
    select: Union[None, str, List[str]] = None


# Outputs

@dataclass
Expand Down
7 changes: 5 additions & 2 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,11 @@ def _build_source_snapshot_freshness_subparser(subparsers, base_subparser):
Specify number of threads to use. Overrides settings in profiles.yml
'''
)
sub.set_defaults(cls=freshness_task.FreshnessTask,
which='snapshot-freshness', rpc_method=None)
sub.set_defaults(
cls=freshness_task.FreshnessTask,
which='snapshot-freshness',
rpc_method='snapshot-freshness',
)
return sub


Expand Down
15 changes: 15 additions & 0 deletions core/dbt/task/rpc/project_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
RemoteExecutionResult,
RemoteRunOperationResult,
RPCSnapshotParameters,
RPCSourceFreshnessParameters,
)
from dbt.rpc.method import (
Parameters,
)
from dbt.task.compile import CompileTask
from dbt.task.freshness import FreshnessTask
from dbt.task.generate import GenerateTask
from dbt.task.run import RunTask
from dbt.task.run_operation import RunOperationTask
Expand Down Expand Up @@ -150,3 +152,16 @@ def set_args(self, params: RPCSnapshotParameters) -> None:
self.args.exclude = self._listify(params.exclude)
if params.threads is not None:
self.args.threads = params.threads


class RemoteSourceFreshnessTask(
    RPCCommandTask[RPCSourceFreshnessParameters],
    FreshnessTask
):
    """Combine ``RPCCommandTask`` and ``FreshnessTask`` under the RPC method
    name ``'snapshot-freshness'``.
    """
    METHOD_NAME = 'snapshot-freshness'

    def set_args(self, params: RPCSourceFreshnessParameters) -> None:
        """Copy the RPC request parameters onto ``self.args``.

        :param params: the parsed request parameters; ``select`` is passed
            through ``self._listify`` and ``threads`` is only applied when
            it is not None.
        """
        task_args = self.args
        task_args.selected = self._listify(params.select)
        requested_threads = params.threads
        if requested_threads is not None:
            # Leave the existing threads value alone when none was requested.
            task_args.threads = requested_threads
        # The output attribute is reset for every request.
        task_args.output = None
80 changes: 79 additions & 1 deletion test/rpc/test_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# flake8: disable=redefined-outer-name

from datetime import datetime, timedelta
import time
import yaml
from .util import (
Expand Down Expand Up @@ -712,3 +712,81 @@ def test_get_status(
result = querier.is_result(querier.poll(token, logs=False))
assert 'logs' in result
assert len(result['logs']) == 0


# Template for the `sources.yml` model file written by test_source_freshness.
# It is rendered with `str.format(schema=...)`, which is why the literal YAML
# braces on the warn_after/error_after lines are doubled (`{{ ... }}`) while
# `{schema}` is a real placeholder.
source_freshness_schema_yml = '''
version: 2
sources:
- name: test_source
loaded_at_field: b
schema: {schema}
freshness:
warn_after: {{count: 10, period: hour}}
error_after: {{count: 1, period: day}}
tables:
- name: test_table
identifier: source
- name: failure_table
identifier: other_source
'''


def test_source_freshness(
    project_root, profiles_root, postgres_profile, unique_schema
):
    """Check source freshness over RPC for the error, warn and pass cases.

    Each stage is requested twice and must give the same outcome both times:
    once through ``querier.snapshot_freshness`` and once through the
    equivalent ``source snapshot-freshness`` command line via ``cli_args``.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    now = datetime.utcnow()
    # 18 hours is past the 10 hour warn_after but inside the 1 day error_after.
    warn_age = now - timedelta(hours=18)
    # 2 days is past the 1 day error_after.
    error_age = now - timedelta(days=2)

    def stamp(moment):
        # Render a datetime the way the seed CSV files store it.
        return moment.strftime(timestamp_format)

    def check_statuses(response, expected_count, expected_status):
        # Every returned result must carry the expected freshness status.
        entries = response['results']
        assert len(entries) == expected_count
        for entry in entries:
            assert entry['status'] == expected_status

    # Both seeds start with a single row that is old enough to be an error.
    project = ProjectDefinition(
        project_data={'seeds': {'quote_columns': False}},
        seeds={
            'source.csv': 'a,b\n1,{}\n'.format(stamp(error_age)),
            'other_source.csv': 'a,b\n1,{}\n'.format(stamp(error_age))
        },
        models={
            'sources.yml': source_freshness_schema_yml.format(
                schema=unique_schema
            ),
        },
    )

    with get_querier(
        project_def=project,
        project_dir=project_root,
        profiles_dir=profiles_root,
        schema=unique_schema,
        test_kwargs={},
    ) as querier:
        wait = querier.async_wait_for_result

        def append_row_and_reseed(row):
            # Add a fresher row to source.csv, rewrite the seeds, reload them.
            project.seeds['source.csv'] += row
            project.write_seeds(project_root, remove=True)
            wait(querier.seed())

        seeded = wait(querier.seed())
        assert len(seeded['results']) == 2

        # Stage 1: both tables are stale, so the request itself fails.
        outcome = wait(querier.snapshot_freshness(), state='failed')
        check_statuses(outcome, 2, 'error')
        outcome = wait(
            querier.cli_args('source snapshot-freshness'), state='failed'
        )
        check_statuses(outcome, 2, 'error')

        # Stage 2: an 18 hour old row moves the selected table to 'warn'.
        append_row_and_reseed('2,{}\n'.format(stamp(warn_age)))
        outcome = wait(
            querier.snapshot_freshness(select='test_source.test_table')
        )
        check_statuses(outcome, 1, 'warn')
        outcome = wait(querier.cli_args(
            'source snapshot-freshness -s test_source.test_table'
        ))
        check_statuses(outcome, 1, 'warn')

        # Stage 3: a row stamped with the start time makes the table 'pass'.
        append_row_and_reseed('3,{}\n'.format(stamp(now)))
        outcome = wait(
            querier.snapshot_freshness(select=['test_source.test_table'])
        )
        check_statuses(outcome, 1, 'pass')
        outcome = wait(querier.cli_args(
            'source snapshot-freshness --select test_source.test_table'
        ))
        check_statuses(outcome, 1, 'pass')
15 changes: 15 additions & 0 deletions test/rpc/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,21 @@ def snapshot(
method='snapshot', params=params, request_id=request_id
)

def snapshot_freshness(
    self,
    select: Optional[Union[str, List[str]]] = None,
    threads: Optional[int] = None,
    request_id: int = 1,
):
    """Issue a ``snapshot-freshness`` request through ``self.request``.

    :param select: selector string or list of selector strings; left out of
        the request params when None.
    :param threads: thread count; left out of the request params when None.
    :param request_id: id forwarded unchanged to ``self.request``.
    :return: whatever ``self.request`` returns.
    """
    optional_params = (('select', select), ('threads', threads))
    # Only forward the options the caller actually supplied.
    params = {
        key: value for key, value in optional_params if value is not None
    }
    return self.request(
        method='snapshot-freshness', params=params, request_id=request_id
    )

def test(
self,
models: Optional[Union[str, List[str]]] = None,
Expand Down

0 comments on commit a0524af

Please sign in to comment.