Skip to content

Commit

Permalink
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
pre-commit-ci[bot] committed Dec 20, 2023
1 parent 547b0af commit be0de1a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def __init__(
raise CosmosValueError(
"RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
)

if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
logger.warning(
"`ExecutionConfig.virtualenv_dir` is only supported when \
Expand Down
25 changes: 14 additions & 11 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@

PY_INTERPRETER = "python3"


def depends_on_virtualenv_dir(method: Callable[[Any], Any]) -> Callable[[Any], Any]:
def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> None:
if operator.virtualenv_dir is None:
raise CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.")

method(operator, *args)

return wrapper


class DbtVirtualenvBaseOperator(DbtLocalBaseOperator):
"""
Executes a dbt core cli command within a Python Virtual Environment, that is created before running the dbt command
Expand All @@ -51,7 +54,8 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator):
within the virtual environment (if py_requirements argument is specified).
Avoid using unless the dbt job requires it.
"""
template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir",) # type: ignore[operator]

template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir",) # type: ignore[operator]

def __init__(
self,
Expand Down Expand Up @@ -107,7 +111,7 @@ def execute(self, context: Context) -> None:
logger.info(output)

def _get_or_create_venv_py_interpreter(self) -> str:
"""Helper method that parses virtual env configuration
"""Helper method that parses virtual env configuration
and returns a DBT binary within the resulting virtualenv"""

# No virtualenv_dir set, so revert to making a temporary virtualenv
Expand Down Expand Up @@ -142,19 +146,19 @@ def _get_or_create_venv_py_interpreter(self) -> str:
self.__release_venv_lock()

return py_bin

@property
def __lock_file(self) -> Path:
return Path(f"{self.virtualenv_dir}/LOCK")

@property
def _pid(self) -> int:
return os.getpid()
#@depends_on_virtualenv_dir

# @depends_on_virtualenv_dir
def _is_lock_available(self) -> bool:
if self.__lock_file.is_file():
with open(self.__lock_file, "r") as lf:
with open(self.__lock_file) as lf:
pid = int(lf.read())

self.log.info(f"Checking for running process with PID {pid}")
Expand All @@ -170,21 +174,21 @@ def _is_lock_available(self) -> bool:

@depends_on_virtualenv_dir
def __acquire_venv_lock(self) -> None:
if not self.virtualenv_dir.is_dir(): # type: ignore
if not self.virtualenv_dir.is_dir(): # type: ignore
os.mkdir(str(self.virtualenv_dir))

with open(self.__lock_file, "w") as lf:
self.log.info(f"Acquiring lock at {self.__lock_file} with pid {str(self._pid)}")
lf.write(str(self._pid))

@depends_on_virtualenv_dir
def __release_venv_lock(self) -> None:
if not self.__lock_file.is_file():
self.log.warn(f"Lockfile {self.__lock_file} not found, perhaps deleted by other concurrent operator?")

return

with open(self.__lock_file, "r") as lf:
with open(self.__lock_file) as lf:
lock_file_pid = int(lf.read())

if lock_file_pid == self._pid:
Expand Down Expand Up @@ -240,4 +244,3 @@ class DbtDocsVirtualenvOperator(DbtVirtualenvBaseOperator, DbtDocsLocalOperator)
Executes `dbt docs generate` command within a Python Virtual Environment, that is created before running the dbt
command and deleted just after.
"""

11 changes: 7 additions & 4 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op
"execution_mode,operator_args",
[
(ExecutionMode.KUBERNETES, {}),
]
],
)
@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
@patch("cosmos.converter.DbtGraph.load")
Expand All @@ -196,6 +196,7 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut
)
assert converter


@pytest.mark.parametrize(
"execution_mode,virtualenv_dir,operator_args",
[
Expand Down Expand Up @@ -228,9 +229,11 @@ def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualen
operator_args=operator_args,
)

assert "`ExecutionConfig.virtualenv_dir` is only supported when \
ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." in caplog.text

assert (
"`ExecutionConfig.virtualenv_dir` is only supported when \
ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV."
in caplog.text
)


@pytest.mark.parametrize(
Expand Down

0 comments on commit be0de1a

Please sign in to comment.