Move ExtractorMediaPeriod to new SampleQueue methods

This change allows you to enable/disable tracks within which
all samples are key-frames without any re-buffering (e.g. audio,
text and metadata). This effectively reverts V2 back to the
behavior in V1, only this time we're doing it properly. Quickly
disabling/enabling, or disabling/enabling whilst paused, no longer
causes samples to get "lost" between the source and renderers.

Note it also becomes really easy to support a few other things,
although support is not exposed in this change:

- Enable/disable video tracks without any re-buffering, by
  changing the toKeyframe argument passed to discardTo to true.
- Retain media in the buffer for some time after it's been played
  (e.g. to support a single back-5s-seek efficiently), by
  subtracting the desired back-buffer time from the value that's
  passed to discardTo.

Issue: #2956
Issue: #2926

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=160128586
This commit is contained in:
olly 2017-06-26 04:09:59 -07:00 committed by Oliver Woodman
parent b3c6f6fb31
commit 1b71e3b40d
2 changed files with 109 additions and 55 deletions

View File

@ -47,7 +47,8 @@ import java.io.IOException;
* A {@link MediaPeriod} that extracts data using an {@link Extractor}. * A {@link MediaPeriod} that extracts data using an {@link Extractor}.
*/ */
/* package */ final class ExtractorMediaPeriod implements MediaPeriod, ExtractorOutput, /* package */ final class ExtractorMediaPeriod implements MediaPeriod, ExtractorOutput,
Loader.Callback<ExtractorMediaPeriod.ExtractingLoadable>, UpstreamFormatChangedListener { Loader.Callback<ExtractorMediaPeriod.ExtractingLoadable>, Loader.ReleaseCallback,
UpstreamFormatChangedListener {
/** /**
* When the source's duration is unknown, it is calculated by adding this value to the largest * When the source's duration is unknown, it is calculated by adding this value to the largest
@ -146,20 +147,26 @@ import java.io.IOException;
} }
public void release() { public void release() {
final ExtractorHolder extractorHolder = this.extractorHolder; boolean releasedSynchronously = loader.release(this);
loader.release(new Runnable() { if (!releasedSynchronously) {
// Discard as much as we can synchronously.
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).discardToEnd();
}
}
handler.removeCallbacksAndMessages(null);
released = true;
}
@Override @Override
public void run() { public void onLoaderReleased() {
extractorHolder.release(); extractorHolder.release();
int trackCount = sampleQueues.size(); int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) { for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).disable(); sampleQueues.valueAt(i).reset(true);
} }
} }
});
handler.removeCallbacksAndMessages(null);
released = true;
}
@Override @Override
public void prepare(Callback callback, long positionUs) { public void prepare(Callback callback, long positionUs) {
@ -182,19 +189,21 @@ import java.io.IOException;
public long selectTracks(TrackSelection[] selections, boolean[] mayRetainStreamFlags, public long selectTracks(TrackSelection[] selections, boolean[] mayRetainStreamFlags,
SampleStream[] streams, boolean[] streamResetFlags, long positionUs) { SampleStream[] streams, boolean[] streamResetFlags, long positionUs) {
Assertions.checkState(prepared); Assertions.checkState(prepared);
// Disable old tracks. int oldEnabledTrackCount = enabledTrackCount;
// Deselect old tracks.
for (int i = 0; i < selections.length; i++) { for (int i = 0; i < selections.length; i++) {
if (streams[i] != null && (selections[i] == null || !mayRetainStreamFlags[i])) { if (streams[i] != null && (selections[i] == null || !mayRetainStreamFlags[i])) {
int track = ((SampleStreamImpl) streams[i]).track; int track = ((SampleStreamImpl) streams[i]).track;
Assertions.checkState(trackEnabledStates[track]); Assertions.checkState(trackEnabledStates[track]);
enabledTrackCount--; enabledTrackCount--;
trackEnabledStates[track] = false; trackEnabledStates[track] = false;
sampleQueues.valueAt(track).disable();
streams[i] = null; streams[i] = null;
} }
} }
// Enable new tracks. // We'll always need to seek if this is a first selection to a non-zero position, or if we're
boolean selectedNewTracks = false; // making a selection having previously disabled all tracks.
boolean seekRequired = seenFirstTrackSelection ? oldEnabledTrackCount == 0 : positionUs != 0;
// Select new tracks.
for (int i = 0; i < selections.length; i++) { for (int i = 0; i < selections.length; i++) {
if (streams[i] == null && selections[i] != null) { if (streams[i] == null && selections[i] != null) {
TrackSelection selection = selections[i]; TrackSelection selection = selections[i];
@ -206,16 +215,12 @@ import java.io.IOException;
trackEnabledStates[track] = true; trackEnabledStates[track] = true;
streams[i] = new SampleStreamImpl(track); streams[i] = new SampleStreamImpl(track);
streamResetFlags[i] = true; streamResetFlags[i] = true;
selectedNewTracks = true; // If there's still a chance of avoiding a seek, try and seek within the sample queue.
} if (!seekRequired) {
} SampleQueue sampleQueue = sampleQueues.valueAt(i);
if (!seenFirstTrackSelection) { sampleQueue.rewind();
// At the time of the first track selection all queues will be enabled, so we need to disable seekRequired = !sampleQueue.advanceTo(positionUs, true, true)
// any that are no longer required. && sampleQueue.getReadIndex() != 0;
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
if (!trackEnabledStates[i]) {
sampleQueues.valueAt(i).disable();
} }
} }
} }
@ -224,7 +229,7 @@ import java.io.IOException;
if (loader.isLoading()) { if (loader.isLoading()) {
loader.cancelLoading(); loader.cancelLoading();
} }
} else if (seenFirstTrackSelection ? selectedNewTracks : positionUs != 0) { } else if (seekRequired) {
positionUs = seekToUs(positionUs); positionUs = seekToUs(positionUs);
// We'll need to reset renderers consuming from all streams due to the seek. // We'll need to reset renderers consuming from all streams due to the seek.
for (int i = 0; i < streams.length; i++) { for (int i = 0; i < streams.length; i++) {
@ -239,7 +244,10 @@ import java.io.IOException;
@Override @Override
public void discardBuffer(long positionUs) { public void discardBuffer(long positionUs) {
// Do nothing. int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).discardTo(positionUs, false, trackEnabledStates[i]);
}
} }
@Override @Override
@ -303,9 +311,13 @@ import java.io.IOException;
// If we're not pending a reset, see if we can seek within the sample queues. // If we're not pending a reset, see if we can seek within the sample queues.
boolean seekInsideBuffer = !isPendingReset(); boolean seekInsideBuffer = !isPendingReset();
for (int i = 0; seekInsideBuffer && i < trackCount; i++) { for (int i = 0; seekInsideBuffer && i < trackCount; i++) {
if (trackEnabledStates[i]) { SampleQueue sampleQueue = sampleQueues.valueAt(i);
seekInsideBuffer = sampleQueues.valueAt(i).skipToKeyframeBefore(positionUs, false); sampleQueue.rewind();
} // TODO: For sparse tracks (e.g. text, metadata) this may return false when an in-buffer
// seek should be allowed. If there are non-sparse tracks (e.g. video, audio) for which
// in-buffer seeking is successful, we should perform an in-buffer seek unconditionally.
seekInsideBuffer = sampleQueue.advanceTo(positionUs, true, false);
sampleQueue.discardToRead();
} }
// If we failed to seek within the sample queues, we need to restart. // If we failed to seek within the sample queues, we need to restart.
if (!seekInsideBuffer) { if (!seekInsideBuffer) {
@ -338,17 +350,16 @@ import java.io.IOException;
if (notifyReset || isPendingReset()) { if (notifyReset || isPendingReset()) {
return C.RESULT_NOTHING_READ; return C.RESULT_NOTHING_READ;
} }
return sampleQueues.valueAt(track).read(formatHolder, buffer, formatRequired,
return sampleQueues.valueAt(track).readData(formatHolder, buffer, formatRequired,
loadingFinished, lastSeekPositionUs); loadingFinished, lastSeekPositionUs);
} }
/* package */ void skipData(int track, long positionUs) { /* package */ void skipData(int track, long positionUs) {
SampleQueue sampleQueue = sampleQueues.valueAt(track); SampleQueue sampleQueue = sampleQueues.valueAt(track);
if (loadingFinished && positionUs > sampleQueue.getLargestQueuedTimestampUs()) { if (loadingFinished && positionUs > sampleQueue.getLargestQueuedTimestampUs()) {
sampleQueue.skipAll(); sampleQueue.advanceToEnd();
} else { } else {
sampleQueue.skipToKeyframeBefore(positionUs, true); sampleQueue.advanceTo(positionUs, true, true);
} }
} }
@ -372,12 +383,15 @@ import java.io.IOException;
@Override @Override
public void onLoadCanceled(ExtractingLoadable loadable, long elapsedRealtimeMs, public void onLoadCanceled(ExtractingLoadable loadable, long elapsedRealtimeMs,
long loadDurationMs, boolean released) { long loadDurationMs, boolean released) {
if (released) {
return;
}
copyLengthFromLoader(loadable); copyLengthFromLoader(loadable);
if (!released && enabledTrackCount > 0) {
int trackCount = sampleQueues.size(); int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) { for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).reset(trackEnabledStates[i]); sampleQueues.valueAt(i).reset(true);
} }
if (enabledTrackCount > 0) {
callback.onContinueLoadingRequested(this); callback.onContinueLoadingRequested(this);
} }
} }
@ -508,7 +522,7 @@ import java.io.IOException;
notifyReset = prepared; notifyReset = prepared;
int trackCount = sampleQueues.size(); int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) { for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).reset(!prepared || trackEnabledStates[i]); sampleQueues.valueAt(i).reset(true);
} }
loadable.setLoadPosition(0, 0); loadable.setLoadPosition(0, 0);
} }

