From 86dba1706a0093c0e953b206b5451efd96367c91 Mon Sep 17 00:00:00 2001
From: Ofek Lev
Date: Thu, 13 May 2021 16:52:49 -0400
Subject: [PATCH] Add runtime configuration validation (#8942)

* Sync config models

* fix
---
 kafka_consumer/assets/configuration/spec.yaml |  24 ++--
 .../kafka_consumer/config_models/__init__.py  |  18 +++
 .../kafka_consumer/config_models/defaults.py  | 116 ++++++++++++++++++
 .../kafka_consumer/config_models/instance.py  |  67 ++++++++++
 .../kafka_consumer/config_models/shared.py    |  44 +++++++
 .../config_models/validators.py               |   3 +
 .../kafka_consumer/data/conf.yaml.example     |  15 ++-
 7 files changed, 277 insertions(+), 10 deletions(-)
 create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/config_models/__init__.py
 create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
 create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
 create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py
 create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/config_models/validators.py

diff --git a/kafka_consumer/assets/configuration/spec.yaml b/kafka_consumer/assets/configuration/spec.yaml
index 20db5865bbbb4..996ebf5b561a8 100644
--- a/kafka_consumer/assets/configuration/spec.yaml
+++ b/kafka_consumer/assets/configuration/spec.yaml
@@ -41,13 +41,23 @@ files:
         only generate a single check process, but if one host goes down,
         KafkaClient tries contacting the next host.
         Details: https://github.com/DataDog/dd-agent/issues/2943
-      required: True
-      value:
-        type: array
-        items:
-          type: string
-        example:
-        - localhost:9092
+
+        You may specify a single server like:
+
+        kafka_connect_str: server:9092
+
+        or multiple servers like:
+
+        kafka_connect_str:
+          - server1:9092
+          - server2:9092
+      required: true
+      value:
+        anyOf:
+          - type: string
+          - type: array
+            items:
+              type: string
     - name: kafka_client_api_version
       description: |
         Specify the highest client protocol version supported by all brokers in the cluster.
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/__init__.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/__init__.py
new file mode 100644
index 0000000000000..ba42dbdc7ffb0
--- /dev/null
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/__init__.py
@@ -0,0 +1,18 @@
+# (C) Datadog, Inc. 2021-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+from .instance import InstanceConfig
+from .shared import SharedConfig
+
+
+class ConfigMixin:
+    _config_model_instance: InstanceConfig
+    _config_model_shared: SharedConfig
+
+    @property
+    def config(self) -> InstanceConfig:
+        return self._config_model_instance
+
+    @property
+    def shared_config(self) -> SharedConfig:
+        return self._config_model_shared
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
new file mode 100644
index 0000000000000..a3335b4446b87
--- /dev/null
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
@@ -0,0 +1,116 @@
+# (C) Datadog, Inc. 2021-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+from datadog_checks.base.utils.models.fields import get_default_field_value
+
+
+def shared_kafka_timeout(field, value):
+    return 5
+
+
+def shared_service(field, value):
+    return get_default_field_value(field, value)
+
+
+def shared_zk_timeout(field, value):
+    return 5
+
+
+def instance_broker_requests_batch_size(field, value):
+    return 30
+
+
+def instance_consumer_groups(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_empty_default_hostname(field, value):
+    return False
+
+
+def instance_kafka_client_api_version(field, value):
+    return '2.3.0'
+
+
+def instance_kafka_consumer_offsets(field, value):
+    return False
+
+
+def instance_min_collection_interval(field, value):
+    return 15
+
+
+def instance_monitor_all_broker_highwatermarks(field, value):
+    return False
+
+
+def instance_monitor_unlisted_consumer_groups(field, value):
+    return False
+
+
+def instance_sasl_kerberos_domain_name(field, value):
+    return 'localhost'
+
+
+def instance_sasl_kerberos_service_name(field, value):
+    return 'kafka'
+
+
+def instance_sasl_mechanism(field, value):
+    return 'PLAIN'
+
+
+def instance_sasl_plain_password(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_sasl_plain_username(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_security_protocol(field, value):
+    return 'PLAINTEXT'
+
+
+def instance_service(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_ssl_cafile(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_ssl_certfile(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_ssl_check_hostname(field, value):
+    return True
+
+
+def instance_ssl_context(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_ssl_crlfile(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_ssl_keyfile(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_ssl_password(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_tags(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_zk_connect_str(field, value):
+    return get_default_field_value(field, value)
+
+
+def instance_zk_prefix(field, value):
+    return get_default_field_value(field, value)
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
new file mode 100644
index 0000000000000..3001049eb6536
--- /dev/null
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
@@ -0,0 +1,67 @@
+# (C) Datadog, Inc. 2021-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+from __future__ import annotations
+
+from typing import Any, Mapping, Optional, Sequence, Union
+
+from pydantic import BaseModel, root_validator, validator
+
+from datadog_checks.base.utils.functions import identity
+from datadog_checks.base.utils.models import validation
+
+from . import defaults, validators
+
+
+class InstanceConfig(BaseModel):
+    class Config:
+        allow_mutation = False
+
+    broker_requests_batch_size: Optional[int]
+    consumer_groups: Optional[Mapping[str, Any]]
+    empty_default_hostname: Optional[bool]
+    kafka_client_api_version: Optional[str]
+    kafka_connect_str: Union[str, Sequence[str]]
+    kafka_consumer_offsets: Optional[bool]
+    min_collection_interval: Optional[float]
+    monitor_all_broker_highwatermarks: Optional[bool]
+    monitor_unlisted_consumer_groups: Optional[bool]
+    sasl_kerberos_domain_name: Optional[str]
+    sasl_kerberos_service_name: Optional[str]
+    sasl_mechanism: Optional[str]
+    sasl_plain_password: Optional[str]
+    sasl_plain_username: Optional[str]
+    security_protocol: Optional[str]
+    service: Optional[str]
+    ssl_cafile: Optional[str]
+    ssl_certfile: Optional[str]
+    ssl_check_hostname: Optional[bool]
+    ssl_context: Optional[str]
+    ssl_crlfile: Optional[str]
+    ssl_keyfile: Optional[str]
+    ssl_password: Optional[str]
+    tags: Optional[Sequence[str]]
+    zk_connect_str: Optional[Sequence[Mapping[str, Any]]]
+    zk_prefix: Optional[str]
+
+    @root_validator(pre=True)
+    def _initial_validation(cls, values):
+        return validation.core.initialize_config(getattr(validators, 'initialize_instance', identity)(values))
+
+    @validator('*', pre=True, always=True)
+    def _ensure_defaults(cls, v, field):
+        if v is not None or field.required:
+            return v
+
+        return getattr(defaults, f'instance_{field.name}')(field, v)
+
+    @validator('*')
+    def _run_validations(cls, v, field):
+        if not v:
+            return v
+
+        return getattr(validators, f'instance_{field.name}', identity)(v, field=field)
+
+    @root_validator(pre=False)
+    def _final_validation(cls, values):
+        return validation.core.finalize_config(getattr(validators, 'finalize_instance', identity)(values))
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py
new file mode 100644
index 0000000000000..b00990caa570a
--- /dev/null
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py
@@ -0,0 +1,44 @@
+# (C) Datadog, Inc. 2021-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+from __future__ import annotations
+
+from typing import Optional
+
+from pydantic import BaseModel, root_validator, validator
+
+from datadog_checks.base.utils.functions import identity
+from datadog_checks.base.utils.models import validation
+
+from . import defaults, validators
+
+
+class SharedConfig(BaseModel):
+    class Config:
+        allow_mutation = False
+
+    kafka_timeout: Optional[int]
+    service: Optional[str]
+    zk_timeout: Optional[int]
+
+    @root_validator(pre=True)
+    def _initial_validation(cls, values):
+        return validation.core.initialize_config(getattr(validators, 'initialize_shared', identity)(values))
+
+    @validator('*', pre=True, always=True)
+    def _ensure_defaults(cls, v, field):
+        if v is not None or field.required:
+            return v
+
+        return getattr(defaults, f'shared_{field.name}')(field, v)
+
+    @validator('*')
+    def _run_validations(cls, v, field):
+        if not v:
+            return v
+
+        return getattr(validators, f'shared_{field.name}', identity)(v, field=field)
+
+    @root_validator(pre=False)
+    def _final_validation(cls, values):
+        return validation.core.finalize_config(getattr(validators, 'finalize_shared', identity)(values))
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/validators.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/validators.py
new file mode 100644
index 0000000000000..9d0b0155542cb
--- /dev/null
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/validators.py
@@ -0,0 +1,3 @@
+# (C) Datadog, Inc. 2021-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
index a6a6b1fd33c1f..7f3b5feaf7b12 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
+++ b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
@@ -32,7 +32,7 @@ init_config:
 #
 instances:
 
-    ## @param kafka_connect_str - list of strings - required
+    ## @param kafka_connect_str - string or list of strings - required
     ## Kafka endpoints and port to connect to.
     ##
     ## In a production environment, it's often useful to specify multiple
@@ -40,9 +40,18 @@ instances:
     ## only generate a single check process, but if one host goes down,
     ## KafkaClient tries contacting the next host.
     ## Details: https://github.com/DataDog/dd-agent/issues/2943
+    ##
+    ## You may specify a single server like:
+    ##
+    ## kafka_connect_str: server:9092
+    ##
+    ## or multiple servers like:
+    ##
+    ## kafka_connect_str:
+    ##   - server1:9092
+    ##   - server2:9092
     #
-  - kafka_connect_str:
-      - localhost:9092
+  - kafka_connect_str:
 
     ## @param kafka_client_api_version - string - optional
     ## Specify the highest client protocol version supported by all brokers in the cluster.
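
For context, a minimal sketch (not part of this diff) of how a check could consume the generated models through ConfigMixin; it assumes the Agent base class populates _config_model_instance and _config_model_shared before check() runs, as the mixin's annotations imply, and the class name and field accesses below are illustrative only:

    from datadog_checks.base import AgentCheck

    from .config_models import ConfigMixin


    class KafkaConsumerCheck(AgentCheck, ConfigMixin):
        def check(self, _):
            # self.config is the validated, immutable InstanceConfig for this instance.
            servers = self.config.kafka_connect_str
            # Per the updated spec, kafka_connect_str may be a single string or a list of strings.
            if isinstance(servers, str):
                servers = [servers]

            # self.shared_config exposes validated init_config values, e.g. kafka_timeout (default 5).
            timeout = self.shared_config.kafka_timeout
            self.log.debug('Connecting to %s with a %ss timeout', servers, timeout)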