Skip to content

Commit

Permalink
more updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle Wigley committed Dec 11, 2020
1 parent 61991eb commit 0d99a63
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 31 deletions.
61 changes: 41 additions & 20 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from datetime import datetime
from typing import Union, Dict, List, Optional, Any, NamedTuple, Sequence

from dbt.clients.system import write_json


@dataclass
class TimingInfo(JsonSchemaMixin):
Expand Down Expand Up @@ -100,7 +102,7 @@ class NodeResult(BaseResult):


@dataclass
class PartialNodeResult(NodeResult, Writable):
class PartialNodeResult(NodeResult):
# if the result got to the point where it could be skipped/failed, we would
# be returning a real result, not a partial.
@property
Expand All @@ -109,21 +111,18 @@ def skipped(self):


@dataclass
class WritableRunModelResult(NodeResult, Writable):
@property
def skipped(self):
return self.status == RunStatus.Skipped


@dataclass
class RunModelResult(WritableRunModelResult):
class RunModelResult(NodeResult):
agate_table: Optional[agate.Table] = None

def to_dict(self, *args, **kwargs):
dct = super().to_dict(*args, **kwargs)
dct.pop('agate_table', None)
return dct

@property
def skipped(self):
return self.status == RunStatus.Skipped


@dataclass
class ExecutionResult(JsonSchemaMixin):
Expand All @@ -140,7 +139,7 @@ def __getitem__(self, idx):
return self.results[idx]


RunResult = Union[PartialNodeResult, WritableRunModelResult]
RunResult = Union[PartialNodeResult, RunModelResult]


@dataclass
Expand All @@ -153,51 +152,73 @@ class RunResultsMetadata(BaseArtifactMetadata):
@dataclass
class RunResultOutput(BaseResult):
unique_id: str
agate_table: Optional[agate.Table] = None

def to_dict(self, *args, **kwargs):
dct = super().to_dict(*args, **kwargs)
dct.pop('agate_table', None)
return dct

def process_run_result(result: Union[RunResult, RunResultOutput]) -> RunResultOutput: # noqa
if isinstance(result, RunResultOutput):
return result

def process_run_result(result: RunResult) -> RunResultOutput:
return RunResultOutput(
unique_id=result.node.unique_id,
status=result.status,
timing=result.timing,
thread_id=result.thread_id,
execution_time=result.execution_time,
message=result.message
message=result.message,
agate_table=getattr(result, 'agate_table')
)


@dataclass
@schema_version('run-results', 1)
class RunResultsArtifact(
class RunExecutionResult(
ExecutionResult,
ArtifactMixin
):
results: Sequence[RunResult]
args: Dict[str, Any] = field(default_factory=dict)
generated_at: datetime = field(default_factory=datetime.utcnow)

def write(self, path: str):
writable = RunResultsArtifact.from_execution_results(
results=self.results,
elapsed_time=self.elapsed_time,
generated_at=self.generated_at,
args=self.args,
)
writable.write(path)


@dataclass
@schema_version('run-results', 1)
class RunResultsArtifact(ExecutionResult, ArtifactMixin):
results: Sequence[RunResultOutput]
args: Dict[str, Any] = field(default_factory=dict)

@classmethod
def from_node_results(
def from_execution_results(
cls,
results: Union[Sequence[RunResult], Sequence[RunResultOutput]],
results: Sequence[RunResult],
elapsed_time: float,
generated_at: datetime,
args: Dict,
):
processed_results = [process_run_result(result) for result in results]
meta = RunResultsMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
)
processed_results = [process_run_result(result) for result in results]
return cls(
metadata=meta,
results=processed_results,
elapsed_time=elapsed_time,
args=args
)

def write(self, path: str, omit_none=False):
write_json(path, self.to_dict(omit_none=omit_none))


@dataclass
class RunOperationResult(ExecutionResult):
Expand Down
12 changes: 6 additions & 6 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.results import (
RunResultOutput, TimingInfo,
RunResult, RunResultsArtifact, TimingInfo,
CatalogArtifact,
CatalogResults,
ExecutionResult,
FreshnessExecutionResultArtifact,
FreshnessResult,
RunOperationResult,
RunOperationResultsArtifact,
RunResultsArtifact,
RunExecutionResult,
)
from dbt.contracts.util import VersionedSchema, schema_version
from dbt.exceptions import InternalException
Expand Down Expand Up @@ -224,12 +224,12 @@ def error(self):
@dataclass
@schema_version('remote-execution-result', 1)
class RemoteExecutionResult(ExecutionResult, RemoteResult):
results: Sequence[RunResultOutput]
results: Sequence[RunResult]
args: Dict[str, Any] = field(default_factory=dict)
generated_at: datetime = field(default_factory=datetime.utcnow)

def write(self, path: str):
writable = RunResultsArtifact.from_node_results(
writable = RunResultsArtifact.from_execution_results(
generated_at=self.generated_at,
results=self.results,
elapsed_time=self.elapsed_time,
Expand All @@ -240,11 +240,11 @@ def write(self, path: str):
@classmethod
def from_local_result(
cls,
base: RunResultsArtifact,
base: RunExecutionResult,
logs: List[LogMessage],
) -> 'RemoteExecutionResult':
return cls(
generated_at=base.metadata.generated_at,
generated_at=base.generated_at,
results=base.results,
elapsed_time=base.elapsed_time,
args=base.args,
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def print_run_result_error(
logger.info(" compiled SQL at {}".format(
result.node.build_path))

else:
elif result.message is not None:
first = True
for line in result.message.split("\n"):
if first:
Expand Down
5 changes: 2 additions & 3 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.results import NodeStatus, RunResultsArtifact
from dbt.contracts.results import NodeStatus, RunExecutionResult
from dbt.contracts.state import PreviousState
from dbt.exceptions import (
InternalException,
Expand Down Expand Up @@ -538,8 +538,7 @@ def create_schema(relation: BaseRelation) -> None:
create_future.result()

def get_result(self, results, elapsed_time, generated_at):

return RunResultsArtifact.from_node_results(
return RunExecutionResult(
results=results,
elapsed_time=elapsed_time,
generated_at=generated_at,
Expand Down
2 changes: 1 addition & 1 deletion test/integration/042_sources_test/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def _assert_freshness_results(self, path, state):
'max_loaded_at': last_inserted_time,
'snapshotted_at': AnyStringWith(),
'max_loaded_at_time_ago_in_s': AnyFloat(),
'state': state,
'status': state,
'criteria': {
'filter': None,
'warn_after': {'count': 10, 'period': 'hour'},
Expand Down

0 comments on commit 0d99a63

Please sign in to comment.