Skip to content

Commit

Permalink
Add runtime configuration validation (#8942)
Browse files Browse the repository at this point in the history
* Sync config models

* fix
  • Loading branch information
ofek authored May 13, 2021
1 parent 3b3b3e0 commit 86dba17
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 10 deletions.
24 changes: 17 additions & 7 deletions kafka_consumer/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,23 @@ files:
only generate a single check process, but if one host goes down,
KafkaClient tries contacting the next host.
Details: https://github.com/DataDog/dd-agent/issues/2943
required: True
value:
type: array
items:
type: string
example:
- localhost:9092
You may specify a single server like:
kafka_connect_str: server:9092
or multiple servers like:
kafka_connect_str:
- server1:9092
- server2:9092
required: true
value:
anyOf:
- type: string
- type: array
items:
type: string
- name: kafka_client_api_version
description: |
Specify the highest client protocol version supported by all brokers in the cluster.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# (C) Datadog, Inc. 2021-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from .instance import InstanceConfig
from .shared import SharedConfig


class ConfigMixin:
    """Mixin exposing the validated configuration models on a check class.

    The two underscore-prefixed attributes are only declared here; they are
    populated elsewhere (presumably by the base check during config model
    initialization -- TODO confirm against the caller). This mixin merely
    provides read-only accessors for them.
    """

    # Validated per-instance configuration (one `instances` entry).
    _config_model_instance: InstanceConfig
    # Validated shared configuration (the `init_config` section).
    _config_model_shared: SharedConfig

    @property
    def config(self) -> InstanceConfig:
        """Return the validated instance configuration model."""
        return self._config_model_instance

    @property
    def shared_config(self) -> SharedConfig:
        """Return the validated shared configuration model."""
        return self._config_model_shared
116 changes: 116 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# (C) Datadog, Inc. 2021-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from datadog_checks.base.utils.models.fields import get_default_field_value


# Default-value providers for the shared (init_config) model, one function
# per option, named `shared_<option>`. They are resolved dynamically via
# `getattr(defaults, f'shared_{field.name}')` in SharedConfig, so the names
# must match the model's field names exactly. Each receives the pydantic
# field object and the current value, and returns the default to apply when
# the user did not set the option.
def shared_kafka_timeout(field, value):
    return 5


def shared_service(field, value):
    # No static default; defer to the framework-derived field default.
    return get_default_field_value(field, value)


def shared_zk_timeout(field, value):
    return 5


# Default-value providers for the per-instance model, one function per
# option, named `instance_<option>`. They are resolved dynamically via
# `getattr(defaults, f'instance_{field.name}')` in InstanceConfig, so the
# names must match the model's field names exactly. Each receives the
# pydantic field object and the current value, and returns the default to
# apply when the user did not set the option. Functions that return
# `get_default_field_value(field, value)` have no static default and defer
# to the framework-derived one.
def instance_broker_requests_batch_size(field, value):
    return 30


def instance_consumer_groups(field, value):
    return get_default_field_value(field, value)


def instance_empty_default_hostname(field, value):
    return False


def instance_kafka_client_api_version(field, value):
    return '2.3.0'


def instance_kafka_consumer_offsets(field, value):
    return False


def instance_min_collection_interval(field, value):
    return 15


def instance_monitor_all_broker_highwatermarks(field, value):
    return False


def instance_monitor_unlisted_consumer_groups(field, value):
    return False


def instance_sasl_kerberos_domain_name(field, value):
    return 'localhost'


def instance_sasl_kerberos_service_name(field, value):
    return 'kafka'


def instance_sasl_mechanism(field, value):
    return 'PLAIN'


def instance_sasl_plain_password(field, value):
    return get_default_field_value(field, value)


def instance_sasl_plain_username(field, value):
    return get_default_field_value(field, value)


def instance_security_protocol(field, value):
    return 'PLAINTEXT'


def instance_service(field, value):
    return get_default_field_value(field, value)


def instance_ssl_cafile(field, value):
    return get_default_field_value(field, value)


def instance_ssl_certfile(field, value):
    return get_default_field_value(field, value)


def instance_ssl_check_hostname(field, value):
    return True


def instance_ssl_context(field, value):
    return get_default_field_value(field, value)


def instance_ssl_crlfile(field, value):
    return get_default_field_value(field, value)


def instance_ssl_keyfile(field, value):
    return get_default_field_value(field, value)


def instance_ssl_password(field, value):
    return get_default_field_value(field, value)


def instance_tags(field, value):
    return get_default_field_value(field, value)


def instance_zk_connect_str(field, value):
    return get_default_field_value(field, value)


def instance_zk_prefix(field, value):
    return get_default_field_value(field, value)
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# (C) Datadog, Inc. 2021-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from typing import Any, Mapping, Optional, Sequence, Union

from pydantic import BaseModel, root_validator, validator

from datadog_checks.base.utils.functions import identity
from datadog_checks.base.utils.models import validation

from . import defaults, validators


class InstanceConfig(BaseModel):
    """Validated model for one `instances` entry of the kafka_consumer check.

    Field names mirror the options declared in the check's spec.yaml.
    Validation is driven entirely by the generic hooks below: whole-config
    initialize/finalize callbacks plus per-field default and validator
    lookups resolved by field name.
    """

    class Config:
        # Config objects are read-only once validated.
        allow_mutation = False

    broker_requests_batch_size: Optional[int]
    consumer_groups: Optional[Mapping[str, Any]]
    empty_default_hostname: Optional[bool]
    kafka_client_api_version: Optional[str]
    # The only required option; accepts a single server string or a list
    # of servers (mirrors the `anyOf` in spec.yaml).
    kafka_connect_str: Union[str, Sequence[str]]
    kafka_consumer_offsets: Optional[bool]
    min_collection_interval: Optional[float]
    monitor_all_broker_highwatermarks: Optional[bool]
    monitor_unlisted_consumer_groups: Optional[bool]
    sasl_kerberos_domain_name: Optional[str]
    sasl_kerberos_service_name: Optional[str]
    sasl_mechanism: Optional[str]
    sasl_plain_password: Optional[str]
    sasl_plain_username: Optional[str]
    security_protocol: Optional[str]
    service: Optional[str]
    ssl_cafile: Optional[str]
    ssl_certfile: Optional[str]
    ssl_check_hostname: Optional[bool]
    ssl_context: Optional[str]
    ssl_crlfile: Optional[str]
    ssl_keyfile: Optional[str]
    ssl_password: Optional[str]
    tags: Optional[Sequence[str]]
    zk_connect_str: Optional[Sequence[Mapping[str, Any]]]
    zk_prefix: Optional[str]

    @root_validator(pre=True)
    def _initial_validation(cls, values):
        # Runs before field parsing: an optional check-provided
        # `initialize_instance` hook, then the framework-wide initializer.
        return validation.core.initialize_config(getattr(validators, 'initialize_instance', identity)(values))

    @validator('*', pre=True, always=True)
    def _ensure_defaults(cls, v, field):
        # For optional fields left unset (None), fill in the default from
        # `defaults.instance_<field name>`; supplied values pass through.
        if v is not None or field.required:
            return v

        return getattr(defaults, f'instance_{field.name}')(field, v)

    @validator('*')
    def _run_validations(cls, v, field):
        # Apply the optional per-field validator from the `validators`
        # module; falsy values (None, empty) skip custom validation.
        if not v:
            return v

        return getattr(validators, f'instance_{field.name}', identity)(v, field=field)

    @root_validator(pre=False)
    def _final_validation(cls, values):
        # Runs after all fields validated: an optional `finalize_instance`
        # hook, then the framework-wide finalizer.
        return validation.core.finalize_config(getattr(validators, 'finalize_instance', identity)(values))
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# (C) Datadog, Inc. 2021-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from typing import Optional

from pydantic import BaseModel, root_validator, validator

from datadog_checks.base.utils.functions import identity
from datadog_checks.base.utils.models import validation

from . import defaults, validators


class SharedConfig(BaseModel):
    """Validated model for the `init_config` section of the kafka_consumer check.

    Field names mirror the options declared in the check's spec.yaml.
    Validation is driven by the same generic hook pattern as the instance
    model: whole-config initialize/finalize callbacks plus per-field
    default and validator lookups resolved by field name.
    """

    class Config:
        # Config objects are read-only once validated.
        allow_mutation = False

    kafka_timeout: Optional[int]
    service: Optional[str]
    zk_timeout: Optional[int]

    @root_validator(pre=True)
    def _initial_validation(cls, values):
        # Runs before field parsing: an optional check-provided
        # `initialize_shared` hook, then the framework-wide initializer.
        return validation.core.initialize_config(getattr(validators, 'initialize_shared', identity)(values))

    @validator('*', pre=True, always=True)
    def _ensure_defaults(cls, v, field):
        # For optional fields left unset (None), fill in the default from
        # `defaults.shared_<field name>`; supplied values pass through.
        if v is not None or field.required:
            return v

        return getattr(defaults, f'shared_{field.name}')(field, v)

    @validator('*')
    def _run_validations(cls, v, field):
        # Apply the optional per-field validator from the `validators`
        # module; falsy values (None, empty) skip custom validation.
        if not v:
            return v

        return getattr(validators, f'shared_{field.name}', identity)(v, field=field)

    @root_validator(pre=False)
    def _final_validation(cls, values):
        # Runs after all fields validated: an optional `finalize_shared`
        # hook, then the framework-wide finalizer.
        return validation.core.finalize_config(getattr(validators, 'finalize_shared', identity)(values))
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# (C) Datadog, Inc. 2021-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,26 @@ init_config:
#
instances:

## @param kafka_connect_str - list of strings - required
## @param kafka_connect_str - string or list of strings - required
## Kafka endpoints and port to connect to.
##
## In a production environment, it's often useful to specify multiple
## Kafka nodes for a single check instance. This way you
## only generate a single check process, but if one host goes down,
## KafkaClient tries contacting the next host.
## Details: https://github.com/DataDog/dd-agent/issues/2943
##
## You may specify a single server like:
##
## kafka_connect_str: server:9092
##
## or multiple servers like:
##
## kafka_connect_str:
## - server1:9092
## - server2:9092
#
- kafka_connect_str:
- localhost:9092
- kafka_connect_str: <KAFKA_CONNECT_STR>

## @param kafka_client_api_version - string - optional
## Specify the highest client protocol version supported by all brokers in the cluster.
Expand Down

0 comments on commit 86dba17

Please sign in to comment.