Push all Downloader networking onto the executor

Issue: Issue: #6978
PiperOrigin-RevId: 318710782
This commit is contained in:
olly 2020-06-28 18:28:48 +01:00 committed by Oliver Woodman
parent 4227c8f19f
commit c9717f67ea
13 changed files with 851 additions and 161 deletions

View File

@ -18,6 +18,7 @@ package com.google.android.exoplayer2.offline;
import android.net.Uri;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.upstream.cache.CacheDataSource;
import com.google.android.exoplayer2.util.Assertions;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.Executor;
@ -94,8 +95,8 @@ public class DefaultDownloaderFactory implements DownloaderFactory {
*/
public DefaultDownloaderFactory(
CacheDataSource.Factory cacheDataSourceFactory, Executor executor) {
this.cacheDataSourceFactory = cacheDataSourceFactory;
this.executor = executor;
this.cacheDataSourceFactory = Assertions.checkNotNull(cacheDataSourceFactory);
this.executor = Assertions.checkNotNull(executor);
}
@Override

View File

@ -1331,7 +1331,7 @@ public final class DownloadManager {
}
}
} catch (InterruptedException e) {
// The task was canceled. Do nothing.
Thread.currentThread().interrupt();
} catch (Exception e) {
finalException = e;
}

View File

@ -18,6 +18,7 @@ package com.google.android.exoplayer2.offline;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import java.io.IOException;
import java.util.concurrent.CancellationException;
/** Downloads and removes a piece of content. */
public interface Downloader {
@ -44,15 +45,29 @@ public interface Downloader {
/**
* Downloads the content.
*
* <p>If downloading fails, this method can be called again to resume the download. It cannot be
* called again after the download has been {@link #cancel canceled}.
*
* <p>If downloading is canceled whilst this method is executing, then it is expected that it will
* return reasonably quickly. However, there are no guarantees about how the method will return,
* meaning that it can return without throwing, or by throwing any of its documented exceptions.
* The caller must use its own knowledge about whether downloading has been canceled to determine
* whether this is why the method has returned, rather than relying on the method returning in a
* particular way.
*
* @param progressListener A listener to receive progress updates, or {@code null}.
* @throws DownloadException Thrown if the content cannot be downloaded.
* @throws IOException If the download did not complete successfully.
* @throws IOException If the download failed to complete successfully.
* @throws InterruptedException If the download was interrupted.
* @throws CancellationException If the download was canceled.
*/
void download(@Nullable ProgressListener progressListener) throws IOException;
void download(@Nullable ProgressListener progressListener)
throws IOException, InterruptedException;
/**
* Cancels the download operation and prevents future download operations from running. The caller
* should also interrupt the downloading thread immediately after calling this method.
* Permanently cancels the downloading by this downloader. The caller should also interrupt the
* downloading thread immediately after calling this method.
*
* <p>Once canceled, {@link #download} cannot be called again.
*/
void cancel();

View File

@ -25,16 +25,24 @@ import com.google.android.exoplayer2.upstream.cache.CacheWriter;
import com.google.android.exoplayer2.util.Assertions;
import com.google.android.exoplayer2.util.PriorityTaskManager;
import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException;
import com.google.android.exoplayer2.util.RunnableFutureTask;
import com.google.android.exoplayer2.util.Util;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/** A downloader for progressive media streams. */
public final class ProgressiveDownloader implements Downloader {
private final Executor executor;
private final DataSpec dataSpec;
private final CacheDataSource dataSource;
private final AtomicBoolean isCanceled;
@Nullable private final PriorityTaskManager priorityTaskManager;
@Nullable private ProgressListener progressListener;
private volatile @MonotonicNonNull RunnableFutureTask<Void, IOException> downloadRunnable;
private volatile boolean isCanceled;
/** @deprecated Use {@link #ProgressiveDownloader(MediaItem, CacheDataSource.Factory)} instead. */
@SuppressWarnings("deprecation")
@ -84,6 +92,7 @@ public final class ProgressiveDownloader implements Downloader {
*/
public ProgressiveDownloader(
MediaItem mediaItem, CacheDataSource.Factory cacheDataSourceFactory, Executor executor) {
this.executor = Assertions.checkNotNull(executor);
Assertions.checkNotNull(mediaItem.playbackProperties);
dataSpec =
new DataSpec.Builder()
@ -92,40 +101,65 @@ public final class ProgressiveDownloader implements Downloader {
.setFlags(DataSpec.FLAG_ALLOW_CACHE_FRAGMENTATION)
.build();
dataSource = cacheDataSourceFactory.createDataSourceForDownloading();
isCanceled = new AtomicBoolean();
priorityTaskManager = cacheDataSourceFactory.getUpstreamPriorityTaskManager();
}
@Override
public void download(@Nullable ProgressListener progressListener) throws IOException {
CacheWriter cacheWriter =
new CacheWriter(
dataSource,
dataSpec,
/* allowShortContent= */ false,
isCanceled,
/* temporaryBuffer= */ null,
progressListener == null ? null : new ProgressForwarder(progressListener));
public void download(@Nullable ProgressListener progressListener)
throws IOException, InterruptedException {
this.progressListener = progressListener;
if (downloadRunnable == null) {
CacheWriter cacheWriter =
new CacheWriter(
dataSource,
dataSpec,
/* allowShortContent= */ false,
/* temporaryBuffer= */ null,
this::onProgress);
downloadRunnable =
new RunnableFutureTask<Void, IOException>() {
@Override
protected Void doWork() throws IOException {
cacheWriter.cache();
return null;
}
@Override
protected void cancelWork() {
cacheWriter.cancel();
}
};
}
@Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager();
if (priorityTaskManager != null) {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD);
}
try {
boolean finished = false;
while (!finished && !isCanceled.get()) {
while (!finished && !isCanceled) {
if (priorityTaskManager != null) {
priorityTaskManager.proceed(dataSource.getUpstreamPriority());
priorityTaskManager.proceed(C.PRIORITY_DOWNLOAD);
}
executor.execute(downloadRunnable);
try {
cacheWriter.cache();
downloadRunnable.get();
finished = true;
} catch (PriorityTooLowException e) {
// The next loop iteration will block until the task is able to proceed.
} catch (ExecutionException e) {
Throwable cause = Assertions.checkNotNull(e.getCause());
if (cause instanceof PriorityTooLowException) {
// The next loop iteration will block until the task is able to proceed.
} else if (cause instanceof IOException) {
throw (IOException) cause;
} else {
// The cause must be an uncaught Throwable type.
Util.sneakyThrow(cause);
}
}
}
} catch (InterruptedException e) {
// The download was canceled.
} finally {
// If the main download thread was interrupted as part of cancelation, then it's possible that
// the runnable is still doing work. We need to wait until it's finished before returning.
downloadRunnable.blockUntilFinished();
if (priorityTaskManager != null) {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD);
}
@ -134,7 +168,11 @@ public final class ProgressiveDownloader implements Downloader {
@Override
public void cancel() {
isCanceled.set(true);
isCanceled = true;
RunnableFutureTask<Void, IOException> downloadRunnable = this.downloadRunnable;
if (downloadRunnable != null) {
downloadRunnable.cancel(/* interruptIfRunning= */ true);
}
}
@Override
@ -142,21 +180,14 @@ public final class ProgressiveDownloader implements Downloader {
dataSource.getCache().removeResource(dataSource.getCacheKeyFactory().buildCacheKey(dataSpec));
}
private static final class ProgressForwarder implements CacheWriter.ProgressListener {
private final ProgressListener progressListener;
public ProgressForwarder(ProgressListener progressListener) {
this.progressListener = progressListener;
}
@Override
public void onProgress(long contentLength, long bytesCached, long newBytesCached) {
float percentDownloaded =
contentLength == C.LENGTH_UNSET || contentLength == 0
? C.PERCENTAGE_UNSET
: ((bytesCached * 100f) / contentLength);
progressListener.onProgress(contentLength, bytesCached, percentDownloaded);
private void onProgress(long contentLength, long bytesCached, long newBytesCached) {
if (progressListener == null) {
return;
}
float percentDownloaded =
contentLength == C.LENGTH_UNSET || contentLength == 0
? C.PERCENTAGE_UNSET
: ((bytesCached * 100f) / contentLength);
progressListener.onProgress(contentLength, bytesCached, percentDownloaded);
}
}

View File

@ -33,14 +33,17 @@ import com.google.android.exoplayer2.upstream.cache.ContentMetadata;
import com.google.android.exoplayer2.util.Assertions;
import com.google.android.exoplayer2.util.PriorityTaskManager;
import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException;
import com.google.android.exoplayer2.util.RunnableFutureTask;
import com.google.android.exoplayer2.util.Util;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.compatqual.NullableType;
/**
* Base class for multi segment stream downloaders.
@ -77,8 +80,22 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
private final Parser<M> manifestParser;
private final ArrayList<StreamKey> streamKeys;
private final CacheDataSource.Factory cacheDataSourceFactory;
private final Cache cache;
private final CacheKeyFactory cacheKeyFactory;
@Nullable private final PriorityTaskManager priorityTaskManager;
private final Executor executor;
private final AtomicBoolean isCanceled;
/**
* The currently active runnables.
*
* <p>Note: Only the {@link #download} thread is permitted to modify this list. Modifications, as
* well as the iteration on the {@link #cancel} thread, must be synchronized on the instance for
* thread safety. Iterations on the {@link #download} thread do not need to be synchronized, and
* should not be synchronized because doing so can erroneously block {@link #cancel}.
*/
private final ArrayList<RunnableFutureTask<?, ?>> activeRunnables;
private volatile boolean isCanceled;
/**
* @param mediaItem The {@link MediaItem} to be downloaded.
@ -100,28 +117,31 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
this.streamKeys = new ArrayList<>(mediaItem.playbackProperties.streamKeys);
this.cacheDataSourceFactory = cacheDataSourceFactory;
this.executor = executor;
isCanceled = new AtomicBoolean();
cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache());
cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory();
priorityTaskManager = cacheDataSourceFactory.getUpstreamPriorityTaskManager();
activeRunnables = new ArrayList<>();
}
@Override
public final void download(@Nullable ProgressListener progressListener) throws IOException {
@Nullable
PriorityTaskManager priorityTaskManager =
cacheDataSourceFactory.getUpstreamPriorityTaskManager();
public final void download(@Nullable ProgressListener progressListener)
throws IOException, InterruptedException {
ArrayDeque<Segment> pendingSegments = new ArrayDeque<>();
ArrayDeque<SegmentDownloadRunnable> recycledRunnables = new ArrayDeque<>();
if (priorityTaskManager != null) {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD);
}
try {
Cache cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache());
CacheKeyFactory cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory();
CacheDataSource dataSource = cacheDataSourceFactory.createDataSourceForDownloading();
// Get the manifest and all of the segments.
M manifest = getManifest(dataSource, manifestDataSpec);
M manifest = getManifest(dataSource, manifestDataSpec, /* removing= */ false);
if (!streamKeys.isEmpty()) {
manifest = manifest.copy(streamKeys);
}
List<Segment> segments = getSegments(dataSource, manifest, /* allowIncompleteList= */ false);
List<Segment> segments = getSegments(dataSource, manifest, /* removing= */ false);
// Sort the segments so that we download media in the right order from the start of the
// content, and merge segments where possible to minimize the number of server round trips.
Collections.sort(segments);
mergeSegments(segments, cacheKeyFactory);
@ -169,34 +189,76 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
bytesDownloaded,
segmentsDownloaded)
: null;
byte[] temporaryBuffer = new byte[BUFFER_SIZE_BYTES];
int segmentIndex = 0;
while (!isCanceled.get() && segmentIndex < segments.size()) {
pendingSegments.addAll(segments);
while (!isCanceled && !pendingSegments.isEmpty()) {
// Block until there aren't any higher priority tasks.
if (priorityTaskManager != null) {
priorityTaskManager.proceed(dataSource.getUpstreamPriority());
priorityTaskManager.proceed(C.PRIORITY_DOWNLOAD);
}
CacheWriter cacheWriter =
new CacheWriter(
dataSource,
segments.get(segmentIndex).dataSpec,
/* allowShortContent= */ false,
isCanceled,
temporaryBuffer,
progressNotifier);
try {
cacheWriter.cache();
segmentIndex++;
if (progressNotifier != null) {
progressNotifier.onSegmentDownloaded();
// Create and execute a runnable to download the next segment.
CacheDataSource segmentDataSource;
byte[] temporaryBuffer;
if (!recycledRunnables.isEmpty()) {
SegmentDownloadRunnable recycledRunnable = recycledRunnables.removeFirst();
segmentDataSource = recycledRunnable.dataSource;
temporaryBuffer = recycledRunnable.temporaryBuffer;
} else {
segmentDataSource = cacheDataSourceFactory.createDataSourceForDownloading();
temporaryBuffer = new byte[BUFFER_SIZE_BYTES];
}
Segment segment = pendingSegments.removeFirst();
SegmentDownloadRunnable downloadRunnable =
new SegmentDownloadRunnable(
segment, segmentDataSource, progressNotifier, temporaryBuffer);
addActiveRunnable(downloadRunnable);
executor.execute(downloadRunnable);
// Clean up runnables that have finished.
for (int j = activeRunnables.size() - 1; j >= 0; j--) {
SegmentDownloadRunnable activeRunnable = (SegmentDownloadRunnable) activeRunnables.get(j);
// Only block until the runnable has finished if we don't have any more pending segments
// to start. If we do have pending segments to start then only process the runnable if
// it's already finished.
if (pendingSegments.isEmpty() || activeRunnable.isDone()) {
try {
activeRunnable.get();
removeActiveRunnable(j);
recycledRunnables.addLast(activeRunnable);
} catch (ExecutionException e) {
Throwable cause = Assertions.checkNotNull(e.getCause());
if (cause instanceof PriorityTooLowException) {
// We need to schedule this segment again in a future loop iteration.
pendingSegments.addFirst(activeRunnable.segment);
removeActiveRunnable(j);
recycledRunnables.addLast(activeRunnable);
} else if (cause instanceof IOException) {
throw (IOException) cause;
} else {
// The cause must be an uncaught Throwable type.
Util.sneakyThrow(cause);
}
}
}
} catch (PriorityTooLowException e) {
// The next loop iteration will block until the task is able to proceed, then try and
// download the same segment again.
}
// Don't move on to the next segment until the runnable for this segment has started. This
// drip feeds runnables to the executor, rather than providing them all up front.
downloadRunnable.blockUntilStarted();
}
} catch (InterruptedException e) {
// The download was canceled.
} finally {
// If one of the runnables has thrown an exception, then it's possible there are other active
// runnables still doing work. We need to wait until they finish before exiting this method.
// Cancel them to speed this up.
for (int i = 0; i < activeRunnables.size(); i++) {
activeRunnables.get(i).cancel(/* interruptIfRunning= */ true);
}
// Wait until the runnables have finished. In addition to the failure case, we also need to
// do this for the case where the main download thread was interrupted as part of cancelation.
for (int i = activeRunnables.size() - 1; i >= 0; i--) {
activeRunnables.get(i).blockUntilFinished();
removeActiveRunnable(i);
}
if (priorityTaskManager != null) {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD);
}
@ -205,21 +267,26 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
@Override
public void cancel() {
isCanceled.set(true);
synchronized (activeRunnables) {
isCanceled = true;
for (int i = 0; i < activeRunnables.size(); i++) {
activeRunnables.get(i).cancel(/* interruptIfRunning= */ true);
}
}
}
@Override
public final void remove() {
Cache cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache());
CacheKeyFactory cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory();
CacheDataSource dataSource = cacheDataSourceFactory.createDataSourceForRemovingDownload();
try {
M manifest = getManifest(dataSource, manifestDataSpec);
List<Segment> segments = getSegments(dataSource, manifest, true);
M manifest = getManifest(dataSource, manifestDataSpec, /* removing= */ true);
List<Segment> segments = getSegments(dataSource, manifest, /* removing= */ true);
for (int i = 0; i < segments.size(); i++) {
cache.removeResource(cacheKeyFactory.buildCacheKey(segments.get(i).dataSpec));
}
} catch (IOException e) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
// Ignore exceptions when removing.
} finally {
// Always attempt to remove the manifest.
@ -232,34 +299,121 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
/**
* Loads and parses a manifest.
*
* @param dataSource The {@link DataSource} through which to load.
* @param dataSpec The manifest {@link DataSpec}.
* @return The manifest.
* @throws IOException If an error occurs reading data.
* @param removing Whether the manifest is being loaded as part of the download being removed.
* @return The loaded manifest.
* @throws InterruptedException If the thread on which the method is called is interrupted.
* @throws IOException If an error occurs during execution.
*/
protected final M getManifest(DataSource dataSource, DataSpec dataSpec) throws IOException {
return ParsingLoadable.load(dataSource, manifestParser, dataSpec, C.DATA_TYPE_MANIFEST);
protected final M getManifest(DataSource dataSource, DataSpec dataSpec, boolean removing)
throws InterruptedException, IOException {
return execute(
new RunnableFutureTask<M, IOException>() {
@Override
protected M doWork() throws IOException {
return ParsingLoadable.load(dataSource, manifestParser, dataSpec, C.DATA_TYPE_MANIFEST);
}
},
removing);
}
/**
* Returns a list of all downloadable {@link Segment}s for a given manifest.
* Executes the provided {@link RunnableFutureTask}.
*
* @param runnable The {@link RunnableFutureTask} to execute.
* @param removing Whether the execution is part of the download being removed.
* @return The result.
* @throws InterruptedException If the thread on which the method is called is interrupted.
* @throws IOException If an error occurs during execution.
*/
protected final <T> T execute(RunnableFutureTask<T, ?> runnable, boolean removing)
throws InterruptedException, IOException {
if (removing) {
runnable.run();
try {
return runnable.get();
} catch (ExecutionException e) {
Throwable cause = Assertions.checkNotNull(e.getCause());
if (cause instanceof IOException) {
throw (IOException) cause;
} else {
// The cause must be an uncaught Throwable type.
Util.sneakyThrow(e);
}
}
}
while (true) {
if (isCanceled) {
throw new InterruptedException();
}
// Block until there aren't any higher priority tasks.
if (priorityTaskManager != null) {
priorityTaskManager.proceed(C.PRIORITY_DOWNLOAD);
}
addActiveRunnable(runnable);
executor.execute(runnable);
try {
return runnable.get();
} catch (ExecutionException e) {
Throwable cause = Assertions.checkNotNull(e.getCause());
if (cause instanceof PriorityTooLowException) {
// The next loop iteration will block until the task is able to proceed.
} else if (cause instanceof IOException) {
throw (IOException) cause;
} else {
// The cause must be an uncaught Throwable type.
Util.sneakyThrow(e);
}
} finally {
// We don't want to return for as long as the runnable might still be doing work.
runnable.blockUntilFinished();
removeActiveRunnable(runnable);
}
}
}
/**
* Returns a list of all downloadable {@link Segment}s for a given manifest. Any required data
* should be loaded using {@link #getManifest} or {@link #execute}.
*
* @param dataSource The {@link DataSource} through which to load any required data.
* @param manifest The manifest containing the segments.
* @param allowIncompleteList Whether to continue in the case that a load error prevents all
* segments from being listed. If true then a partial segment list will be returned. If false
* an {@link IOException} will be thrown.
* @param removing Whether the segments are being obtained as part of a removal. If true then a
* partial segment list is returned in the case that a load error prevents all segments from
* being listed. If false then an {@link IOException} will be thrown in this case.
* @return The list of downloadable {@link Segment}s.
* @throws IOException Thrown if {@code allowPartialIndex} is false and a load error occurs, or if
* the media is not in a form that allows for its segments to be listed.
* @throws IOException Thrown if {@code allowPartialIndex} is false and an execution error occurs,
* or if the media is not in a form that allows for its segments to be listed.
*/
protected abstract List<Segment> getSegments(
DataSource dataSource, M manifest, boolean allowIncompleteList) throws IOException;
protected abstract List<Segment> getSegments(DataSource dataSource, M manifest, boolean removing)
throws IOException, InterruptedException;
protected static DataSpec getCompressibleDataSpec(Uri uri) {
return new DataSpec.Builder().setUri(uri).setFlags(DataSpec.FLAG_ALLOW_GZIP).build();
}
private <T> void addActiveRunnable(RunnableFutureTask<T, ?> runnable)
throws InterruptedException {
synchronized (activeRunnables) {
if (isCanceled) {
throw new InterruptedException();
}
activeRunnables.add(runnable);
}
}
private void removeActiveRunnable(RunnableFutureTask<?, ?> runnable) {
synchronized (activeRunnables) {
activeRunnables.remove(runnable);
}
}
private void removeActiveRunnable(int index) {
synchronized (activeRunnables) {
activeRunnables.remove(index);
}
}
private static void mergeSegments(List<Segment> segments, CacheKeyFactory keyFactory) {
HashMap<String, Integer> lastIndexByCacheKey = new HashMap<>();
int nextOutIndex = 0;
@ -298,6 +452,47 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
&& dataSpec1.httpRequestHeaders.equals(dataSpec2.httpRequestHeaders);
}
private static final class SegmentDownloadRunnable extends RunnableFutureTask<Void, IOException> {
public final Segment segment;
public final CacheDataSource dataSource;
@NullableType private final ProgressNotifier progressNotifier;
public final byte[] temporaryBuffer;
private final CacheWriter cacheWriter;
public SegmentDownloadRunnable(
Segment segment,
CacheDataSource dataSource,
@NullableType ProgressNotifier progressNotifier,
byte[] temporaryBuffer) {
this.segment = segment;
this.dataSource = dataSource;
this.progressNotifier = progressNotifier;
this.temporaryBuffer = temporaryBuffer;
this.cacheWriter =
new CacheWriter(
dataSource,
segment.dataSpec,
/* allowShortContent= */ false,
temporaryBuffer,
progressNotifier);
}
@Override
protected Void doWork() throws IOException {
cacheWriter.cache();
if (progressNotifier != null) {
progressNotifier.onSegmentDownloaded();
}
return null;
}
@Override
protected void cancelWork() {
cacheWriter.cancel();
}
}
private static final class ProgressNotifier implements CacheWriter.ProgressListener {
private final ProgressListener progressListener;

View File

@ -376,8 +376,6 @@ public final class CacheDataSource implements DataSource {
@Nullable private final DataSource cacheWriteDataSource;
private final DataSource upstreamDataSource;
private final CacheKeyFactory cacheKeyFactory;
@Nullable private final PriorityTaskManager upstreamPriorityTaskManager;
private final int upstreamPriority;
@Nullable private final EventListener eventListener;
private final boolean blockOnCache;
@ -513,8 +511,6 @@ public final class CacheDataSource implements DataSource {
this.ignoreCacheOnError = (flags & FLAG_IGNORE_CACHE_ON_ERROR) != 0;
this.ignoreCacheForUnsetLengthRequests =
(flags & FLAG_IGNORE_CACHE_FOR_UNSET_LENGTH_REQUESTS) != 0;
this.upstreamPriority = upstreamPriority;
this.upstreamPriorityTaskManager = upstreamPriorityTaskManager;
if (upstreamDataSource != null) {
if (upstreamPriorityTaskManager != null) {
upstreamDataSource =
@ -543,24 +539,6 @@ public final class CacheDataSource implements DataSource {
return cacheKeyFactory;
}
/**
* Returns the {@link PriorityTaskManager} used when there's a cache miss and requests need to be
* made to the upstream {@link DataSource}, or {@code null} if there is none.
*/
@Nullable
public PriorityTaskManager getUpstreamPriorityTaskManager() {
return upstreamPriorityTaskManager;
}
/**
* Returns the priority used when there's a cache miss and requests need to be made to the
* upstream {@link DataSource}. The priority is only used if the source has a {@link
* PriorityTaskManager}.
*/
public int getUpstreamPriority() {
return upstreamPriority;
}
@Override
public void addTransferListener(TransferListener transferListener) {
cacheReadDataSource.addTransferListener(transferListener);

View File

@ -25,7 +25,6 @@ import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowExce
import com.google.android.exoplayer2.util.Util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicBoolean;
/** Caching related utility methods. */
public final class CacheWriter {
@ -52,7 +51,6 @@ public final class CacheWriter {
private final Cache cache;
private final DataSpec dataSpec;
private final boolean allowShortContent;
private final AtomicBoolean isCanceled;
private final String cacheKey;
private final byte[] temporaryBuffer;
@Nullable private final ProgressListener progressListener;
@ -62,6 +60,8 @@ public final class CacheWriter {
private long endPosition;
private long bytesCached;
private volatile boolean isCanceled;
/**
* @param dataSource A {@link CacheDataSource} that writes to the target cache.
* @param dataSpec Defines the data to be written.
@ -69,9 +69,6 @@ public final class CacheWriter {
* defined by the {@link DataSpec}. If {@code true} and the request exceeds the length of the
* content, then the content will be cached to the end. If {@code false} and the request
* exceeds the length of the content, {@link #cache} will throw an {@link IOException}.
* @param isCanceled An optional cancelation signal. If specified, {@link #cache} will check the
* value of this signal frequently during caching. If the value is {@code true}, the operation
* will be considered canceled and {@link #cache} will throw {@link InterruptedIOException}.
* @param temporaryBuffer A temporary buffer to be used during caching, or {@code null} if the
* writer should instantiate its own internal temporary buffer.
* @param progressListener An optional progress listener.
@ -80,14 +77,12 @@ public final class CacheWriter {
CacheDataSource dataSource,
DataSpec dataSpec,
boolean allowShortContent,
@Nullable AtomicBoolean isCanceled,
@Nullable byte[] temporaryBuffer,
@Nullable ProgressListener progressListener) {
this.dataSource = dataSource;
this.cache = dataSource.getCache();
this.dataSpec = dataSpec;
this.allowShortContent = allowShortContent;
this.isCanceled = isCanceled == null ? new AtomicBoolean() : isCanceled;
this.temporaryBuffer =
temporaryBuffer == null ? new byte[DEFAULT_BUFFER_SIZE_BYTES] : temporaryBuffer;
this.progressListener = progressListener;
@ -95,6 +90,15 @@ public final class CacheWriter {
nextPosition = dataSpec.position;
}
/**
* Cancels this writer's caching operation. {@link #cache} checks for cancelation frequently
* during execution, and throws an {@link InterruptedIOException} if it sees that the caching
* operation has been canceled.
*/
public void cancel() {
isCanceled = true;
}
/**
* Caches the requested data, skipping any that's already cached.
*
@ -230,7 +234,7 @@ public final class CacheWriter {
}
private void throwIfCanceled() throws InterruptedIOException {
if (isCanceled.get()) {
if (isCanceled) {
throw new InterruptedIOException();
}
}

View File

@ -0,0 +1,172 @@
/*
* Copyright 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.exoplayer2.util;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import androidx.annotation.Nullable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A {@link RunnableFuture} that supports additional uninterruptible operations to query whether
* execution has started and finished.
*
* @param <R> The type of the result.
* @param <E> The type of any {@link ExecutionException} cause.
*/
public abstract class RunnableFutureTask<R, E extends Exception> implements RunnableFuture<R> {
private final ConditionVariable started;
private final ConditionVariable finished;
private final Object cancelLock;
@Nullable private Exception exception;
@Nullable private R result;
@Nullable private Thread workThread;
private boolean canceled;
protected RunnableFutureTask() {
started = new ConditionVariable();
finished = new ConditionVariable();
cancelLock = new Object();
}
/** Blocks until the task has started, or has been canceled without having been started. */
public final void blockUntilStarted() {
started.blockUninterruptible();
}
/** Blocks until the task has finished, or has been canceled without having been started. */
public final void blockUntilFinished() {
finished.blockUninterruptible();
}
// Future implementation.
@Override
@UnknownNull
public final R get() throws ExecutionException, InterruptedException {
finished.block();
return getResult();
}
@Override
@UnknownNull
public final R get(long timeout, TimeUnit unit)
throws ExecutionException, InterruptedException, TimeoutException {
long timeoutMs = MILLISECONDS.convert(timeout, unit);
if (!finished.block(timeoutMs)) {
throw new TimeoutException();
}
return getResult();
}
@Override
public final boolean cancel(boolean interruptIfRunning) {
synchronized (cancelLock) {
if (canceled || finished.isOpen()) {
return false;
}
canceled = true;
cancelWork();
@Nullable Thread workThread = this.workThread;
if (workThread != null) {
if (interruptIfRunning) {
workThread.interrupt();
}
} else {
started.open();
finished.open();
}
return true;
}
}
@Override
public final boolean isDone() {
return finished.isOpen();
}
@Override
public final boolean isCancelled() {
return canceled;
}
// Runnable implementation.
@Override
public final void run() {
synchronized (cancelLock) {
if (canceled) {
return;
}
workThread = Thread.currentThread();
}
started.open();
try {
result = doWork();
} catch (Exception e) {
// Must be an instance of E or RuntimeException.
exception = e;
} finally {
synchronized (cancelLock) {
finished.open();
workThread = null;
// Clear the interrupted flag if set, to avoid it leaking into any subsequent tasks executed
// using the calling thread.
Thread.interrupted();
}
}
}
// Internal methods.
/**
* Performs the work or computation.
*
* @return The computed result.
* @throws E If an error occurred.
*/
@UnknownNull
protected abstract R doWork() throws E;
/**
* Cancels any work being done by {@link #doWork()}. If {@link #doWork()} is currently executing
* then the thread on which it's executing may be interrupted immediately after this method
* returns.
*
* <p>The default implementation does nothing.
*/
protected void cancelWork() {
// Do nothing.
}
@SuppressWarnings("return.type.incompatible")
@UnknownNull
private R getResult() throws ExecutionException {
if (canceled) {
throw new CancellationException();
} else if (exception != null) {
throw new ExecutionException(exception);
}
return result;
}
}

View File

@ -362,7 +362,6 @@ public final class CacheDataSourceTest {
new CacheDataSource(cache, upstream2),
unboundedDataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
@ -413,7 +412,6 @@ public final class CacheDataSourceTest {
new CacheDataSource(cache, upstream2),
unboundedDataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
@ -439,7 +437,6 @@ public final class CacheDataSourceTest {
new CacheDataSource(cache, upstream),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
@ -477,7 +474,6 @@ public final class CacheDataSourceTest {
new CacheDataSource(cache, upstream),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();

View File

@ -116,7 +116,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
new DataSpec(Uri.parse("test_data")),
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@ -139,7 +138,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@ -152,7 +150,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
new DataSpec(testUri),
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@ -176,7 +173,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@ -201,7 +197,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@ -214,7 +209,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
new DataSpec(testUri),
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@ -237,7 +231,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ true,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@ -262,7 +255,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null)
.cache());
@ -288,7 +280,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
new DataSpec(Uri.parse("test_data")),
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();

View File

@ -0,0 +1,302 @@
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.exoplayer2.util;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import org.junit.runner.RunWith;
/** Unit tests for {@link RunnableFutureTask}. */
@RunWith(AndroidJUnit4.class)
public class RunnableFutureTaskTest {
@Test
public void blockUntilStarted_ifNotStarted_blocks() throws InterruptedException {
RunnableFutureTask<Void, Exception> task =
new RunnableFutureTask<Void, Exception>() {
@Override
protected Void doWork() {
return null;
}
};
AtomicBoolean blockUntilStartedReturned = new AtomicBoolean();
Thread testThread =
new Thread() {
@Override
public void run() {
task.blockUntilStarted();
blockUntilStartedReturned.set(true);
}
};
testThread.start();
Thread.sleep(1000);
assertThat(blockUntilStartedReturned.get()).isFalse();
// Thread cleanup.
task.run();
testThread.join();
}
@Test(timeout = 1000)
public void blockUntilStarted_ifStarted_unblocks() throws InterruptedException {
ConditionVariable finish = new ConditionVariable();
RunnableFutureTask<Void, Exception> task =
new RunnableFutureTask<Void, Exception>() {
@Override
protected Void doWork() {
finish.blockUninterruptible();
return null;
}
};
Thread testThread = new Thread(task);
testThread.start();
task.blockUntilStarted(); // Should unblock.
// Thread cleanup.
finish.open();
testThread.join();
}
@Test(timeout = 1000)
public void blockUntilStarted_ifCanceled_unblocks() {
RunnableFutureTask<Void, Exception> task =
new RunnableFutureTask<Void, Exception>() {
@Override
protected Void doWork() {
return null;
}
};
task.cancel(/* interruptIfRunning= */ false);
// Should not block.
task.blockUntilStarted();
}
@Test
public void blockUntilFinished_ifNotFinished_blocks() throws InterruptedException {
ConditionVariable finish = new ConditionVariable();
RunnableFutureTask<Void, Exception> task =
new RunnableFutureTask<Void, Exception>() {
@Override
protected Void doWork() {
finish.blockUninterruptible();
return null;
}
};
Thread testThread1 = new Thread(task);
testThread1.start();
AtomicBoolean blockUntilFinishedReturned = new AtomicBoolean();
Thread testThread2 =
new Thread() {
@Override
public void run() {
task.blockUntilFinished();
blockUntilFinishedReturned.set(true);
}
};
testThread2.start();
Thread.sleep(1000);
assertThat(blockUntilFinishedReturned.get()).isFalse();
// Thread cleanup.
finish.open();
testThread1.join();
testThread2.join();
}
@Test(timeout = 1000)
public void blockUntilFinished_ifFinished_unblocks() throws InterruptedException {
RunnableFutureTask<Void, Exception> task =
new RunnableFutureTask<Void, Exception>() {
@Override
protected Void doWork() {
return null;
}
};
Thread testThread = new Thread(task);
testThread.start();
task.blockUntilFinished();
assertThat(task.isDone()).isTrue();
// Thread cleanup.
testThread.join();
}
@Test(timeout = 1000)
public void blockUntilFinished_ifCanceled_unblocks() {
RunnableFutureTask<Void, Exception> task =
new RunnableFutureTask<Void, Exception>() {
@Override
protected Void doWork() {
return null;
}
};
task.cancel(/* interruptIfRunning= */ false);
// Should not block.
task.blockUntilFinished();
}
@Test
public void get_ifNotFinished_blocks() throws InterruptedException {
ConditionVariable finish = new ConditionVariable();
RunnableFutureTask<Void, Exception> task =
new RunnableFutureTask<Void, Exception>() {
@Override
protected Void doWork() {
finish.blockUninterruptible();
return null;
}
};
Thread testThread1 = new Thread(task);
testThread1.start();
AtomicBoolean blockUntilGetResultReturned = new AtomicBoolean();
Thread testThread2 =
new Thread() {
@Override
public void run() {
try {
task.get();
} catch (ExecutionException | InterruptedException e) {
// Do nothing.
} finally {
blockUntilGetResultReturned.set(true);
}
}
};
testThread2.start();
Thread.sleep(1000);
assertThat(blockUntilGetResultReturned.get()).isFalse();
// Thread cleanup.
finish.open();
testThread1.join();
testThread2.join();
}
@Test(timeout = 1000)
public void get_returnsResult() throws ExecutionException, InterruptedException {
Object result = new Object();
RunnableFutureTask<Object, Exception> task =
new RunnableFutureTask<Object, Exception>() {
@Override
protected Object doWork() {
return result;
}
};
Thread testThread = new Thread(task);
testThread.start();
assertThat(task.get()).isSameInstanceAs(result);
// Thread cleanup.
testThread.join();
}
@Test(timeout = 1000)
public void get_throwsExecutionException_containsIOException() throws InterruptedException {
IOException exception = new IOException();
RunnableFutureTask<Object, IOException> task =
new RunnableFutureTask<Object, IOException>() {
@Override
protected Object doWork() throws IOException {
throw exception;
}
};
Thread testThread = new Thread(task);
testThread.start();
ExecutionException executionException = assertThrows(ExecutionException.class, task::get);
assertThat(executionException).hasCauseThat().isSameInstanceAs(exception);
// Thread cleanup.
testThread.join();
}
@Test(timeout = 1000)
public void get_throwsExecutionException_containsRuntimeException() throws InterruptedException {
RuntimeException exception = new RuntimeException();
RunnableFutureTask<Object, Exception> task =
new RunnableFutureTask<Object, Exception>() {
@Override
protected Object doWork() {
throw exception;
}
};
Thread testThread = new Thread(task);
testThread.start();
ExecutionException executionException = assertThrows(ExecutionException.class, task::get);
assertThat(executionException).hasCauseThat().isSameInstanceAs(exception);
// Thread cleanup.
testThread.join();
}
@Test
public void run_throwsError() {
Error error = new Error();
RunnableFutureTask<Object, Exception> task =
new RunnableFutureTask<Object, Exception>() {
@Override
protected Object doWork() {
throw error;
}
};
Error thrownError = assertThrows(Error.class, task::run);
assertThat(thrownError).isSameInstanceAs(error);
}
@Test
public void cancel_whenNotStarted_returnsTrue() {
RunnableFutureTask<Void, Exception> task =
new RunnableFutureTask<Void, Exception>() {
@Override
protected Void doWork() {
return null;
}
};
assertThat(task.cancel(/* interruptIfRunning= */ false)).isTrue();
}
@Test
public void cancel_whenCanceled_returnsFalse() {
RunnableFutureTask<Void, Exception> task =
new RunnableFutureTask<Void, Exception>() {
@Override
protected Void doWork() {
return null;
}
};
task.cancel(/* interruptIfRunning= */ false);
assertThat(task.cancel(/* interruptIfRunning= */ false)).isFalse();
}
}

View File

@ -36,10 +36,12 @@ import com.google.android.exoplayer2.upstream.DataSource;
import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.upstream.ParsingLoadable.Parser;
import com.google.android.exoplayer2.upstream.cache.CacheDataSource;
import com.google.android.exoplayer2.util.RunnableFutureTask;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import org.checkerframework.checker.nullness.compatqual.NullableType;
/**
* A downloader for DASH streams.
@ -140,8 +142,8 @@ public final class DashDownloader extends SegmentDownloader<DashManifest> {
@Override
protected List<Segment> getSegments(
DataSource dataSource, DashManifest manifest, boolean allowIncompleteList)
throws IOException {
DataSource dataSource, DashManifest manifest, boolean removing)
throws IOException, InterruptedException {
ArrayList<Segment> segments = new ArrayList<>();
for (int i = 0; i < manifest.getPeriodCount(); i++) {
Period period = manifest.getPeriod(i);
@ -150,36 +152,31 @@ public final class DashDownloader extends SegmentDownloader<DashManifest> {
List<AdaptationSet> adaptationSets = period.adaptationSets;
for (int j = 0; j < adaptationSets.size(); j++) {
addSegmentsForAdaptationSet(
dataSource,
adaptationSets.get(j),
periodStartUs,
periodDurationUs,
allowIncompleteList,
segments);
dataSource, adaptationSets.get(j), periodStartUs, periodDurationUs, removing, segments);
}
}
return segments;
}
private static void addSegmentsForAdaptationSet(
private void addSegmentsForAdaptationSet(
DataSource dataSource,
AdaptationSet adaptationSet,
long periodStartUs,
long periodDurationUs,
boolean allowIncompleteList,
boolean removing,
ArrayList<Segment> out)
throws IOException {
throws IOException, InterruptedException {
for (int i = 0; i < adaptationSet.representations.size(); i++) {
Representation representation = adaptationSet.representations.get(i);
DashSegmentIndex index;
try {
index = getSegmentIndex(dataSource, adaptationSet.type, representation);
index = getSegmentIndex(dataSource, adaptationSet.type, representation, removing);
if (index == null) {
// Loading succeeded but there was no index.
throw new DownloadException("Missing segment index");
}
} catch (IOException e) {
if (!allowIncompleteList) {
if (!removing) {
throw e;
}
// Generating an incomplete segment list is allowed. Advance to the next representation.
@ -215,16 +212,24 @@ public final class DashDownloader extends SegmentDownloader<DashManifest> {
out.add(new Segment(startTimeUs, dataSpec));
}
private static @Nullable DashSegmentIndex getSegmentIndex(
DataSource dataSource, int trackType, Representation representation) throws IOException {
@Nullable
private DashSegmentIndex getSegmentIndex(
DataSource dataSource, int trackType, Representation representation, boolean removing)
throws IOException, InterruptedException {
DashSegmentIndex index = representation.getIndex();
if (index != null) {
return index;
}
ChunkIndex seekMap = DashUtil.loadChunkIndex(dataSource, trackType, representation);
RunnableFutureTask<@NullableType ChunkIndex, IOException> runnable =
new RunnableFutureTask<@NullableType ChunkIndex, IOException>() {
@Override
protected @NullableType ChunkIndex doWork() throws IOException {
return DashUtil.loadChunkIndex(dataSource, trackType, representation);
}
};
@Nullable ChunkIndex seekMap = execute(runnable, removing);
return seekMap == null
? null
: new DashWrappingSegmentIndex(seekMap, representation.presentationTimeOffsetUs);
}
}

View File

@ -134,8 +134,8 @@ public final class HlsDownloader extends SegmentDownloader<HlsPlaylist> {
}
@Override
protected List<Segment> getSegments(
DataSource dataSource, HlsPlaylist playlist, boolean allowIncompleteList) throws IOException {
protected List<Segment> getSegments(DataSource dataSource, HlsPlaylist playlist, boolean removing)
throws IOException, InterruptedException {
ArrayList<DataSpec> mediaPlaylistDataSpecs = new ArrayList<>();
if (playlist instanceof HlsMasterPlaylist) {
HlsMasterPlaylist masterPlaylist = (HlsMasterPlaylist) playlist;
@ -151,9 +151,9 @@ public final class HlsDownloader extends SegmentDownloader<HlsPlaylist> {
segments.add(new Segment(/* startTimeUs= */ 0, mediaPlaylistDataSpec));
HlsMediaPlaylist mediaPlaylist;
try {
mediaPlaylist = (HlsMediaPlaylist) getManifest(dataSource, mediaPlaylistDataSpec);
mediaPlaylist = (HlsMediaPlaylist) getManifest(dataSource, mediaPlaylistDataSpec, removing);
} catch (IOException e) {
if (!allowIncompleteList) {
if (!removing) {
throw e;
}
// Generating an incomplete segment list is allowed. Advance to the next media playlist.