Skip to content

Commit

Permalink
Revert "Add kubernetes labels (#1236)"
Browse files Browse the repository at this point in the history
This reverts commit a992dde.
  • Loading branch information
savingoyal authored Apr 17, 2023
1 parent a992dde commit df98332
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 216 deletions.
2 changes: 0 additions & 2 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,6 @@
KUBERNETES_NODE_SELECTOR = from_conf("KUBERNETES_NODE_SELECTOR", "")
KUBERNETES_TOLERATIONS = from_conf("KUBERNETES_TOLERATIONS", "")
KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "")
# Default labels for kubernetes pods
KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "")
# Default GPU vendor to use by K8S jobs created by Metaflow (supports nvidia, amd)
KUBERNETES_GPU_VENDOR = from_conf("KUBERNETES_GPU_VENDOR", "nvidia")
# Default container image for K8S
Expand Down
6 changes: 2 additions & 4 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,22 +887,20 @@ def _container_templates(self):
.retry_strategy(
times=total_retries,
minutes_between_retries=minutes_between_retries,
).metadata(
)
.metadata(
ObjectMeta().annotation("metaflow/step_name", node.name)
# Unfortunately, we can't set the task_id since it is generated
# inside the pod. However, it can be inferred from the annotation
# set by argo-workflows - `workflows.argoproj.io/outputs` - refer to
# the field 'task-id' in 'parameters'
# .annotation("metaflow/task_id", ...)
.annotation("metaflow/attempt", retry_count)
# Set labels
.labels(resources.get("labels"))
)
# Set emptyDir volume for state management
.empty_dir_volume("out")
# Set node selectors
.node_selectors(resources.get("node_selector"))
# Set tolerations
.tolerations(resources.get("tolerations"))
# Set container
.container(
Expand Down
6 changes: 1 addition & 5 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from metaflow.metaflow_config import (
SERVICE_HEADERS,
SERVICE_INTERNAL_URL,
CARD_AZUREROOT,
CARD_GSROOT,
CARD_S3ROOT,
DATASTORE_SYSROOT_S3,
DATATOOLS_S3ROOT,
Expand All @@ -31,8 +29,8 @@
BASH_SAVE_LOGS,
bash_capture_logs,
export_mflog_env_vars,
get_log_tailer,
tail_logs,
get_log_tailer,
)

from .kubernetes_client import KubernetesClient
Expand Down Expand Up @@ -154,7 +152,6 @@ def create_job(
run_time_limit=None,
env=None,
tolerations=None,
labels=None,
):

if env is None:
Expand Down Expand Up @@ -188,7 +185,6 @@ def create_job(
retries=0,
step_name=step_name,
tolerations=tolerations,
labels=labels,
)
.environment_variable("METAFLOW_CODE_SHA", code_package_sha)
.environment_variable("METAFLOW_CODE_URL", code_package_url)
Expand Down
17 changes: 2 additions & 15 deletions metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import traceback

from metaflow import JSONTypeClass, util
from metaflow import util, JSONTypeClass
from metaflow._vendor import click
from metaflow.exception import METAFLOW_EXIT_DISALLOW_RETRY, CommandException
from metaflow.metadata.util import sync_local_metadata_from_datastore
Expand Down Expand Up @@ -91,12 +91,6 @@ def kubernetes():
type=JSONTypeClass(),
multiple=False,
)
@click.option(
"--labels",
multiple=True,
default=None,
help="Labels for Kubernetes pod.",
)
@click.pass_context
def step(
ctx,
Expand All @@ -116,7 +110,6 @@ def step(
gpu_vendor=None,
run_time_limit=None,
tolerations=None,
labels=None,
**kwargs
):
def echo(msg, stream="stderr", job_id=None, **kwargs):
Expand Down Expand Up @@ -182,12 +175,7 @@ def echo(msg, stream="stderr", job_id=None, **kwargs):
stderr_location = ds.get_log_location(TASK_LOG_SOURCE, "stderr")

# `node_selector` is a tuple of strings, convert it to a dictionary
node_selector = KubernetesDecorator.parse_kube_keyvalue_list(node_selector)

# `labels` is a tuple of strings or a tuple with a single comma separated string
# convert it to a dict
labels = KubernetesDecorator.parse_kube_keyvalue_list(labels, False)
KubernetesDecorator.validate_kube_labels(labels)
node_selector = KubernetesDecorator.parse_node_selector(node_selector)

def _sync_metadata():
if ctx.obj.metadata.TYPE == "local":
Expand Down Expand Up @@ -230,7 +218,6 @@ def _sync_metadata():
run_time_limit=run_time_limit,
env=env,
tolerations=tolerations,
labels=labels,
)
except Exception as e:
traceback.print_exc(chain=False)
Expand Down
110 changes: 14 additions & 96 deletions metaflow/plugins/kubernetes/kubernetes_decorator.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import hashlib
import json
import os
import platform
import re
import sys
from typing import Dict, List, Optional, Union

