Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS MQ span attributes #1977

Merged
merged 6 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* This implementation of {@link CollectionFactory} will only be used if the agent-bridge
* is being used by an application and the agent is NOT being loaded. Thus, it is unlikely
* that the objects created by this implementation are going to receive much use.
* So methods in this implementation do not need to implement all functional requirements
* of the methods in the interface, but they should not break under low use.
*/
public final class AgentBridge {
/**
* Calls to methods on these classes will automatically be logged at FINEST.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package com.newrelic.agent.bridge;

import java.util.Map;
import java.util.function.Function;

/**
* Allows instrumentation and bridge API implementations to use collections from third partly libraries without
Expand Down Expand Up @@ -38,4 +39,14 @@ public interface CollectionFactory {
* @param <V> value type
*/
<K, V> Map<K, V> createConcurrentTimeBasedEvictionMap(long ageInSeconds);

/**
* Wraps the provided function into one that will cache the results for future calls.
* @param loader the function that calculates the value.
* @param maxSize the max number of items to be cached.
* @return the cached item, or the result of the loader call.
* @param <K> the type of key
* @param <V> the type of value stored/returned
*/
<K, V> Function<K, V> memorize(Function<K, V> loader, int maxSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class DefaultCollectionFactory implements CollectionFactory {

Expand All @@ -30,4 +32,15 @@ public <K, V> Map<K, V> createConcurrentWeakKeyedMap() {
public <K, V> Map<K, V> createConcurrentTimeBasedEvictionMap(long ageInSeconds) {
return Collections.synchronizedMap(new HashMap<>());
}

@Override
public <K, V> Function<K, V> memorize(Function<K, V> loader, int maxSize) {
Map<K, V> map = new ConcurrentHashMap<>();
return k -> map.computeIfAbsent(k, k1 -> {
if (map.size() >= maxSize) {
map.remove(map.keySet().iterator().next());
}
return loader.apply(k1);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public void setInstanceName(String instanceName) {
public void addTracerParameter(String key, String value) {
}

@Override
public void addTracerParameter(String key, String value, boolean addToSpan) {
}

@Override
public void addTracerParameter(String key, Map<String, String> values) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public interface PrivateApi {

void addTracerParameter(String key, String value);

void addTracerParameter(String key, String value, boolean addToSpan);

void addTracerParameter(String key, Map<String, String> values);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.newrelic.agent.bridge.messaging;

public class BrokerInstance {
private final String hostName;
private final Integer port;

public static BrokerInstance empty() {
return new BrokerInstance(null, null);
}

public BrokerInstance(String host, Integer port) {
this.hostName = host;
this.port = port;
}

public String getHostName() {
return hostName;
}

public Integer getPort() {
return port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.newrelic.agent.bridge.messaging;

public class JmsProperties {
public static final String NR_JMS_BROKER_INSTANCE_PROPERTY = "NR_JMS_BROKER_INSTANCE_PROPERTY";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import com.newrelic.agent.TransactionListener;
import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.TransactionNamePriority;
import com.newrelic.agent.browser.BrowserConfigTest;
import com.newrelic.agent.config.AgentConfigImpl;
import com.newrelic.agent.config.ConfigConstant;
import com.newrelic.agent.config.Hostname;
import com.newrelic.agent.dispatchers.WebRequestDispatcher;
import com.newrelic.agent.environment.AgentIdentity;
import com.newrelic.agent.errors.ErrorService;
Expand All @@ -38,7 +38,6 @@
import com.newrelic.agent.tracers.servlet.MockHttpResponse;
import com.newrelic.agent.transaction.PriorityTransactionName;
import com.newrelic.agent.transaction.TransactionThrowable;
import com.newrelic.agent.util.Obfuscator;
import com.newrelic.api.agent.DatastoreParameters;
import com.newrelic.api.agent.DestinationType;
import com.newrelic.api.agent.ExtendedRequest;
Expand Down Expand Up @@ -77,13 +76,11 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/* (non-javadoc)
Expand All @@ -95,6 +92,7 @@ public class ApiTest implements TransactionListener {
ApiTestHelper apiTestHelper = new ApiTestHelper();
private static final String CAT_CONFIG_FILE = "configs/cross_app_tracing_test.yml";
private static final String HIGH_SECURITY_CONFIG_FILE = "configs/high_security_config.yml";
public static String HOSTNAME = Hostname.getHostname(ServiceFactory.getConfigService().getDefaultAgentConfig());
private static final ClassLoader CLASS_LOADER = ApiTest.class.getClassLoader();

@Before
Expand Down Expand Up @@ -2007,6 +2005,28 @@ public void testMessagingAPI() throws Exception {
}
}


@Test
public void testMessagingAPIWithHostAndPort() throws Exception {
// override default agent config to disabled distributed tracing and use CAT instead
EnvironmentHolder holder = setupEnvironmentHolder(CAT_CONFIG_FILE, "cat_enabled_dt_disabled_test");
MessagingTestServer server = new MessagingTestServer(8088);

try {
server.start();
runTestMessagingAPIWithHostAndPort();
String messageBrokerMetric = "MessageBroker/JMS/Queue/Consume/Temp";
Assert.assertTrue("The following metric should exist: " + messageBrokerMetric, apiTestHelper.tranStats.getScopedStats().getStatsMap().containsKey(messageBrokerMetric));
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
} finally {
Transaction.clearTransaction();
server.closeAllConnections();
holder.close();
}
}

@Trace(dispatcher = true)
private void runTestMessagingAPI() {
URL myURL = null;
Expand Down Expand Up @@ -2048,6 +2068,48 @@ private void runTestMessagingAPI() {
}
}

@Trace(dispatcher = true)
private void runTestMessagingAPIWithHostAndPort() {
URL myURL = null;
try {
Thread.sleep(600);
myURL = new URL("http://localhost:8088");
HttpUriRequest request = RequestBuilder.get().setUri(myURL.toURI()).build();

ApiTestHelper.OutboundWrapper outboundRequestWrapper = new ApiTestHelper.OutboundWrapper(request, HeaderType.MESSAGE);

// MessageProducer
ExternalParameters messageProduceParameters = MessageProduceParameters
.library("JMS")
.destinationType(DestinationType.NAMED_QUEUE)
.destinationName("MessageDestination")
.outboundHeaders(outboundRequestWrapper)
.instance(myURL.getHost(), myURL.getPort())
.build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);

Assert.assertTrue(request.getHeaders("NewRelicID").length != 0);
Assert.assertTrue(request.getHeaders("NewRelicTransaction").length != 0);

CloseableHttpClient connection = HttpClientBuilder.create().build();
CloseableHttpResponse response = connection.execute(request);

// MessageConsumer
ExternalParameters messageResponseParameters = MessageConsumeParameters
.library("JMS")
.destinationType(DestinationType.TEMP_QUEUE)
.destinationName("MessageDestination")
.inboundHeaders(new ApiTestHelper.InboundWrapper(response, HeaderType.MESSAGE))
.build();
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageResponseParameters);

Assert.assertTrue(response.getHeaders("NewRelicAppData").length != 0);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
}
}

@Test
public void testNotNull() {
Assert.assertNotNull(NewRelic.getAgent().getTransaction().getTracedMethod());
Expand Down
21 changes: 21 additions & 0 deletions instrumentation/activemq-client-5.8.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
dependencies {
implementation(project(":agent-bridge"))
implementation("org.apache.activemq:activemq-client:5.16.7")
testImplementation("org.slf4j:slf4j-simple:1.7.30")
}


jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.activemq-client-5.8.0' }
}


verifyInstrumentation {
passesOnly 'org.apache.activemq:activemq-client:[5.8.0,)'
}


site {
title 'ActiveMQClient'
type 'Messaging'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.nr.agent.instrumentation.activemqclient580;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.messaging.BrokerInstance;

import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ActiveMQUtil {

private static final ActiveMQUtil INSTANCE = ActiveMQUtil.create();

public static ActiveMQUtil get() {
return INSTANCE;
}

private static final Pattern ADDRESS_PATTERN = Pattern.compile("^\\w+://(.+)/.+:(\\d+)");
private final Function<String, BrokerInstance> DO_PARSE_HOST_REF = this::doParseHostAndPort;
private final Function<String, BrokerInstance> CACHE = AgentBridge.collectionFactory.memorize(DO_PARSE_HOST_REF, 32);

public BrokerInstance parseHostAndPort(String address) {
return CACHE.apply(address);
}
public BrokerInstance doParseHostAndPort(String address) {

Matcher m = ADDRESS_PATTERN.matcher(address);
if(!m.find()) {
return BrokerInstance.empty();
}

String hostName = m.group(1);
int port;

try {
String portStr = m.group(2);
port = Integer.parseInt(portStr);
} catch (NumberFormatException e) {
return BrokerInstance.empty();
}
return new BrokerInstance(hostName, port);
}

private ActiveMQUtil() {
// prevent instantiation of utility class
}

private static ActiveMQUtil create() {
return new ActiveMQUtil();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.apache.activemq.command;

import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.agent.instrumentation.activemqclient580.ActiveMQUtil;
import org.apache.activemq.ActiveMQConnection;

import static com.newrelic.agent.bridge.messaging.JmsProperties.NR_JMS_BROKER_INSTANCE_PROPERTY;

@Weave(type = MatchType.BaseClass, originalName = "org.apache.activemq.command.ActiveMQMessage")
public abstract class ActiveMQMessage_Instrumentation {
public abstract ActiveMQConnection getConnection();

// This is so the JMS instrumentation can grab host and port of the Active MQ instance
public Object getObjectProperty(String name) {
if (NR_JMS_BROKER_INSTANCE_PROPERTY.equals(name)) {
return ActiveMQUtil.get().parseHostAndPort(getConnection().getTransport().toString());
}
return Weaver.callOriginal();
}
}
Loading
Loading