Skip to content

Commit

Permalink
Initial implementation of cross-project ref (#7276)
Browse files Browse the repository at this point in the history
* Create publication.py, various Publication classes, Dependency class

* Load dependencies.yml and the corresponding publication file

* Add "public_nodes" and populate ref_lookup

* resolve_ref working

* Add public nodes to parent and child maps

* Bump manifest version and fix tests, use ModelDependsOn

* Split out PublicationArtifact and PublicationConfig, store public_models
separately

* Store dependencies in publication artifact

* Change detection of PublicModel for Python >= 3.10

* Handle removing references for re-processing if publication has changed

* Handle only changed publication artifacts

* Add some logging events

* Remove duplicate nodes from manifest

* refactor relation_from_relation_name

* Remove duplicate writing of manifest.json

* Add public_nodes to flat_graph

* Move some file name constants to core/dbt/constants.py

* Remove "environment" from ProjectDependency. Add
database/schema/identifier to PublicModel. Update TargetNotFound
exception.

* Include external publication dependencies in publication artifact dependencies

* Remove create_from_relation_name, call create_from_node instead

* Change PublicationArtifactChanged message to debug level

* Make write_publication_artifact a function in parser/manifest.py

* Create fixture to create minimal alternate project (just models)

* Develop multi-project test case
  • Loading branch information
gshank authored May 3, 2023
1 parent f1dddaa commit fd73066
Show file tree
Hide file tree
Showing 30 changed files with 6,689 additions and 720 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230420-124756.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Publication artifacts and cross-project ref
time: 2023-04-20T12:47:56.92683-04:00
custom:
Author: gshank
Issue: "7227"
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def create_ephemeral_from_node(
def create_from_node(
cls: Type[Self],
config: HasQuoting,
node: ManifestNode,
node,
quote_policy: Optional[Dict[str, bool]] = None,
**kwargs: Any,
) -> Self:
Expand Down
187 changes: 97 additions & 90 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,90 @@ def write_graph(self, outfile: str, manifest: Manifest):
with open(outfile, "wb") as outfh:
pickle.dump(out_graph, outfh, protocol=pickle.HIGHEST_PROTOCOL)

def link_node(self, node: GraphMemberNode, manifest: Manifest):
    """Register ``node`` in the graph and record an edge for each dependency.

    Every id in ``node.depends_on_nodes`` is looked up in ``manifest.nodes``,
    then ``manifest.sources``, then ``manifest.metrics``.  The first
    collection that contains the id supplies the ``unique_id`` handed to
    ``self.dependency``.

    Raises:
        GraphDependencyNotFoundError: a dependency id is present in none of
            the three collections.
    """
    self.add_node(node.unique_id)

    # Order is significant only if one id lives in several collections:
    # nodes win over sources, sources win over metrics.
    lookup_order = (manifest.nodes, manifest.sources, manifest.metrics)

    for dependency in node.depends_on_nodes:
        for collection in lookup_order:
            if dependency in collection:
                self.dependency(node.unique_id, collection[dependency].unique_id)
                break
        else:
            # No collection knew this id.
            raise GraphDependencyNotFoundError(node, dependency)

def link_graph(self, manifest: Manifest):
    """Populate the graph from ``manifest`` and reject cyclic results.

    Sources are added as plain nodes.  Nodes, exposures and metrics (in that
    order) are each passed to ``self.link_node`` together with the manifest.

    Raises:
        RuntimeError: ``self.find_cycles()`` returned a truthy value once
            everything had been linked.
    """
    for source in manifest.sources.values():
        self.add_node(source.unique_id)

    # These three collections are all linked the same way.
    for collection in (manifest.nodes, manifest.exposures, manifest.metrics):
        for member in collection.values():
            self.link_node(member, manifest)

    cycle = self.find_cycles()
    if cycle:
        raise RuntimeError("Found a cycle: {}".format(cycle))

def add_test_edges(self, manifest: Manifest) -> None:
    """Add extra edges from upstream tests to downstream nodes in ``self.graph``.

    For a given non-test executable node, add an edge from an upstream test
    to that node if the set of nodes the test depends on is a subset of the
    node's upstream nodes.  Edges are added in place and carry
    ``edge_type="parent_test"``.
    """

    # Given a graph:
    #   model1 --> model2 --> model3
    #     |             |
    #     |            \/
    #    \/          test 2
    #   test1
    #
    # Produce the following graph:
    #   model1 --> model2 --> model3
    #     |       /\    |      /\ /\
    #     |       |    \/      |  |
    #    \/       |  test2 ----|  |
    #   test1 ----|---------------|

    for node_id in self.graph:
        # Only executable nodes (present in manifest.nodes) that do _not_
        # represent a test receive extra incoming edges; skip all others.
        is_executable_non_test = (
            node_id in manifest.nodes
            and manifest.nodes[node_id].resource_type != NodeType.Test
        )
        if not is_executable_non_test:
            continue

        # Get *everything* upstream of the node
        all_upstream_nodes = nx.traversal.bfs_tree(self.graph, node_id, reverse=True)
        # Get the set of upstream nodes not including the current node.
        upstream_nodes = {n for n in all_upstream_nodes if n != node_id}

        # Get all tests that depend on any upstream nodes.
        upstream_tests = []
        for upstream_node in upstream_nodes:
            upstream_tests += _get_tests_for_node(manifest, upstream_node)

        for upstream_test in upstream_tests:
            # Get the set of all nodes that the test depends on
            # including the upstream_node itself. This is necessary
            # because tests can depend on multiple nodes (ex:
            # relationship tests). Test nodes do not distinguish
            # between what node the test is "testing" and what
            # node(s) it depends on.
            test_depends_on = set(manifest.nodes[upstream_test].depends_on_nodes)

            # If the set of nodes that an upstream test depends on
            # is a subset of all upstream nodes of the current node,
            # add an edge from the upstream test to the current node.
            if test_depends_on.issubset(upstream_nodes):
                self.graph.add_edge(upstream_test, node_id, edge_type="parent_test")

def get_graph(self, manifest: Manifest) -> Graph:
    """Link ``manifest`` into this linker and return ``self.graph`` wrapped in a ``Graph``.

    Any exception raised by ``self.link_graph`` propagates unchanged.
    """
    self.link_graph(manifest)
    return Graph(self.graph)

def get_graph_summary(self, manifest: Manifest) -> Dict[int, Dict[str, Any]]:
"""Create a smaller summary of the graph, suitable for basic diagnostics
and performance tuning. The summary includes only the edge structure,
Expand Down Expand Up @@ -406,98 +490,13 @@ def _compile_code(

return node

def write_graph_file(self, linker: Linker, manifest: Manifest):
    """Ask ``linker`` to write its graph under the configured target path.

    The destination is ``graph_file_name`` joined onto
    ``self.config.target_path``.  Nothing is written unless the
    ``WRITE_JSON`` flag returned by ``get_flags()`` is truthy.
    """
    graph_path = os.path.join(self.config.target_path, graph_file_name)
    flags = get_flags()
    if flags.WRITE_JSON:
        linker.write_graph(graph_path, manifest)

def link_node(self, linker: Linker, node: GraphMemberNode, manifest: Manifest):
    """Register ``node`` with ``linker`` and record an edge for each dependency.

    Every id in ``node.depends_on_nodes`` is looked up in ``manifest.nodes``,
    then ``manifest.sources``, then ``manifest.metrics``.  The first
    collection that contains the id supplies the ``unique_id`` handed to
    ``linker.dependency``.

    Raises:
        GraphDependencyNotFoundError: a dependency id is present in none of
            the three collections.
    """
    linker.add_node(node.unique_id)

    # Order is significant only if one id lives in several collections:
    # nodes win over sources, sources win over metrics.
    lookup_order = (manifest.nodes, manifest.sources, manifest.metrics)

    for dependency in node.depends_on_nodes:
        for collection in lookup_order:
            if dependency in collection:
                linker.dependency(node.unique_id, collection[dependency].unique_id)
                break
        else:
            # No collection knew this id.
            raise GraphDependencyNotFoundError(node, dependency)

def link_graph(self, linker: Linker, manifest: Manifest):
    """Populate ``linker`` from ``manifest`` and reject cyclic results.

    Sources are added to the linker as plain nodes.  Nodes, exposures and
    metrics (in that order) are each passed to ``self.link_node`` together
    with the linker and the manifest.

    Raises:
        RuntimeError: ``linker.find_cycles()`` returned a truthy value once
            everything had been linked.
    """
    for source in manifest.sources.values():
        linker.add_node(source.unique_id)

    # These three collections are all linked the same way.
    for collection in (manifest.nodes, manifest.exposures, manifest.metrics):
        for member in collection.values():
            self.link_node(linker, member, manifest)

    cycle = linker.find_cycles()
    if cycle:
        raise RuntimeError("Found a cycle: {}".format(cycle))

def add_test_edges(self, linker: Linker, manifest: Manifest) -> None:
    """Add extra edges from upstream tests to downstream nodes in ``linker.graph``.

    For a given non-test executable node, add an edge from an upstream test
    to that node if the set of nodes the test depends on is a subset of the
    node's upstream nodes.  Edges are added in place and carry
    ``edge_type="parent_test"``.
    """

    # Given a graph:
    #   model1 --> model2 --> model3
    #     |             |
    #     |            \/
    #    \/          test 2
    #   test1
    #
    # Produce the following graph:
    #   model1 --> model2 --> model3
    #     |       /\    |      /\ /\
    #     |       |    \/      |  |
    #    \/       |  test2 ----|  |
    #   test1 ----|---------------|

    for node_id in linker.graph:
        # Only executable nodes (present in manifest.nodes) that do _not_
        # represent a test receive extra incoming edges; skip all others.
        is_executable_non_test = (
            node_id in manifest.nodes
            and manifest.nodes[node_id].resource_type != NodeType.Test
        )
        if not is_executable_non_test:
            continue

        # Get *everything* upstream of the node
        all_upstream_nodes = nx.traversal.bfs_tree(linker.graph, node_id, reverse=True)
        # Get the set of upstream nodes not including the current node.
        upstream_nodes = {n for n in all_upstream_nodes if n != node_id}

        # Get all tests that depend on any upstream nodes.
        upstream_tests = []
        for upstream_node in upstream_nodes:
            upstream_tests += _get_tests_for_node(manifest, upstream_node)

        for upstream_test in upstream_tests:
            # Get the set of all nodes that the test depends on
            # including the upstream_node itself. This is necessary
            # because tests can depend on multiple nodes (ex:
            # relationship tests). Test nodes do not distinguish
            # between what node the test is "testing" and what
            # node(s) it depends on.
            test_depends_on = set(manifest.nodes[upstream_test].depends_on_nodes)

            # If the set of nodes that an upstream test depends on
            # is a subset of all upstream nodes of the current node,
            # add an edge from the upstream test to the current node.
            if test_depends_on.issubset(upstream_nodes):
                linker.graph.add_edge(upstream_test, node_id, edge_type="parent_test")

# This method doesn't actually "compile" any of the nodes. That is done by the
# "compile_node" method. This creates a Linker and builds the networkx graph,
# writes out the graph.gpickle file, and prints the stats, returning a Graph object.
def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph:
self.initialize()
linker = Linker()

self.link_graph(linker, manifest)
linker.link_graph(manifest)

# Create a file containing basic information about graph structure,
# supporting diagnostics and performance analysis.
Expand All @@ -507,7 +506,7 @@ def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph

if add_test_edges:
manifest.build_parent_and_child_maps()
self.add_test_edges(linker, manifest)
linker.add_test_edges(manifest)

# Create another diagnostic summary, just as above, but this time
# including the test edges.
Expand All @@ -533,10 +532,18 @@ def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph
self.config.args.__class__ == argparse.Namespace
and self.config.args.cls == list_task.ListTask
):
stats = _generate_stats(manifest)
print_compile_stats(stats)

return Graph(linker.graph)

def write_graph_file(self, linker: Linker, manifest: Manifest):
    """Ask ``linker`` to write its graph under the configured target path.

    The destination is ``graph_file_name`` joined onto
    ``self.config.target_path``.  Nothing is written unless the
    ``WRITE_JSON`` flag returned by ``get_flags()`` is truthy.
    """
    graph_path = os.path.join(self.config.target_path, graph_file_name)
    flags = get_flags()
    if flags.WRITE_JSON:
        linker.write_graph(graph_path, manifest)

# writes the "compiled_code" into the target/compiled directory
def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
if not node.extra_ctes_injected or node.resource_type in (
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@
# Link to the package-management docs section on specifying package versions.
PIN_PACKAGE_URL = (
"https://docs.getdbt.com/docs/package-management#section-specifying-package-versions"
)

# File names kept here as shared constants.
# NOTE(review): where each file lives (project root vs. target directory) is
# not visible from this module; confirm against the callers.
DEPENDENCIES_FILE_NAME = "dependencies.yml"
MANIFEST_FILE_NAME = "manifest.json"
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
12 changes: 10 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ def resolve(
) -> RelationProxy:
self.model.refs.append(self._repack_args(name, package, version))

# This is not the ref for the "name" passed in, but for the current model.
return self.Relation.create_from(self.config, self.model)


Expand Down Expand Up @@ -512,7 +513,11 @@ def resolve(
return self.create_relation(target_model)

def create_relation(self, target_model: ManifestNode) -> RelationProxy:
if target_model.is_ephemeral_model:
if target_model.is_public_node:
# Get quoting from publication artifact
pub_metadata = self.manifest.publications[target_model.package_name].metadata
return self.Relation.create_from_node(pub_metadata, target_model)
elif target_model.is_ephemeral_model:
self.model.set_cte(target_model.unique_id, None)
return self.Relation.create_ephemeral_from_node(self.config, target_model)
else:
Expand All @@ -525,7 +530,10 @@ def validate(
target_package: Optional[str],
target_version: Optional[NodeVersion],
) -> None:
if resolved.unique_id not in self.model.depends_on.nodes:
if (
resolved.unique_id not in self.model.depends_on.nodes
and resolved.unique_id not in self.model.depends_on.public_nodes
):
args = self._repack_args(target_name, target_package, target_version)
raise RefBadContextError(node=self.model, args=args)

Expand Down
Loading

0 comments on commit fd73066

Please sign in to comment.