Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kubernetes labels #1236

Merged
merged 23 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@
KUBERNETES_NODE_SELECTOR = from_conf("KUBERNETES_NODE_SELECTOR", "")
KUBERNETES_TOLERATIONS = from_conf("KUBERNETES_TOLERATIONS", "")
KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "")
# Default labels for kubernetes pods
KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "")
# Default GPU vendor to use by K8S jobs created by Metaflow (supports nvidia, amd)
KUBERNETES_GPU_VENDOR = from_conf("KUBERNETES_GPU_VENDOR", "nvidia")
# Default container image for K8S
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,11 +894,14 @@ def _container_templates(self):
# the field 'task-id' in 'parameters'
# .annotation("metaflow/task_id", ...)
.annotation("metaflow/attempt", retry_count)
# Set labels
.labels(resources.get("labels"))
)
# Set emptyDir volume for state management
.empty_dir_volume("out")
# Set node selectors
.node_selectors(resources.get("node_selector"))
# Set tolerations
.tolerations(resources.get("tolerations"))
# Set container
.container(
Expand Down
6 changes: 5 additions & 1 deletion metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from metaflow.metaflow_config import (
SERVICE_HEADERS,
SERVICE_INTERNAL_URL,
CARD_AZUREROOT,
dhpollack marked this conversation as resolved.
Show resolved Hide resolved
CARD_GSROOT,
CARD_S3ROOT,
DATASTORE_SYSROOT_S3,
DATATOOLS_S3ROOT,
Expand All @@ -28,8 +30,8 @@
BASH_SAVE_LOGS,
bash_capture_logs,
export_mflog_env_vars,
tail_logs,
get_log_tailer,
tail_logs,
)

from .kubernetes_client import KubernetesClient
Expand Down Expand Up @@ -151,6 +153,7 @@ def create_job(
run_time_limit=None,
env=None,
tolerations=None,
labels=None,
):

if env is None:
Expand Down Expand Up @@ -184,6 +187,7 @@ def create_job(
retries=0,
step_name=step_name,
tolerations=tolerations,
labels=labels,
)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
.environment_variable("METAFLOW_CODE_URL", code_package_url)
Expand Down
18 changes: 16 additions & 2 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import traceback

from metaflow import util, JSONTypeClass
from metaflow import JSONTypeClass, util
from metaflow._vendor import click
from metaflow.exception import METAFLOW_EXIT_DISALLOW_RETRY, CommandException
from metaflow.metadata.util import sync_local_metadata_from_datastore
Expand Down Expand Up @@ -91,6 +91,12 @@ def kubernetes():
type=JSONTypeClass(),
multiple=False,
)
@click.option(
"--labels",
multiple=True,
default=None,
help="Labels for Kubernetes pod.",
)
@click.pass_context
def step(
ctx,
Expand All @@ -110,6 +116,7 @@ def step(
gpu_vendor=None,
run_time_limit=None,
tolerations=None,
labels=None,
**kwargs
):
def echo(msg, stream="stderr", job_id=None):
Expand Down Expand Up @@ -175,7 +182,13 @@ def echo(msg, stream="stderr", job_id=None):
stderr_location = ds.get_log_location(TASK_LOG_SOURCE, "stderr")

# `node_selector` is a tuple of strings, convert it to a dictionary
node_selector = KubernetesDecorator.parse_node_selector(node_selector)
node_selector = KubernetesDecorator.parse_kube_keyvalue_list(node_selector)

# `labels` is a tuple of strings or a tuple with a single comma separated string
# convert it to a dict
labels = KubernetesDecorator.validate_kube_labels(
KubernetesDecorator.parse_kube_keyvalue_list(labels, False)
)

def _sync_metadata():
if ctx.obj.metadata.TYPE == "local":
Expand Down Expand Up @@ -218,6 +231,7 @@ def _sync_metadata():
run_time_limit=run_time_limit,
env=env,
tolerations=tolerations,
labels=labels,
)
except Exception as e:
traceback.print_exc(chain=False)
Expand Down
92 changes: 76 additions & 16 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import hashlib
import json
import os
import platform
import re
import requests
import sys
dhpollack marked this conversation as resolved.
Show resolved Hide resolved
from typing import Dict, List, Optional, Union

