Skip to content

Commit

Permalink
[gatsby-source-filesystem/createRemoteFileNode] wait for file stream …
Browse files Browse the repository at this point in the history
…to finish, not just for response (gatsbyjs#4877)

* format

* [gatsby-source-filesystem/createRemoteFileNode] wait for file stream to finish, not just for response
  • Loading branch information
pieh authored and KyleAMathews committed Apr 6, 2018
1 parent 181b940 commit f0e1ae3
Showing 1 changed file with 52 additions and 46 deletions.
98 changes: 52 additions & 46 deletions src/create-remote-file-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ const cacheId = url => `create-remote-file-node-${url}`
* @param {Stringq} str
* @return {String}
*/
const createHash = (str) => crypto
.createHash(`md5`)
.update(str)
.digest(`hex`)
const createHash = str =>
crypto
.createHash(`md5`)
.update(str)
.digest(`hex`)

const CACHE_DIR = `.cache`
const FS_PLUGIN_DIR = `gatsby-source-filesystem`
Expand All @@ -70,12 +71,8 @@ const FS_PLUGIN_DIR = `gatsby-source-filesystem`
* @param {String} url
* @return {String}
*/
const createFilePath = (directory, filename, ext) => path.join(
directory,
CACHE_DIR,
FS_PLUGIN_DIR,
`${filename}${ext}`
)
const createFilePath = (directory, filename, ext) =>
path.join(directory, CACHE_DIR, FS_PLUGIN_DIR, `${filename}${ext}`)

/********************
* Queue Management *
Expand Down Expand Up @@ -109,7 +106,7 @@ const queue = new Queue(pushToQueue, {
* @param {Queue~queueCallback} cb
* @return {Promise<null>}
*/
async function pushToQueue (task, cb) {
async function pushToQueue(task, cb) {
try {
const node = await processRemoteNode(task)
return cb(null, node)
Expand All @@ -133,21 +130,25 @@ async function pushToQueue (task, cb) {
* @param {String} filename
* @return {Promise<Object>} Resolves with the [http Result Object]{@link https://nodejs.org/api/http.html#http_class_http_serverresponse}
*/
const requestRemoteNode = (url, headers, tmpFilename, filename) => new Promise((resolve, reject) => {
const responseStream = got.stream(url, { ...headers, timeout: 30000 })
responseStream.pipe(fs.createWriteStream(tmpFilename))
responseStream.on(`downloadProgress`, pro => console.log(pro))

// If there's a 400/500 response or other error.
responseStream.on(`error`, (error, body, response) => {
fs.removeSync(tmpFilename)
reject({ error, body, response })
})
const requestRemoteNode = (url, headers, tmpFilename, filename) =>
new Promise((resolve, reject) => {
const responseStream = got.stream(url, { ...headers, timeout: 30000 })
const fsWriteStream = fs.createWriteStream(tmpFilename)
responseStream.pipe(fsWriteStream)
responseStream.on(`downloadProgress`, pro => console.log(pro))

// If there's a 400/500 response or other error.
responseStream.on(`error`, (error, body, response) => {
fs.removeSync(tmpFilename)
reject({ error, body, response })
})

responseStream.on(`response`, response => {
resolve(response)
responseStream.on(`response`, response => {
fsWriteStream.on(`finish`, () => {
resolve(response)
})
})
})
})

/**
* processRemoteNode
Expand All @@ -157,16 +158,10 @@ const requestRemoteNode = (url, headers, tmpFilename, filename) => new Promise((
* @param {CreateRemoteFileNodePayload} options
* @return {Promise<Object>} Resolves with the fileNode
*/
async function processRemoteNode ({ url, store, cache, createNode, auth = {} }) {
async function processRemoteNode({ url, store, cache, createNode, auth = {} }) {
// Ensure our cache directory exists.
const programDir = store.getState().program.directory
await fs.ensureDir(
path.join(
programDir,
CACHE_DIR,
FS_PLUGIN_DIR
)
)
await fs.ensureDir(path.join(programDir, CACHE_DIR, FS_PLUGIN_DIR))

// See if there's response headers for this url
// from a previous request.
Expand All @@ -192,14 +187,19 @@ async function processRemoteNode ({ url, store, cache, createNode, auth = {} })

// Fetch the file.
try {
const response = await requestRemoteNode(url, headers, tmpFilename, filename)
const response = await requestRemoteNode(
url,
headers,
tmpFilename,
filename
)
// Save the response headers for future requests.
cache.set(cacheId(url), response.headers)

// If the status code is 200, move the piped temp file to the real name.
if (response.statusCode === 200) {
await fs.move(tmpFilename, filename, { overwrite: true })
// Else if 304, remove the empty response.
// Else if 304, remove the empty response.
} else {
await fs.remove(tmpFilename)
}
Expand Down Expand Up @@ -233,16 +233,17 @@ const processingCache = {}
* @param {CreateRemoteFileNodePayload} task
* @return {Promise<Object>}
*/
const pushTask = (task) => new Promise((resolve, reject) => {
queue
.push(task)
.on(`finish`, (task) => {
resolve(task)
})
.on(`failed`, () => {
resolve()
})
})
const pushTask = task =>
new Promise((resolve, reject) => {
queue
.push(task)
.on(`finish`, task => {
resolve(task)
})
.on(`failed`, () => {
resolve()
})
})

/***************
* Entry Point *
Expand All @@ -266,12 +267,17 @@ module.exports = ({ url, store, cache, createNode, auth = {} }) => {
return processingCache[url]
}


if (!url || isWebUri(url) === undefined) {
// should we resolve here, or reject?
// Technically, it's invalid input
return Promise.resolve()
}

return (processingCache[url] = pushTask({ url, store, cache, createNode, auth }))
return (processingCache[url] = pushTask({
url,
store,
cache,
createNode,
auth,
}))
}

0 comments on commit f0e1ae3

Please sign in to comment.