diff --git a/README.md b/README.md index 192b2782b..907d8ab1c 100644 --- a/README.md +++ b/README.md @@ -980,6 +980,8 @@ You can however set the deliver policy which will be used to start the subscript | JsConsumerCreate290NotAvailable | CON-90301 | Name field not valid when v2.9.0 consumer create api is not available. | | JsConsumerNameDurableMismatch | CON-90302 | Name must match durable if both are supplied. | | JsMultipleFilterSubjects210NotAvailable | CON-90303 | Multiple filter subjects not available until server version 2.10.0. | +| JsAllowDirectRequired | CON-90304 | Stream must have allow direct set. | +| JsDirectBatchGet211NotAvailable | CON-90305 | Batch direct get not available until server version 2.11.0. | | OsObjectNotFound | OS-90201 | The object was not found. | | OsObjectIsDeleted | OS-90202 | The object is deleted. | | OsObjectAlreadyExists | OS-90203 | An object with that name already exists. | diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index 499c8ef4d..6755e49d0 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.time.ZonedDateTime; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; /** * JetStream Management context for creation and access to streams and consumers in NATS. @@ -324,6 +325,48 @@ public interface JetStreamManagement { */ MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException; + /** + * Request a batch of messages using a {@link MessageBatchGetRequest}. + *

+ * This API is currently EXPERIMENTAL and is subject to change. + * + * @param streamName the name of the stream + * @param messageBatchGetRequest the request details + * @return a list containing {@link MessageInfo} + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; + + /** + * Request a batch of messages using a {@link MessageBatchGetRequest}. + *

+ * This API is currently EXPERIMENTAL and is subject to change. + * + * @param streamName the name of the stream + * @param messageBatchGetRequest the request details + * @return a queue used to asynchronously receive {@link MessageInfo} + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException; + + /** + * Request a batch of messages using a {@link MessageBatchGetRequest}. + *

+ * This API is currently EXPERIMENTAL and is subject to change. + * + * @param streamName the name of the stream + * @param messageBatchGetRequest the request details + * @param handler the handler used for receiving {@link MessageInfo} + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; + /** * Deletes a message, overwriting the message data with garbage * This can be considered an expensive (time-consuming) operation, but is more secure. diff --git a/src/main/java/io/nats/client/MessageInfoHandler.java b/src/main/java/io/nats/client/MessageInfoHandler.java new file mode 100644 index 000000000..6a9697a31 --- /dev/null +++ b/src/main/java/io/nats/client/MessageInfoHandler.java @@ -0,0 +1,29 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +import io.nats.client.api.MessageInfo; + +/** + * Handler for {@link MessageInfo}. + */ +public interface MessageInfoHandler { + /** + * Called to deliver a {@link MessageInfo} to the handler. + * + * @param messageInfo the received {@link MessageInfo} + * @throws InterruptedException if the thread for this handler is interrupted + */ + void onMessageInfo(MessageInfo messageInfo) throws InterruptedException; +} diff --git a/src/main/java/io/nats/client/api/ApiResponse.java b/src/main/java/io/nats/client/api/ApiResponse.java index 2dbe3a348..e97a42a41 100644 --- a/src/main/java/io/nats/client/api/ApiResponse.java +++ b/src/main/java/io/nats/client/api/ApiResponse.java @@ -76,6 +76,12 @@ public ApiResponse() { type = NO_TYPE; } + public ApiResponse(Error error) { + jv = null; + this.error = error; + type = NO_TYPE; + } + @SuppressWarnings("unchecked") public T throwOnHasError() throws JetStreamApiException { if (hasError()) { diff --git a/src/main/java/io/nats/client/api/MessageBatchGetRequest.java b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java new file mode 100644 index 000000000..3be679f43 --- /dev/null +++ b/src/main/java/io/nats/client/api/MessageBatchGetRequest.java @@ -0,0 +1,347 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.api; + +import io.nats.client.support.JsonSerializable; + +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static io.nats.client.support.ApiConstants.*; +import static io.nats.client.support.JsonUtils.*; +import static io.nats.client.support.Validator.*; + +/** + * Object used to make a request for message batch get requests. + */ +public class MessageBatchGetRequest implements JsonSerializable { + + private final Duration timeout; + private final int batch; + private final int maxBytes; + private final long sequence; + private final ZonedDateTime startTime; + private final String nextBySubject; + private final List multiLastFor; + private final long upToSequence; + private final ZonedDateTime upToTime; + + MessageBatchGetRequest(Builder b) { + this.timeout = b.timeout; + this.batch = b.batch; + this.maxBytes = b.maxBytes; + this.sequence = b.sequence; + this.startTime = b.startTime; + this.nextBySubject = b.nextBySubject; + this.multiLastFor = b.multiLastFor; + this.upToSequence = b.upToSequence; + this.upToTime = b.upToTime; + } + + /** + * Timeout used for the request. + * + * @return Duration + */ + public Duration getTimeout() { + return timeout; + } + + /** + * Maximum amount of messages to be returned for this request. + * + * @return batch size + */ + public int getBatch() { + return batch; + } + + /** + * Maximum amount of returned bytes for this request. + * Limits the amount of returned messages to not exceed this. + * + * @return maximum bytes + */ + public int getMaxBytes() { + return maxBytes; + } + + /** + * Minimum sequence for returned messages. + * All returned messages will have a sequence equal to or higher than this. + * + * @return minimum message sequence + */ + public long getSequence() { + return sequence; + } + + /** + * Minimum start time for returned messages. + * All returned messages will have a start time equal to or higher than this. + * + * @return minimum message start time + */ + public ZonedDateTime getStartTime() { + return startTime; + } + + /** + * Subject used to filter messages that should be returned. + * + * @return the subject to filter + */ + public String getSubject() { + return nextBySubject; + } + + /** + * Subjects filter used, these can include wildcards. + * Will get the last messages matching the subjects. + * + * @return the subjects to get the last messages for + */ + public List getMultiLastForSubjects() { + return multiLastFor; + } + + /** + * Only return messages up to this sequence. + * + * @return the maximum message sequence to return results for + */ + public long getUpToSequence() { + return upToSequence; + } + + /** + * Only return messages up to this time. + * + * @return the maximum message time to return results for + */ + public ZonedDateTime getUpToTime() { + return upToTime; + } + + @Override + public String toJson() { + StringBuilder sb = beginJson(); + addField(sb, BATCH, batch); + addField(sb, MAX_BYTES, maxBytes); + addField(sb, SEQ, sequence); + addField(sb, START_TIME, startTime); + addField(sb, NEXT_BY_SUBJECT, nextBySubject); + addStrings(sb, MULTI_LAST, multiLastFor); + addField(sb, UP_TO_SEQ, upToSequence); + addField(sb, UP_TO_TIME, upToTime); + return endJson(sb).toString(); + } + + /** + * Creates a builder for the request. + * + * @return Builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Creates a builder for the request. + * + * @param req the {@link MessageBatchGetRequest} + * @return Builder + */ + public static Builder builder(MessageBatchGetRequest req) { + return req == null ? new Builder() : new Builder(req); + } + + /** + * {@link MessageBatchGetRequest} is created using a Builder. The builder supports chaining and will + * create a default set of options if no methods are calls. + * + *

{@code MessageBatchGetRequest.builder().build()} will create a default {@link MessageBatchGetRequest}. + */ + public static class Builder { + private Duration timeout = Duration.ofSeconds(5); + private int batch = -1; + private int maxBytes = -1; + private long sequence = -1; + private ZonedDateTime startTime = null; + private String nextBySubject = null; + private List multiLastFor = new ArrayList<>(); + private long upToSequence = -1; + private ZonedDateTime upToTime = null; + + /** + * Construct the builder + */ + public Builder() { + } + + /** + * Construct the builder and initialize values with the existing {@link MessageBatchGetRequest} + * + * @param req the {@link MessageBatchGetRequest} to clone + */ + public Builder(MessageBatchGetRequest req) { + if (req != null) { + this.timeout = req.timeout; + this.batch = req.batch; + this.maxBytes = req.maxBytes; + this.sequence = req.sequence; + this.startTime = req.startTime; + this.nextBySubject = req.nextBySubject; + this.multiLastFor = req.multiLastFor; + this.upToSequence = req.upToSequence; + this.upToTime = req.upToTime; + } + } + + /** + * Set the timeout used for the request. + * + * @param timeout the timeout + * @return Builder + */ + public Builder timeout(Duration timeout) { + validateDurationRequired(timeout); + this.timeout = timeout; + return this; + } + + /** + * Set the maximum amount of messages to be returned for this request. + * + * @param batch the batch size + * @return Builder + */ + public Builder batch(int batch) { + validateGtZero(batch, "Request batch size"); + this.batch = batch; + return this; + } + + /** + * Maximum amount of returned bytes for this request. + * Limits the amount of returned messages to not exceed this. + * + * @param maxBytes the maximum bytes + * @return Builder + */ + public Builder maxBytes(int maxBytes) { + this.maxBytes = maxBytes; + return this; + } + + /** + * Minimum sequence for returned messages. + * All returned messages will have a sequence equal to or higher than this. + * + * @param sequence the minimum message sequence + * @return Builder + */ + public Builder sequence(long sequence) { + validateGtEqZero(sequence, "Sequence"); + this.sequence = sequence; + return this; + } + + /** + * Minimum start time for returned messages. + * All returned messages will have a start time equal to or higher than this. + * + * @param startTime the minimum message start time + * @return Builder + */ + public Builder startTime(ZonedDateTime startTime) { + this.startTime = startTime; + return this; + } + + /** + * Subject used to filter messages that should be returned. + * + * @param subject the subject to filter + * @return Builder + */ + public Builder subject(String subject) { + this.nextBySubject = subject; + return this; + } + + /** + * Subjects filter used, these can include wildcards. + * Will get the last messages matching the subjects. + * + * @param subjects the subjects to get the last messages for + * @return Builder + */ + public Builder multiLastForSubjects(String... subjects) { + this.multiLastFor.clear(); + this.multiLastFor.addAll(Arrays.asList(subjects)); + return this; + } + + /** + * Subjects filter used, these can include wildcards. + * Will get the last messages matching the subjects. + * + * @param subjects the subjects to get the last messages for + * @return Builder + */ + public Builder multiLastForSubjects(Collection subjects) { + this.multiLastFor.clear(); + this.multiLastFor.addAll(subjects); + return this; + } + + /** + * Only return messages up to this sequence. + * If not set, will be last sequence for the stream. + * + * @param upToSequence the maximum message sequence to return results for + * @return Builder + */ + public Builder upToSequence(long upToSequence) { + validateGtZero(upToSequence, "Up to sequence"); + this.upToSequence = upToSequence; + return this; + } + + /** + * Only return messages up to this time. + * + * @param upToTime the maximum message time to return results for + * @return Builder + */ + public Builder upToTime(ZonedDateTime upToTime) { + this.upToTime = upToTime; + return this; + } + + /** + * Build the {@link MessageBatchGetRequest}. + * + * @return MessageBatchGetRequest + */ + public MessageBatchGetRequest build() { + return new MessageBatchGetRequest(this); + } + } +} diff --git a/src/main/java/io/nats/client/api/MessageInfo.java b/src/main/java/io/nats/client/api/MessageInfo.java index 206cd8b92..5ded6746f 100644 --- a/src/main/java/io/nats/client/api/MessageInfo.java +++ b/src/main/java/io/nats/client/api/MessageInfo.java @@ -32,6 +32,11 @@ */ public class MessageInfo extends ApiResponse { + /** + * Message returned as a response in {@link MessageBatchGetRequest} to signal end of data. + */ + public static final MessageInfo EOD = new MessageInfo(null, false); + private final boolean direct; private final String subject; private final long seq; @@ -40,6 +45,7 @@ public class MessageInfo extends ApiResponse { private final Headers headers; private final String stream; private final long lastSeq; + private final long numPending; /** * Create a Message Info @@ -51,6 +57,25 @@ public MessageInfo(Message msg) { this(msg, null, false); } + /** + * Create a Message Info + * This signature is public for testing purposes and is not intended to be used externally. + * @param error the error + * @param direct true if the object is being created from a get direct api call instead of the standard get message + */ + public MessageInfo(Error error, boolean direct) { + super(error); + this.direct = direct; + subject = null; + data = null; + seq = -1; + time = null; + headers = null; + stream = null; + lastSeq = -1; + numPending = -1; + } + /** * Create a Message Info * This signature is public for testing purposes and is not intended to be used externally. @@ -70,12 +95,20 @@ public MessageInfo(Message msg, String streamName, boolean direct) { seq = Long.parseLong(msgHeaders.getLast(NATS_SEQUENCE)); time = DateTimeUtils.parseDateTime(msgHeaders.getLast(NATS_TIMESTAMP)); stream = msgHeaders.getLast(NATS_STREAM); - String temp = msgHeaders.getLast(NATS_LAST_SEQUENCE); - if (temp == null) { + String tempLastSeq = msgHeaders.getLast(NATS_LAST_SEQUENCE); + if (tempLastSeq == null) { lastSeq = -1; } else { - lastSeq = JsonUtils.safeParseLong(temp, -1); + lastSeq = JsonUtils.safeParseLong(tempLastSeq, -1); + } + String tempNumPending = msgHeaders.getLast(NATS_NUM_PENDING); + if (tempNumPending == null) { + numPending = -1; + } + else { + // Num pending is +1 since it includes EOB message, correct that here. + numPending = Long.parseLong(tempNumPending) - 1; } // these are control headers, not real headers so don't give them to the user. headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS); @@ -88,6 +121,7 @@ else if (hasError()) { headers = null; stream = null; lastSeq = -1; + numPending = -1; } else { JsonValue mjv = readValue(jv, MESSAGE); @@ -99,6 +133,7 @@ else if (hasError()) { headers = hdrBytes == null ? null : new IncomingHeadersProcessor(hdrBytes).getHeaders(); stream = streamName; lastSeq = -1; + numPending = -1; } } @@ -158,11 +193,19 @@ public long getLastSeq() { return lastSeq; } + /** + * Amount of pending messages that can be requested with a subsequent batch request. + * @return number of pending messages + */ + public long getNumPending() { + return numPending; + } + @Override public String toString() { StringBuilder sb = JsonUtils.beginJsonPrefixed("\"MessageInfo\":"); JsonUtils.addField(sb, "direct", direct); - JsonUtils.addField(sb, "error", getError()); + JsonUtils.addField(sb, ERROR, getError()); JsonUtils.addField(sb, SUBJECT, subject); JsonUtils.addField(sb, SEQ, seq); if (data == null) { @@ -173,7 +216,8 @@ public String toString() { } JsonUtils.addField(sb, TIME, time); JsonUtils.addField(sb, STREAM, stream); - JsonUtils.addField(sb, "last_seq", lastSeq); + JsonUtils.addField(sb, LAST_SEQ, lastSeq); + JsonUtils.addField(sb, NUM_PENDING, numPending); JsonUtils.addField(sb, SUBJECT, subject); JsonUtils.addField(sb, HDRS, headers); return JsonUtils.endJson(sb).toString(); diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java index 853bfc3ab..b712cff27 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java @@ -47,6 +47,7 @@ public CachedStreamInfo(StreamInfo si) { final JetStreamOptions jso; final boolean consumerCreate290Available; final boolean multipleSubjectFilter210Available; + final boolean directBatchGet211Available; // ---------------------------------------------------------------------------------------------------- // Create / Init @@ -63,6 +64,7 @@ public CachedStreamInfo(StreamInfo si) { consumerCreate290Available = conn.getInfo().isSameOrNewerThanVersion("2.9.0") && !jso.isOptOut290ConsumerCreate(); multipleSubjectFilter210Available = conn.getInfo().isNewerVersionThan("2.9.99"); + directBatchGet211Available = conn.getInfo().isNewerVersionThan("2.10.99"); } NatsJetStreamImpl(NatsJetStreamImpl impl) { @@ -70,6 +72,7 @@ public CachedStreamInfo(StreamInfo si) { jso = impl.jso; consumerCreate290Available = impl.consumerCreate290Available; multipleSubjectFilter210Available = impl.multipleSubjectFilter210Available; + directBatchGet211Available = impl.directBatchGet211Available; } // ---------------------------------------------------------------------------------------------------- diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index b6fe2c7cd..7ac235f55 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -16,12 +16,17 @@ import io.nats.client.*; import io.nats.client.api.Error; import io.nats.client.api.*; +import io.nats.client.support.Status; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired; +import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable; import static io.nats.client.support.Validator.*; public class NatsJetStreamManagement extends NatsJetStreamImpl implements JetStreamManagement { @@ -340,6 +345,109 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR } } + /** + * {@inheritDoc} + */ + @Override + public List fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { + validateMessageBatchGetRequest(streamName, messageBatchGetRequest); + List results = new ArrayList<>(); + _requestMessageBatch(streamName, messageBatchGetRequest, msg -> { + if (msg != MessageInfo.EOD) { + results.add(msg); + } + }); + return results; + } + + /** + * {@inheritDoc} + */ + @Override + public LinkedBlockingQueue queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { + validateMessageBatchGetRequest(streamName, messageBatchGetRequest); + final LinkedBlockingQueue q = new LinkedBlockingQueue<>(); + conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add)); + return q; + } + + /** + * {@inheritDoc} + */ + @Override + public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { + validateMessageBatchGetRequest(streamName, messageBatchGetRequest); + _requestMessageBatch(streamName, messageBatchGetRequest, handler); + } + + public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) { + Subscription sub = null; + try { + String replyTo = conn.createInbox(); + sub = conn.subscribe(replyTo); + + String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName)); + conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize()); + + long start = System.currentTimeMillis(); + long maxTimeMillis = messageBatchGetRequest.getTimeout().toMillis(); + long timeLeft = maxTimeMillis; + while (true) { + Message msg = sub.nextMessage(timeLeft); + if (msg == null) { + break; + } + if (msg.isStatusMessage()) { + Status status = msg.getStatus(); + // Report error, otherwise successful status. + if (status.getCode() < 200 || status.getCode() > 299) { + MessageInfo messageInfo = new MessageInfo(Error.convert(status), true); + handler.onMessageInfo(messageInfo); + } + break; + } + + Headers headers = msg.getHeaders(); + if (headers == null || headers.getLast(NATS_NUM_PENDING) == null) { + throw JsDirectBatchGet211NotAvailable.instance(); + } + + MessageInfo messageInfo = new MessageInfo(msg, streamName, true); + handler.onMessageInfo(messageInfo); + timeLeft = maxTimeMillis - (System.currentTimeMillis() - start); + } + } catch (InterruptedException e) { + // sub.nextMessage was fetching one message + // and data is not completely read + // so it seems like this is an error condition + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } finally { + try { + handler.onMessageInfo(MessageInfo.EOD); + } catch (Exception ignore) { + } + try { + //noinspection DataFlowIssue + sub.unsubscribe(); + } catch (Exception ignore) { + } + } + } + + private void validateMessageBatchGetRequest(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { + validateNotNull(messageBatchGetRequest, "Message Batch Get Request"); + + if (!directBatchGet211Available) { + throw JsDirectBatchGet211NotAvailable.instance(); + } + + CachedStreamInfo csi = getCachedStreamInfo(streamName); + if (!csi.allowDirect) { + throw JsAllowDirectRequired.instance(); + } + } + /** * {@inheritDoc} */ diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index 16ff26f17..e3479b4b8 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -126,6 +126,7 @@ public interface ApiConstants { String MTIME = "mtime"; String MIRROR = "mirror"; String MSGS = "msgs"; + String MULTI_LAST = "multi_last"; String NAME = "name"; String NEXT_BY_SUBJECT = "next_by_subj"; String NO_ACK = "no_ack"; @@ -202,5 +203,7 @@ public interface ApiConstants { String TLS_AVAILABLE = "tls_available"; String TOTAL = "total"; String TYPE = "type"; + String UP_TO_SEQ = "up_to_seq"; + String UP_TO_TIME = "up_to_time"; String VERSION = "version"; } diff --git a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java index 6bda9e345..5da56cdff 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamClientError.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamClientError.java @@ -70,6 +70,8 @@ public class NatsJetStreamClientError { public static final NatsJetStreamClientError JsConsumerCreate290NotAvailable = new NatsJetStreamClientError(CON, 90301, "Name field not valid when v2.9.0 consumer create api is not available."); public static final NatsJetStreamClientError JsConsumerNameDurableMismatch = new NatsJetStreamClientError(CON, 90302, "Name must match durable if both are supplied."); public static final NatsJetStreamClientError JsMultipleFilterSubjects210NotAvailable = new NatsJetStreamClientError(CON, 90303, "Multiple filter subjects not available until server version 2.10.0."); + public static final NatsJetStreamClientError JsAllowDirectRequired = new NatsJetStreamClientError(CON, 90304, "Stream must have allow direct set."); + public static final NatsJetStreamClientError JsDirectBatchGet211NotAvailable = new NatsJetStreamClientError(CON, 90305, "Batch direct get not available until server version 2.11.0."); @Deprecated // Fixed spelling error public static final NatsJetStreamClientError JsSubFcHbHbNotValidQueue = new NatsJetStreamClientError(SUB, 90006, "Flow Control and/or heartbeat is not valid in queue mode."); diff --git a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java index 45aed12b2..c7e4d4a3b 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java @@ -100,7 +100,8 @@ public interface NatsJetStreamConstants { String NATS_TIMESTAMP = "Nats-Time-Stamp"; String NATS_SUBJECT = "Nats-Subject"; String NATS_LAST_SEQUENCE = "Nats-Last-Sequence"; - String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE}; + String NATS_NUM_PENDING = "Nats-Num-Pending"; + String[] MESSAGE_INFO_HEADERS = new String[]{NATS_SUBJECT, NATS_SEQUENCE, NATS_TIMESTAMP, NATS_STREAM, NATS_LAST_SEQUENCE, NATS_NUM_PENDING}; String NATS_PENDING_MESSAGES = "Nats-Pending-Messages"; String NATS_PENDING_BYTES = "Nats-Pending-Bytes"; diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java index 517832637..e8dcbf109 100644 --- a/src/main/java/io/nats/client/support/Validator.java +++ b/src/main/java/io/nats/client/support/Validator.java @@ -383,6 +383,13 @@ public static int validateGtZero(int i, String label) { return i; } + public static long validateGtZero(long l, String label) { + if (l < 1) { + throw new IllegalArgumentException(label + " must be greater than zero"); + } + return l; + } + public static long validateGtZeroOrMinus1(long l, String label) { if (zeroOrLtMinus1(l)) { throw new IllegalArgumentException(label + " must be greater than zero or -1 for unlimited"); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 47f25d989..e05f21434 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -23,11 +23,13 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static io.nats.client.support.DateTimeUtils.DEFAULT_TIME; import static io.nats.client.support.DateTimeUtils.ZONE_ID_GMT; @@ -1548,4 +1550,227 @@ public void testCreateConsumerUpdateConsumer() throws Exception { assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject()); }); } + + @Test + public void testBatchDirectGet() throws Exception { + jsServer.run(TestBase::atLeast2_11, nc -> { + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + TestingStreamContainer tsc = new TestingStreamContainer(nc); + assertFalse(tsc.si.getConfiguration().getAllowDirect()); + + List expected = Arrays.asList("foo", "bar", "baz"); + for (String data : expected) { + js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8)); + } + + List batch = new ArrayList<>(); + MessageInfoHandler handler = msg -> { + if (!msg.hasError() && msg != MessageInfo.EOD) { + batch.add(msg); + } + }; + + // Stream doesn't have AllowDirect enabled, will error. + assertThrows(IllegalArgumentException.class, () -> { + MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); + jsm.requestMessageBatch(tsc.stream, request, handler); + }); + + // Enable AllowDirect. + StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build(); + StreamInfo si = jsm.updateStream(sc); + assertTrue(si.getConfiguration().getAllowDirect()); + + // Empty request errors. + AtomicBoolean hasError = new AtomicBoolean(); + MessageInfoHandler errorHandler = msg -> { + hasError.compareAndSet(false, msg.hasError()); + }; + MessageBatchGetRequest request = MessageBatchGetRequest.builder().build(); + jsm.requestMessageBatch(tsc.stream, request, errorHandler); + assertTrue(hasError.get()); + List list = jsm.fetchMessageBatch(tsc.stream, request); + assertEquals(1, list.size()); + assertTrue(list.get(0).hasError()); + LinkedBlockingQueue queue = jsm.queueMessageBatch(tsc.stream, request); + assertTrue(queue.take().hasError()); + assertEquals(MessageInfo.EOD, queue.take()); + + // First batch gets first two messages. + request = MessageBatchGetRequest.builder() + .batch(2) + .subject(tsc.subject()) + .build(); + jsm.requestMessageBatch(tsc.stream, request, handler); + MessageInfo last = batch.get(batch.size() - 1); + assertEquals(1, last.getNumPending()); + assertEquals(2, last.getSeq()); + assertEquals(1, last.getLastSeq()); + + // Second batch gets last message. + request = MessageBatchGetRequest.builder(request) + .sequence(last.getSeq() + 1) + .build(); + jsm.requestMessageBatch(tsc.stream, request, handler); + + List actual = batch.stream().map(m -> new String(m.getData())).collect(Collectors.toList()); + assertEquals(expected, actual); + + last = batch.get(batch.size() - 1); + assertEquals(0, last.getNumPending()); + assertEquals(3, last.getSeq()); + assertEquals(0, last.getLastSeq()); + }); + } + + @Test + public void testBatchDirectGetAlternatives() throws Exception { + jsServer.run(TestBase::atLeast2_11, nc -> { + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + TestingStreamContainer tsc = new TestingStreamContainer(nc); + assertFalse(tsc.si.getConfiguration().getAllowDirect()); + + // Enable AllowDirect. + StreamConfiguration sc = StreamConfiguration.builder(tsc.si.getConfiguration()).allowDirect(true).build(); + StreamInfo si = jsm.updateStream(sc); + assertTrue(si.getConfiguration().getAllowDirect()); + + List expected = Arrays.asList("foo", "bar", "baz"); + for (String data : expected) { + js.publish(tsc.subject(), data.getBytes(StandardCharsets.UTF_8)); + } + + // Request stays the same for all options. + MessageBatchGetRequest request = MessageBatchGetRequest.builder() + .batch(3) + .subject(tsc.subject()) + .build(); + + // Get using handler. + List batch = new ArrayList<>(); + MessageInfoHandler handler = msg -> { + if (!msg.hasError() && msg != MessageInfo.EOD) { + batch.add(msg); + } + }; + jsm.requestMessageBatch(tsc.stream, request, handler); + assertEquals(3, batch.size()); + MessageInfo last = batch.get(batch.size() - 1); + assertEquals(0, last.getNumPending()); + assertEquals(3, last.getSeq()); + assertEquals(2, last.getLastSeq()); + + // Get using queue. + batch.clear(); + LinkedBlockingQueue queue = jsm.queueMessageBatch(tsc.stream, request); + MessageInfo msg; + while ((msg = queue.take()) != MessageInfo.EOD) { + if (!msg.hasError()) { + batch.add(msg); + } + } + assertEquals(3, batch.size()); + last = batch.get(batch.size() - 1); + assertEquals(0, last.getNumPending()); + assertEquals(3, last.getSeq()); + assertEquals(2, last.getLastSeq()); + + // Get using fetch. + batch.clear(); + batch.addAll(jsm.fetchMessageBatch(tsc.stream, request)); + assertEquals(3, batch.size()); + last = batch.get(batch.size() - 1); + assertEquals(0, last.getNumPending()); + assertEquals(3, last.getSeq()); + assertEquals(2, last.getLastSeq()); + }); + } + + @Test + public void testBatchDirectGetMultiLast() throws Exception { + jsServer.run(TestBase::atLeast2_11, nc -> { + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + String stream = stream(); + jsm.addStream(StreamConfiguration.builder() + .name(stream) + .subjects(stream + ".a.>") + .allowDirect(true) + .build()); + + String subjectAFoo = stream + ".a.foo"; + String subjectABar = stream + ".a.bar"; + String subjectABaz = stream + ".a.baz"; + js.publish(subjectAFoo, "foo".getBytes(StandardCharsets.UTF_8)); + js.publish(subjectABar, "bar".getBytes(StandardCharsets.UTF_8)); + js.publish(subjectABaz, "baz".getBytes(StandardCharsets.UTF_8)); + + MessageBatchGetRequest request = MessageBatchGetRequest.builder() + .multiLastForSubjects(subjectAFoo, subjectABaz) + .build(); + + List keys = new ArrayList<>(); + MessageInfoHandler handler = msg -> { + if (!msg.hasError() && msg != MessageInfo.EOD) { + keys.add(msg.getSubject()); + } + }; + jsm.requestMessageBatch(stream, request, handler); + assertEquals(2, keys.size()); + assertEquals(subjectAFoo, keys.get(0)); + assertEquals(subjectABaz, keys.get(1)); + }); + } + + @Test + public void testBatchDirectGetBuilder() { + // Default timeout + assertEquals(Duration.ofSeconds(5), MessageBatchGetRequest.builder().build().getTimeout()); + + // Request options. + MessageBatchGetRequest requestOptions = MessageBatchGetRequest.builder() + .timeout(Duration.ofSeconds(1)) + .maxBytes(1234) + .batch(2) + .build(); + assertEquals(Duration.ofSeconds(1), requestOptions.getTimeout()); + assertEquals(1234, requestOptions.getMaxBytes()); + assertEquals(2, requestOptions.getBatch()); + assertEquals("{\"batch\":2,\"max_bytes\":1234}", requestOptions.toJson()); + + // Batch direct get - simple + ZonedDateTime time = Instant.EPOCH.atZone(ZoneOffset.UTC); + MessageBatchGetRequest simple = MessageBatchGetRequest.builder() + .sequence(1) + .startTime(time) + .subject("subject") + .build(); + assertEquals(1, simple.getSequence()); + assertEquals(time, simple.getStartTime()); + assertEquals("subject", simple.getSubject()); + assertEquals("{\"seq\":1,\"start_time\":\"1970-01-01T00:00:00.000000000Z\",\"next_by_subj\":\"subject\"}", simple.toJson()); + + // Batch direct get - multi last + List multiLastFor = Collections.singletonList("multi.last"); + MessageBatchGetRequest multiLast = MessageBatchGetRequest.builder() + .multiLastForSubjects("multi.last") + .upToSequence(1) + .upToTime(time) + .build(); + assertEquals(Collections.singletonList("multi.last"), multiLast.getMultiLastForSubjects()); + assertEquals(1, multiLast.getUpToSequence()); + assertEquals(time, multiLast.getUpToTime()); + assertEquals("{\"multi_last\":[\"multi.last\"],\"up_to_seq\":1,\"up_to_time\":\"1970-01-01T00:00:00.000000000Z\"}", multiLast.toJson()); + + MessageBatchGetRequest multiLastAlternative = MessageBatchGetRequest.builder() + .multiLastForSubjects(multiLastFor) + .build(); + assertEquals(multiLastFor, multiLastAlternative.getMultiLastForSubjects()); + assertEquals("{\"multi_last\":[\"multi.last\"]}", multiLastAlternative.toJson()); + } }