Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of telemetry using Scarf in DAG Factory #250

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dagfactory/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Modules and methods to export for easier access"""

from .dagfactory import DagFactory, load_yaml_dags

# Package version string. The telemetry module reads it to fill the endpoint URL
# and to decide whether to emit at all (pre-release versions do not emit).
__version__ = "0.20.0a1"
# Names exported by the package for easier access.
__all__ = [
"DagFactory",
"load_yaml_dags",
]
]
3 changes: 0 additions & 3 deletions dagfactory/__version__.py

This file was deleted.

2 changes: 2 additions & 0 deletions dagfactory/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Template of the telemetry endpoint; `{version}` is filled in via `str.format` by the sender.
TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/dag-factory/{version}"
# Value passed as `timeout=` to the HTTP request that sends telemetry (httpx interprets it as seconds).
TELEMETRY_TIMEOUT = 5.0
5 changes: 5 additions & 0 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ def __init__(self, dag_name: str, dag_config: Dict[str, Any], default_config: Di
self.dag_name: str = dag_name
self.dag_config: Dict[str, Any] = deepcopy(dag_config)
self.default_config: Dict[str, Any] = deepcopy(default_config)
self.tasks_count: int = 0
self.taskgroups_count: int = 0

# pylint: disable=too-many-branches,too-many-statements
def get_dag_params(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -773,12 +775,15 @@ def build(self) -> Dict[str, Union[str, DAG]]:
dag.tags = dag_params.get("tags", None)

tasks: Dict[str, Dict[str, Any]] = dag_params["tasks"]
self.tasks_count = len(tasks)

# add a property to mark this dag as an auto-generated one
dag.is_dagfactory_auto_generated = True

# create dictionary of task groups
task_groups_dict: Dict[str, "TaskGroup"] = self.make_task_groups(dag_params.get("task_groups", {}), dag)
self.taskgroups_count = len(task_groups_dict)


# create dictionary to track tasks and set dependencies
tasks_dict: Dict[str, BaseOperator] = {}
Expand Down
22 changes: 19 additions & 3 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
from airflow.configuration import conf as airflow_conf
from airflow.models import DAG

from dagfactory import telemetry
from dagfactory.dagbuilder import DagBuilder
from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException


# these are params that cannot be a dag name
SYSTEM_PARAMS: List[str] = ["default", "task_groups"]

Expand All @@ -27,6 +29,9 @@ class DagFactory:
:param config: DAG factory config dictionary. Cannot be used with `config_filepath`.
:type config: dict
"""
dags_count: int = 0
tasks_count: int = 0
taskgroups_count: int = 0

def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] = None) -> None:
assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided"
Expand All @@ -53,7 +58,6 @@ def _load_config(config_filepath: str) -> Dict[str, Any]:
"""
# pylint: disable=consider-using-with
try:

def __join(loader: yaml.FullLoader, node: yaml.Node) -> str:
seq = loader.construct_sequence(node)
return "".join([str(i) for i in seq])
Expand Down Expand Up @@ -106,6 +110,9 @@ def build_dags(self) -> Dict[str, DAG]:
dags[dag["dag_id"]]: DAG = dag["dag"]
except Exception as err:
raise DagFactoryException(f"Failed to generate dag {dag_name}. verify config is correct") from err
else:
self.taskgroups_count = dag_builder.taskgroups_count
self.tasks_count = dag_builder.tasks_count

return dags

Expand All @@ -130,6 +137,7 @@ def generate_dags(self, globals: Dict[str, Any]) -> None:
"""
dags: Dict[str, Any] = self.build_dags()
self.register_dags(dags, globals)
self.dags_count = len(dags)

