diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/DefaultDownloaderFactory.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/DefaultDownloaderFactory.java
index 0b7434c339..67e2bd8c77 100644
--- a/library/core/src/main/java/com/google/android/exoplayer2/offline/DefaultDownloaderFactory.java
+++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/DefaultDownloaderFactory.java
@@ -18,6 +18,7 @@ package com.google.android.exoplayer2.offline;
import android.net.Uri;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.upstream.cache.CacheDataSource;
+import com.google.android.exoplayer2.util.Assertions;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.Executor;
@@ -94,8 +95,8 @@ public class DefaultDownloaderFactory implements DownloaderFactory {
*/
public DefaultDownloaderFactory(
CacheDataSource.Factory cacheDataSourceFactory, Executor executor) {
- this.cacheDataSourceFactory = cacheDataSourceFactory;
- this.executor = executor;
+ this.cacheDataSourceFactory = Assertions.checkNotNull(cacheDataSourceFactory);
+ this.executor = Assertions.checkNotNull(executor);
}
@Override
diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/DownloadManager.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/DownloadManager.java
index 50df4a0e8a..5b80b64ad8 100644
--- a/library/core/src/main/java/com/google/android/exoplayer2/offline/DownloadManager.java
+++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/DownloadManager.java
@@ -1331,7 +1331,7 @@ public final class DownloadManager {
}
}
} catch (InterruptedException e) {
- // The task was canceled. Do nothing.
+ Thread.currentThread().interrupt();
} catch (Exception e) {
finalException = e;
}
diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/Downloader.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/Downloader.java
index 56f8c0ce8d..8e51bf685e 100644
--- a/library/core/src/main/java/com/google/android/exoplayer2/offline/Downloader.java
+++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/Downloader.java
@@ -18,6 +18,7 @@ package com.google.android.exoplayer2.offline;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import java.io.IOException;
+import java.util.concurrent.CancellationException;
/** Downloads and removes a piece of content. */
public interface Downloader {
@@ -44,15 +45,29 @@ public interface Downloader {
/**
* Downloads the content.
*
+ *
If downloading fails, this method can be called again to resume the download. It cannot be
+ * called again after the download has been {@link #cancel canceled}.
+ *
+ *
If downloading is canceled whilst this method is executing, then it is expected that it will
+ * return reasonably quickly. However, there are no guarantees about how the method will return,
+ * meaning that it can return without throwing, or by throwing any of its documented exceptions.
+ * The caller must use its own knowledge about whether downloading has been canceled to determine
+ * whether this is why the method has returned, rather than relying on the method returning in a
+ * particular way.
+ *
* @param progressListener A listener to receive progress updates, or {@code null}.
- * @throws DownloadException Thrown if the content cannot be downloaded.
- * @throws IOException If the download did not complete successfully.
+ * @throws IOException If the download failed to complete successfully.
+ * @throws InterruptedException If the download was interrupted.
+ * @throws CancellationException If the download was canceled.
*/
- void download(@Nullable ProgressListener progressListener) throws IOException;
+ void download(@Nullable ProgressListener progressListener)
+ throws IOException, InterruptedException;
/**
- * Cancels the download operation and prevents future download operations from running. The caller
- * should also interrupt the downloading thread immediately after calling this method.
+ * Permanently cancels the downloading by this downloader. The caller should also interrupt the
+ * downloading thread immediately after calling this method.
+ *
+ *
Once canceled, {@link #download} cannot be called again.
*/
void cancel();
diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java
index dd251dad26..09fa444cf3 100644
--- a/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java
+++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java
@@ -25,16 +25,24 @@ import com.google.android.exoplayer2.upstream.cache.CacheWriter;
import com.google.android.exoplayer2.util.Assertions;
import com.google.android.exoplayer2.util.PriorityTaskManager;
import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException;
+import com.google.android.exoplayer2.util.RunnableFutureTask;
+import com.google.android.exoplayer2.util.Util;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/** A downloader for progressive media streams. */
public final class ProgressiveDownloader implements Downloader {
+ private final Executor executor;
private final DataSpec dataSpec;
private final CacheDataSource dataSource;
- private final AtomicBoolean isCanceled;
+ @Nullable private final PriorityTaskManager priorityTaskManager;
+
+ @Nullable private ProgressListener progressListener;
+ private volatile @MonotonicNonNull RunnableFutureTask downloadRunnable;
+ private volatile boolean isCanceled;
/** @deprecated Use {@link #ProgressiveDownloader(MediaItem, CacheDataSource.Factory)} instead. */
@SuppressWarnings("deprecation")
@@ -84,6 +92,7 @@ public final class ProgressiveDownloader implements Downloader {
*/
public ProgressiveDownloader(
MediaItem mediaItem, CacheDataSource.Factory cacheDataSourceFactory, Executor executor) {
+ this.executor = Assertions.checkNotNull(executor);
Assertions.checkNotNull(mediaItem.playbackProperties);
dataSpec =
new DataSpec.Builder()
@@ -92,40 +101,65 @@ public final class ProgressiveDownloader implements Downloader {
.setFlags(DataSpec.FLAG_ALLOW_CACHE_FRAGMENTATION)
.build();
dataSource = cacheDataSourceFactory.createDataSourceForDownloading();
- isCanceled = new AtomicBoolean();
+ priorityTaskManager = cacheDataSourceFactory.getUpstreamPriorityTaskManager();
}
@Override
- public void download(@Nullable ProgressListener progressListener) throws IOException {
- CacheWriter cacheWriter =
- new CacheWriter(
- dataSource,
- dataSpec,
- /* allowShortContent= */ false,
- isCanceled,
- /* temporaryBuffer= */ null,
- progressListener == null ? null : new ProgressForwarder(progressListener));
+ public void download(@Nullable ProgressListener progressListener)
+ throws IOException, InterruptedException {
+ this.progressListener = progressListener;
+ if (downloadRunnable == null) {
+ CacheWriter cacheWriter =
+ new CacheWriter(
+ dataSource,
+ dataSpec,
+ /* allowShortContent= */ false,
+ /* temporaryBuffer= */ null,
+ this::onProgress);
+ downloadRunnable =
+ new RunnableFutureTask() {
+ @Override
+ protected Void doWork() throws IOException {
+ cacheWriter.cache();
+ return null;
+ }
+
+ @Override
+ protected void cancelWork() {
+ cacheWriter.cancel();
+ }
+ };
+ }
- @Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager();
if (priorityTaskManager != null) {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD);
}
try {
boolean finished = false;
- while (!finished && !isCanceled.get()) {
+ while (!finished && !isCanceled) {
if (priorityTaskManager != null) {
- priorityTaskManager.proceed(dataSource.getUpstreamPriority());
+ priorityTaskManager.proceed(C.PRIORITY_DOWNLOAD);
}
+ executor.execute(downloadRunnable);
try {
- cacheWriter.cache();
+ downloadRunnable.get();
finished = true;
- } catch (PriorityTooLowException e) {
- // The next loop iteration will block until the task is able to proceed.
+ } catch (ExecutionException e) {
+ Throwable cause = Assertions.checkNotNull(e.getCause());
+ if (cause instanceof PriorityTooLowException) {
+ // The next loop iteration will block until the task is able to proceed.
+ } else if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ // The cause must be an uncaught Throwable type.
+ Util.sneakyThrow(cause);
+ }
}
}
- } catch (InterruptedException e) {
- // The download was canceled.
} finally {
+ // If the main download thread was interrupted as part of cancelation, then it's possible that
+ // the runnable is still doing work. We need to wait until it's finished before returning.
+ downloadRunnable.blockUntilFinished();
if (priorityTaskManager != null) {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD);
}
@@ -134,7 +168,11 @@ public final class ProgressiveDownloader implements Downloader {
@Override
public void cancel() {
- isCanceled.set(true);
+ isCanceled = true;
+ RunnableFutureTask downloadRunnable = this.downloadRunnable;
+ if (downloadRunnable != null) {
+ downloadRunnable.cancel(/* interruptIfRunning= */ true);
+ }
}
@Override
@@ -142,21 +180,14 @@ public final class ProgressiveDownloader implements Downloader {
dataSource.getCache().removeResource(dataSource.getCacheKeyFactory().buildCacheKey(dataSpec));
}
- private static final class ProgressForwarder implements CacheWriter.ProgressListener {
-
- private final ProgressListener progressListener;
-
- public ProgressForwarder(ProgressListener progressListener) {
- this.progressListener = progressListener;
- }
-
- @Override
- public void onProgress(long contentLength, long bytesCached, long newBytesCached) {
- float percentDownloaded =
- contentLength == C.LENGTH_UNSET || contentLength == 0
- ? C.PERCENTAGE_UNSET
- : ((bytesCached * 100f) / contentLength);
- progressListener.onProgress(contentLength, bytesCached, percentDownloaded);
+ private void onProgress(long contentLength, long bytesCached, long newBytesCached) {
+ if (progressListener == null) {
+ return;
}
+ float percentDownloaded =
+ contentLength == C.LENGTH_UNSET || contentLength == 0
+ ? C.PERCENTAGE_UNSET
+ : ((bytesCached * 100f) / contentLength);
+ progressListener.onProgress(contentLength, bytesCached, percentDownloaded);
}
}
diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java
index 7360b65f70..d824dfd1e3 100644
--- a/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java
+++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java
@@ -33,14 +33,17 @@ import com.google.android.exoplayer2.upstream.cache.ContentMetadata;
import com.google.android.exoplayer2.util.Assertions;
import com.google.android.exoplayer2.util.PriorityTaskManager;
import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException;
+import com.google.android.exoplayer2.util.RunnableFutureTask;
import com.google.android.exoplayer2.util.Util;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.checkerframework.checker.nullness.compatqual.NullableType;
/**
* Base class for multi segment stream downloaders.
@@ -77,8 +80,22 @@ public abstract class SegmentDownloader> impleme
private final Parser manifestParser;
private final ArrayList streamKeys;
private final CacheDataSource.Factory cacheDataSourceFactory;
+ private final Cache cache;
+ private final CacheKeyFactory cacheKeyFactory;
+ @Nullable private final PriorityTaskManager priorityTaskManager;
private final Executor executor;
- private final AtomicBoolean isCanceled;
+
+ /**
+ * The currently active runnables.
+ *
+ *
Note: Only the {@link #download} thread is permitted to modify this list. Modifications, as
+ * well as the iteration on the {@link #cancel} thread, must be synchronized on the instance for
+ * thread safety. Iterations on the {@link #download} thread do not need to be synchronized, and
+ * should not be synchronized because doing so can erroneously block {@link #cancel}.
+ */
+ private final ArrayList> activeRunnables;
+
+ private volatile boolean isCanceled;
/**
* @param mediaItem The {@link MediaItem} to be downloaded.
@@ -100,28 +117,31 @@ public abstract class SegmentDownloader> impleme
this.streamKeys = new ArrayList<>(mediaItem.playbackProperties.streamKeys);
this.cacheDataSourceFactory = cacheDataSourceFactory;
this.executor = executor;
- isCanceled = new AtomicBoolean();
+ cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache());
+ cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory();
+ priorityTaskManager = cacheDataSourceFactory.getUpstreamPriorityTaskManager();
+ activeRunnables = new ArrayList<>();
}
@Override
- public final void download(@Nullable ProgressListener progressListener) throws IOException {
- @Nullable
- PriorityTaskManager priorityTaskManager =
- cacheDataSourceFactory.getUpstreamPriorityTaskManager();
+ public final void download(@Nullable ProgressListener progressListener)
+ throws IOException, InterruptedException {
+ ArrayDeque pendingSegments = new ArrayDeque<>();
+ ArrayDeque recycledRunnables = new ArrayDeque<>();
if (priorityTaskManager != null) {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD);
}
try {
- Cache cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache());
- CacheKeyFactory cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory();
CacheDataSource dataSource = cacheDataSourceFactory.createDataSourceForDownloading();
-
// Get the manifest and all of the segments.
- M manifest = getManifest(dataSource, manifestDataSpec);
+ M manifest = getManifest(dataSource, manifestDataSpec, /* removing= */ false);
if (!streamKeys.isEmpty()) {
manifest = manifest.copy(streamKeys);
}
- List segments = getSegments(dataSource, manifest, /* allowIncompleteList= */ false);
+ List segments = getSegments(dataSource, manifest, /* removing= */ false);
+
+ // Sort the segments so that we download media in the right order from the start of the
+ // content, and merge segments where possible to minimize the number of server round trips.
Collections.sort(segments);
mergeSegments(segments, cacheKeyFactory);
@@ -169,34 +189,76 @@ public abstract class SegmentDownloader> impleme
bytesDownloaded,
segmentsDownloaded)
: null;
- byte[] temporaryBuffer = new byte[BUFFER_SIZE_BYTES];
- int segmentIndex = 0;
- while (!isCanceled.get() && segmentIndex < segments.size()) {
+ pendingSegments.addAll(segments);
+ while (!isCanceled && !pendingSegments.isEmpty()) {
+ // Block until there aren't any higher priority tasks.
if (priorityTaskManager != null) {
- priorityTaskManager.proceed(dataSource.getUpstreamPriority());
+ priorityTaskManager.proceed(C.PRIORITY_DOWNLOAD);
}
- CacheWriter cacheWriter =
- new CacheWriter(
- dataSource,
- segments.get(segmentIndex).dataSpec,
- /* allowShortContent= */ false,
- isCanceled,
- temporaryBuffer,
- progressNotifier);
- try {
- cacheWriter.cache();
- segmentIndex++;
- if (progressNotifier != null) {
- progressNotifier.onSegmentDownloaded();
+
+ // Create and execute a runnable to download the next segment.
+ CacheDataSource segmentDataSource;
+ byte[] temporaryBuffer;
+ if (!recycledRunnables.isEmpty()) {
+ SegmentDownloadRunnable recycledRunnable = recycledRunnables.removeFirst();
+ segmentDataSource = recycledRunnable.dataSource;
+ temporaryBuffer = recycledRunnable.temporaryBuffer;
+ } else {
+ segmentDataSource = cacheDataSourceFactory.createDataSourceForDownloading();
+ temporaryBuffer = new byte[BUFFER_SIZE_BYTES];
+ }
+ Segment segment = pendingSegments.removeFirst();
+ SegmentDownloadRunnable downloadRunnable =
+ new SegmentDownloadRunnable(
+ segment, segmentDataSource, progressNotifier, temporaryBuffer);
+ addActiveRunnable(downloadRunnable);
+ executor.execute(downloadRunnable);
+
+ // Clean up runnables that have finished.
+ for (int j = activeRunnables.size() - 1; j >= 0; j--) {
+ SegmentDownloadRunnable activeRunnable = (SegmentDownloadRunnable) activeRunnables.get(j);
+ // Only block until the runnable has finished if we don't have any more pending segments
+ // to start. If we do have pending segments to start then only process the runnable if
+ // it's already finished.
+ if (pendingSegments.isEmpty() || activeRunnable.isDone()) {
+ try {
+ activeRunnable.get();
+ removeActiveRunnable(j);
+ recycledRunnables.addLast(activeRunnable);
+ } catch (ExecutionException e) {
+ Throwable cause = Assertions.checkNotNull(e.getCause());
+ if (cause instanceof PriorityTooLowException) {
+ // We need to schedule this segment again in a future loop iteration.
+ pendingSegments.addFirst(activeRunnable.segment);
+ removeActiveRunnable(j);
+ recycledRunnables.addLast(activeRunnable);
+ } else if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ // The cause must be an uncaught Throwable type.
+ Util.sneakyThrow(cause);
+ }
+ }
}
- } catch (PriorityTooLowException e) {
- // The next loop iteration will block until the task is able to proceed, then try and
- // download the same segment again.
}
+
+ // Don't move on to the next segment until the runnable for this segment has started. This
+ // drip feeds runnables to the executor, rather than providing them all up front.
+ downloadRunnable.blockUntilStarted();
}
- } catch (InterruptedException e) {
- // The download was canceled.
} finally {
+ // If one of the runnables has thrown an exception, then it's possible there are other active
+ // runnables still doing work. We need to wait until they finish before exiting this method.
+ // Cancel them to speed this up.
+ for (int i = 0; i < activeRunnables.size(); i++) {
+ activeRunnables.get(i).cancel(/* interruptIfRunning= */ true);
+ }
+ // Wait until the runnables have finished. In addition to the failure case, we also need to
+ // do this for the case where the main download thread was interrupted as part of cancelation.
+ for (int i = activeRunnables.size() - 1; i >= 0; i--) {
+ activeRunnables.get(i).blockUntilFinished();
+ removeActiveRunnable(i);
+ }
if (priorityTaskManager != null) {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD);
}
@@ -205,21 +267,26 @@ public abstract class SegmentDownloader> impleme
@Override
public void cancel() {
- isCanceled.set(true);
+ synchronized (activeRunnables) {
+ isCanceled = true;
+ for (int i = 0; i < activeRunnables.size(); i++) {
+ activeRunnables.get(i).cancel(/* interruptIfRunning= */ true);
+ }
+ }
}
@Override
public final void remove() {
- Cache cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache());
- CacheKeyFactory cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory();
CacheDataSource dataSource = cacheDataSourceFactory.createDataSourceForRemovingDownload();
try {
- M manifest = getManifest(dataSource, manifestDataSpec);
- List segments = getSegments(dataSource, manifest, true);
+ M manifest = getManifest(dataSource, manifestDataSpec, /* removing= */ true);
+ List segments = getSegments(dataSource, manifest, /* removing= */ true);
for (int i = 0; i < segments.size(); i++) {
cache.removeResource(cacheKeyFactory.buildCacheKey(segments.get(i).dataSpec));
}
- } catch (IOException e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
// Ignore exceptions when removing.
} finally {
// Always attempt to remove the manifest.
@@ -232,34 +299,121 @@ public abstract class SegmentDownloader> impleme
/**
* Loads and parses a manifest.
*
- * @param dataSource The {@link DataSource} through which to load.
* @param dataSpec The manifest {@link DataSpec}.
- * @return The manifest.
- * @throws IOException If an error occurs reading data.
+ * @param removing Whether the manifest is being loaded as part of the download being removed.
+ * @return The loaded manifest.
+ * @throws InterruptedException If the thread on which the method is called is interrupted.
+ * @throws IOException If an error occurs during execution.
*/
- protected final M getManifest(DataSource dataSource, DataSpec dataSpec) throws IOException {
- return ParsingLoadable.load(dataSource, manifestParser, dataSpec, C.DATA_TYPE_MANIFEST);
+ protected final M getManifest(DataSource dataSource, DataSpec dataSpec, boolean removing)
+ throws InterruptedException, IOException {
+ return execute(
+ new RunnableFutureTask() {
+ @Override
+ protected M doWork() throws IOException {
+ return ParsingLoadable.load(dataSource, manifestParser, dataSpec, C.DATA_TYPE_MANIFEST);
+ }
+ },
+ removing);
}
/**
- * Returns a list of all downloadable {@link Segment}s for a given manifest.
+ * Executes the provided {@link RunnableFutureTask}.
+ *
+ * @param runnable The {@link RunnableFutureTask} to execute.
+ * @param removing Whether the execution is part of the download being removed.
+ * @return The result.
+ * @throws InterruptedException If the thread on which the method is called is interrupted.
+ * @throws IOException If an error occurs during execution.
+ */
+ protected final T execute(RunnableFutureTask runnable, boolean removing)
+ throws InterruptedException, IOException {
+ if (removing) {
+ runnable.run();
+ try {
+ return runnable.get();
+ } catch (ExecutionException e) {
+ Throwable cause = Assertions.checkNotNull(e.getCause());
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ // The cause must be an uncaught Throwable type.
+ Util.sneakyThrow(e);
+ }
+ }
+ }
+ while (true) {
+ if (isCanceled) {
+ throw new InterruptedException();
+ }
+ // Block until there aren't any higher priority tasks.
+ if (priorityTaskManager != null) {
+ priorityTaskManager.proceed(C.PRIORITY_DOWNLOAD);
+ }
+ addActiveRunnable(runnable);
+ executor.execute(runnable);
+ try {
+ return runnable.get();
+ } catch (ExecutionException e) {
+ Throwable cause = Assertions.checkNotNull(e.getCause());
+ if (cause instanceof PriorityTooLowException) {
+ // The next loop iteration will block until the task is able to proceed.
+ } else if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ // The cause must be an uncaught Throwable type.
+ Util.sneakyThrow(e);
+ }
+ } finally {
+ // We don't want to return for as long as the runnable might still be doing work.
+ runnable.blockUntilFinished();
+ removeActiveRunnable(runnable);
+ }
+ }
+ }
+
+ /**
+ * Returns a list of all downloadable {@link Segment}s for a given manifest. Any required data
+ * should be loaded using {@link #getManifest} or {@link #execute}.
*
* @param dataSource The {@link DataSource} through which to load any required data.
* @param manifest The manifest containing the segments.
- * @param allowIncompleteList Whether to continue in the case that a load error prevents all
- * segments from being listed. If true then a partial segment list will be returned. If false
- * an {@link IOException} will be thrown.
+ * @param removing Whether the segments are being obtained as part of a removal. If true then a
+ * partial segment list is returned in the case that a load error prevents all segments from
+ * being listed. If false then an {@link IOException} will be thrown in this case.
* @return The list of downloadable {@link Segment}s.
- * @throws IOException Thrown if {@code allowPartialIndex} is false and a load error occurs, or if
- * the media is not in a form that allows for its segments to be listed.
+ * @throws IOException Thrown if {@code allowPartialIndex} is false and an execution error occurs,
+ * or if the media is not in a form that allows for its segments to be listed.
*/
- protected abstract List getSegments(
- DataSource dataSource, M manifest, boolean allowIncompleteList) throws IOException;
+ protected abstract List getSegments(DataSource dataSource, M manifest, boolean removing)
+ throws IOException, InterruptedException;
protected static DataSpec getCompressibleDataSpec(Uri uri) {
return new DataSpec.Builder().setUri(uri).setFlags(DataSpec.FLAG_ALLOW_GZIP).build();
}
+ private void addActiveRunnable(RunnableFutureTask runnable)
+ throws InterruptedException {
+ synchronized (activeRunnables) {
+ if (isCanceled) {
+ throw new InterruptedException();
+ }
+ activeRunnables.add(runnable);
+ }
+ }
+
+ private void removeActiveRunnable(RunnableFutureTask, ?> runnable) {
+ synchronized (activeRunnables) {
+ activeRunnables.remove(runnable);
+ }
+ }
+
+ private void removeActiveRunnable(int index) {
+ synchronized (activeRunnables) {
+ activeRunnables.remove(index);
+ }
+ }
+
private static void mergeSegments(List segments, CacheKeyFactory keyFactory) {
HashMap lastIndexByCacheKey = new HashMap<>();
int nextOutIndex = 0;
@@ -298,6 +452,47 @@ public abstract class SegmentDownloader> impleme
&& dataSpec1.httpRequestHeaders.equals(dataSpec2.httpRequestHeaders);
}
+ private static final class SegmentDownloadRunnable extends RunnableFutureTask {
+
+ public final Segment segment;
+ public final CacheDataSource dataSource;
+ @NullableType private final ProgressNotifier progressNotifier;
+ public final byte[] temporaryBuffer;
+ private final CacheWriter cacheWriter;
+
+ public SegmentDownloadRunnable(
+ Segment segment,
+ CacheDataSource dataSource,
+ @NullableType ProgressNotifier progressNotifier,
+ byte[] temporaryBuffer) {
+ this.segment = segment;
+ this.dataSource = dataSource;
+ this.progressNotifier = progressNotifier;
+ this.temporaryBuffer = temporaryBuffer;
+ this.cacheWriter =
+ new CacheWriter(
+ dataSource,
+ segment.dataSpec,
+ /* allowShortContent= */ false,
+ temporaryBuffer,
+ progressNotifier);
+ }
+
+ @Override
+ protected Void doWork() throws IOException {
+ cacheWriter.cache();
+ if (progressNotifier != null) {
+ progressNotifier.onSegmentDownloaded();
+ }
+ return null;
+ }
+
+ @Override
+ protected void cancelWork() {
+ cacheWriter.cancel();
+ }
+ }
+
private static final class ProgressNotifier implements CacheWriter.ProgressListener {
private final ProgressListener progressListener;
diff --git a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheDataSource.java b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheDataSource.java
index e1e2e5194b..7398ff58a2 100644
--- a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheDataSource.java
+++ b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheDataSource.java
@@ -376,8 +376,6 @@ public final class CacheDataSource implements DataSource {
@Nullable private final DataSource cacheWriteDataSource;
private final DataSource upstreamDataSource;
private final CacheKeyFactory cacheKeyFactory;
- @Nullable private final PriorityTaskManager upstreamPriorityTaskManager;
- private final int upstreamPriority;
@Nullable private final EventListener eventListener;
private final boolean blockOnCache;
@@ -513,8 +511,6 @@ public final class CacheDataSource implements DataSource {
this.ignoreCacheOnError = (flags & FLAG_IGNORE_CACHE_ON_ERROR) != 0;
this.ignoreCacheForUnsetLengthRequests =
(flags & FLAG_IGNORE_CACHE_FOR_UNSET_LENGTH_REQUESTS) != 0;
- this.upstreamPriority = upstreamPriority;
- this.upstreamPriorityTaskManager = upstreamPriorityTaskManager;
if (upstreamDataSource != null) {
if (upstreamPriorityTaskManager != null) {
upstreamDataSource =
@@ -543,24 +539,6 @@ public final class CacheDataSource implements DataSource {
return cacheKeyFactory;
}
- /**
- * Returns the {@link PriorityTaskManager} used when there's a cache miss and requests need to be
- * made to the upstream {@link DataSource}, or {@code null} if there is none.
- */
- @Nullable
- public PriorityTaskManager getUpstreamPriorityTaskManager() {
- return upstreamPriorityTaskManager;
- }
-
- /**
- * Returns the priority used when there's a cache miss and requests need to be made to the
- * upstream {@link DataSource}. The priority is only used if the source has a {@link
- * PriorityTaskManager}.
- */
- public int getUpstreamPriority() {
- return upstreamPriority;
- }
-
@Override
public void addTransferListener(TransferListener transferListener) {
cacheReadDataSource.addTransferListener(transferListener);
diff --git a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java
index ee44b0dc51..8ea2b4e280 100644
--- a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java
+++ b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java
@@ -25,7 +25,6 @@ import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowExce
import com.google.android.exoplayer2.util.Util;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.concurrent.atomic.AtomicBoolean;
/** Caching related utility methods. */
public final class CacheWriter {
@@ -52,7 +51,6 @@ public final class CacheWriter {
private final Cache cache;
private final DataSpec dataSpec;
private final boolean allowShortContent;
- private final AtomicBoolean isCanceled;
private final String cacheKey;
private final byte[] temporaryBuffer;
@Nullable private final ProgressListener progressListener;
@@ -62,6 +60,8 @@ public final class CacheWriter {
private long endPosition;
private long bytesCached;
+ private volatile boolean isCanceled;
+
/**
* @param dataSource A {@link CacheDataSource} that writes to the target cache.
* @param dataSpec Defines the data to be written.
@@ -69,9 +69,6 @@ public final class CacheWriter {
* defined by the {@link DataSpec}. If {@code true} and the request exceeds the length of the
* content, then the content will be cached to the end. If {@code false} and the request
* exceeds the length of the content, {@link #cache} will throw an {@link IOException}.
- * @param isCanceled An optional cancelation signal. If specified, {@link #cache} will check the
- * value of this signal frequently during caching. If the value is {@code true}, the operation
- * will be considered canceled and {@link #cache} will throw {@link InterruptedIOException}.
* @param temporaryBuffer A temporary buffer to be used during caching, or {@code null} if the
* writer should instantiate its own internal temporary buffer.
* @param progressListener An optional progress listener.
@@ -80,14 +77,12 @@ public final class CacheWriter {
CacheDataSource dataSource,
DataSpec dataSpec,
boolean allowShortContent,
- @Nullable AtomicBoolean isCanceled,
@Nullable byte[] temporaryBuffer,
@Nullable ProgressListener progressListener) {
this.dataSource = dataSource;
this.cache = dataSource.getCache();
this.dataSpec = dataSpec;
this.allowShortContent = allowShortContent;
- this.isCanceled = isCanceled == null ? new AtomicBoolean() : isCanceled;
this.temporaryBuffer =
temporaryBuffer == null ? new byte[DEFAULT_BUFFER_SIZE_BYTES] : temporaryBuffer;
this.progressListener = progressListener;
@@ -95,6 +90,15 @@ public final class CacheWriter {
nextPosition = dataSpec.position;
}
+ /**
+ * Cancels this writer's caching operation. {@link #cache} checks for cancelation frequently
+ * during execution, and throws an {@link InterruptedIOException} if it sees that the caching
+ * operation has been canceled.
+ */
+ public void cancel() {
+ isCanceled = true;
+ }
+
/**
* Caches the requested data, skipping any that's already cached.
*
@@ -230,7 +234,7 @@ public final class CacheWriter {
}
private void throwIfCanceled() throws InterruptedIOException {
- if (isCanceled.get()) {
+ if (isCanceled) {
throw new InterruptedIOException();
}
}
diff --git a/library/core/src/main/java/com/google/android/exoplayer2/util/RunnableFutureTask.java b/library/core/src/main/java/com/google/android/exoplayer2/util/RunnableFutureTask.java
new file mode 100644
index 0000000000..9f06f40a67
--- /dev/null
+++ b/library/core/src/main/java/com/google/android/exoplayer2/util/RunnableFutureTask.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.android.exoplayer2.util;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import androidx.annotation.Nullable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link RunnableFuture} that supports additional uninterruptible operations to query whether
+ * execution has started and finished.
+ *
+ * @param The type of the result.
+ * @param The type of any {@link ExecutionException} cause.
+ */
+public abstract class RunnableFutureTask implements RunnableFuture {
+
+ private final ConditionVariable started;
+ private final ConditionVariable finished;
+ private final Object cancelLock;
+
+ @Nullable private Exception exception;
+ @Nullable private R result;
+
+ @Nullable private Thread workThread;
+ private boolean canceled;
+
+ protected RunnableFutureTask() {
+ started = new ConditionVariable();
+ finished = new ConditionVariable();
+ cancelLock = new Object();
+ }
+
+ /** Blocks until the task has started, or has been canceled without having been started. */
+ public final void blockUntilStarted() {
+ started.blockUninterruptible();
+ }
+
+ /** Blocks until the task has finished, or has been canceled without having been started. */
+ public final void blockUntilFinished() {
+ finished.blockUninterruptible();
+ }
+
+ // Future implementation.
+
+ @Override
+ @UnknownNull
+ public final R get() throws ExecutionException, InterruptedException {
+ finished.block();
+ return getResult();
+ }
+
+ @Override
+ @UnknownNull
+ public final R get(long timeout, TimeUnit unit)
+ throws ExecutionException, InterruptedException, TimeoutException {
+ long timeoutMs = MILLISECONDS.convert(timeout, unit);
+ if (!finished.block(timeoutMs)) {
+ throw new TimeoutException();
+ }
+ return getResult();
+ }
+
+ @Override
+ public final boolean cancel(boolean interruptIfRunning) {
+ synchronized (cancelLock) {
+ if (canceled || finished.isOpen()) {
+ return false;
+ }
+ canceled = true;
+ cancelWork();
+ @Nullable Thread workThread = this.workThread;
+ if (workThread != null) {
+ if (interruptIfRunning) {
+ workThread.interrupt();
+ }
+ } else {
+ started.open();
+ finished.open();
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public final boolean isDone() {
+ return finished.isOpen();
+ }
+
+ @Override
+ public final boolean isCancelled() {
+ return canceled;
+ }
+
+ // Runnable implementation.
+
+ @Override
+ public final void run() {
+ synchronized (cancelLock) {
+ if (canceled) {
+ return;
+ }
+ workThread = Thread.currentThread();
+ }
+ started.open();
+ try {
+ result = doWork();
+ } catch (Exception e) {
+ // Must be an instance of E or RuntimeException.
+ exception = e;
+ } finally {
+ synchronized (cancelLock) {
+ finished.open();
+ workThread = null;
+ // Clear the interrupted flag if set, to avoid it leaking into any subsequent tasks executed
+ // using the calling thread.
+ Thread.interrupted();
+ }
+ }
+ }
+
+ // Internal methods.
+
+ /**
+ * Performs the work or computation.
+ *
+ * @return The computed result.
+ * @throws E If an error occurred.
+ */
+ @UnknownNull
+ protected abstract R doWork() throws E;
+
+ /**
+ * Cancels any work being done by {@link #doWork()}. If {@link #doWork()} is currently executing
+ * then the thread on which it's executing may be interrupted immediately after this method
+ * returns.
+ *
+ *
The default implementation does nothing.
+ */
+ protected void cancelWork() {
+ // Do nothing.
+ }
+
+ @SuppressWarnings("return.type.incompatible")
+ @UnknownNull
+ private R getResult() throws ExecutionException {
+ if (canceled) {
+ throw new CancellationException();
+ } else if (exception != null) {
+ throw new ExecutionException(exception);
+ }
+ return result;
+ }
+}
diff --git a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java
index 328d80bf48..652a5643a7 100644
--- a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java
+++ b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java
@@ -362,7 +362,6 @@ public final class CacheDataSourceTest {
new CacheDataSource(cache, upstream2),
unboundedDataSpec,
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
@@ -413,7 +412,6 @@ public final class CacheDataSourceTest {
new CacheDataSource(cache, upstream2),
unboundedDataSpec,
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
@@ -439,7 +437,6 @@ public final class CacheDataSourceTest {
new CacheDataSource(cache, upstream),
dataSpec,
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
@@ -477,7 +474,6 @@ public final class CacheDataSourceTest {
new CacheDataSource(cache, upstream),
dataSpec,
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
diff --git a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java
index 3e5cb119fd..d0cc42b062 100644
--- a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java
+++ b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java
@@ -116,7 +116,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
new DataSpec(Uri.parse("test_data")),
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@@ -139,7 +138,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@@ -152,7 +150,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
new DataSpec(testUri),
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@@ -176,7 +173,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@@ -201,7 +197,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@@ -214,7 +209,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
new DataSpec(testUri),
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@@ -237,7 +231,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ true,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
@@ -262,7 +255,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null)
.cache());
@@ -288,7 +280,6 @@ public final class CacheWriterTest {
new CacheDataSource(cache, dataSource),
new DataSpec(Uri.parse("test_data")),
/* allowShortContent= */ false,
- /* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
diff --git a/library/core/src/test/java/com/google/android/exoplayer2/util/RunnableFutureTaskTest.java b/library/core/src/test/java/com/google/android/exoplayer2/util/RunnableFutureTaskTest.java
new file mode 100644
index 0000000000..9a8aac3020
--- /dev/null
+++ b/library/core/src/test/java/com/google/android/exoplayer2/util/RunnableFutureTaskTest.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.android.exoplayer2.util;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import androidx.test.ext.junit.runners.AndroidJUnit4;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/** Unit tests for {@link RunnableFutureTask}. */
+@RunWith(AndroidJUnit4.class)
+public class RunnableFutureTaskTest {
+
+ @Test
+ public void blockUntilStarted_ifNotStarted_blocks() throws InterruptedException {
+ RunnableFutureTask task =
+ new RunnableFutureTask() {
+ @Override
+ protected Void doWork() {
+ return null;
+ }
+ };
+
+ AtomicBoolean blockUntilStartedReturned = new AtomicBoolean();
+ Thread testThread =
+ new Thread() {
+ @Override
+ public void run() {
+ task.blockUntilStarted();
+ blockUntilStartedReturned.set(true);
+ }
+ };
+ testThread.start();
+
+ Thread.sleep(1000);
+ assertThat(blockUntilStartedReturned.get()).isFalse();
+
+ // Thread cleanup.
+ task.run();
+ testThread.join();
+ }
+
+ @Test(timeout = 1000)
+ public void blockUntilStarted_ifStarted_unblocks() throws InterruptedException {
+ ConditionVariable finish = new ConditionVariable();
+ RunnableFutureTask task =
+ new RunnableFutureTask() {
+ @Override
+ protected Void doWork() {
+ finish.blockUninterruptible();
+ return null;
+ }
+ };
+ Thread testThread = new Thread(task);
+ testThread.start();
+ task.blockUntilStarted(); // Should unblock.
+
+ // Thread cleanup.
+ finish.open();
+ testThread.join();
+ }
+
+ @Test(timeout = 1000)
+ public void blockUntilStarted_ifCanceled_unblocks() {
+ RunnableFutureTask task =
+ new RunnableFutureTask() {
+ @Override
+ protected Void doWork() {
+ return null;
+ }
+ };
+
+ task.cancel(/* interruptIfRunning= */ false);
+
+ // Should not block.
+ task.blockUntilStarted();
+ }
+
+ @Test
+ public void blockUntilFinished_ifNotFinished_blocks() throws InterruptedException {
+ ConditionVariable finish = new ConditionVariable();
+ RunnableFutureTask task =
+ new RunnableFutureTask() {
+ @Override
+ protected Void doWork() {
+ finish.blockUninterruptible();
+ return null;
+ }
+ };
+ Thread testThread1 = new Thread(task);
+ testThread1.start();
+
+ AtomicBoolean blockUntilFinishedReturned = new AtomicBoolean();
+ Thread testThread2 =
+ new Thread() {
+ @Override
+ public void run() {
+ task.blockUntilFinished();
+ blockUntilFinishedReturned.set(true);
+ }
+ };
+ testThread2.start();
+
+ Thread.sleep(1000);
+ assertThat(blockUntilFinishedReturned.get()).isFalse();
+
+ // Thread cleanup.
+ finish.open();
+ testThread1.join();
+ testThread2.join();
+ }
+
+ @Test(timeout = 1000)
+ public void blockUntilFinished_ifFinished_unblocks() throws InterruptedException {
+ RunnableFutureTask task =
+ new RunnableFutureTask() {
+ @Override
+ protected Void doWork() {
+ return null;
+ }
+ };
+ Thread testThread = new Thread(task);
+ testThread.start();
+
+ task.blockUntilFinished();
+ assertThat(task.isDone()).isTrue();
+
+ // Thread cleanup.
+ testThread.join();
+ }
+
+ @Test(timeout = 1000)
+ public void blockUntilFinished_ifCanceled_unblocks() {
+ RunnableFutureTask task =
+ new RunnableFutureTask() {
+ @Override
+ protected Void doWork() {
+ return null;
+ }
+ };
+
+ task.cancel(/* interruptIfRunning= */ false);
+
+ // Should not block.
+ task.blockUntilFinished();
+ }
+
+ @Test
+ public void get_ifNotFinished_blocks() throws InterruptedException {
+ ConditionVariable finish = new ConditionVariable();
+ RunnableFutureTask task =
+ new RunnableFutureTask() {
+ @Override
+ protected Void doWork() {
+ finish.blockUninterruptible();
+ return null;
+ }
+ };
+ Thread testThread1 = new Thread(task);
+ testThread1.start();
+
+ AtomicBoolean blockUntilGetResultReturned = new AtomicBoolean();
+ Thread testThread2 =
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ task.get();
+ } catch (ExecutionException | InterruptedException e) {
+ // Do nothing.
+ } finally {
+ blockUntilGetResultReturned.set(true);
+ }
+ }
+ };
+ testThread2.start();
+
+ Thread.sleep(1000);
+ assertThat(blockUntilGetResultReturned.get()).isFalse();
+
+ // Thread cleanup.
+ finish.open();
+ testThread1.join();
+ testThread2.join();
+ }
+
+ @Test(timeout = 1000)
+ public void get_returnsResult() throws ExecutionException, InterruptedException {
+ Object result = new Object();
+ RunnableFutureTask