Move sample processing to transformer thread

PiperOrigin-RevId: 491623586
This commit is contained in:
kimvde 2022-11-29 14:52:05 +00:00 committed by Rohit Singh
parent 568fa1e1fa
commit 09df56f31a
6 changed files with 177 additions and 64 deletions

View File

@ -39,7 +39,6 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
@Nullable private DecoderInputBuffer inputBuffer;
private boolean muxerWrapperTrackAdded;
private boolean isEnded;
public BaseSamplePipeline(
Format inputFormat,
@ -81,11 +80,6 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
return feedMuxer() || processDataUpToMuxer();
}
@Override
public boolean isEnded() {
return isEnded;
}
@Nullable
protected abstract DecoderInputBuffer dequeueInputBufferInternal() throws TransformationException;
@ -148,7 +142,6 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
if (isMuxerInputEnded()) {
muxerWrapper.endTrack(trackType);
isEnded = true;
return false;
}

View File

@ -16,7 +16,6 @@
package androidx.media3.transformer;
import static androidx.media3.common.util.Assertions.checkNotNull;
import static androidx.media3.exoplayer.DefaultLoadControl.DEFAULT_BUFFER_FOR_PLAYBACK_AFTER_REBUFFER_MS;
import static androidx.media3.exoplayer.DefaultLoadControl.DEFAULT_BUFFER_FOR_PLAYBACK_MS;
import static androidx.media3.exoplayer.DefaultLoadControl.DEFAULT_MAX_BUFFER_MS;
@ -55,11 +54,10 @@ import androidx.media3.exoplayer.video.VideoRendererEventListener;
void onAllTracksRegistered();
SamplePipeline onTrackAdded(Format format, long streamStartPositionUs, long streamOffsetUs)
SamplePipeline.Input onTrackAdded(
Format format, long streamStartPositionUs, long streamOffsetUs)
throws TransformationException;
void onEnded();
void onError(Exception e);
}
@ -111,6 +109,7 @@ import androidx.media3.exoplayer.video.VideoRendererEventListener;
public void start() {
player.setMediaItem(mediaItem);
player.prepare();
player.play();
}
public void release() {
@ -167,13 +166,6 @@ import androidx.media3.exoplayer.video.VideoRendererEventListener;
this.listener = listener;
}
@Override
public void onPlaybackStateChanged(int state) {
if (state == Player.STATE_ENDED) {
listener.onEnded();
}
}
@Override
public void onTimelineChanged(Timeline timeline, int reason) {
if (hasSentDuration) {
@ -184,7 +176,6 @@ import androidx.media3.exoplayer.video.VideoRendererEventListener;
if (!window.isPlaceholder) {
listener.onDurationMs(Util.usToMs(window.durationUs));
hasSentDuration = true;
checkNotNull(player).play();
}
}

View File

@ -45,7 +45,8 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
private boolean isTransformationRunning;
private long streamStartPositionUs;
private long streamOffsetUs;
private @MonotonicNonNull SamplePipeline samplePipeline;
private SamplePipeline.@MonotonicNonNull Input samplePipelineInput;
private boolean isEnded;
public ExoPlayerAssetLoaderRenderer(
int trackType,
@ -88,7 +89,7 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
@Override
public boolean isEnded() {
return samplePipeline != null && samplePipeline.isEnded();
return isEnded;
}
@Override
@ -98,7 +99,7 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
return;
}
while (samplePipeline.processData() || feedPipelineFromInput()) {}
while (feedPipelineFromInput()) {}
} catch (TransformationException e) {
isTransformationRunning = false;
assetLoaderListener.onError(e);
@ -127,16 +128,9 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
isTransformationRunning = false;
}
@Override
protected void onReset() {
if (samplePipeline != null) {
samplePipeline.release();
}
}
@EnsuresNonNullIf(expression = "samplePipeline", result = true)
@EnsuresNonNullIf(expression = "samplePipelineInput", result = true)
private boolean ensureConfigured() throws TransformationException {
if (samplePipeline != null) {
if (samplePipelineInput != null) {
return true;
}
@ -147,7 +141,7 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
return false;
}
Format inputFormat = checkNotNull(formatHolder.format);
samplePipeline =
samplePipelineInput =
assetLoaderListener.onTrackAdded(inputFormat, streamStartPositionUs, streamOffsetUs);
return true;
}
@ -156,11 +150,11 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
* Attempts to read input data and pass the input data to the sample pipeline.
*
* @return Whether it may be possible to read more data immediately by calling this method again.
* @throws TransformationException If a {@link SamplePipeline} problem occurs.
*/
@RequiresNonNull("samplePipeline")
private boolean feedPipelineFromInput() throws TransformationException {
@Nullable DecoderInputBuffer samplePipelineInputBuffer = samplePipeline.dequeueInputBuffer();
@RequiresNonNull("samplePipelineInput")
private boolean feedPipelineFromInput() {
@Nullable
DecoderInputBuffer samplePipelineInputBuffer = samplePipelineInput.dequeueInputBuffer();
if (samplePipelineInputBuffer == null) {
return false;
}
@ -171,11 +165,12 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
case C.RESULT_BUFFER_READ:
samplePipelineInputBuffer.flip();
if (samplePipelineInputBuffer.isEndOfStream()) {
samplePipeline.queueInputBuffer();
samplePipelineInput.queueInputBuffer();
isEnded = true;
return false;
}
mediaClock.updateTimeForTrackType(getTrackType(), samplePipelineInputBuffer.timeUs);
samplePipeline.queueInputBuffer();
samplePipelineInput.queueInputBuffer();
return true;
case C.RESULT_FORMAT_READ:
throw new IllegalStateException("Format changes are not supported.");

View File

@ -69,6 +69,7 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
private int trackCount;
private int trackFormatCount;
private boolean isReady;
private boolean isEnded;
private @C.TrackType int previousTrackType;
private long minTrackTimeUs;
private @MonotonicNonNull ScheduledFuture<?> abortScheduledFuture;
@ -216,9 +217,15 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
trackTypeToIndex.delete(trackType);
if (trackTypeToIndex.size() == 0) {
abortScheduledExecutorService.shutdownNow();
isEnded = true;
}
}
/** Returns whether all the tracks are {@linkplain #endTrack(int) ended}. */
public boolean isEnded() {
return isEnded;
}
/**
* Finishes writing the output and releases any resources associated with muxing.
*

View File

@ -26,6 +26,17 @@ import androidx.media3.decoder.DecoderInputBuffer;
*/
/* package */ interface SamplePipeline {
/** Input of a {@link SamplePipeline}. */
interface Input {
/** See {@link SamplePipeline#dequeueInputBuffer()}. */
@Nullable
DecoderInputBuffer dequeueInputBuffer();
/** See {@link SamplePipeline#queueInputBuffer()}. */
void queueInputBuffer();
}
/** A listener for the sample pipeline events. */
interface Listener {
@ -63,9 +74,6 @@ import androidx.media3.decoder.DecoderInputBuffer;
*/
boolean processData() throws TransformationException;
/** Returns whether the pipeline has ended. */
boolean isEnded();
/** Releases all resources held by the pipeline. */
void release();
}

View File

@ -17,6 +17,7 @@
package androidx.media3.transformer;
import static androidx.media3.common.util.Assertions.checkNotNull;
import static androidx.media3.common.util.Assertions.checkStateNotNull;
import static androidx.media3.transformer.TransformationException.ERROR_CODE_MUXING_FAILED;
import static androidx.media3.transformer.Transformer.PROGRESS_STATE_AVAILABLE;
import static androidx.media3.transformer.Transformer.PROGRESS_STATE_UNAVAILABLE;
@ -45,6 +46,7 @@ import androidx.media3.common.util.Clock;
import androidx.media3.common.util.ConditionVariable;
import androidx.media3.common.util.HandlerWrapper;
import androidx.media3.common.util.Util;
import androidx.media3.decoder.DecoderInputBuffer;
import androidx.media3.exoplayer.source.MediaSource;
import androidx.media3.extractor.metadata.mp4.SlowMotionData;
import com.google.common.collect.ImmutableList;
@ -52,6 +54,8 @@ import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/* package */ final class TransformerInternal {
@ -81,7 +85,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
// Internal messages.
private static final int MSG_START = 0;
private static final int MSG_END = 1;
private static final int MSG_REGISTER_SAMPLE_PIPELINE = 1;
private static final int MSG_DEQUEUE_INPUT = 2;
private static final int MSG_QUEUE_INPUT = 3;
private static final int MSG_DRAIN_PIPELINES = 4;
private static final int MSG_END = 5;
private static final int DRAIN_PIPELINES_DELAY_MS = 10;
private final Context context;
private final TransformationRequest transformationRequest;
@ -97,15 +107,20 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private final HandlerThread internalHandlerThread;
private final HandlerWrapper internalHandler;
private final ExoPlayerAssetLoader exoPlayerAssetLoader;
private final List<SamplePipeline> samplePipelines;
private final ConditionVariable dequeueBufferConditionVariable;
private final MuxerWrapper muxerWrapper;
private final ConditionVariable cancellingConditionVariable;
@Nullable private DecoderInputBuffer pendingInputBuffer;
private boolean isDrainingPipelines;
private @Transformer.ProgressState int progressState;
private long progressPositionMs;
private long durationMs;
private boolean released;
private @MonotonicNonNull RuntimeException cancelException;
private volatile boolean released;
public TransformerInternal(
Context context,
MediaItem mediaItem,
@ -151,6 +166,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
internalLooper,
componentListener,
clock);
samplePipelines = new ArrayList<>();
dequeueBufferConditionVariable = new ConditionVariable();
muxerWrapper =
new MuxerWrapper(
outputPath,
@ -190,11 +207,27 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
private boolean handleMessage(Message msg) {
// Handle end messages even if resources have been released to report release timeouts.
if (released && msg.what != MSG_END) {
return true;
}
try {
switch (msg.what) {
case MSG_START:
startInternal();
break;
case MSG_REGISTER_SAMPLE_PIPELINE:
samplePipelines.add((SamplePipeline) msg.obj);
break;
case MSG_DEQUEUE_INPUT:
dequeueInputInternal(/* samplePipelineIndex= */ msg.arg1);
break;
case MSG_QUEUE_INPUT:
queueInputInternal(/* samplePipelineIndex= */ msg.arg1);
break;
case MSG_DRAIN_PIPELINES:
drainPipelinesInternal();
break;
case MSG_END:
endInternal(
/* endReason= */ msg.arg1,
@ -203,6 +236,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
default:
return false;
}
} catch (TransformationException e) {
endInternal(END_REASON_ERROR, e);
} catch (RuntimeException e) {
endInternal(END_REASON_ERROR, TransformationException.createForUnexpected(e));
}
@ -213,6 +248,41 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
exoPlayerAssetLoader.start();
}
private void dequeueInputInternal(int samplePipelineIndex) throws TransformationException {
SamplePipeline samplePipeline = samplePipelines.get(samplePipelineIndex);
// The sample pipeline is drained before dequeuing input. It can't be done before queuing
// input because, if the pipeline is full, dequeuing input would forever return a null buffer.
// Draining the pipeline at regular intervals would be inefficient because a low interval could
// result in many no-op operations, and a high interval could slow down data queuing.
while (samplePipeline.processData()) {}
pendingInputBuffer = samplePipeline.dequeueInputBuffer();
dequeueBufferConditionVariable.open();
}
private void queueInputInternal(int samplePipelineIndex) throws TransformationException {
DecoderInputBuffer pendingInputBuffer = checkStateNotNull(this.pendingInputBuffer);
samplePipelines.get(samplePipelineIndex).queueInputBuffer();
if (pendingInputBuffer.isEndOfStream() && !isDrainingPipelines) {
internalHandler.sendEmptyMessageDelayed(MSG_DRAIN_PIPELINES, DRAIN_PIPELINES_DELAY_MS);
isDrainingPipelines = true;
}
}
private void drainPipelinesInternal() throws TransformationException {
for (int i = 0; i < samplePipelines.size(); i++) {
while (samplePipelines.get(i).processData()) {}
}
if (muxerWrapper.isEnded()) {
internalHandler
.obtainMessage(
MSG_END, END_REASON_COMPLETED, /* unused */ 0, /* transformationException */ null)
.sendToTarget();
} else {
internalHandler.sendEmptyMessageDelayed(MSG_DRAIN_PIPELINES, DRAIN_PIPELINES_DELAY_MS);
}
}
private void endInternal(
@EndReason int endReason, @Nullable TransformationException transformationException) {
@Nullable TransformationResult transformationResult = null;
@ -220,21 +290,34 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@Nullable TransformationException releaseTransformationException = null;
if (!released) {
released = true;
// Make sure there is no dequeue action waiting on the asset loader thread to avoid a
// deadlock when releasing it.
pendingInputBuffer = null;
dequeueBufferConditionVariable.open();
try {
try {
exoPlayerAssetLoader.release();
if (endReason == END_REASON_COMPLETED) {
transformationResult =
new TransformationResult.Builder()
.setDurationMs(checkNotNull(muxerWrapper).getDurationMs())
.setAverageAudioBitrate(muxerWrapper.getTrackAverageBitrate(C.TRACK_TYPE_AUDIO))
.setAverageVideoBitrate(muxerWrapper.getTrackAverageBitrate(C.TRACK_TYPE_VIDEO))
.setVideoFrameCount(muxerWrapper.getTrackSampleCount(C.TRACK_TYPE_VIDEO))
.setFileSizeBytes(muxerWrapper.getCurrentOutputSizeBytes())
.build();
}
} finally {
muxerWrapper.release(forCancellation);
try {
for (int i = 0; i < samplePipelines.size(); i++) {
samplePipelines.get(i).release();
}
if (endReason == END_REASON_COMPLETED) {
transformationResult =
new TransformationResult.Builder()
.setDurationMs(checkNotNull(muxerWrapper).getDurationMs())
.setAverageAudioBitrate(
muxerWrapper.getTrackAverageBitrate(C.TRACK_TYPE_AUDIO))
.setAverageVideoBitrate(
muxerWrapper.getTrackAverageBitrate(C.TRACK_TYPE_VIDEO))
.setVideoFrameCount(muxerWrapper.getTrackSampleCount(C.TRACK_TYPE_VIDEO))
.setFileSizeBytes(muxerWrapper.getCurrentOutputSizeBytes())
.build();
}
} finally {
muxerWrapper.release(forCancellation);
}
}
} catch (Muxer.MuxerException e) {
releaseTransformationException =
@ -273,6 +356,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private final MediaItem mediaItem;
private final FallbackListener fallbackListener;
private int tracksAddedCount;
private long lastProgressUpdateMs;
private long lastProgressPositionMs;
@ -315,10 +400,16 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
@Override
public SamplePipeline onTrackAdded(
public SamplePipeline.Input onTrackAdded(
Format format, long streamStartPositionUs, long streamOffsetUs)
throws TransformationException {
return getSamplePipeline(format, streamStartPositionUs, streamOffsetUs);
SamplePipeline samplePipeline =
getSamplePipeline(format, streamStartPositionUs, streamOffsetUs);
internalHandler.obtainMessage(MSG_REGISTER_SAMPLE_PIPELINE, samplePipeline).sendToTarget();
int samplePipelineIndex = tracksAddedCount;
tracksAddedCount++;
return new SamplePipelineInput(samplePipelineIndex);
}
@Override
@ -335,14 +426,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
onTransformationError(transformationException);
}
@Override
public void onEnded() {
internalHandler
.obtainMessage(
MSG_END, END_REASON_COMPLETED, /* unused */ 0, /* transformationException */ null)
.sendToTarget();
}
// SamplePipeline.Listener implementation.
@Override
@ -486,5 +569,41 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
return false;
}
private class SamplePipelineInput implements SamplePipeline.Input {
private final int samplePipelineIndex;
public SamplePipelineInput(int samplePipelineIndex) {
this.samplePipelineIndex = samplePipelineIndex;
}
@Nullable
@Override
public DecoderInputBuffer dequeueInputBuffer() {
if (released) {
// Make sure there is no dequeue action waiting on the asset loader thread when it is
// being released to avoid a deadlock.
return null;
}
// TODO(b/252537210): Reduce the number of thread hops (for example by adding a queue at the
// start of thesample pipelines). Having 2 thread hops per sample (one for dequeuing and
// one for queuing) makes transmuxing slower than it used to be.
internalHandler
.obtainMessage(MSG_DEQUEUE_INPUT, samplePipelineIndex, /* unused */ 0)
.sendToTarget();
clock.onThreadBlocked();
dequeueBufferConditionVariable.blockUninterruptible();
dequeueBufferConditionVariable.close();
return pendingInputBuffer;
}
@Override
public void queueInputBuffer() {
internalHandler
.obtainMessage(MSG_QUEUE_INPUT, samplePipelineIndex, /* unused */ 0)
.sendToTarget();
}
}
}
}