from metaflow.decorators import StepDecorator
from metaflow.exception import MetaflowException
Expand All @@ -13,18 +16,18 @@
KUBERNETES_CONTAINER_IMAGE,
KUBERNETES_CONTAINER_REGISTRY,
KUBERNETES_GPU_VENDOR,
KUBERNETES_LABELS,
KUBERNETES_NAMESPACE,
KUBERNETES_NODE_SELECTOR,
KUBERNETES_TOLERATIONS,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_SECRETS,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_TOLERATIONS,
)
from metaflow.plugins.resources_decorator import ResourcesDecorator
from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task
from metaflow.sidecar import Sidecar

from ..aws.aws_utils import get_docker_registry

from .kubernetes import KubernetesException

try:
Expand Down Expand Up @@ -65,6 +68,8 @@ class KubernetesDecorator(StepDecorator):
in Metaflow configuration.
tolerations : List[str], default: METAFLOW_KUBERNETES_TOLERATIONS
Kubernetes tolerations to use when launching pod in Kubernetes.
labels : Dict[str, str], default: METAFLOW_KUBERNETES_LABELS
Kubernetes labels to use when launching pod in Kubernetes.
"""

name = "kubernetes"
Expand All @@ -76,6 +81,7 @@ class KubernetesDecorator(StepDecorator):
"service_account": None,
"secrets": None, # e.g., mysecret
"node_selector": None, # e.g., kubernetes.io/os=linux
"labels": None, # e.g., my_label=my_value
"namespace": None,
"gpu": None, # value of 0 implies that the scheduled node should not have GPUs
"gpu_vendor": None,
Expand All @@ -99,9 +105,17 @@ def __init__(self, attributes=None, statically_defined=False):
self.attributes["node_selector"] = KUBERNETES_NODE_SELECTOR
if not self.attributes["tolerations"] and KUBERNETES_TOLERATIONS:
self.attributes["tolerations"] = json.loads(KUBERNETES_TOLERATIONS)
if not self.attributes["labels"] and KUBERNETES_LABELS:
self.attributes["labels"] = KUBERNETES_LABELS

if isinstance(self.attributes["labels"], str):
self.attributes["labels"] = self.parse_kube_keyvalue_list(
self.attributes["labels"].split(","), False
)
self.attributes["labels"] = self.validate_kube_labels(self.attributes["labels"])
dhpollack marked this conversation as resolved.
Show resolved Hide resolved

if isinstance(self.attributes["node_selector"], str):
self.attributes["node_selector"] = self.parse_node_selector(
self.attributes["node_selector"] = self.parse_kube_keyvalue_list(
self.attributes["node_selector"].split(",")
)

Expand Down Expand Up @@ -280,10 +294,11 @@ def runtime_step_cli(
for k, v in self.attributes.items():
if k == "namespace":
cli_args.command_options["k8s_namespace"] = v
elif k == "node_selector" and v:
cli_args.command_options[k] = ",".join(
["=".join([key, str(val)]) for key, val in v.items()]
)
elif k in {"node_selector", "labels"} and v:
cli_args.command_options[k] = [
"=".join([key, str(val)]) if val else key
for key, val in v.items()
]
elif k == "tolerations":
cli_args.command_options[k] = json.dumps(v)
else:
Expand Down Expand Up @@ -399,14 +414,59 @@ def _save_package_once(cls, flow_datastore, package):
[package.blob], len_hint=1
)[0]

@classmethod
def _parse_decorator_spec(cls, deco_spec: str):
    """Build the decorator from its ``name=value,...`` spec string.

    The values of ``labels`` and ``node_selector`` may be given either as
    JSON or as a comma-separated ``key=value`` list. The list form is
    rewritten to compact JSON before the whole spec is handed to the parent
    class' ``_parse_decorator_spec``.

    Parameters
    ----------
    deco_spec : str
        Attribute spec; an empty spec yields a default-constructed decorator.
    """
    if not deco_spec:
        return cls()

    # Only split on commas that are immediately followed by `<known option>=`.
    # The option names must form an alternation *group*: inside a character
    # class they would match any single letter occurring in any option name,
    # which wrongly splits values such as `labels=app=x,env=y` at `,env=`.
    valid_options = "|".join(re.escape(option) for option in cls.defaults)
    option_boundary = re.compile(rf",(?=\s*(?:{valid_options})=)")

    deco_spec_parts = []
    for part in option_boundary.split(deco_spec):
        name, val = part.split("=", 1)
        if name in {"labels", "node_selector"}:
            try:
                # Already JSON (possibly with shell-escaped quotes): keep as is.
                json.loads(val.strip().replace('\\"', '"'))
            except json.JSONDecodeError:
                # Plain `k=v,k2=v2` list. Node selectors need a value for
                # every key, labels may be bare keys.
                both = name == "node_selector"
                val = json.dumps(
                    cls.parse_kube_keyvalue_list(val.split(","), both),
                    separators=(",", ":"),
                )
        deco_spec_parts.append("=".join([name, val]))
    return super()._parse_decorator_spec(",".join(deco_spec_parts))

@staticmethod
def parse_node_selector(node_selector: list):
def parse_kube_keyvalue_list(items: List[str], requires_both: bool = True):
try:
return {
str(k.split("=", 1)[0]): str(k.split("=", 1)[1])
for k in node_selector or []
}
ret = {}
for item_str in items:
item = item_str.split("=", 1)
if requires_both:
item[1] # raise IndexError
ret[str(item[0])] = str(item[1]) if len(item) > 1 else None
return ret
except (AttributeError, IndexError):
raise KubernetesException(
"Unable to parse node_selector: %s" % node_selector
)
raise KubernetesException("Unable to parse kubernetes list: %s" % items)

@staticmethod
def validate_kube_labels(
labels: Optional[Dict[str, Optional[str]]],
regex_match: str = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$",
saikonen marked this conversation as resolved.
Show resolved Hide resolved
):
def validate_label(s: Optional[str]):
if not s:
# allow empty label
return s
if not re.search(regex_match, s):
# this is the same message kubernetes itself returns
raise Exception(
saikonen marked this conversation as resolved.
Show resolved Hide resolved
f'Invalid value: "{s}": a valid label must be an empty string or '
"consist of alphanumeric characters, '-', '_' or '.', and must "
"start and end with an alphanumeric character (e.g. 'MyValue', "
"or 'my_value', or '12345', regex used for validation is "
"'^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$'"
saikonen marked this conversation as resolved.
Show resolved Hide resolved
)
return s

return {k: validate_label(v) for k, v in labels.items()} if labels else labels
saikonen marked this conversation as resolved.
Show resolved Hide resolved
68 changes: 68 additions & 0 deletions test/unit/test_kubernetes_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import pytest

from metaflow.plugins.kubernetes.kubernetes_decorator import KubernetesDecorator


@pytest.mark.parametrize(
    "labels",
    [
        None,
        {"label": "value"},
        {"label1": "val1", "label2": "val2"},
        {"label1": "val1", "label2": None},
        {"label": "a"},
        {"label": ""},
        # 63 characters: the longest value the default pattern accepts
        {"label": "1234567890" * 6 + "123"},
        # 63 characters with '-', '_' and '.' in the middle
        {"label": "1234567890" * 2 + "1234-_.890" + "1234567890" * 3 + "123"},
    ],
)
def test_kubernetes_decorator_validate_kube_labels(labels):
    """Valid label sets are returned unchanged by the validator."""
    result = KubernetesDecorator.validate_kube_labels(labels)
    assert result == labels


@pytest.mark.parametrize(
    "labels",
    [
        {"label": "a-"},
        {"label": ".a"},
        {"label": "test()"},
        # 64 characters: one more than the default pattern allows
        {"label": "1234567890" * 6 + "1234"},
        {"label": "(){}??"},
        {"valid": "test", "invalid": "bißchen"},
    ],
)
def test_kubernetes_decorator_validate_kube_labels_fail(labels):
    """Fail if label contains invalid characters or is too long"""
    validate = KubernetesDecorator.validate_kube_labels
    with pytest.raises(Exception):
        validate(labels)