diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 41d114c7e74b8..6b528a991e470 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -338,9 +338,10 @@ private Flux getSession(Scheduler scheduler, boolean d return existing; } - return new ServiceBusSessionReceiver(link, messageSerializer, connectionProcessor.getRetryOptions(), + final Duration idleTimeout = disposeOnIdle ? sessionIdleTimeout : null; + return new ServiceBusSessionReceiver(sessionId, link, messageSerializer, connectionProcessor.getRetryOptions(), receiverOptions.getPrefetchCount(), scheduler, this::renewSessionLock, - maxSessionLockRenewDuration, disposeOnIdle ? sessionIdleTimeout : null); + maxSessionLockRenewDuration, idleTimeout); }))) .flatMapMany(sessionReceiver -> sessionReceiver.receive().doFinally(signalType -> { LOGGER.atVerbose() diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java index e0076403cd2a5..a163a4a565cd0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java @@ -29,7 +29,6 @@ import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY; import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY; -import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.MESSAGE_ID_LOGGING_KEY; import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SESSION_ID_KEY; @@ -47,8 +46,8 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable { // also, the lock is removed after the completion of the message disposition. private final LockContainer lockContainer; private final AtomicReference sessionLockedUntil = new AtomicReference<>(); - private final AtomicReference sessionId = new AtomicReference<>(); private final AtomicReference renewalOperation = new AtomicReference<>(); + private final String sessionId; private final ServiceBusReceiveLink receiveLink; private final Disposable.Composite subscriptions; private final Flux receivedMessages; @@ -61,6 +60,7 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable { * Creates a receiver for the first available session. * * @param receiveLink Service Bus receive link for available session. + * @param sessionId Identifier of the Service Bus Session that the receiver is associated with. * @param messageSerializer Serializes and deserializes messages from Service Bus. * @param retryOptions Retry options for the receiver. * @param prefetch Number of messages to prefetch from session. @@ -71,10 +71,11 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable { * @param sessionIdleTimeout Timeout after which session receiver will be disposed if there are no more messages * and the receiver is idle. Set it to {@code null} to not dispose receiver. */ - ServiceBusSessionReceiver(ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer, + ServiceBusSessionReceiver(String sessionId, ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer, AmqpRetryOptions retryOptions, int prefetch, Scheduler scheduler, Function> renewSessionLock, Duration maxSessionLockRenewDuration, Duration sessionIdleTimeout) { + this.sessionId = sessionId; this.receiveLink = receiveLink; this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT); this.retryOptions = retryOptions; @@ -157,14 +158,6 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable { })); } - this.subscriptions.add(receiveLink.getSessionId().subscribe(id -> { - if (!sessionId.compareAndSet(null, id)) { - LOGGER.atWarning() - .addKeyValue("existingSessionId", sessionId.get()) - .addKeyValue("returnedSessionId", id) - .log("Another method set sessionId."); - } - })); this.subscriptions.add(receiveLink.getSessionLockedUntil().subscribe(lockedUntil -> { if (!sessionLockedUntil.compareAndSet(null, lockedUntil)) { withReceiveLinkInformation(LOGGER.atInfo()) @@ -174,7 +167,7 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable { return; } - this.renewalOperation.compareAndSet(null, new LockRenewalOperation(sessionId.get(), + this.renewalOperation.compareAndSet(null, new LockRenewalOperation(sessionId, maxSessionLockRenewDuration, true, renewSessionLock, lockedUntil)); })); } @@ -204,7 +197,7 @@ String getLinkName() { } String getSessionId() { - return sessionId.get(); + return sessionId; } /** @@ -254,9 +247,7 @@ public void close() { } private LoggingEventBuilder withReceiveLinkInformation(LoggingEventBuilder builder) { - final String current = sessionId.get(); - - return builder.addKeyValue(SESSION_ID_KEY, current != null ? current : NOT_APPLICABLE) + return builder.addKeyValue(SESSION_ID_KEY, sessionId) .addKeyValue(ENTITY_PATH_KEY, receiveLink.getEntityPath()) .addKeyValue(LINK_NAME_KEY, receiveLink.getLinkName()); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverTest.java index aa7b9a565d53d..e9609f1c0f8a1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverTest.java @@ -104,7 +104,7 @@ public void getsProperties() { final Duration maxSessionRenewalDuration = Duration.ofMinutes(5); // Act - final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink, + final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink, messageSerializer, retryOptions, 1, scheduler, unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT); @@ -152,7 +152,7 @@ public void receivesMessages() { final AmqpRetryOptions retryOptions = new AmqpRetryOptions(); final Scheduler scheduler = Schedulers.boundedElastic(); final Duration maxSessionRenewalDuration = Duration.ofMinutes(5); - final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink, + final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink, messageSerializer, retryOptions, 1, scheduler, unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT); @@ -222,7 +222,7 @@ public void disposesOnIdle() { final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofMinutes(10)); final Scheduler scheduler = Schedulers.boundedElastic(); final Duration maxSessionRenewalDuration = Duration.ofMinutes(5); - final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink, + final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink, messageSerializer, retryOptions, 1, scheduler, unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, waitTime); @@ -281,7 +281,7 @@ public void removesLockOnUpdateDisposition() { final AmqpRetryOptions retryOptions = new AmqpRetryOptions(); final Scheduler scheduler = Schedulers.boundedElastic(); final Duration maxSessionRenewalDuration = Duration.ofMinutes(5); - final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink, + final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink, messageSerializer, retryOptions, 1, scheduler, unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT);