Skip to content

Commit

Permalink
Merge branch 'main' into emptyroute
Browse files Browse the repository at this point in the history
  • Loading branch information
muncus authored Jun 26, 2023
2 parents 17f2cce + a45c9c3 commit 9e6be51
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 46 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Fix async redis clients not being traced correctly ([#1830](https:/open-telemetry/opentelemetry-python-contrib/pull/1830))
- Make Flask request span attributes available for `start_span`.
([#1784](https:/open-telemetry/opentelemetry-python-contrib/pull/1784))
- Fix falcon instrumentation's usage of Span Status to only set the description if the status code is ERROR.
Expand All @@ -33,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1789](https:/open-telemetry/opentelemetry-python-contrib/pull/1789))
- `opentelemetry-instrumentation-grpc` Allow gRPC connections via Unix socket
([#1833](https:/open-telemetry/opentelemetry-python-contrib/pull/1833))
- Fix elasticsearch `Transport.perform_request` instrument wrap for elasticsearch >= 8
([#1810](https:/open-telemetry/opentelemetry-python-contrib/pull/1810))

## Version 1.18.0/0.39b0 (2023-05-10)

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ Approvers ([@open-telemetry/python-approvers](https:/orgs/open-telem
- [Aaron Abbott](https:/aabmass), Google
- [Jeremy Voss](https:/jeremydvoss), Microsoft
- [Sanket Mehta](https:/sanketmehta28), Cisco
- [Shalev Roda](https:/shalevr), Cisco

Emeritus Approvers:

Expand All @@ -112,6 +111,7 @@ Maintainers ([@open-telemetry/python-maintainers](https:/orgs/open-t

- [Diego Hurtado](https:/ocelotl), Lightstep
- [Leighton Chen](https:/lzchen), Microsoft
- [Shalev Roda](https:/shalevr), Cisco

Emeritus Maintainers:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ def response_hook(span, response):

from .utils import sanitize_body

# Split of elasticsearch and elastic_transport in 8.0.0+
# https://www.elastic.co/guide/en/elasticsearch/client/python-api/master/release-notes.html#rn-8-0-0
es_transport_split = elasticsearch.VERSION[0] > 7
if es_transport_split:
import elastic_transport

logger = getLogger(__name__)


Expand Down Expand Up @@ -137,16 +143,28 @@ def _instrument(self, **kwargs):
tracer = get_tracer(__name__, __version__, tracer_provider)
request_hook = kwargs.get("request_hook")
response_hook = kwargs.get("response_hook")
_wrap(
elasticsearch,
"Transport.perform_request",
_wrap_perform_request(
tracer,
self._span_name_prefix,
request_hook,
response_hook,
),
)
if es_transport_split:
_wrap(
elastic_transport,
"Transport.perform_request",
_wrap_perform_request(
tracer,
self._span_name_prefix,
request_hook,
response_hook,
),
)
else:
_wrap(
elasticsearch,
"Transport.perform_request",
_wrap_perform_request(
tracer,
self._span_name_prefix,
request_hook,
response_hook,
),
)

def _uninstrument(self, **kwargs):
unwrap(elasticsearch.Transport, "perform_request")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from elasticsearch_dsl import Document, Keyword, Text


class Article(Document):
title = Text(analyzer="snowball", fields={"raw": Keyword()})
body = Text(analyzer="snowball")

class Index:
name = "test-index"


dsl_create_statement = {
"mappings": {
"properties": {
"title": {
"analyzer": "snowball",
"fields": {"raw": {"type": "keyword"}},
"type": "text",
},
"body": {"analyzer": "snowball", "type": "text"},
}
}
}
dsl_index_result = (1, {}, '{"result": "created"}')
dsl_index_span_name = "Elasticsearch/test-index/_doc/2"
dsl_index_url = "/test-index/_doc/2"
dsl_search_method = "POST"
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@

major_version = elasticsearch.VERSION[0]

if major_version == 7:
if major_version == 8:
from . import helpers_es8 as helpers # pylint: disable=no-name-in-module
elif major_version == 7:
from . import helpers_es7 as helpers # pylint: disable=no-name-in-module
elif major_version == 6:
from . import helpers_es6 as helpers # pylint: disable=no-name-in-module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,52 @@ def _set_connection_attributes(span, conn):
span.set_attribute(key, value)


def _build_span_name(instance, cmd_args):
if len(cmd_args) > 0 and cmd_args[0]:
name = cmd_args[0]
else:
name = instance.connection_pool.connection_kwargs.get("db", 0)
return name


def _build_span_meta_data_for_pipeline(instance):
try:
command_stack = (
instance.command_stack
if hasattr(instance, "command_stack")
else instance._command_stack
)

cmds = [
_format_command_args(c.args if hasattr(c, "args") else c[0])
for c in command_stack
]
resource = "\n".join(cmds)

span_name = " ".join(
[
(c.args[0] if hasattr(c, "args") else c[0][0])
for c in command_stack
]
)
except (AttributeError, IndexError):
command_stack = []
resource = ""
span_name = ""

return command_stack, resource, span_name


# pylint: disable=R0915
def _instrument(
tracer,
request_hook: _RequestHookT = None,
response_hook: _ResponseHookT = None,
):
def _traced_execute_command(func, instance, args, kwargs):
query = _format_command_args(args)
name = _build_span_name(instance, args)

if len(args) > 0 and args[0]:
name = args[0]
else:
name = instance.connection_pool.connection_kwargs.get("db", 0)
with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
Expand All @@ -163,31 +197,11 @@ def _traced_execute_command(func, instance, args, kwargs):
return response

def _traced_execute_pipeline(func, instance, args, kwargs):
try:
command_stack = (
instance.command_stack
if hasattr(instance, "command_stack")
else instance._command_stack
)

cmds = [
_format_command_args(
c.args if hasattr(c, "args") else c[0],
)
for c in command_stack
]
resource = "\n".join(cmds)

span_name = " ".join(
[
(c.args[0] if hasattr(c, "args") else c[0][0])
for c in command_stack
]
)
except (AttributeError, IndexError):
command_stack = []
resource = ""
span_name = ""
(
command_stack,
resource,
span_name,
) = _build_span_meta_data_for_pipeline(instance)

with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
Expand Down Expand Up @@ -232,32 +246,72 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
"ClusterPipeline.execute",
_traced_execute_pipeline,
)

async def _async_traced_execute_command(func, instance, args, kwargs):
query = _format_command_args(args)
name = _build_span_name(instance, args)

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
_set_connection_attributes(span, instance)
span.set_attribute("db.redis.args_length", len(args))
if callable(request_hook):
request_hook(span, instance, args, kwargs)
response = await func(*args, **kwargs)
if callable(response_hook):
response_hook(span, instance, response)
return response

async def _async_traced_execute_pipeline(func, instance, args, kwargs):
(
command_stack,
resource,
span_name,
) = _build_span_meta_data_for_pipeline(instance)

with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(SpanAttributes.DB_STATEMENT, resource)
_set_connection_attributes(span, instance)
span.set_attribute(
"db.redis.pipeline_length", len(command_stack)
)
response = await func(*args, **kwargs)
if callable(response_hook):
response_hook(span, instance, response)
return response

if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
wrap_function_wrapper(
"redis.asyncio",
f"{redis_class}.execute_command",
_traced_execute_command,
_async_traced_execute_command,
)
wrap_function_wrapper(
"redis.asyncio.client",
f"{pipeline_class}.execute",
_traced_execute_pipeline,
_async_traced_execute_pipeline,
)
wrap_function_wrapper(
"redis.asyncio.client",
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
_async_traced_execute_command,
)
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
wrap_function_wrapper(
"redis.asyncio.cluster",
"RedisCluster.execute_command",
_traced_execute_command,
_async_traced_execute_command,
)
wrap_function_wrapper(
"redis.asyncio.cluster",
"ClusterPipeline.execute",
_traced_execute_pipeline,
_async_traced_execute_pipeline,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,36 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from unittest import mock

import redis
import redis.asyncio

from opentelemetry import trace
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind


class AsyncMock:
"""A sufficient async mock implementation.
Python 3.7 doesn't have an inbuilt async mock class, so this is used.
"""

def __init__(self):
self.mock = mock.Mock()

async def __call__(self, *args, **kwargs):
future = asyncio.Future()
future.set_result("random")
return future

def __getattr__(self, item):
return AsyncMock()


class TestRedis(TestBase):
def setUp(self):
super().setUp()
Expand Down Expand Up @@ -87,6 +107,35 @@ def test_instrument_uninstrument(self):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

def test_instrument_uninstrument_async_client_command(self):
redis_client = redis.asyncio.Redis()

with mock.patch.object(redis_client, "connection", AsyncMock()):
asyncio.run(redis_client.get("key"))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.memory_exporter.clear()

# Test uninstrument
RedisInstrumentor().uninstrument()

with mock.patch.object(redis_client, "connection", AsyncMock()):
asyncio.run(redis_client.get("key"))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
self.memory_exporter.clear()

# Test instrument again
RedisInstrumentor().instrument()

with mock.patch.object(redis_client, "connection", AsyncMock()):
asyncio.run(redis_client.get("key"))

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

def test_response_hook(self):
redis_client = redis.Redis()
connection = redis.connection.Connection()
Expand Down
Loading

0 comments on commit 9e6be51

Please sign in to comment.