From 940e28e4db650b9ea5a3f7331340b5f4e80746a2 Mon Sep 17 00:00:00 2001 From: dancho Date: Thu, 25 Jul 2024 09:20:46 -0700 Subject: [PATCH] Refactor threading in FinalShaderProgramWrapper Public methods either assert they're running GL thread, or submit a task to run on GL thread. Move methods to keep interface implementations together. Add javadoc to VideoFrameProcessingTaskExecutor to clarify which thread can call each public method. PiperOrigin-RevId: 655978796 --- .../effect/FinalShaderProgramWrapper.java | 133 ++++++++++-------- .../VideoFrameProcessingTaskExecutor.java | 55 ++++++-- 2 files changed, 113 insertions(+), 75 deletions(-) diff --git a/libraries/effect/src/main/java/androidx/media3/effect/FinalShaderProgramWrapper.java b/libraries/effect/src/main/java/androidx/media3/effect/FinalShaderProgramWrapper.java index 5ab0882b0b..cba3ad2d66 100644 --- a/libraries/effect/src/main/java/androidx/media3/effect/FinalShaderProgramWrapper.java +++ b/libraries/effect/src/main/java/androidx/media3/effect/FinalShaderProgramWrapper.java @@ -78,6 +78,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private static final String TAG = "FinalShaderWrapper"; private static final int SURFACE_INPUT_CAPACITY = 1; + // All fields but videoFrameProcessingTaskExecutor should be accessed only on the GL thread. + private final Context context; private final List matrixTransformations; private final List rgbMatrices; @@ -99,8 +101,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private int inputWidth; private int inputHeight; - private int outputWidth; - private int outputHeight; @Nullable private DefaultShaderProgram defaultShaderProgram; @Nullable private SurfaceViewWrapper debugSurfaceViewWrapper; // Whether the input stream has ended, but not all input has been released. This is relevant only @@ -113,18 +113,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @Nullable private SurfaceView debugSurfaceView; @Nullable private OnInputStreamProcessedListener onInputStreamProcessedListener; private boolean matrixTransformationsChanged; - - @GuardedBy("this") private boolean outputSurfaceInfoChanged; - - @GuardedBy("this") - @Nullable - private SurfaceInfo outputSurfaceInfo; + @Nullable private SurfaceInfo outputSurfaceInfo; /** Wraps the {@link Surface} in {@link #outputSurfaceInfo}. */ - @GuardedBy("this") - @Nullable - private EGLSurface outputEglSurface; + @Nullable private EGLSurface outputEglSurface; public FinalShaderProgramWrapper( Context context, @@ -164,8 +157,29 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; syncObjects = new LongArrayQueue(textureOutputCapacity); } + // GlTextureProducer interface. Can be called on any thread. + + @Override + public void releaseOutputTexture(long presentationTimeUs) { + videoFrameProcessingTaskExecutor.submit(() -> releaseOutputTextureInternal(presentationTimeUs)); + } + + private void releaseOutputTextureInternal(long presentationTimeUs) throws GlUtil.GlException { + checkState(textureOutputListener != null); + while (outputTexturePool.freeTextureCount() < outputTexturePool.capacity() + && outputTextureTimestamps.element() <= presentationTimeUs) { + outputTexturePool.freeTexture(); + outputTextureTimestamps.remove(); + GlUtil.deleteSyncObject(syncObjects.remove()); + inputListener.onReadyToAcceptInputFrame(); + } + } + + // GlShaderProgram interface. Must be called on the GL thread. + @Override public void setInputListener(InputListener inputListener) { + videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread(); this.inputListener = inputListener; for (int i = 0; i < getInputCapacity(); i++) { inputListener.onReadyToAcceptInputFrame(); @@ -186,11 +200,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; public void setOnInputStreamProcessedListener( @Nullable OnInputStreamProcessedListener onInputStreamProcessedListener) { + videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread(); this.onInputStreamProcessedListener = onInputStreamProcessedListener; } @Override public void signalEndOfCurrentInputStream() { + videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread(); if (availableFrames.isEmpty()) { checkNotNull(onInputStreamProcessedListener).onInputStreamProcessed(); isInputStreamEndedWithPendingAvailableFrames = false; @@ -200,11 +216,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } } - // Methods that must be called on the GL thread. - @Override public void queueInputFrame( GlObjectsProvider glObjectsProvider, GlTextureInfo inputTexture, long presentationTimeUs) { + videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread(); videoFrameProcessorListenerExecutor.execute( () -> videoFrameProcessorListener.onOutputFrameAvailableForRendering(presentationTimeUs)); if (textureOutputListener == null) { @@ -234,40 +249,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; throw new UnsupportedOperationException(); } - @Override - public void releaseOutputTexture(long presentationTimeUs) { - videoFrameProcessingTaskExecutor.submit(() -> releaseOutputTextureInternal(presentationTimeUs)); - } - - private void releaseOutputTextureInternal(long presentationTimeUs) throws GlUtil.GlException { - checkState(textureOutputListener != null); - while (outputTexturePool.freeTextureCount() < outputTexturePool.capacity() - && outputTextureTimestamps.element() <= presentationTimeUs) { - outputTexturePool.freeTexture(); - outputTextureTimestamps.remove(); - GlUtil.deleteSyncObject(syncObjects.remove()); - inputListener.onReadyToAcceptInputFrame(); - } - } - - /** - * Sets the list of {@link GlMatrixTransformation GlMatrixTransformations} and list of {@link - * RgbMatrix RgbMatrices} to apply to the next {@linkplain #queueInputFrame queued} frame. - * - *

