Skip to content

Commit

Permalink
Fix CacheWriter to handle potential DataSink write failures
Browse files Browse the repository at this point in the history
- If DataSource.close fails then it's unknown whether the underlying file was
  written to the cache. We should assume that it has not been.
- Always re-query cachedBytes at the start of CacheWriter.cache, since its
  current value may be incorrect if a previous failure was the result of a
  file not being written to the cache.

PiperOrigin-RevId: 359039109
  • Loading branch information
ojw28 committed Feb 23, 2021
1 parent 520f77b commit c067ee8
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 45 deletions.
4 changes: 4 additions & 0 deletions RELEASENOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
([#6384](https:/google/ExoPlayer/issues/6384)).
* Analytics:
* Add `onAudioCodecError` and `onVideoCodecError` to `AnalyticsListener`.
* Downloads and caching:
* Fix `CacheWriter` to correctly handle `DataSource.close` failures, for
which it cannot be assumed that data was successfully written to the
cache.
* Library restructuring:
* `DebugTextViewHelper` moved from `ui` package to `util` package.
* Spherical UI components moved from `video.spherical` package to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public interface ProgressListener {
private final byte[] temporaryBuffer;
@Nullable private final ProgressListener progressListener;

private boolean initialized;
private long nextPosition;
private long endPosition;
private long bytesCached;
Expand Down Expand Up @@ -118,18 +117,15 @@ public void cancel() {
public void cache() throws IOException {
throwIfCanceled();

if (!initialized) {
if (dataSpec.length != C.LENGTH_UNSET) {
endPosition = dataSpec.position + dataSpec.length;
} else {
long contentLength = ContentMetadata.getContentLength(cache.getContentMetadata(cacheKey));
endPosition = contentLength == C.LENGTH_UNSET ? C.POSITION_UNSET : contentLength;
}
bytesCached = cache.getCachedBytes(cacheKey, dataSpec.position, dataSpec.length);
if (progressListener != null) {
progressListener.onProgress(getLength(), bytesCached, /* newBytesCached= */ 0);
}
initialized = true;
bytesCached = cache.getCachedBytes(cacheKey, dataSpec.position, dataSpec.length);
if (dataSpec.length != C.LENGTH_UNSET) {
endPosition = dataSpec.position + dataSpec.length;
} else {
long contentLength = ContentMetadata.getContentLength(cache.getContentMetadata(cacheKey));
endPosition = contentLength == C.LENGTH_UNSET ? C.POSITION_UNSET : contentLength;
}
if (progressListener != null) {
progressListener.onProgress(getLength(), bytesCached, /* newBytesCached= */ 0);
}

while (endPosition == C.POSITION_UNSET || nextPosition < endPosition) {
Expand Down Expand Up @@ -158,42 +154,50 @@ public void cache() throws IOException {
*/
private long readBlockToCache(long position, long length) throws IOException {
boolean isLastBlock = position + length == endPosition || length == C.LENGTH_UNSET;
try {
long resolvedLength = C.LENGTH_UNSET;
boolean isDataSourceOpen = false;
if (length != C.LENGTH_UNSET) {
// If the length is specified, try to open the data source with a bounded request to avoid
// the underlying network stack requesting more data than required.
try {
DataSpec boundedDataSpec =
dataSpec.buildUpon().setPosition(position).setLength(length).build();
resolvedLength = dataSource.open(boundedDataSpec);
isDataSourceOpen = true;
} catch (IOException exception) {
if (allowShortContent
&& isLastBlock
&& DataSourceException.isCausedByPositionOutOfRange(exception)) {
// The length of the request exceeds the length of the content. If we allow shorter
// content and are reading the last block, fall through and try again with an unbounded
// request to read up to the end of the content.
Util.closeQuietly(dataSource);
} else {
throw exception;
}

long resolvedLength = C.LENGTH_UNSET;
boolean isDataSourceOpen = false;
if (length != C.LENGTH_UNSET) {
// If the length is specified, try to open the data source with a bounded request to avoid
// the underlying network stack requesting more data than required.
DataSpec boundedDataSpec =
dataSpec.buildUpon().setPosition(position).setLength(length).build();
try {
resolvedLength = dataSource.open(boundedDataSpec);
isDataSourceOpen = true;
} catch (IOException e) {
Util.closeQuietly(dataSource);
if (allowShortContent
&& isLastBlock
&& DataSourceException.isCausedByPositionOutOfRange(e)) {
// The length of the request exceeds the length of the content. If we allow shorter
// content and are reading the last block, fall through and try again with an unbounded
// request to read up to the end of the content.
} else {
throw e;
}
}
if (!isDataSourceOpen) {
// Either the length was unspecified, or we allow short content and our attempt to open the
// DataSource with the specified length failed.
throwIfCanceled();
DataSpec unboundedDataSpec =
dataSpec.buildUpon().setPosition(position).setLength(C.LENGTH_UNSET).build();
}

if (!isDataSourceOpen) {
// Either the length was unspecified, or we allow short content and our attempt to open the
// DataSource with the specified length failed.
throwIfCanceled();
DataSpec unboundedDataSpec =
dataSpec.buildUpon().setPosition(position).setLength(C.LENGTH_UNSET).build();
try {
resolvedLength = dataSource.open(unboundedDataSpec);
} catch (IOException e) {
Util.closeQuietly(dataSource);
throw e;
}
}

int totalBytesRead = 0;
try {
if (isLastBlock && resolvedLength != C.LENGTH_UNSET) {
onRequestEndPosition(position + resolvedLength);
}
int totalBytesRead = 0;
int bytesRead = 0;
while (bytesRead != C.RESULT_END_OF_INPUT) {
throwIfCanceled();
Expand All @@ -206,10 +210,16 @@ private long readBlockToCache(long position, long length) throws IOException {
if (isLastBlock) {
onRequestEndPosition(position + totalBytesRead);
}
return totalBytesRead;
} finally {
} catch (IOException e) {
Util.closeQuietly(dataSource);
throw e;
}

// Util.closeQuietly(dataSource) is not used here because it's important that an exception is
// thrown if DataSource.close fails. This is because there's no way of knowing whether the block
// was successfully cached in this case.
dataSource.close();
return totalBytesRead;
}

private void onRequestEndPosition(long endPosition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.android.exoplayer2.MediaItem;
import com.google.android.exoplayer2.database.DatabaseProvider;
import com.google.android.exoplayer2.testutil.FailOnCloseDataSink;
import com.google.android.exoplayer2.testutil.FakeDataSet;
import com.google.android.exoplayer2.testutil.FakeDataSource;
import com.google.android.exoplayer2.testutil.TestUtil;
Expand All @@ -34,6 +35,7 @@
import com.google.android.exoplayer2.util.Util;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -66,7 +68,7 @@ public void deleteDownloadCache() {
}

@Test
public void download_afterSingleFailure_succeeds() throws Exception {
public void download_afterReadFailure_succeeds() throws Exception {
Uri uri = Uri.parse("test:///test.mp4");

// Fake data has a built in failure after 10 bytes.
Expand All @@ -92,6 +94,39 @@ public void download_afterSingleFailure_succeeds() throws Exception {
assertThat(progressListener.bytesDownloaded).isEqualTo(30);
}

@Test
public void download_afterWriteFailureOnClose_succeeds() throws Exception {
Uri uri = Uri.parse("test:///test.mp4");

FakeDataSet data = new FakeDataSet();
data.newData(uri).appendReadData(1024);
DataSource.Factory upstreamDataSource = new FakeDataSource.Factory().setFakeDataSet(data);

AtomicBoolean failOnClose = new AtomicBoolean(/* initialValue= */ true);
FailOnCloseDataSink.Factory dataSinkFactory =
new FailOnCloseDataSink.Factory(downloadCache, failOnClose);

MediaItem mediaItem = MediaItem.fromUri(uri);
CacheDataSource.Factory cacheDataSourceFactory =
new CacheDataSource.Factory()
.setCache(downloadCache)
.setCacheWriteDataSinkFactory(dataSinkFactory)
.setUpstreamDataSourceFactory(upstreamDataSource);
ProgressiveDownloader downloader = new ProgressiveDownloader(mediaItem, cacheDataSourceFactory);

TestProgressListener progressListener = new TestProgressListener();

// Failure expected after 1024 bytes.
assertThrows(IOException.class, () -> downloader.download(progressListener));
assertThat(progressListener.bytesDownloaded).isEqualTo(1024);

failOnClose.set(false);

// Retry should succeed.
downloader.download(progressListener);
assertThat(progressListener.bytesDownloaded).isEqualTo(1024);
}

private static final class TestProgressListener implements Downloader.ProgressListener {

public long bytesDownloaded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import androidx.test.core.app.ApplicationProvider;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.testutil.FailOnCloseDataSink;
import com.google.android.exoplayer2.testutil.FakeDataSet;
import com.google.android.exoplayer2.testutil.FakeDataSource;
import com.google.android.exoplayer2.testutil.TestUtil;
import com.google.android.exoplayer2.upstream.DataSourceException;
import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.upstream.FileDataSource;
import com.google.android.exoplayer2.util.Util;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -215,6 +218,50 @@ public void cacheThrowEOFException() throws Exception {
assertThat(DataSourceException.isCausedByPositionOutOfRange(exception)).isTrue();
}

@Test
public void cache_afterFailureOnClose_succeeds() throws Exception {
FakeDataSet fakeDataSet = new FakeDataSet().setRandomData("test_data", 100);
FakeDataSource upstreamDataSource = new FakeDataSource(fakeDataSet);

AtomicBoolean failOnClose = new AtomicBoolean(/* initialValue= */ true);
FailOnCloseDataSink dataSink = new FailOnCloseDataSink(cache, failOnClose);

CacheDataSource cacheDataSource =
new CacheDataSource(
cache,
upstreamDataSource,
new FileDataSource(),
dataSink,
/* flags= */ 0,
/* eventListener= */ null);

CachingCounters counters = new CachingCounters();

CacheWriter cacheWriter =
new CacheWriter(
cacheDataSource,
new DataSpec(Uri.parse("test_data")),
/* allowShortContent= */ false,
/* temporaryBuffer= */ null,
counters);

// DataSink.close failing must cause the operation to fail rather than succeed.
assertThrows(IOException.class, cacheWriter::cache);
// Since all of the bytes were read through the DataSource chain successfully before the sink
// was closed, the progress listener will have seen all of the bytes being cached, even though
// this may not really be the case.
counters.assertValues(
/* bytesAlreadyCached= */ 0, /* bytesNewlyCached= */ 100, /* contentLength= */ 100);

failOnClose.set(false);

// The bytes will be downloaded again, but cached successfully this time.
cacheWriter.cache();
counters.assertValues(
/* bytesAlreadyCached= */ 0, /* bytesNewlyCached= */ 100, /* contentLength= */ 100);
assertCachedData(cache, fakeDataSet);
}

@Test
public void cachePolling() throws Exception {
final CachingCounters counters = new CachingCounters();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.exoplayer2.testutil;

import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.upstream.DataSink;
import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.upstream.cache.Cache;
import com.google.android.exoplayer2.upstream.cache.CacheDataSink;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@link DataSink} that can simulate caching the bytes being written to it, and then failing to
* persist them when {@link #close()} is called.
*/
public final class FailOnCloseDataSink implements DataSink {

/** Factory to create a {@link FailOnCloseDataSink}. */
public static final class Factory implements DataSink.Factory {

private final Cache cache;
private final AtomicBoolean failOnClose;

/**
* Creates an instance.
*
* @param cache The cache to write to when not in fail-on-close mode.
* @param failOnClose An {@link AtomicBoolean} whose value is read in each call to {@link #open}
* to determine whether to enable fail-on-close for the read that's being started.
*/
public Factory(Cache cache, AtomicBoolean failOnClose) {
this.cache = cache;
this.failOnClose = failOnClose;
}

@Override
public DataSink createDataSink() {
return new FailOnCloseDataSink(cache, failOnClose);
}
}

private final CacheDataSink wrappedSink;
private final AtomicBoolean failOnClose;
private boolean currentReadFailOnClose;

/**
* Creates an instance.
*
* @param cache The cache to write to when not in fail-on-close mode.
* @param failOnClose An {@link AtomicBoolean} whose value is read in each call to {@link #open}
* to determine whether to enable fail-on-close for the read that's being started.
*/
public FailOnCloseDataSink(Cache cache, AtomicBoolean failOnClose) {
this.wrappedSink = new CacheDataSink(cache, /* fragmentSize= */ C.LENGTH_UNSET);
this.failOnClose = failOnClose;
}

@Override
public void open(DataSpec dataSpec) throws IOException {
currentReadFailOnClose = failOnClose.get();
if (currentReadFailOnClose) {
return;
}
wrappedSink.open(dataSpec);
}

@Override
public void write(byte[] buffer, int offset, int length) throws IOException {
if (currentReadFailOnClose) {
return;
}
wrappedSink.write(buffer, offset, length);
}

@Override
public void close() throws IOException {
if (currentReadFailOnClose) {
throw new IOException("Fail on close");
}
wrappedSink.close();
}
}

0 comments on commit c067ee8

Please sign in to comment.