From 09df56f31a7d7464f57fa70310bcc10602f122fd Mon Sep 17 00:00:00 2001 From: kimvde Date: Tue, 29 Nov 2022 14:52:05 +0000 Subject: [PATCH] Move sample processing to transformer thread PiperOrigin-RevId: 491623586 --- .../transformer/BaseSamplePipeline.java | 7 - .../transformer/ExoPlayerAssetLoader.java | 15 +- .../ExoPlayerAssetLoaderRenderer.java | 33 ++-- .../media3/transformer/MuxerWrapper.java | 7 + .../media3/transformer/SamplePipeline.java | 14 +- .../transformer/TransformerInternal.java | 165 +++++++++++++++--- 6 files changed, 177 insertions(+), 64 deletions(-) diff --git a/libraries/transformer/src/main/java/androidx/media3/transformer/BaseSamplePipeline.java b/libraries/transformer/src/main/java/androidx/media3/transformer/BaseSamplePipeline.java index 999f4cbe1b..a81dbb3246 100644 --- a/libraries/transformer/src/main/java/androidx/media3/transformer/BaseSamplePipeline.java +++ b/libraries/transformer/src/main/java/androidx/media3/transformer/BaseSamplePipeline.java @@ -39,7 +39,6 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; @Nullable private DecoderInputBuffer inputBuffer; private boolean muxerWrapperTrackAdded; - private boolean isEnded; public BaseSamplePipeline( Format inputFormat, @@ -81,11 +80,6 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; return feedMuxer() || processDataUpToMuxer(); } - @Override - public boolean isEnded() { - return isEnded; - } - @Nullable protected abstract DecoderInputBuffer dequeueInputBufferInternal() throws TransformationException; @@ -148,7 +142,6 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; if (isMuxerInputEnded()) { muxerWrapper.endTrack(trackType); - isEnded = true; return false; } diff --git a/libraries/transformer/src/main/java/androidx/media3/transformer/ExoPlayerAssetLoader.java b/libraries/transformer/src/main/java/androidx/media3/transformer/ExoPlayerAssetLoader.java index 400ffeb514..e4ad0385a3 100644 --- a/libraries/transformer/src/main/java/androidx/media3/transformer/ExoPlayerAssetLoader.java +++ b/libraries/transformer/src/main/java/androidx/media3/transformer/ExoPlayerAssetLoader.java @@ -16,7 +16,6 @@ package androidx.media3.transformer; -import static androidx.media3.common.util.Assertions.checkNotNull; import static androidx.media3.exoplayer.DefaultLoadControl.DEFAULT_BUFFER_FOR_PLAYBACK_AFTER_REBUFFER_MS; import static androidx.media3.exoplayer.DefaultLoadControl.DEFAULT_BUFFER_FOR_PLAYBACK_MS; import static androidx.media3.exoplayer.DefaultLoadControl.DEFAULT_MAX_BUFFER_MS; @@ -55,11 +54,10 @@ import androidx.media3.exoplayer.video.VideoRendererEventListener; void onAllTracksRegistered(); - SamplePipeline onTrackAdded(Format format, long streamStartPositionUs, long streamOffsetUs) + SamplePipeline.Input onTrackAdded( + Format format, long streamStartPositionUs, long streamOffsetUs) throws TransformationException; - void onEnded(); - void onError(Exception e); } @@ -111,6 +109,7 @@ import androidx.media3.exoplayer.video.VideoRendererEventListener; public void start() { player.setMediaItem(mediaItem); player.prepare(); + player.play(); } public void release() { @@ -167,13 +166,6 @@ import androidx.media3.exoplayer.video.VideoRendererEventListener; this.listener = listener; } - @Override - public void onPlaybackStateChanged(int state) { - if (state == Player.STATE_ENDED) { - listener.onEnded(); - } - } - @Override public void onTimelineChanged(Timeline timeline, int reason) { if (hasSentDuration) { @@ -184,7 +176,6 @@ import androidx.media3.exoplayer.video.VideoRendererEventListener; if (!window.isPlaceholder) { listener.onDurationMs(Util.usToMs(window.durationUs)); hasSentDuration = true; - checkNotNull(player).play(); } } diff --git a/libraries/transformer/src/main/java/androidx/media3/transformer/ExoPlayerAssetLoaderRenderer.java b/libraries/transformer/src/main/java/androidx/media3/transformer/ExoPlayerAssetLoaderRenderer.java index ec913facc0..e541ed933f 100644 --- a/libraries/transformer/src/main/java/androidx/media3/transformer/ExoPlayerAssetLoaderRenderer.java +++ b/libraries/transformer/src/main/java/androidx/media3/transformer/ExoPlayerAssetLoaderRenderer.java @@ -45,7 +45,8 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; private boolean isTransformationRunning; private long streamStartPositionUs; private long streamOffsetUs; - private @MonotonicNonNull SamplePipeline samplePipeline; + private SamplePipeline.@MonotonicNonNull Input samplePipelineInput; + private boolean isEnded; public ExoPlayerAssetLoaderRenderer( int trackType, @@ -88,7 +89,7 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; @Override public boolean isEnded() { - return samplePipeline != null && samplePipeline.isEnded(); + return isEnded; } @Override @@ -98,7 +99,7 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; return; } - while (samplePipeline.processData() || feedPipelineFromInput()) {} + while (feedPipelineFromInput()) {} } catch (TransformationException e) { isTransformationRunning = false; assetLoaderListener.onError(e); @@ -127,16 +128,9 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; isTransformationRunning = false; } - @Override - protected void onReset() { - if (samplePipeline != null) { - samplePipeline.release(); - } - } - - @EnsuresNonNullIf(expression = "samplePipeline", result = true) + @EnsuresNonNullIf(expression = "samplePipelineInput", result = true) private boolean ensureConfigured() throws TransformationException { - if (samplePipeline != null) { + if (samplePipelineInput != null) { return true; } @@ -147,7 +141,7 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; return false; } Format inputFormat = checkNotNull(formatHolder.format); - samplePipeline = + samplePipelineInput = assetLoaderListener.onTrackAdded(inputFormat, streamStartPositionUs, streamOffsetUs); return true; } @@ -156,11 +150,11 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; * Attempts to read input data and pass the input data to the sample pipeline. * * @return Whether it may be possible to read more data immediately by calling this method again. - * @throws TransformationException If a {@link SamplePipeline} problem occurs. */ - @RequiresNonNull("samplePipeline") - private boolean feedPipelineFromInput() throws TransformationException { - @Nullable DecoderInputBuffer samplePipelineInputBuffer = samplePipeline.dequeueInputBuffer(); + @RequiresNonNull("samplePipelineInput") + private boolean feedPipelineFromInput() { + @Nullable + DecoderInputBuffer samplePipelineInputBuffer = samplePipelineInput.dequeueInputBuffer(); if (samplePipelineInputBuffer == null) { return false; } @@ -171,11 +165,12 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; case C.RESULT_BUFFER_READ: samplePipelineInputBuffer.flip(); if (samplePipelineInputBuffer.isEndOfStream()) { - samplePipeline.queueInputBuffer(); + samplePipelineInput.queueInputBuffer(); + isEnded = true; return false; } mediaClock.updateTimeForTrackType(getTrackType(), samplePipelineInputBuffer.timeUs); - samplePipeline.queueInputBuffer(); + samplePipelineInput.queueInputBuffer(); return true; case C.RESULT_FORMAT_READ: throw new IllegalStateException("Format changes are not supported."); diff --git a/libraries/transformer/src/main/java/androidx/media3/transformer/MuxerWrapper.java b/libraries/transformer/src/main/java/androidx/media3/transformer/MuxerWrapper.java index 91a4ddb58c..1b93f7ca7f 100644 --- a/libraries/transformer/src/main/java/androidx/media3/transformer/MuxerWrapper.java +++ b/libraries/transformer/src/main/java/androidx/media3/transformer/MuxerWrapper.java @@ -69,6 +69,7 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; private int trackCount; private int trackFormatCount; private boolean isReady; + private boolean isEnded; private @C.TrackType int previousTrackType; private long minTrackTimeUs; private @MonotonicNonNull ScheduledFuture abortScheduledFuture; @@ -216,9 +217,15 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull; trackTypeToIndex.delete(trackType); if (trackTypeToIndex.size() == 0) { abortScheduledExecutorService.shutdownNow(); + isEnded = true; } } + /** Returns whether all the tracks are {@linkplain #endTrack(int) ended}. */ + public boolean isEnded() { + return isEnded; + } + /** * Finishes writing the output and releases any resources associated with muxing. * diff --git a/libraries/transformer/src/main/java/androidx/media3/transformer/SamplePipeline.java b/libraries/transformer/src/main/java/androidx/media3/transformer/SamplePipeline.java index 1af278cf4d..a93a32952f 100644 --- a/libraries/transformer/src/main/java/androidx/media3/transformer/SamplePipeline.java +++ b/libraries/transformer/src/main/java/androidx/media3/transformer/SamplePipeline.java @@ -26,6 +26,17 @@ import androidx.media3.decoder.DecoderInputBuffer; */ /* package */ interface SamplePipeline { + /** Input of a {@link SamplePipeline}. */ + interface Input { + + /** See {@link SamplePipeline#dequeueInputBuffer()}. */ + @Nullable + DecoderInputBuffer dequeueInputBuffer(); + + /** See {@link SamplePipeline#queueInputBuffer()}. */ + void queueInputBuffer(); + } + /** A listener for the sample pipeline events. */ interface Listener { @@ -63,9 +74,6 @@ import androidx.media3.decoder.DecoderInputBuffer; */ boolean processData() throws TransformationException; - /** Returns whether the pipeline has ended. */ - boolean isEnded(); - /** Releases all resources held by the pipeline. */ void release(); } diff --git a/libraries/transformer/src/main/java/androidx/media3/transformer/TransformerInternal.java b/libraries/transformer/src/main/java/androidx/media3/transformer/TransformerInternal.java index fa8593f0cd..f332fa6ff2 100644 --- a/libraries/transformer/src/main/java/androidx/media3/transformer/TransformerInternal.java +++ b/libraries/transformer/src/main/java/androidx/media3/transformer/TransformerInternal.java @@ -17,6 +17,7 @@ package androidx.media3.transformer; import static androidx.media3.common.util.Assertions.checkNotNull; +import static androidx.media3.common.util.Assertions.checkStateNotNull; import static androidx.media3.transformer.TransformationException.ERROR_CODE_MUXING_FAILED; import static androidx.media3.transformer.Transformer.PROGRESS_STATE_AVAILABLE; import static androidx.media3.transformer.Transformer.PROGRESS_STATE_UNAVAILABLE; @@ -45,6 +46,7 @@ import androidx.media3.common.util.Clock; import androidx.media3.common.util.ConditionVariable; import androidx.media3.common.util.HandlerWrapper; import androidx.media3.common.util.Util; +import androidx.media3.decoder.DecoderInputBuffer; import androidx.media3.exoplayer.source.MediaSource; import androidx.media3.extractor.metadata.mp4.SlowMotionData; import com.google.common.collect.ImmutableList; @@ -52,6 +54,8 @@ import java.lang.annotation.Documented; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.util.ArrayList; +import java.util.List; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /* package */ final class TransformerInternal { @@ -81,7 +85,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; // Internal messages. private static final int MSG_START = 0; - private static final int MSG_END = 1; + private static final int MSG_REGISTER_SAMPLE_PIPELINE = 1; + private static final int MSG_DEQUEUE_INPUT = 2; + private static final int MSG_QUEUE_INPUT = 3; + private static final int MSG_DRAIN_PIPELINES = 4; + private static final int MSG_END = 5; + + private static final int DRAIN_PIPELINES_DELAY_MS = 10; private final Context context; private final TransformationRequest transformationRequest; @@ -97,15 +107,20 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private final HandlerThread internalHandlerThread; private final HandlerWrapper internalHandler; private final ExoPlayerAssetLoader exoPlayerAssetLoader; + private final List samplePipelines; + private final ConditionVariable dequeueBufferConditionVariable; private final MuxerWrapper muxerWrapper; private final ConditionVariable cancellingConditionVariable; + @Nullable private DecoderInputBuffer pendingInputBuffer; + private boolean isDrainingPipelines; private @Transformer.ProgressState int progressState; private long progressPositionMs; private long durationMs; - private boolean released; private @MonotonicNonNull RuntimeException cancelException; + private volatile boolean released; + public TransformerInternal( Context context, MediaItem mediaItem, @@ -151,6 +166,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; internalLooper, componentListener, clock); + samplePipelines = new ArrayList<>(); + dequeueBufferConditionVariable = new ConditionVariable(); muxerWrapper = new MuxerWrapper( outputPath, @@ -190,11 +207,27 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } private boolean handleMessage(Message msg) { + // Handle end messages even if resources have been released to report release timeouts. + if (released && msg.what != MSG_END) { + return true; + } try { switch (msg.what) { case MSG_START: startInternal(); break; + case MSG_REGISTER_SAMPLE_PIPELINE: + samplePipelines.add((SamplePipeline) msg.obj); + break; + case MSG_DEQUEUE_INPUT: + dequeueInputInternal(/* samplePipelineIndex= */ msg.arg1); + break; + case MSG_QUEUE_INPUT: + queueInputInternal(/* samplePipelineIndex= */ msg.arg1); + break; + case MSG_DRAIN_PIPELINES: + drainPipelinesInternal(); + break; case MSG_END: endInternal( /* endReason= */ msg.arg1, @@ -203,6 +236,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; default: return false; } + } catch (TransformationException e) { + endInternal(END_REASON_ERROR, e); } catch (RuntimeException e) { endInternal(END_REASON_ERROR, TransformationException.createForUnexpected(e)); } @@ -213,6 +248,41 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; exoPlayerAssetLoader.start(); } + private void dequeueInputInternal(int samplePipelineIndex) throws TransformationException { + SamplePipeline samplePipeline = samplePipelines.get(samplePipelineIndex); + // The sample pipeline is drained before dequeuing input. It can't be done before queuing + // input because, if the pipeline is full, dequeuing input would forever return a null buffer. + // Draining the pipeline at regular intervals would be inefficient because a low interval could + // result in many no-op operations, and a high interval could slow down data queuing. + while (samplePipeline.processData()) {} + pendingInputBuffer = samplePipeline.dequeueInputBuffer(); + dequeueBufferConditionVariable.open(); + } + + private void queueInputInternal(int samplePipelineIndex) throws TransformationException { + DecoderInputBuffer pendingInputBuffer = checkStateNotNull(this.pendingInputBuffer); + samplePipelines.get(samplePipelineIndex).queueInputBuffer(); + if (pendingInputBuffer.isEndOfStream() && !isDrainingPipelines) { + internalHandler.sendEmptyMessageDelayed(MSG_DRAIN_PIPELINES, DRAIN_PIPELINES_DELAY_MS); + isDrainingPipelines = true; + } + } + + private void drainPipelinesInternal() throws TransformationException { + for (int i = 0; i < samplePipelines.size(); i++) { + while (samplePipelines.get(i).processData()) {} + } + + if (muxerWrapper.isEnded()) { + internalHandler + .obtainMessage( + MSG_END, END_REASON_COMPLETED, /* unused */ 0, /* transformationException */ null) + .sendToTarget(); + } else { + internalHandler.sendEmptyMessageDelayed(MSG_DRAIN_PIPELINES, DRAIN_PIPELINES_DELAY_MS); + } + } + private void endInternal( @EndReason int endReason, @Nullable TransformationException transformationException) { @Nullable TransformationResult transformationResult = null; @@ -220,21 +290,34 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @Nullable TransformationException releaseTransformationException = null; if (!released) { released = true; + + // Make sure there is no dequeue action waiting on the asset loader thread to avoid a + // deadlock when releasing it. + pendingInputBuffer = null; + dequeueBufferConditionVariable.open(); try { try { exoPlayerAssetLoader.release(); - if (endReason == END_REASON_COMPLETED) { - transformationResult = - new TransformationResult.Builder() - .setDurationMs(checkNotNull(muxerWrapper).getDurationMs()) - .setAverageAudioBitrate(muxerWrapper.getTrackAverageBitrate(C.TRACK_TYPE_AUDIO)) - .setAverageVideoBitrate(muxerWrapper.getTrackAverageBitrate(C.TRACK_TYPE_VIDEO)) - .setVideoFrameCount(muxerWrapper.getTrackSampleCount(C.TRACK_TYPE_VIDEO)) - .setFileSizeBytes(muxerWrapper.getCurrentOutputSizeBytes()) - .build(); - } } finally { - muxerWrapper.release(forCancellation); + try { + for (int i = 0; i < samplePipelines.size(); i++) { + samplePipelines.get(i).release(); + } + if (endReason == END_REASON_COMPLETED) { + transformationResult = + new TransformationResult.Builder() + .setDurationMs(checkNotNull(muxerWrapper).getDurationMs()) + .setAverageAudioBitrate( + muxerWrapper.getTrackAverageBitrate(C.TRACK_TYPE_AUDIO)) + .setAverageVideoBitrate( + muxerWrapper.getTrackAverageBitrate(C.TRACK_TYPE_VIDEO)) + .setVideoFrameCount(muxerWrapper.getTrackSampleCount(C.TRACK_TYPE_VIDEO)) + .setFileSizeBytes(muxerWrapper.getCurrentOutputSizeBytes()) + .build(); + } + } finally { + muxerWrapper.release(forCancellation); + } } } catch (Muxer.MuxerException e) { releaseTransformationException = @@ -273,6 +356,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private final MediaItem mediaItem; private final FallbackListener fallbackListener; + + private int tracksAddedCount; private long lastProgressUpdateMs; private long lastProgressPositionMs; @@ -315,10 +400,16 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } @Override - public SamplePipeline onTrackAdded( + public SamplePipeline.Input onTrackAdded( Format format, long streamStartPositionUs, long streamOffsetUs) throws TransformationException { - return getSamplePipeline(format, streamStartPositionUs, streamOffsetUs); + SamplePipeline samplePipeline = + getSamplePipeline(format, streamStartPositionUs, streamOffsetUs); + internalHandler.obtainMessage(MSG_REGISTER_SAMPLE_PIPELINE, samplePipeline).sendToTarget(); + + int samplePipelineIndex = tracksAddedCount; + tracksAddedCount++; + return new SamplePipelineInput(samplePipelineIndex); } @Override @@ -335,14 +426,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; onTransformationError(transformationException); } - @Override - public void onEnded() { - internalHandler - .obtainMessage( - MSG_END, END_REASON_COMPLETED, /* unused */ 0, /* transformationException */ null) - .sendToTarget(); - } - // SamplePipeline.Listener implementation. @Override @@ -486,5 +569,41 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } return false; } + + private class SamplePipelineInput implements SamplePipeline.Input { + + private final int samplePipelineIndex; + + public SamplePipelineInput(int samplePipelineIndex) { + this.samplePipelineIndex = samplePipelineIndex; + } + + @Nullable + @Override + public DecoderInputBuffer dequeueInputBuffer() { + if (released) { + // Make sure there is no dequeue action waiting on the asset loader thread when it is + // being released to avoid a deadlock. + return null; + } + // TODO(b/252537210): Reduce the number of thread hops (for example by adding a queue at the + // start of thesample pipelines). Having 2 thread hops per sample (one for dequeuing and + // one for queuing) makes transmuxing slower than it used to be. + internalHandler + .obtainMessage(MSG_DEQUEUE_INPUT, samplePipelineIndex, /* unused */ 0) + .sendToTarget(); + clock.onThreadBlocked(); + dequeueBufferConditionVariable.blockUninterruptible(); + dequeueBufferConditionVariable.close(); + return pendingInputBuffer; + } + + @Override + public void queueInputBuffer() { + internalHandler + .obtainMessage(MSG_QUEUE_INPUT, samplePipelineIndex, /* unused */ 0) + .sendToTarget(); + } + } } }