From c9717f67ea7ccd647188210c83ec582e98e4c5c7 Mon Sep 17 00:00:00 2001 From: olly Date: Sun, 28 Jun 2020 18:28:48 +0100 Subject: [PATCH] Push all Downloader networking onto the executor Issue: Issue: #6978 PiperOrigin-RevId: 318710782 --- .../offline/DefaultDownloaderFactory.java | 5 +- .../exoplayer2/offline/DownloadManager.java | 2 +- .../exoplayer2/offline/Downloader.java | 25 +- .../offline/ProgressiveDownloader.java | 103 +++--- .../exoplayer2/offline/SegmentDownloader.java | 301 ++++++++++++++--- .../upstream/cache/CacheDataSource.java | 22 -- .../upstream/cache/CacheWriter.java | 20 +- .../exoplayer2/util/RunnableFutureTask.java | 172 ++++++++++ .../upstream/cache/CacheDataSourceTest.java | 4 - .../upstream/cache/CacheWriterTest.java | 9 - .../util/RunnableFutureTaskTest.java | 302 ++++++++++++++++++ .../source/dash/offline/DashDownloader.java | 39 ++- .../source/hls/offline/HlsDownloader.java | 8 +- 13 files changed, 851 insertions(+), 161 deletions(-) create mode 100644 library/core/src/main/java/com/google/android/exoplayer2/util/RunnableFutureTask.java create mode 100644 library/core/src/test/java/com/google/android/exoplayer2/util/RunnableFutureTaskTest.java diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/DefaultDownloaderFactory.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/DefaultDownloaderFactory.java index 0b7434c339..67e2bd8c77 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/offline/DefaultDownloaderFactory.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/DefaultDownloaderFactory.java @@ -18,6 +18,7 @@ package com.google.android.exoplayer2.offline; import android.net.Uri; import androidx.annotation.Nullable; import com.google.android.exoplayer2.upstream.cache.CacheDataSource; +import com.google.android.exoplayer2.util.Assertions; import java.lang.reflect.Constructor; import java.util.List; import java.util.concurrent.Executor; @@ -94,8 +95,8 @@ public class DefaultDownloaderFactory implements DownloaderFactory { */ public DefaultDownloaderFactory( CacheDataSource.Factory cacheDataSourceFactory, Executor executor) { - this.cacheDataSourceFactory = cacheDataSourceFactory; - this.executor = executor; + this.cacheDataSourceFactory = Assertions.checkNotNull(cacheDataSourceFactory); + this.executor = Assertions.checkNotNull(executor); } @Override diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/DownloadManager.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/DownloadManager.java index 50df4a0e8a..5b80b64ad8 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/offline/DownloadManager.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/DownloadManager.java @@ -1331,7 +1331,7 @@ public final class DownloadManager { } } } catch (InterruptedException e) { - // The task was canceled. Do nothing. + Thread.currentThread().interrupt(); } catch (Exception e) { finalException = e; } diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/Downloader.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/Downloader.java index 56f8c0ce8d..8e51bf685e 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/offline/Downloader.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/Downloader.java @@ -18,6 +18,7 @@ package com.google.android.exoplayer2.offline; import androidx.annotation.Nullable; import com.google.android.exoplayer2.C; import java.io.IOException; +import java.util.concurrent.CancellationException; /** Downloads and removes a piece of content. */ public interface Downloader { @@ -44,15 +45,29 @@ public interface Downloader { /** * Downloads the content. * + *