def clean_dags(self, globals: Dict[str, Any]) -> None:
"""
Expand Down Expand Up @@ -183,7 +191,15 @@ def load_yaml_dags(
config_file_abs_path = str(config_file_path.absolute())
logging.info("Loading %s", config_file_abs_path)
try:
DagFactory(config_file_abs_path).generate_dags(globals_dict)
logging.info("DAG loaded: %s", config_file_path)
factory = DagFactory(config_file_abs_path)
factory.generate_dags(globals_dict)
except Exception: # pylint: disable=broad-except
logging.exception("Failed to load dag from %s", config_file_path)
else:
additional_telemetry_metrics = {
"dags_count": factory.dags_count,
"tasks_count": factory.tasks_count,
"taskgroups_count": factory.taskgroups_count
}
telemetry.emit_usage_metrics_if_enabled(additional_telemetry_metrics)
logging.info("DAG loaded: %s", config_file_path)
4 changes: 4 additions & 0 deletions dagfactory/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from airflow.configuration import conf


# Whether usage telemetry may be emitted. Read once, at import time, from the
# `enable_telemetry` option of the `dag_factory` section of the Airflow
# configuration; falls back to True when the option is not set.
enable_telemetry = conf.getboolean("dag_factory", "enable_telemetry", fallback=True)
79 changes: 79 additions & 0 deletions dagfactory/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from __future__ import annotations

import logging
import platform
from urllib.parse import urlencode

import httpx
from airflow import __version__ as airflow_version
from packaging.version import parse

import dagfactory
from dagfactory import constants
from dagfactory import settings

# Top-level key of the metrics dict under which dag-factory-specific values
# (the package version plus any caller-supplied metrics) are grouped.
CUSTOM_METRICS_KEY = "dagfactory"


def should_emit() -> bool:
    """
    Tell whether telemetry metrics should be emitted.

    Emission requires the ``enable_telemetry`` setting to be truthy and the
    installed dag-factory version not to be a pre-release.
    """
    telemetry_setting = settings.enable_telemetry
    if not telemetry_setting:
        # Hand back the falsy setting itself, exactly as an `and` short-circuit would.
        return telemetry_setting

    version_info = parse(dagfactory.__version__)
    return not version_info.is_prerelease


def collect_standard_usage_metrics() -> dict[str, object]:
    """
    Build the baseline telemetry payload.

    The result has two sections: one keyed by ``CUSTOM_METRICS_KEY`` holding the
    dag-factory version, and a ``"system"`` section describing the Airflow
    version, the platform and the Python interpreter.
    """
    dagfactory_section = {"version": dagfactory.__version__}
    system_section = {
        "airflow_version": airflow_version,
        "platform_system": platform.system(),
        "platform_machine": platform.machine(),
        "python_version": platform.python_version(),
    }
    return {
        CUSTOM_METRICS_KEY: dagfactory_section,
        "system": system_section,
    }


def emit_usage_metrics(metrics):
    """
    Emit desired telemetry metrics to remote telemetry endpoint.

    The metrics are URL-encoded into the query string of ``constants.TELEMETRY_URL``
    and sent with a single GET request. Telemetry is best-effort: both an
    unsuccessful response and a transport failure (DNS error, refused connection,
    timeout, ...) are logged as warnings and never raised to the caller, so a
    telemetry outage cannot break DAG loading.

    :param metrics: mapping of metric names to values to send.
    """
    # NOTE(review): urlencode() applies str() to non-string values, so nested dict
    # values are sent as their Python repr - confirm the endpoint expects that.
    query_string = urlencode(metrics)
    telemetry_url = constants.TELEMETRY_URL.format(version=dagfactory.__version__) + f"?{query_string}"
    logging.debug(
        "Emitting the following usage metrics to %s: %s",
        telemetry_url,
        metrics,
    )
    try:
        response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT)
    except httpx.HTTPError as err:
        # Network problems must not propagate out of a best-effort telemetry call.
        logging.warning(
            "Unable to emit usage metrics to %s. An HTTPX connection error occurred: %s.",
            telemetry_url,
            err,
        )
        return

    if not response.is_success:
        logging.warning(
            "Unable to emit usage metrics to %s. Status code: %s. Message: %s",
            telemetry_url,
            response.status_code,
            response.text,
        )


def emit_usage_metrics_if_enabled(additional_metrics):
    """
    Emit telemetry when it is enabled, merging caller-supplied metrics into the standard ones.

    :param additional_metrics: extra metrics merged into the ``CUSTOM_METRICS_KEY``
        section of the standard payload before it is sent.
    :return: ``True`` when emission was attempted, ``False`` when telemetry is disabled.
    """
    if not should_emit():
        logging.debug("Telemetry is disabled")
        return False

    payload = collect_standard_usage_metrics()
    payload[CUSTOM_METRICS_KEY].update(additional_metrics)
    emit_usage_metrics(payload)
    return True
3 changes: 0 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ test-cov = 'sh scripts/test/unit-cov.sh'
[project.urls]
Source = "https:/astronomer/dag-factory"

[tool.hatch.version]
path = "dagfactory/__version__.py"

[tool.hatch.build.targets.sdist]
include = ["dagfactory"]

Expand Down
Loading