Skip to content

Commit

Permalink
Messaging convention reviewed (#1297)
Browse files Browse the repository at this point in the history
* Kafka reviewed

* Kafka Streams reviewed

* JMS reviewed

* RabbitMQ reviewed

* Format

* Format

* Polish

* Add comments

* Update parents as per spec proposal

* Cleanup

* Remove Operation enum

* muzzle

* Fix build

* Polish

Co-authored-by: Trask Stalnaker <[email protected]>
  • Loading branch information
iNikem and trask authored Oct 13, 2020
1 parent 4801473 commit f23ad29
Show file tree
Hide file tree
Showing 33 changed files with 1,076 additions and 820 deletions.
19 changes: 19 additions & 0 deletions docs/semantic-conventions.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,22 @@ not values defined by spec.
| `db.hbase` | Y | -, HBase is not supported |
| `db.redis.database_index` | N | only set for Lettuce driver, not for Jedis |
| `db.mongodb.collection` | Y | - |

## Messaging

Attribute name | Required? | Implemented? |
| -------------- | :-----: | :---: |
| `messaging.system` | Y | + |
| `messaging.destination` | Y | + |
| `messaging.destination_kind` | Y | + |
| `messaging.temp_destination` | N | - |
| `messaging.protocol` | N | - |
| `messaging.protocol_version` | N | - |
| `messaging.url` | N | - |
| `messaging.message_id` | N | only for JMS |
| `messaging.conversation_id` | N | only for JMS |
| `messaging.message_payload_size_bytes` | N | only for RabbitMQ and Kafka [1] |
| `messaging.message_payload_compressed_size_bytes` | N | - |
| `messaging.operation` | for consumers only | +

**[1]:** Kafka consumer instrumentation sets this to the serialized size of the value
75 changes: 41 additions & 34 deletions instrumentation/jms-1.1/src/jms2Test/groovy/JMS2Test.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

import static io.opentelemetry.trace.Span.Kind.CLIENT
import static io.opentelemetry.trace.Span.Kind.CONSUMER
import static io.opentelemetry.trace.Span.Kind.PRODUCER

import com.google.common.io.Files
import io.opentelemetry.instrumentation.test.AgentTestRunner
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.javaagent.instrumentation.jms.JMSTracer
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch
Expand All @@ -29,7 +29,6 @@ import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory
import org.hornetq.core.server.HornetQServer
import org.hornetq.core.server.HornetQServers
import org.hornetq.jms.client.HornetQMessageConsumer
import org.hornetq.jms.client.HornetQTextMessage
import spock.lang.Shared

Expand Down Expand Up @@ -99,7 +98,7 @@ class JMS2Test extends AgentTestRunner {
producerSpan(it, 0, destinationType, destinationName)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, messageId, false, HornetQMessageConsumer, traces[0][0])
consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive")
}
}

Expand All @@ -111,8 +110,8 @@ class JMS2Test extends AgentTestRunner {
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "<temporary>"
session.createTemporaryTopic() | "topic" | "<temporary>"
session.createTemporaryQueue() | "queue" | JMSTracer.TEMP_DESTINATION_NAME
session.createTemporaryTopic() | "topic" | JMSTracer.TEMP_DESTINATION_NAME
}

def "sending to a MessageListener on #destinationName #destinationType generates a span"() {
Expand All @@ -136,7 +135,7 @@ class JMS2Test extends AgentTestRunner {
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), true, consumer.messageListener.class, span(0))
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
}
}
// This check needs to go after all traces have been accounted for
Expand All @@ -150,8 +149,8 @@ class JMS2Test extends AgentTestRunner {
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "<temporary>"
session.createTemporaryTopic() | "topic" | "<temporary>"
session.createTemporaryQueue() | "queue" | JMSTracer.TEMP_DESTINATION_NAME
session.createTemporaryTopic() | "topic" | JMSTracer.TEMP_DESTINATION_NAME
}

