Refactor threading in FinalShaderProgramWrapper

Public methods either assert they're running GL thread, or
submit a task to run on GL thread.

Move methods to keep interface implementations together.

Add javadoc to VideoFrameProcessingTaskExecutor to clarify which
thread can call each public method.

PiperOrigin-RevId: 655978796
This commit is contained in:
dancho 2024-07-25 09:20:46 -07:00 committed by Copybara-Service
parent 300453820c
commit 940e28e4db
2 changed files with 113 additions and 75 deletions

View File

@ -78,6 +78,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private static final String TAG = "FinalShaderWrapper"; private static final String TAG = "FinalShaderWrapper";
private static final int SURFACE_INPUT_CAPACITY = 1; private static final int SURFACE_INPUT_CAPACITY = 1;
// All fields but videoFrameProcessingTaskExecutor should be accessed only on the GL thread.
private final Context context; private final Context context;
private final List<GlMatrixTransformation> matrixTransformations; private final List<GlMatrixTransformation> matrixTransformations;
private final List<RgbMatrix> rgbMatrices; private final List<RgbMatrix> rgbMatrices;
@ -99,8 +101,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private int inputWidth; private int inputWidth;
private int inputHeight; private int inputHeight;
private int outputWidth;
private int outputHeight;
@Nullable private DefaultShaderProgram defaultShaderProgram; @Nullable private DefaultShaderProgram defaultShaderProgram;
@Nullable private SurfaceViewWrapper debugSurfaceViewWrapper; @Nullable private SurfaceViewWrapper debugSurfaceViewWrapper;
// Whether the input stream has ended, but not all input has been released. This is relevant only // Whether the input stream has ended, but not all input has been released. This is relevant only
@ -113,18 +113,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@Nullable private SurfaceView debugSurfaceView; @Nullable private SurfaceView debugSurfaceView;
@Nullable private OnInputStreamProcessedListener onInputStreamProcessedListener; @Nullable private OnInputStreamProcessedListener onInputStreamProcessedListener;
private boolean matrixTransformationsChanged; private boolean matrixTransformationsChanged;
@GuardedBy("this")
private boolean outputSurfaceInfoChanged; private boolean outputSurfaceInfoChanged;
@Nullable private SurfaceInfo outputSurfaceInfo;
@GuardedBy("this")
@Nullable
private SurfaceInfo outputSurfaceInfo;
/** Wraps the {@link Surface} in {@link #outputSurfaceInfo}. */ /** Wraps the {@link Surface} in {@link #outputSurfaceInfo}. */
@GuardedBy("this") @Nullable private EGLSurface outputEglSurface;
@Nullable
private EGLSurface outputEglSurface;
public FinalShaderProgramWrapper( public FinalShaderProgramWrapper(
Context context, Context context,
@ -164,8 +157,29 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
syncObjects = new LongArrayQueue(textureOutputCapacity); syncObjects = new LongArrayQueue(textureOutputCapacity);
} }
// GlTextureProducer interface. Can be called on any thread.
@Override
public void releaseOutputTexture(long presentationTimeUs) {
videoFrameProcessingTaskExecutor.submit(() -> releaseOutputTextureInternal(presentationTimeUs));
}
private void releaseOutputTextureInternal(long presentationTimeUs) throws GlUtil.GlException {
checkState(textureOutputListener != null);
while (outputTexturePool.freeTextureCount() < outputTexturePool.capacity()
&& outputTextureTimestamps.element() <= presentationTimeUs) {
outputTexturePool.freeTexture();
outputTextureTimestamps.remove();
GlUtil.deleteSyncObject(syncObjects.remove());
inputListener.onReadyToAcceptInputFrame();
}
}
// GlShaderProgram interface. Must be called on the GL thread.
@Override @Override
public void setInputListener(InputListener inputListener) { public void setInputListener(InputListener inputListener) {
videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread();
this.inputListener = inputListener; this.inputListener = inputListener;
for (int i = 0; i < getInputCapacity(); i++) { for (int i = 0; i < getInputCapacity(); i++) {
inputListener.onReadyToAcceptInputFrame(); inputListener.onReadyToAcceptInputFrame();
@ -186,11 +200,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
public void setOnInputStreamProcessedListener( public void setOnInputStreamProcessedListener(
@Nullable OnInputStreamProcessedListener onInputStreamProcessedListener) { @Nullable OnInputStreamProcessedListener onInputStreamProcessedListener) {
videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread();
this.onInputStreamProcessedListener = onInputStreamProcessedListener; this.onInputStreamProcessedListener = onInputStreamProcessedListener;
} }
@Override @Override
public void signalEndOfCurrentInputStream() { public void signalEndOfCurrentInputStream() {
videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread();
if (availableFrames.isEmpty()) { if (availableFrames.isEmpty()) {
checkNotNull(onInputStreamProcessedListener).onInputStreamProcessed(); checkNotNull(onInputStreamProcessedListener).onInputStreamProcessed();
isInputStreamEndedWithPendingAvailableFrames = false; isInputStreamEndedWithPendingAvailableFrames = false;
@ -200,11 +216,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
} }
} }
// Methods that must be called on the GL thread.
@Override @Override
public void queueInputFrame( public void queueInputFrame(
GlObjectsProvider glObjectsProvider, GlTextureInfo inputTexture, long presentationTimeUs) { GlObjectsProvider glObjectsProvider, GlTextureInfo inputTexture, long presentationTimeUs) {
videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread();
videoFrameProcessorListenerExecutor.execute( videoFrameProcessorListenerExecutor.execute(
() -> videoFrameProcessorListener.onOutputFrameAvailableForRendering(presentationTimeUs)); () -> videoFrameProcessorListener.onOutputFrameAvailableForRendering(presentationTimeUs));
if (textureOutputListener == null) { if (textureOutputListener == null) {
@ -234,40 +249,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public void releaseOutputTexture(long presentationTimeUs) {
videoFrameProcessingTaskExecutor.submit(() -> releaseOutputTextureInternal(presentationTimeUs));
}
private void releaseOutputTextureInternal(long presentationTimeUs) throws GlUtil.GlException {
checkState(textureOutputListener != null);
while (outputTexturePool.freeTextureCount() < outputTexturePool.capacity()
&& outputTextureTimestamps.element() <= presentationTimeUs) {
outputTexturePool.freeTexture();
outputTextureTimestamps.remove();
GlUtil.deleteSyncObject(syncObjects.remove());
inputListener.onReadyToAcceptInputFrame();
}
}
/**
* Sets the list of {@link GlMatrixTransformation GlMatrixTransformations} and list of {@link
* RgbMatrix RgbMatrices} to apply to the next {@linkplain #queueInputFrame queued} frame.
*
* <p>The new transformations will be applied to the next {@linkplain #queueInputFrame queued}
* frame.
*/
public void setMatrixTransformations(
List<GlMatrixTransformation> matrixTransformations, List<RgbMatrix> rgbMatrices) {
this.matrixTransformations.clear();
this.matrixTransformations.addAll(matrixTransformations);
this.rgbMatrices.clear();
this.rgbMatrices.addAll(rgbMatrices);
matrixTransformationsChanged = true;
}
@Override @Override
public void flush() { public void flush() {
videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread();
// The downstream consumer must already have been flushed, so the textureOutputListener // The downstream consumer must already have been flushed, so the textureOutputListener
// implementation does not access its previously output textures, per its contract. However, the // implementation does not access its previously output textures, per its contract. However, the
// downstream consumer may not have called releaseOutputTexture on all these textures. Release // downstream consumer may not have called releaseOutputTexture on all these textures. Release
@ -292,14 +276,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
} }
} }
private int getInputCapacity() {
return textureOutputListener == null
? SURFACE_INPUT_CAPACITY
: outputTexturePool.freeTextureCount();
}
@Override @Override
public synchronized void release() throws VideoFrameProcessingException { public void release() throws VideoFrameProcessingException {
videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread();
if (defaultShaderProgram != null) { if (defaultShaderProgram != null) {
defaultShaderProgram.release(); defaultShaderProgram.release();
} }
@ -312,7 +291,27 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
} }
} }
/**
* Sets the list of {@link GlMatrixTransformation GlMatrixTransformations} and list of {@link
* RgbMatrix RgbMatrices} to apply to the next {@linkplain #queueInputFrame queued} frame.
*
* <p>The new transformations will be applied to the next {@linkplain #queueInputFrame queued}
* frame.
*
* <p>Must be called on the GL thread.
*/
public void setMatrixTransformations(
List<GlMatrixTransformation> matrixTransformations, List<RgbMatrix> rgbMatrices) {
videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread();
this.matrixTransformations.clear();
this.matrixTransformations.addAll(matrixTransformations);
this.rgbMatrices.clear();
this.rgbMatrices.addAll(rgbMatrices);
matrixTransformationsChanged = true;
}
public void renderOutputFrame(GlObjectsProvider glObjectsProvider, long renderTimeNs) { public void renderOutputFrame(GlObjectsProvider glObjectsProvider, long renderTimeNs) {
videoFrameProcessingTaskExecutor.verifyVideoFrameProcessingThread();
if (textureOutputListener != null) { if (textureOutputListener != null) {
return; return;
} }
@ -329,7 +328,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
} }
} }
/** See {@link DefaultVideoFrameProcessor#setOutputSurfaceInfo} */ /**
* See {@link DefaultVideoFrameProcessor#setOutputSurfaceInfo}
*
* <p>Can be called on any thread.
*/
public void setOutputSurfaceInfo(@Nullable SurfaceInfo outputSurfaceInfo) { public void setOutputSurfaceInfo(@Nullable SurfaceInfo outputSurfaceInfo) {
try { try {
videoFrameProcessingTaskExecutor.invoke( videoFrameProcessingTaskExecutor.invoke(
@ -342,7 +345,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
} }
/** Must be called on the GL thread. */ /** Must be called on the GL thread. */
private synchronized void setOutputSurfaceInfoInternal(@Nullable SurfaceInfo outputSurfaceInfo) { private void setOutputSurfaceInfoInternal(@Nullable SurfaceInfo outputSurfaceInfo) {
if (textureOutputListener != null) { if (textureOutputListener != null) {
return; return;
} }
@ -371,7 +374,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
this.outputSurfaceInfo = outputSurfaceInfo; this.outputSurfaceInfo = outputSurfaceInfo;
} }
private synchronized void destroyOutputEglSurface() { private int getInputCapacity() {
return textureOutputListener == null
? SURFACE_INPUT_CAPACITY
: outputTexturePool.freeTextureCount();
}
private void destroyOutputEglSurface() {
if (outputEglSurface == null) { if (outputEglSurface == null) {
return; return;
} }
@ -389,7 +398,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
} }
} }
private synchronized void renderFrame( private void renderFrame(
GlObjectsProvider glObjectsProvider, GlObjectsProvider glObjectsProvider,
GlTextureInfo inputTexture, GlTextureInfo inputTexture,
long presentationTimeUs, long presentationTimeUs,
@ -418,7 +427,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
inputListener.onInputFrameProcessed(inputTexture); inputListener.onInputFrameProcessed(inputTexture);
} }
private synchronized void renderFrameToOutputSurface( private void renderFrameToOutputSurface(
GlTextureInfo inputTexture, long presentationTimeUs, long renderTimeNs) GlTextureInfo inputTexture, long presentationTimeUs, long renderTimeNs)
throws VideoFrameProcessingException, GlUtil.GlException { throws VideoFrameProcessingException, GlUtil.GlException {
EGLSurface outputEglSurface = checkNotNull(this.outputEglSurface); EGLSurface outputEglSurface = checkNotNull(this.outputEglSurface);
@ -464,7 +473,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
* *
* <p>Returns {@code false} if {@code outputSurfaceInfo} is unset. * <p>Returns {@code false} if {@code outputSurfaceInfo} is unset.
*/ */
private synchronized boolean ensureConfigured( private boolean ensureConfigured(
GlObjectsProvider glObjectsProvider, int inputWidth, int inputHeight) GlObjectsProvider glObjectsProvider, int inputWidth, int inputHeight)
throws VideoFrameProcessingException, GlUtil.GlException { throws VideoFrameProcessingException, GlUtil.GlException {
// Clear extra or outdated resources. // Clear extra or outdated resources.
@ -499,11 +508,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
return false; return false;
} }
outputWidth = int outputWidth =
outputSurfaceInfo == null outputSurfaceInfo == null
? outputSizeBeforeSurfaceTransformation.getWidth() ? outputSizeBeforeSurfaceTransformation.getWidth()
: outputSurfaceInfo.width; : outputSurfaceInfo.width;
outputHeight = int outputHeight =
outputSurfaceInfo == null outputSurfaceInfo == null
? outputSizeBeforeSurfaceTransformation.getHeight() ? outputSizeBeforeSurfaceTransformation.getHeight()
: outputSurfaceInfo.height; : outputSurfaceInfo.height;
@ -551,7 +560,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
return true; return true;
} }
private synchronized DefaultShaderProgram createDefaultShaderProgram( private DefaultShaderProgram createDefaultShaderProgram(
int outputOrientationDegrees, int outputWidth, int outputHeight) int outputOrientationDegrees, int outputWidth, int outputHeight)
throws VideoFrameProcessingException { throws VideoFrameProcessingException {
ImmutableList.Builder<GlMatrixTransformation> matrixTransformationListBuilder = ImmutableList.Builder<GlMatrixTransformation> matrixTransformationListBuilder =
@ -615,6 +624,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/** /**
* Wrapper around a {@link SurfaceView} that keeps track of whether the output surface is valid, * Wrapper around a {@link SurfaceView} that keeps track of whether the output surface is valid,
* and makes rendering a no-op if not. * and makes rendering a no-op if not.
*
* <p>This class should only be used for displaying a debug preview.
*/ */
private static final class SurfaceViewWrapper implements SurfaceHolder.Callback { private static final class SurfaceViewWrapper implements SurfaceHolder.Callback {
public final @C.ColorTransfer int outputColorTransfer; public final @C.ColorTransfer int outputColorTransfer;

View File

@ -15,6 +15,7 @@
*/ */
package androidx.media3.effect; package androidx.media3.effect;
import static androidx.media3.common.util.Assertions.checkState;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import androidx.annotation.GuardedBy; import androidx.annotation.GuardedBy;
@ -33,8 +34,6 @@ import java.util.concurrent.TimeoutException;
/** /**
* Wrapper around a single thread {@link ExecutorService} for executing {@link Task} instances. * Wrapper around a single thread {@link ExecutorService} for executing {@link Task} instances.
* *
* <p>Public methods can be called from any thread.
*
* <p>Calls {@link ErrorListener#onError} for errors that occur during these tasks. The listener is * <p>Calls {@link ErrorListener#onError} for errors that occur during these tasks. The listener is
* invoked from the {@link ExecutorService}. * invoked from the {@link ExecutorService}.
* *
@ -90,7 +89,11 @@ import java.util.concurrent.TimeoutException;
highPriorityTasks = new ArrayDeque<>(); highPriorityTasks = new ArrayDeque<>();
} }
/** Submits the given {@link Task} to be executed after all pending tasks have completed. */ /**
* Submits the given {@link Task} to be executed after all pending tasks have completed.
*
* <p>Can be called on any thread.
*/
@SuppressWarnings("FutureReturnValueIgnored") @SuppressWarnings("FutureReturnValueIgnored")
public void submit(Task task) { public void submit(Task task) {
@Nullable RejectedExecutionException executionException = null; @Nullable RejectedExecutionException executionException = null;
@ -110,20 +113,15 @@ import java.util.concurrent.TimeoutException;
} }
} }
/** Blocks the caller until the given {@link Task} has completed. */ /**
* Blocks the caller until the given {@link Task} has completed.
*
* <p>Can be called on any thread.
*/
public void invoke(Task task) throws InterruptedException { public void invoke(Task task) throws InterruptedException {
// If running on the executor service thread, run synchronously. // If running on the executor service thread, run synchronously.
// Calling future.get() on the single executor thread would deadlock. // Calling future.get() on the single executor thread would deadlock.
Thread videoFrameProcessingThread; if (isRunningOnVideoFrameProcessingThread()) {
try {
videoFrameProcessingThread = threadFuture.get(EXECUTOR_SERVICE_TIMEOUT_MS, MILLISECONDS);
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
handleException(e);
return;
}
if (Thread.currentThread() == videoFrameProcessingThread) {
try { try {
task.run(); task.run();
} catch (Exception e) { } catch (Exception e) {
@ -155,6 +153,8 @@ import java.util.concurrent.TimeoutException;
* *
* <p>Tasks that were previously {@linkplain #submit(Task) submitted} without high-priority and * <p>Tasks that were previously {@linkplain #submit(Task) submitted} without high-priority and
* have not started executing will be executed after this task is complete. * have not started executing will be executed after this task is complete.
*
* <p>Can be called on any thread.
*/ */
public void submitWithHighPriority(Task task) { public void submitWithHighPriority(Task task) {
synchronized (lock) { synchronized (lock) {
@ -175,6 +175,8 @@ import java.util.concurrent.TimeoutException;
* <p>During flush, the {@code VideoFrameProcessingTaskExecutor} ignores the {@linkplain #submit * <p>During flush, the {@code VideoFrameProcessingTaskExecutor} ignores the {@linkplain #submit
* submission of new tasks}. The tasks that are submitted before flushing are either executed or * submission of new tasks}. The tasks that are submitted before flushing are either executed or
* canceled when this method returns. * canceled when this method returns.
*
* <p>Can be called on any thread.
*/ */
@SuppressWarnings("FutureReturnValueIgnored") @SuppressWarnings("FutureReturnValueIgnored")
public void flush() throws InterruptedException { public void flush() throws InterruptedException {
@ -204,10 +206,13 @@ import java.util.concurrent.TimeoutException;
* <p>This {@link VideoFrameProcessingTaskExecutor} instance must not be used after this method is * <p>This {@link VideoFrameProcessingTaskExecutor} instance must not be used after this method is
* called. * called.
* *
* <p>Must not be called on the GL thread.
*
* @param releaseTask A {@link Task} to execute before shutting down the background thread. * @param releaseTask A {@link Task} to execute before shutting down the background thread.
* @throws InterruptedException If interrupted while releasing resources. * @throws InterruptedException If interrupted while releasing resources.
*/ */
public void release(Task releaseTask) throws InterruptedException { public void release(Task releaseTask) throws InterruptedException {
checkState(!isRunningOnVideoFrameProcessingThread());
synchronized (lock) { synchronized (lock) {
shouldCancelTasks = true; shouldCancelTasks = true;
highPriorityTasks.clear(); highPriorityTasks.clear();
@ -225,6 +230,28 @@ import java.util.concurrent.TimeoutException;
} }
} }
public void verifyVideoFrameProcessingThread() {
try {
checkState(isRunningOnVideoFrameProcessingThread());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
handleException(e);
}
}
private boolean isRunningOnVideoFrameProcessingThread() throws InterruptedException {
Thread videoFrameProcessingThread;
try {
videoFrameProcessingThread = threadFuture.get(EXECUTOR_SERVICE_TIMEOUT_MS, MILLISECONDS);
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
handleException(e);
return false;
}
return Thread.currentThread() == videoFrameProcessingThread;
}
private Future<?> wrapTaskAndSubmitToExecutorService( private Future<?> wrapTaskAndSubmitToExecutorService(
Task defaultPriorityTask, boolean isFlushOrReleaseTask) { Task defaultPriorityTask, boolean isFlushOrReleaseTask) {
return singleThreadExecutorService.submit( return singleThreadExecutorService.submit(