View File

@ -119,17 +119,23 @@ public final class Loader implements LoaderErrorThrower {
} }
/**
* A callback to be notified when a {@link Loader} has finished being released.
*/
public interface ReleaseCallback {
/**
* Called when the {@link Loader} has finished being released.
*/
void onLoaderReleased();
}
public static final int RETRY = 0; public static final int RETRY = 0;
public static final int RETRY_RESET_ERROR_COUNT = 1; public static final int RETRY_RESET_ERROR_COUNT = 1;
public static final int DONT_RETRY = 2; public static final int DONT_RETRY = 2;
public static final int DONT_RETRY_FATAL = 3; public static final int DONT_RETRY_FATAL = 3;
private static final int MSG_START = 0;
private static final int MSG_CANCEL = 1;
private static final int MSG_END_OF_SOURCE = 2;
private static final int MSG_IO_EXCEPTION = 3;
private static final int MSG_FATAL_ERROR = 4;
private final ExecutorService downloadExecutorService; private final ExecutorService downloadExecutorService;
private LoadTask<? extends Loadable> currentTask; private LoadTask<? extends Loadable> currentTask;
@ -150,7 +156,7 @@ public final class Loader implements LoaderErrorThrower {
* *
* @param <T> The type of the loadable. * @param <T> The type of the loadable.
* @param loadable The {@link Loadable} to load. * @param loadable The {@link Loadable} to load.
* @param callback A callback to called when the load ends. * @param callback A callback to be called when the load ends.
* @param defaultMinRetryCount The minimum number of times the load must be retried before * @param defaultMinRetryCount The minimum number of times the load must be retried before
* {@link #maybeThrowError()} will propagate an error. * {@link #maybeThrowError()} will propagate an error.
* @throws IllegalStateException If the calling thread does not have an associated {@link Looper}. * @throws IllegalStateException If the calling thread does not have an associated {@link Looper}.
@ -188,20 +194,28 @@ public final class Loader implements LoaderErrorThrower {
} }
/** /**
* Releases the {@link Loader}, running {@code postLoadAction} on its thread. This method should * Releases the {@link Loader}. This method should be called when the {@link Loader} is no longer
* be called when the {@link Loader} is no longer required. * required.
* *
* @param postLoadAction A {@link Runnable} to run on the loader's thread when * @param callback A callback to be called when the release ends. Will be called synchronously
* {@link Loadable#load()} is no longer running. * from this method if no load is in progress, or asynchronously once the load has been
* canceled otherwise. May be null.
* @return True if {@code callback} was called synchronously. False if it will be called
* asynchronously or if {@code callback} is null.
*/ */
public void release(Runnable postLoadAction) { public boolean release(ReleaseCallback callback) {
boolean callbackInvoked = false;
if (currentTask != null) { if (currentTask != null) {
currentTask.cancel(true); currentTask.cancel(true);
if (callback != null) {
downloadExecutorService.execute(new ReleaseTask(callback));
} }
if (postLoadAction != null) { } else if (callback != null) {
downloadExecutorService.execute(postLoadAction); callback.onLoaderReleased();
callbackInvoked = true;
} }
downloadExecutorService.shutdown(); downloadExecutorService.shutdown();
return callbackInvoked;
} }
// LoaderErrorThrower implementation. // LoaderErrorThrower implementation.
@ -228,6 +242,12 @@ public final class Loader implements LoaderErrorThrower {
private static final String TAG = "LoadTask"; private static final String TAG = "LoadTask";
private static final int MSG_START = 0;
private static final int MSG_CANCEL = 1;
private static final int MSG_END_OF_SOURCE = 2;
private static final int MSG_IO_EXCEPTION = 3;
private static final int MSG_FATAL_ERROR = 4;
private final T loadable; private final T loadable;
private final Loader.Callback<T> callback; private final Loader.Callback<T> callback;
public final int defaultMinRetryCount; public final int defaultMinRetryCount;
@ -390,4 +410,24 @@ public final class Loader implements LoaderErrorThrower {
} }
private static final class ReleaseTask extends Handler implements Runnable {
private final ReleaseCallback callback;
public ReleaseTask(ReleaseCallback callback) {
this.callback = callback;
}
@Override
public void run() {
sendEmptyMessage(0);
}
@Override
public void handleMessage(Message msg) {
callback.onLoaderReleased();
}
}
} }