Skip to content

Commit

Permalink
Add batch direct get (#1229)
Browse files Browse the repository at this point in the history
  • Loading branch information
MauriceVanVeen authored Oct 3, 2024
1 parent e6426b7 commit 27efeab
Show file tree
Hide file tree
Showing 13 changed files with 830 additions and 10 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,8 @@ You can however set the deliver policy which will be used to start the subscript
| JsConsumerCreate290NotAvailable | CON-90301 | Name field not valid when v2.9.0 consumer create api is not available. |
| JsConsumerNameDurableMismatch | CON-90302 | Name must match durable if both are supplied. |
| JsMultipleFilterSubjects210NotAvailable | CON-90303 | Multiple filter subjects not available until server version 2.10.0. |
| JsAllowDirectRequired | CON-90304 | Stream must have allow direct set. |
| JsDirectBatchGet211NotAvailable | CON-90305 | Batch direct get not available until server version 2.11.0. |
| OsObjectNotFound | OS-90201 | The object was not found. |
| OsObjectIsDeleted | OS-90202 | The object is deleted. |
| OsObjectAlreadyExists | OS-90203 | An object with that name already exists. |
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
* JetStream Management context for creation and access to streams and consumers in NATS.
Expand Down Expand Up @@ -324,6 +325,48 @@ public interface JetStreamManagement {
*/
MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @return a list containing {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @return a queue used to asynchronously receive {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @param handler the handler used for receiving {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException;

/**
* Deletes a message, overwriting the message data with garbage
* This can be considered an expensive (time-consuming) operation, but is more secure.
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/nats/client/MessageInfoHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client;

import io.nats.client.api.MessageInfo;

/**
* Handler for {@link MessageInfo}.
*/
public interface MessageInfoHandler {
/**
* Called to deliver a {@link MessageInfo} to the handler.
*
* @param messageInfo the received {@link MessageInfo}
* @throws InterruptedException if the thread for this handler is interrupted
*/
void onMessageInfo(MessageInfo messageInfo) throws InterruptedException;
}
6 changes: 6 additions & 0 deletions src/main/java/io/nats/client/api/ApiResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public ApiResponse() {
type = NO_TYPE;
}

public ApiResponse(Error error) {
jv = null;
this.error = error;
type = NO_TYPE;
}

@SuppressWarnings("unchecked")
public T throwOnHasError() throws JetStreamApiException {
if (hasError()) {
Expand Down
Loading

0 comments on commit 27efeab

Please sign in to comment.