diff --git a/CHANGELOG.md b/CHANGELOG.md index 82daa12a0b..a8f0d92f0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `opentelemetry-instrumentation-aws-lambda` Adds support for configurable flush timeout via `OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT` property. ([#825](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/825)) +- `opentelemetry-instrumentation-pika` Adds support for versions between `0.12.0` to `1.0.0`. ([#837](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/837)) ### Fixed diff --git a/instrumentation/README.md b/instrumentation/README.md index fb8a9638b5..9349836203 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -20,7 +20,7 @@ | [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2 >= 2.7, < 4.0 | | [opentelemetry-instrumentation-logging](./opentelemetry-instrumentation-logging) | logging | | [opentelemetry-instrumentation-mysql](./opentelemetry-instrumentation-mysql) | mysql-connector-python ~= 8.0 | -| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 1.1.0 | +| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 0.12.0 | | [opentelemetry-instrumentation-psycopg2](./opentelemetry-instrumentation-psycopg2) | psycopg2 >= 2.7.3.1 | | [opentelemetry-instrumentation-pymemcache](./opentelemetry-instrumentation-pymemcache) | pymemcache ~= 1.3 | | [opentelemetry-instrumentation-pymongo](./opentelemetry-instrumentation-pymongo) | pymongo ~= 3.1 | diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py index 27ceebbac7..52c0863b58 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py @@ -13,4 +13,4 @@ # limitations under the License. from typing import Collection -_instruments: Collection[str] = ("pika >= 1.1.0",) +_instruments: Collection[str] = ("pika >= 0.12.0",) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 072ee56cfd..b09c3a0f9c 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -14,7 +14,9 @@ from logging import getLogger from typing import Any, Collection, Dict, Optional +import pika import wrapt +from packaging import version from pika.adapters import BlockingConnection from pika.adapters.blocking_connection import BlockingChannel @@ -32,7 +34,18 @@ _FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"] +def _consumer_callback_attribute_name() -> str: + pika_version = version.parse(pika.__version__) + return ( + "on_message_callback" + if pika_version >= version.parse("1.0.0") + else "consumer_cb" + ) + + class PikaInstrumentor(BaseInstrumentor): # type: ignore + CONSUMER_CALLBACK_ATTR = _consumer_callback_attribute_name() + # pylint: disable=attribute-defined-outside-init @staticmethod def _instrument_blocking_channel_consumers( @@ -41,8 +54,12 @@ def _instrument_blocking_channel_consumers( consume_hook: utils.HookT = utils.dummy_callback, ) -> Any: for consumer_tag, consumer_info in channel._consumer_infos.items(): + callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR + consumer_callback = getattr(consumer_info, callback_attr, None) + if consumer_callback is None: + continue decorated_callback = utils._decorate_callback( - consumer_info.on_message_callback, + consumer_callback, tracer, consumer_tag, consume_hook, @@ -51,9 +68,9 @@ def _instrument_blocking_channel_consumers( setattr( decorated_callback, "_original_callback", - consumer_info.on_message_callback, + consumer_callback, ) - consumer_info.on_message_callback = decorated_callback + setattr(consumer_info, callback_attr, decorated_callback) @staticmethod def _instrument_basic_publish( @@ -126,10 +143,12 @@ def uninstrument_channel(channel: BlockingChannel) -> None: return for consumers_tag, client_info in channel._consumer_infos.items(): - if hasattr(client_info.on_message_callback, "_original_callback"): + callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR + consumer_callback = getattr(client_info, callback_attr, None) + if hasattr(consumer_callback, "_original_callback"): channel._consumer_infos[ consumers_tag - ] = client_info.on_message_callback._original_callback + ] = consumer_callback._original_callback PikaInstrumentor._uninstrument_channel_functions(channel) def _decorate_channel_function( diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 410ccf069f..6e154c04f9 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -18,6 +18,9 @@ from wrapt import BoundFunctionWrapper from opentelemetry.instrumentation.pika import PikaInstrumentor +from opentelemetry.instrumentation.pika.pika_instrumentor import ( + _consumer_callback_attribute_name, +) from opentelemetry.instrumentation.pika.utils import dummy_callback from opentelemetry.trace import Tracer @@ -26,7 +29,8 @@ class TestPika(TestCase): def setUp(self) -> None: self.channel = mock.MagicMock(spec=Channel) consumer_info = mock.MagicMock() - consumer_info.on_message_callback = mock.MagicMock() + callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR + setattr(consumer_info, callback_attr, mock.MagicMock()) self.channel._consumer_infos = {"consumer-tag": consumer_info} self.mock_callback = mock.MagicMock() @@ -72,8 +76,11 @@ def test_instrument_consumers( self, decorate_callback: mock.MagicMock ) -> None: tracer = mock.MagicMock(spec=Tracer) + callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR expected_decoration_calls = [ - mock.call(value.on_message_callback, tracer, key, dummy_callback) + mock.call( + getattr(value, callback_attr), tracer, key, dummy_callback + ) for key, value in self.channel._consumer_infos.items() ] PikaInstrumentor._instrument_blocking_channel_consumers( @@ -109,3 +116,13 @@ def test_uninstrument_channel_functions(self) -> None: self.channel.basic_publish._original_function = original_function PikaInstrumentor._uninstrument_channel_functions(self.channel) self.assertEqual(self.channel.basic_publish, original_function) + + def test_consumer_callback_attribute_name(self) -> None: + with mock.patch("pika.__version__", "1.0.0"): + self.assertEqual( + _consumer_callback_attribute_name(), "on_message_callback" + ) + with mock.patch("pika.__version__", "0.12.0"): + self.assertEqual( + _consumer_callback_attribute_name(), "consumer_cb" + ) diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 8e551cd106..9c74505d8d 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -81,7 +81,7 @@ "instrumentation": "opentelemetry-instrumentation-mysql==0.27b0", }, "pika": { - "library": "pika >= 1.1.0", + "library": "pika >= 0.12.0", "instrumentation": "opentelemetry-instrumentation-pika==0.27b0", }, "psycopg2": { diff --git a/tox.ini b/tox.ini index 97673d1a47..0f2ca1e949 100644 --- a/tox.ini +++ b/tox.ini @@ -182,8 +182,8 @@ envlist = pypy3-test-propagator-ot-trace ; opentelemetry-instrumentation-pika - py3{6,7,8,9,10}-test-instrumentation-pika - pypy3-test-instrumentation-pika + py3{6,7,8,9,10}-test-instrumentation-pika{0,1} + pypy3-test-instrumentation-pika{0,1} lint docker-tests @@ -216,6 +216,8 @@ deps = sqlalchemy11: sqlalchemy>=1.1,<1.2 sqlalchemy14: aiosqlite sqlalchemy14: sqlalchemy~=1.4 + pika0: pika>=0.12.0,<1.0.0 + pika1: pika>=1.0.0 ; FIXME: add coverage testing ; FIXME: add mypy testing @@ -249,7 +251,7 @@ changedir = test-instrumentation-jinja2: instrumentation/opentelemetry-instrumentation-jinja2/tests test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests - test-instrumentation-pika: instrumentation/opentelemetry-instrumentation-pika/tests + test-instrumentation-pika{0,1}: instrumentation/opentelemetry-instrumentation-pika/tests test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests test-instrumentation-pymemcache: instrumentation/opentelemetry-instrumentation-pymemcache/tests test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests @@ -286,7 +288,7 @@ commands_pre = celery: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test] - pika: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] + pika{0,1}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] grpc: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-grpc[test]