Skip to content

Commit

Permalink
docs, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
majorgreys committed Jun 16, 2020
1 parent 29abf3c commit 440c66d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_

# Argument names that match this expression will be ignored. Default to name
# with leading underscore.
ignored-argument-names=_.*|^ignored_|^unused_
ignored-argument-names=_.*|^ignored_|^unused_|^kwargs|^args

# Tells whether we should check for unused import in __init__ files.
init-import=no
Expand Down
27 changes: 27 additions & 0 deletions ext/opentelemetry-ext-celery/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,33 @@ Installation

pip install opentelemetry-ext-celery

Usage
-----

* Start broker backend

::
docker run -p 5672:5672 rabbitmq


* Run instrumented task

.. code-block:: python
from opentelemetry import trace
from opentelemetry.ext.celery import CeleryInstrumentor
CeleryInstrumentor().instrument()
from celery import Celery
app = Celery("tasks", broker="amqp://localhost")
@app.task
def add(x, y):
return x + y
add.delay(42, 50)
References
----------
Expand Down
2 changes: 1 addition & 1 deletion ext/opentelemetry-ext-celery/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#
[metadata]
name = opentelemetry-ext-celery
description = OpenTelemetry Celery integration
description = OpenTelemetry Celery Instrumentation
long_description = file: README.rst
long_description_content_type = text/x-rst
author = OpenTelemetry Authors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrument `celery`_ to report Celery APP operations.
There are two options for instrumenting code. The first option is to use the
``opentelemetry-instrument`` executable which will automatically
instrument your Celery APP. The second is to programmatically enable
instrumentation as explained in the following section.
Instrument `celery`_ to trace Celery applications.
.. _celery: https://pypi.org/project/celery/
Usage
-----
Be sure rabbitmq is running:
* Start broker backend
.. code::
docker run -p 5672:5672 rabbitmq
* Run instrumented task
.. code:: python
from opentelemetry import trace
Expand Down Expand Up @@ -57,9 +55,9 @@ def add(x, y):
from celery import signals # pylint: disable=no-name-in-module

from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.ext.celery import utils
from opentelemetry.ext.celery.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.trace.status import Status, StatusCanonicalCode

logger = logging.getLogger(__name__)
Expand All @@ -77,7 +75,6 @@ def add(x, y):


class CeleryInstrumentor(BaseInstrumentor):
# pylint: disable=unused-argument
def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")

Expand All @@ -104,8 +101,8 @@ def _uninstrument(self, **kwargs):
signals.task_retry.disconnect(self._trace_retry)

def _trace_prerun(self, *args, **kwargs):
task = utils.signal_retrieve_task(kwargs)
task_id = utils.signal_retrieve_task_id(kwargs)
task = utils.retrieve_task(kwargs)
task_id = utils.retrieve_task_id(kwargs)

if task is None or task_id is None:
return
Expand All @@ -120,8 +117,8 @@ def _trace_prerun(self, *args, **kwargs):

@staticmethod
def _trace_postrun(*args, **kwargs):
task = utils.signal_retrieve_task(kwargs)
task_id = utils.signal_retrieve_task_id(kwargs)
task = utils.retrieve_task(kwargs)
task_id = utils.retrieve_task_id(kwargs)

if task is None or task_id is None:
return
Expand All @@ -147,8 +144,8 @@ def _trace_before_publish(self, *args, **kwargs):
# The `Task` instance **does not** include any information about the current
# execution, so it **must not** be used to retrieve `request` data.
# pylint: disable=no-member
task = utils.signal_retrieve_task_from_sender(kwargs)
task_id = utils.signal_retrieve_task_id_from_message(kwargs)
task = utils.retrieve_task_from_sender(kwargs)
task_id = utils.retrieve_task_id_from_message(kwargs)

if task is None or task_id is None:
return
Expand All @@ -167,8 +164,8 @@ def _trace_before_publish(self, *args, **kwargs):

@staticmethod
def _trace_after_publish(*args, **kwargs):
task = utils.signal_retrieve_task_from_sender(kwargs)
task_id = utils.signal_retrieve_task_id_from_message(kwargs)
task = utils.retrieve_task_from_sender(kwargs)
task_id = utils.retrieve_task_id_from_message(kwargs)

if task is None or task_id is None:
return
Expand All @@ -184,8 +181,8 @@ def _trace_after_publish(*args, **kwargs):

@staticmethod
def _trace_failure(*args, **kwargs):
task = utils.signal_retrieve_task_from_sender(kwargs)
task_id = utils.signal_retrieve_task_id(kwargs)
task = utils.retrieve_task_from_sender(kwargs)
task_id = utils.retrieve_task_id(kwargs)