If downloading fails, this method can be called again to resume the download. It cannot be + * called again after the download has been {@link #cancel canceled}. + * + *

If downloading is canceled whilst this method is executing, then it is expected that it will + * return reasonably quickly. However, there are no guarantees about how the method will return, + * meaning that it can return without throwing, or by throwing any of its documented exceptions. + * The caller must use its own knowledge about whether downloading has been canceled to determine + * whether this is why the method has returned, rather than relying on the method returning in a + * particular way. + * * @param progressListener A listener to receive progress updates, or {@code null}. - * @throws DownloadException Thrown if the content cannot be downloaded. - * @throws IOException If the download did not complete successfully. + * @throws IOException If the download failed to complete successfully. + * @throws InterruptedException If the download was interrupted. + * @throws CancellationException If the download was canceled. */ - void download(@Nullable ProgressListener progressListener) throws IOException; + void download(@Nullable ProgressListener progressListener) + throws IOException, InterruptedException; /** - * Cancels the download operation and prevents future download operations from running. The caller - * should also interrupt the downloading thread immediately after calling this method. + * Permanently cancels the downloading by this downloader. The caller should also interrupt the + * downloading thread immediately after calling this method. + * + *

Once canceled, {@link #download} cannot be called again. */ void cancel(); diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java index dd251dad26..09fa444cf3 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/ProgressiveDownloader.java @@ -25,16 +25,24 @@ import com.google.android.exoplayer2.upstream.cache.CacheWriter; import com.google.android.exoplayer2.util.Assertions; import com.google.android.exoplayer2.util.PriorityTaskManager; import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException; +import com.google.android.exoplayer2.util.RunnableFutureTask; +import com.google.android.exoplayer2.util.Util; import java.io.IOException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** A downloader for progressive media streams. */ public final class ProgressiveDownloader implements Downloader { + private final Executor executor; private final DataSpec dataSpec; private final CacheDataSource dataSource; - private final AtomicBoolean isCanceled; + @Nullable private final PriorityTaskManager priorityTaskManager; + + @Nullable private ProgressListener progressListener; + private volatile @MonotonicNonNull RunnableFutureTask downloadRunnable; + private volatile boolean isCanceled; /** @deprecated Use {@link #ProgressiveDownloader(MediaItem, CacheDataSource.Factory)} instead. */ @SuppressWarnings("deprecation") @@ -84,6 +92,7 @@ public final class ProgressiveDownloader implements Downloader { */ public ProgressiveDownloader( MediaItem mediaItem, CacheDataSource.Factory cacheDataSourceFactory, Executor executor) { + this.executor = Assertions.checkNotNull(executor); Assertions.checkNotNull(mediaItem.playbackProperties); dataSpec = new DataSpec.Builder() @@ -92,40 +101,65 @@ public final class ProgressiveDownloader implements Downloader { .setFlags(DataSpec.FLAG_ALLOW_CACHE_FRAGMENTATION) .build(); dataSource = cacheDataSourceFactory.createDataSourceForDownloading(); - isCanceled = new AtomicBoolean(); + priorityTaskManager = cacheDataSourceFactory.getUpstreamPriorityTaskManager(); } @Override - public void download(@Nullable ProgressListener progressListener) throws IOException { - CacheWriter cacheWriter = - new CacheWriter( - dataSource, - dataSpec, - /* allowShortContent= */ false, - isCanceled, - /* temporaryBuffer= */ null, - progressListener == null ? null : new ProgressForwarder(progressListener)); + public void download(@Nullable ProgressListener progressListener) + throws IOException, InterruptedException { + this.progressListener = progressListener; + if (downloadRunnable == null) { + CacheWriter cacheWriter = + new CacheWriter( + dataSource, + dataSpec, + /* allowShortContent= */ false, + /* temporaryBuffer= */ null, + this::onProgress); + downloadRunnable = + new RunnableFutureTask() { + @Override + protected Void doWork() throws IOException { + cacheWriter.cache(); + return null; + } + + @Override + protected void cancelWork() { + cacheWriter.cancel(); + } + }; + } - @Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager(); if (priorityTaskManager != null) { priorityTaskManager.add(C.PRIORITY_DOWNLOAD); } try { boolean finished = false; - while (!finished && !isCanceled.get()) { + while (!finished && !isCanceled) { if (priorityTaskManager != null) { - priorityTaskManager.proceed(dataSource.getUpstreamPriority()); + priorityTaskManager.proceed(C.PRIORITY_DOWNLOAD); } + executor.execute(downloadRunnable); try { - cacheWriter.cache(); + downloadRunnable.get(); finished = true; - } catch (PriorityTooLowException e) { - // The next loop iteration will block until the task is able to proceed. + } catch (ExecutionException e) { + Throwable cause = Assertions.checkNotNull(e.getCause()); + if (cause instanceof PriorityTooLowException) { + // The next loop iteration will block until the task is able to proceed. + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else { + // The cause must be an uncaught Throwable type. + Util.sneakyThrow(cause); + } } } - } catch (InterruptedException e) { - // The download was canceled. } finally { + // If the main download thread was interrupted as part of cancelation, then it's possible that + // the runnable is still doing work. We need to wait until it's finished before returning. + downloadRunnable.blockUntilFinished(); if (priorityTaskManager != null) { priorityTaskManager.remove(C.PRIORITY_DOWNLOAD); } @@ -134,7 +168,11 @@ public final class ProgressiveDownloader implements Downloader { @Override public void cancel() { - isCanceled.set(true); + isCanceled = true; + RunnableFutureTask downloadRunnable = this.downloadRunnable; + if (downloadRunnable != null) { + downloadRunnable.cancel(/* interruptIfRunning= */ true); + } } @Override @@ -142,21 +180,14 @@ public final class ProgressiveDownloader implements Downloader { dataSource.getCache().removeResource(dataSource.getCacheKeyFactory().buildCacheKey(dataSpec)); } - private static final class ProgressForwarder implements CacheWriter.ProgressListener { - - private final ProgressListener progressListener; - - public ProgressForwarder(ProgressListener progressListener) { - this.progressListener = progressListener; - } - - @Override - public void onProgress(long contentLength, long bytesCached, long newBytesCached) { - float percentDownloaded = - contentLength == C.LENGTH_UNSET || contentLength == 0 - ? C.PERCENTAGE_UNSET - : ((bytesCached * 100f) / contentLength); - progressListener.onProgress(contentLength, bytesCached, percentDownloaded); + private void onProgress(long contentLength, long bytesCached, long newBytesCached) { + if (progressListener == null) { + return; } + float percentDownloaded = + contentLength == C.LENGTH_UNSET || contentLength == 0 + ? C.PERCENTAGE_UNSET + : ((bytesCached * 100f) / contentLength); + progressListener.onProgress(contentLength, bytesCached, percentDownloaded); } } diff --git a/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java b/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java index 7360b65f70..d824dfd1e3 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/offline/SegmentDownloader.java @@ -33,14 +33,17 @@ import com.google.android.exoplayer2.upstream.cache.ContentMetadata; import com.google.android.exoplayer2.util.Assertions; import com.google.android.exoplayer2.util.PriorityTaskManager; import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException; +import com.google.android.exoplayer2.util.RunnableFutureTask; import com.google.android.exoplayer2.util.Util; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; +import org.checkerframework.checker.nullness.compatqual.NullableType; /** * Base class for multi segment stream downloaders. @@ -77,8 +80,22 @@ public abstract class SegmentDownloader> impleme private final Parser manifestParser; private final ArrayList streamKeys; private final CacheDataSource.Factory cacheDataSourceFactory; + private final Cache cache; + private final CacheKeyFactory cacheKeyFactory; + @Nullable private final PriorityTaskManager priorityTaskManager; private final Executor executor; - private final AtomicBoolean isCanceled; + + /** + * The currently active runnables. + * + *

Note: Only the {@link #download} thread is permitted to modify this list. Modifications, as + * well as the iteration on the {@link #cancel} thread, must be synchronized on the instance for + * thread safety. Iterations on the {@link #download} thread do not need to be synchronized, and + * should not be synchronized because doing so can erroneously block {@link #cancel}. + */ + private final ArrayList> activeRunnables; + + private volatile boolean isCanceled; /** * @param mediaItem The {@link MediaItem} to be downloaded. @@ -100,28 +117,31 @@ public abstract class SegmentDownloader> impleme this.streamKeys = new ArrayList<>(mediaItem.playbackProperties.streamKeys); this.cacheDataSourceFactory = cacheDataSourceFactory; this.executor = executor; - isCanceled = new AtomicBoolean(); + cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache()); + cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory(); + priorityTaskManager = cacheDataSourceFactory.getUpstreamPriorityTaskManager(); + activeRunnables = new ArrayList<>(); } @Override - public final void download(@Nullable ProgressListener progressListener) throws IOException { - @Nullable - PriorityTaskManager priorityTaskManager = - cacheDataSourceFactory.getUpstreamPriorityTaskManager(); + public final void download(@Nullable ProgressListener progressListener) + throws IOException, InterruptedException { + ArrayDeque pendingSegments = new ArrayDeque<>(); + ArrayDeque recycledRunnables = new ArrayDeque<>(); if (priorityTaskManager != null) { priorityTaskManager.add(C.PRIORITY_DOWNLOAD); } try { - Cache cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache()); - CacheKeyFactory cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory(); CacheDataSource dataSource = cacheDataSourceFactory.createDataSourceForDownloading(); - // Get the manifest and all of the segments. - M manifest = getManifest(dataSource, manifestDataSpec); + M manifest = getManifest(dataSource, manifestDataSpec, /* removing= */ false); if (!streamKeys.isEmpty()) { manifest = manifest.copy(streamKeys); } - List segments = getSegments(dataSource, manifest, /* allowIncompleteList= */ false); + List segments = getSegments(dataSource, manifest, /* removing= */ false); + + // Sort the segments so that we download media in the right order from the start of the + // content, and merge segments where possible to minimize the number of server round trips. Collections.sort(segments); mergeSegments(segments, cacheKeyFactory); @@ -169,34 +189,76 @@ public abstract class SegmentDownloader> impleme bytesDownloaded, segmentsDownloaded) : null; - byte[] temporaryBuffer = new byte[BUFFER_SIZE_BYTES]; - int segmentIndex = 0; - while (!isCanceled.get() && segmentIndex < segments.size()) { + pendingSegments.addAll(segments); + while (!isCanceled && !pendingSegments.isEmpty()) { + // Block until there aren't any higher priority tasks. if (priorityTaskManager != null) { - priorityTaskManager.proceed(dataSource.getUpstreamPriority()); + priorityTaskManager.proceed(C.PRIORITY_DOWNLOAD); } - CacheWriter cacheWriter = - new CacheWriter( - dataSource, - segments.get(segmentIndex).dataSpec, - /* allowShortContent= */ false, - isCanceled, - temporaryBuffer, - progressNotifier); - try { - cacheWriter.cache(); - segmentIndex++; - if (progressNotifier != null) { - progressNotifier.onSegmentDownloaded(); + + // Create and execute a runnable to download the next segment. + CacheDataSource segmentDataSource; + byte[] temporaryBuffer; + if (!recycledRunnables.isEmpty()) { + SegmentDownloadRunnable recycledRunnable = recycledRunnables.removeFirst(); + segmentDataSource = recycledRunnable.dataSource; + temporaryBuffer = recycledRunnable.temporaryBuffer; + } else { + segmentDataSource = cacheDataSourceFactory.createDataSourceForDownloading(); + temporaryBuffer = new byte[BUFFER_SIZE_BYTES]; + } + Segment segment = pendingSegments.removeFirst(); + SegmentDownloadRunnable downloadRunnable = + new SegmentDownloadRunnable( + segment, segmentDataSource, progressNotifier, temporaryBuffer); + addActiveRunnable(downloadRunnable); + executor.execute(downloadRunnable); + + // Clean up runnables that have finished. + for (int j = activeRunnables.size() - 1; j >= 0; j--) { + SegmentDownloadRunnable activeRunnable = (SegmentDownloadRunnable) activeRunnables.get(j); + // Only block until the runnable has finished if we don't have any more pending segments + // to start. If we do have pending segments to start then only process the runnable if + // it's already finished. + if (pendingSegments.isEmpty() || activeRunnable.isDone()) { + try { + activeRunnable.get(); + removeActiveRunnable(j); + recycledRunnables.addLast(activeRunnable); + } catch (ExecutionException e) { + Throwable cause = Assertions.checkNotNull(e.getCause()); + if (cause instanceof PriorityTooLowException) { + // We need to schedule this segment again in a future loop iteration. + pendingSegments.addFirst(activeRunnable.segment); + removeActiveRunnable(j); + recycledRunnables.addLast(activeRunnable); + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else { + // The cause must be an uncaught Throwable type. + Util.sneakyThrow(cause); + } + } } - } catch (PriorityTooLowException e) { - // The next loop iteration will block until the task is able to proceed, then try and - // download the same segment again. } + + // Don't move on to the next segment until the runnable for this segment has started. This + // drip feeds runnables to the executor, rather than providing them all up front. + downloadRunnable.blockUntilStarted(); } - } catch (InterruptedException e) { - // The download was canceled. } finally { + // If one of the runnables has thrown an exception, then it's possible there are other active + // runnables still doing work. We need to wait until they finish before exiting this method. + // Cancel them to speed this up. + for (int i = 0; i < activeRunnables.size(); i++) { + activeRunnables.get(i).cancel(/* interruptIfRunning= */ true); + } + // Wait until the runnables have finished. In addition to the failure case, we also need to + // do this for the case where the main download thread was interrupted as part of cancelation. + for (int i = activeRunnables.size() - 1; i >= 0; i--) { + activeRunnables.get(i).blockUntilFinished(); + removeActiveRunnable(i); + } if (priorityTaskManager != null) { priorityTaskManager.remove(C.PRIORITY_DOWNLOAD); } @@ -205,21 +267,26 @@ public abstract class SegmentDownloader> impleme @Override public void cancel() { - isCanceled.set(true); + synchronized (activeRunnables) { + isCanceled = true; + for (int i = 0; i < activeRunnables.size(); i++) { + activeRunnables.get(i).cancel(/* interruptIfRunning= */ true); + } + } } @Override public final void remove() { - Cache cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache()); - CacheKeyFactory cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory(); CacheDataSource dataSource = cacheDataSourceFactory.createDataSourceForRemovingDownload(); try { - M manifest = getManifest(dataSource, manifestDataSpec); - List segments = getSegments(dataSource, manifest, true); + M manifest = getManifest(dataSource, manifestDataSpec, /* removing= */ true); + List segments = getSegments(dataSource, manifest, /* removing= */ true); for (int i = 0; i < segments.size(); i++) { cache.removeResource(cacheKeyFactory.buildCacheKey(segments.get(i).dataSpec)); } - } catch (IOException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { // Ignore exceptions when removing. } finally { // Always attempt to remove the manifest. @@ -232,34 +299,121 @@ public abstract class SegmentDownloader> impleme /** * Loads and parses a manifest. * - * @param dataSource The {@link DataSource} through which to load. * @param dataSpec The manifest {@link DataSpec}. - * @return The manifest. - * @throws IOException If an error occurs reading data. + * @param removing Whether the manifest is being loaded as part of the download being removed. + * @return The loaded manifest. + * @throws InterruptedException If the thread on which the method is called is interrupted. + * @throws IOException If an error occurs during execution. */ - protected final M getManifest(DataSource dataSource, DataSpec dataSpec) throws IOException { - return ParsingLoadable.load(dataSource, manifestParser, dataSpec, C.DATA_TYPE_MANIFEST); + protected final M getManifest(DataSource dataSource, DataSpec dataSpec, boolean removing) + throws InterruptedException, IOException { + return execute( + new RunnableFutureTask() { + @Override + protected M doWork() throws IOException { + return ParsingLoadable.load(dataSource, manifestParser, dataSpec, C.DATA_TYPE_MANIFEST); + } + }, + removing); } /** - * Returns a list of all downloadable {@link Segment}s for a given manifest. + * Executes the provided {@link RunnableFutureTask}. + * + * @param runnable The {@link RunnableFutureTask} to execute. + * @param removing Whether the execution is part of the download being removed. + * @return The result. + * @throws InterruptedException If the thread on which the method is called is interrupted. + * @throws IOException If an error occurs during execution. + */ + protected final T execute(RunnableFutureTask runnable, boolean removing) + throws InterruptedException, IOException { + if (removing) { + runnable.run(); + try { + return runnable.get(); + } catch (ExecutionException e) { + Throwable cause = Assertions.checkNotNull(e.getCause()); + if (cause instanceof IOException) { + throw (IOException) cause; + } else { + // The cause must be an uncaught Throwable type. + Util.sneakyThrow(e); + } + } + } + while (true) { + if (isCanceled) { + throw new InterruptedException(); + } + // Block until there aren't any higher priority tasks. + if (priorityTaskManager != null) { + priorityTaskManager.proceed(C.PRIORITY_DOWNLOAD); + } + addActiveRunnable(runnable); + executor.execute(runnable); + try { + return runnable.get(); + } catch (ExecutionException e) { + Throwable cause = Assertions.checkNotNull(e.getCause()); + if (cause instanceof PriorityTooLowException) { + // The next loop iteration will block until the task is able to proceed. + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else { + // The cause must be an uncaught Throwable type. + Util.sneakyThrow(e); + } + } finally { + // We don't want to return for as long as the runnable might still be doing work. + runnable.blockUntilFinished(); + removeActiveRunnable(runnable); + } + } + } + + /** + * Returns a list of all downloadable {@link Segment}s for a given manifest. Any required data + * should be loaded using {@link #getManifest} or {@link #execute}. * * @param dataSource The {@link DataSource} through which to load any required data. * @param manifest The manifest containing the segments. - * @param allowIncompleteList Whether to continue in the case that a load error prevents all - * segments from being listed. If true then a partial segment list will be returned. If false - * an {@link IOException} will be thrown. + * @param removing Whether the segments are being obtained as part of a removal. If true then a + * partial segment list is returned in the case that a load error prevents all segments from + * being listed. If false then an {@link IOException} will be thrown in this case. * @return The list of downloadable {@link Segment}s. - * @throws IOException Thrown if {@code allowPartialIndex} is false and a load error occurs, or if - * the media is not in a form that allows for its segments to be listed. + * @throws IOException Thrown if {@code allowPartialIndex} is false and an execution error occurs, + * or if the media is not in a form that allows for its segments to be listed. */ - protected abstract List getSegments( - DataSource dataSource, M manifest, boolean allowIncompleteList) throws IOException; + protected abstract List getSegments(DataSource dataSource, M manifest, boolean removing) + throws IOException, InterruptedException; protected static DataSpec getCompressibleDataSpec(Uri uri) { return new DataSpec.Builder().setUri(uri).setFlags(DataSpec.FLAG_ALLOW_GZIP).build(); } + private void addActiveRunnable(RunnableFutureTask runnable) + throws InterruptedException { + synchronized (activeRunnables) { + if (isCanceled) { + throw new InterruptedException(); + } + activeRunnables.add(runnable); + } + } + + private void removeActiveRunnable(RunnableFutureTask runnable) { + synchronized (activeRunnables) { + activeRunnables.remove(runnable); + } + } + + private void removeActiveRunnable(int index) { + synchronized (activeRunnables) { + activeRunnables.remove(index); + } + } + private static void mergeSegments(List segments, CacheKeyFactory keyFactory) { HashMap lastIndexByCacheKey = new HashMap<>(); int nextOutIndex = 0; @@ -298,6 +452,47 @@ public abstract class SegmentDownloader> impleme && dataSpec1.httpRequestHeaders.equals(dataSpec2.httpRequestHeaders); } + private static final class SegmentDownloadRunnable extends RunnableFutureTask { + + public final Segment segment; + public final CacheDataSource dataSource; + @NullableType private final ProgressNotifier progressNotifier; + public final byte[] temporaryBuffer; + private final CacheWriter cacheWriter; + + public SegmentDownloadRunnable( + Segment segment, + CacheDataSource dataSource, + @NullableType ProgressNotifier progressNotifier, + byte[] temporaryBuffer) { + this.segment = segment; + this.dataSource = dataSource; + this.progressNotifier = progressNotifier; + this.temporaryBuffer = temporaryBuffer; + this.cacheWriter = + new CacheWriter( + dataSource, + segment.dataSpec, + /* allowShortContent= */ false, + temporaryBuffer, + progressNotifier); + } + + @Override + protected Void doWork() throws IOException { + cacheWriter.cache(); + if (progressNotifier != null) { + progressNotifier.onSegmentDownloaded(); + } + return null; + } + + @Override + protected void cancelWork() { + cacheWriter.cancel(); + } + } + private static final class ProgressNotifier implements CacheWriter.ProgressListener { private final ProgressListener progressListener; diff --git a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheDataSource.java b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheDataSource.java index e1e2e5194b..7398ff58a2 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheDataSource.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheDataSource.java @@ -376,8 +376,6 @@ public final class CacheDataSource implements DataSource { @Nullable private final DataSource cacheWriteDataSource; private final DataSource upstreamDataSource; private final CacheKeyFactory cacheKeyFactory; - @Nullable private final PriorityTaskManager upstreamPriorityTaskManager; - private final int upstreamPriority; @Nullable private final EventListener eventListener; private final boolean blockOnCache; @@ -513,8 +511,6 @@ public final class CacheDataSource implements DataSource { this.ignoreCacheOnError = (flags & FLAG_IGNORE_CACHE_ON_ERROR) != 0; this.ignoreCacheForUnsetLengthRequests = (flags & FLAG_IGNORE_CACHE_FOR_UNSET_LENGTH_REQUESTS) != 0; - this.upstreamPriority = upstreamPriority; - this.upstreamPriorityTaskManager = upstreamPriorityTaskManager; if (upstreamDataSource != null) { if (upstreamPriorityTaskManager != null) { upstreamDataSource = @@ -543,24 +539,6 @@ public final class CacheDataSource implements DataSource { return cacheKeyFactory; } - /** - * Returns the {@link PriorityTaskManager} used when there's a cache miss and requests need to be - * made to the upstream {@link DataSource}, or {@code null} if there is none. - */ - @Nullable - public PriorityTaskManager getUpstreamPriorityTaskManager() { - return upstreamPriorityTaskManager; - } - - /** - * Returns the priority used when there's a cache miss and requests need to be made to the - * upstream {@link DataSource}. The priority is only used if the source has a {@link - * PriorityTaskManager}. - */ - public int getUpstreamPriority() { - return upstreamPriority; - } - @Override public void addTransferListener(TransferListener transferListener) { cacheReadDataSource.addTransferListener(transferListener); diff --git a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java index ee44b0dc51..8ea2b4e280 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/upstream/cache/CacheWriter.java @@ -25,7 +25,6 @@ import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowExce import com.google.android.exoplayer2.util.Util; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.concurrent.atomic.AtomicBoolean; /** Caching related utility methods. */ public final class CacheWriter { @@ -52,7 +51,6 @@ public final class CacheWriter { private final Cache cache; private final DataSpec dataSpec; private final boolean allowShortContent; - private final AtomicBoolean isCanceled; private final String cacheKey; private final byte[] temporaryBuffer; @Nullable private final ProgressListener progressListener; @@ -62,6 +60,8 @@ public final class CacheWriter { private long endPosition; private long bytesCached; + private volatile boolean isCanceled; + /** * @param dataSource A {@link CacheDataSource} that writes to the target cache. * @param dataSpec Defines the data to be written. @@ -69,9 +69,6 @@ public final class CacheWriter { * defined by the {@link DataSpec}. If {@code true} and the request exceeds the length of the * content, then the content will be cached to the end. If {@code false} and the request * exceeds the length of the content, {@link #cache} will throw an {@link IOException}. - * @param isCanceled An optional cancelation signal. If specified, {@link #cache} will check the - * value of this signal frequently during caching. If the value is {@code true}, the operation - * will be considered canceled and {@link #cache} will throw {@link InterruptedIOException}. * @param temporaryBuffer A temporary buffer to be used during caching, or {@code null} if the * writer should instantiate its own internal temporary buffer. * @param progressListener An optional progress listener. @@ -80,14 +77,12 @@ public final class CacheWriter { CacheDataSource dataSource, DataSpec dataSpec, boolean allowShortContent, - @Nullable AtomicBoolean isCanceled, @Nullable byte[] temporaryBuffer, @Nullable ProgressListener progressListener) { this.dataSource = dataSource; this.cache = dataSource.getCache(); this.dataSpec = dataSpec; this.allowShortContent = allowShortContent; - this.isCanceled = isCanceled == null ? new AtomicBoolean() : isCanceled; this.temporaryBuffer = temporaryBuffer == null ? new byte[DEFAULT_BUFFER_SIZE_BYTES] : temporaryBuffer; this.progressListener = progressListener; @@ -95,6 +90,15 @@ public final class CacheWriter { nextPosition = dataSpec.position; } + /** + * Cancels this writer's caching operation. {@link #cache} checks for cancelation frequently + * during execution, and throws an {@link InterruptedIOException} if it sees that the caching + * operation has been canceled. + */ + public void cancel() { + isCanceled = true; + } + /** * Caches the requested data, skipping any that's already cached. * @@ -230,7 +234,7 @@ public final class CacheWriter { } private void throwIfCanceled() throws InterruptedIOException { - if (isCanceled.get()) { + if (isCanceled) { throw new InterruptedIOException(); } } diff --git a/library/core/src/main/java/com/google/android/exoplayer2/util/RunnableFutureTask.java b/library/core/src/main/java/com/google/android/exoplayer2/util/RunnableFutureTask.java new file mode 100644 index 0000000000..9f06f40a67 --- /dev/null +++ b/library/core/src/main/java/com/google/android/exoplayer2/util/RunnableFutureTask.java @@ -0,0 +1,172 @@ +/* + * Copyright 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.util; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import androidx.annotation.Nullable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A {@link RunnableFuture} that supports additional uninterruptible operations to query whether + * execution has started and finished. + * + * @param The type of the result. + * @param The type of any {@link ExecutionException} cause. + */ +public abstract class RunnableFutureTask implements RunnableFuture { + + private final ConditionVariable started; + private final ConditionVariable finished; + private final Object cancelLock; + + @Nullable private Exception exception; + @Nullable private R result; + + @Nullable private Thread workThread; + private boolean canceled; + + protected RunnableFutureTask() { + started = new ConditionVariable(); + finished = new ConditionVariable(); + cancelLock = new Object(); + } + + /** Blocks until the task has started, or has been canceled without having been started. */ + public final void blockUntilStarted() { + started.blockUninterruptible(); + } + + /** Blocks until the task has finished, or has been canceled without having been started. */ + public final void blockUntilFinished() { + finished.blockUninterruptible(); + } + + // Future implementation. + + @Override + @UnknownNull + public final R get() throws ExecutionException, InterruptedException { + finished.block(); + return getResult(); + } + + @Override + @UnknownNull + public final R get(long timeout, TimeUnit unit) + throws ExecutionException, InterruptedException, TimeoutException { + long timeoutMs = MILLISECONDS.convert(timeout, unit); + if (!finished.block(timeoutMs)) { + throw new TimeoutException(); + } + return getResult(); + } + + @Override + public final boolean cancel(boolean interruptIfRunning) { + synchronized (cancelLock) { + if (canceled || finished.isOpen()) { + return false; + } + canceled = true; + cancelWork(); + @Nullable Thread workThread = this.workThread; + if (workThread != null) { + if (interruptIfRunning) { + workThread.interrupt(); + } + } else { + started.open(); + finished.open(); + } + return true; + } + } + + @Override + public final boolean isDone() { + return finished.isOpen(); + } + + @Override + public final boolean isCancelled() { + return canceled; + } + + // Runnable implementation. + + @Override + public final void run() { + synchronized (cancelLock) { + if (canceled) { + return; + } + workThread = Thread.currentThread(); + } + started.open(); + try { + result = doWork(); + } catch (Exception e) { + // Must be an instance of E or RuntimeException. + exception = e; + } finally { + synchronized (cancelLock) { + finished.open(); + workThread = null; + // Clear the interrupted flag if set, to avoid it leaking into any subsequent tasks executed + // using the calling thread. + Thread.interrupted(); + } + } + } + + // Internal methods. + + /** + * Performs the work or computation. + * + * @return The computed result. + * @throws E If an error occurred. + */ + @UnknownNull + protected abstract R doWork() throws E; + + /** + * Cancels any work being done by {@link #doWork()}. If {@link #doWork()} is currently executing + * then the thread on which it's executing may be interrupted immediately after this method + * returns. + * + *

The default implementation does nothing. + */ + protected void cancelWork() { + // Do nothing. + } + + @SuppressWarnings("return.type.incompatible") + @UnknownNull + private R getResult() throws ExecutionException { + if (canceled) { + throw new CancellationException(); + } else if (exception != null) { + throw new ExecutionException(exception); + } + return result; + } +} diff --git a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java index 328d80bf48..652a5643a7 100644 --- a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java +++ b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheDataSourceTest.java @@ -362,7 +362,6 @@ public final class CacheDataSourceTest { new CacheDataSource(cache, upstream2), unboundedDataSpec, /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, /* progressListener= */ null); cacheWriter.cache(); @@ -413,7 +412,6 @@ public final class CacheDataSourceTest { new CacheDataSource(cache, upstream2), unboundedDataSpec, /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, /* progressListener= */ null); cacheWriter.cache(); @@ -439,7 +437,6 @@ public final class CacheDataSourceTest { new CacheDataSource(cache, upstream), dataSpec, /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, /* progressListener= */ null); cacheWriter.cache(); @@ -477,7 +474,6 @@ public final class CacheDataSourceTest { new CacheDataSource(cache, upstream), dataSpec, /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, /* progressListener= */ null); cacheWriter.cache(); diff --git a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java index 3e5cb119fd..d0cc42b062 100644 --- a/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java +++ b/library/core/src/test/java/com/google/android/exoplayer2/upstream/cache/CacheWriterTest.java @@ -116,7 +116,6 @@ public final class CacheWriterTest { new CacheDataSource(cache, dataSource), new DataSpec(Uri.parse("test_data")), /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, counters); cacheWriter.cache(); @@ -139,7 +138,6 @@ public final class CacheWriterTest { new CacheDataSource(cache, dataSource), dataSpec, /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, counters); cacheWriter.cache(); @@ -152,7 +150,6 @@ public final class CacheWriterTest { new CacheDataSource(cache, dataSource), new DataSpec(testUri), /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, counters); cacheWriter.cache(); @@ -176,7 +173,6 @@ public final class CacheWriterTest { new CacheDataSource(cache, dataSource), dataSpec, /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, counters); cacheWriter.cache(); @@ -201,7 +197,6 @@ public final class CacheWriterTest { new CacheDataSource(cache, dataSource), dataSpec, /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, counters); cacheWriter.cache(); @@ -214,7 +209,6 @@ public final class CacheWriterTest { new CacheDataSource(cache, dataSource), new DataSpec(testUri), /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, counters); cacheWriter.cache(); @@ -237,7 +231,6 @@ public final class CacheWriterTest { new CacheDataSource(cache, dataSource), dataSpec, /* allowShortContent= */ true, - /* isCanceled= */ null, /* temporaryBuffer= */ null, counters); cacheWriter.cache(); @@ -262,7 +255,6 @@ public final class CacheWriterTest { new CacheDataSource(cache, dataSource), dataSpec, /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, /* progressListener= */ null) .cache()); @@ -288,7 +280,6 @@ public final class CacheWriterTest { new CacheDataSource(cache, dataSource), new DataSpec(Uri.parse("test_data")), /* allowShortContent= */ false, - /* isCanceled= */ null, /* temporaryBuffer= */ null, counters); cacheWriter.cache(); diff --git a/library/core/src/test/java/com/google/android/exoplayer2/util/RunnableFutureTaskTest.java b/library/core/src/test/java/com/google/android/exoplayer2/util/RunnableFutureTaskTest.java new file mode 100644 index 0000000000..9a8aac3020 --- /dev/null +++ b/library/core/src/test/java/com/google/android/exoplayer2/util/RunnableFutureTaskTest.java @@ -0,0 +1,302 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.util; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import androidx.test.ext.junit.runners.AndroidJUnit4; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** Unit tests for {@link RunnableFutureTask}. */ +@RunWith(AndroidJUnit4.class) +public class RunnableFutureTaskTest { + + @Test + public void blockUntilStarted_ifNotStarted_blocks() throws InterruptedException { + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Void doWork() { + return null; + } + }; + + AtomicBoolean blockUntilStartedReturned = new AtomicBoolean(); + Thread testThread = + new Thread() { + @Override + public void run() { + task.blockUntilStarted(); + blockUntilStartedReturned.set(true); + } + }; + testThread.start(); + + Thread.sleep(1000); + assertThat(blockUntilStartedReturned.get()).isFalse(); + + // Thread cleanup. + task.run(); + testThread.join(); + } + + @Test(timeout = 1000) + public void blockUntilStarted_ifStarted_unblocks() throws InterruptedException { + ConditionVariable finish = new ConditionVariable(); + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Void doWork() { + finish.blockUninterruptible(); + return null; + } + }; + Thread testThread = new Thread(task); + testThread.start(); + task.blockUntilStarted(); // Should unblock. + + // Thread cleanup. + finish.open(); + testThread.join(); + } + + @Test(timeout = 1000) + public void blockUntilStarted_ifCanceled_unblocks() { + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Void doWork() { + return null; + } + }; + + task.cancel(/* interruptIfRunning= */ false); + + // Should not block. + task.blockUntilStarted(); + } + + @Test + public void blockUntilFinished_ifNotFinished_blocks() throws InterruptedException { + ConditionVariable finish = new ConditionVariable(); + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Void doWork() { + finish.blockUninterruptible(); + return null; + } + }; + Thread testThread1 = new Thread(task); + testThread1.start(); + + AtomicBoolean blockUntilFinishedReturned = new AtomicBoolean(); + Thread testThread2 = + new Thread() { + @Override + public void run() { + task.blockUntilFinished(); + blockUntilFinishedReturned.set(true); + } + }; + testThread2.start(); + + Thread.sleep(1000); + assertThat(blockUntilFinishedReturned.get()).isFalse(); + + // Thread cleanup. + finish.open(); + testThread1.join(); + testThread2.join(); + } + + @Test(timeout = 1000) + public void blockUntilFinished_ifFinished_unblocks() throws InterruptedException { + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Void doWork() { + return null; + } + }; + Thread testThread = new Thread(task); + testThread.start(); + + task.blockUntilFinished(); + assertThat(task.isDone()).isTrue(); + + // Thread cleanup. + testThread.join(); + } + + @Test(timeout = 1000) + public void blockUntilFinished_ifCanceled_unblocks() { + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Void doWork() { + return null; + } + }; + + task.cancel(/* interruptIfRunning= */ false); + + // Should not block. + task.blockUntilFinished(); + } + + @Test + public void get_ifNotFinished_blocks() throws InterruptedException { + ConditionVariable finish = new ConditionVariable(); + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Void doWork() { + finish.blockUninterruptible(); + return null; + } + }; + Thread testThread1 = new Thread(task); + testThread1.start(); + + AtomicBoolean blockUntilGetResultReturned = new AtomicBoolean(); + Thread testThread2 = + new Thread() { + @Override + public void run() { + try { + task.get(); + } catch (ExecutionException | InterruptedException e) { + // Do nothing. + } finally { + blockUntilGetResultReturned.set(true); + } + } + }; + testThread2.start(); + + Thread.sleep(1000); + assertThat(blockUntilGetResultReturned.get()).isFalse(); + + // Thread cleanup. + finish.open(); + testThread1.join(); + testThread2.join(); + } + + @Test(timeout = 1000) + public void get_returnsResult() throws ExecutionException, InterruptedException { + Object result = new Object(); + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Object doWork() { + return result; + } + }; + Thread testThread = new Thread(task); + testThread.start(); + + assertThat(task.get()).isSameInstanceAs(result); + + // Thread cleanup. + testThread.join(); + } + + @Test(timeout = 1000) + public void get_throwsExecutionException_containsIOException() throws InterruptedException { + IOException exception = new IOException(); + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Object doWork() throws IOException { + throw exception; + } + }; + Thread testThread = new Thread(task); + testThread.start(); + + ExecutionException executionException = assertThrows(ExecutionException.class, task::get); + assertThat(executionException).hasCauseThat().isSameInstanceAs(exception); + + // Thread cleanup. + testThread.join(); + } + + @Test(timeout = 1000) + public void get_throwsExecutionException_containsRuntimeException() throws InterruptedException { + RuntimeException exception = new RuntimeException(); + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Object doWork() { + throw exception; + } + }; + Thread testThread = new Thread(task); + testThread.start(); + + ExecutionException executionException = assertThrows(ExecutionException.class, task::get); + assertThat(executionException).hasCauseThat().isSameInstanceAs(exception); + + // Thread cleanup. + testThread.join(); + } + + @Test + public void run_throwsError() { + Error error = new Error(); + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Object doWork() { + throw error; + } + }; + Error thrownError = assertThrows(Error.class, task::run); + assertThat(thrownError).isSameInstanceAs(error); + } + + @Test + public void cancel_whenNotStarted_returnsTrue() { + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Void doWork() { + return null; + } + }; + assertThat(task.cancel(/* interruptIfRunning= */ false)).isTrue(); + } + + @Test + public void cancel_whenCanceled_returnsFalse() { + RunnableFutureTask task = + new RunnableFutureTask() { + @Override + protected Void doWork() { + return null; + } + }; + task.cancel(/* interruptIfRunning= */ false); + assertThat(task.cancel(/* interruptIfRunning= */ false)).isFalse(); + } +} diff --git a/library/dash/src/main/java/com/google/android/exoplayer2/source/dash/offline/DashDownloader.java b/library/dash/src/main/java/com/google/android/exoplayer2/source/dash/offline/DashDownloader.java index 31a1f84674..7b99d55fd9 100644 --- a/library/dash/src/main/java/com/google/android/exoplayer2/source/dash/offline/DashDownloader.java +++ b/library/dash/src/main/java/com/google/android/exoplayer2/source/dash/offline/DashDownloader.java @@ -36,10 +36,12 @@ import com.google.android.exoplayer2.upstream.DataSource; import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.ParsingLoadable.Parser; import com.google.android.exoplayer2.upstream.cache.CacheDataSource; +import com.google.android.exoplayer2.util.RunnableFutureTask; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; +import org.checkerframework.checker.nullness.compatqual.NullableType; /** * A downloader for DASH streams. @@ -140,8 +142,8 @@ public final class DashDownloader extends SegmentDownloader { @Override protected List getSegments( - DataSource dataSource, DashManifest manifest, boolean allowIncompleteList) - throws IOException { + DataSource dataSource, DashManifest manifest, boolean removing) + throws IOException, InterruptedException { ArrayList segments = new ArrayList<>(); for (int i = 0; i < manifest.getPeriodCount(); i++) { Period period = manifest.getPeriod(i); @@ -150,36 +152,31 @@ public final class DashDownloader extends SegmentDownloader { List adaptationSets = period.adaptationSets; for (int j = 0; j < adaptationSets.size(); j++) { addSegmentsForAdaptationSet( - dataSource, - adaptationSets.get(j), - periodStartUs, - periodDurationUs, - allowIncompleteList, - segments); + dataSource, adaptationSets.get(j), periodStartUs, periodDurationUs, removing, segments); } } return segments; } - private static void addSegmentsForAdaptationSet( + private void addSegmentsForAdaptationSet( DataSource dataSource, AdaptationSet adaptationSet, long periodStartUs, long periodDurationUs, - boolean allowIncompleteList, + boolean removing, ArrayList out) - throws IOException { + throws IOException, InterruptedException { for (int i = 0; i < adaptationSet.representations.size(); i++) { Representation representation = adaptationSet.representations.get(i); DashSegmentIndex index; try { - index = getSegmentIndex(dataSource, adaptationSet.type, representation); + index = getSegmentIndex(dataSource, adaptationSet.type, representation, removing); if (index == null) { // Loading succeeded but there was no index. throw new DownloadException("Missing segment index"); } } catch (IOException e) { - if (!allowIncompleteList) { + if (!removing) { throw e; } // Generating an incomplete segment list is allowed. Advance to the next representation. @@ -215,16 +212,24 @@ public final class DashDownloader extends SegmentDownloader { out.add(new Segment(startTimeUs, dataSpec)); } - private static @Nullable DashSegmentIndex getSegmentIndex( - DataSource dataSource, int trackType, Representation representation) throws IOException { + @Nullable + private DashSegmentIndex getSegmentIndex( + DataSource dataSource, int trackType, Representation representation, boolean removing) + throws IOException, InterruptedException { DashSegmentIndex index = representation.getIndex(); if (index != null) { return index; } - ChunkIndex seekMap = DashUtil.loadChunkIndex(dataSource, trackType, representation); + RunnableFutureTask<@NullableType ChunkIndex, IOException> runnable = + new RunnableFutureTask<@NullableType ChunkIndex, IOException>() { + @Override + protected @NullableType ChunkIndex doWork() throws IOException { + return DashUtil.loadChunkIndex(dataSource, trackType, representation); + } + }; + @Nullable ChunkIndex seekMap = execute(runnable, removing); return seekMap == null ? null : new DashWrappingSegmentIndex(seekMap, representation.presentationTimeOffsetUs); } - } diff --git a/library/hls/src/main/java/com/google/android/exoplayer2/source/hls/offline/HlsDownloader.java b/library/hls/src/main/java/com/google/android/exoplayer2/source/hls/offline/HlsDownloader.java index 858fe8f527..39462f3d06 100644 --- a/library/hls/src/main/java/com/google/android/exoplayer2/source/hls/offline/HlsDownloader.java +++ b/library/hls/src/main/java/com/google/android/exoplayer2/source/hls/offline/HlsDownloader.java @@ -134,8 +134,8 @@ public final class HlsDownloader extends SegmentDownloader { } @Override - protected List getSegments( - DataSource dataSource, HlsPlaylist playlist, boolean allowIncompleteList) throws IOException { + protected List getSegments(DataSource dataSource, HlsPlaylist playlist, boolean removing) + throws IOException, InterruptedException { ArrayList mediaPlaylistDataSpecs = new ArrayList<>(); if (playlist instanceof HlsMasterPlaylist) { HlsMasterPlaylist masterPlaylist = (HlsMasterPlaylist) playlist; @@ -151,9 +151,9 @@ public final class HlsDownloader extends SegmentDownloader { segments.add(new Segment(/* startTimeUs= */ 0, mediaPlaylistDataSpec)); HlsMediaPlaylist mediaPlaylist; try { - mediaPlaylist = (HlsMediaPlaylist) getManifest(dataSource, mediaPlaylistDataSpec); + mediaPlaylist = (HlsMediaPlaylist) getManifest(dataSource, mediaPlaylistDataSpec, removing); } catch (IOException e) { - if (!allowIncompleteList) { + if (!removing) { throw e; } // Generating an incomplete segment list is allowed. Advance to the next media playlist.