Skip to content

Commit

Permalink
Queue page requests to wp endpoints (#4735)
Browse files Browse the repository at this point in the history
* Wrap better-queue with Promise syntax that resolves when the queue
drains

* Queue requests for wp objects.
Added config to set concurreny of the requests

* Use concurrency instead of batchSize

* Fix passing the config to the getPages function
Fix how options are passed in to better-queue
  • Loading branch information
tsimons authored and KyleAMathews committed Mar 27, 2018
1 parent 24bd7a8 commit fe55f6d
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 13 deletions.
49 changes: 49 additions & 0 deletions packages/gatsby-source-wordpress/src/__tests__/request-in-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
jest.mock(`axios`)

const requestInQueue = require(`../request-in-queue`)
const axios = require(`axios`)

axios.mockImplementation(opts => {
if (opts.throw) { throw new Error(opts.throw) }

return opts.url.slice(opts.url.lastIndexOf(`/`) + 1)
})

describe(`requestInQueue`, () => {
let requests

beforeEach(() => {
requests = [
{ method: `get`, url: `https://gatsbyjs.org/1` },
{ method: `get`, url: `https://gatsbyjs.org/2` },
{ method: `get`, url: `https://gatsbyjs.org/3` },
{ method: `get`, url: `https://gatsbyjs.org/4` },
]
})

afterEach(() => {
axios.mockClear()
})

it(`runs all requests in queue`, async () => {
await requestInQueue(requests)

requests.forEach((req) => {
expect(axios).toHaveBeenCalledWith(req)
})
})

it(`returns the values in the same order they were requested`, async () => {
const responses = await requestInQueue(requests)
expect(responses).toEqual([`1`, `2`, `3`, `4`])
})

it(`stops any requests when one throws an error`, async () => {
try {
await requestInQueue([{ throw: `error` }, ...requests])
} catch (err) {
expect(err).toBeDefined()
}
expect(axios).toHaveBeenCalledTimes(1)
})
})
27 changes: 14 additions & 13 deletions packages/gatsby-source-wordpress/src/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const axios = require(`axios`)
const _ = require(`lodash`)
const colorized = require(`./output-color`)
const httpExceptionHandler = require(`./http-exception-handler`)
const requestInQueue = require(`./request-in-queue`)

/**
* High-level function to coordinate fetching data from a WordPress
Expand All @@ -18,6 +19,7 @@ async function fetch({
baseUrl,
typePrefix,
refactoredEntityTypes,
concurrentRequests,
}) {
// If the site is hosted on wordpress.com, the API Route differs.
// Same entity types are exposed (excepted for medias and users which need auth)
Expand Down Expand Up @@ -127,6 +129,7 @@ async function fetch({
_hostingWPCOM,
_auth,
_accessToken,
concurrentRequests,
})
)
if (_verbose) console.log(``)
Expand Down Expand Up @@ -185,6 +188,7 @@ async function fetchData({
_hostingWPCOM,
_auth,
_accessToken,
concurrentRequests,
}) {
const type = route.type
const url = route.url
Expand All @@ -200,7 +204,7 @@ async function fetchData({
if (_verbose) console.time(`Fetching the ${type} took`)

let routeResponse = await getPages(
{ url, _perPage, _hostingWPCOM, _auth, _accessToken },
{ url, _perPage, _hostingWPCOM, _auth, _accessToken, getPages, concurrentRequests },
1
)

Expand Down Expand Up @@ -263,7 +267,7 @@ async function fetchData({
* @returns
*/
async function getPages(
{ url, _perPage, _hostingWPCOM, _auth, _accessToken, _verbose },
{ url, _perPage, _hostingWPCOM, _auth, _accessToken, _verbose, concurrentRequests },
page = 1
) {
try {
Expand Down Expand Up @@ -313,18 +317,16 @@ async function getPages(
}

// We got page 1, now we want pages 2 through totalPages
const requests = _.range(2, totalPages + 1).map(getPage => {
const options = getOptions(getPage)
return axios(options)
})
const pageOptions = _.range(2, totalPages + 1).map(getPage => getOptions(getPage))

return Promise.all(requests).then(pages => {
const data = pages.map(page => page.data)
data.forEach(list => {
result = result.concat(list)
})
return result
const pages = await requestInQueue(pageOptions, { concurrent: concurrentRequests })

const pageData = pages.map(page => page.data)
pageData.forEach(list => {
result = result.concat(list)
})

return result
} catch (e) {
return httpExceptionHandler(e)
}
Expand All @@ -349,7 +351,6 @@ function getValidRoutes({
refactoredEntityTypes,
}) {
let validRoutes = []

for (let key of Object.keys(allRoutes.data.routes)) {
if (_verbose) console.log(`Route discovered :`, key)
let route = allRoutes.data.routes[key]
Expand Down
2 changes: 2 additions & 0 deletions packages/gatsby-source-wordpress/src/gatsby-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ exports.sourceNodes = async (
verboseOutput,
perPage = 100,
searchAndReplaceContentUrls = {},
concurrentRequests = 10,
}
) => {
const { createNode } = boundActionCreators
Expand All @@ -48,6 +49,7 @@ exports.sourceNodes = async (
_perPage,
typePrefix,
refactoredEntityTypes,
concurrentRequests,
})

// Normalize data & create nodes
Expand Down
58 changes: 58 additions & 0 deletions packages/gatsby-source-wordpress/src/request-in-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
const Queue = require(`better-queue`)
const Promise = require(`bluebird`)
const request = require(`axios`)

const _defaults = {
id: `url`,
}

/**
* [handleQueue description]
* @param {[type]} task [description]
* @param {Function} cb [description]
* @return {[type]} [description]
*/
async function handleQueue(task, cb) {
try {
const response = await request(task)
cb(null, response)
} catch (err) {
cb(err)
}
}

/**
* @typedef {Options}
* @type {Object}
* @see For a detailed descriptions of the options,
* see {@link https://www.npmjs.com/package/better-queue#full-documentation|better-queue on Github}
*/

/**
* Run a series of requests tasks in a queue for better flow control
*
* @param {Object[]} tasks An array of Axios formatted request objects
* @param {Options} opts Options that will be given to better-queue
* @return {Promise} Resolves with the accumulated values from the tasks
*/
module.exports = function requestInQueue (tasks, opts = {}) {
return new Promise((res, rej) => {
const q = new Queue(handleQueue, { ..._defaults, ...opts })

const taskMap = new Map(tasks.map((t) => {
q.push(t)
return [t.url, null]
}))

q.on(`task_failed`, (id, err) => {
rej(`${id} failed with err: ${err}`)
q.destroy()
})

q.on(`task_finish`, (id, response) => {
taskMap.set(id, response)
})

q.on(`drain`, () => res(Array.from(taskMap.values())))
})
}

0 comments on commit fe55f6d

Please sign in to comment.