diff --git a/.changeset/tiny-coins-approve.md b/.changeset/tiny-coins-approve.md new file mode 100644 index 0000000000000..c26efb5f477ff --- /dev/null +++ b/.changeset/tiny-coins-approve.md @@ -0,0 +1,7 @@ +--- +"@medusajs/locking-postgres": patch +"@medusajs/modules-sdk": patch +"@medusajs/types": patch +--- + +Locking Module - locking-postgres diff --git a/.eslintignore b/.eslintignore index ed0ba98e31562..43e9a8fa72807 100644 --- a/.eslintignore +++ b/.eslintignore @@ -33,6 +33,7 @@ packages/* !packages/workflow-engine-inmemory !packages/fulfillment !packages/fulfillment-manual +!packages/locking-postgres !packages/locking-redis !packages/index diff --git a/.eslintrc.js b/.eslintrc.js index 02c2ee5f161b3..705010c5c6280 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -134,6 +134,7 @@ module.exports = { "./packages/modules/providers/file-s3/tsconfig.spec.json", "./packages/modules/providers/fulfillment-manual/tsconfig.spec.json", "./packages/modules/providers/payment-stripe/tsconfig.spec.json", + "./packages/modules/providers/locking-postgres/tsconfig.spec.json", "./packages/modules/providers/locking-redis/tsconfig.spec.json", "./packages/framework/tsconfig.json", diff --git a/packages/medusa/package.json b/packages/medusa/package.json index 660a6d01e2f3b..13a46b76506fb 100644 --- a/packages/medusa/package.json +++ b/packages/medusa/package.json @@ -84,6 +84,7 @@ "@medusajs/inventory": "^0.0.3", "@medusajs/link-modules": "^0.2.11", "@medusajs/locking": "^0.0.1", + "@medusajs/locking-postgres": "^0.0.1", "@medusajs/locking-redis": "^0.0.1", "@medusajs/notification": "^0.1.2", "@medusajs/notification-local": "^0.0.1", diff --git a/packages/medusa/src/modules/locking-postgres.ts b/packages/medusa/src/modules/locking-postgres.ts new file mode 100644 index 0000000000000..ef779fbdca7cd --- /dev/null +++ b/packages/medusa/src/modules/locking-postgres.ts @@ -0,0 +1,6 @@ +import PostgresLockingProvider from "@medusajs/locking-postgres" + +export * from "@medusajs/locking-postgres" + +export default PostgresLockingProvider +export const discoveryPath = require.resolve("@medusajs/locking-postgres") diff --git a/packages/modules/providers/locking-postgres/.gitignore b/packages/modules/providers/locking-postgres/.gitignore new file mode 100644 index 0000000000000..83cb36a41ea17 --- /dev/null +++ b/packages/modules/providers/locking-postgres/.gitignore @@ -0,0 +1,4 @@ +dist +node_modules +.DS_store +yarn.lock diff --git a/packages/modules/providers/locking-postgres/integration-tests/__tests__/index.spec.ts b/packages/modules/providers/locking-postgres/integration-tests/__tests__/index.spec.ts new file mode 100644 index 0000000000000..75e459745f23c --- /dev/null +++ b/packages/modules/providers/locking-postgres/integration-tests/__tests__/index.spec.ts @@ -0,0 +1,213 @@ +import { ILockingModule } from "@medusajs/framework/types" +import { Modules, promiseAll } from "@medusajs/framework/utils" +import { moduleIntegrationTestRunner } from "@medusajs/test-utils" +import { setTimeout } from "node:timers/promises" + +jest.setTimeout(10000) + +const providerId = "locking-postgres" +moduleIntegrationTestRunner({ + moduleName: Modules.LOCKING, + moduleOptions: { + providers: [ + { + id: providerId, + resolve: require.resolve("../../src"), + is_default: true, + }, + ], + }, + testSuite: ({ service }) => { + describe("Locking Module Service", () => { + let stock = 5 + function replenishStock() { + stock = 5 + } + function hasStock() { + return stock > 0 + } + async function reduceStock() { + await setTimeout(10) + stock-- + } + async function buy() { + if (hasStock()) { + await reduceStock() + return true + } + return false + } + + it("should execute functions respecting the key locked", async () => { + // 10 parallel calls to buy should oversell the stock + const prom: any[] = [] + for (let i = 0; i < 10; i++) { + prom.push(buy()) + } + await Promise.all(prom) + expect(stock).toBe(-5) + + replenishStock() + + // 10 parallel calls to buy with lock should not oversell the stock + const promWLock: any[] = [] + for (let i = 0; i < 10; i++) { + promWLock.push(service.execute("item_1", buy)) + } + await Promise.all(promWLock) + + expect(stock).toBe(0) + }) + + it("should acquire lock and release it", async () => { + await service.acquire("key_name", { + ownerId: "user_id_123", + }) + + const userReleased = await service.release("key_name", { + ownerId: "user_id_456", + }) + const anotherUserLock = service.acquire("key_name", { + ownerId: "user_id_456", + }) + + expect(userReleased).toBe(false) + await expect(anotherUserLock).rejects.toThrow( + `Failed to acquire lock for key "key_name"` + ) + + const releasing = await service.release("key_name", { + ownerId: "user_id_123", + }) + + expect(releasing).toBe(true) + }) + + it("should acquire lock and release it during parallel calls", async () => { + const keyToLock = "mySpecialKey" + const user_1 = { + ownerId: "user_id_456", + } + const user_2 = { + ownerId: "user_id_000", + } + + await expect( + service.acquire(keyToLock, user_1) + ).resolves.toBeUndefined() + + await expect( + service.acquire(keyToLock, user_1) + ).resolves.toBeUndefined() + + await expect(service.acquire(keyToLock, user_2)).rejects.toThrow( + `Failed to acquire lock for key "${keyToLock}"` + ) + + await expect(service.acquire(keyToLock, user_2)).rejects.toThrow( + `Failed to acquire lock for key "${keyToLock}"` + ) + + await service.acquire(keyToLock, user_1) + + const releaseNotLocked = await service.release(keyToLock, { + ownerId: "user_id_000", + }) + expect(releaseNotLocked).toBe(false) + + const release = await service.release(keyToLock, user_1) + expect(release).toBe(true) + }) + + it("should fail to acquire the same key when no owner is provided", async () => { + const keyToLock = "mySpecialKey" + + const user_2 = { + ownerId: "user_id_000", + } + + await expect(service.acquire(keyToLock)).resolves.toBeUndefined() + + await expect(service.acquire(keyToLock)).rejects.toThrow( + `Failed to acquire lock for key "${keyToLock}"` + ) + + await expect(service.acquire(keyToLock)).rejects.toThrow( + `Failed to acquire lock for key "${keyToLock}"` + ) + + await expect(service.acquire(keyToLock, user_2)).rejects.toThrow( + `Failed to acquire lock for key "${keyToLock}"` + ) + + await expect(service.acquire(keyToLock, user_2)).rejects.toThrow( + `Failed to acquire lock for key "${keyToLock}"` + ) + + const releaseNotLocked = await service.release(keyToLock, { + ownerId: "user_id_000", + }) + expect(releaseNotLocked).toBe(false) + + const release = await service.release(keyToLock) + expect(release).toBe(true) + }) + }) + + it("should release lock in case of failure", async () => { + const fn_1 = jest.fn(async () => { + throw new Error("Error") + }) + const fn_2 = jest.fn(async () => {}) + + await service.execute("lock_key", fn_1).catch(() => {}) + await service.execute("lock_key", fn_2).catch(() => {}) + + expect(fn_1).toHaveBeenCalledTimes(1) + expect(fn_2).toHaveBeenCalledTimes(1) + }) + + it("should release lock in case of timeout failure", async () => { + const fn_1 = jest.fn(async () => { + await setTimeout(1010) + return "fn_1" + }) + + const fn_2 = jest.fn(async () => { + return "fn_2" + }) + + const fn_3 = jest.fn(async () => { + return "fn_3" + }) + + const ops = [ + service + .execute("lock_key", fn_1, { + timeout: 1, + }) + .catch((e) => e), + + service + .execute("lock_key", fn_2, { + timeout: 1, + }) + .catch((e) => e), + + service + .execute("lock_key", fn_3, { + timeout: 2, + }) + .catch((e) => e), + ] + + const res = await promiseAll(ops) + + expect(res).toEqual(["fn_1", expect.any(Error), "fn_3"]) + + expect(fn_1).toHaveBeenCalledTimes(1) + expect(fn_2).toHaveBeenCalledTimes(0) + expect(fn_3).toHaveBeenCalledTimes(1) + }) + }, +}) diff --git a/packages/modules/providers/locking-postgres/jest.config.js b/packages/modules/providers/locking-postgres/jest.config.js new file mode 100644 index 0000000000000..818699559a62f --- /dev/null +++ b/packages/modules/providers/locking-postgres/jest.config.js @@ -0,0 +1,10 @@ +const defineJestConfig = require("../../../../define_jest_config") +module.exports = defineJestConfig({ + moduleNameMapper: { + "^@models": "/src/models", + "^@services": "/src/services", + "^@repositories": "/src/repositories", + "^@types": "/src/types", + "^@utils": "/src/utils", + }, +}) diff --git a/packages/modules/providers/locking-postgres/mikro-orm.config.dev.ts b/packages/modules/providers/locking-postgres/mikro-orm.config.dev.ts new file mode 100644 index 0000000000000..a06eaed4790a8 --- /dev/null +++ b/packages/modules/providers/locking-postgres/mikro-orm.config.dev.ts @@ -0,0 +1,7 @@ +import * as entities from "./src/models" + +import { defineMikroOrmCliConfig } from "@medusajs/framework/utils" + +export default defineMikroOrmCliConfig("lockingPostgres", { + entities: Object.values(entities), +}) diff --git a/packages/modules/providers/locking-postgres/package.json b/packages/modules/providers/locking-postgres/package.json new file mode 100644 index 0000000000000..7cf10b04b00bb --- /dev/null +++ b/packages/modules/providers/locking-postgres/package.json @@ -0,0 +1,54 @@ +{ + "name": "@medusajs/locking-postgres", + "version": "0.0.1", + "description": "Postgres Advisory Locks for Medusa", + "main": "dist/index.js", + "repository": { + "type": "git", + "url": "https://github.com/medusajs/medusa", + "directory": "packages/locking-postgres" + }, + "files": [ + "dist", + "!dist/**/__tests__", + "!dist/**/__mocks__", + "!dist/**/__fixtures__" + ], + "engines": { + "node": ">=20" + }, + "author": "Medusa", + "license": "MIT", + "devDependencies": { + "@medusajs/framework": "^0.0.1", + "@mikro-orm/cli": "5.9.7", + "@mikro-orm/core": "5.9.7", + "@mikro-orm/migrations": "5.9.7", + "@mikro-orm/postgresql": "5.9.7", + "@swc/core": "^1.7.28", + "@swc/jest": "^0.2.36", + "jest": "^29.7.0", + "rimraf": "^5.0.1", + "typescript": "^5.6.2" + }, + "peerDependencies": { + "@medusajs/framework": "^0.0.1" + }, + "scripts": { + "watch": "tsc --build --watch", + "watch:test": "tsc --build tsconfig.spec.json --watch", + "resolve:aliases": "tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && tsc-alias -p tsconfig.resolved.json && rimraf tsconfig.resolved.json", + "build": "rimraf dist && tsc --build && npm run resolve:aliases", + "test": "jest --passWithNoTests src", + "test:integration": "jest --runInBand --forceExit -- integration-tests/**/__tests__/**/*.spec.ts", + "migration:generate": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts medusa-mikro-orm migration:generate", + "migration:initial": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create --initial -n InitialSetupMigration", + "migration:create": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create", + "migration:up": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts medusa-mikro-orm migration:up", + "orm:cache:clear": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts medusa-mikro-orm cache:clear" + }, + "keywords": [ + "medusa-providers", + "medusa-providers-locking" + ] +} diff --git a/packages/modules/providers/locking-postgres/src/index.ts b/packages/modules/providers/locking-postgres/src/index.ts new file mode 100644 index 0000000000000..d706266db46c2 --- /dev/null +++ b/packages/modules/providers/locking-postgres/src/index.ts @@ -0,0 +1,8 @@ +import { ModuleProvider, Modules } from "@medusajs/framework/utils" +import { PostgresAdvisoryLockProvider } from "./services/advisory-lock" + +const services = [PostgresAdvisoryLockProvider] + +export default ModuleProvider(Modules.LOCKING, { + services, +}) diff --git a/packages/modules/providers/locking-postgres/src/migrations/.snapshot-medusa-locking-postgres.json b/packages/modules/providers/locking-postgres/src/migrations/.snapshot-medusa-locking-postgres.json new file mode 100644 index 0000000000000..4e289fc22c5d8 --- /dev/null +++ b/packages/modules/providers/locking-postgres/src/migrations/.snapshot-medusa-locking-postgres.json @@ -0,0 +1,55 @@ +{ + "namespaces": [ + "public" + ], + "name": "public", + "tables": [ + { + "columns": { + "id": { + "name": "id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "owner_id": { + "name": "owner_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "text" + }, + "expiration": { + "name": "expiration", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + } + }, + "name": "locking", + "schema": "public", + "indexes": [ + { + "keyName": "locking_pkey", + "columnNames": [ + "id" + ], + "composite": false, + "primary": true, + "unique": true + } + ], + "checks": [], + "foreignKeys": {} + } + ] +} diff --git a/packages/modules/providers/locking-postgres/src/migrations/Migration20241009222919_InitialSetupMigration.ts b/packages/modules/providers/locking-postgres/src/migrations/Migration20241009222919_InitialSetupMigration.ts new file mode 100644 index 0000000000000..451b314df7b50 --- /dev/null +++ b/packages/modules/providers/locking-postgres/src/migrations/Migration20241009222919_InitialSetupMigration.ts @@ -0,0 +1,13 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20241009222919_InitialSetupMigration extends Migration { + async up(): Promise { + this.addSql( + 'create table if not exists "locking" ("id" text not null, "owner_id" text null, "expiration" timestamptz null, constraint "locking_pkey" primary key ("id"));' + ) + } + + async down(): Promise { + this.addSql(`drop table "locking";`) + } +} diff --git a/packages/modules/providers/locking-postgres/src/models/index.ts b/packages/modules/providers/locking-postgres/src/models/index.ts new file mode 100644 index 0000000000000..c3224414d5662 --- /dev/null +++ b/packages/modules/providers/locking-postgres/src/models/index.ts @@ -0,0 +1 @@ +export { default as Locking } from "./locking" diff --git a/packages/modules/providers/locking-postgres/src/models/locking.ts b/packages/modules/providers/locking-postgres/src/models/locking.ts new file mode 100644 index 0000000000000..8b9558befbcb6 --- /dev/null +++ b/packages/modules/providers/locking-postgres/src/models/locking.ts @@ -0,0 +1,32 @@ +import { generateEntityId } from "@medusajs/framework/utils" +import { + BeforeCreate, + Entity, + OnInit, + PrimaryKey, + Property, +} from "@mikro-orm/core" + +@Entity({ tableName: "locking" }) +class Locking { + @PrimaryKey({ columnType: "text" }) + id!: string + + @Property({ columnType: "text", nullable: true }) + owner_id: string | null = null + + @Property({ columnType: "timestamptz", nullable: true }) + expiration: Date | null = null + + @BeforeCreate() + onCreate() { + this.id = generateEntityId(this.id, "lk") + } + + @OnInit() + onInit() { + this.id = generateEntityId(this.id, "lk") + } +} + +export default Locking diff --git a/packages/modules/providers/locking-postgres/src/services/advisory-lock.ts b/packages/modules/providers/locking-postgres/src/services/advisory-lock.ts new file mode 100644 index 0000000000000..27912cb749a78 --- /dev/null +++ b/packages/modules/providers/locking-postgres/src/services/advisory-lock.ts @@ -0,0 +1,196 @@ +import { ILockingProvider } from "@medusajs/framework/types" +import { isDefined, MedusaService } from "@medusajs/framework/utils" +import { EntityManager } from "@mikro-orm/core" +import { Locking } from "@models" + +type InjectedDependencies = { + manager: EntityManager +} + +export class PostgresAdvisoryLockProvider + extends MedusaService({ Locking }) + implements ILockingProvider +{ + static identifier = "locking-postgres" + + protected manager: EntityManager + + constructor(container: InjectedDependencies) { + // @ts-ignore + super(...arguments) + this.manager = container.manager + } + + private getManager(): any { + return this.manager + } + + async execute( + keys: string | string[], + job: () => Promise, + args?: { + timeout?: number + } + ): Promise { + const timeout = Math.max(args?.timeout ?? 5, 1) + const timeoutSeconds = Number.isNaN(timeout) ? 1 : timeout + + return await this.getManager().transactional(async (manager) => { + const ops: Promise[] = [] + if (timeoutSeconds > 0) { + ops.push(this.getTimeout(timeoutSeconds)) + } + + const fnName = "pg_advisory_xact_lock" + + const allKeys = Array.isArray(keys) ? keys : [keys] + const numKeys = allKeys.map(this.hashStringToInt) + const lockPromises = numKeys.map((numKey) => + manager.execute(`SELECT ${fnName}(?)`, [numKey]) + ) + + const lock = Promise.all(lockPromises) + + ops.push(lock) + + try { + await Promise.race(ops) + + const ret = await job() + await manager.commit() + + return ret + } catch (e) { + await manager.rollback() + throw e + } + }) + } + + private async loadLock(key: string): Promise<{ + owner_id: string | null + expiration: number | null + now: number + }> { + const [row] = await this.getManager().execute( + `SELECT owner_id, expiration, NOW() AS now FROM locking WHERE id = ?`, + [key] + ) + + return row + } + + async acquire( + keys: string | string[], + args?: { + ownerId?: string | null + expire?: number + } + ): Promise { + keys = Array.isArray(keys) ? keys : [keys] + + const { ownerId, expire } = args ?? {} + for (const key of keys) { + const row = await this.loadLock(key) + + if (!row) { + const expireSql = expire + ? `NOW() + INTERVAL '${+expire} SECONDS'` + : "NULL" + + try { + await this.getManager().execute( + `INSERT INTO locking (id, owner_id, expiration) VALUES (?, ?, ${expireSql})`, + [key, ownerId ?? null] + ) + } catch (err) { + if (err.toString().includes("locking_pkey")) { + const owner = await this.loadLock(key) + if (ownerId != owner.owner_id) { + throw new Error(`"${key}" is already locked.`) + } + } else { + throw err + } + } + continue + } + + const errMessage = `Failed to acquire lock for key "${key}"` + if (row.owner_id === null || row.owner_id !== ownerId) { + throw new Error(errMessage) + } + + if (!row.expiration && row.owner_id == ownerId) { + continue + } + + const canRefresh = + row.owner_id == ownerId && (expire || row.expiration! <= row.now) + + if (!canRefresh || !expire) { + continue + } + + await this.getManager().execute( + `UPDATE locking SET owner_id = ?, expiration = NOW() + INTERVAL '${+expire} SECONDS' WHERE id = ?`, + [ownerId ?? null, key] + ) + } + } + + async release( + keys: string | string[], + args?: { + ownerId?: string | null + } + ): Promise { + const { ownerId } = args ?? {} + + keys = Array.isArray(keys) ? keys : [keys] + + let success = true + for (const key of keys) { + const row = await this.loadLock(key) + + if (!row || row.owner_id != ownerId) { + success = false + continue + } + + await this.getManager().execute(`DELETE FROM locking WHERE id = ?`, [key]) + + success = success && (!row.expiration || row.expiration > row.now) + } + return success + } + + async releaseAll(args?: { ownerId?: string | null }): Promise { + const { ownerId } = args ?? {} + + if (!isDefined(ownerId)) { + await this.getManager().execute(`TRUNCATE TABLE locking`) + } else { + await this.getManager().execute( + `DELETE FROM locking WHERE owner_id = ?`, + [ownerId] + ) + } + } + + private hashStringToInt(str: string): number { + let hash = 5381 + for (let i = str.length; i--; ) { + hash = (hash * 33) ^ str.charCodeAt(i) + } + return hash >>> 0 + } + + private async getTimeout(seconds: number): Promise { + return new Promise((_, reject) => { + setTimeout(() => { + reject(new Error("Timed-out acquiring lock.")) + }, seconds * 1000) + }) + } +} diff --git a/packages/modules/providers/locking-postgres/tsconfig.json b/packages/modules/providers/locking-postgres/tsconfig.json new file mode 100644 index 0000000000000..90f3a70b383ef --- /dev/null +++ b/packages/modules/providers/locking-postgres/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "../../../../_tsconfig.base.json", + "compilerOptions": { + "paths": { + "@models": ["./src/models"], + "@services": ["./src/services"], + "@repositories": ["./src/repositories"], + "@types": ["./src/types"], + "@utils": ["./src/utils"] + } + } +} diff --git a/yarn.lock b/yarn.lock index b1686f2a8e542..6444db38346a6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5958,6 +5958,25 @@ __metadata: languageName: unknown linkType: soft +"@medusajs/locking-postgres@^0.0.1, @medusajs/locking-postgres@workspace:packages/modules/providers/locking-postgres": + version: 0.0.0-use.local + resolution: "@medusajs/locking-postgres@workspace:packages/modules/providers/locking-postgres" + dependencies: + "@medusajs/framework": ^0.0.1 + "@mikro-orm/cli": 5.9.7 + "@mikro-orm/core": 5.9.7 + "@mikro-orm/migrations": 5.9.7 + "@mikro-orm/postgresql": 5.9.7 + "@swc/core": ^1.7.28 + "@swc/jest": ^0.2.36 + jest: ^29.7.0 + rimraf: ^5.0.1 + typescript: ^5.6.2 + peerDependencies: + "@medusajs/framework": ^0.0.1 + languageName: unknown + linkType: soft + "@medusajs/locking-redis@^0.0.1, @medusajs/locking-redis@workspace:packages/modules/providers/locking-redis": version: 0.0.0-use.local resolution: "@medusajs/locking-redis@workspace:packages/modules/providers/locking-redis" @@ -6058,6 +6077,7 @@ __metadata: "@medusajs/inventory": ^0.0.3 "@medusajs/link-modules": ^0.2.11 "@medusajs/locking": ^0.0.1 + "@medusajs/locking-postgres": ^0.0.1 "@medusajs/locking-redis": ^0.0.1 "@medusajs/notification": ^0.1.2 "@medusajs/notification-local": ^0.0.1