Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimized multipartUpload progress #1142

Open
wants to merge 4 commits into
base: 7.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/multipart/completeMultipartUpload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ export async function completeMultipartUpload(
params.successStatuses = [200];
const result = await this.request(params);

if (options.progress) {
await options.progress(1, null, result.res);
}

const ret: any = {
res: result.res,
bucket: params.bucket,
Expand Down
61 changes: 22 additions & 39 deletions src/common/multipart/resumeMultipart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ import { Checkpoint } from '../../types/params';
* @param {Object} checkpoint the checkpoint
* @param {Object} options
*/
export async function resumeMultipart(
this: any,
checkpoint: Checkpoint,
options: any = {}
) {
export async function resumeMultipart(this: any, checkpoint: Checkpoint, options: any = {}) {
if (this.isCancel()) {
throw _makeCancelEvent();
}
Expand All @@ -39,7 +35,7 @@ export async function resumeMultipart(
const stream = await this._createStream(file, pi.start, pi.end);
const data = {
stream,
size: pi.end - pi.start,
size: pi.end - pi.start
};

if (stream && stream.pipe) {
Expand All @@ -64,14 +60,7 @@ export async function resumeMultipart(

let result;
try {
result = await handleUploadPart.call(
this,
name,
uploadId,
partNo,
data,
options
);
result = await handleUploadPart.call(this, name, uploadId, partNo, data, options);
} catch (error) {
if (typeof stream.destroy === 'function') {
stream.destroy();
Expand All @@ -87,21 +76,17 @@ export async function resumeMultipart(
if (!this.isCancel()) {
doneParts.push({
number: partNo,
etag: result.res.headers.etag,
etag: result.res.headers.etag
});
checkpoint.doneParts = doneParts;

if (options.progress) {
await options.progress(
doneParts.length / numParts,
checkpoint,
result.res
);
await options.progress(doneParts.length / (numParts + 1), checkpoint, result.res);
}

resolve({
number: partNo,
etag: result.res.headers.etag,
etag: result.res.headers.etag
});
}
}
Expand All @@ -123,10 +108,7 @@ export async function resumeMultipart(
const defaultParallel = 5;
const parallel = options.parallel || defaultParallel;

if (
this.checkBrowserAndVersion('Internet Explorer', '10') ||
parallel === 1
) {
if (this.checkBrowserAndVersion('Internet Explorer', '10') || parallel === 1) {
for (let i = 0; i < todo.length; i++) {
if (this.isCancel()) {
throw _makeCancelEvent();
Expand All @@ -136,12 +118,18 @@ export async function resumeMultipart(
}
} else {
// upload in parallel
const jobErr = await _parallel.call(this, todo, parallel,
const jobErr = await _parallel.call(
this,
todo,
parallel,
value => new Promise((resolve, reject) => {
uploadPartJob(value).then(() => {
resolve();
}).catch(reject);
}));
uploadPartJob(value)
.then(() => {
resolve();
})
.catch(reject);
})
);

if (this.isCancel()) {
uploadPartJob = null;
Expand All @@ -152,17 +140,12 @@ export async function resumeMultipart(
const abortEvent = jobErr.find(err => err.name === 'abort');
if (abortEvent) throw abortEvent;

jobErr[0].message = `Failed to upload some parts with error: ${jobErr[0].toString()} part_num: ${jobErr[0].partNum
}`;
jobErr[0].message = `Failed to upload some parts with error: ${jobErr[0].toString()} part_num: ${
jobErr[0].partNum
}`;
throw jobErr[0];
}
}

return await completeMultipartUpload.call(
this,
name,
uploadId,
doneParts,
options
);
return await completeMultipartUpload.call(this, name, uploadId, doneParts, options);
}
17 changes: 9 additions & 8 deletions src/types/params.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export interface MultipartUploadOptions extends RequestOptions {
export interface GetObjectOptions extends MultiVersionCommonOptions {
process?: string; // image process params, will send with x-oss-process e.g.: {process: 'image/resize,w_200'}
/** only support Browser.js */
responseCacheControl?: string
responseCacheControl?: string;
}

export interface PutObjectOptions extends RequestOptions {
Expand Down Expand Up @@ -183,6 +183,7 @@ export interface PutBucketWebsiteConfig {

export interface CompleteMultipartUploadOptions extends RequestOptions {
callback?: ObjectCallback;
progress?: Function;
}

export interface InitMultipartUploadOptions extends RequestOptions {
Expand Down Expand Up @@ -242,13 +243,13 @@ export interface listQuery {
}

export interface listV2Query {
prefix?: string,
delimiter?: string,
'start-after'?: string,
'continuation-token'?: string,
'max-keys'?: string,
'encoding-type'?: 'url',
'fetch-owner'?: boolean,
prefix?: string;
delimiter?: string;
'start-after'?: string;
'continuation-token'?: string;
'max-keys'?: string;
'encoding-type'?: 'url';
'fetch-owner'?: boolean;
}

export interface postAsyncFetchOptions extends RequestOptions {
Expand Down
3 changes: 2 additions & 1 deletion test/browser/browser.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,8 @@ describe('browser', () => {
Array(10)
.fill(1)
.map((v, i) =>
store.uploadPart(name, uploadId, i + 1, file, i * partSize, Math.min((i + 1) * partSize, 10 * 100 * 1024)))
store.uploadPart(name, uploadId, i + 1, file, i * partSize, Math.min((i + 1) * partSize, 10 * 100 * 1024))
)
);
const dones = parts.map((_, i) => ({
number: i + 1,
Expand Down
6 changes: 3 additions & 3 deletions test/node/multipart.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ describe('test/multipart.test.js', () => {
}
});
assert.equal(result.res.status, 200);
assert.equal(progress, 12);
assert.equal(progress, 13);

const object = await store.get(name);
assert.equal(object.res.status, 200);
Expand Down Expand Up @@ -327,7 +327,7 @@ describe('test/multipart.test.js', () => {
}
});
assert.equal(result.res.status, 200);
assert.equal(progress, 12);
assert.equal(progress, 13);

const object = await store.get(name);
assert.equal(object.res.status, 200);
Expand Down Expand Up @@ -444,7 +444,7 @@ describe('test/multipart.test.js', () => {
}
});
assert.equal(result.res.status, 200);
assert.equal(progress, 12);
assert.equal(progress, 13);

const object = await store.get(name);
assert.equal(object.res.status, 200);
Expand Down