Skip to content

Commit

Permalink
Merge pull request #1226 from nats-io/direct-get-by-start-time
Browse files Browse the repository at this point in the history
Add Direct Get by start time
  • Loading branch information
MauriceVanVeen authored Sep 23, 2024
2 parents 45b2c66 + de4c0c8 commit 95987a2
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 7 deletions.
29 changes: 29 additions & 0 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,35 @@ public interface JetStreamManagement {
*/
MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message created at or after the start time.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message created at or after the start time matching the subject.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
Expand Down
27 changes: 20 additions & 7 deletions src/main/java/io/nats/client/api/MessageGetRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import io.nats.client.support.JsonSerializable;

import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonUtils.*;

Expand All @@ -25,21 +27,30 @@ public class MessageGetRequest implements JsonSerializable {
private final long sequence;
private final String lastBySubject;
private final String nextBySubject;
private final ZonedDateTime startTime;

public static MessageGetRequest forSequence(long sequence) {
return new MessageGetRequest(sequence, null, null);
return new MessageGetRequest(sequence, null, null, null);
}

public static MessageGetRequest lastForSubject(String subject) {
return new MessageGetRequest(-1, subject, null);
return new MessageGetRequest(-1, subject, null, null);
}

public static MessageGetRequest firstForSubject(String subject) {
return new MessageGetRequest(-1, null, subject);
return new MessageGetRequest(-1, null, subject, null);
}

public static MessageGetRequest firstForStartTime(ZonedDateTime startTime) {
return new MessageGetRequest(-1, null, null, startTime);
}

public static MessageGetRequest firstForStartTimeAndSubject(ZonedDateTime startTime, String subject) {
return new MessageGetRequest(-1, null, subject, startTime);
}

public static MessageGetRequest nextForSubject(long sequence, String subject) {
return new MessageGetRequest(sequence, null, subject);
return new MessageGetRequest(sequence, null, subject, null);
}

/**
Expand Down Expand Up @@ -69,7 +80,7 @@ public static byte[] lastBySubjectBytes(String subject) {
*/
@Deprecated
public MessageGetRequest(long sequence) {
this(sequence, null, null);
this(sequence, null, null, null);
}

/**
Expand All @@ -79,13 +90,14 @@ public MessageGetRequest(long sequence) {
*/
@Deprecated
public MessageGetRequest(String lastBySubject) {
this(-1, lastBySubject, null);
this(-1, lastBySubject, null, null);
}

private MessageGetRequest(long sequence, String lastBySubject, String nextBySubject) {
private MessageGetRequest(long sequence, String lastBySubject, String nextBySubject, ZonedDateTime startTime) {
this.sequence = sequence;
this.lastBySubject = lastBySubject;
this.nextBySubject = nextBySubject;
this.startTime = startTime;
}

public long getSequence() {
Expand Down Expand Up @@ -118,6 +130,7 @@ public String toJson() {
addField(sb, SEQ, sequence);
addField(sb, LAST_BY_SUBJECT, lastBySubject);
addField(sb, NEXT_BY_SUBJECT, nextBySubject);
addField(sb, START_TIME, startTime);
return endJson(sb).toString();
}
}
16 changes: 16 additions & 0 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,22 @@ public MessageInfo getFirstMessage(String streamName, String subject) throws IOE
return _getMessage(streamName, MessageGetRequest.firstForSubject(subject));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForStartTime(startTime));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForStartTimeAndSubject(startTime, subject));
}

/**
* {@inheritDoc}
*/
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public interface ApiConstants {
String SOURCES = "sources";
String SRC = "src";
String STARTED = "started";
String START_TIME = "start_time";
String STATE = "state";
String STATS = "stats";
String STORAGE = "storage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,8 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 0, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, beforeCreated), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, beforeCreated, tsc.subject(1)), beforeCreated);

assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 1, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 1, tsc.subject(1)), beforeCreated);
Expand All @@ -1305,6 +1307,7 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer

assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, -1)));
assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 0)));
assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, DEFAULT_TIME)));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 9)));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getLastMessage(tsc.stream, "not-a-subject")));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, "not-a-subject")));
Expand Down

0 comments on commit 95987a2

Please sign in to comment.