From b2ab4393a487d3163ba28c4c8b95c07f91f06c3d Mon Sep 17 00:00:00 2001 From: claincly Date: Wed, 1 Feb 2023 14:14:54 +0000 Subject: [PATCH] Support flushing in FrameProcessor Flushing resets all the texture processors within the `FrameProcessor`. This includes: - At the back, the FinalMatrixTextureProcessorWrapper, and its MatrixTextureProcessor - At the front, the ExternalTextureManager - All the texture processors in between - All the ChainingGlTextureProcessorListeners in between texture processors - All the internal states in the aforementioned components The flush process proceeds in the following order, starting from `GlEffectsFrameProcessor.flush()`: 1. Flush the `FrameProcessingTaskExecutor`, so that after it returns, all tasks queued before calling `flush()` have either been executed or canceled 2. Post to `FrameProcessingTaskExecutor`, to flush the `FinalMatrixTextureProcessorWrapper` 3. Flushing the `FinalMatrixTextureProcessorWrapper` propagates the flush through the rest of the chain, via the `ChainingGlTextureProcessorListener` Startblock: has LGTM from christosts and then add reviewer andrewlewis PiperOrigin-RevId: 506296469 --- .../demo/transformer/MediaPipeProcessor.java | 6 ++ .../exoplayer2/util/FrameProcessor.java | 10 +++ .../video/MediaCodecVideoRenderer.java | 22 +++++ ...EffectsFrameProcessorFrameReleaseTest.java | 5 ++ .../ChainingGlTextureProcessorListener.java | 7 ++ .../effect/ExternalTextureManager.java | 87 ++++++++++++++----- .../FinalMatrixTextureProcessorWrapper.java | 13 ++- .../effect/FrameCacheTextureProcessor.java | 10 +++ .../effect/FrameProcessingTaskExecutor.java | 35 +++++++- .../effect/GlEffectsFrameProcessor.java | 15 ++++ .../exoplayer2/effect/GlTextureProcessor.java | 17 ++++ .../effect/SingleFrameGlTextureProcessor.java | 8 ++ 12 files changed, 210 insertions(+), 25 deletions(-) diff --git a/demos/transformer/src/withMediaPipe/java/androidx/media3/demo/transformer/MediaPipeProcessor.java b/demos/transformer/src/withMediaPipe/java/androidx/media3/demo/transformer/MediaPipeProcessor.java 
index 86dc3b8bba..b3576a48cb 100644 --- a/demos/transformer/src/withMediaPipe/java/androidx/media3/demo/transformer/MediaPipeProcessor.java +++ b/demos/transformer/src/withMediaPipe/java/androidx/media3/demo/transformer/MediaPipeProcessor.java @@ -231,6 +231,12 @@ import java.util.concurrent.Future; } } + @Override + public void flush() { + // TODO(b/238302341) Support seeking in MediaPipeProcessor. + throw new UnsupportedOperationException(); + } + @Override public void release() { if (isSingleFrameGraph) { diff --git a/library/common/src/main/java/com/google/android/exoplayer2/util/FrameProcessor.java b/library/common/src/main/java/com/google/android/exoplayer2/util/FrameProcessor.java index 7d30e5da05..06ebd6f6ea 100644 --- a/library/common/src/main/java/com/google/android/exoplayer2/util/FrameProcessor.java +++ b/library/common/src/main/java/com/google/android/exoplayer2/util/FrameProcessor.java @@ -210,6 +210,16 @@ public interface FrameProcessor { */ void signalEndOfInput(); + /** + * Flushes the {@code FrameProcessor}. + * + *

<p>All the frames that are {@linkplain #registerInputFrame() registered} prior to calling this + * method are no longer considered to be registered when this method returns. + * + *

<p>{@link Listener} methods invoked prior to calling this method should be ignored. + */ + void flush(); + /** * Releases all resources. * diff --git a/library/core/src/main/java/com/google/android/exoplayer2/video/MediaCodecVideoRenderer.java b/library/core/src/main/java/com/google/android/exoplayer2/video/MediaCodecVideoRenderer.java index 94dd68940f..d45aac650e 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/video/MediaCodecVideoRenderer.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/video/MediaCodecVideoRenderer.java @@ -559,6 +559,9 @@ public class MediaCodecVideoRenderer extends MediaCodecRenderer { @Override protected void onPositionReset(long positionUs, boolean joining) throws ExoPlaybackException { super.onPositionReset(positionUs, joining); + if (frameProcessorManager.isEnabled()) { + frameProcessorManager.flush(); + } clearRenderedFirstFrame(); frameReleaseHelper.onPositionReset(); lastBufferPresentationTimeUs = C.TIME_UNSET; @@ -1906,6 +1909,25 @@ public class MediaCodecVideoRenderer extends MediaCodecRenderer { return releasedLastFrame; } + /** + * Flushes the {@link FrameProcessor}. + * + *

Caller must ensure frame processing {@linkplain #isEnabled() is enabled} before calling + * this method. + */ + public void flush() { + checkStateNotNull(frameProcessor); + frameProcessor.flush(); + processedFramesTimestampsUs.clear(); + handler.removeCallbacksAndMessages(/* token= */ null); + + if (registeredLastFrame) { + registeredLastFrame = false; + processedLastFrame = false; + releasedLastFrame = false; + } + } + /** * Tries to enable frame processing. * diff --git a/library/effect/src/androidTest/java/com/google/android/exoplayer2/effect/GlEffectsFrameProcessorFrameReleaseTest.java b/library/effect/src/androidTest/java/com/google/android/exoplayer2/effect/GlEffectsFrameProcessorFrameReleaseTest.java index a1c1901716..1f8a52efea 100644 --- a/library/effect/src/androidTest/java/com/google/android/exoplayer2/effect/GlEffectsFrameProcessorFrameReleaseTest.java +++ b/library/effect/src/androidTest/java/com/google/android/exoplayer2/effect/GlEffectsFrameProcessorFrameReleaseTest.java @@ -420,6 +420,11 @@ public final class GlEffectsFrameProcessorFrameReleaseTest { throw new UnsupportedOperationException(); } + @Override + public void flush() { + throw new UnsupportedOperationException(); + } + @Override public void release() { // Do nothing as destroying the OpenGL context destroys the texture. 
diff --git a/library/effect/src/main/java/com/google/android/exoplayer2/effect/ChainingGlTextureProcessorListener.java b/library/effect/src/main/java/com/google/android/exoplayer2/effect/ChainingGlTextureProcessorListener.java index 3b130d4482..2891640c9c 100644 --- a/library/effect/src/main/java/com/google/android/exoplayer2/effect/ChainingGlTextureProcessorListener.java +++ b/library/effect/src/main/java/com/google/android/exoplayer2/effect/ChainingGlTextureProcessorListener.java @@ -91,6 +91,13 @@ import java.util.Queue; () -> producingGlTextureProcessor.releaseOutputFrame(inputTexture)); } + @Override + public synchronized void onFlush() { + consumingGlTextureProcessorInputCapacity = 0; + availableFrames.clear(); + frameProcessingTaskExecutor.submit(producingGlTextureProcessor::flush); + } + @Override public synchronized void onOutputFrameAvailable( TextureInfo outputTexture, long presentationTimeUs) { diff --git a/library/effect/src/main/java/com/google/android/exoplayer2/effect/ExternalTextureManager.java b/library/effect/src/main/java/com/google/android/exoplayer2/effect/ExternalTextureManager.java index c4ae3e0efd..4dc0fb22f4 100644 --- a/library/effect/src/main/java/com/google/android/exoplayer2/effect/ExternalTextureManager.java +++ b/library/effect/src/main/java/com/google/android/exoplayer2/effect/ExternalTextureManager.java @@ -43,11 +43,14 @@ import java.util.concurrent.atomic.AtomicInteger; private final float[] textureTransformMatrix; private final Queue pendingFrames; - // Incremented on any thread when a frame becomes available on the surfaceTexture, decremented on - // the GL thread only. - private final AtomicInteger availableFrameCount; // Incremented on any thread, decremented on the GL thread only. private final AtomicInteger externalTextureProcessorInputCapacity; + // Counts the frames that are registered before flush but are made available after flush. + // Read and written only on GL thread. 
+ private int numberOfFramesToDropOnBecomingAvailable; + + // Read and written only on GL thread. + private int availableFrameCount; // Set to true on any thread. Read on the GL thread only. private volatile boolean inputStreamEnded; @@ -55,6 +58,8 @@ import java.util.concurrent.atomic.AtomicInteger; // Set to null on any thread. Read and set to non-null on the GL thread only. @Nullable private volatile FrameInfo currentFrame; + @Nullable private volatile FrameProcessingTask onFlushCompleteTask; + private long previousStreamOffsetUs; /** @@ -79,30 +84,53 @@ import java.util.concurrent.atomic.AtomicInteger; surfaceTexture = new SurfaceTexture(externalTexId); textureTransformMatrix = new float[16]; pendingFrames = new ConcurrentLinkedQueue<>(); - availableFrameCount = new AtomicInteger(); externalTextureProcessorInputCapacity = new AtomicInteger(); + previousStreamOffsetUs = C.TIME_UNSET; } public SurfaceTexture getSurfaceTexture() { surfaceTexture.setOnFrameAvailableListener( - unused -> { - availableFrameCount.getAndIncrement(); - frameProcessingTaskExecutor.submit(this::maybeQueueFrameToExternalTextureProcessor); - }); + unused -> + frameProcessingTaskExecutor.submit( + () -> { + if (numberOfFramesToDropOnBecomingAvailable > 0) { + numberOfFramesToDropOnBecomingAvailable--; + surfaceTexture.updateTexImage(); + } else { + availableFrameCount++; + maybeQueueFrameToExternalTextureProcessor(); + } + })); return surfaceTexture; } @Override public void onReadyToAcceptInputFrame() { - externalTextureProcessorInputCapacity.getAndIncrement(); - frameProcessingTaskExecutor.submit(this::maybeQueueFrameToExternalTextureProcessor); + frameProcessingTaskExecutor.submit( + () -> { + externalTextureProcessorInputCapacity.incrementAndGet(); + maybeQueueFrameToExternalTextureProcessor(); + }); } @Override public void onInputFrameProcessed(TextureInfo inputTexture) { - currentFrame = null; - frameProcessingTaskExecutor.submit(this::maybeQueueFrameToExternalTextureProcessor); + 
frameProcessingTaskExecutor.submit( + () -> { + currentFrame = null; + maybeQueueFrameToExternalTextureProcessor(); + }); + } + + /** Sets the task to run on completing flushing, or {@code null} to clear any task. */ + public void setOnFlushCompleteListener(@Nullable FrameProcessingTask task) { + onFlushCompleteTask = task; + } + + @Override + public void onFlush() { + frameProcessingTaskExecutor.submit(this::flush); } /** @@ -131,32 +159,51 @@ import java.util.concurrent.atomic.AtomicInteger; * * @see FrameProcessor#signalEndOfInput() */ - @WorkerThread public void signalEndOfInput() { - inputStreamEnded = true; - if (pendingFrames.isEmpty() && currentFrame == null) { - externalTextureProcessor.signalEndOfCurrentInputStream(); - } + frameProcessingTaskExecutor.submit( + () -> { + inputStreamEnded = true; + if (pendingFrames.isEmpty() && currentFrame == null) { + externalTextureProcessor.signalEndOfCurrentInputStream(); + } + }); } public void release() { surfaceTexture.release(); } + @WorkerThread + private void flush() { + // A frame that is registered before flush may arrive after flush. 
+ numberOfFramesToDropOnBecomingAvailable = pendingFrames.size() - availableFrameCount; + while (availableFrameCount > 0) { + availableFrameCount--; + surfaceTexture.updateTexImage(); + } + externalTextureProcessorInputCapacity.set(0); + currentFrame = null; + pendingFrames.clear(); + + if (onFlushCompleteTask != null) { + frameProcessingTaskExecutor.submitWithHighPriority(onFlushCompleteTask); + } + } + @WorkerThread private void maybeQueueFrameToExternalTextureProcessor() { if (externalTextureProcessorInputCapacity.get() == 0 - || availableFrameCount.get() == 0 + || availableFrameCount == 0 || currentFrame != null) { return; } surfaceTexture.updateTexImage(); - availableFrameCount.getAndDecrement(); + availableFrameCount--; this.currentFrame = pendingFrames.peek(); FrameInfo currentFrame = checkStateNotNull(this.currentFrame); - externalTextureProcessorInputCapacity.getAndDecrement(); + externalTextureProcessorInputCapacity.decrementAndGet(); surfaceTexture.getTransformMatrix(textureTransformMatrix); externalTextureProcessor.setTextureTransformMatrix(textureTransformMatrix); long frameTimeNs = surfaceTexture.getTimestamp(); diff --git a/library/effect/src/main/java/com/google/android/exoplayer2/effect/FinalMatrixTextureProcessorWrapper.java b/library/effect/src/main/java/com/google/android/exoplayer2/effect/FinalMatrixTextureProcessorWrapper.java index bd2467d824..bcd337723f 100644 --- a/library/effect/src/main/java/com/google/android/exoplayer2/effect/FinalMatrixTextureProcessorWrapper.java +++ b/library/effect/src/main/java/com/google/android/exoplayer2/effect/FinalMatrixTextureProcessorWrapper.java @@ -184,13 +184,24 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @Override public void signalEndOfCurrentInputStream() { checkState(!streamOffsetUsQueue.isEmpty(), "No input stream to end."); - + android.util.Log.e("LYC", "Signal end"); streamOffsetUsQueue.remove(); if (streamOffsetUsQueue.isEmpty()) { 
frameProcessorListenerExecutor.execute(frameProcessorListener::onFrameProcessingEnded); } } + @Override + public void flush() { + // Drops all frames that aren't released yet. + availableFrames.clear(); + if (matrixTextureProcessor != null) { + matrixTextureProcessor.flush(); + } + inputListener.onFlush(); + inputListener.onReadyToAcceptInputFrame(); + } + @Override @WorkerThread public synchronized void release() throws FrameProcessingException { diff --git a/library/effect/src/main/java/com/google/android/exoplayer2/effect/FrameCacheTextureProcessor.java b/library/effect/src/main/java/com/google/android/exoplayer2/effect/FrameCacheTextureProcessor.java index bf534e2f7f..ee372d7b95 100644 --- a/library/effect/src/main/java/com/google/android/exoplayer2/effect/FrameCacheTextureProcessor.java +++ b/library/effect/src/main/java/com/google/android/exoplayer2/effect/FrameCacheTextureProcessor.java @@ -156,6 +156,16 @@ import java.util.concurrent.Executor; outputListener.onCurrentOutputStreamEnded(); } + @Override + public void flush() { + freeOutputTextures.addAll(inUseOutputTextures); + inUseOutputTextures.clear(); + inputListener.onFlush(); + for (int i = 0; i < freeOutputTextures.size(); i++) { + inputListener.onReadyToAcceptInputFrame(); + } + } + @Override public void release() throws FrameProcessingException { try { diff --git a/library/effect/src/main/java/com/google/android/exoplayer2/effect/FrameProcessingTaskExecutor.java b/library/effect/src/main/java/com/google/android/exoplayer2/effect/FrameProcessingTaskExecutor.java index fa3fe16c99..73785c16f6 100644 --- a/library/effect/src/main/java/com/google/android/exoplayer2/effect/FrameProcessingTaskExecutor.java +++ b/library/effect/src/main/java/com/google/android/exoplayer2/effect/FrameProcessingTaskExecutor.java @@ -22,6 +22,7 @@ import androidx.annotation.Nullable; import com.google.android.exoplayer2.util.FrameProcessingException; import com.google.android.exoplayer2.util.FrameProcessor; import 
java.util.ArrayDeque; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -76,7 +77,7 @@ import java.util.concurrent.RejectedExecutionException; return; } try { - wrapTaskAndSubmitToExecutorService(task, /* isReleaseTask= */ false); + wrapTaskAndSubmitToExecutorService(task, /* isFlushOrReleaseTask= */ false); } catch (RejectedExecutionException e) { executionException = e; } @@ -107,6 +108,32 @@ import java.util.concurrent.RejectedExecutionException; submit(() -> {}); } + /** + * Flushes all scheduled tasks. + * + *

During flush, the {@code FrameProcessingTaskExecutor} ignores the {@linkplain #submit + * submission of new tasks}. The tasks that are submitted before flushing are either executed or + * canceled when this method returns. + */ + @SuppressWarnings("FutureReturnValueIgnored") + public void flush() throws InterruptedException { + synchronized (lock) { + shouldCancelTasks = true; + highPriorityTasks.clear(); + } + + CountDownLatch latch = new CountDownLatch(1); + wrapTaskAndSubmitToExecutorService( + () -> { + synchronized (lock) { + shouldCancelTasks = false; + } + latch.countDown(); + }, + /* isFlushOrReleaseTask= */ true); + latch.await(); + } + /** * Cancels remaining tasks, runs the given release task, and shuts down the background thread. * @@ -122,7 +149,7 @@ import java.util.concurrent.RejectedExecutionException; highPriorityTasks.clear(); } Future releaseFuture = - wrapTaskAndSubmitToExecutorService(releaseTask, /* isReleaseTask= */ true); + wrapTaskAndSubmitToExecutorService(releaseTask, /* isFlushOrReleaseTask= */ true); singleThreadExecutorService.shutdown(); try { if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) { @@ -135,12 +162,12 @@ import java.util.concurrent.RejectedExecutionException; } private Future wrapTaskAndSubmitToExecutorService( - FrameProcessingTask defaultPriorityTask, boolean isReleaseTask) { + FrameProcessingTask defaultPriorityTask, boolean isFlushOrReleaseTask) { return singleThreadExecutorService.submit( () -> { try { synchronized (lock) { - if (shouldCancelTasks && !isReleaseTask) { + if (shouldCancelTasks && !isFlushOrReleaseTask) { return; } } diff --git a/library/effect/src/main/java/com/google/android/exoplayer2/effect/GlEffectsFrameProcessor.java b/library/effect/src/main/java/com/google/android/exoplayer2/effect/GlEffectsFrameProcessor.java index e8a6159cf0..36de3cb0fe 100644 --- a/library/effect/src/main/java/com/google/android/exoplayer2/effect/GlEffectsFrameProcessor.java +++ 
b/library/effect/src/main/java/com/google/android/exoplayer2/effect/GlEffectsFrameProcessor.java @@ -43,6 +43,7 @@ import com.google.android.exoplayer2.video.ColorInfo; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -455,6 +456,20 @@ public final class GlEffectsFrameProcessor implements FrameProcessor { frameProcessingTaskExecutor.submit(inputExternalTextureManager::signalEndOfInput); } + @Override + public void flush() { + try { + frameProcessingTaskExecutor.flush(); + CountDownLatch latch = new CountDownLatch(1); + inputExternalTextureManager.setOnFlushCompleteListener(latch::countDown); + frameProcessingTaskExecutor.submit(finalTextureProcessorWrapper::flush); + latch.await(); + inputExternalTextureManager.setOnFlushCompleteListener(null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + @Override public void release() { try { diff --git a/library/effect/src/main/java/com/google/android/exoplayer2/effect/GlTextureProcessor.java b/library/effect/src/main/java/com/google/android/exoplayer2/effect/GlTextureProcessor.java index a6fea91fbf..be291cf714 100644 --- a/library/effect/src/main/java/com/google/android/exoplayer2/effect/GlTextureProcessor.java +++ b/library/effect/src/main/java/com/google/android/exoplayer2/effect/GlTextureProcessor.java @@ -68,6 +68,14 @@ public interface GlTextureProcessor { * #queueInputFrame(TextureInfo, long) queue} the input frame. */ default void onInputFrameProcessed(TextureInfo inputTexture) {} + + /** + * Called when the {@link GlTextureProcessor} has been flushed. + * + *

<p>The implementation shall not assume the {@link GlTextureProcessor} is {@linkplain + * #onReadyToAcceptInputFrame ready to accept another input frame} when this method is called. + */ + default void onFlush() {} } /** @@ -168,6 +176,15 @@ public interface GlTextureProcessor { */ void signalEndOfCurrentInputStream(); + /** + * Flushes the {@code GlTextureProcessor}. + * + *

<p>The texture processor should reclaim the ownership of its allocated textures, {@linkplain + * InputListener#onFlush notify} its {@link InputListener} about the flush event, and {@linkplain + * InputListener#onReadyToAcceptInputFrame report its availability} if necessary. + */ + void flush(); + /** * Releases all resources. * diff --git a/library/effect/src/main/java/com/google/android/exoplayer2/effect/SingleFrameGlTextureProcessor.java b/library/effect/src/main/java/com/google/android/exoplayer2/effect/SingleFrameGlTextureProcessor.java index 576c1fd447..f5c0f6aacc 100644 --- a/library/effect/src/main/java/com/google/android/exoplayer2/effect/SingleFrameGlTextureProcessor.java +++ b/library/effect/src/main/java/com/google/android/exoplayer2/effect/SingleFrameGlTextureProcessor.java @@ -172,6 +172,14 @@ public abstract class SingleFrameGlTextureProcessor implements GlTextureProcesso outputListener.onCurrentOutputStreamEnded(); } + @Override + @CallSuper + public void flush() { + outputTextureInUse = false; + inputListener.onFlush(); + inputListener.onReadyToAcceptInputFrame(); + } + @Override @CallSuper public void release() throws FrameProcessingException {