def "failing to receive message with receiveNoWait on #destinationName #destinationType works"() {
Expand All @@ -167,12 +166,14 @@ class JMS2Test extends AgentTestRunner {
trace(0, 1) { // Consumer trace
span(0) {
hasNoParent()
name destinationType + "/" + destinationName + " receive"
kind CLIENT
name destinationName + " receive"
kind CONSUMER
errored false
attributes {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
}
}
}
Expand Down Expand Up @@ -200,12 +201,15 @@ class JMS2Test extends AgentTestRunner {
trace(0, 1) { // Consumer trace
span(0) {
hasNoParent()
name destinationType + "/" + destinationName + " receive"
kind CLIENT
name destinationName + " receive"
kind CONSUMER
errored false
attributes {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
}
}
}
Expand All @@ -222,42 +226,45 @@ class JMS2Test extends AgentTestRunner {
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
trace.span(index) {
name destinationType + "/" + destinationName + " send"
name destinationName + " send"
kind PRODUCER
errored false
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName
if (destinationName == "<temporary>") {
"${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key()}" true
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType
if (destinationName == JMSTracer.TEMP_DESTINATION_NAME) {
"${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key}" true
}
}
}
}
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, boolean messageListener, Class origin, Object parentOrLinkedSpan) {
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) {
trace.span(index) {
name destinationType + "/" + destinationName + " receive"
if (messageListener) {
kind CONSUMER
name destinationName + " " + operation
kind CONSUMER
if (parentOrLinkedSpan != null) {
childOf((SpanData) parentOrLinkedSpan)
} else {
kind CLIENT
hasNoParent()
hasLink((SpanData) parentOrLinkedSpan)
}
errored false
attributes {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType
"${SemanticAttributes.MESSAGING_OPERATION.key}" operation
if (messageId != null) {
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key()}" messageId
} else {
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key()}" String
//In some tests we don't know exact messageId, so we pass "" and verify just the existence of the attribute
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" { it == messageId || messageId == "" }
}
if (destinationName == "<temporary>") {
"${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key()}" true
if (destinationName == JMSTracer.TEMP_DESTINATION_NAME) {
"${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key}" true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import static JMS2Test.producerSpan
import io.opentelemetry.instrumentation.test.AgentTestRunner
import javax.jms.ConnectionFactory
import listener.Config
import org.hornetq.jms.client.HornetQMessageConsumer
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.jms.core.JmsTemplate
import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter

class SpringListenerJMS2Test extends AgentTestRunner {
def "receiving message in spring listener generates spans"() {
Expand All @@ -26,10 +24,10 @@ class SpringListenerJMS2Test extends AgentTestRunner {
assertTraces(2) {
trace(0, 2) {
producerSpan(it, 0, "queue", "SpringListenerJMS2")
consumerSpan(it, 1, "queue", "SpringListenerJMS2", null, true, MessagingMessageListenerAdapter, span(0))
consumerSpan(it, 1, "queue", "SpringListenerJMS2", "", span(0), "process")
}
trace(1, 1) {
consumerSpan(it, 0, "queue", "SpringListenerJMS2", null, false, HornetQMessageConsumer, traces[0][0])
consumerSpan(it, 0, "queue", "SpringListenerJMS2", "", null, "receive")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory
import org.hornetq.core.server.HornetQServer
import org.hornetq.core.server.HornetQServers
import org.hornetq.jms.client.HornetQMessageConsumer
import org.springframework.jms.core.JmsTemplate
import spock.lang.Shared

Expand Down Expand Up @@ -90,7 +89,7 @@ class SpringTemplateJMS2Test extends AgentTestRunner {
producerSpan(it, 0, destinationType, destinationName)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), false, HornetQMessageConsumer, traces[0][0])
consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), null, "receive")
}
}

Expand Down Expand Up @@ -123,14 +122,13 @@ class SpringTemplateJMS2Test extends AgentTestRunner {
producerSpan(it, 0, destinationType, destinationName)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, msgId.get(), false, HornetQMessageConsumer, traces[0][0])
consumerSpan(it, 0, destinationType, destinationName, msgId.get(), null, "receive")
}
trace(2, 1) {
// receive doesn't propagate the trace, so this is a root
producerSpan(it, 0, "queue", "<temporary>")
producerSpan(it, 0, "queue", "(temporary)")
}
trace(3, 1) {
consumerSpan(it, 0, "queue", "<temporary>", receivedMessage.getJMSMessageID(), false, HornetQMessageConsumer, traces[2][0])
consumerSpan(it, 0, "queue", "(temporary)", receivedMessage.getJMSMessageID(), null, "receive")
}
}

Expand Down

This file was deleted.

Loading

0 comments on commit f23ad29

Please sign in to comment.