Skip to content

Commit

Permalink
Java: add PUBSUB CHANNELS, NUMPAT and NUMSUB commands (#2105)
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
Co-authored-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
Yury-Fridlyand and acarbonetto authored Aug 15, 2024
1 parent c550a43 commit 91fae38
Show file tree
Hide file tree
Showing 8 changed files with 679 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https:/valkey-io/valkey-glide/pull/2105))
* Java: Added binary support for custom command ([#2109](https:/valkey-io/valkey-glide/pull/2109))
* Node: Added SSCAN command ([#2132](https:/valkey-io/valkey-glide/pull/2132))
* Node: Added FUNCTION KILL command ([#2114](https:/valkey-io/valkey-glide/pull/2114))
Expand Down
51 changes: 51 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@
import static command_request.CommandRequestOuterClass.RequestType.PfAdd;
import static command_request.CommandRequestOuterClass.RequestType.PfCount;
import static command_request.CommandRequestOuterClass.RequestType.PfMerge;
import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels;
import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat;
import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub;
import static command_request.CommandRequestOuterClass.RequestType.Publish;
import static command_request.CommandRequestOuterClass.RequestType.RPop;
import static command_request.CommandRequestOuterClass.RequestType.RPush;
Expand Down Expand Up @@ -4495,6 +4498,54 @@ public CompletableFuture<String> publish(
});
}

@Override
public CompletableFuture<String[]> pubsubChannels() {
return commandManager.submitNewCommand(
PubSubChannels,
new String[0],
response -> castArray(handleArrayResponse(response), String.class));
}

@Override
public CompletableFuture<GlideString[]> pubsubChannelsBinary() {
return commandManager.submitNewCommand(
PubSubChannels,
new GlideString[0],
response -> castArray(handleArrayResponseBinary(response), GlideString.class));
}

@Override
public CompletableFuture<String[]> pubsubChannels(@NonNull String pattern) {
return commandManager.submitNewCommand(
PubSubChannels,
new String[] {pattern},
response -> castArray(handleArrayResponse(response), String.class));
}

@Override
public CompletableFuture<GlideString[]> pubsubChannels(@NonNull GlideString pattern) {
return commandManager.submitNewCommand(
PubSubChannels,
new GlideString[] {pattern},
response -> castArray(handleArrayResponseBinary(response), GlideString.class));
}

@Override
public CompletableFuture<Long> pubsubNumPat() {
return commandManager.submitNewCommand(PubSubNumPat, new String[0], this::handleLongResponse);
}

@Override
public CompletableFuture<Map<String, Long>> pubsubNumSub(@NonNull String[] channels) {
return commandManager.submitNewCommand(PubSubNumSub, channels, this::handleMapResponse);
}

@Override
public CompletableFuture<Map<GlideString, Long>> pubsubNumSub(@NonNull GlideString[] channels) {
return commandManager.submitNewCommand(
PubSubNumSub, channels, this::handleBinaryStringMapResponse);
}

@Override
public CompletableFuture<String> watch(@NonNull String[] keys) {
return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse);
Expand Down
121 changes: 121 additions & 0 deletions java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package glide.api.commands;

import glide.api.models.GlideString;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -40,4 +41,124 @@ public interface PubSubBaseCommands {
* }</pre>
*/
CompletableFuture<String> publish(GlideString message, GlideString channel);

/**
* Lists the currently active channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @return An <code>Array</code> of all active channels.
* @example
* <pre>{@code
* String[] response = client.pubsubChannels().get();
* assert Arrays.equals(new String[] { "channel1", "channel2" });
* }</pre>
*/
CompletableFuture<String[]> pubsubChannels();

/**
* Lists the currently active channels.<br>
* Unlike of {@link #pubsubChannels()}, returns channel names as {@link GlideString}s.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @return An <code>Array</code> of all active channels.
* @example
* <pre>{@code
* GlideString[] response = client.pubsubChannels().get();
* assert Arrays.equals(new GlideString[] { "channel1", "channel2" });
* }</pre>
*/
CompletableFuture<GlideString[]> pubsubChannelsBinary();

/**
* Lists the currently active channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @param pattern A glob-style pattern to match active channels.
* @return An <code>Array</code> of currently active channels matching the given pattern.
* @example
* <pre>{@code
* String[] response = client.pubsubChannels("news.*").get();
* assert Arrays.equals(new String[] { "news.sports", "news.weather" });
* }</pre>
*/
CompletableFuture<String[]> pubsubChannels(String pattern);

/**
* Lists the currently active channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @param pattern A glob-style pattern to match active channels.
* @return An <code>Array</code> of currently active channels matching the given pattern.
* @example
* <pre>{@code
* GlideString[] response = client.pubsubChannels(gs("news.*")).get();
* assert Arrays.equals(new GlideString[] { gs("news.sports"), gs("news.weather") });
* }</pre>
*/
CompletableFuture<GlideString[]> pubsubChannels(GlideString pattern);

/**
* Returns the number of unique patterns that are subscribed to by clients.
*
* @apiNote
* <ul>
* <li>When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* <li>This is the total number of unique patterns all the clients are subscribed to, not
* the count of clients subscribed to patterns.
* </ul>
*
* @see <a href="https://valkey.io/commands/pubsub-numpat/">valkey.io</a> for details.
* @return The number of unique patterns.
* @example
* <pre>{@code
* Long result = client.pubsubNumPat().get();
* assert result == 3L;
* }</pre>
*/
CompletableFuture<Long> pubsubNumPat();

/**
* Returns the number of subscribers (exclusive of clients subscribed to patterns) for the
* specified channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single map.
* @see <a href="https://valkey.io/commands/pubsub-numsub/">valkey.io</a> for details.
* @param channels The list of channels to query for the number of subscribers.
* @return A <code>Map</code> where keys are the channel names and values are the numbers of
* subscribers.
* @example
* <pre>{@code
* Map<String, Long> result = client.pubsubNumSub(new String[] {"channel1", "channel2"}).get();
* assert result.equals(Map.of("channel1", 3L, "channel2", 5L));
* }</pre>
*/
CompletableFuture<Map<String, Long>> pubsubNumSub(String[] channels);

/**
* Returns the number of subscribers (exclusive of clients subscribed to patterns) for the
* specified channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single map.
* @see <a href="https://valkey.io/commands/pubsub-numsub/">valkey.io</a> for details.
* @param channels The list of channels to query for the number of subscribers.
* @return A <code>Map</code> where keys are the channel names and values are the numbers of
* subscribers.
* @example
* <pre>{@code
* Map<GlideString, Long> result = client.pubsubNumSub(new GlideString[] {gs("channel1"), gs("channel2")}).get();
* assert result.equals(Map.of(gs("channel1"), 3L, gs("channel2"), 5L));
* }</pre>
*/
CompletableFuture<Map<GlideString, Long>> pubsubNumSub(GlideString[] channels);
}
72 changes: 72 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@
import static command_request.CommandRequestOuterClass.RequestType.PfCount;
import static command_request.CommandRequestOuterClass.RequestType.PfMerge;
import static command_request.CommandRequestOuterClass.RequestType.Ping;
import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels;
import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat;
import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub;
import static command_request.CommandRequestOuterClass.RequestType.Publish;
import static command_request.CommandRequestOuterClass.RequestType.RPop;
import static command_request.CommandRequestOuterClass.RequestType.RPush;
Expand Down Expand Up @@ -6295,6 +6298,75 @@ public <ArgType> T publish(@NonNull ArgType message, @NonNull ArgType channel) {
return getThis();
}

/**
* Lists the currently active channels.
*
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @return Command response - An <code>Array</code> of all active channels.
*/
public T pubsubChannels() {
protobufTransaction.addCommands(buildCommand(PubSubChannels));
return getThis();
}

/**
* Lists the currently active channels.
*
* @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type
* will throw {@link IllegalArgumentException}.
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* @see <a href="https://valkey.io/commands/pubsub-channels/">valkey.io</a> for details.
* @param pattern A glob-style pattern to match active channels.
* @return Command response - An <code>Array</code> of currently active channels matching the
* given pattern.
*/
public <ArgType> T pubsubChannels(@NonNull ArgType pattern) {
checkTypeOrThrow(pattern);
protobufTransaction.addCommands(buildCommand(PubSubChannels, newArgsBuilder().add(pattern)));
return getThis();
}

/**
* Returns the number of unique patterns that are subscribed to by clients.
*
* @apiNote
* <ul>
* <li>When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single array.
* <li>This is the total number of unique patterns all the clients are subscribed to, not
* the count of clients subscribed to patterns.
* </ul>
*
* @see <a href="https://valkey.io/commands/pubsub-numpat/">valkey.io</a> for details.
* @return Command response - The number of unique patterns.
*/
public T pubsubNumPat() {
protobufTransaction.addCommands(buildCommand(PubSubNumPat));
return getThis();
}

/**
* Returns the number of subscribers (exclusive of clients subscribed to patterns) for the
* specified channels.
*
* @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type
* will throw {@link IllegalArgumentException}.
* @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response
* into a single map.
* @see <a href="https://valkey.io/commands/pubsub-numsub/">valkey.io</a> for details.
* @param channels The list of channels to query for the number of subscribers.
* @return Command response - A <code>Map</code> where keys are the channel names and values are
* the numbers of subscribers.
*/
public <ArgType> T pubsubNumSub(@NonNull ArgType[] channels) {
checkTypeOrThrow(channels);
protobufTransaction.addCommands(buildCommand(PubSubNumSub, newArgsBuilder().add(channels)));
return getThis();
}

/**
* Gets the union of all the given sets.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

import static glide.api.models.GlideString.gs;

import glide.api.GlideClusterClient;
import glide.api.models.GlideString;
import java.util.HashMap;
Expand Down Expand Up @@ -91,6 +93,17 @@ public ClusterSubscriptionConfigurationBuilder subscription(
return this;
}

/**
* Add a subscription to a channel or to multiple channels if {@link
* PubSubClusterChannelMode#PATTERN} is used.<br>
* See {@link ClusterSubscriptionConfiguration#subscriptions}.
*/
public ClusterSubscriptionConfigurationBuilder subscription(
PubSubClusterChannelMode mode, String channelOrPattern) {
addSubscription(subscriptions, mode, gs(channelOrPattern));
return this;
}

/**
* Set all subscriptions in a bulk. Rewrites previously stored configurations.<br>
* See {@link ClusterSubscriptionConfiguration#subscriptions}.
Expand Down
Loading

0 comments on commit 91fae38

Please sign in to comment.