Allow specifying an Executor in FrameProcessor.Factory.create()

From this CL on, FrameProcessor listeners will be invoked from an Executor that
is passed in when creating the FrameProcessor.

GlTextureProcessor needs to invoke the ErrorListener on the said Executor too.

PiperOrigin-RevId: 493018583
This commit is contained in:
claincly 2022-12-05 16:40:34 +00:00 committed by Ian Baker
parent ead7070b60
commit 900e86ffc8
9 changed files with 113 additions and 36 deletions

View File

@ -29,6 +29,7 @@ import androidx.media3.common.util.LibraryLoader;
import androidx.media3.common.util.Util;
import androidx.media3.effect.GlTextureProcessor;
import androidx.media3.effect.TextureInfo;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.mediapipe.components.FrameProcessor;
import com.google.mediapipe.framework.AppTextureFrame;
import com.google.mediapipe.framework.TextureFrame;
@ -37,6 +38,7 @@ import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -74,6 +76,7 @@ import java.util.concurrent.Future;
private InputListener inputListener;
private OutputListener outputListener;
private ErrorListener errorListener;
private Executor errorListenerExecutor;
private boolean acceptedFrame;
/**
@ -110,6 +113,7 @@ import java.util.concurrent.Future;
inputListener = new InputListener() {};
outputListener = new OutputListener() {};
errorListener = (frameProcessingException) -> {};
errorListenerExecutor = MoreExecutors.directExecutor();
EglManager eglManager = new EglManager(EGL14.eglGetCurrentContext());
frameProcessor =
new FrameProcessor(
@ -145,10 +149,13 @@ import java.util.concurrent.Future;
}
@Override
public void setErrorListener(ErrorListener errorListener) {
public void setErrorListener(Executor executor, ErrorListener errorListener) {
this.errorListenerExecutor = executor;
this.errorListener = errorListener;
frameProcessor.setAsynchronousErrorListener(
error -> errorListener.onFrameProcessingError(new FrameProcessingException(error)));
error ->
errorListenerExecutor.execute(
() -> errorListener.onFrameProcessingError(new FrameProcessingException(error))));
}
@Override
@ -183,7 +190,8 @@ import java.util.concurrent.Future;
appTextureFrame.waitUntilReleasedWithGpuSync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
errorListener.onFrameProcessingError(new FrameProcessingException(e));
errorListenerExecutor.execute(
() -> errorListener.onFrameProcessingError(new FrameProcessingException(e)));
}
if (acceptedFrame) {
inputListener.onInputFrameProcessed(inputTexture);
@ -204,7 +212,10 @@ import java.util.concurrent.Future;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (errorListener != null) {
errorListener.onFrameProcessingError(new FrameProcessingException(e));
errorListenerExecutor.execute(
() ->
errorListener.onFrameProcessingError(
new FrameProcessingException(e)));
}
}
}
@ -236,11 +247,15 @@ import java.util.concurrent.Future;
singleThreadExecutorService.shutdown();
try {
if (!singleThreadExecutorService.awaitTermination(RELEASE_WAIT_TIME_MS, MILLISECONDS)) {
errorListener.onFrameProcessingError(new FrameProcessingException("Release timed out"));
errorListenerExecutor.execute(
() ->
errorListener.onFrameProcessingError(
new FrameProcessingException("Release timed out")));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
errorListener.onFrameProcessingError(new FrameProcessingException(e));
errorListenerExecutor.execute(
() -> errorListener.onFrameProcessingError(new FrameProcessingException(e)));
}
frameProcessor.close();
@ -272,10 +287,12 @@ import java.util.concurrent.Future;
try {
futures.remove().get();
} catch (ExecutionException e) {
errorListener.onFrameProcessingError(new FrameProcessingException(e));
errorListenerExecutor.execute(
() -> errorListener.onFrameProcessingError(new FrameProcessingException(e)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
errorListener.onFrameProcessingError(new FrameProcessingException(e));
errorListenerExecutor.execute(
() -> errorListener.onFrameProcessingError(new FrameProcessingException(e)));
}
}
}

View File

@ -21,6 +21,7 @@ import android.view.Surface;
import androidx.annotation.Nullable;
import androidx.media3.common.util.UnstableApi;
import java.util.List;
import java.util.concurrent.Executor;
/**
* Interface for a frame processor that applies changes to individual video frames.
@ -45,6 +46,7 @@ public interface FrameProcessor {
*
* @param context A {@link Context}.
* @param listener A {@link Listener}.
* @param executor The {@link Executor} on which the {@code listener} is invoked.
* @param effects The {@link Effect} instances to apply to each frame.
* @param debugViewProvider A {@link DebugViewProvider}.
* @param colorInfo The {@link ColorInfo} for input and output frames.
@ -60,6 +62,7 @@ public interface FrameProcessor {
FrameProcessor create(
Context context,
Listener listener,
Executor executor,
List<Effect> effects,
DebugViewProvider debugViewProvider,
ColorInfo colorInfo,
@ -70,7 +73,8 @@ public interface FrameProcessor {
/**
* Listener for asynchronous frame processing events.
*
* <p>All listener methods must be called from the same thread.
* <p>All listener methods must be called from the {@link Executor} passed in at {@linkplain
* Factory#create creation}.
*/
interface Listener {

View File

@ -32,9 +32,11 @@ import androidx.media3.common.util.GlUtil;
import androidx.media3.common.util.Util;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -313,6 +315,7 @@ public final class GlEffectsFrameProcessorFrameReleaseTest {
@Override
public void onFrameProcessingEnded() {}
},
MoreExecutors.directExecutor(),
ImmutableList.of(
(GlEffect)
(context, useHdr) ->
@ -362,7 +365,7 @@ public final class GlEffectsFrameProcessorFrameReleaseTest {
}
@Override
public void setErrorListener(ErrorListener errorListener) {}
public void setErrorListener(Executor executor, ErrorListener errorListener) {}
@Override
public void queueInputFrame(TextureInfo inputTexture, long presentationTimeUs) {

View File

@ -44,6 +44,7 @@ import androidx.media3.common.SurfaceInfo;
import androidx.media3.test.utils.DecodeOneFrameUtil;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@ -441,6 +442,7 @@ public final class GlEffectsFrameProcessorPixelTest {
frameProcessingEnded = true;
}
},
MoreExecutors.directExecutor(),
effects,
DebugViewProvider.NONE,
ColorInfo.SDR_BT709_LIMITED,

View File

@ -44,6 +44,7 @@ import androidx.media3.common.util.Util;
import com.google.common.collect.ImmutableList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@ -69,6 +70,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private final EGLContext eglContext;
private final DebugViewProvider debugViewProvider;
private final FrameProcessor.Listener frameProcessorListener;
private final Executor frameProcessorListenerExecutor;
private final boolean sampleFromExternalTexture;
private final ColorInfo colorInfo;
private final boolean releaseFramesAutomatically;
@ -101,6 +103,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
ImmutableList<GlMatrixTransformation> matrixTransformations,
ImmutableList<RgbMatrix> rgbMatrices,
FrameProcessor.Listener frameProcessorListener,
Executor frameProcessorListenerExecutor,
DebugViewProvider debugViewProvider,
boolean sampleFromExternalTexture,
ColorInfo colorInfo,
@ -112,6 +115,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
this.eglContext = eglContext;
this.debugViewProvider = debugViewProvider;
this.frameProcessorListener = frameProcessorListener;
this.frameProcessorListenerExecutor = frameProcessorListenerExecutor;
this.sampleFromExternalTexture = sampleFromExternalTexture;
this.colorInfo = colorInfo;
this.releaseFramesAutomatically = releaseFramesAutomatically;
@ -135,7 +139,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
@Override
public void setErrorListener(ErrorListener errorListener) {
public void setErrorListener(Executor executor, ErrorListener errorListener) {
// The FrameProcessor.Listener passed to the constructor is used for errors.
throw new UnsupportedOperationException();
}
@ -145,7 +149,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
long streamOffsetUs =
checkStateNotNull(streamOffsetUsQueue.peek(), "No input stream specified.");
long offsetPresentationTimeUs = presentationTimeUs + streamOffsetUs;
frameProcessorListener.onOutputFrameAvailable(offsetPresentationTimeUs);
frameProcessorListenerExecutor.execute(
() -> frameProcessorListener.onOutputFrameAvailable(offsetPresentationTimeUs));
if (releaseFramesAutomatically) {
renderFrameToSurfaces(
inputTexture, presentationTimeUs, /* releaseTimeNs= */ offsetPresentationTimeUs * 1000);
@ -177,7 +182,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
streamOffsetUsQueue.remove();
if (streamOffsetUsQueue.isEmpty()) {
frameProcessorListener.onFrameProcessingEnded();
frameProcessorListenerExecutor.execute(frameProcessorListener::onFrameProcessingEnded);
}
}
@ -234,7 +239,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
try {
GlUtil.destroyEglSurface(eglDisplay, outputEglSurface);
} catch (GlUtil.GlException e) {
frameProcessorListener.onFrameProcessingError(FrameProcessingException.from(e));
frameProcessorListenerExecutor.execute(
() ->
frameProcessorListener.onFrameProcessingError(FrameProcessingException.from(e)));
}
this.outputEglSurface = null;
}
@ -253,8 +260,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
try {
maybeRenderFrameToOutputSurface(inputTexture, presentationTimeUs, releaseTimeNs);
} catch (FrameProcessingException | GlUtil.GlException e) {
frameProcessorListener.onFrameProcessingError(
FrameProcessingException.from(e, presentationTimeUs));
frameProcessorListenerExecutor.execute(
() ->
frameProcessorListener.onFrameProcessingError(
FrameProcessingException.from(e, presentationTimeUs)));
}
maybeRenderFrameToDebugSurface(inputTexture, presentationTimeUs);
inputListener.onInputFrameProcessed(inputTexture);
@ -306,9 +315,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
if (!Util.areEqual(
this.outputSizeBeforeSurfaceTransformation, outputSizeBeforeSurfaceTransformation)) {
this.outputSizeBeforeSurfaceTransformation = outputSizeBeforeSurfaceTransformation;
frameProcessorListener.onOutputSizeChanged(
outputSizeBeforeSurfaceTransformation.first,
outputSizeBeforeSurfaceTransformation.second);
frameProcessorListenerExecutor.execute(
() ->
frameProcessorListener.onOutputSizeChanged(
outputSizeBeforeSurfaceTransformation.first,
outputSizeBeforeSurfaceTransformation.second));
}
}

View File

@ -38,8 +38,10 @@ import androidx.media3.common.util.GlUtil;
import androidx.media3.common.util.UnstableApi;
import androidx.media3.common.util.Util;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@ -59,17 +61,21 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
* <p>All {@link Effect} instances must be {@link GlEffect} instances.
*
* <p>Using HDR requires the {@code EXT_YUV_target} OpenGL extension.
*
* <p>Pass a {@link MoreExecutors#directExecutor() direct listenerExecutor} if invoking the
* {@code listener} on {@link GlEffectsFrameProcessor}'s internal thread is desired.
*/
@Override
public GlEffectsFrameProcessor create(
Context context,
FrameProcessor.Listener listener,
Listener listener,
Executor listenerExecutor,
List<Effect> effects,
DebugViewProvider debugViewProvider,
ColorInfo colorInfo,
boolean releaseFramesAutomatically)
throws FrameProcessingException {
// TODO(b/261188041) Add tests to verify the Listener is invoked on the given Executor.
ExecutorService singleThreadExecutorService = Util.newSingleThreadExecutor(THREAD_NAME);
Future<GlEffectsFrameProcessor> glFrameProcessorFuture =
@ -78,6 +84,7 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
createOpenGlObjectsAndFrameProcessor(
context,
listener,
listenerExecutor,
effects,
debugViewProvider,
colorInfo,
@ -96,7 +103,7 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
}
/**
* Creates the OpenGL context, surfaces, textures, and framebuffers, initializes {@link
* Creates the OpenGL context, surfaces, textures, and frame buffers, initializes {@link
* GlTextureProcessor} instances corresponding to the {@link GlEffect} instances, and returns a
* new {@code GlEffectsFrameProcessor}.
*
@ -108,7 +115,8 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
@WorkerThread
private static GlEffectsFrameProcessor createOpenGlObjectsAndFrameProcessor(
Context context,
FrameProcessor.Listener listener,
Listener listener,
Executor executor,
List<Effect> effects,
DebugViewProvider debugViewProvider,
ColorInfo colorInfo,
@ -133,12 +141,14 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
eglDisplay,
eglContext,
listener,
executor,
debugViewProvider,
colorInfo,
releaseFramesAutomatically);
FrameProcessingTaskExecutor frameProcessingTaskExecutor =
new FrameProcessingTaskExecutor(singleThreadExecutorService, listener);
chainTextureProcessorsWithListeners(textureProcessors, frameProcessingTaskExecutor, listener);
chainTextureProcessorsWithListeners(
textureProcessors, frameProcessingTaskExecutor, listener, executor);
return new GlEffectsFrameProcessor(
eglDisplay,
@ -164,7 +174,8 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
List<Effect> effects,
EGLDisplay eglDisplay,
EGLContext eglContext,
FrameProcessor.Listener listener,
Listener listener,
Executor executor,
DebugViewProvider debugViewProvider,
ColorInfo colorInfo,
boolean releaseFramesAutomatically)
@ -222,6 +233,7 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
matrixTransformationListBuilder.build(),
rgbMatrixListBuilder.build(),
listener,
executor,
debugViewProvider,
sampleFromExternalTexture,
colorInfo,
@ -236,7 +248,8 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
private static void chainTextureProcessorsWithListeners(
ImmutableList<GlTextureProcessor> textureProcessors,
FrameProcessingTaskExecutor frameProcessingTaskExecutor,
FrameProcessor.Listener frameProcessorListener) {
Listener frameProcessorListener,
Executor frameProcessorListenerExecutor) {
for (int i = 0; i < textureProcessors.size() - 1; i++) {
GlTextureProcessor producingGlTextureProcessor = textureProcessors.get(i);
GlTextureProcessor consumingGlTextureProcessor = textureProcessors.get(i + 1);
@ -246,7 +259,8 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
consumingGlTextureProcessor,
frameProcessingTaskExecutor);
producingGlTextureProcessor.setOutputListener(chainingGlTextureProcessorListener);
producingGlTextureProcessor.setErrorListener(frameProcessorListener::onFrameProcessingError);
producingGlTextureProcessor.setErrorListener(
frameProcessorListenerExecutor, frameProcessorListener::onFrameProcessingError);
consumingGlTextureProcessor.setInputListener(chainingGlTextureProcessorListener);
}
}

View File

@ -17,6 +17,7 @@ package androidx.media3.effect;
import androidx.media3.common.FrameProcessingException;
import androidx.media3.common.util.UnstableApi;
import java.util.concurrent.Executor;
/**
* Processes frames from one OpenGL 2D texture to another.
@ -113,14 +114,30 @@ public interface GlTextureProcessor {
void onFrameProcessingError(FrameProcessingException e);
}
/** Sets the {@link InputListener}. */
/**
* Sets the {@link InputListener}.
*
* <p>The {@link InputListener} should be invoked on the thread that owns the parent OpenGL
* context. For example, {@link GlEffectsFrameProcessor} invokes the {@link InputListener} methods
* on its internal thread.
*/
void setInputListener(InputListener inputListener);
/** Sets the {@link OutputListener}. */
/**
* Sets the {@link OutputListener}.
*
* <p>The {@link OutputListener} should be invoked on the thread that owns the parent OpenGL
* context. For example, {@link GlEffectsFrameProcessor} invokes the {@link OutputListener}
* methods on its internal thread.
*/
void setOutputListener(OutputListener outputListener);
/** Sets the {@link ErrorListener}. */
void setErrorListener(ErrorListener errorListener);
/**
* Sets the {@link ErrorListener}.
*
* <p>The {@link ErrorListener} is invoked on the provided {@link Executor}.
*/
void setErrorListener(Executor executor, ErrorListener errorListener);
/**
* Processes an input frame if possible.

View File

@ -22,6 +22,8 @@ import androidx.annotation.CallSuper;
import androidx.media3.common.FrameProcessingException;
import androidx.media3.common.util.GlUtil;
import androidx.media3.common.util.UnstableApi;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executor;
import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@ -43,6 +45,7 @@ public abstract class SingleFrameGlTextureProcessor implements GlTextureProcesso
private InputListener inputListener;
private OutputListener outputListener;
private ErrorListener errorListener;
private Executor errorListenerExecutor;
private int inputWidth;
private int inputHeight;
private @MonotonicNonNull TextureInfo outputTexture;
@ -59,6 +62,7 @@ public abstract class SingleFrameGlTextureProcessor implements GlTextureProcesso
inputListener = new InputListener() {};
outputListener = new OutputListener() {};
errorListener = (frameProcessingException) -> {};
errorListenerExecutor = MoreExecutors.directExecutor();
}
/**
@ -104,7 +108,8 @@ public abstract class SingleFrameGlTextureProcessor implements GlTextureProcesso
}
@Override
public final void setErrorListener(ErrorListener errorListener) {
public final void setErrorListener(Executor errorListenerExecutor, ErrorListener errorListener) {
this.errorListenerExecutor = errorListenerExecutor;
this.errorListener = errorListener;
}
@ -129,10 +134,12 @@ public abstract class SingleFrameGlTextureProcessor implements GlTextureProcesso
inputListener.onInputFrameProcessed(inputTexture);
outputListener.onOutputFrameAvailable(outputTexture, presentationTimeUs);
} catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) {
errorListener.onFrameProcessingError(
e instanceof FrameProcessingException
? (FrameProcessingException) e
: new FrameProcessingException(e));
errorListenerExecutor.execute(
() ->
errorListener.onFrameProcessingError(
e instanceof FrameProcessingException
? (FrameProcessingException) e
: new FrameProcessingException(e)));
}
}

View File

@ -42,6 +42,7 @@ import androidx.media3.decoder.DecoderInputBuffer;
import androidx.media3.effect.Presentation;
import androidx.media3.effect.ScaleToFitTransformation;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -195,6 +196,7 @@ import org.checkerframework.dataflow.qual.Pure;
}
}
},
MoreExecutors.directExecutor(),
effectsListBuilder.build(),
debugViewProvider,
// HDR colors are only used if the MediaCodec encoder supports FEATURE_HdrEditing.