diff --git a/dagfactory/__init__.py b/dagfactory/__init__.py index 4f14b38..3a038e8 100644 --- a/dagfactory/__init__.py +++ b/dagfactory/__init__.py @@ -1,8 +1,8 @@ """Modules and methods to export for easier access""" - from .dagfactory import DagFactory, load_yaml_dags +__version__ = "0.20.0a1" __all__ = [ "DagFactory", "load_yaml_dags", -] +] \ No newline at end of file diff --git a/dagfactory/__version__.py b/dagfactory/__version__.py deleted file mode 100644 index 9af9599..0000000 --- a/dagfactory/__version__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Module contains the version of dag-factory""" - -__version__ = "0.19.0" diff --git a/dagfactory/constants.py b/dagfactory/constants.py new file mode 100644 index 0000000..6ba6200 --- /dev/null +++ b/dagfactory/constants.py @@ -0,0 +1,2 @@ +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/dag-factory/{version}" +TELEMETRY_TIMEOUT = 5.0 \ No newline at end of file diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index c026980..85b0e62 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -135,6 +135,8 @@ def __init__(self, dag_name: str, dag_config: Dict[str, Any], default_config: Di self.dag_name: str = dag_name self.dag_config: Dict[str, Any] = deepcopy(dag_config) self.default_config: Dict[str, Any] = deepcopy(default_config) + self.tasks_count: int = 0 + self.taskgroups_count: int = 0 # pylint: disable=too-many-branches,too-many-statements def get_dag_params(self) -> Dict[str, Any]: @@ -773,12 +775,15 @@ def build(self) -> Dict[str, Union[str, DAG]]: dag.tags = dag_params.get("tags", None) tasks: Dict[str, Dict[str, Any]] = dag_params["tasks"] + self.tasks_count = len(tasks) # add a property to mark this dag as an auto-generated on dag.is_dagfactory_auto_generated = True # create dictionary of task groups task_groups_dict: Dict[str, "TaskGroup"] = self.make_task_groups(dag_params.get("task_groups", {}), dag) + self.taskgroups_count = len(task_groups_dict) + # create dictionary to track tasks and set dependencies tasks_dict: Dict[str, BaseOperator] = {} diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 0864c1c..ccdb712 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -10,9 +10,11 @@ from airflow.configuration import conf as airflow_conf from airflow.models import DAG +from dagfactory import telemetry from dagfactory.dagbuilder import DagBuilder from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException + # these are params that cannot be a dag name SYSTEM_PARAMS: List[str] = ["default", "task_groups"] @@ -27,6 +29,9 @@ class DagFactory: :param config: DAG factory config dictionary. Cannot be user with `config_filepath`. :type config: dict """ + dags_count: int = 0 + tasks_count: int = 0 + taskgroups_count: int = 0 def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] = None) -> None: assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided" @@ -53,7 +58,6 @@ def _load_config(config_filepath: str) -> Dict[str, Any]: """ # pylint: disable=consider-using-with try: - def __join(loader: yaml.FullLoader, node: yaml.Node) -> str: seq = loader.construct_sequence(node) return "".join([str(i) for i in seq]) @@ -106,6 +110,9 @@ def build_dags(self) -> Dict[str, DAG]: dags[dag["dag_id"]]: DAG = dag["dag"] except Exception as err: raise DagFactoryException(f"Failed to generate dag {dag_name}. verify config is correct") from err + else: + self.taskgroups_count = dag_builder.taskgroups_count + self.tasks_count = dag_builder.tasks_count return dags @@ -130,6 +137,7 @@ def generate_dags(self, globals: Dict[str, Any]) -> None: """ dags: Dict[str, Any] = self.build_dags() self.register_dags(dags, globals) + self.dags_count = len(dags) def clean_dags(self, globals: Dict[str, Any]) -> None: """ @@ -183,7 +191,15 @@ def load_yaml_dags( config_file_abs_path = str(config_file_path.absolute()) logging.info("Loading %s", config_file_abs_path) try: - DagFactory(config_file_abs_path).generate_dags(globals_dict) - logging.info("DAG loaded: %s", config_file_path) + factory = DagFactory(config_file_abs_path) + factory.generate_dags(globals_dict) except Exception: # pylint: disable=broad-except logging.exception("Failed to load dag from %s", config_file_path) + else: + additional_telemetry_metrics = { + "dags_count": factory.dags_count, + "tasks_count": factory.tasks_count, + "taskgroups_count": factory.taskgroups_count + } + telemetry.emit_usage_metrics_if_enabled(additional_telemetry_metrics) + logging.info("DAG loaded: %s", config_file_path) diff --git a/dagfactory/settings.py b/dagfactory/settings.py new file mode 100644 index 0000000..5e4e022 --- /dev/null +++ b/dagfactory/settings.py @@ -0,0 +1,4 @@ +from airflow.configuration import conf + + +enable_telemetry = conf.getboolean("dag_factory", "enable_telemetry", fallback=True) diff --git a/dagfactory/telemetry.py b/dagfactory/telemetry.py new file mode 100644 index 0000000..d3c9d33 --- /dev/null +++ b/dagfactory/telemetry.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import logging +import platform +from urllib.parse import urlencode + +import httpx +from airflow import __version__ as airflow_version +from packaging.version import parse + +import dagfactory +from dagfactory import constants +from dagfactory import settings + +CUSTOM_METRICS_KEY = "dagfactory" + + +def should_emit() -> bool: + """ + Identify if telemetry metrics should be emitted or not. + """ + return settings.enable_telemetry and not parse(dagfactory.__version__).is_prerelease + + +def collect_standard_usage_metrics() -> dict[str, object]: + """ + Return standard telemetry metrics. + """ + metrics = { + CUSTOM_METRICS_KEY: { + "version": dagfactory.__version__ + }, + "system": { + "airflow_version": airflow_version, + "platform_system": platform.system(), + "platform_machine": platform.machine(), + "python_version": platform.python_version(), + + } + } + return metrics + + +def emit_usage_metrics(metrics): + """ + Emit desired telemetry metrics to remote telemetry endpoint. + """ + query_string = urlencode(metrics) + telemetry_url = constants.TELEMETRY_URL.format(version=dagfactory.__version__) + f"?{query_string}" + logging.debug( + "Emitting the following usage metrics to %s: %s", + telemetry_url, + metrics + ) + response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT) + if not response.is_success: + logging.warning( + "Unable to emit usage metrics to %s. Status code: %s. Message: %s", + telemetry_url, + response.status_code, + response.text + ) + + +def emit_usage_metrics_if_enabled(additional_metrics): + """ + Checks if telemetry should be emitted, fetch standard metrics, complement with custom metrics + and emit them to remote telemetry endpoint. + """ + if should_emit(): + metrics = collect_standard_usage_metrics() + metrics[CUSTOM_METRICS_KEY].update(additional_metrics) + emit_usage_metrics(metrics) + return True + else: + logging.debug( + "Telemetry is disabled" + ) + return False \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 5daab94..f577293 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,9 +78,6 @@ test-cov = 'sh scripts/test/unit-cov.sh' [project.urls] Source = "https://github.com/astronomer/dag-factory" -[tool.hatch.version] -path = "dagfactory/__version__.py" - [tool.hatch.build.targets.sdist] include = ["dagfactory"]