Skip to content

Commit

Permalink
Node: Add XPENDING command. (valkey-io#2085)
Browse files Browse the repository at this point in the history
* Add `XPENDING` command.

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored and cyip10 committed Aug 12, 2024
1 parent 637fd30 commit 8ca6681
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added XPENDING commands ([#2085](https:/valkey-io/valkey-glide/pull/2085))
* Node: Added HSCAN command ([#2098](https:/valkey-io/valkey-glide/pull/2098/))
* Node: Added XINFO CONSUMERS command ([#2093](https:/valkey-io/valkey-glide/pull/2093))
* Node: Added HRANDFIELD command ([#2096](https:/valkey-io/valkey-glide/pull/2096))
Expand Down
4 changes: 3 additions & 1 deletion node/npm/glide/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ function initialize() {
StreamAddOptions,
StreamReadOptions,
StreamClaimOptions,
StreamPendingOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down Expand Up @@ -232,8 +233,9 @@ function initialize() {
StreamGroupOptions,
StreamTrimOptions,
StreamAddOptions,
StreamReadOptions,
StreamClaimOptions,
StreamReadOptions,
StreamPendingOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down
73 changes: 73 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
StreamAddOptions,
StreamClaimOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamReadOptions,
StreamTrimOptions,
ZAddOptions,
Expand Down Expand Up @@ -171,6 +172,7 @@ import {
createXGroupCreateConsumer,
createXGroupDelConsumer,
createXLen,
createXPending,
createXRead,
createXTrim,
createZAdd,
Expand Down Expand Up @@ -3971,6 +3973,77 @@ export class BaseClient {
return this.createWritePromise(createXLen(key));
}

/**
* Returns stream message summary information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @returns An `array` that includes the summary of the pending messages. See example for more details.
* @example
* ```typescript
* console.log(await client.xpending("my_stream", "my_group")); // Output:
* // [
* // 42, // The total number of pending messages
* // "1722643465939-0", // The smallest ID among the pending messages
* // "1722643484626-0", // The greatest ID among the pending messages
* // [ // A 2D-`array` of every consumer in the group
* // [ "consumer1", "10" ], // with at least one pending message, and the
* // [ "consumer2", "32" ], // number of pending messages it has
* // ]
* // ]
* ```
*/
public async xpending(
key: string,
group: string,
): Promise<[number, string, string, [string, number][]]> {
return this.createWritePromise(createXPending(key, group));
}

/**
* Returns an extended form of stream message information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param options - Additional options to filter entries, see {@link StreamPendingOptions}.
* @returns A 2D-`array` of 4-tuples containing extended message information. See example for more details.
*
* @example
* ```typescript
* console.log(await client.xpending("my_stream", "my_group"), {
* start: { value: "0-1", isInclusive: true },
* end: InfScoreBoundary.PositiveInfinity,
* count: 2,
* consumer: "consumer1"
* }); // Output:
* // [
* // [
* // "1722643465939-0", // The ID of the message
* // "consumer1", // The name of the consumer that fetched the message and has still to acknowledge it
* // 174431, // The number of milliseconds that elapsed since the last time this message was delivered to this consumer
* // 1 // The number of times this message was delivered
* // ],
* // [
* // "1722643484626-0",
* // "consumer1",
* // 202231,
* // 1
* // ]
* // ]
* ```
*/
public async xpendingWithOptions(
key: string,
group: string,
options: StreamPendingOptions,
): Promise<[string, string, number, number][]> {
return this.createWritePromise(createXPending(key, group, options));
}

/**
* Returns the list of all consumers and their attributes for the given consumer group of the
* stream stored at `key`.
Expand Down
138 changes: 98 additions & 40 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1645,37 +1645,52 @@ type SortedSetRange<T> = {
export type RangeByScore = SortedSetRange<number> & { type: "byScore" };
export type RangeByLex = SortedSetRange<string> & { type: "byLex" };

/**
* Returns a string representation of a score boundary in Redis protocol format.
* @param score - The score boundary object containing value and inclusivity
* information.
* @param isLex - Indicates whether to return lexical representation for
* positive/negative infinity.
* @returns A string representation of the score boundary in Redis protocol
* format.
*/
/** Returns a string representation of a score boundary as a command argument. */
function getScoreBoundaryArg(
score: ScoreBoundary<number> | ScoreBoundary<string>,
isLex: boolean = false,
): string {
if (score == InfScoreBoundary.PositiveInfinity) {
return (
InfScoreBoundary.PositiveInfinity.toString() + (isLex ? "" : "inf")
);
if (typeof score === "string") {
// InfScoreBoundary
return score + "inf";
}

if (score == InfScoreBoundary.NegativeInfinity) {
return (
InfScoreBoundary.NegativeInfinity.toString() + (isLex ? "" : "inf")
);
if (score.isInclusive == false) {
return "(" + score.value.toString();
}

return score.value.toString();
}

/** Returns a string representation of a lex boundary as a command argument. */
function getLexBoundaryArg(
score: ScoreBoundary<number> | ScoreBoundary<string>,
): string {
if (typeof score === "string") {
// InfScoreBoundary
return score;
}

if (score.isInclusive == false) {
return "(" + score.value.toString();
}

return "[" + score.value.toString();
}

/** Returns a string representation of a stream boundary as a command argument. */
function getStreamBoundaryArg(
score: ScoreBoundary<number> | ScoreBoundary<string>,
): string {
if (typeof score === "string") {
// InfScoreBoundary
return score;
}

if (score.isInclusive == false) {
return "(" + score.value.toString();
}

const value = isLex ? "[" + score.value.toString() : score.value.toString();
return value;
return score.value.toString();
}

function createZRangeArgs(
Expand All @@ -1688,10 +1703,20 @@ function createZRangeArgs(

if (typeof rangeQuery.start != "number") {
rangeQuery = rangeQuery as RangeByScore | RangeByLex;
const isLex = rangeQuery.type == "byLex";
args.push(getScoreBoundaryArg(rangeQuery.start, isLex));
args.push(getScoreBoundaryArg(rangeQuery.stop, isLex));
args.push(isLex == true ? "BYLEX" : "BYSCORE");

if (rangeQuery.type == "byLex") {
args.push(
getLexBoundaryArg(rangeQuery.start),
getLexBoundaryArg(rangeQuery.stop),
"BYLEX",
);
} else {
args.push(
getScoreBoundaryArg(rangeQuery.start),
getScoreBoundaryArg(rangeQuery.stop),
"BYSCORE",
);
}
} else {
args.push(rangeQuery.start.toString());
args.push(rangeQuery.stop.toString());
Expand Down Expand Up @@ -1724,9 +1749,11 @@ export function createZCount(
minScore: ScoreBoundary<number>,
maxScore: ScoreBoundary<number>,
): command_request.Command {
const args = [key];
args.push(getScoreBoundaryArg(minScore));
args.push(getScoreBoundaryArg(maxScore));
const args = [
key,
getScoreBoundaryArg(minScore),
getScoreBoundaryArg(maxScore),
];
return createCommand(RequestType.ZCount, args);
}

Expand Down Expand Up @@ -1879,11 +1906,7 @@ export function createZRemRangeByLex(
minLex: ScoreBoundary<string>,
maxLex: ScoreBoundary<string>,
): command_request.Command {
const args = [
key,
getScoreBoundaryArg(minLex, true),
getScoreBoundaryArg(maxLex, true),
];
const args = [key, getLexBoundaryArg(minLex), getLexBoundaryArg(maxLex)];
return createCommand(RequestType.ZRemRangeByLex, args);
}

Expand All @@ -1895,12 +1918,15 @@ export function createZRemRangeByScore(
minScore: ScoreBoundary<number>,
maxScore: ScoreBoundary<number>,
): command_request.Command {
const args = [key];
args.push(getScoreBoundaryArg(minScore));
args.push(getScoreBoundaryArg(maxScore));
const args = [
key,
getScoreBoundaryArg(minScore),
getScoreBoundaryArg(maxScore),
];
return createCommand(RequestType.ZRemRangeByScore, args);
}

/** @internal */
export function createPersist(key: string): command_request.Command {
return createCommand(RequestType.Persist, [key]);
}
Expand All @@ -1913,11 +1939,7 @@ export function createZLexCount(
minLex: ScoreBoundary<string>,
maxLex: ScoreBoundary<string>,
): command_request.Command {
const args = [
key,
getScoreBoundaryArg(minLex, true),
getScoreBoundaryArg(maxLex, true),
];
const args = [key, getLexBoundaryArg(minLex), getLexBoundaryArg(maxLex)];
return createCommand(RequestType.ZLexCount, args);
}

Expand Down Expand Up @@ -2434,6 +2456,42 @@ export function createXLen(key: string): command_request.Command {
return createCommand(RequestType.XLen, [key]);
}

/** Optional arguments for {@link BaseClient.xpendingWithOptions|xpending}. */
export type StreamPendingOptions = {
/** Filter pending entries by their idle time - in milliseconds */
minIdleTime?: number;
/** Starting stream ID bound for range. */
start: ScoreBoundary<string>;
/** Ending stream ID bound for range. */
end: ScoreBoundary<string>;
/** Limit the number of messages returned. */
count: number;
/** Filter pending entries by consumer. */
consumer?: string;
};

/** @internal */
export function createXPending(
key: string,
group: string,
options?: StreamPendingOptions,
): command_request.Command {
const args = [key, group];

if (options) {
if (options.minIdleTime !== undefined)
args.push("IDLE", options.minIdleTime.toString());
args.push(
getStreamBoundaryArg(options.start),
getStreamBoundaryArg(options.end),
options.count.toString(),
);
if (options.consumer) args.push(options.consumer);
}

return createCommand(RequestType.XPending, args);
}

/** @internal */
export function createXInfoConsumers(
key: string,
Expand Down
38 changes: 38 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
StreamAddOptions,
StreamClaimOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamReadOptions,
StreamTrimOptions,
ZAddOptions,
Expand Down Expand Up @@ -206,6 +207,7 @@ import {
createXInfoConsumers,
createXInfoStream,
createXLen,
createXPending,
createXRead,
createXTrim,
createZAdd,
Expand Down Expand Up @@ -2332,6 +2334,9 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
}

/**
* Returns stream message summary information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
* Returns the list of all consumers and their attributes for the given consumer group of the
* stream stored at `key`.
*
Expand All @@ -2340,6 +2345,39 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
* @param key - The key of the stream.
* @param group - The consumer group name.
*
* Command Response - An `array` that includes the summary of the pending messages.
* See example of {@link BaseClient.xpending|xpending} for more details.
*/
public xpending(key: string, group: string): T {
return this.addAndReturn(createXPending(key, group));
}

/**
* Returns stream message summary information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param options - Additional options to filter entries, see {@link StreamPendingOptions}.
*
* Command Response - A 2D-`array` of 4-tuples containing extended message information.
* See example of {@link BaseClient.xpendingWithOptions|xpendingWithOptions} for more details.
*/
public xpendingWithOptions(
key: string,
group: string,
options: StreamPendingOptions,
): T {
return this.addAndReturn(createXPending(key, group, options));
}

/**
* Returns the list of all consumers and their attributes for the given consumer group of the
* stream stored at `key`.
*
* See https://valkey.io/commands/xinfo-consumers/ for more details.
*
* Command Response - An `Array` of `Records`, where each mapping contains the attributes
* of a consumer for the given consumer group of the stream at `key`.
*/
Expand Down
Loading

0 comments on commit 8ca6681

Please sign in to comment.