if task is None or task_id is None:
return
Expand All @@ -210,9 +207,9 @@ def _trace_failure(*args, **kwargs):

@staticmethod
def _trace_retry(*args, **kwargs):
task = utils.signal_retrieve_task_from_sender(kwargs)
task_id = utils.signal_retrieve_task_id_from_request(kwargs)
reason = utils.signal_retrieve_reason(kwargs)
task = utils.retrieve_task_from_sender(kwargs)
task_id = utils.retrieve_task_id_from_request(kwargs)
reason = utils.retrieve_reason(kwargs)

if task is None or task_id is None or reason is None:
return
Expand Down
29 changes: 13 additions & 16 deletions ext/opentelemetry-ext-celery/src/opentelemetry/ext/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import logging

import celery
from celery import registry # pylint: disable=no-name-in-module

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,24 +50,25 @@ def set_attributes_from_context(span, context):
"""Helper to extract meta values from a Celery Context"""
for key in CELERY_CONTEXT_ATTRIBUTES:
value = context.get(key)
attribute_name = None

# Skip this key if it is not set
if value is None or value == "":
continue

# Skip `timelimit` if it is not set (it's default/unset value is a
# tuple or a list of `None` values
elif key == "timelimit" and value in [(None, None), [None, None]]:
if key == "timelimit" and value in [(None, None), [None, None]]:
continue

# Skip `retries` if it's value is `0`
elif key == "retries" and value == 0:
if key == "retries" and value == 0:
continue

attribute_name = None

# Celery 4.0 uses `origin` instead of `hostname`; this change preserves
# the same name for the tag despite Celery version
elif key == "origin":
if key == "origin":
key = "hostname"

elif key == "delivery_info":
Expand Down Expand Up @@ -154,38 +155,36 @@ def retrieve_span(task, task_id, is_publish=False):
return span_dict.get((task_id, is_publish), (None, None))


def signal_retrieve_task(kwargs):
def retrieve_task(kwargs):
task = kwargs.get("task")
if task is None:
logger.debug("Unable to retrieve task from signal arguments")
return task


def signal_retrieve_task_from_sender(kwargs):
def retrieve_task_from_sender(kwargs):
sender = kwargs.get("sender")
if sender is None:
logger.debug("Unable to retrieve the sender from signal arguments")
return

# before and after publish signals sender is the task name
# for retry and failure signals sender is the task object
if isinstance(sender, str):
sender = celery.registry.tasks.get(sender)
sender = registry.tasks.get(sender)
if sender is None:
logger.debug("Unable to retrieve the task from sender=%s", sender)
return

return sender


def signal_retrieve_task_id(kwargs):
def retrieve_task_id(kwargs):
task_id = kwargs.get("task_id")
if task_id is None:
logger.debug("Unable to retrieve task_id from signal arguments")
return task_id


def signal_retrieve_task_id_from_request(kwargs):
def retrieve_task_id_from_request(kwargs):
# retry signal does not include task_id as argument so use request argument
request = kwargs.get("request")
if request is None:
Expand All @@ -198,7 +197,7 @@ def signal_retrieve_task_id_from_request(kwargs):
return task_id


def signal_retrieve_task_id_from_message(kwargs):
def retrieve_task_id_from_message(kwargs):
"""Helper to retrieve the `Task` identifier from the message `body`.
This helper supports Protocol Version 1 and 2. The Protocol is well
detailed in the official documentation:
Expand All @@ -213,9 +212,7 @@ def signal_retrieve_task_id_from_message(kwargs):
return body.get("id")




def signal_retrieve_reason(kwargs):
def retrieve_reason(kwargs):
reason = kwargs.get("reason")
if not reason:
logger.debug("Unable to retrieve the retry reason")
Expand Down
4 changes: 2 additions & 2 deletions ext/opentelemetry-ext-celery/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def test_task_id_from_protocol_v1(self):
"properties": {},
}

task_id = utils.retrieve_task_id(context)
task_id = utils.retrieve_task_id_from_message(context)
self.assertEqual(task_id, "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7")

def test_task_id_from_protocol_v2(self):
Expand Down Expand Up @@ -204,5 +204,5 @@ def test_task_id_from_protocol_v2(self):
},
}

task_id = utils.retrieve_task_id(context)
task_id = utils.retrieve_task_id_from_message(context)
self.assertEqual(task_id, "7e917b83-4018-431d-9832-73a28e1fb6c0")

0 comments on commit 440c66d

Please sign in to comment.