The new transformations will be applied to the next {@linkplain #queueInputFrame queued} - * frame. - */ - public void setMatrixTransformations( - List matrixTransformations, List rgbMatrices) { - this.matrixTransformations.clear(); - this.matrixTransformations.addAll(matrixTransformations); - this.rgbMatrices.clear(); - this.rgbMatrices.addAll(rgbMatrices); - matrixTransformationsChanged = true; - } - @Override public void flush() { + videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread(); // The downstream consumer must already have been flushed, so the textureOutputListener // implementation does not access its previously output textures, per its contract. However, the // downstream consumer may not have called releaseOutputTexture on all these textures. Release @@ -292,14 +276,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } } - private int getInputCapacity() { - return textureOutputListener == null - ? SURFACE_INPUT_CAPACITY - : outputTexturePool.freeTextureCount(); - } - @Override - public synchronized void release() throws VideoFrameProcessingException { + public void release() throws VideoFrameProcessingException { + videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread(); if (defaultShaderProgram != null) { defaultShaderProgram.release(); } @@ -312,7 +291,27 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } } + /** + * Sets the list of {@link GlMatrixTransformation GlMatrixTransformations} and list of {@link + * RgbMatrix RgbMatrices} to apply to the next {@linkplain #queueInputFrame queued} frame. + * + *

The new transformations will be applied to the next {@linkplain #queueInputFrame queued} + * frame. + * + *

Must be called on the GL thread. + */ + public void setMatrixTransformations( + List matrixTransformations, List rgbMatrices) { + videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread(); + this.matrixTransformations.clear(); + this.matrixTransformations.addAll(matrixTransformations); + this.rgbMatrices.clear(); + this.rgbMatrices.addAll(rgbMatrices); + matrixTransformationsChanged = true; + } + public void renderOutputFrame(GlObjectsProvider glObjectsProvider, long renderTimeNs) { + videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread(); if (textureOutputListener != null) { return; } @@ -329,7 +328,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } } - /** See {@link DefaultVideoFrameProcessor#setOutputSurfaceInfo} */ + /** + * See {@link DefaultVideoFrameProcessor#setOutputSurfaceInfo} + * + *

Can be called on any thread. + */ public void setOutputSurfaceInfo(@Nullable SurfaceInfo outputSurfaceInfo) { try { videoFrameProcessingTaskExecutor.invoke( @@ -342,7 +345,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } /** Must be called on the GL thread. */ - private synchronized void setOutputSurfaceInfoInternal(@Nullable SurfaceInfo outputSurfaceInfo) { + private void setOutputSurfaceInfoInternal(@Nullable SurfaceInfo outputSurfaceInfo) { if (textureOutputListener != null) { return; } @@ -371,7 +374,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; this.outputSurfaceInfo = outputSurfaceInfo; } - private synchronized void destroyOutputEglSurface() { + private int getInputCapacity() { + return textureOutputListener == null + ? SURFACE_INPUT_CAPACITY + : outputTexturePool.freeTextureCount(); + } + + private void destroyOutputEglSurface() { if (outputEglSurface == null) { return; } @@ -389,7 +398,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } } - private synchronized void renderFrame( + private void renderFrame( GlObjectsProvider glObjectsProvider, GlTextureInfo inputTexture, long presentationTimeUs, @@ -418,7 +427,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; inputListener.onInputFrameProcessed(inputTexture); } - private synchronized void renderFrameToOutputSurface( + private void renderFrameToOutputSurface( GlTextureInfo inputTexture, long presentationTimeUs, long renderTimeNs) throws VideoFrameProcessingException, GlUtil.GlException { EGLSurface outputEglSurface = checkNotNull(this.outputEglSurface); @@ -464,7 +473,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; * *

Returns {@code false} if {@code outputSurfaceInfo} is unset. */ - private synchronized boolean ensureConfigured( + private boolean ensureConfigured( GlObjectsProvider glObjectsProvider, int inputWidth, int inputHeight) throws VideoFrameProcessingException, GlUtil.GlException { // Clear extra or outdated resources. @@ -499,11 +508,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; return false; } - outputWidth = + int outputWidth = outputSurfaceInfo == null ? outputSizeBeforeSurfaceTransformation.getWidth() : outputSurfaceInfo.width; - outputHeight = + int outputHeight = outputSurfaceInfo == null ? outputSizeBeforeSurfaceTransformation.getHeight() : outputSurfaceInfo.height; @@ -551,7 +560,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; return true; } - private synchronized DefaultShaderProgram createDefaultShaderProgram( + private DefaultShaderProgram createDefaultShaderProgram( int outputOrientationDegrees, int outputWidth, int outputHeight) throws VideoFrameProcessingException { ImmutableList.Builder matrixTransformationListBuilder = @@ -615,6 +624,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** * Wrapper around a {@link SurfaceView} that keeps track of whether the output surface is valid, * and makes rendering a no-op if not. + * + *

This class should only be used for displaying a debug preview. */ private static final class SurfaceViewWrapper implements SurfaceHolder.Callback { public final @C.ColorTransfer int outputColorTransfer; diff --git a/libraries/effect/src/main/java/androidx/media3/effect/VideoFrameProcessingTaskExecutor.java b/libraries/effect/src/main/java/androidx/media3/effect/VideoFrameProcessingTaskExecutor.java index 0445113edc..b9e1ed2fd9 100644 --- a/libraries/effect/src/main/java/androidx/media3/effect/VideoFrameProcessingTaskExecutor.java +++ b/libraries/effect/src/main/java/androidx/media3/effect/VideoFrameProcessingTaskExecutor.java @@ -15,6 +15,7 @@ */ package androidx.media3.effect; +import static androidx.media3.common.util.Assertions.checkState; import static java.util.concurrent.TimeUnit.MILLISECONDS; import androidx.annotation.GuardedBy; @@ -33,8 +34,6 @@ import java.util.concurrent.TimeoutException; /** * Wrapper around a single thread {@link ExecutorService} for executing {@link Task} instances. * - *

Public methods can be called from any thread. - * *

Calls {@link ErrorListener#onError} for errors that occur during these tasks. The listener is * invoked from the {@link ExecutorService}. * @@ -90,7 +89,11 @@ import java.util.concurrent.TimeoutException; highPriorityTasks = new ArrayDeque<>(); } - /** Submits the given {@link Task} to be executed after all pending tasks have completed. */ + /** + * Submits the given {@link Task} to be executed after all pending tasks have completed. + * + *

Can be called on any thread. + */ @SuppressWarnings("FutureReturnValueIgnored") public void submit(Task task) { @Nullable RejectedExecutionException executionException = null; @@ -110,20 +113,15 @@ import java.util.concurrent.TimeoutException; } } - /** Blocks the caller until the given {@link Task} has completed. */ + /** + * Blocks the caller until the given {@link Task} has completed. + * + *

Can be called on any thread. + */ public void invoke(Task task) throws InterruptedException { // If running on the executor service thread, run synchronously. // Calling future.get() on the single executor thread would deadlock. - Thread videoFrameProcessingThread; - try { - videoFrameProcessingThread = threadFuture.get(EXECUTOR_SERVICE_TIMEOUT_MS, MILLISECONDS); - } catch (InterruptedException e) { - throw e; - } catch (Exception e) { - handleException(e); - return; - } - if (Thread.currentThread() == videoFrameProcessingThread) { + if (isRunningOnVideoFrameProcessingThread()) { try { task.run(); } catch (Exception e) { @@ -155,6 +153,8 @@ import java.util.concurrent.TimeoutException; * *

Tasks that were previously {@linkplain #submit(Task) submitted} without high-priority and * have not started executing will be executed after this task is complete. + * + *

Can be called on any thread. */ public void submitWithHighPriority(Task task) { synchronized (lock) { @@ -175,6 +175,8 @@ import java.util.concurrent.TimeoutException; *

During flush, the {@code VideoFrameProcessingTaskExecutor} ignores the {@linkplain #submit * submission of new tasks}. The tasks that are submitted before flushing are either executed or * canceled when this method returns. + * + *

Can be called on any thread. */ @SuppressWarnings("FutureReturnValueIgnored") public void flush() throws InterruptedException { @@ -204,10 +206,13 @@ import java.util.concurrent.TimeoutException; *

This {@link VideoFrameProcessingTaskExecutor} instance must not be used after this method is * called. * + *

Must not be called on the GL thread. + * * @param releaseTask A {@link Task} to execute before shutting down the background thread. * @throws InterruptedException If interrupted while releasing resources. */ public void release(Task releaseTask) throws InterruptedException { + checkState(!isRunningOnVideoFrameProcessingThread()); synchronized (lock) { shouldCancelTasks = true; highPriorityTasks.clear(); @@ -225,6 +230,28 @@ import java.util.concurrent.TimeoutException; } } + public void verifyVideoFrameProcessingThread() { + try { + checkState(isRunningOnVideoFrameProcessingThread()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + handleException(e); + } + } + + private boolean isRunningOnVideoFrameProcessingThread() throws InterruptedException { + Thread videoFrameProcessingThread; + try { + videoFrameProcessingThread = threadFuture.get(EXECUTOR_SERVICE_TIMEOUT_MS, MILLISECONDS); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + handleException(e); + return false; + } + return Thread.currentThread() == videoFrameProcessingThread; + } + private Future wrapTaskAndSubmitToExecutorService( Task defaultPriorityTask, boolean isFlushOrReleaseTask) { return singleThreadExecutorService.submit(