Skip to content

Commit

Permalink
feat: add concurrency safety
Browse files Browse the repository at this point in the history
  • Loading branch information
eseliger committed May 10, 2018
1 parent 1f4a725 commit 4155f37
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 69 deletions.
47 changes: 46 additions & 1 deletion src/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import chalk from 'chalk'
import { parse } from 'url'
import { Logger } from '.'
import { Commit } from './git'
import { Migration, Task, TaskType } from './migration'

Expand All @@ -14,6 +16,11 @@ export class UnsupportedDialectError extends Error {
}
}

export class PendingMigrationTimedOutError extends Error {
/* istanbul ignore next */
public readonly name = 'PendingMigrationTimedOutError'
}

export interface TableRow {
id: number
name: string
Expand All @@ -26,9 +33,47 @@ export interface TableRow {
export abstract class DbAdapter {
public abstract init(): Promise<void>
public abstract getLastMigrationTask(): Promise<Task | null>
public abstract logMigrationTask(task: Task): Promise<void>
public abstract beginMigrationTask(task: Task): Promise<void>
public abstract finishMigrationTask(task: Task): Promise<void>
public abstract checkIfTaskCanExecute(task: Task): Promise<void>
public abstract close(): Promise<void>
protected abstract hasPendingMigration(): Promise<boolean>

public async waitForPending(logger: Logger): Promise<boolean> {
let wasPending = false
let shouldRetry = true
await Promise.race([
new Promise<never>((_, reject) =>
setTimeout(() => reject(new PendingMigrationTimedOutError()), 1000 * 60 * 10)
),
(async () => {
// fail after 10 min
let interval: NodeJS.Timer | undefined
while (shouldRetry) {
// if there are rows, a migration is already running
if (!(await this.hasPendingMigration())) {
if (wasPending) {
logger.log('\n\n')
}
break
}
if (!wasPending) {
logger.log(`${chalk.yellow('Waiting for pending migrations')} ...`)
// we had to wait for at least 1 pending migration
wasPending = true
interval = setInterval(() => logger.log('.'), 300)
}
// wait for 1000ms before retrying
await new Promise<void>(resolve => setTimeout(resolve, 1000))
}
if (interval) {
clearInterval(interval)
}
})(),
])
shouldRetry = false
return wasPending
}

protected rowToTask(row: TableRow): Task {
const task = new Task({
Expand Down
43 changes: 36 additions & 7 deletions src/adapters/postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ export class PostgresAdapter extends DbAdapter {
"type" merkel_migration_type,
"commit" TEXT,
"head" TEXT NOT NULL,
"applied_at" TIMESTAMP WITH TIME ZONE NOT NULL
"applied_at" TIMESTAMP WITH TIME ZONE
);
`)
// migrate schema from merkel <= 0.19
await this.client.query(`ALTER TABLE "merkel_meta" ALTER COLUMN "applied_at" DROP NOT NULL`)
}

public close(): Promise<void> {
return new Promise<void>((resolve, reject) => {
return new Promise<void>(resolve => {
this.client.on('end', resolve)
// tslint:disable-next-line:no-floating-promises
this.client.end()
Expand All @@ -58,30 +60,46 @@ export class PostgresAdapter extends DbAdapter {
const { rows } = await this.client.query(`
SELECT "id", "name", "applied_at", "type", "commit", "head"
FROM "merkel_meta"
WHERE "applied_at" IS NOT NULL
ORDER BY "id" DESC
LIMIT 1
`)
return rows.length === 0 ? null : this.rowToTask(rows[0])
}

/**
* Logs an executed task to the database. Sets the task ID
* Logs a task to the database. Sets the task ID
*/
public async logMigrationTask(task: Task): Promise<void> {
public async beginMigrationTask(task: Task): Promise<void> {
const { rows } = await this.client.query(SQL`
INSERT INTO merkel_meta ("name", "type", "commit", "head", "applied_at")
INSERT INTO merkel_meta ("name", "type", "commit", "head")
VALUES (
${task.migration.name},
${task.type},
${task.commit ? task.commit.sha1 : null},
${task.head ? task.head.sha1 : null},
${task.appliedAt}
${task.head ? task.head.sha1 : null}
)
RETURNING id
`)
task.id = rows[0].id
}

/**
* Marks the task as finished
*/
public async finishMigrationTask(task: Task): Promise<void> {
const head = task.head ? task.head.sha1 : null
const commit = task.commit ? task.commit.sha1 : null
await this.client.query(SQL`
UPDATE merkel_meta
SET
"applied_at" = ${task.appliedAt},
"head" = ${head},
"commit" = ${commit}
WHERE "id" = ${task.id}
`)
}

/**
* Checks that the same task cannot be executed two times in a row and the first task cannot be
* a down task
Expand All @@ -91,6 +109,7 @@ export class PostgresAdapter extends DbAdapter {
SELECT "type"
FROM "merkel_meta"
WHERE "name" = ${task.migration.name}
AND "applied_at" IS NOT NULL
ORDER BY "id" DESC
LIMIT 1
`)
Expand All @@ -106,4 +125,14 @@ export class PostgresAdapter extends DbAdapter {
}
}
}

protected async hasPendingMigration(): Promise<boolean> {
const { rows } = await this.client.query(SQL`
SELECT "type"
FROM "merkel_meta"
WHERE "applied_at" IS NULL
LIMIT 1
`)
return rows.length !== 0
}
}
87 changes: 57 additions & 30 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { createAdapterFromUrl } from './adapter'
import { getHead } from './git'
import { addGitHook, HookAlreadyFoundError } from './git'
import {
CLI_LOGGER,
createConfig,
createMigrationDir,
generate,
Expand Down Expand Up @@ -133,12 +134,12 @@ yargs.command(
when: () => !!argv.db,
},
])
if (await createMigrationDir(migrationDir as string)) {
process.stdout.write(`Created ${chalk.cyan(migrationDir as string)}\n`)
if (await createMigrationDir(migrationDir)) {
process.stdout.write(`Created ${chalk.cyan(migrationDir)}\n`)
}
await createConfig({
migrationDir: migrationDir as string,
migrationOutDir: (migrationOutDir as string) || './migrations',
migrationDir,
migrationOutDir: migrationOutDir || './migrations',
})
process.stdout.write(`Created ${chalk.cyan(path.join('.', '.merkelrc.json'))}\n`)
if (initMetaNow) {
Expand Down Expand Up @@ -249,6 +250,8 @@ yargs.command(
const adapter = createAdapterFromUrl(argv.db!)
await adapter.init()
const head = await getHead()
// wait for current migration to finish
await adapter.waitForPending(CLI_LOGGER)
const status = await getStatus(adapter, head)
process.stdout.write('\n' + status.toString())
if (status.newCommits.some(commit => commit.tasks.length > 0)) {
Expand Down Expand Up @@ -283,33 +286,53 @@ yargs.command(
try {
const adapter = createAdapterFromUrl(argv.db!)
await adapter.init()
const head = await getHead()
const status = await getStatus(adapter, head)
process.stdout.write(status.toString())
if (status.newCommits.some(commit => commit.tasks.length > 0)) {
if (argv.confirm) {
const answer = await inquirer.prompt<{ continue: boolean }>({
type: 'confirm',
name: 'continue',
message: 'Continue?',
})
if (!answer.continue) {
process.exit(0)
while (true) {
const head = await getHead()
const status = await getStatus(adapter, head)
process.stdout.write(status.toString())
const tasks = status.newCommits.reduce<Task[]>((prev, next) => prev.concat(next.tasks), [])
if (tasks.length > 0) {
if (argv.confirm) {
const answer = await inquirer.prompt<{ continue: boolean }>({
type: 'confirm',
name: 'continue',
message: 'Continue?',
})
if (!answer.continue) {
process.exit(0)
}
process.stdout.write('\n')
}
process.stdout.write('\n')
}
process.stdout.write('Starting migration\n\n')
for (const commit of status.newCommits) {
process.stdout.write(`${chalk.yellow(commit.shortSha1)} ${commit.subject}\n`)
for (const task of commit.tasks) {
process.stdout.write(task.toString() + ' ...')
const interval = setInterval(() => process.stdout.write('.'), 100)
await task.execute(argv.migrationOutDir!, adapter, head, commit)
clearInterval(interval)
process.stdout.write(' Success\n')

process.stdout.write('Starting migration\n\n')

const hasChanged = await adapter.waitForPending(CLI_LOGGER)

if (hasChanged) {
process.stdout.write('The migrations have changed, reloading..\n\n')
continue
}
// create pending tasks
for (const task of tasks) {
await adapter.beginMigrationTask(task)
}

for (const commit of status.newCommits) {
process.stdout.write(`${chalk.yellow(commit.shortSha1)} ${commit.subject}\n`)
for (const task of commit.tasks) {
process.stdout.write(task.toString() + ' ...')
const interval = setInterval(() => process.stdout.write('.'), 100)
try {
await task.execute(argv.migrationOutDir!, adapter, head, commit)
} finally {
clearInterval(interval)
}
process.stdout.write(' Success\n')
}
}
process.stdout.write(chalk.green('\nAll migrations successful\n'))
}
process.stdout.write(chalk.green('\nAll migrations successful\n'))
break
}
process.exit(0)
} catch (err) {
Expand All @@ -328,9 +351,13 @@ const migrationCommand = (type: TaskType) => async (argv: MigrationCommandArgv)
try {
const adapter = createAdapterFromUrl(argv.db!)
await adapter.init()
await adapter.waitForPending(CLI_LOGGER)
const head = await getHead()
for (const name of argv.migrations!) {
const task = new Task({ type, migration: new Migration(name) })
const tasks = argv.migrations!.map(name => new Task({ type, migration: new Migration(name), head }))
for (const task of tasks) {
await adapter.beginMigrationTask(task)
}
for (const task of tasks) {
process.stdout.write(`${task.toString()} ...`)
const interval = setInterval(() => process.stdout.write('.'), 100)
try {
Expand Down
2 changes: 1 addition & 1 deletion src/git.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import chalk from 'chalk'
import { ChildProcess, execFile, spawn } from 'mz/child_process'
import { execFile, spawn } from 'mz/child_process'
import * as fs from 'mz/fs'
import * as path from 'path'
import { basename, resolve } from 'path'
Expand Down
44 changes: 33 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,41 @@ export class Status {
adapter: DbAdapter,
logger: Logger = CLI_LOGGER
): Promise<void> {
logger.log('Starting migration\n\n')
for (const commit of this.newCommits) {
logger.log(`${chalk.yellow.bold(commit.shortSha1)} ${commit.subject}\n`)
for (const task of commit.tasks) {
logger.log(task.toString() + '...')
/* istanbul ignore next */
const interval = setInterval(() => logger.log('.'), 100)
await task.execute(migrationDir, adapter, this.head, commit)
clearInterval(interval)
logger.log(' Success\n')
while (true) {
logger.log(this.toString())
const tasks = this.newCommits.reduce<Task[]>((prev, next) => prev.concat(next.tasks), [])
if (tasks.length > 0) {
logger.log('Starting migration\n\n')

const hasChanged = await adapter.waitForPending(logger)

if (hasChanged) {
logger.log('The migrations have changed, reloading..\n\n')
continue
}
// create pending tasks
for (const task of tasks) {
task.head = this.head
await adapter.beginMigrationTask(task)
}

for (const commit of this.newCommits) {
logger.log(`${chalk.yellow.bold(commit.shortSha1)} ${commit.subject}\n`)
for (const task of commit.tasks) {
logger.log(task.toString() + ' ...')
const interval = setInterval(() => logger.log('.'), 100)
try {
await task.execute(migrationDir, adapter, this.head, commit)
} finally {
clearInterval(interval)
}
logger.log(' Success\n')
}
}
logger.log(chalk.green('\nAll migrations successful\n'))
}
break
}
logger.log(chalk.green('\nAll migrations successful\n'))
}

/** Returns a string that can be printed to a CLI */
Expand Down
4 changes: 2 additions & 2 deletions src/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ export class Task {
let exceptionHandler: (() => void) | undefined
try {
await Promise.race([
new Promise<never>((resolve, reject) => {
new Promise<never>((_, reject) => {
exceptionHandler = reject
process.on('uncaughtException', reject)
}),
Expand All @@ -182,7 +182,7 @@ export class Task {
this.head = head
this.commit = commit
this.appliedAt = new Date()
await adapter.logMigrationTask(this)
await adapter.finishMigrationTask(this)
}

/**
Expand Down
Loading

0 comments on commit 4155f37

Please sign in to comment.