Skip to content

Commit

Permalink
Simplify the logic reading session-id by making it final (#35822)
Browse files Browse the repository at this point in the history
  • Loading branch information
anuchandy authored Jul 11, 2023
1 parent e1b23ad commit 54d92a3
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,10 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
return existing;
}

return new ServiceBusSessionReceiver(link, messageSerializer, connectionProcessor.getRetryOptions(),
final Duration idleTimeout = disposeOnIdle ? sessionIdleTimeout : null;
return new ServiceBusSessionReceiver(sessionId, link, messageSerializer, connectionProcessor.getRetryOptions(),
receiverOptions.getPrefetchCount(), scheduler, this::renewSessionLock,
maxSessionLockRenewDuration, disposeOnIdle ? sessionIdleTimeout : null);
maxSessionLockRenewDuration, idleTimeout);
})))
.flatMapMany(sessionReceiver -> sessionReceiver.receive().doFinally(signalType -> {
LOGGER.atVerbose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.MESSAGE_ID_LOGGING_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SESSION_ID_KEY;

Expand All @@ -47,8 +46,8 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
// also, the lock is removed after the completion of the message disposition.
private final LockContainer<OffsetDateTime> lockContainer;
private final AtomicReference<OffsetDateTime> sessionLockedUntil = new AtomicReference<>();
private final AtomicReference<String> sessionId = new AtomicReference<>();
private final AtomicReference<LockRenewalOperation> renewalOperation = new AtomicReference<>();
private final String sessionId;
private final ServiceBusReceiveLink receiveLink;
private final Disposable.Composite subscriptions;
private final Flux<ServiceBusMessageContext> receivedMessages;
Expand All @@ -61,6 +60,7 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
* Creates a receiver for the first available session.
*
* @param receiveLink Service Bus receive link for available session.
* @param sessionId Identifier of the Service Bus Session that the receiver is associated with.
* @param messageSerializer Serializes and deserializes messages from Service Bus.
* @param retryOptions Retry options for the receiver.
* @param prefetch Number of messages to prefetch from session.
Expand All @@ -71,10 +71,11 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
* @param sessionIdleTimeout Timeout after which session receiver will be disposed if there are no more messages
* and the receiver is idle. Set it to {@code null} to not dispose receiver.
*/
ServiceBusSessionReceiver(ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer,
ServiceBusSessionReceiver(String sessionId, ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer,
AmqpRetryOptions retryOptions, int prefetch, Scheduler scheduler,
Function<String, Mono<OffsetDateTime>> renewSessionLock, Duration maxSessionLockRenewDuration, Duration sessionIdleTimeout) {

this.sessionId = sessionId;
this.receiveLink = receiveLink;
this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT);
this.retryOptions = retryOptions;
Expand Down Expand Up @@ -157,14 +158,6 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
}));
}

this.subscriptions.add(receiveLink.getSessionId().subscribe(id -> {
if (!sessionId.compareAndSet(null, id)) {
LOGGER.atWarning()
.addKeyValue("existingSessionId", sessionId.get())
.addKeyValue("returnedSessionId", id)
.log("Another method set sessionId.");
}
}));
this.subscriptions.add(receiveLink.getSessionLockedUntil().subscribe(lockedUntil -> {
if (!sessionLockedUntil.compareAndSet(null, lockedUntil)) {
withReceiveLinkInformation(LOGGER.atInfo())
Expand All @@ -174,7 +167,7 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {

return;
}
this.renewalOperation.compareAndSet(null, new LockRenewalOperation(sessionId.get(),
this.renewalOperation.compareAndSet(null, new LockRenewalOperation(sessionId,
maxSessionLockRenewDuration, true, renewSessionLock, lockedUntil));
}));
}
Expand Down Expand Up @@ -204,7 +197,7 @@ String getLinkName() {
}

String getSessionId() {
return sessionId.get();
return sessionId;
}

/**
Expand Down Expand Up @@ -254,9 +247,7 @@ public void close() {
}

private LoggingEventBuilder withReceiveLinkInformation(LoggingEventBuilder builder) {
final String current = sessionId.get();

return builder.addKeyValue(SESSION_ID_KEY, current != null ? current : NOT_APPLICABLE)
return builder.addKeyValue(SESSION_ID_KEY, sessionId)
.addKeyValue(ENTITY_PATH_KEY, receiveLink.getEntityPath())
.addKeyValue(LINK_NAME_KEY, receiveLink.getLinkName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void getsProperties() {
final Duration maxSessionRenewalDuration = Duration.ofMinutes(5);

// Act
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink,
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink,
messageSerializer, retryOptions, 1, scheduler,
unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT);

Expand Down Expand Up @@ -152,7 +152,7 @@ public void receivesMessages() {
final AmqpRetryOptions retryOptions = new AmqpRetryOptions();
final Scheduler scheduler = Schedulers.boundedElastic();
final Duration maxSessionRenewalDuration = Duration.ofMinutes(5);
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink,
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink,
messageSerializer, retryOptions, 1, scheduler,
unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT);

Expand Down Expand Up @@ -222,7 +222,7 @@ public void disposesOnIdle() {
final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofMinutes(10));
final Scheduler scheduler = Schedulers.boundedElastic();
final Duration maxSessionRenewalDuration = Duration.ofMinutes(5);
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink,
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink,
messageSerializer, retryOptions, 1, scheduler,
unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, waitTime);

Expand Down Expand Up @@ -281,7 +281,7 @@ public void removesLockOnUpdateDisposition() {
final AmqpRetryOptions retryOptions = new AmqpRetryOptions();
final Scheduler scheduler = Schedulers.boundedElastic();
final Duration maxSessionRenewalDuration = Duration.ofMinutes(5);
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(amqpReceiveLink,
final ServiceBusSessionReceiver sessionReceiver = new ServiceBusSessionReceiver(sessionId, amqpReceiveLink,
messageSerializer, retryOptions, 1, scheduler,
unused -> renewSessionLock(Duration.ofMinutes(1)), maxSessionRenewalDuration, NO_SESSION_IDLE_TIMEOUT);

Expand Down

0 comments on commit 54d92a3

Please sign in to comment.