From 4e499159b69e5e13d07f8e3736a34db3c69ea24b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 4 May 2020 12:16:29 -0500 Subject: [PATCH 01/22] ext/docker-tests: Fix SQL docker tests The executemany test fails in some conditions because the argument used is a sequence and not a sequence of sequences as it should be: https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlcursor-executemany.html https://pymysql.readthedocs.io/en/latest/modules/cursors.html#pymysql.cursors.Cursor.executemany https://www.psycopg.org/docs/cursor.html#cursor.executemany --- .../tests/mysql/test_mysql_functional.py | 2 +- .../tests/postgres/test_psycopg_functional.py | 2 +- .../tests/pymysql/test_pymysql_functional.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ext/opentelemetry-ext-docker-tests/tests/mysql/test_mysql_functional.py b/ext/opentelemetry-ext-docker-tests/tests/mysql/test_mysql_functional.py index d0261d2f63..46f63d3c66 100644 --- a/ext/opentelemetry-ext-docker-tests/tests/mysql/test_mysql_functional.py +++ b/ext/opentelemetry-ext-docker-tests/tests/mysql/test_mysql_functional.py @@ -82,7 +82,7 @@ def test_executemany(self): """Should create a child span for executemany """ with self._tracer.start_as_current_span("rootSpan"): - data = ["1", "2", "3"] + data = (("1",), ("2",), ("3",)) stmt = "INSERT INTO test (id) VALUES (%s)" self._cursor.executemany(stmt, data) self.validate_spans() diff --git a/ext/opentelemetry-ext-docker-tests/tests/postgres/test_psycopg_functional.py b/ext/opentelemetry-ext-docker-tests/tests/postgres/test_psycopg_functional.py index 0a43926145..e0537ad293 100644 --- a/ext/opentelemetry-ext-docker-tests/tests/postgres/test_psycopg_functional.py +++ b/ext/opentelemetry-ext-docker-tests/tests/postgres/test_psycopg_functional.py @@ -89,7 +89,7 @@ def test_executemany(self): """Should create a child span for executemany """ with self._tracer.start_as_current_span("rootSpan"): - data = 
("1", "2", "3") + data = (("1",), ("2",), ("3",)) stmt = "INSERT INTO test (id) VALUES (%s)" self._cursor.executemany(stmt, data) self.validate_spans() diff --git a/ext/opentelemetry-ext-docker-tests/tests/pymysql/test_pymysql_functional.py b/ext/opentelemetry-ext-docker-tests/tests/pymysql/test_pymysql_functional.py index 06507c4f35..15ec6dccca 100644 --- a/ext/opentelemetry-ext-docker-tests/tests/pymysql/test_pymysql_functional.py +++ b/ext/opentelemetry-ext-docker-tests/tests/pymysql/test_pymysql_functional.py @@ -81,7 +81,7 @@ def test_executemany(self): """Should create a child span for executemany """ with self._tracer.start_as_current_span("rootSpan"): - data = ["1", "2", "3"] + data = (("1",), ("2",), ("3",)) stmt = "INSERT INTO test (id) VALUES (%s)" self._cursor.executemany(stmt, data) self.validate_spans() From 7094ec4d44b6a19007ca9d0204efabe779b47bed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 29 Apr 2020 07:52:12 -0500 Subject: [PATCH 02/22] Add Celery Instrumentation Port Celery instrumentation from DataDog donation: https://github.com/open-telemetry/opentelemetry-python-contrib/tree/5aee3ce32e418e1ce2ae46e303d1ca0630f3f836/reference/ddtrace/contrib/celery --- dev-requirements.txt | 1 + docs-requirements.txt | 1 + docs/ext/celery/celery.rst | 7 + ext/opentelemetry-ext-celery/CHANGELOG.md | 5 + ext/opentelemetry-ext-celery/LICENSE | 201 ++++++++ ext/opentelemetry-ext-celery/MANIFEST.in | 9 + ext/opentelemetry-ext-celery/README.rst | 24 + ext/opentelemetry-ext-celery/setup.cfg | 55 +++ ext/opentelemetry-ext-celery/setup.py | 26 + .../src/opentelemetry/ext/celery/__init__.py | 255 ++++++++++ .../src/opentelemetry/ext/celery/utils.py | 135 ++++++ .../src/opentelemetry/ext/celery/version.py | 15 + .../tests/__init__.py | 0 .../tests/test_utils.py | 200 ++++++++ .../tests/celery/test_celery_functional.py | 456 ++++++++++++++++++ tox.ini | 11 + 16 files changed, 1401 insertions(+) create mode 100644 
docs/ext/celery/celery.rst create mode 100644 ext/opentelemetry-ext-celery/CHANGELOG.md create mode 100644 ext/opentelemetry-ext-celery/LICENSE create mode 100644 ext/opentelemetry-ext-celery/MANIFEST.in create mode 100644 ext/opentelemetry-ext-celery/README.rst create mode 100644 ext/opentelemetry-ext-celery/setup.cfg create mode 100644 ext/opentelemetry-ext-celery/setup.py create mode 100644 ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py create mode 100644 ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py create mode 100644 ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py create mode 100644 ext/opentelemetry-ext-celery/tests/__init__.py create mode 100644 ext/opentelemetry-ext-celery/tests/test_utils.py create mode 100644 ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py diff --git a/dev-requirements.txt b/dev-requirements.txt index 24da70defa..ec0ba9fcea 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -10,3 +10,4 @@ pytest!=5.2.3 pytest-cov>=2.8 readme-renderer~=24.0 httpretty~=1.0 +celery>=4.0 diff --git a/docs-requirements.txt b/docs-requirements.txt index f2d6ec2a57..80aff7be4d 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -14,3 +14,4 @@ pymongo~=3.1 redis>=2.6 thrift>=0.10.0 wrapt >=1.0.0,<2.0.0 +celery>=4.0 diff --git a/docs/ext/celery/celery.rst b/docs/ext/celery/celery.rst new file mode 100644 index 0000000000..b94093ae08 --- /dev/null +++ b/docs/ext/celery/celery.rst @@ -0,0 +1,7 @@ +OpenTelemetry Celery Integration +================================ + +.. 
automodule:: opentelemetry.ext.celery + :members: + :undoc-members: + :show-inheritance: diff --git a/ext/opentelemetry-ext-celery/CHANGELOG.md b/ext/opentelemetry-ext-celery/CHANGELOG.md new file mode 100644 index 0000000000..66c8fa4fd1 --- /dev/null +++ b/ext/opentelemetry-ext-celery/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## Unreleased + +- Implement Celery integration ([#648](https://github.com/open-telemetry/opentelemetry-python/pull/648)) diff --git a/ext/opentelemetry-ext-celery/LICENSE b/ext/opentelemetry-ext-celery/LICENSE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/ext/opentelemetry-ext-celery/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. 
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/ext/opentelemetry-ext-celery/MANIFEST.in b/ext/opentelemetry-ext-celery/MANIFEST.in new file mode 100644 index 0000000000..aed3e33273 --- /dev/null +++ b/ext/opentelemetry-ext-celery/MANIFEST.in @@ -0,0 +1,9 @@ +graft src +graft tests +global-exclude *.pyc +global-exclude *.pyo +global-exclude __pycache__/* +include CHANGELOG.md +include MANIFEST.in +include README.rst +include LICENSE diff --git a/ext/opentelemetry-ext-celery/README.rst b/ext/opentelemetry-ext-celery/README.rst new file mode 100644 index 0000000000..3bc37be0d3 --- /dev/null +++ b/ext/opentelemetry-ext-celery/README.rst @@ -0,0 +1,24 @@ +OpenTelemetry Celery Integration +================================ + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-ext-celery.svg + :target: https://pypi.org/project/opentelemetry-ext-celery/ + +Integration with Celery. 
+ + +Installation +------------ + +:: + + pip install opentelemetry-ext-celery + + +References +---------- +* `OpenTelemetry Celery Integration <https://opentelemetry-python.readthedocs.io/en/latest/ext/celery/celery.html>`_ +* `OpenTelemetry Project <https://opentelemetry.io/>`_ + diff --git a/ext/opentelemetry-ext-celery/setup.cfg b/ext/opentelemetry-ext-celery/setup.cfg new file mode 100644 index 0000000000..c36e20bc29 --- /dev/null +++ b/ext/opentelemetry-ext-celery/setup.cfg @@ -0,0 +1,55 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-ext-celery +description = OpenTelemetry Celery integration +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python/tree/master/ext/opentelemetry-ext-celery +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.4 + Programming Language :: Python :: 3.5 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.4 +package_dir= + =src +packages=find_namespace: +install_requires = + opentelemetry-api == 0.7.dev0 + celery ~= 4.0 + +[options.extras_require] +test = + 
opentelemetry-test == 0.7.dev0 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + celery = opentelemetry.ext.celery:CeleryInstrumentor diff --git a/ext/opentelemetry-ext-celery/setup.py b/ext/opentelemetry-ext-celery/setup.py new file mode 100644 index 0000000000..40d1d7aaba --- /dev/null +++ b/ext/opentelemetry-ext-celery/setup.py @@ -0,0 +1,26 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "ext", "celery", "version.py" +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup(version=PACKAGE_INFO["__version__"]) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py new file mode 100644 index 0000000000..5ff5dc9ce9 --- /dev/null +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -0,0 +1,255 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Instrument `celery`_ to report Celery APP operations. + +There are two options for instrumenting code. The first option is to use the +``opentelemetry-auto-instrumentation`` executable which will automatically +instrument your Celery APP. The second is to enable it programmatically by +calling ``CeleryInstrumentor().instrument()`` in an application like the one below: + +.. _celery: https://pypi.org/project/celery/ + +Usage +----- + +Be sure RabbitMQ is running: + +.. code:: + + docker run -p 5672:5672 rabbitmq + +.. code:: python + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + + trace.set_tracer_provider(TracerProvider()) + # TODO: configure span exporters + + from celery import Celery + + app = Celery("tasks", broker="amqp://localhost") + + @app.task + def add(x, y): + return x + y + + add.delay(42, 50) + +API +--- +""" + +import logging + +import celery +from celery import signals + +from opentelemetry import trace +from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.ext.celery.utils import ( + attach_span, + detach_span, + retrieve_span, + retrieve_task_id, + set_attributes_from_context, +) +from opentelemetry.ext.celery.version import __version__ +from opentelemetry.trace.status import Status, StatusCanonicalCode + +logger = logging.getLogger(__name__) + +# Span names +_PRODUCER_ROOT_SPAN = "celery.apply" +_WORKER_ROOT_SPAN = "celery.run" + +# Task operations +_TASK_TAG_KEY = "celery.action" +_TASK_APPLY = "apply" +_TASK_APPLY_ASYNC = "apply_async" +_TASK_RUN = "run" +_TASK_RETRY_REASON_KEY = 
"celery.retry.reason" +_TASK_NAME_KEY = "celery.task_name" + + +class CeleryInstrumentor(BaseInstrumentor): + # pylint: disable=unused-argument + def _instrument(self, **kwargs): + tracer_provider = kwargs.get("tracer_provider") + + # pylint: disable=attribute-defined-outside-init + self._tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + signals.task_prerun.connect(self._trace_prerun, weak=False) + signals.task_postrun.connect(self._trace_postrun, weak=False) + signals.before_task_publish.connect( + self._trace_before_publish, weak=False + ) + signals.after_task_publish.connect( + self._trace_after_publish, weak=False + ) + signals.task_failure.connect(self._trace_failure, weak=False) + signals.task_retry.connect(self._trace_retry, weak=False) + + def _uninstrument(self, **kwargs): + signals.task_prerun.disconnect(self._trace_prerun) + signals.task_postrun.disconnect(self._trace_postrun) + signals.before_task_publish.disconnect(self._trace_before_publish) + signals.after_task_publish.disconnect(self._trace_after_publish) + signals.task_failure.disconnect(self._trace_failure) + signals.task_retry.disconnect(self._trace_retry) + + def _trace_prerun(self, *args, **kwargs): + task = kwargs.get("sender") + task_id = kwargs.get("task_id") + logger.debug("prerun signal start task_id=%s", task_id) + if task is None or task_id is None: + logger.debug( + "unable to extract the Task and the task_id. This version of Celery may not be supported." + ) + return + + span = self._tracer.start_span(_WORKER_ROOT_SPAN) + + activation = self._tracer.use_span(span, end_on_exit=True) + activation.__enter__() + attach_span(task, task_id, (span, activation)) + + @staticmethod + def _trace_postrun(*args, **kwargs): + task = kwargs.get("sender") + task_id = kwargs.get("task_id") + logger.debug("postrun signal task_id=%s", task_id) + if task is None or task_id is None: + logger.debug( + "unable to extract the Task and the task_id. 
This version of Celery may not be supported." + ) + return + + # retrieve and finish the Span + span, activation = retrieve_span(task, task_id) + if span is None: + logger.warning("no existing span found for task_id=%s", task_id) + return + + # request context tags + span.set_attribute(_TASK_TAG_KEY, _TASK_RUN) + set_attributes_from_context(span, kwargs) + set_attributes_from_context(span, task.request) + span.set_attribute(_TASK_NAME_KEY, task.name) + + activation.__exit__(None, None, None) + detach_span(task, task_id) + + def _trace_before_publish(self, *args, **kwargs): + # The `Task` instance **does not** include any information about the current + # execution, so it **must not** be used to retrieve `request` data. + task_name = kwargs.get("sender") + # pylint: disable=no-member + task = celery.registry.tasks.get(task_name) + task_id = retrieve_task_id(kwargs) + + if task is None or task_id is None: + logger.debug( + "unable to extract the Task and the task_id. This version of Celery may not be supported." + ) + return + + span = self._tracer.start_span(_PRODUCER_ROOT_SPAN) + + # apply some attributes here because most of the data is not available + span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) + span.set_attribute("celery.id", task_id) + span.set_attribute(_TASK_NAME_KEY, task.name) + set_attributes_from_context(span, kwargs) + + activation = self._tracer.use_span(span, end_on_exit=True) + activation.__enter__() + attach_span(task, task_id, (span, activation), is_publish=True) + + @staticmethod + def _trace_after_publish(*args, **kwargs): + task_name = kwargs.get("sender") + # pylint: disable=no-member + task = celery.registry.tasks.get(task_name) + task_id = retrieve_task_id(kwargs) + if task is None or task_id is None: + logger.debug( + "unable to extract the Task and the task_id. This version of Celery may not be supported." 
+ ) + return + + # retrieve and finish the Span + _, activation = retrieve_span(task, task_id, is_publish=True) + if activation is None: + logger.warning("no existing span found for task_id=%s", task_id) + return + + activation.__exit__(None, None, None) + detach_span(task, task_id, is_publish=True) + + @staticmethod + def _trace_failure(*args, **kwargs): + task = kwargs.get("sender") + task_id = kwargs.get("task_id") + if task is None or task_id is None: + logger.debug( + "unable to extract the Task and the task_id. This version of Celery may not be supported." + ) + return + + # retrieve and pass exception info to activation + span, _ = retrieve_span(task, task_id) + if span is None: + return + + ex = kwargs.get("einfo") + if ex is None: + return + if hasattr(task, "throws") and isinstance(ex.exception, task.throws): + return + + span.set_status( + Status( + canonical_code=StatusCanonicalCode.UNKNOWN, + description=str(ex), + ) + ) + + @staticmethod + def _trace_retry(*args, **kwargs): + task = kwargs.get("sender") + context = kwargs.get("request") + if task is None or context is None: + logger.debug( + "unable to extract the Task or the Context. This version of Celery may not be supported." + ) + return + + reason = kwargs.get("reason") + if not reason: + logger.debug( + "unable to extract the retry reason. This version of Celery may not be supported." 
+ ) + return + + span, _ = retrieve_span(task, context.id) + if span is None: + return + + # Add retry reason metadata to span + # DEV: Use `str(reason)` instead of `reason.message` in case we get + # something that isn't an `Exception` + span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason)) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py new file mode 100644 index 0000000000..cd1ce034fb --- /dev/null +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py @@ -0,0 +1,135 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Celery Context key +CTX_KEY = "__otel_task_span" + + +def set_attributes_from_context(span, context): + """Helper to extract meta values from a Celery Context""" + attribute_keys = ( + "compression", + "correlation_id", + "countdown", + "delivery_info", + "eta", + "exchange", + "expires", + "hostname", + "id", + "priority", + "queue", + "reply_to", + "retries", + "routing_key", + "serializer", + "timelimit", + "origin", + "state", + ) + + for key in attribute_keys: + value = context.get(key) + + # Skip this key if it is not set + if value is None or value == "": + continue + + # Skip `timelimit` if it is not set (its default/unset value is a + # tuple or a list of `None` values) + if key == "timelimit" and value in [(None, None), [None, None]]: + continue + + # Skip `retries` if its value is `0` + if key == "retries" and value == 0: + continue + + # Celery 4.0 uses `origin` instead of `hostname`; this change preserves + # the same name for the tag despite Celery version + if key == "origin": + key = "hostname" + + # TODO: hack to avoid bad attribute type + if key == "delivery_info": + value = str(value) + + # prefix the tag as 'celery' + tag_name = "celery.{}".format(key) + + span.set_attribute(tag_name, value) + + +def attach_span(task, task_id, span, is_publish=False): + """Helper to propagate a `Span` for the given `Task` instance. This + function uses a `dict` that stores the Span using the + `(task_id, is_publish)` as a key. This is useful when information must be + propagated from one Celery signal to another. + + DEV: We use (task_id, is_publish) for the key to ensure that publishing a + task from within another task does not cause any conflicts. + + This mostly happens when either a task fails and a retry policy is in place, + or when a task is manually retried (e.g. `task.retry()`), we end up trying + to publish a task with the same id as the task currently running. 
+ + Previously publishing the new task would overwrite the existing `celery.run` span + in the `dict` causing that span to be forgotten and never finished + NOTE: We cannot test for this well yet, because we do not run a celery worker, + and cannot run `task.apply_async()` + """ + span_dict = getattr(task, CTX_KEY, None) + if span_dict is None: + span_dict = dict() + setattr(task, CTX_KEY, span_dict) + + span_dict[(task_id, is_publish)] = span + + +def detach_span(task, task_id, is_publish=False): + """Helper to remove a `Span` in a Celery task when it's propagated. + This function handles tasks where the `Span` is not attached. + """ + span_dict = getattr(task, CTX_KEY, None) + if span_dict is None: + return + + # DEV: See note in `attach_span` for key info + span_dict.pop((task_id, is_publish), (None, None)) + + +def retrieve_span(task, task_id, is_publish=False): + """Helper to retrieve an active `Span` stored in a `Task` + instance + """ + span_dict = getattr(task, CTX_KEY, None) + if span_dict is None: + return (None, None) + + # DEV: See note in `attach_span` for key info + return span_dict.get((task_id, is_publish), (None, None)) + + +def retrieve_task_id(context): + """Helper to retrieve the `Task` identifier from the message `body`. + This helper supports Protocol Version 1 and 2. 
The Protocol is well + detailed in the official documentation: + http://docs.celeryproject.org/en/latest/internals/protocol.html + """ + headers = context.get("headers") + body = context.get("body") + if headers: + # Protocol Version 2 (default from Celery 4.0) + return headers.get("id") + # Protocol Version 1 + return body.get("id") diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py new file mode 100644 index 0000000000..86c61362ab --- /dev/null +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.7.dev0" diff --git a/ext/opentelemetry-ext-celery/tests/__init__.py b/ext/opentelemetry-ext-celery/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ext/opentelemetry-ext-celery/tests/test_utils.py b/ext/opentelemetry-ext-celery/tests/test_utils.py new file mode 100644 index 0000000000..03c5404a8a --- /dev/null +++ b/ext/opentelemetry-ext-celery/tests/test_utils.py @@ -0,0 +1,200 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest import mock + +from celery import Celery + +from opentelemetry import trace as trace_api +from opentelemetry.ext.celery import utils +from opentelemetry.sdk import trace + + +class TestUtils(unittest.TestCase): + def setUp(self): + self.app = Celery("celery.test_app") + + def test_set_attributes_from_context(self): + # it should extract only relevant keys + context = { + "correlation_id": "44b7f305", + "delivery_info": "{'eager': 'True'}", + "eta": "soon", + "expires": "later", + "hostname": "localhost", + "id": "44b7f305", + "reply_to": "44b7f305", + "retries": 4, + "timelimit": ("now", "later"), + "custom_meta": "custom_value", + } + + span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) + utils.set_attributes_from_context(span, context) + + self.assertEqual(span.attributes["celery.correlation_id"], "44b7f305") + self.assertEqual( + span.attributes["celery.delivery_info"], "{'eager': 'True'}" + ) + self.assertEqual(span.attributes.get("celery.eta"), "soon") + self.assertEqual(span.attributes.get("celery.expires"), "later") + self.assertEqual(span.attributes.get("celery.hostname"), "localhost") + self.assertEqual(span.attributes.get("celery.id"), "44b7f305") + self.assertEqual(span.attributes.get("celery.reply_to"), "44b7f305") + self.assertEqual(span.attributes.get("celery.retries"), 4) + self.assertEqual( + span.attributes.get("celery.timelimit"), ("now", "later") + ) + self.assertNotIn("custom_meta", span.attributes) + + def test_set_attributes_from_context_empty_keys(self): + # it should not extract empty 
keys + context = { + "correlation_id": None, + "exchange": "", + "timelimit": (None, None), + "retries": 0, + } + + span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) + utils.set_attributes_from_context(span, context) + + self.assertEqual(len(span.attributes), 0) + # edge case: `timelimit` can also be a list of None values + context = { + "timelimit": [None, None], + } + + utils.set_attributes_from_context(span, context) + + self.assertEqual(len(span.attributes), 0) + + def test_span_propagation(self): + # ensure spans getter and setter works properly + @self.app.task + def fn_task(): + return 42 + + # propagate and retrieve a Span + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) + utils.attach_span(fn_task, task_id, span) + span_after = utils.retrieve_span(fn_task, task_id) + self.assertIs(span, span_after) + + def test_span_delete(self): + # ensure the helper removes properly a propagated Span + @self.app.task + def fn_task(): + return 42 + + # propagate a Span + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) + utils.attach_span(fn_task, task_id, span) + # delete the Span + utils.detach_span(fn_task, task_id) + self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None)) + + def test_span_delete_empty(self): + # ensure detach_span doesn't raise an exception if span is not present + @self.app.task + def fn_task(): + return 42 + + # delete the Span + exception = None + task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" + try: + utils.detach_span(fn_task, task_id) + self.assertEqual( + utils.retrieve_span(fn_task, task_id), (None, None) + ) + except Exception as ex: # pylint: disable=broad-except + exception = ex + self.assertIsNone(exception) + + def test_task_id_from_protocol_v1(self): + # ensures a `task_id` is properly returned when Protocol v1 is used. 
+ # `context` is an example of an emitted Signal with Protocol v1 + context = { + "body": { + "expires": None, + "utc": True, + "args": ["user"], + "chord": None, + "callbacks": None, + "errbacks": None, + "taskset": None, + "id": "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7", + "retries": 0, + "task": "tests.contrib.celery.test_integration.fn_task_parameters", + "timelimit": (None, None), + "eta": None, + "kwargs": {"force_logout": True}, + }, + "sender": "tests.contrib.celery.test_integration.fn_task_parameters", + "exchange": "celery", + "routing_key": "celery", + "retry_policy": None, + "headers": {}, + "properties": {}, + } + + task_id = utils.retrieve_task_id(context) + self.assertEqual(task_id, "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7") + + def test_task_id_from_protocol_v2(self): + # ensures a `task_id` is properly returned when Protocol v2 is used. + # `context` is an example of an emitted Signal with Protocol v2 + context = { + "body": ( + ["user"], + {"force_logout": True}, + { + u"chord": None, + u"callbacks": None, + u"errbacks": None, + u"chain": None, + }, + ), + "sender": u"tests.contrib.celery.test_integration.fn_task_parameters", + "exchange": u"", + "routing_key": u"celery", + "retry_policy": None, + "headers": { + u"origin": u"gen83744@hostname", + u"root_id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + u"expires": None, + u"shadow": None, + u"id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + u"kwargsrepr": u"{'force_logout': True}", + u"lang": u"py", + u"retries": 0, + u"task": u"tests.contrib.celery.test_integration.fn_task_parameters", + u"group": None, + u"timelimit": [None, None], + u"parent_id": None, + u"argsrepr": u"['user']", + u"eta": None, + }, + "properties": { + u"reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca", + u"correlation_id": "7e917b83-4018-431d-9832-73a28e1fb6c0", + }, + } + + task_id = utils.retrieve_task_id(context) + self.assertEqual(task_id, "7e917b83-4018-431d-9832-73a28e1fb6c0") diff --git 
a/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py new file mode 100644 index 0000000000..1718b801a8 --- /dev/null +++ b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py @@ -0,0 +1,456 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import celery +from celery import Celery +from celery.exceptions import Retry + +from opentelemetry.ext.celery import CeleryInstrumentor +from opentelemetry.sdk import resources +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace.status import StatusCanonicalCode + +REDIS_HOST = os.getenv("REDIS_HOST", "localhost") +REDIS_PORT = int(os.getenv("REDIS_PORT ", "6379")) +REDIS_URL = "redis://{host}:{port}".format(host=REDIS_HOST, port=REDIS_PORT) +BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0) +BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1) + + +class MyException(Exception): + pass + + +class TestCeleryIntegration(TestBase): + def setUp(self): + super().setUp() + CeleryInstrumentor().instrument() + self.app = Celery( + "celery.test_app", broker=BROKER_URL, backend=BACKEND_URL + ) + + def tearDown(self): + with self.disable_logging(): + CeleryInstrumentor().uninstrument() + super().tearDown() + + def test_concurrent_delays(self): + # it should create one trace for each delayed execution + @self.app.task + def 
fn_task(): + return 42 + + for x in range(100): + fn_task.delay() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 100) + + def test_fn_task_run(self): + # the body of the function is not instrumented so calling it + # directly doesn't create a trace + @self.app.task + def fn_task(): + return 42 + + t = fn_task.run() + self.assertEqual(t, 42) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_fn_task_call(self): + # the body of the function is not instrumented so calling it + # directly doesn't create a trace + @self.app.task + def fn_task(): + return 42 + + t = fn_task() + self.assertEqual(t, 42) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + def test_fn_task_apply(self): + # it should execute a traced task with a returning value + @self.app.task + def fn_task(): + return 42 + + t = fn_task.apply() + self.assertTrue(t.successful()) + self.assertEqual(t.result, 42) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + self.assertTrue(span.status.is_ok) + self.assertEqual(span.name, "celery.run") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.fn_task", + ) + self.assertEqual(span.attributes.get("celery.id"), t.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") + + def test_fn_task_apply_bind(self): + # it should execute a traced task with a returning value + @self.app.task(bind=True) + def fn_task(self): + return self + + t = fn_task.apply() + self.assertTrue(t.successful()) + self.assertIn("fn_task", t.result.name) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + self.assertTrue(span.status.is_ok) + self.assertEqual(span.name, "celery.run") + self.assertEqual( + 
span.attributes.get("celery.task_name"), + "test_celery_functional.fn_task", + ) + self.assertEqual(span.attributes.get("celery.id"), t.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") + + def test_fn_task_apply_async(self): + # it should execute a traced async task that has parameters + @self.app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + t = fn_task_parameters.apply_async( + args=["user"], kwargs={"force_logout": True} + ) + self.assertEqual("PENDING", t.status) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + self.assertTrue(span.status.is_ok) + self.assertEqual(span.name, "celery.apply") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.fn_task_parameters", + ) + self.assertEqual(span.attributes.get("celery.id"), t.task_id) + self.assertEqual(span.attributes.get("celery.action"), "apply_async") + self.assertEqual(span.attributes.get("celery.routing_key"), "celery") + + def test_fn_task_delay(self): + # using delay shorthand must preserve arguments + @self.app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + t = fn_task_parameters.delay("user", force_logout=True) + self.assertEqual(t.status, "PENDING") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + self.assertTrue(span.status.is_ok) + self.assertEqual(span.name, "celery.apply") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.fn_task_parameters", + ) + self.assertEqual(span.attributes.get("celery.id"), t.task_id) + self.assertEqual(span.attributes.get("celery.action"), "apply_async") + self.assertEqual(span.attributes.get("celery.routing_key"), "celery") + + def test_fn_exception(self): + # it should catch exceptions in task 
functions + @self.app.task + def fn_exception(): + raise Exception("Task class is failing") + + t = fn_exception.apply() + self.assertTrue(t.failed()) + self.assertIn("Task class is failing", t.traceback) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + self.assertEqual(span.name, "celery.run") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.fn_exception", + ) + self.assertEqual(span.attributes.get("celery.id"), t.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "FAILURE") + + self.assertIs(span.status.canonical_code, StatusCanonicalCode.UNKNOWN) + self.assertIn("Task class is failing", span.status.description) + + def test_fn_exception_expected(self): + # it should catch exceptions in task functions + @self.app.task(throws=(MyException,)) + def fn_exception(): + raise MyException("Task class is failing") + + t = fn_exception.apply() + self.assertTrue(t.failed()) + self.assertIn("Task class is failing", t.traceback) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + self.assertEqual(span.name, "celery.run") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.fn_exception", + ) + self.assertEqual(span.attributes.get("celery.id"), t.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "FAILURE") + + self.assertTrue(span.status.is_ok) + + def test_fn_retry_exception(self): + # it should not catch retry exceptions in task functions + @self.app.task + def fn_exception(): + raise Retry("Task class is being retried") + + t = fn_exception.apply() + self.assertFalse(t.failed()) + self.assertIn("Task class is being retried", t.traceback) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) 
+ + span = spans[0] + + self.assertEqual(span.name, "celery.run") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.fn_exception", + ) + self.assertEqual(span.attributes.get("celery.id"), t.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "RETRY") + + # This type of retrying should not be marked as an exceptio + self.assertTrue(span.status.is_ok) + + def test_class_task(self): + # it should execute class based tasks with a returning value + class BaseTask(self.app.Task): + def run(self): + return 42 + + t = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(self.app, "register_task", None) + if register_task is not None: + register_task(t) + + r = t.apply() + self.assertTrue(r.successful()) + self.assertEqual(r.result, 42) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + self.assertTrue(span.status.is_ok) + self.assertEqual(span.name, "celery.run") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.BaseTask", + ) + self.assertEqual(span.attributes.get("celery.id"), r.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") + + def test_class_task_exception(self): + # it should catch exceptions in class based tasks + class BaseTask(self.app.Task): + def run(self): + raise Exception("Task class is failing") + + t = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(self.app, "register_task", None) + if register_task is not None: + register_task(t) + + r = t.apply() + self.assertTrue(r.failed()) + self.assertIn("Task class is failing", r.traceback) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + 
+ self.assertEqual(span.name, "celery.run") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.BaseTask", + ) + self.assertEqual(span.attributes.get("celery.id"), r.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "FAILURE") + + self.assertIs(span.status.canonical_code, StatusCanonicalCode.UNKNOWN) + self.assertIn("Task class is failing", span.status.description) + + def test_class_task_exception_expected(self): + # it should catch exceptions in class based tasks + class BaseTask(self.app.Task): + throws = (MyException,) + + def run(self): + raise MyException("Task class is failing") + + t = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(self.app, "register_task", None) + if register_task is not None: + register_task(t) + + r = t.apply() + self.assertTrue(r.failed()) + self.assertIn("Task class is failing", r.traceback) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + self.assertEqual(span.name, "celery.run") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.BaseTask", + ) + self.assertEqual(span.attributes.get("celery.id"), r.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "FAILURE") + + self.assertTrue(span.status.is_ok) + + def test_shared_task(self): + # Ensure Django Shared Task are supported + @celery.shared_task + def add(x, y): + return x + y + + res = add.apply([2, 2]) + self.assertEqual(res.result, 4) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + self.assertTrue(span.status.is_ok) + self.assertEqual(span.name, "celery.run") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.add", + ) + 
self.assertEqual(span.attributes.get("celery.id"), res.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") + + def test_apply_async_previous_style_tasks(self): + # ensures apply_async is properly patched if Celery 1.0 style tasks + # are used even in newer versions. This should extend support to + # previous versions of Celery. + class CelerySuperClass(celery.task.Task): + abstract = True + + @classmethod + def apply_async(cls, args=None, kwargs=None, **kwargs_): + return super(CelerySuperClass, cls).apply_async( + args=args, kwargs=kwargs, **kwargs_ + ) + + def run(self, *args, **kwargs): + if "stop" in kwargs: + # avoid call loop + return + CelerySubClass.apply_async(args=[], kwargs={"stop": True}) + + class CelerySubClass(CelerySuperClass): + pass + + t = CelerySubClass() + res = t.apply() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + + span = spans[1] + self.assertTrue(span.status.is_ok) + self.assertEqual(span.name, "celery.run") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.CelerySubClass", + ) + self.assertEqual(span.attributes.get("celery.id"), res.task_id) + self.assertEqual(span.attributes.get("celery.action"), "run") + self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") + + span = spans[0] + self.assertTrue(span.status.is_ok) + self.assertEqual(span.name, "celery.apply") + self.assertEqual( + span.attributes.get("celery.task_name"), + "test_celery_functional.CelerySubClass", + ) + self.assertEqual(span.attributes.get("celery.action"), "apply_async") + + def test_custom_tracer_provider(self): + @self.app.task + def fn_task(): + return 42 + + resource = resources.Resource.create({}) + result = self.create_tracer_provider(resource=resource) + tracer_provider, exporter = result + + CeleryInstrumentor().uninstrument() + 
CeleryInstrumentor().instrument(tracer_provider=tracer_provider) + + fn_task.delay() + + spans_list = exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertIs(span.resource, resource) diff --git a/tox.ini b/tox.ini index 1570df787c..9bf37fa3ae 100644 --- a/tox.ini +++ b/tox.ini @@ -95,6 +95,10 @@ envlist = py3{4,5,6,7,8}-test-ext-redis pypy3-test-ext-redis + ; opentelemetry-ext-celery + py3{4,5,6,7,8}-test-ext-celery + pypy3-test-ext-celery + ; Coverage is temporarily disabled for pypy3 due to the pytest bug. ; pypy3-coverage @@ -142,6 +146,7 @@ changedir = test-opentracing-shim: ext/opentelemetry-ext-opentracing-shim/tests test-ext-sqlalchemy: ext/opentelemetry-ext-sqlalchemy/tests test-ext-redis: ext/opentelemetry-ext-redis/tests + test-ext-celery: ext/opentelemetry-ext-celery/tests commands_pre = ; Install without -e to test the actual installation @@ -166,6 +171,9 @@ commands_pre = example-http: pip install -e {toxinidir}/ext/opentelemetry-ext-wsgi example-http: pip install -r {toxinidir}/docs/examples/http/requirements.txt + celery: pip install {toxinidir}/opentelemetry-auto-instrumentation + celery: pip install {toxinidir}/ext/opentelemetry-ext-celery[test] + grpc: pip install {toxinidir}/ext/opentelemetry-ext-grpc[test] wsgi,flask: pip install {toxinidir}/ext/opentelemetry-ext-wsgi @@ -240,6 +248,7 @@ deps = psutil readme_renderer httpretty + celery commands_pre = python scripts/eachdist.py install --editable @@ -288,6 +297,7 @@ deps = psycopg2-binary ~= 2.8.4 sqlalchemy ~= 1.3.16 redis ~= 3.3.11 + celery ~= 4.0 changedir = ext/opentelemetry-ext-docker-tests/tests @@ -297,6 +307,7 @@ commands_pre = -e {toxinidir}/opentelemetry-sdk \ -e {toxinidir}/opentelemetry-auto-instrumentation \ -e {toxinidir}/tests/util \ + -e {toxinidir}/ext/opentelemetry-ext-celery \ -e {toxinidir}/ext/opentelemetry-ext-dbapi \ -e {toxinidir}/ext/opentelemetry-ext-mysql \ -e {toxinidir}/ext/opentelemetry-ext-psycopg2 \ From 
8b0f2e524885f58c963ba85128107a7b457fc50f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Tue, 5 May 2020 17:10:59 -0500 Subject: [PATCH 03/22] add missing instrument in docstring --- .../src/opentelemetry/ext/celery/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index 5ff5dc9ce9..1825a170a8 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -38,6 +38,9 @@ trace.set_tracer_provider(TracerProvider()) # TODO: configure span exporters + from opentelemetry.ext.celery import CeleryInstrumentor + CeleryInstrumentor().instrument() + from celery import Celery app = Celery("tasks", broker="amqp://localhost") From df5bf4819ce757c276fdaab27e545b5db14f33a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 10:22:41 -0500 Subject: [PATCH 04/22] Update with some semantic conventions --- .../src/opentelemetry/ext/celery/__init__.py | 15 ++-- .../src/opentelemetry/ext/celery/utils.py | 31 ++++++- .../tests/test_utils.py | 18 +++- .../tests/celery/test_celery_functional.py | 86 +++++++++++++------ 4 files changed, 109 insertions(+), 41 deletions(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index 1825a170a8..ea602ad4e5 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -74,17 +74,14 @@ def add(x, y): logger = logging.getLogger(__name__) -# Span names -_PRODUCER_ROOT_SPAN = "celery.apply" -_WORKER_ROOT_SPAN = "celery.run" - # Task operations _TASK_TAG_KEY = "celery.action" -_TASK_APPLY = "apply" _TASK_APPLY_ASYNC = "apply_async" 
_TASK_RUN = "run" + _TASK_RETRY_REASON_KEY = "celery.retry.reason" _TASK_NAME_KEY = "celery.task_name" +_MESSAGE_ID_ATTRIBUTE_NAME = "messaging.message_id" class CeleryInstrumentor(BaseInstrumentor): @@ -124,7 +121,8 @@ def _trace_prerun(self, *args, **kwargs): ) return - span = self._tracer.start_span(_WORKER_ROOT_SPAN) + # TODO: When the span could be SERVER? + span = self._tracer.start_span(task.name, kind=trace.SpanKind.CONSUMER) activation = self._tracer.use_span(span, end_on_exit=True) activation.__enter__() @@ -170,11 +168,12 @@ def _trace_before_publish(self, *args, **kwargs): ) return - span = self._tracer.start_span(_PRODUCER_ROOT_SPAN) + # TODO: When the span could be CLIENT? + span = self._tracer.start_span(task.name, kind=trace.SpanKind.PRODUCER) # apply some attributes here because most of the data is not available span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) - span.set_attribute("celery.id", task_id) + span.set_attribute(_MESSAGE_ID_ATTRIBUTE_NAME, task_id) span.set_attribute(_TASK_NAME_KEY, task.name) set_attributes_from_context(span, kwargs) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py index cd1ce034fb..0ffdbbcb0e 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py @@ -16,13 +16,16 @@ CTX_KEY = "__otel_task_span" +# pylint:disable=too-many-branches def set_attributes_from_context(span, context): """Helper to extract meta values from a Celery Context""" + attribute_keys = ( "compression", "correlation_id", "countdown", "delivery_info", + "declare", "eta", "exchange", "expires", @@ -62,12 +65,36 @@ def set_attributes_from_context(span, context): # TODO: hack to avoid bad attribute type if key == "delivery_info": + # Get also destination from this + routing_key = value.get("routing_key") + if routing_key: + 
span.set_attribute("messaging.destination", routing_key) value = str(value) # prefix the tag as 'celery' - tag_name = "celery.{}".format(key) + attribute_name = "celery.{}".format(key) + + if key == "id": + attribute_name = "messaging.message_id" + + if key == "correlation_id": + attribute_name = "messaging.conversation_id" + + if key == "routing_key": + attribute_name = "messaging.destination" + + # according to https://docs.celeryproject.org/en/stable/userguide/routing.html#exchange-types + if key == "declare": + attribute_name = "messaging.destination_kind" + for declare in value: + if declare.exchange.type == "direct": + value = "queue" + break + if declare.exchange.type == "topic": + value = "topic" + break - span.set_attribute(tag_name, value) + span.set_attribute(attribute_name, value) def attach_span(task, task_id, span, is_publish=False): diff --git a/ext/opentelemetry-ext-celery/tests/test_utils.py b/ext/opentelemetry-ext-celery/tests/test_utils.py index 03c5404a8a..ea0b4b819e 100644 --- a/ext/opentelemetry-ext-celery/tests/test_utils.py +++ b/ext/opentelemetry-ext-celery/tests/test_utils.py @@ -30,7 +30,7 @@ def test_set_attributes_from_context(self): # it should extract only relevant keys context = { "correlation_id": "44b7f305", - "delivery_info": "{'eager': 'True'}", + "delivery_info": {"eager": True}, "eta": "soon", "expires": "later", "hostname": "localhost", @@ -39,19 +39,29 @@ def test_set_attributes_from_context(self): "retries": 4, "timelimit": ("now", "later"), "custom_meta": "custom_value", + "routing_key": "celery", } span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) utils.set_attributes_from_context(span, context) - self.assertEqual(span.attributes["celery.correlation_id"], "44b7f305") self.assertEqual( - span.attributes["celery.delivery_info"], "{'eager': 'True'}" + span.attributes.get("messaging.message_id"), "44b7f305" + ) + self.assertEqual( + span.attributes.get("messaging.conversation_id"), "44b7f305" + ) + 
self.assertEqual( + span.attributes.get("messaging.destination"), "celery" + ) + + self.assertEqual( + span.attributes["celery.delivery_info"], str({"eager": True}) ) self.assertEqual(span.attributes.get("celery.eta"), "soon") self.assertEqual(span.attributes.get("celery.expires"), "later") self.assertEqual(span.attributes.get("celery.hostname"), "localhost") - self.assertEqual(span.attributes.get("celery.id"), "44b7f305") + self.assertEqual(span.attributes.get("celery.reply_to"), "44b7f305") self.assertEqual(span.attributes.get("celery.retries"), 4) self.assertEqual( diff --git a/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py index 1718b801a8..5f36a4584a 100644 --- a/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py +++ b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py @@ -101,12 +101,14 @@ def fn_task(): span = spans[0] self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "celery.run") + self.assertEqual(span.name, "test_celery_functional.fn_task") + self.assertEqual( + span.attributes.get("messaging.message_id"), t.task_id + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.fn_task", ) - self.assertEqual(span.attributes.get("celery.id"), t.task_id) self.assertEqual(span.attributes.get("celery.action"), "run") self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") @@ -126,12 +128,14 @@ def fn_task(self): span = spans[0] self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "celery.run") + self.assertEqual(span.name, "test_celery_functional.fn_task") + self.assertEqual( + span.attributes.get("messaging.message_id"), t.task_id + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.fn_task", ) - self.assertEqual(span.attributes.get("celery.id"), t.task_id) self.assertEqual(span.attributes.get("celery.action"), "run") 
self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") @@ -152,14 +156,20 @@ def fn_task_parameters(user, force_logout=False): span = spans[0] self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "celery.apply") + self.assertEqual( + span.name, "test_celery_functional.fn_task_parameters" + ) + self.assertEqual( + span.attributes.get("messaging.message_id"), t.task_id + ) + self.assertEqual( + span.attributes.get("messaging.destination"), "celery" + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.fn_task_parameters", ) - self.assertEqual(span.attributes.get("celery.id"), t.task_id) self.assertEqual(span.attributes.get("celery.action"), "apply_async") - self.assertEqual(span.attributes.get("celery.routing_key"), "celery") def test_fn_task_delay(self): # using delay shorthand must preserve arguments @@ -176,14 +186,20 @@ def fn_task_parameters(user, force_logout=False): span = spans[0] self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "celery.apply") + self.assertEqual( + span.name, "test_celery_functional.fn_task_parameters" + ) + self.assertEqual( + span.attributes.get("messaging.message_id"), t.task_id + ) + self.assertEqual( + span.attributes.get("messaging.destination"), "celery" + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.fn_task_parameters", ) - self.assertEqual(span.attributes.get("celery.id"), t.task_id) self.assertEqual(span.attributes.get("celery.action"), "apply_async") - self.assertEqual(span.attributes.get("celery.routing_key"), "celery") def test_fn_exception(self): # it should catch exceptions in task functions @@ -200,12 +216,14 @@ def fn_exception(): span = spans[0] - self.assertEqual(span.name, "celery.run") + self.assertEqual(span.name, "test_celery_functional.fn_exception") + self.assertEqual( + span.attributes.get("messaging.message_id"), t.task_id + ) self.assertEqual( span.attributes.get("celery.task_name"), 
"test_celery_functional.fn_exception", ) - self.assertEqual(span.attributes.get("celery.id"), t.task_id) self.assertEqual(span.attributes.get("celery.action"), "run") self.assertEqual(span.attributes.get("celery.state"), "FAILURE") @@ -227,12 +245,14 @@ def fn_exception(): span = spans[0] - self.assertEqual(span.name, "celery.run") + self.assertEqual(span.name, "test_celery_functional.fn_exception") + self.assertEqual( + span.attributes.get("messaging.message_id"), t.task_id + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.fn_exception", ) - self.assertEqual(span.attributes.get("celery.id"), t.task_id) self.assertEqual(span.attributes.get("celery.action"), "run") self.assertEqual(span.attributes.get("celery.state"), "FAILURE") @@ -253,12 +273,14 @@ def fn_exception(): span = spans[0] - self.assertEqual(span.name, "celery.run") + self.assertEqual(span.name, "test_celery_functional.fn_exception") + self.assertEqual( + span.attributes.get("messaging.message_id"), t.task_id + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.fn_exception", ) - self.assertEqual(span.attributes.get("celery.id"), t.task_id) self.assertEqual(span.attributes.get("celery.action"), "run") self.assertEqual(span.attributes.get("celery.state"), "RETRY") @@ -287,12 +309,14 @@ def run(self): span = spans[0] self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "celery.run") + self.assertEqual(span.name, "test_celery_functional.BaseTask") + self.assertEqual( + span.attributes.get("messaging.message_id"), r.task_id + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.BaseTask", ) - self.assertEqual(span.attributes.get("celery.id"), r.task_id) self.assertEqual(span.attributes.get("celery.action"), "run") self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") @@ -317,12 +341,14 @@ def run(self): span = spans[0] - self.assertEqual(span.name, "celery.run") + 
self.assertEqual(span.name, "test_celery_functional.BaseTask") + self.assertEqual( + span.attributes.get("messaging.message_id"), r.task_id + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.BaseTask", ) - self.assertEqual(span.attributes.get("celery.id"), r.task_id) self.assertEqual(span.attributes.get("celery.action"), "run") self.assertEqual(span.attributes.get("celery.state"), "FAILURE") @@ -352,12 +378,14 @@ def run(self): span = spans[0] - self.assertEqual(span.name, "celery.run") + self.assertEqual(span.name, "test_celery_functional.BaseTask") + self.assertEqual( + span.attributes.get("messaging.message_id"), r.task_id + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.BaseTask", ) - self.assertEqual(span.attributes.get("celery.id"), r.task_id) self.assertEqual(span.attributes.get("celery.action"), "run") self.assertEqual(span.attributes.get("celery.state"), "FAILURE") @@ -378,12 +406,14 @@ def add(x, y): span = spans[0] self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "celery.run") + self.assertEqual(span.name, "test_celery_functional.add") self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.add", ) - self.assertEqual(span.attributes.get("celery.id"), res.task_id) + self.assertEqual( + span.attributes.get("messaging.message_id"), res.task_id + ) self.assertEqual(span.attributes.get("celery.action"), "run") self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") @@ -417,18 +447,20 @@ class CelerySubClass(CelerySuperClass): span = spans[1] self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "celery.run") + self.assertEqual(span.name, "test_celery_functional.CelerySubClass") + self.assertEqual( + span.attributes.get("messaging.message_id"), res.task_id + ) self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.CelerySubClass", ) - self.assertEqual(span.attributes.get("celery.id"), res.task_id) 
self.assertEqual(span.attributes.get("celery.action"), "run") self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") span = spans[0] self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "celery.apply") + self.assertEqual(span.name, "test_celery_functional.CelerySubClass") self.assertEqual( span.attributes.get("celery.task_name"), "test_celery_functional.CelerySubClass", From 5ae56261bd272bc131c04614cb6c3c75876fc2d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 10:53:18 -0500 Subject: [PATCH 05/22] add test for instrumentation info --- .../tests/celery/test_celery_functional.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py index 5f36a4584a..d1ca730c09 100644 --- a/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py +++ b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py @@ -18,6 +18,7 @@ from celery import Celery from celery.exceptions import Retry +import opentelemetry.ext.celery from opentelemetry.ext.celery import CeleryInstrumentor from opentelemetry.sdk import resources from opentelemetry.test.test_base import TestBase @@ -47,6 +48,19 @@ def tearDown(self): CeleryInstrumentor().uninstrument() super().tearDown() + def test_instrumentation_info(self): + @self.app.task + def fn_task(): + return 42 + + fn_task.delay() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + self.check_span_instrumentation_info(span, opentelemetry.ext.celery) + def test_concurrent_delays(self): # it should create one trace for each delayed execution @self.app.task From 001527e51f773753075397c513d2ad6ca94a35d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 13:55:36 -0500 Subject: [PATCH 06/22] don't import whole 
celery --- .../src/opentelemetry/ext/celery/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index ea602ad4e5..4f89f96814 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -57,8 +57,7 @@ def add(x, y): import logging -import celery -from celery import signals +from celery import registry, signals from opentelemetry import trace from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor @@ -159,7 +158,7 @@ def _trace_before_publish(self, *args, **kwargs): # execution, so it **must not** be used to retrieve `request` data. task_name = kwargs.get("sender") # pylint: disable=no-member - task = celery.registry.tasks.get(task_name) + task = registry.tasks.get(task_name) task_id = retrieve_task_id(kwargs) if task is None or task_id is None: @@ -185,7 +184,7 @@ def _trace_before_publish(self, *args, **kwargs): def _trace_after_publish(*args, **kwargs): task_name = kwargs.get("sender") # pylint: disable=no-member - task = celery.registry.tasks.get(task_name) + task = registry.tasks.get(task_name) task_id = retrieve_task_id(kwargs) if task is None or task_id is None: logger.debug( From eb05bae2c060418b8be244ac6171b36f389cf4a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 14:08:59 -0500 Subject: [PATCH 07/22] make variables name consistent --- .../src/opentelemetry/ext/celery/__init__.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index 4f89f96814..9c7289b231 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ 
b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -111,7 +111,7 @@ def _uninstrument(self, **kwargs): signals.task_retry.disconnect(self._trace_retry) def _trace_prerun(self, *args, **kwargs): - task = kwargs.get("sender") + task = kwargs.get("task") task_id = kwargs.get("task_id") logger.debug("prerun signal start task_id=%s", task_id) if task is None or task_id is None: @@ -129,7 +129,7 @@ def _trace_prerun(self, *args, **kwargs): @staticmethod def _trace_postrun(*args, **kwargs): - task = kwargs.get("sender") + task = kwargs.get("task") task_id = kwargs.get("task_id") logger.debug("postrun signal task_id=%s", task_id) if task is None or task_id is None: @@ -156,9 +156,8 @@ def _trace_postrun(*args, **kwargs): def _trace_before_publish(self, *args, **kwargs): # The `Task` instance **does not** include any information about the current # execution, so it **must not** be used to retrieve `request` data. - task_name = kwargs.get("sender") # pylint: disable=no-member - task = registry.tasks.get(task_name) + task = registry.tasks.get(kwargs.get("sender")) task_id = retrieve_task_id(kwargs) if task is None or task_id is None: @@ -182,9 +181,8 @@ def _trace_before_publish(self, *args, **kwargs): @staticmethod def _trace_after_publish(*args, **kwargs): - task_name = kwargs.get("sender") # pylint: disable=no-member - task = registry.tasks.get(task_name) + task = registry.tasks.get(kwargs.get("sender")) task_id = retrieve_task_id(kwargs) if task is None or task_id is None: logger.debug( From 0b590c6876c52605df5737eab9da3a7a77358735 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 14:17:31 -0500 Subject: [PATCH 08/22] handle feedback --- .../src/opentelemetry/ext/celery/__init__.py | 16 ++++++++-------- .../src/opentelemetry/ext/celery/utils.py | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py 
b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index 9c7289b231..7b5f90d326 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -116,7 +116,7 @@ def _trace_prerun(self, *args, **kwargs): logger.debug("prerun signal start task_id=%s", task_id) if task is None or task_id is None: logger.debug( - "unable to extract the Task and the task_id. This version of Celery may not be supported." + "Unable to extract the Task and the task_id. This version of Celery may not be supported." ) return @@ -134,7 +134,7 @@ def _trace_postrun(*args, **kwargs): logger.debug("postrun signal task_id=%s", task_id) if task is None or task_id is None: logger.debug( - "unable to extract the Task and the task_id. This version of Celery may not be supported." + "Unable to extract the Task and the task_id. This version of Celery may not be supported." ) return @@ -162,7 +162,7 @@ def _trace_before_publish(self, *args, **kwargs): if task is None or task_id is None: logger.debug( - "unable to extract the Task and the task_id. This version of Celery may not be supported." + "Unable to extract the Task and the task_id. This version of Celery may not be supported." ) return @@ -186,7 +186,7 @@ def _trace_after_publish(*args, **kwargs): task_id = retrieve_task_id(kwargs) if task is None or task_id is None: logger.debug( - "unable to extract the Task and the task_id. This version of Celery may not be supported." + "Unable to extract the Task and the task_id. This version of Celery may not be supported." ) return @@ -205,7 +205,7 @@ def _trace_failure(*args, **kwargs): task_id = kwargs.get("task_id") if task is None or task_id is None: logger.debug( - "unable to extract the Task and the task_id. This version of Celery may not be supported." + "Unable to extract the Task and the task_id. This version of Celery may not be supported." 
) return @@ -233,14 +233,14 @@ def _trace_retry(*args, **kwargs): context = kwargs.get("request") if task is None or context is None: logger.debug( - "unable to extract the Task or the Context. This version of Celery may not be supported." + "Unable to extract the Task or the Context. This version of Celery may not be supported." ) return reason = kwargs.get("reason") if not reason: logger.debug( - "unable to extract the retry reason. This version of Celery may not be supported." + "Unable to extract the retry reason. This version of Celery may not be supported." ) return @@ -249,6 +249,6 @@ def _trace_retry(*args, **kwargs): return # Add retry reason metadata to span - # DEV: Use `str(reason)` instead of `reason.message` in case we get + # Use `str(reason)` instead of `reason.message` in case we get # something that isn't an `Exception` span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason)) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py index 0ffdbbcb0e..1b83b2fae4 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py @@ -103,7 +103,7 @@ def attach_span(task, task_id, span, is_publish=False): `(task_id, is_publish)` as a key. This is useful when information must be propagated from one Celery signal to another. - DEV: We use (task_id, is_publish) for the key to ensure that publishing a + We use (task_id, is_publish) for the key to ensure that publishing a task from within another task does not cause any conflicts. 
This mostly happens when either a task fails and a retry policy is in place, @@ -131,7 +131,7 @@ def detach_span(task, task_id, is_publish=False): if span_dict is None: return - # DEV: See note in `attach_span` for key info + # See note in `attach_span` for key info span_dict.pop((task_id, is_publish), (None, None)) @@ -143,7 +143,7 @@ def retrieve_span(task, task_id, is_publish=False): if span_dict is None: return (None, None) - # DEV: See note in `attach_span` for key info + # See note in `attach_span` for key info return span_dict.get((task_id, is_publish), (None, None)) From d8aead4b8ed679a1725348ac60c13700029cf9a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 14:18:29 -0500 Subject: [PATCH 09/22] remove unuseful comment --- .../src/opentelemetry/ext/celery/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py index 1b83b2fae4..2aa7fb301f 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py @@ -71,7 +71,6 @@ def set_attributes_from_context(span, context): span.set_attribute("messaging.destination", routing_key) value = str(value) - # prefix the tag as 'celery' attribute_name = "celery.{}".format(key) if key == "id": From c2b7561f6c135d9454217a750070f3330e481c42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 14:20:58 -0500 Subject: [PATCH 10/22] use unittest.fail --- ext/opentelemetry-ext-celery/tests/test_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ext/opentelemetry-ext-celery/tests/test_utils.py b/ext/opentelemetry-ext-celery/tests/test_utils.py index ea0b4b819e..f1129b324e 100644 --- a/ext/opentelemetry-ext-celery/tests/test_utils.py +++ b/ext/opentelemetry-ext-celery/tests/test_utils.py @@ -133,8 +133,7 @@ 
def fn_task(): utils.retrieve_span(fn_task, task_id), (None, None) ) except Exception as ex: # pylint: disable=broad-except - exception = ex - self.assertIsNone(exception) + self.fail("Exception was raised: %s" % ex) def test_task_id_from_protocol_v1(self): # ensures a `task_id` is properly returned when Protocol v1 is used. From a571304573f65303b5a4c8b5ecd434b2664207e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 14:22:56 -0500 Subject: [PATCH 11/22] use instrumentation instead of integration --- ext/opentelemetry-ext-celery/README.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ext/opentelemetry-ext-celery/README.rst b/ext/opentelemetry-ext-celery/README.rst index 3bc37be0d3..432ead43e0 100644 --- a/ext/opentelemetry-ext-celery/README.rst +++ b/ext/opentelemetry-ext-celery/README.rst @@ -1,12 +1,12 @@ -OpenTelemetry Celery Integration -================================ +OpenTelemetry Celery Instrumentation +==================================== |pypi| .. |pypi| image:: https://badge.fury.io/py/opentelemetry-ext-celery.svg :target: https://pypi.org/project/opentelemetry-ext-celery/ -Integration with Celery. +Instrumentation for Celery. 
Installation @@ -19,6 +19,6 @@ Installation References ---------- -* `OpenTelemetry Celery Integration `_ +* `OpenTelemetry Celery Instrumentation `_ * `OpenTelemetry Project `_ From 3d94aed346b5d2f07cfb702474a105355800b703 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 14:23:49 -0500 Subject: [PATCH 12/22] Apply suggestions from code review Co-authored-by: Diego Hurtado --- .../src/opentelemetry/ext/celery/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index ea602ad4e5..6efc0f0615 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -17,7 +17,7 @@ There are two options for instrumenting code. The first option is to use the ``opentelemetry-auto-instrumentation`` executable which will automatically instrument your Celery APP. The second is to programmatically enable -instrumentation via the following code: +instrumentation as explained in the following section. .. 
_celery: https://pypi.org/project/celery/ From ba6db4ec5c0a562325e84513431daa0657b26490 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 6 May 2020 15:31:52 -0500 Subject: [PATCH 13/22] fix lint --- .../src/opentelemetry/ext/celery/__init__.py | 2 +- ext/opentelemetry-ext-celery/tests/test_utils.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index 60bc839dd8..742b548992 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -57,7 +57,7 @@ def add(x, y): import logging -from celery import registry, signals +from celery import registry, signals # pylint: disable=no-name-in-module from opentelemetry import trace from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor diff --git a/ext/opentelemetry-ext-celery/tests/test_utils.py b/ext/opentelemetry-ext-celery/tests/test_utils.py index f1129b324e..3b8d2d76cf 100644 --- a/ext/opentelemetry-ext-celery/tests/test_utils.py +++ b/ext/opentelemetry-ext-celery/tests/test_utils.py @@ -125,7 +125,6 @@ def fn_task(): return 42 # delete the Span - exception = None task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f" try: utils.detach_span(fn_task, task_id) From 9dff127cdaa0f8e8a08584be19e16ad2179f5b5a Mon Sep 17 00:00:00 2001 From: "Tahir H. 
Butt" Date: Thu, 4 Jun 2020 19:00:23 -0400 Subject: [PATCH 14/22] address feedback from PR #648 and fix async tests --- ext/opentelemetry-ext-celery/setup.cfg | 5 +- .../src/opentelemetry/ext/celery/__init__.py | 11 +- .../tests/celery/conftest.py | 92 ++ .../tests/celery/test_celery_functional.py | 919 +++++++++--------- tox.ini | 2 +- 5 files changed, 570 insertions(+), 459 deletions(-) create mode 100644 ext/opentelemetry-ext-docker-tests/tests/celery/conftest.py diff --git a/ext/opentelemetry-ext-celery/setup.cfg b/ext/opentelemetry-ext-celery/setup.cfg index c36e20bc29..261ec4d542 100644 --- a/ext/opentelemetry-ext-celery/setup.cfg +++ b/ext/opentelemetry-ext-celery/setup.cfg @@ -40,12 +40,13 @@ package_dir= =src packages=find_namespace: install_requires = - opentelemetry-api == 0.7.dev0 + opentelemetry-api == 0.9.dev0 celery ~= 4.0 [options.extras_require] test = - opentelemetry-test == 0.7.dev0 + pytest + opentelemetry-test == 0.9.dev0 [options.packages.find] where = src diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index 742b548992..09af1ac98d 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -33,12 +33,8 @@ .. 
code:: python from opentelemetry import trace - from opentelemetry.sdk.trace import TracerProvider - - trace.set_tracer_provider(TracerProvider()) - # TODO: configure span exporters - from opentelemetry.ext.celery import CeleryInstrumentor + CeleryInstrumentor().instrument() from celery import Celery @@ -56,6 +52,7 @@ def add(x, y): """ import logging +import signal from celery import registry, signals # pylint: disable=no-name-in-module @@ -79,6 +76,8 @@ def add(x, y): _TASK_RUN = "run" _TASK_RETRY_REASON_KEY = "celery.retry.reason" +_TASK_REVOKED_REASON_KEY = "celery.revoked.reason" +_TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal" _TASK_NAME_KEY = "celery.task_name" _MESSAGE_ID_ATTRIBUTE_NAME = "messaging.message_id" @@ -120,7 +119,6 @@ def _trace_prerun(self, *args, **kwargs): ) return - # TODO: When the span could be SERVER? span = self._tracer.start_span(task.name, kind=trace.SpanKind.CONSUMER) activation = self._tracer.use_span(span, end_on_exit=True) @@ -166,7 +164,6 @@ def _trace_before_publish(self, *args, **kwargs): ) return - # TODO: When the span could be CLIENT? span = self._tracer.start_span(task.name, kind=trace.SpanKind.PRODUCER) # apply some attributes here because most of the data is not available diff --git a/ext/opentelemetry-ext-docker-tests/tests/celery/conftest.py b/ext/opentelemetry-ext-docker-tests/tests/celery/conftest.py new file mode 100644 index 0000000000..0e6976382e --- /dev/null +++ b/ext/opentelemetry-ext-docker-tests/tests/celery/conftest.py @@ -0,0 +1,92 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from functools import wraps + +import pytest + +from opentelemetry import trace as trace_api +from opentelemetry.ext.celery import CeleryInstrumentor +from opentelemetry.sdk.trace import TracerProvider, export +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + +REDIS_HOST = os.getenv("REDIS_HOST", "localhost") +REDIS_PORT = int(os.getenv("REDIS_PORT ", "6379")) +REDIS_URL = "redis://{host}:{port}".format(host=REDIS_HOST, port=REDIS_PORT) +BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0) +BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1) + + +@pytest.fixture(scope="session") +def celery_config(): + return {"broker_url": BROKER_URL, "result_backend": BACKEND_URL} + + +@pytest.fixture +def celery_worker_parameters(): + return { + # See https://github.com/celery/celery/issues/3642#issuecomment-457773294 + "perform_ping_check": False, + } + + +@pytest.fixture(autouse=True) +def patch_celery_app(celery_app, celery_worker): + """Patch task decorator on app fixture to reload worker""" + # See https://github.com/celery/celery/issues/3642 + def wrap_task(fn): + @wraps(fn) + def wrapper(*args, **kwargs): + result = fn(*args, **kwargs) + celery_worker.reload() + return result + + return wrapper + + celery_app.task = wrap_task(celery_app.task) + + +@pytest.fixture(autouse=True) +def instrument(tracer_provider, memory_exporter): + CeleryInstrumentor().instrument(tracer_provider=tracer_provider) + memory_exporter.clear() + + yield + + CeleryInstrumentor().uninstrument() + + 
+@pytest.fixture(scope="session") +def tracer_provider(memory_exporter): + original_tracer_provider = trace_api.get_tracer_provider() + + tracer_provider = TracerProvider() + + span_processor = export.SimpleExportSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + + trace_api.set_tracer_provider(tracer_provider) + + yield tracer_provider + + trace_api.set_tracer_provider(original_tracer_provider) + + +@pytest.fixture(scope="session") +def memory_exporter(): + memory_exporter = InMemorySpanExporter() + return memory_exporter diff --git a/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py index d1ca730c09..570a12b758 100644 --- a/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py +++ b/ext/opentelemetry-ext-docker-tests/tests/celery/test_celery_functional.py @@ -12,491 +12,512 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import os + +import time import celery -from celery import Celery +import pytest +from celery import signals from celery.exceptions import Retry import opentelemetry.ext.celery +from opentelemetry import trace as trace_api from opentelemetry.ext.celery import CeleryInstrumentor from opentelemetry.sdk import resources -from opentelemetry.test.test_base import TestBase +from opentelemetry.sdk.trace import TracerProvider, export from opentelemetry.trace.status import StatusCanonicalCode -REDIS_HOST = os.getenv("REDIS_HOST", "localhost") -REDIS_PORT = int(os.getenv("REDIS_PORT ", "6379")) -REDIS_URL = "redis://{host}:{port}".format(host=REDIS_HOST, port=REDIS_PORT) -BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0) -BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1) - class MyException(Exception): pass -class TestCeleryIntegration(TestBase): - def setUp(self): - super().setUp() - CeleryInstrumentor().instrument() - self.app = Celery( - "celery.test_app", broker=BROKER_URL, backend=BACKEND_URL - ) +def test_instrumentation_info(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 - def tearDown(self): - with self.disable_logging(): - CeleryInstrumentor().uninstrument() - super().tearDown() + result = fn_task.apply_async() + assert result.get() == 42 - def test_instrumentation_info(self): - @self.app.task - def fn_task(): - return 42 + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 - fn_task.delay() + async_span, run_span = spans - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) + assert ( + async_span.instrumentation_info.name + == opentelemetry.ext.celery.__name__ + ) + assert ( + async_span.instrumentation_info.version + == opentelemetry.ext.celery.__version__ + ) + assert ( + run_span.instrumentation_info.name == opentelemetry.ext.celery.__name__ + ) + assert ( + run_span.instrumentation_info.version + == opentelemetry.ext.celery.__version__ + ) - span = spans[0] - 
self.check_span_instrumentation_info(span, opentelemetry.ext.celery) - def test_concurrent_delays(self): - # it should create one trace for each delayed execution - @self.app.task - def fn_task(): - return 42 +def test_fn_task_run(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 - for x in range(100): - fn_task.delay() + t = fn_task.run() + assert t == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 0 - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 100) - def test_fn_task_run(self): - # the body of the function is not instrumented so calling it - # directly doesn't create a trace - @self.app.task - def fn_task(): - return 42 +def test_fn_task_call(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 - t = fn_task.run() - self.assertEqual(t, 42) + t = fn_task() + assert t == 42 - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 0) + spans = memory_exporter.get_finished_spans() + assert len(spans) == 0 - def test_fn_task_call(self): - # the body of the function is not instrumented so calling it - # directly doesn't create a trace - @self.app.task - def fn_task(): - return 42 - t = fn_task() - self.assertEqual(t, 42) +def test_fn_task_apply(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + t = fn_task.apply() + assert t.successful() is True + assert t.result == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.fn_task" + assert span.attributes.get("messaging.message_id") == t.task_id + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + + +def test_fn_task_apply_bind(celery_app, memory_exporter): + 
@celery_app.task(bind=True) + def fn_task(self): + return self + + t = fn_task.apply() + assert t.successful() is True + assert "fn_task" in t.result.name + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.fn_task" + assert span.attributes.get("messaging.message_id") == t.task_id + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + + +def test_fn_task_apply_async(celery_app, memory_exporter): + @celery_app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + result = fn_task_parameters.apply_async( + args=["user"], kwargs={"force_logout": True} + ) + assert result.get(timeout=10) == ["user", True] + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + async_span, run_span = spans + + assert run_span.context.trace_id != async_span.context.trace_id + + assert async_span.status.is_ok is True + assert async_span.name == "test_celery_functional.fn_task_parameters" + assert async_span.attributes.get("celery.action") == "apply_async" + assert async_span.attributes.get("messaging.message_id") == result.task_id + assert ( + async_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + assert run_span.status.is_ok is True + assert run_span.name == "test_celery_functional.fn_task_parameters" + assert run_span.attributes.get("celery.action") == "run" + assert run_span.attributes.get("celery.state") == "SUCCESS" + assert run_span.attributes.get("messaging.message_id") == result.task_id + assert ( + run_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + +def test_concurrent_delays(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 
42 + + results = [fn_task.delay() for _ in range(100)] + + for result in results: + assert result.get(timeout=1) == 42 + + spans = memory_exporter.get_finished_spans() + + assert len(spans) == 200 + + +def test_fn_task_delay(celery_app, memory_exporter): + @celery_app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + result = fn_task_parameters.delay("user", force_logout=True) + assert result.get(timeout=10) == ["user", True] + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + async_span, run_span = spans + + assert run_span.context.trace_id != async_span.context.trace_id + + assert async_span.status.is_ok is True + assert async_span.name == "test_celery_functional.fn_task_parameters" + assert async_span.attributes.get("celery.action") == "apply_async" + assert async_span.attributes.get("messaging.message_id") == result.task_id + assert ( + async_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 0) + assert run_span.status.is_ok is True + assert run_span.name == "test_celery_functional.fn_task_parameters" + assert run_span.attributes.get("celery.action") == "run" + assert run_span.attributes.get("celery.state") == "SUCCESS" + assert run_span.attributes.get("messaging.message_id") == result.task_id + assert ( + run_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) - def test_fn_task_apply(self): - # it should execute a traced task with a returning value - @self.app.task - def fn_task(): + +def test_fn_exception(celery_app, memory_exporter): + @celery_app.task + def fn_exception(): + raise Exception("Task class is failing") + + result = fn_exception.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] 
+ + assert span.status.is_ok is False + assert span.name == "test_celery_functional.fn_exception" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_exception" + ) + assert span.status.canonical_code == StatusCanonicalCode.UNKNOWN + assert span.attributes.get("messaging.message_id") == result.task_id + assert "Task class is failing" in span.status.description + + +def test_fn_exception_expected(celery_app, memory_exporter): + @celery_app.task(throws=(MyException,)) + def fn_exception(): + raise MyException("Task class is failing") + + result = fn_exception.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.status.canonical_code == StatusCanonicalCode.OK + assert span.name == "test_celery_functional.fn_exception" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_exception" + ) + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_fn_retry_exception(celery_app, memory_exporter): + @celery_app.task + def fn_exception(): + raise Retry("Task class is being retried") + + result = fn_exception.apply() + + assert result.failed() is False + assert "Task class is being retried" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.status.canonical_code == StatusCanonicalCode.OK + assert span.name == "test_celery_functional.fn_exception" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "RETRY" + assert ( + 
span.attributes.get("celery.task_name") + == "test_celery_functional.fn_exception" + ) + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_class_task(celery_app, memory_exporter): + class BaseTask(celery_app.Task): + def run(self): return 42 - t = fn_task.apply() - self.assertTrue(t.successful()) - self.assertEqual(t.result, 42) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "test_celery_functional.fn_task") - self.assertEqual( - span.attributes.get("messaging.message_id"), t.task_id - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.fn_task", - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") - - def test_fn_task_apply_bind(self): - # it should execute a traced task with a returning value - @self.app.task(bind=True) - def fn_task(self): - return self - - t = fn_task.apply() - self.assertTrue(t.successful()) - self.assertIn("fn_task", t.result.name) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "test_celery_functional.fn_task") - self.assertEqual( - span.attributes.get("messaging.message_id"), t.task_id - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.fn_task", - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") - - def test_fn_task_apply_async(self): - # it should execute a traced async task that has parameters - @self.app.task - def fn_task_parameters(user, force_logout=False): - return (user, force_logout) - - t = fn_task_parameters.apply_async( - args=["user"], kwargs={"force_logout": True} - ) - self.assertEqual("PENDING", 
t.status) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertTrue(span.status.is_ok) - self.assertEqual( - span.name, "test_celery_functional.fn_task_parameters" - ) - self.assertEqual( - span.attributes.get("messaging.message_id"), t.task_id - ) - self.assertEqual( - span.attributes.get("messaging.destination"), "celery" - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.fn_task_parameters", - ) - self.assertEqual(span.attributes.get("celery.action"), "apply_async") - - def test_fn_task_delay(self): - # using delay shorthand must preserve arguments - @self.app.task - def fn_task_parameters(user, force_logout=False): - return (user, force_logout) - - t = fn_task_parameters.delay("user", force_logout=True) - self.assertEqual(t.status, "PENDING") - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertTrue(span.status.is_ok) - self.assertEqual( - span.name, "test_celery_functional.fn_task_parameters" - ) - self.assertEqual( - span.attributes.get("messaging.message_id"), t.task_id - ) - self.assertEqual( - span.attributes.get("messaging.destination"), "celery" - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.fn_task_parameters", - ) - self.assertEqual(span.attributes.get("celery.action"), "apply_async") - - def test_fn_exception(self): - # it should catch exceptions in task functions - @self.app.task - def fn_exception(): + task = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(celery_app, "register_task", None) + if register_task is not None: + register_task(task) + + result = task.apply() + + assert result.successful() is True + assert result.result == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + 
assert span.name == "test_celery_functional.BaseTask" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.BaseTask" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_class_task_exception(celery_app, memory_exporter): + class BaseTask(celery_app.Task): + def run(self): raise Exception("Task class is failing") - t = fn_exception.apply() - self.assertTrue(t.failed()) - self.assertIn("Task class is failing", t.traceback) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertEqual(span.name, "test_celery_functional.fn_exception") - self.assertEqual( - span.attributes.get("messaging.message_id"), t.task_id - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.fn_exception", - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "FAILURE") - - self.assertIs(span.status.canonical_code, StatusCanonicalCode.UNKNOWN) - self.assertIn("Task class is failing", span.status.description) - - def test_fn_exception_expected(self): - # it should catch exceptions in task functions - @self.app.task(throws=(MyException,)) - def fn_exception(): - raise MyException("Task class is failing") + task = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(celery_app, "register_task", None) + if register_task is not None: + register_task(task) - t = fn_exception.apply() - self.assertTrue(t.failed()) - self.assertIn("Task class is failing", t.traceback) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertEqual(span.name, "test_celery_functional.fn_exception") - self.assertEqual( - 
span.attributes.get("messaging.message_id"), t.task_id - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.fn_exception", - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "FAILURE") - - self.assertTrue(span.status.is_ok) - - def test_fn_retry_exception(self): - # it should not catch retry exceptions in task functions - @self.app.task - def fn_exception(): - raise Retry("Task class is being retried") - - t = fn_exception.apply() - self.assertFalse(t.failed()) - self.assertIn("Task class is being retried", t.traceback) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertEqual(span.name, "test_celery_functional.fn_exception") - self.assertEqual( - span.attributes.get("messaging.message_id"), t.task_id - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.fn_exception", - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "RETRY") - - # This type of retrying should not be marked as an exceptio - self.assertTrue(span.status.is_ok) - - def test_class_task(self): - # it should execute class based tasks with a returning value - class BaseTask(self.app.Task): - def run(self): - return 42 - - t = BaseTask() - # register the Task class if it's available (required in Celery 4.0+) - register_task = getattr(self.app, "register_task", None) - if register_task is not None: - register_task(t) - - r = t.apply() - self.assertTrue(r.successful()) - self.assertEqual(r.result, 42) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "test_celery_functional.BaseTask") - self.assertEqual( - span.attributes.get("messaging.message_id"), r.task_id - ) - self.assertEqual( - 
span.attributes.get("celery.task_name"), - "test_celery_functional.BaseTask", - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") - - def test_class_task_exception(self): - # it should catch exceptions in class based tasks - class BaseTask(self.app.Task): - def run(self): - raise Exception("Task class is failing") - - t = BaseTask() - # register the Task class if it's available (required in Celery 4.0+) - register_task = getattr(self.app, "register_task", None) - if register_task is not None: - register_task(t) - - r = t.apply() - self.assertTrue(r.failed()) - self.assertIn("Task class is failing", r.traceback) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertEqual(span.name, "test_celery_functional.BaseTask") - self.assertEqual( - span.attributes.get("messaging.message_id"), r.task_id - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.BaseTask", - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "FAILURE") - - self.assertIs(span.status.canonical_code, StatusCanonicalCode.UNKNOWN) - self.assertIn("Task class is failing", span.status.description) - - def test_class_task_exception_expected(self): - # it should catch exceptions in class based tasks - class BaseTask(self.app.Task): - throws = (MyException,) - - def run(self): - raise MyException("Task class is failing") - - t = BaseTask() - # register the Task class if it's available (required in Celery 4.0+) - register_task = getattr(self.app, "register_task", None) - if register_task is not None: - register_task(t) - - r = t.apply() - self.assertTrue(r.failed()) - self.assertIn("Task class is failing", r.traceback) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - 
self.assertEqual(span.name, "test_celery_functional.BaseTask") - self.assertEqual( - span.attributes.get("messaging.message_id"), r.task_id - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.BaseTask", - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "FAILURE") - - self.assertTrue(span.status.is_ok) - - def test_shared_task(self): - # Ensure Django Shared Task are supported - @celery.shared_task - def add(x, y): - return x + y - - res = add.apply([2, 2]) - self.assertEqual(res.result, 4) - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - - span = spans[0] - - self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "test_celery_functional.add") - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.add", - ) - self.assertEqual( - span.attributes.get("messaging.message_id"), res.task_id - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") - - def test_apply_async_previous_style_tasks(self): - # ensures apply_async is properly patched if Celery 1.0 style tasks - # are used even in newer versions. This should extend support to - # previous versions of Celery. 
- class CelerySuperClass(celery.task.Task): - abstract = True - - @classmethod - def apply_async(cls, args=None, kwargs=None, **kwargs_): - return super(CelerySuperClass, cls).apply_async( - args=args, kwargs=kwargs, **kwargs_ - ) - - def run(self, *args, **kwargs): - if "stop" in kwargs: - # avoid call loop - return - CelerySubClass.apply_async(args=[], kwargs={"stop": True}) - - class CelerySubClass(CelerySuperClass): - pass - - t = CelerySubClass() - res = t.apply() - - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 2) - - span = spans[1] - self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "test_celery_functional.CelerySubClass") - self.assertEqual( - span.attributes.get("messaging.message_id"), res.task_id - ) - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.CelerySubClass", - ) - self.assertEqual(span.attributes.get("celery.action"), "run") - self.assertEqual(span.attributes.get("celery.state"), "SUCCESS") - - span = spans[0] - self.assertTrue(span.status.is_ok) - self.assertEqual(span.name, "test_celery_functional.CelerySubClass") - self.assertEqual( - span.attributes.get("celery.task_name"), - "test_celery_functional.CelerySubClass", - ) - self.assertEqual(span.attributes.get("celery.action"), "apply_async") - - def test_custom_tracer_provider(self): - @self.app.task - def fn_task(): - return 42 + result = task.apply() - resource = resources.Resource.create({}) - result = self.create_tracer_provider(resource=resource) - tracer_provider, exporter = result + assert result.failed() is True + assert "Task class is failing" in result.traceback - CeleryInstrumentor().uninstrument() - CeleryInstrumentor().instrument(tracer_provider=tracer_provider) + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 - fn_task.delay() + span = spans[0] - spans_list = exporter.get_finished_spans() - self.assertEqual(len(spans_list), 1) - span = spans_list[0] + assert 
span.status.is_ok is False + assert span.name == "test_celery_functional.BaseTask" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.BaseTask" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert span.status.canonical_code == StatusCanonicalCode.UNKNOWN + assert span.attributes.get("messaging.message_id") == result.task_id + assert "Task class is failing" in span.status.description + + +def test_class_task_exception_excepted(celery_app, memory_exporter): + class BaseTask(celery_app.Task): + throws = (MyException,) + + def run(self): + raise MyException("Task class is failing") - self.assertIs(span.resource, resource) + task = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(celery_app, "register_task", None) + if register_task is not None: + register_task(task) + + result = task.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.status.canonical_code == StatusCanonicalCode.OK + assert span.name == "test_celery_functional.BaseTask" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_shared_task(celery_app, memory_exporter): + """Ensure Django Shared Task are supported""" + + @celery.shared_task + def add(x, y): + return x + y + + result = add.apply([2, 2]) + assert result.result == 4 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.add" + assert ( + span.attributes.get("celery.task_name") == "test_celery_functional.add" + ) + assert 
span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_apply_async_previous_style_tasks( + celery_app, celery_worker, memory_exporter +): + """Ensures apply_async is properly patched if Celery 1.0 style tasks are + used even in newer versions. This should extend support to previous versions + of Celery.""" + + class CelerySuperClass(celery.task.Task): + abstract = True + + @classmethod + def apply_async(cls, args=None, kwargs=None, **kwargs_): + return super(CelerySuperClass, cls).apply_async( + args=args, kwargs=kwargs, **kwargs_ + ) + + def run(self, *args, **kwargs): + if "stop" in kwargs: + # avoid call loop + return + CelerySubClass.apply_async(args=[], kwargs={"stop": True}).get( + timeout=10 + ) + + class CelerySubClass(CelerySuperClass): + pass + + celery_worker.reload() + + task = CelerySubClass() + result = task.apply() + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 3 + + async_span, async_run_span, run_span = spans + + assert run_span.status.is_ok is True + assert run_span.name == "test_celery_functional.CelerySubClass" + assert ( + run_span.attributes.get("celery.task_name") + == "test_celery_functional.CelerySubClass" + ) + assert run_span.attributes.get("celery.action") == "run" + assert run_span.attributes.get("celery.state") == "SUCCESS" + assert run_span.attributes.get("messaging.message_id") == result.task_id + + assert async_run_span.status.is_ok is True + assert async_run_span.name == "test_celery_functional.CelerySubClass" + assert ( + async_run_span.attributes.get("celery.task_name") + == "test_celery_functional.CelerySubClass" + ) + assert async_run_span.attributes.get("celery.action") == "run" + assert async_run_span.attributes.get("celery.state") == "SUCCESS" + assert ( + async_run_span.attributes.get("messaging.message_id") != result.task_id + ) + + assert async_span.status.is_ok is 
True + assert async_span.name == "test_celery_functional.CelerySubClass" + assert ( + async_span.attributes.get("celery.task_name") + == "test_celery_functional.CelerySubClass" + ) + assert async_span.attributes.get("celery.action") == "apply_async" + assert async_span.attributes.get("messaging.message_id") != result.task_id + assert async_span.attributes.get( + "messaging.message_id" + ) == async_run_span.attributes.get("messaging.message_id") + + +def test_custom_tracer_provider(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + resource = resources.Resource.create({}) + tracer_provider = TracerProvider(resource=resource) + span_processor = export.SimpleExportSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + + trace_api.set_tracer_provider(tracer_provider) + + CeleryInstrumentor().uninstrument() + CeleryInstrumentor().instrument(tracer_provider=tracer_provider) + + fn_task.delay() + + spans_list = memory_exporter.get_finished_spans() + assert len(spans_list) == 1 + + span = spans_list[0] + assert span.resource == resource diff --git a/tox.ini b/tox.ini index 9bf37fa3ae..ca1d20cfda 100644 --- a/tox.ini +++ b/tox.ini @@ -297,7 +297,7 @@ deps = psycopg2-binary ~= 2.8.4 sqlalchemy ~= 1.3.16 redis ~= 3.3.11 - celery ~= 4.0 + celery ~= 4.0, != 4.4.4 changedir = ext/opentelemetry-ext-docker-tests/tests From e433608aada2274f101b110e8135a7ab9bfe679c Mon Sep 17 00:00:00 2001 From: "Tahir H. 
Butt" Date: Thu, 4 Jun 2020 19:37:47 -0400 Subject: [PATCH 15/22] update changelog --- ext/opentelemetry-ext-celery/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/opentelemetry-ext-celery/CHANGELOG.md b/ext/opentelemetry-ext-celery/CHANGELOG.md index 66c8fa4fd1..dd6bf18c9d 100644 --- a/ext/opentelemetry-ext-celery/CHANGELOG.md +++ b/ext/opentelemetry-ext-celery/CHANGELOG.md @@ -2,4 +2,4 @@ ## Unreleased -- Implement Celery integration ([#648](https://github.com/open-telemetry/opentelemetry-python/pull/648)) +- Add instrumentation for Celery ([#780](https://github.com/open-telemetry/opentelemetry-python/pull/780)) From 2d559b956203bada047de3cdb76bdd9b8808b0ab Mon Sep 17 00:00:00 2001 From: "Tahir H. Butt" Date: Fri, 5 Jun 2020 13:29:53 -0400 Subject: [PATCH 16/22] Apply suggestions from code review Co-authored-by: alrex --- docs/ext/celery/celery.rst | 4 ++-- .../src/opentelemetry/ext/celery/version.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/ext/celery/celery.rst b/docs/ext/celery/celery.rst index b94093ae08..125233e006 100644 --- a/docs/ext/celery/celery.rst +++ b/docs/ext/celery/celery.rst @@ -1,5 +1,5 @@ -OpenTelemetry Celery Integration -================================ +OpenTelemetry Celery Instrumentation +==================================== .. automodule:: opentelemetry.ext.celery :members: diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py index 86c61362ab..603bf0b7e5 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.7.dev0" +__version__ = "0.9.dev0" From 03cf262a21aee2e1bec236ca866df06d0c41d5d5 Mon Sep 17 00:00:00 2001 From: "Tahir H. 
Butt" Date: Mon, 15 Jun 2020 09:51:33 -0400 Subject: [PATCH 17/22] Apply suggestions from code review Co-authored-by: Diego Hurtado --- .../src/opentelemetry/ext/celery/__init__.py | 2 +- .../src/opentelemetry/ext/celery/utils.py | 4 ++-- tox.ini | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index 09af1ac98d..7648da53a0 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -15,7 +15,7 @@ Instrument `celery`_ to report Celery APP operations. There are two options for instrumenting code. The first option is to use the -``opentelemetry-auto-instrumentation`` executable which will automatically +``opentelemetry-instrument`` executable which will automatically instrument your Celery APP. The second is to programmatically enable instrumentation as explained in the following section. 
diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py index 2aa7fb301f..549a87d98a 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py @@ -67,7 +67,7 @@ def set_attributes_from_context(span, context): if key == "delivery_info": # Get also destination from this routing_key = value.get("routing_key") - if routing_key: + if routing_key is not None: span.set_attribute("messaging.destination", routing_key) value = str(value) @@ -154,7 +154,7 @@ def retrieve_task_id(context): """ headers = context.get("headers") body = context.get("body") - if headers: + if headers is not None: # Protocol Version 2 (default from Celery 4.0) return headers.get("id") # Protocol Version 1 diff --git a/tox.ini b/tox.ini index d46de2b347..30a29565e9 100644 --- a/tox.ini +++ b/tox.ini @@ -193,7 +193,7 @@ commands_pre = getting-started: pip install -e {toxinidir}/opentelemetry-auto-instrumentation -e {toxinidir}/ext/opentelemetry-ext-requests -e {toxinidir}/ext/opentelemetry-ext-wsgi -e {toxinidir}/ext/opentelemetry-ext-flask - celery: pip install {toxinidir}/opentelemetry-auto-instrumentation + celery: pip install {toxinidir}/opentelemetry-instrument celery: pip install {toxinidir}/ext/opentelemetry-ext-celery[test] grpc: pip install {toxinidir}/ext/opentelemetry-ext-grpc[test] From 074299f4ca5dc168848f071c230f7282e41eb419 Mon Sep 17 00:00:00 2001 From: "Tahir H. 
Butt" Date: Tue, 16 Jun 2020 12:52:50 -0400 Subject: [PATCH 18/22] refactor utils --- ext/opentelemetry-ext-celery/setup.cfg | 1 + .../src/opentelemetry/ext/celery/__init__.py | 96 +++++++------------ .../src/opentelemetry/ext/celery/utils.py | 67 ++++++++++++- tox.ini | 3 +- 4 files changed, 101 insertions(+), 66 deletions(-) diff --git a/ext/opentelemetry-ext-celery/setup.cfg b/ext/opentelemetry-ext-celery/setup.cfg index 81a95e4ecd..9e500f1ce8 100644 --- a/ext/opentelemetry-ext-celery/setup.cfg +++ b/ext/opentelemetry-ext-celery/setup.cfg @@ -40,6 +40,7 @@ package_dir= packages=find_namespace: install_requires = opentelemetry-api == 0.10.dev0 + opentelemetry-instrumentation == 0.10.dev0 celery ~= 4.0 [options.extras_require] diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index 7648da53a0..b808e32887 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -54,17 +54,11 @@ def add(x, y): import logging import signal -from celery import registry, signals # pylint: disable=no-name-in-module +from celery import signals # pylint: disable=no-name-in-module from opentelemetry import trace -from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.ext.celery.utils import ( - attach_span, - detach_span, - retrieve_span, - retrieve_task_id, - set_attributes_from_context, -) +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.ext.celery import utils from opentelemetry.ext.celery.version import __version__ from opentelemetry.trace.status import Status, StatusCanonicalCode @@ -110,58 +104,53 @@ def _uninstrument(self, **kwargs): signals.task_retry.disconnect(self._trace_retry) def _trace_prerun(self, *args, **kwargs): - task = kwargs.get("task") - task_id = kwargs.get("task_id") - 
logger.debug("prerun signal start task_id=%s", task_id) + task = utils.signal_retrieve_task(kwargs) + task_id = utils.signal_retrieve_task_id(kwargs) + if task is None or task_id is None: - logger.debug( - "Unable to extract the Task and the task_id. This version of Celery may not be supported." - ) return + logger.debug("prerun signal start task_id=%s", task_id) + span = self._tracer.start_span(task.name, kind=trace.SpanKind.CONSUMER) activation = self._tracer.use_span(span, end_on_exit=True) activation.__enter__() - attach_span(task, task_id, (span, activation)) + utils.attach_span(task, task_id, (span, activation)) @staticmethod def _trace_postrun(*args, **kwargs): - task = kwargs.get("task") - task_id = kwargs.get("task_id") - logger.debug("postrun signal task_id=%s", task_id) + task = utils.signal_retrieve_task(kwargs) + task_id = utils.signal_retrieve_task_id(kwargs) + if task is None or task_id is None: - logger.debug( - "Unable to extract the Task and the task_id. This version of Celery may not be supported." - ) return + logger.debug("postrun signal task_id=%s", task_id) + # retrieve and finish the Span - span, activation = retrieve_span(task, task_id) + span, activation = utils.retrieve_span(task, task_id) if span is None: logger.warning("no existing span found for task_id=%s", task_id) return # request context tags span.set_attribute(_TASK_TAG_KEY, _TASK_RUN) - set_attributes_from_context(span, kwargs) - set_attributes_from_context(span, task.request) + utils.set_attributes_from_context(span, kwargs) + utils.set_attributes_from_context(span, task.request) span.set_attribute(_TASK_NAME_KEY, task.name) activation.__exit__(None, None, None) - detach_span(task, task_id) + utils.detach_span(task, task_id) def _trace_before_publish(self, *args, **kwargs): # The `Task` instance **does not** include any information about the current # execution, so it **must not** be used to retrieve `request` data. 
# pylint: disable=no-member - task = registry.tasks.get(kwargs.get("sender")) - task_id = retrieve_task_id(kwargs) + task = utils.signal_retrieve_task_from_sender(kwargs) + task_id = utils.signal_retrieve_task_id_from_message(kwargs) if task is None or task_id is None: - logger.debug( - "Unable to extract the Task and the task_id. This version of Celery may not be supported." - ) return span = self._tracer.start_span(task.name, kind=trace.SpanKind.PRODUCER) @@ -170,44 +159,39 @@ def _trace_before_publish(self, *args, **kwargs): span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) span.set_attribute(_MESSAGE_ID_ATTRIBUTE_NAME, task_id) span.set_attribute(_TASK_NAME_KEY, task.name) - set_attributes_from_context(span, kwargs) + utils.set_attributes_from_context(span, kwargs) activation = self._tracer.use_span(span, end_on_exit=True) activation.__enter__() - attach_span(task, task_id, (span, activation), is_publish=True) + utils.attach_span(task, task_id, (span, activation), is_publish=True) @staticmethod def _trace_after_publish(*args, **kwargs): - # pylint: disable=no-member - task = registry.tasks.get(kwargs.get("sender")) - task_id = retrieve_task_id(kwargs) + task = utils.signal_retrieve_task_from_sender(kwargs) + task_id = utils.signal_retrieve_task_id_from_message(kwargs) + if task is None or task_id is None: - logger.debug( - "Unable to extract the Task and the task_id. This version of Celery may not be supported." 
- ) return # retrieve and finish the Span - _, activation = retrieve_span(task, task_id, is_publish=True) + _, activation = utils.retrieve_span(task, task_id, is_publish=True) if activation is None: logger.warning("no existing span found for task_id=%s", task_id) return activation.__exit__(None, None, None) - detach_span(task, task_id, is_publish=True) + utils.detach_span(task, task_id, is_publish=True) @staticmethod def _trace_failure(*args, **kwargs): - task = kwargs.get("sender") - task_id = kwargs.get("task_id") + task = utils.signal_retrieve_task_from_sender(kwargs) + task_id = utils.signal_retrieve_task_id(kwargs) + if task is None or task_id is None: - logger.debug( - "Unable to extract the Task and the task_id. This version of Celery may not be supported." - ) return # retrieve and pass exception info to activation - span, _ = retrieve_span(task, task_id) + span, _ = utils.retrieve_span(task, task_id) if span is None: return @@ -226,22 +210,14 @@ def _trace_failure(*args, **kwargs): @staticmethod def _trace_retry(*args, **kwargs): - task = kwargs.get("sender") - context = kwargs.get("request") - if task is None or context is None: - logger.debug( - "Unable to extract the Task or the Context. This version of Celery may not be supported." - ) - return + task = utils.signal_retrieve_task_from_sender(kwargs) + task_id = utils.signal_retrieve_task_id_from_request(kwargs) + reason = utils.signal_retrieve_reason(kwargs) - reason = kwargs.get("reason") - if not reason: - logger.debug( - "Unable to extract the retry reason. This version of Celery may not be supported." 
- ) + if task is None or task_id is None or reason is None: return - span, _ = retrieve_span(task, context.id) + span, _ = utils.retrieve_span(task, task_id) if span is None: return diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py index 787e8b9114..501eab1d02 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py @@ -12,6 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + +import celery + +logger = logging.getLogger(__name__) + # Celery Context key CTX_KEY = "__otel_task_span" @@ -148,16 +154,69 @@ def retrieve_span(task, task_id, is_publish=False): return span_dict.get((task_id, is_publish), (None, None)) -def retrieve_task_id(context): +def signal_retrieve_task(kwargs): + task = kwargs.get("task") + if task is None: + logger.debug("Unable to retrieve task from signal arguments") + return task + + +def signal_retrieve_task_from_sender(kwargs): + sender = kwargs.get("sender") + if sender is None: + logger.debug("Unable to retrieve the sender from signal arguments") + return + + # before and after publish signals sender is the task name + # for retry and failure signals sender is the task object + if isinstance(sender, str): + sender = celery.registry.tasks.get(sender) + if sender is None: + logger.debug("Unable to retrieve the task from sender=%s", sender) + return + + return sender + + +def signal_retrieve_task_id(kwargs): + task_id = kwargs.get("task_id") + if task_id is None: + logger.debug("Unable to retrieve task_id from signal arguments") + return task_id + + +def signal_retrieve_task_id_from_request(kwargs): + # retry signal does not include task_id as argument so use request argument + request = kwargs.get("request") + if request is None: + logger.debug("Unable to retrieve the request 
from signal arguments") + + task_id = getattr(request, "id") + if task_id is None: + logger.debug("Unable to retrieve the task_id from the request") + + return task_id + + +def signal_retrieve_task_id_from_message(kwargs): """Helper to retrieve the `Task` identifier from the message `body`. This helper supports Protocol Version 1 and 2. The Protocol is well detailed in the official documentation: http://docs.celeryproject.org/en/latest/internals/protocol.html """ - headers = context.get("headers") - body = context.get("body") - if headers is not None: + headers = kwargs.get("headers") + body = kwargs.get("body") + if headers is not None and len(headers) > 0: # Protocol Version 2 (default from Celery 4.0) return headers.get("id") # Protocol Version 1 return body.get("id") + + + + +def signal_retrieve_reason(kwargs): + reason = kwargs.get("reason") + if not reason: + logger.debug("Unable to retrieve the retry reason") + return reason diff --git a/tox.ini b/tox.ini index 375d3ee92b..2798d78614 100644 --- a/tox.ini +++ b/tox.ini @@ -144,7 +144,7 @@ envlist = pypy3-test-ext-redis ; opentelemetry-ext-celery - py3{4,5,6,7,8}-test-ext-celery + py3{5,6,7,8}-test-ext-celery pypy3-test-ext-celery ; opentelemetry-ext-system-metrics @@ -232,7 +232,6 @@ commands_pre = getting-started: pip install -e {toxinidir}/opentelemetry-instrumentation -e {toxinidir}/ext/opentelemetry-ext-requests -e {toxinidir}/ext/opentelemetry-ext-wsgi -e {toxinidir}/ext/opentelemetry-ext-flask - celery: pip install {toxinidir}/opentelemetry-instrument celery: pip install {toxinidir}/ext/opentelemetry-ext-celery[test] grpc: pip install {toxinidir}/ext/opentelemetry-ext-grpc[test] From 29abf3c3fdf20dd7c06485cc1fad8704878cee52 Mon Sep 17 00:00:00 2001 From: "Tahir H. 
Butt" Date: Tue, 16 Jun 2020 12:59:38 -0400 Subject: [PATCH 19/22] update version --- .../src/opentelemetry/ext/celery/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py index 603bf0b7e5..6d4fefa599 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.9.dev0" +__version__ = "0.10.dev0" From 440c66d1c4886c585aed9c31fb5585211881d4a4 Mon Sep 17 00:00:00 2001 From: "Tahir H. Butt" Date: Tue, 16 Jun 2020 15:13:15 -0400 Subject: [PATCH 20/22] docs, cleanup --- .pylintrc | 2 +- ext/opentelemetry-ext-celery/README.rst | 27 ++++++++++++ ext/opentelemetry-ext-celery/setup.cfg | 2 +- .../src/opentelemetry/ext/celery/__init__.py | 41 +++++++++---------- .../src/opentelemetry/ext/celery/utils.py | 29 ++++++------- .../tests/test_utils.py | 4 +- 6 files changed, 63 insertions(+), 42 deletions(-) diff --git a/.pylintrc b/.pylintrc index 01c96ea395..5f9463df7d 100644 --- a/.pylintrc +++ b/.pylintrc @@ -226,7 +226,7 @@ dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ # Argument names that match this expression will be ignored. Default to name # with leading underscore. -ignored-argument-names=_.*|^ignored_|^unused_ +ignored-argument-names=_.*|^ignored_|^unused_|^kwargs|^args # Tells whether we should check for unused import in __init__ files. 
init-import=no diff --git a/ext/opentelemetry-ext-celery/README.rst b/ext/opentelemetry-ext-celery/README.rst index 432ead43e0..54a30f3a25 100644 --- a/ext/opentelemetry-ext-celery/README.rst +++ b/ext/opentelemetry-ext-celery/README.rst @@ -16,6 +16,33 @@ Installation pip install opentelemetry-ext-celery +Usage +----- + +* Start broker backend + +:: + docker run -p 5672:5672 rabbitmq + + +* Run instrumented task + +.. code-block:: python + + from opentelemetry import trace + from opentelemetry.ext.celery import CeleryInstrumentor + + CeleryInstrumentor().instrument() + + from celery import Celery + + app = Celery("tasks", broker="amqp://localhost") + + @app.task + def add(x, y): + return x + y + + add.delay(42, 50) References ---------- diff --git a/ext/opentelemetry-ext-celery/setup.cfg b/ext/opentelemetry-ext-celery/setup.cfg index 9e500f1ce8..ecb42b7fb7 100644 --- a/ext/opentelemetry-ext-celery/setup.cfg +++ b/ext/opentelemetry-ext-celery/setup.cfg @@ -14,7 +14,7 @@ # [metadata] name = opentelemetry-ext-celery -description = OpenTelemetry Celery integration +description = OpenTelemetry Celery Instrumentation long_description = file: README.rst long_description_content_type = text/x-rst author = OpenTelemetry Authors diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index b808e32887..4b7d0cb508 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -12,24 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Instrument `celery`_ to report Celery APP operations. - -There are two options for instrumenting code. The first option is to use the -``opentelemetry-instrument`` executable which will automatically -instrument your Celery APP. 
The second is to programmatically enable -instrumentation as explained in the following section. +Instrument `celery`_ to trace Celery applications. .. _celery: https://pypi.org/project/celery/ Usage ----- -Be sure rabbitmq is running: +* Start broker backend .. code:: docker run -p 5672:5672 rabbitmq + +* Run instrumented task + .. code:: python from opentelemetry import trace @@ -57,9 +55,9 @@ def add(x, y): from celery import signals # pylint: disable=no-name-in-module from opentelemetry import trace -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.ext.celery import utils from opentelemetry.ext.celery.version import __version__ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.trace.status import Status, StatusCanonicalCode logger = logging.getLogger(__name__) @@ -77,7 +75,6 @@ def add(x, y): class CeleryInstrumentor(BaseInstrumentor): - # pylint: disable=unused-argument def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") @@ -104,8 +101,8 @@ def _uninstrument(self, **kwargs): signals.task_retry.disconnect(self._trace_retry) def _trace_prerun(self, *args, **kwargs): - task = utils.signal_retrieve_task(kwargs) - task_id = utils.signal_retrieve_task_id(kwargs) + task = utils.retrieve_task(kwargs) + task_id = utils.retrieve_task_id(kwargs) if task is None or task_id is None: return @@ -120,8 +117,8 @@ def _trace_prerun(self, *args, **kwargs): @staticmethod def _trace_postrun(*args, **kwargs): - task = utils.signal_retrieve_task(kwargs) - task_id = utils.signal_retrieve_task_id(kwargs) + task = utils.retrieve_task(kwargs) + task_id = utils.retrieve_task_id(kwargs) if task is None or task_id is None: return @@ -147,8 +144,8 @@ def _trace_before_publish(self, *args, **kwargs): # The `Task` instance **does not** include any information about the current # execution, so it **must not** be used to retrieve `request` data. 
# pylint: disable=no-member - task = utils.signal_retrieve_task_from_sender(kwargs) - task_id = utils.signal_retrieve_task_id_from_message(kwargs) + task = utils.retrieve_task_from_sender(kwargs) + task_id = utils.retrieve_task_id_from_message(kwargs) if task is None or task_id is None: return @@ -167,8 +164,8 @@ def _trace_before_publish(self, *args, **kwargs): @staticmethod def _trace_after_publish(*args, **kwargs): - task = utils.signal_retrieve_task_from_sender(kwargs) - task_id = utils.signal_retrieve_task_id_from_message(kwargs) + task = utils.retrieve_task_from_sender(kwargs) + task_id = utils.retrieve_task_id_from_message(kwargs) if task is None or task_id is None: return @@ -184,8 +181,8 @@ def _trace_after_publish(*args, **kwargs): @staticmethod def _trace_failure(*args, **kwargs): - task = utils.signal_retrieve_task_from_sender(kwargs) - task_id = utils.signal_retrieve_task_id(kwargs) + task = utils.retrieve_task_from_sender(kwargs) + task_id = utils.retrieve_task_id(kwargs) if task is None or task_id is None: return @@ -210,9 +207,9 @@ def _trace_failure(*args, **kwargs): @staticmethod def _trace_retry(*args, **kwargs): - task = utils.signal_retrieve_task_from_sender(kwargs) - task_id = utils.signal_retrieve_task_id_from_request(kwargs) - reason = utils.signal_retrieve_reason(kwargs) + task = utils.retrieve_task_from_sender(kwargs) + task_id = utils.retrieve_task_id_from_request(kwargs) + reason = utils.retrieve_reason(kwargs) if task is None or task_id is None or reason is None: return diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py index 501eab1d02..60fe52f04e 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py @@ -14,7 +14,7 @@ import logging -import celery +from celery import registry # pylint: disable=no-name-in-module logger = 
logging.getLogger(__name__) @@ -50,7 +50,6 @@ def set_attributes_from_context(span, context): """Helper to extract meta values from a Celery Context""" for key in CELERY_CONTEXT_ATTRIBUTES: value = context.get(key) - attribute_name = None # Skip this key if it is not set if value is None or value == "": @@ -58,16 +57,18 @@ def set_attributes_from_context(span, context): # Skip `timelimit` if it is not set (it's default/unset value is a # tuple or a list of `None` values - elif key == "timelimit" and value in [(None, None), [None, None]]: + if key == "timelimit" and value in [(None, None), [None, None]]: continue # Skip `retries` if it's value is `0` - elif key == "retries" and value == 0: + if key == "retries" and value == 0: continue + attribute_name = None + # Celery 4.0 uses `origin` instead of `hostname`; this change preserves # the same name for the tag despite Celery version - elif key == "origin": + if key == "origin": key = "hostname" elif key == "delivery_info": @@ -154,38 +155,36 @@ def retrieve_span(task, task_id, is_publish=False): return span_dict.get((task_id, is_publish), (None, None)) -def signal_retrieve_task(kwargs): +def retrieve_task(kwargs): task = kwargs.get("task") if task is None: logger.debug("Unable to retrieve task from signal arguments") return task -def signal_retrieve_task_from_sender(kwargs): +def retrieve_task_from_sender(kwargs): sender = kwargs.get("sender") if sender is None: logger.debug("Unable to retrieve the sender from signal arguments") - return # before and after publish signals sender is the task name # for retry and failure signals sender is the task object if isinstance(sender, str): - sender = celery.registry.tasks.get(sender) + sender = registry.tasks.get(sender) if sender is None: logger.debug("Unable to retrieve the task from sender=%s", sender) - return return sender -def signal_retrieve_task_id(kwargs): +def retrieve_task_id(kwargs): task_id = kwargs.get("task_id") if task_id is None: logger.debug("Unable to 
retrieve task_id from signal arguments") return task_id -def signal_retrieve_task_id_from_request(kwargs): +def retrieve_task_id_from_request(kwargs): # retry signal does not include task_id as argument so use request argument request = kwargs.get("request") if request is None: @@ -198,7 +197,7 @@ def signal_retrieve_task_id_from_request(kwargs): return task_id -def signal_retrieve_task_id_from_message(kwargs): +def retrieve_task_id_from_message(kwargs): """Helper to retrieve the `Task` identifier from the message `body`. This helper supports Protocol Version 1 and 2. The Protocol is well detailed in the official documentation: @@ -213,9 +212,7 @@ def signal_retrieve_task_id_from_message(kwargs): return body.get("id") - - -def signal_retrieve_reason(kwargs): +def retrieve_reason(kwargs): reason = kwargs.get("reason") if not reason: logger.debug("Unable to retrieve the retry reason") diff --git a/ext/opentelemetry-ext-celery/tests/test_utils.py b/ext/opentelemetry-ext-celery/tests/test_utils.py index 3b8d2d76cf..b5e8163def 100644 --- a/ext/opentelemetry-ext-celery/tests/test_utils.py +++ b/ext/opentelemetry-ext-celery/tests/test_utils.py @@ -161,7 +161,7 @@ def test_task_id_from_protocol_v1(self): "properties": {}, } - task_id = utils.retrieve_task_id(context) + task_id = utils.retrieve_task_id_from_message(context) self.assertEqual(task_id, "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7") def test_task_id_from_protocol_v2(self): @@ -204,5 +204,5 @@ def test_task_id_from_protocol_v2(self): }, } - task_id = utils.retrieve_task_id(context) + task_id = utils.retrieve_task_id_from_message(context) self.assertEqual(task_id, "7e917b83-4018-431d-9832-73a28e1fb6c0") From e18452c6cfe69cf55f1477d0da77ed70f278cf93 Mon Sep 17 00:00:00 2001 From: "Tahir H. 
Butt" Date: Tue, 16 Jun 2020 15:48:20 -0400 Subject: [PATCH 21/22] status for exceptions --- .../src/opentelemetry/ext/celery/__init__.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py index a977fff223..9ce31f34f6 100644 --- a/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py +++ b/ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/__init__.py @@ -188,18 +188,21 @@ def _trace_failure(*args, **kwargs): if span is None: return + status_kwargs = {"canonical_code": StatusCanonicalCode.UNKNOWN} + ex = kwargs.get("einfo") - if ex is None: - return - if hasattr(task, "throws") and isinstance(ex.exception, task.throws): + + if ( + hasattr(task, "throws") + and ex is not None + and isinstance(ex.exception, task.throws) + ): return - span.set_status( - Status( - canonical_code=StatusCanonicalCode.UNKNOWN, - description=str(ex), - ) - ) + if ex is not None: + status_kwargs["description"] = str(ex) + + span.set_status(Status(**status_kwargs)) @staticmethod def _trace_retry(*args, **kwargs): From 8d47875d11e5ee33903ff2049af59eef03d2cfd0 Mon Sep 17 00:00:00 2001 From: "Tahir H. Butt" Date: Tue, 16 Jun 2020 21:32:16 -0400 Subject: [PATCH 22/22] remove celery from lint tox --- tox.ini | 1 - 1 file changed, 1 deletion(-) diff --git a/tox.ini b/tox.ini index 2798d78614..e182be2e00 100644 --- a/tox.ini +++ b/tox.ini @@ -334,7 +334,6 @@ deps = psutil readme_renderer httpretty - celery commands_pre = python scripts/eachdist.py install --editable --with-test-deps