from metaflow.decorators import StepDecorator
from metaflow.exception import MetaflowException
Expand All @@ -15,12 +12,11 @@
KUBERNETES_CONTAINER_IMAGE,
KUBERNETES_CONTAINER_REGISTRY,
KUBERNETES_GPU_VENDOR,
KUBERNETES_LABELS,
KUBERNETES_NAMESPACE,
KUBERNETES_NODE_SELECTOR,
KUBERNETES_TOLERATIONS,
KUBERNETES_SECRETS,
KUBERNETES_SERVICE_ACCOUNT,
KUBERNETES_SECRETS,
KUBERNETES_FETCH_EC2_METADATA,
)
from metaflow.plugins.resources_decorator import ResourcesDecorator
Expand Down Expand Up @@ -69,8 +65,6 @@ class KubernetesDecorator(StepDecorator):
in Metaflow configuration.
tolerations : List[str], default: METAFLOW_KUBERNETES_TOLERATIONS
Kubernetes tolerations to use when launching pod in Kubernetes.
labels : Dict[str, str], default: METAFLOW_KUBERNETES_LABELS
Kubernetes labels to use when launching pod in Kubernetes.
"""

name = "kubernetes"
Expand All @@ -82,7 +76,6 @@ class KubernetesDecorator(StepDecorator):
"service_account": None,
"secrets": None, # e.g., mysecret
"node_selector": None, # e.g., kubernetes.io/os=linux
"labels": None, # e.g., my_label=my_value
"namespace": None,
"gpu": None, # value of 0 implies that the scheduled node should not have GPUs
"gpu_vendor": None,
Expand All @@ -106,17 +99,9 @@ def __init__(self, attributes=None, statically_defined=False):
self.attributes["node_selector"] = KUBERNETES_NODE_SELECTOR
if not self.attributes["tolerations"] and KUBERNETES_TOLERATIONS:
self.attributes["tolerations"] = json.loads(KUBERNETES_TOLERATIONS)
if not self.attributes["labels"] and KUBERNETES_LABELS:
self.attributes["labels"] = KUBERNETES_LABELS

if isinstance(self.attributes["labels"], str):
self.attributes["labels"] = self.parse_kube_keyvalue_list(
self.attributes["labels"].split(","), False
)
self.validate_kube_labels(self.attributes["labels"])

if isinstance(self.attributes["node_selector"], str):
self.attributes["node_selector"] = self.parse_kube_keyvalue_list(
self.attributes["node_selector"] = self.parse_node_selector(
self.attributes["node_selector"].split(",")
)

Expand Down Expand Up @@ -295,11 +280,10 @@ def runtime_step_cli(
for k, v in self.attributes.items():
if k == "namespace":
cli_args.command_options["k8s_namespace"] = v
elif k in {"node_selector", "labels"} and v:
cli_args.command_options[k] = [
"=".join([key, str(val)]) if val else key
for key, val in v.items()
]
elif k == "node_selector" and v:
cli_args.command_options[k] = ",".join(
["=".join([key, str(val)]) for key, val in v.items()]
)
elif k == "tolerations":
cli_args.command_options[k] = json.dumps(v)
else:
Expand Down Expand Up @@ -407,80 +391,14 @@ def _save_package_once(cls, flow_datastore, package):
[package.blob], len_hint=1
)[0]

@classmethod
def _parse_decorator_spec(cls, deco_spec: str):
if not deco_spec:
return cls()

valid_options = "|".join(cls.defaults.keys())
deco_spec_parts = []
for part in re.split(""",(?=[\s\w]+[{}]=)""".format(valid_options), deco_spec):
name, val = part.split("=", 1)
if name in {"labels", "node_selector"}:
try:
tmp_vals = json.loads(val.strip().replace('\\"', '"'))
for val_i in tmp_vals.values():
if not (val_i is None or isinstance(val_i, str)):
raise KubernetesException(
"All values must be string or null."
)
except json.JSONDecodeError:
if val.startswith("{"):
raise KubernetesException(
"Malform json detected in %s" % str(val)
)
both = name == "node_selector"
val = json.dumps(
cls.parse_kube_keyvalue_list(val.split(","), both),
separators=(",", ":"),
)
deco_spec_parts.append("=".join([name, val]))
deco_spec_parsed = ",".join(deco_spec_parts)
return super()._parse_decorator_spec(deco_spec_parsed)

@staticmethod
def parse_kube_keyvalue_list(items: List[str], requires_both: bool = True):
def parse_node_selector(node_selector: list):
try:
ret = {}
for item_str in items:
item = item_str.split("=", 1)
if requires_both:
item[1] # raise IndexError
if str(item[0]) in ret:
raise KubernetesException("Duplicate key found: %s" % str(item[0]))
ret[str(item[0])] = str(item[1]) if len(item) > 1 else None
return ret
except KubernetesException as e:
raise e
return {
str(k.split("=", 1)[0]): str(k.split("=", 1)[1])
for k in node_selector or []
}
except (AttributeError, IndexError):
raise KubernetesException("Unable to parse kubernetes list: %s" % items)

@staticmethod
def validate_kube_labels(
labels: Optional[Dict[str, Optional[str]]],
) -> bool:
"""Validate label values.
This validates the kubernetes label values. It does not validate the keys.
Ideally, keys should be static and also the validation rules for keys are
more complex than those for values. For full validation rules, see:
https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
"""

def validate_label(s: Optional[str]):
regex_match = r"^(([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9])?$"
if not s:
# allow empty label
return True
if not re.search(regex_match, s):
raise KubernetesException(
'Invalid value: "%s"\n'
"A valid label must be an empty string or one that\n"
" - Consist of alphanumeric, '-', '_' or '.' characters\n"
" - Begins and ends with an alphanumeric character\n"
" - Is at most 63 characters" % s
)
return True

return all([validate_label(v) for v in labels.values()]) if labels else True
raise KubernetesException(
"Unable to parse node_selector: %s" % node_selector
)
94 changes: 0 additions & 94 deletions test/unit/test_kubernetes_decorator.py

This file was deleted.

0 comments on commit df98332

Please sign in to comment.