Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Add XPENDING command. #2085

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added XPENDING commands ([#2085](https:/valkey-io/valkey-glide/pull/2085))
* Node: Added EXPIRETIME and PEXPIRETIME commands ([#2063](https:/valkey-io/valkey-glide/pull/2063))
* Node: Added SORT commands ([#2028](https:/valkey-io/valkey-glide/pull/2028))
* Node: Added LASTSAVE command ([#2059](https:/valkey-io/valkey-glide/pull/2059))
Expand Down
2 changes: 2 additions & 0 deletions node/npm/glide/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ function initialize() {
StreamTrimOptions,
StreamAddOptions,
StreamReadOptions,
StreamPendingOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down Expand Up @@ -224,6 +225,7 @@ function initialize() {
StreamTrimOptions,
StreamAddOptions,
StreamReadOptions,
StreamPendingOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down
73 changes: 73 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
SearchOrigin,
SetOptions,
StreamAddOptions,
StreamPendingOptions,
StreamReadOptions,
StreamTrimOptions,
ZAddOptions,
Expand Down Expand Up @@ -153,6 +154,7 @@ import {
createXAdd,
createXDel,
createXLen,
createXPending,
createXRead,
createXTrim,
createZAdd,
Expand Down Expand Up @@ -3669,6 +3671,77 @@ export class BaseClient {
return this.createWritePromise(createXLen(key));
}

/**
* Returns stream message summary information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @returns An `array` that includes the summary of the pending messages. See example for more details.
* @example
* ```typescript
* console.log(await client.xpending("my_stream", "my_group")); // Output:
* // [
* // 42, // The total number of pending messages
* // "1722643465939-0", // The smallest ID among the pending messages
* // "1722643484626-0", // The greatest ID among the pending messages
* // [ // A 2D-`array` of every consumer in the group
* // [ "consumer1", "10" ], // with at least one pending message, and the
* // [ "consumer2", "32" ], // number of pending messages it has
* // ]
* // ]
* ```
*/
public async xpending(
key: string,
group: string,
): Promise<[number, string, string, [string, number][]]> {
return this.createWritePromise(createXPending(key, group));
}

/**
* Returns an extended form of stream message information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param options - Additional options to filter entries, see {@link StreamPendingOptions}.
* @returns A 2D-`array` of 4-tuples containing extended message information. See example for more details.
*
* @example
* ```typescript
* console.log(await client.xpending("my_stream", "my_group"), {
* start: { value: "0-1", isInclusive: true },
* end: InfScoreBoundary.PositiveInfinity,
* count: 2,
* consumer: "consumer1"
* }); // Output:
* // [
* // [
* // "1722643465939-0", // The ID of the message
* // "consumer1", // The name of the consumer that fetched the message and has still to acknowledge it
* // 174431, // The number of milliseconds that elapsed since the last time this message was delivered to this consumer
* // 1 // The number of times this message was delivered
* // ],
* // [
* // "1722643484626-0",
* // "consumer1",
* // 202231,
* // 1
* // ]
* // ]
* ```
*/
public async xpendingWithOptions(
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
key: string,
group: string,
options: StreamPendingOptions,
): Promise<[string, string, number, number][]> {
return this.createWritePromise(createXPending(key, group, options));
}

private readonly MAP_READ_FROM_STRATEGY: Record<
ReadFrom,
connection_request.ReadFrom
Expand Down
42 changes: 42 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,48 @@ export function createXLen(key: string): command_request.Command {
return createCommand(RequestType.XLen, [key]);
}

/** Optional arguments for {@link BaseClient.xpendingWithOptions|xpending}. */
export type StreamPendingOptions = {
/** Filter pending entries by their idle time - in milliseconds */
minIdleTime?: number;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
/** Starting stream ID bound for range. */
start: ScoreBoundary<string>;
/** Ending stream ID bound for range. */
end: ScoreBoundary<string>;
/** Limit the number of messages returned. */
count: number;
/** Filter pending entries by consumer. */
consumer?: string;
};

/** @internal */
export function createXPending(
key: string,
group: string,
options?: StreamPendingOptions,
): command_request.Command {
const args = [key, group];

if (options) {
if (options.minIdleTime !== undefined)
args.push("IDLE", options.minIdleTime.toString());
args.push(
// pass enum (string actually) as is, because XRANGE and ZRANGE are not
// aligned in how range boundaries are converted to args :facepalm:
typeof options.start === "string"
? options.start
: getScoreBoundaryArg(options.start, false),
typeof options.end === "string"
? options.end
: getScoreBoundaryArg(options.end, false),
options.count.toString(),
);
if (options.consumer) args.push(options.consumer);
}

return createCommand(RequestType.XPending, args);
}

/**
* @internal
*/
Expand Down
38 changes: 38 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import {
ReadFrom, // eslint-disable-line @typescript-eslint/no-unused-vars
BaseClient, // eslint-disable-line @typescript-eslint/no-unused-vars
} from "./BaseClient";

import {
Expand Down Expand Up @@ -212,6 +213,8 @@ import {
createZRevRankWithScore,
createZScan,
createZScore,
createXPending,
StreamPendingOptions,
} from "./Commands";
import { command_request } from "./ProtobufMessage";

Expand Down Expand Up @@ -2125,6 +2128,41 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXLen(key));
}

/**
* Returns stream message summary information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
*
* Command Response - An `array` that includes the summary of the pending messages.
* See example of {@link BaseClient.xpending|xpending} for more details.
*/
public xpending(key: string, group: string): T {
return this.addAndReturn(createXPending(key, group));
}

/**
* Returns stream message summary information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param options - Additional options to filter entries, see {@link StreamPendingOptions}.
*
* Command Response - A 2D-`array` of 4-tuples containing extended message information.
* See example of {@link BaseClient.xpendingWithOptions|xpendingWithOptions} for more details.
*/
public xpendingWithOptions(
key: string,
group: string,
options: StreamPendingOptions,
): T {
return this.addAndReturn(createXPending(key, group, options));
}

/**
* Renames `key` to `newkey`.
* If `newkey` already exists it is overwritten.
Expand Down
115 changes: 108 additions & 7 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3795,13 +3795,9 @@ export function runBaseTests<Context>(config: {
expect(await client.type(key)).toEqual("hash");
expect(await client.del([key])).toEqual(1);

await client.customCommand([
"XADD",
key,
"*",
"field",
"value",
]);
await client.xadd(key, [["field", "value"]], {
makeStream: true,
});
expect(await client.type(key)).toEqual("stream");
expect(await client.del([key])).toEqual(1);
expect(await client.type(key)).toEqual("none");
Expand Down Expand Up @@ -6665,6 +6661,111 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xpending test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
const key = uuidv4();
const group = uuidv4();

expect(
await client.customCommand([
"xgroup",
"create",
key,
group,
"0",
"MKSTREAM",
]),
).toEqual("OK");
expect(
await client.customCommand([
"xgroup",
"createconsumer",
key,
group,
"consumer",
]),
).toEqual(true);

expect(
await client.xadd(
key,
[
["entry1_field1", "entry1_value1"],
["entry1_field2", "entry1_value2"],
],
{ id: "0-1" },
),
).toEqual("0-1");
expect(
await client.xadd(
key,
[["entry2_field1", "entry2_value1"]],
{ id: "0-2" },
),
).toEqual("0-2");

expect(
await client.customCommand([
"xreadgroup",
"group",
group,
"consumer",
"STREAMS",
key,
">",
]),
).toEqual({
[key]: {
"0-1": [
["entry1_field1", "entry1_value1"],
["entry1_field2", "entry1_value2"],
],
"0-2": [["entry2_field1", "entry2_value1"]],
},
});

// wait to get some minIdleTime
await new Promise((resolve) => setTimeout(resolve, 500));

expect(await client.xpending(key, group)).toEqual([
2,
"0-1",
"0-2",
[["consumer", "2"]],
]);

const result = await client.xpendingWithOptions(key, group, {
start: InfScoreBoundary.NegativeInfinity,
end: InfScoreBoundary.PositiveInfinity,
count: 1,
minIdleTime: 42,
});
result[0][2] = 0; // overwrite msec counter to avoid test flakyness
expect(result).toEqual([["0-1", "consumer", 0, 1]]);

// not existing consumer
expect(
await client.xpendingWithOptions(key, group, {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
start: { value: "0-1", isInclusive: true },
end: { value: "0-2", isInclusive: false },
count: 12,
consumer: "_",
}),
).toEqual([]);

// key exists, but it is not a stream
const stringKey = uuidv4();
expect(await client.set(stringKey, "foo")).toEqual("OK");
await expect(client.xpending(stringKey, "_")).rejects.toThrow(
RequestError,
);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`lmpop test_%p`,
async (protocol) => {
Expand Down
Loading
Loading