Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record logger name as the instrumentation scope name #4208

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4206](https:/open-telemetry/opentelemetry-python/pull/4206))
- Update environment variable descriptions to match signal
([#4222](https:/open-telemetry/opentelemetry-python/pull/4222))
- Record logger name as the instrumentation scope name
([#4208](https:/open-telemetry/opentelemetry-python/pull/4208))

## Version 1.27.0/0.48b0 (2024-08-28)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging

import pytest

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
SimpleLogRecordProcessor,
)


def _set_up_logging_handler(level):
logger_provider = LoggerProvider()
exporter = InMemoryLogExporter()
processor = SimpleLogRecordProcessor(exporter=exporter)
logger_provider.add_log_record_processor(processor)
handler = LoggingHandler(level=level, logger_provider=logger_provider)
return handler


def _create_logger(handler, name):
logger = logging.getLogger(name)
logger.addHandler(handler)
return logger


@pytest.mark.parametrize("num_loggers", [1, 10, 100, 1000])
def test_simple_get_logger_different_names(benchmark, num_loggers):
handler = _set_up_logging_handler(level=logging.DEBUG)
loggers = [
_create_logger(handler, str(f"logger_{i}")) for i in range(num_loggers)
]

def benchmark_get_logger():
for index in range(1000):
loggers[index % num_loggers].warning("test message")

benchmark(benchmark_get_logger)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import traceback
import warnings
from os import environ
from threading import Lock
from time import time_ns
from typing import Any, Callable, Optional, Tuple, Union # noqa

Expand Down Expand Up @@ -471,9 +472,6 @@ def __init__(
) -> None:
super().__init__(level=level)
self._logger_provider = logger_provider or get_logger_provider()
self._logger = get_logger(
__name__, logger_provider=self._logger_provider
)

@staticmethod
def _get_attributes(record: logging.LogRecord) -> Attributes:
Expand Down Expand Up @@ -558,6 +556,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord:
"WARN" if record.levelname == "WARNING" else record.levelname
)

logger = get_logger(record.name, logger_provider=self._logger_provider)
return LogRecord(
timestamp=timestamp,
observed_timestamp=observered_timestamp,
Expand All @@ -567,7 +566,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord:
severity_text=level_name,
severity_number=severity_number,
body=body,
resource=self._logger.resource,
resource=logger.resource,
attributes=attributes,
)

Expand All @@ -577,14 +576,17 @@ def emit(self, record: logging.LogRecord) -> None:

The record is translated to OTel format, and then sent across the pipeline.
"""
if not isinstance(self._logger, NoOpLogger):
self._logger.emit(self._translate(record))
logger = get_logger(record.name, logger_provider=self._logger_provider)
aabmass marked this conversation as resolved.
Show resolved Hide resolved
if not isinstance(logger, NoOpLogger):
logger.emit(self._translate(record))

def flush(self) -> None:
"""
Flushes the logging output. Skip flushing if logger is NoOp.
Flushes the logging output. Skip flushing if logging_provider has no force_flush method.
"""
if not isinstance(self._logger, NoOpLogger):
if hasattr(self._logger_provider, "force_flush") and callable(
self._logger_provider.force_flush
):
self._logger_provider.force_flush()


Expand Down Expand Up @@ -642,26 +644,20 @@ def __init__(
self._at_exit_handler = None
if shutdown_on_exit:
self._at_exit_handler = atexit.register(self.shutdown)
self._logger_cache = {}
self._logger_cache_lock = Lock()

@property
def resource(self):
return self._resource

def get_logger(
def _get_logger_no_cache(
emdneto marked this conversation as resolved.
Show resolved Hide resolved
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
attributes: Optional[Attributes] = None,
) -> Logger:
if self._disabled:
_logger.warning("SDK is disabled.")
return NoOpLogger(
name,
version=version,
schema_url=schema_url,
attributes=attributes,
)
return Logger(
self._resource,
self._multi_log_record_processor,
Expand All @@ -673,6 +669,41 @@ def get_logger(
),
)

def _get_logger_cached(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> Logger:
with self._logger_cache_lock:
key = (name, version, schema_url)
if key in self._logger_cache:
return self._logger_cache[key]

self._logger_cache[key] = self._get_logger_no_cache(
name, version, schema_url
)
return self._logger_cache[key]

def get_logger(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
attributes: Optional[Attributes] = None,
) -> Logger:
if self._disabled:
_logger.warning("SDK is disabled.")
return NoOpLogger(
name,
version=version,
schema_url=schema_url,
attributes=attributes,
)
if attributes is None:
return self._get_logger_cached(name, version, schema_url)
return self._get_logger_no_cache(name, version, schema_url, attributes)

def add_log_record_processor(
self, log_record_processor: LogRecordProcessor
):
Expand Down
32 changes: 32 additions & 0 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def test_simple_log_record_processor_default_level(self):
self.assertEqual(
warning_log_record.severity_number, SeverityNumber.WARN
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "default_level"
)

def test_simple_log_record_processor_custom_level(self):
exporter = InMemoryLogExporter()
Expand Down Expand Up @@ -104,6 +107,12 @@ def test_simple_log_record_processor_custom_level(self):
self.assertEqual(
fatal_log_record.severity_number, SeverityNumber.FATAL
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "custom_level"
)
self.assertEqual(
finished_logs[1].instrumentation_scope.name, "custom_level"
)

def test_simple_log_record_processor_trace_correlation(self):
exporter = InMemoryLogExporter()
Expand All @@ -129,6 +138,9 @@ def test_simple_log_record_processor_trace_correlation(self):
self.assertEqual(
log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "trace_correlation"
)
exporter.clear()

tracer = trace.TracerProvider().get_tracer(__name__)
Expand All @@ -140,6 +152,10 @@ def test_simple_log_record_processor_trace_correlation(self):
self.assertEqual(log_record.body, "Critical message within span")
self.assertEqual(log_record.severity_text, "CRITICAL")
self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
self.assertEqual(
finished_logs[0].instrumentation_scope.name,
"trace_correlation",
)
span_context = span.get_span_context()
self.assertEqual(log_record.trace_id, span_context.trace_id)
self.assertEqual(log_record.span_id, span_context.span_id)
Expand All @@ -166,6 +182,9 @@ def test_simple_log_record_processor_shutdown(self):
self.assertEqual(
warning_log_record.severity_number, SeverityNumber.WARN
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "shutdown"
)
exporter.clear()
logger_provider.shutdown()
with self.assertLogs(level=logging.WARNING):
Expand Down Expand Up @@ -206,6 +225,10 @@ def test_simple_log_record_processor_different_msg_types(self):
for item in finished_logs
]
self.assertEqual(expected, emitted)
for item in finished_logs:
self.assertEqual(
item.instrumentation_scope.name, "different_msg_types"
)

def test_simple_log_record_processor_different_msg_types_with_formatter(
self,
Expand Down Expand Up @@ -428,6 +451,8 @@ def test_shutdown(self):
for item in finished_logs
]
self.assertEqual(expected, emitted)
for item in finished_logs:
self.assertEqual(item.instrumentation_scope.name, "shutdown")

def test_force_flush(self):
exporter = InMemoryLogExporter()
Expand All @@ -447,6 +472,9 @@ def test_force_flush(self):
log_record = finished_logs[0].log_record
self.assertEqual(log_record.body, "Earth is burning")
self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "force_flush"
)

def test_log_record_processor_too_many_logs(self):
exporter = InMemoryLogExporter()
Expand All @@ -465,6 +493,8 @@ def test_log_record_processor_too_many_logs(self):
self.assertTrue(log_record_processor.force_flush())
finised_logs = exporter.get_finished_logs()
self.assertEqual(len(finised_logs), 1000)
for item in finised_logs:
self.assertEqual(item.instrumentation_scope.name, "many_logs")

def test_with_multiple_threads(self):
exporter = InMemoryLogExporter()
Expand Down Expand Up @@ -492,6 +522,8 @@ def bulk_log_and_flush(num_logs):

finished_logs = exporter.get_finished_logs()
self.assertEqual(len(finished_logs), 2415)
for item in finished_logs:
self.assertEqual(item.instrumentation_scope.name, "threads")

@unittest.skipUnless(
hasattr(os, "fork"),
Expand Down
111 changes: 111 additions & 0 deletions opentelemetry-sdk/tests/logs/test_logger_provider_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import logging
import unittest

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
SimpleLogRecordProcessor,
)


def set_up_logging_handler(level):
logger_provider = LoggerProvider()
exporter = InMemoryLogExporter()
processor = SimpleLogRecordProcessor(exporter=exporter)
logger_provider.add_log_record_processor(processor)
handler = LoggingHandler(level=level, logger_provider=logger_provider)
return handler, logger_provider


def create_logger(handler, name):
logger = logging.getLogger(name)
logger.addHandler(handler)
return logger


class TestLoggerProviderCache(unittest.TestCase):

def test_get_logger_single_handler(self):
handler, logger_provider = set_up_logging_handler(level=logging.DEBUG)
# pylint: disable=protected-access
logger_cache = logger_provider._logger_cache
logger = create_logger(handler, "test_logger")

# Ensure logger is lazily cached
self.assertEqual(0, len(logger_cache))

with self.assertLogs(level=logging.WARNING):
logger.warning("test message")

self.assertEqual(1, len(logger_cache))

# Ensure only one logger is cached
with self.assertLogs(level=logging.WARNING):
rounds = 100
for _ in range(rounds):
logger.warning("test message")

self.assertEqual(1, len(logger_cache))

def test_get_logger_multiple_loggers(self):
handler, logger_provider = set_up_logging_handler(level=logging.DEBUG)
# pylint: disable=protected-access
logger_cache = logger_provider._logger_cache

num_loggers = 10
loggers = [create_logger(handler, str(i)) for i in range(num_loggers)]

# Ensure loggers are lazily cached
self.assertEqual(0, len(logger_cache))

with self.assertLogs(level=logging.WARNING):
for logger in loggers:
logger.warning("test message")

self.assertEqual(num_loggers, len(logger_cache))

with self.assertLogs(level=logging.WARNING):
rounds = 100
for _ in range(rounds):
for logger in loggers:
logger.warning("test message")

self.assertEqual(num_loggers, len(logger_cache))

def test_provider_get_logger_no_cache(self):
_, logger_provider = set_up_logging_handler(level=logging.DEBUG)
# pylint: disable=protected-access
logger_cache = logger_provider._logger_cache

logger_provider.get_logger(
name="test_logger",
version="version",
schema_url="schema_url",
attributes={"key": "value"},
)

# Ensure logger is not cached if attributes is set
self.assertEqual(0, len(logger_cache))

def test_provider_get_logger_cached(self):
_, logger_provider = set_up_logging_handler(level=logging.DEBUG)
# pylint: disable=protected-access
logger_cache = logger_provider._logger_cache

logger_provider.get_logger(
name="test_logger",
version="version",
schema_url="schema_url",
)

# Ensure only one logger is cached
self.assertEqual(1, len(logger_cache))

logger_provider.get_logger(
name="test_logger",
version="version",
schema_url="schema_url",
)

# Ensure only one logger is cached
self.assertEqual(1, len(logger_cache))