Skip to content

Commit

Permalink
Revert "Revert "Upgrade underlying Argo SDK (#38)" (#42)"
Browse files Browse the repository at this point in the history
This reverts commit 015f11f.
  • Loading branch information
flaviuvadan authored Jan 9, 2022
1 parent 015f11f commit cb04778
Show file tree
Hide file tree
Showing 26 changed files with 549 additions and 431 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ The general format is:
```

# 0.4.1 - DATE (08/01/2022)

### Changed

- underlying Argo SDK dependency from the current PyPi argo-workflows to the Argo Workflows repo unpublished SDK

# 0.4.0 - DATE (15/12/2021)

### Added
Expand Down
305 changes: 120 additions & 185 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.4.0
0.4.1
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MockModel(BaseModel):
field1: int = 1
field2: int = 2

yield MockModel
yield MockModel()


@pytest.fixture(scope='session')
Expand Down
2 changes: 1 addition & 1 deletion examples/artifact_with_fanout.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def consumer(i: int):


ws = WorkflowService('my-argo-server.com', 'my-auth-token')
w = Workflow('fv-testing', ws)
w = Workflow('artifact-with-fanout', ws)
w_t = Task('writer', writer, output_artifacts=[OutputArtifact(name='test', path='/file')])
f_t = Task(
'fanout',
Expand Down
6 changes: 3 additions & 3 deletions examples/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

# TODO: replace the domain and token with your own
ws = WorkflowService('my-argo-server.com', 'my-auth-token')
w = Workflow('fv-testing', ws)
w = Workflow('container', ws)

# notice the placeholder `lambda: _`, for now this is required because
# notice the placeholder `lambda: None`, for now this is required because
# func is a required parameter, by design. Perhaps this can be made
# `Optional[Callable]` in the future
t = Task('cowsay', lambda: _, image='docker/whalesay', command=['cowsay', 'foo'])
t = Task('cowsay', lambda: None, image='docker/whalesay', command=['cowsay', 'foo'])
w.add_task(t)
w.submit()
2 changes: 1 addition & 1 deletion examples/dynamic_fanout.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def consume(value: int):

# TODO: replace the domain and token with your own
ws = WorkflowService('my-argo-server.com', 'my-auth-token')
w = Workflow('dynamic-fan-out', ws)
w = Workflow('dynamic-fanout', ws)
generate_task = Task('generate', generate)
consume_task = Task('consume', consume, input_from=InputFrom(name='generate', parameters=['value']))

Expand Down
2 changes: 1 addition & 1 deletion examples/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ def random_fail():
# TODO: replace the domain and token with your own
ws = WorkflowService('my-argo-server.com', 'my-auth-token')
w = Workflow('retry', ws)
t = Task('fail', random_fail, retry=Retry(duration=5, max_duration=60))
t = Task('fail', random_fail, retry=Retry(duration=3, max_duration=9))
w.add_task(t)
w.submit()
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ check_untyped_defs = True
no_implicit_optional = True
strict_optional = False

[mypy-argo.workflows.*]
[mypy-argo_workflows.*]
ignore_missing_imports = True

[mypy-urllib3.*]
Expand Down
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@
include_package_data=True,
python_requires='>=3.7',
install_requires=[
"argo-workflows",
# TODO: argo_workflows did not make it into a release that would have published an official
# TODO: PyPi version. Once that happens, which is likely in 3.2+, this dependency can be changed safely
"argo_workflows @ git+https:/argoproj/argo-workflows@master#subdirectory=sdks/python/client",
"pydantic",
"python-dateutil",
"urllib3",
"certifi"
],
zip_safe=False
)

10 changes: 5 additions & 5 deletions src/hera/v1/artifact.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from argo.workflows.client import V1alpha1Artifact
from argo_workflows.models import IoArgoprojWorkflowV1alpha1Artifact
from pydantic import BaseModel


Expand All @@ -25,9 +25,9 @@ class Artifact(BaseModel):
name: str
path: str

def get_spec(self) -> V1alpha1Artifact:
def get_spec(self) -> IoArgoprojWorkflowV1alpha1Artifact:
"""Constructs the corresponding Argo artifact representation"""
return V1alpha1Artifact(name=self.name, path=self.path)
return IoArgoprojWorkflowV1alpha1Artifact(name=self.name, path=self.path)


class OutputArtifact(Artifact):
Expand Down Expand Up @@ -57,7 +57,7 @@ class InputArtifact(Artifact):
from_task: str
artifact_name: str

def get_spec(self) -> V1alpha1Artifact:
def get_spec(self) -> IoArgoprojWorkflowV1alpha1Artifact:
"""Constructs the corresponding Argo artifact representation"""
_from = f"{{{{tasks.{self.from_task}.outputs.artifacts.{self.artifact_name}}}}}"
return V1alpha1Artifact(name=self.name, path=self.path, _from=_from)
return IoArgoprojWorkflowV1alpha1Artifact(name=self.name, path=self.path, _from=_from)
3 changes: 2 additions & 1 deletion src/hera/v1/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Holds client configurations for communicating with Argo APIs"""
from argo.workflows.client import ApiClient as ArgoApiClient

from argo_workflows.api_client import ApiClient as ArgoApiClient

from hera.v1.config import Config

Expand Down
2 changes: 1 addition & 1 deletion src/hera/v1/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os

import urllib3
from argo.workflows.client import Configuration as ArgoConfig
from argo_workflows.configuration import Configuration as ArgoConfig

# __get_config() explicitly disables SSL verification, so urllib3 will throw a warning to the user. Since we have
# explicitly asked for it to disable SSL, it's safe to ignore the warning.
Expand Down
47 changes: 25 additions & 22 deletions src/hera/v1/cron_workflow.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
"""The implementation of a Hera cron workflow for Argo-based cron workflows"""
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4

from argo.workflows.client import (
V1alpha1CronWorkflow,
V1alpha1CronWorkflowSpec,
V1alpha1CronWorkflowStatus,
V1alpha1DAGTemplate,
V1alpha1Template,
V1alpha1WorkflowSpec,
V1ObjectMeta,
from argo_workflows.models import (
IoArgoprojWorkflowV1alpha1CronWorkflow,
IoArgoprojWorkflowV1alpha1CronWorkflowSpec,
IoArgoprojWorkflowV1alpha1DAGTemplate,
IoArgoprojWorkflowV1alpha1Template,
IoArgoprojWorkflowV1alpha1WorkflowSpec,
ObjectMeta,
)

from hera.v1.cron_workflow_service import CronWorkflowService
Expand All @@ -20,8 +18,8 @@
class CronWorkflow:
"""A cron workflow representation.
CronWorkflow are workflows that run on a preset schedule.
In essence, CronWorkflow = Workflow + some specific cron options.
CronWorkflow are workflows that run on a preset schedule. In essence, CronWorkflow = Workflow + some specific cron
options.
Parameters
----------
Expand Down Expand Up @@ -51,26 +49,31 @@ def __init__(
self.parallelism = parallelism
self.service_account_name = service_account_name

self.dag_template = V1alpha1DAGTemplate(tasks=[])
self.template = V1alpha1Template(
self.dag_template = IoArgoprojWorkflowV1alpha1DAGTemplate(tasks=[])

self.template = IoArgoprojWorkflowV1alpha1Template(
name=self.name,
steps=[],
dag=self.dag_template,
parallelism=self.parallelism,
service_account_name=self.service_account_name,
)
self.metadata = V1ObjectMeta(name=self.name)
self.spec = V1alpha1WorkflowSpec(
templates=[self.template], entrypoint=self.name, service_account_name=self.service_account_name

self.metadata = ObjectMeta(name=self.name)
self.spec = IoArgoprojWorkflowV1alpha1WorkflowSpec(
templates=[self.template],
entrypoint=self.name,
volume_claim_templates=[],
volumes=[],
)

self.cron_spec = V1alpha1CronWorkflowSpec(schedule=self.schedule, workflow_spec=self.spec)
self.workflow = V1alpha1CronWorkflow(
if self.service_account_name:
setattr(self.template, 'service_account_name', self.service_account_name)
setattr(self.spec, 'service_account_name', self.service_account_name)

self.cron_spec = IoArgoprojWorkflowV1alpha1CronWorkflowSpec(schedule=self.schedule, workflow_spec=self.spec)
self.workflow = IoArgoprojWorkflowV1alpha1CronWorkflow(
metadata=self.metadata,
spec=self.cron_spec,
status=V1alpha1CronWorkflowStatus(
active=[], conditions=[], last_scheduled_time=datetime.now(timezone.utc)
),
)

def add_task(self, t: Task) -> None:
Expand Down
47 changes: 31 additions & 16 deletions src/hera/v1/cron_workflow_service.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""Holds the cron workflow service that supports client cron workflow creations"""
from typing import Tuple

from argo.workflows.client import (
CronWorkflowServiceApi,
V1alpha1CreateCronWorkflowRequest,
V1alpha1CronWorkflow,
V1alpha1CronWorkflowResumeRequest,
V1alpha1CronWorkflowSuspendRequest,
from argo_workflows.apis import CronWorkflowServiceApi
from argo_workflows.models import (
IoArgoprojWorkflowV1alpha1CreateCronWorkflowRequest,
IoArgoprojWorkflowV1alpha1CronWorkflow,
IoArgoprojWorkflowV1alpha1CronWorkflowResumeRequest,
IoArgoprojWorkflowV1alpha1CronWorkflowSuspendRequest,
)

from hera.v1.client import Client
Expand Down Expand Up @@ -34,7 +34,9 @@ def __init__(self, domain: str, token: str, namespace: str = 'default'):
api_client = Client(Config(domain), token).api_client
self.service = CronWorkflowServiceApi(api_client=api_client)

def create(self, cron_workflow: V1alpha1CronWorkflow, namespace: str = 'default') -> V1alpha1CronWorkflow:
def create(
self, cron_workflow: IoArgoprojWorkflowV1alpha1CronWorkflow, namespace: str = 'default'
) -> IoArgoprojWorkflowV1alpha1CronWorkflow:
"""Creates given cron workflow in the argo server.
Parameters
Expand All @@ -44,12 +46,18 @@ def create(self, cron_workflow: V1alpha1CronWorkflow, namespace: str = 'default'
namespace: str = 'default'
The K8S namespace of the Argo server to create the cron workflow in.
Returns
-------
IoArgoprojWorkflowV1alpha1CronWorkflow - created workflow.
Raises
------
argo.workflows.client.ApiException: Raised upon any HTTP-related errors
argo_workflows.exceptions.ApiException: Raised upon any HTTP-related errors.
"""
return self.service.create_cron_workflow(
namespace, V1alpha1CreateCronWorkflowRequest(cron_workflow=cron_workflow)
namespace,
IoArgoprojWorkflowV1alpha1CreateCronWorkflowRequest(cron_workflow=cron_workflow, namespace=namespace),
_check_return_type=False,
)

def delete(self, name: str, namespace: str = 'default') -> Tuple[object, int, dict]:
Expand All @@ -68,9 +76,9 @@ def delete(self, name: str, namespace: str = 'default') -> Tuple[object, int, di
Raises
------
argo.workflows.client.ApiException: Raised upon any HTTP-related errors
argo_workflows.exceptions.ApiException: Raised upon any HTTP-related errors.
"""
return self.service.delete_cron_workflow(namespace, name)
return self.service.delete_cron_workflow(namespace, name, _check_return_type=False)

def suspend(self, name: str, namespace: str = 'default') -> Tuple[object, int, dict]:
"""Suspends a cron workflow from the given namespace based on the specified name.
Expand All @@ -88,10 +96,13 @@ def suspend(self, name: str, namespace: str = 'default') -> Tuple[object, int, d
Raises
------
argo.workflows.client.ApiException: Raised upon any HTTP-related errors
argo_workflows.exceptions.ApiException: Raised upon any HTTP-related errors.
"""
return self.service.suspend_cron_workflow(
namespace, name, body=V1alpha1CronWorkflowSuspendRequest(name=name, namespace=namespace)
namespace,
name,
IoArgoprojWorkflowV1alpha1CronWorkflowSuspendRequest(name=name, namespace=namespace),
_check_return_type=False,
)

def resume(self, name: str, namespace: str = 'default') -> Tuple[object, int, dict]:
Expand All @@ -110,14 +121,14 @@ def resume(self, name: str, namespace: str = 'default') -> Tuple[object, int, di
Raises
------
argo.workflows.client.ApiException: Raised upon any HTTP-related errors
argo_workflows.exceptions.ApiException: Raised upon any HTTP-related errors.
"""
return self.service.resume_cron_workflow(
namespace, name, body=V1alpha1CronWorkflowResumeRequest(name=name, namespace=namespace)
namespace, name, body=IoArgoprojWorkflowV1alpha1CronWorkflowResumeRequest(name=name, namespace=namespace)
)

def get_cron_workflow_link(self, name: str, namespace: str = 'default') -> str:
"""Assembles a cron workflow link for the given cron workflow name. Note that the returned path works only for Argo.
"""Assembles a cron workflow link for the given cron workflow name.
Parameters
----------
Expand All @@ -130,5 +141,9 @@ def get_cron_workflow_link(self, name: str, namespace: str = 'default') -> str:
-------
str
The cron workflow link.
Notes
-----
The returned path works only for Argo.
"""
return f'https://{self._domain}/cron-workflows/{namespace}/{name}'
13 changes: 6 additions & 7 deletions src/hera/v1/empty_dir_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import uuid
from typing import Optional

from argo.workflows.client import V1EmptyDirVolumeSource, V1Volume, V1VolumeMount
from argo_workflows.models import EmptyDirVolumeSource, Volume, VolumeMount
from pydantic import BaseModel, validator


Expand Down Expand Up @@ -38,24 +38,23 @@ def check_name(cls, value):
return str(uuid.uuid4())
return value

def get_volume(self) -> V1Volume:
def get_volume(self) -> Volume:
"""Constructs an Argo volume representation for mounting existing volumes to a step/task.
Returns
-------
V1Volume
The volume representation that can be mounted in workflow steps/tasks.
"""
size_limit = self.size if self.size else None
empty_dir = V1EmptyDirVolumeSource(medium='Memory', size_limit=size_limit)
return V1Volume(name=self.name, empty_dir=empty_dir)
empty_dir = EmptyDirVolumeSource(medium='Memory', size_limit=self.size)
return Volume(name=self.name, empty_dir=empty_dir)

def get_mount(self) -> V1VolumeMount:
def get_mount(self) -> VolumeMount:
"""Constructs and returns an Argo volume mount representation for tasks.
Returns
-------
V1VolumeMount
The Argo model for mounting volumes.
"""
return V1VolumeMount(mount_path=self.mount_path, name=self.name)
return VolumeMount(mount_path=self.mount_path, name=self.name)
12 changes: 6 additions & 6 deletions src/hera/v1/env.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from typing import Any, Optional, Union

from argo.workflows.client import V1EnvVar, V1EnvVarSource, V1SecretKeySelector
from argo_workflows.models import EnvVar, EnvVarSource, SecretKeySelector
from pydantic import BaseModel, validator

from hera.v1.validators import json_serializable
Expand Down Expand Up @@ -36,10 +36,10 @@ def check_value_json_serializable(cls, value):
return value

@property
def argo_spec(self) -> V1EnvVar:
def argo_spec(self) -> EnvVar:
"""Constructs and returns the Argo environment specification"""
value = self.value.json() if isinstance(self.value, BaseModel) else json.dumps(self.value)
return V1EnvVar(name=self.name, value=value)
return EnvVar(name=self.name, value=value)


class SecretEnvSpec(EnvSpec):
Expand All @@ -57,9 +57,9 @@ class SecretEnvSpec(EnvSpec):
secret_key: str

@property
def argo_spec(self) -> V1EnvVar:
def argo_spec(self) -> EnvVar:
"""Constructs and returns the Argo environment specification"""
return V1EnvVar(
return EnvVar(
name=self.name,
value_from=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name=self.secret_name, key=self.secret_key)),
value_from=EnvVarSource(secret_key_ref=SecretKeySelector(name=self.secret_name, key=self.secret_key)),
)
Loading

0 comments on commit cb04778

Please sign in to comment.