Support flushing in FrameProcessor

Flushing resets all the texture processors within the `FrameProcessor`. This
includes:

- At the back, the FinalMatrixTextureProcessorWrapper, and its MatrixTextureProcessor
- At the front, the ExternalTextureManager
- All the texture processors in between
- All the ChainingGlTextureProcessorListeners in between texture processors
- All the internal states in the aforementioned components

The flush process follows the order, from `GlEffectsFrameProcessor.flush()`

1. Flush the `FrameProcessingTaskExecutor`, so that after it returns, all tasks queued before calling `flush()` completes
2. Post to `FrameProcessingTaskExecutor`, to flush the `FinalMatrixTextureProcessorWrapper`
3. Flushing the `FinalMatrixTextureProcessorWrapper` will propagate flushing through, via the `ChainingGlTextureProcessorListener`

Startblock:
   has LGTM from christosts
   and then
   add reviewer andrewlewis
PiperOrigin-RevId: 506296469
This commit is contained in:
claincly 2023-02-01 14:14:54 +00:00 committed by christosts
parent eb6c1a5254
commit 4a1cf3d839
12 changed files with 210 additions and 25 deletions

View File

@ -231,6 +231,12 @@ import java.util.concurrent.Future;
}
}
@Override
public void flush() {
// TODO(b/238302341) Support seeking in MediaPipeProcessor.
throw new UnsupportedOperationException();
}
@Override
public void release() {
if (isSingleFrameGraph) {

View File

@ -211,6 +211,16 @@ public interface FrameProcessor {
*/
void signalEndOfInput();
/**
* Flushes the {@code FrameProcessor}.
*
* <p>All the frames that are {@linkplain #registerInputFrame() registered} prior to calling this
* method are no longer considered to be registered when this method returns.
*
* <p>{@link Listener} methods invoked prior to calling this method should be ignored.
*/
void flush();
/**
* Releases all resources.
*

View File

@ -420,6 +420,11 @@ public final class GlEffectsFrameProcessorFrameReleaseTest {
throw new UnsupportedOperationException();
}
@Override
public void flush() {
throw new UnsupportedOperationException();
}
@Override
public void release() {
// Do nothing as destroying the OpenGL context destroys the texture.

View File

@ -91,6 +91,13 @@ import java.util.Queue;
() -> producingGlTextureProcessor.releaseOutputFrame(inputTexture));
}
@Override
public synchronized void onFlush() {
consumingGlTextureProcessorInputCapacity = 0;
availableFrames.clear();
frameProcessingTaskExecutor.submit(producingGlTextureProcessor::flush);
}
@Override
public synchronized void onOutputFrameAvailable(
TextureInfo outputTexture, long presentationTimeUs) {

View File

@ -43,11 +43,14 @@ import java.util.concurrent.atomic.AtomicInteger;
private final float[] textureTransformMatrix;
private final Queue<FrameInfo> pendingFrames;
// Incremented on any thread when a frame becomes available on the surfaceTexture, decremented on
// the GL thread only.
private final AtomicInteger availableFrameCount;
// Incremented on any thread, decremented on the GL thread only.
private final AtomicInteger externalTextureProcessorInputCapacity;
// Counts the frames that are registered before flush but are made available after flush.
// Read and written only on GL thread.
private int numberOfFramesToDropOnBecomingAvailable;
// Read and written only on GL thread.
private int availableFrameCount;
// Set to true on any thread. Read on the GL thread only.
private volatile boolean inputStreamEnded;
@ -55,6 +58,8 @@ import java.util.concurrent.atomic.AtomicInteger;
// Set to null on any thread. Read and set to non-null on the GL thread only.
@Nullable private volatile FrameInfo currentFrame;
@Nullable private volatile FrameProcessingTask onFlushCompleteTask;
private long previousStreamOffsetUs;
/**
@ -79,30 +84,53 @@ import java.util.concurrent.atomic.AtomicInteger;
surfaceTexture = new SurfaceTexture(externalTexId);
textureTransformMatrix = new float[16];
pendingFrames = new ConcurrentLinkedQueue<>();
availableFrameCount = new AtomicInteger();
externalTextureProcessorInputCapacity = new AtomicInteger();
previousStreamOffsetUs = C.TIME_UNSET;
}
public SurfaceTexture getSurfaceTexture() {
surfaceTexture.setOnFrameAvailableListener(
unused -> {
availableFrameCount.getAndIncrement();
frameProcessingTaskExecutor.submit(this::maybeQueueFrameToExternalTextureProcessor);
});
unused ->
frameProcessingTaskExecutor.submit(
() -> {
if (numberOfFramesToDropOnBecomingAvailable > 0) {
numberOfFramesToDropOnBecomingAvailable--;
surfaceTexture.updateTexImage();
} else {
availableFrameCount++;
maybeQueueFrameToExternalTextureProcessor();
}
}));
return surfaceTexture;
}
@Override
public void onReadyToAcceptInputFrame() {
externalTextureProcessorInputCapacity.getAndIncrement();
frameProcessingTaskExecutor.submit(this::maybeQueueFrameToExternalTextureProcessor);
frameProcessingTaskExecutor.submit(
() -> {
externalTextureProcessorInputCapacity.incrementAndGet();
maybeQueueFrameToExternalTextureProcessor();
});
}
@Override
public void onInputFrameProcessed(TextureInfo inputTexture) {
currentFrame = null;
frameProcessingTaskExecutor.submit(this::maybeQueueFrameToExternalTextureProcessor);
frameProcessingTaskExecutor.submit(
() -> {
currentFrame = null;
maybeQueueFrameToExternalTextureProcessor();
});
}
/** Sets the task to run on completing flushing, or {@code null} to clear any task. */
public void setOnFlushCompleteListener(@Nullable FrameProcessingTask task) {
onFlushCompleteTask = task;
}
@Override
public void onFlush() {
frameProcessingTaskExecutor.submit(this::flush);
}
/**
@ -131,32 +159,51 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* @see FrameProcessor#signalEndOfInput()
*/
@WorkerThread
public void signalEndOfInput() {
inputStreamEnded = true;
if (pendingFrames.isEmpty() && currentFrame == null) {
externalTextureProcessor.signalEndOfCurrentInputStream();
}
frameProcessingTaskExecutor.submit(
() -> {
inputStreamEnded = true;
if (pendingFrames.isEmpty() && currentFrame == null) {
externalTextureProcessor.signalEndOfCurrentInputStream();
}
});
}
public void release() {
surfaceTexture.release();
}
@WorkerThread
private void flush() {
// A frame that is registered before flush may arrive after flush.
numberOfFramesToDropOnBecomingAvailable = pendingFrames.size() - availableFrameCount;
while (availableFrameCount > 0) {
availableFrameCount--;
surfaceTexture.updateTexImage();
}
externalTextureProcessorInputCapacity.set(0);
currentFrame = null;
pendingFrames.clear();
if (onFlushCompleteTask != null) {
frameProcessingTaskExecutor.submitWithHighPriority(onFlushCompleteTask);
}
}
@WorkerThread
private void maybeQueueFrameToExternalTextureProcessor() {
if (externalTextureProcessorInputCapacity.get() == 0
|| availableFrameCount.get() == 0
|| availableFrameCount == 0
|| currentFrame != null) {
return;
}
surfaceTexture.updateTexImage();
availableFrameCount.getAndDecrement();
availableFrameCount--;
this.currentFrame = pendingFrames.peek();
FrameInfo currentFrame = checkStateNotNull(this.currentFrame);
externalTextureProcessorInputCapacity.getAndDecrement();
externalTextureProcessorInputCapacity.decrementAndGet();
surfaceTexture.getTransformMatrix(textureTransformMatrix);
externalTextureProcessor.setTextureTransformMatrix(textureTransformMatrix);
long frameTimeNs = surfaceTexture.getTimestamp();

View File

@ -184,13 +184,24 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@Override
public void signalEndOfCurrentInputStream() {
checkState(!streamOffsetUsQueue.isEmpty(), "No input stream to end.");
android.util.Log.e("LYC", "Signal end");
streamOffsetUsQueue.remove();
if (streamOffsetUsQueue.isEmpty()) {
frameProcessorListenerExecutor.execute(frameProcessorListener::onFrameProcessingEnded);
}
}
@Override
public void flush() {
// Drops all frames that aren't released yet.
availableFrames.clear();
if (matrixTextureProcessor != null) {
matrixTextureProcessor.flush();
}
inputListener.onFlush();
inputListener.onReadyToAcceptInputFrame();
}
@Override
@WorkerThread
public synchronized void release() throws FrameProcessingException {

View File

@ -156,6 +156,16 @@ import java.util.concurrent.Executor;
outputListener.onCurrentOutputStreamEnded();
}
@Override
public void flush() {
freeOutputTextures.addAll(inUseOutputTextures);
inUseOutputTextures.clear();
inputListener.onFlush();
for (int i = 0; i < freeOutputTextures.size(); i++) {
inputListener.onReadyToAcceptInputFrame();
}
}
@Override
public void release() throws FrameProcessingException {
try {

View File

@ -22,6 +22,7 @@ import androidx.annotation.Nullable;
import androidx.media3.common.FrameProcessingException;
import androidx.media3.common.FrameProcessor;
import java.util.ArrayDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -76,7 +77,7 @@ import java.util.concurrent.RejectedExecutionException;
return;
}
try {
wrapTaskAndSubmitToExecutorService(task, /* isReleaseTask= */ false);
wrapTaskAndSubmitToExecutorService(task, /* isFlushOrReleaseTask= */ false);
} catch (RejectedExecutionException e) {
executionException = e;
}
@ -107,6 +108,32 @@ import java.util.concurrent.RejectedExecutionException;
submit(() -> {});
}
/**
* Flushes all scheduled tasks.
*
* <p>During flush, the {@code FrameProcessingTaskExecutor} ignores the {@linkplain #submit
* submission of new tasks}. The tasks that are submitted before flushing are either executed or
* canceled when this method returns.
*/
@SuppressWarnings("FutureReturnValueIgnored")
public void flush() throws InterruptedException {
synchronized (lock) {
shouldCancelTasks = true;
highPriorityTasks.clear();
}
CountDownLatch latch = new CountDownLatch(1);
wrapTaskAndSubmitToExecutorService(
() -> {
synchronized (lock) {
shouldCancelTasks = false;
}
latch.countDown();
},
/* isFlushOrReleaseTask= */ true);
latch.await();
}
/**
* Cancels remaining tasks, runs the given release task, and shuts down the background thread.
*
@ -122,7 +149,7 @@ import java.util.concurrent.RejectedExecutionException;
highPriorityTasks.clear();
}
Future<?> releaseFuture =
wrapTaskAndSubmitToExecutorService(releaseTask, /* isReleaseTask= */ true);
wrapTaskAndSubmitToExecutorService(releaseTask, /* isFlushOrReleaseTask= */ true);
singleThreadExecutorService.shutdown();
try {
if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) {
@ -135,12 +162,12 @@ import java.util.concurrent.RejectedExecutionException;
}
private Future<?> wrapTaskAndSubmitToExecutorService(
FrameProcessingTask defaultPriorityTask, boolean isReleaseTask) {
FrameProcessingTask defaultPriorityTask, boolean isFlushOrReleaseTask) {
return singleThreadExecutorService.submit(
() -> {
try {
synchronized (lock) {
if (shouldCancelTasks && !isReleaseTask) {
if (shouldCancelTasks && !isFlushOrReleaseTask) {
return;
}
}

View File

@ -44,6 +44,7 @@ import androidx.media3.common.util.Util;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@ -457,6 +458,20 @@ public final class GlEffectsFrameProcessor implements FrameProcessor {
frameProcessingTaskExecutor.submit(inputExternalTextureManager::signalEndOfInput);
}
@Override
public void flush() {
try {
frameProcessingTaskExecutor.flush();
CountDownLatch latch = new CountDownLatch(1);
inputExternalTextureManager.setOnFlushCompleteListener(latch::countDown);
frameProcessingTaskExecutor.submit(finalTextureProcessorWrapper::flush);
latch.await();
inputExternalTextureManager.setOnFlushCompleteListener(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void release() {
try {

View File

@ -70,6 +70,14 @@ public interface GlTextureProcessor {
* #queueInputFrame(TextureInfo, long) queue} the input frame.
*/
default void onInputFrameProcessed(TextureInfo inputTexture) {}
/**
* Called when the {@link GlTextureProcessor} has been flushed.
*
* <p>The implementation shall not assume the {@link GlTextureProcessor} is {@linkplain
* #onReadyToAcceptInputFrame ready to accept another input frame} when this method is called.
*/
default void onFlush() {}
}
/**
@ -170,6 +178,15 @@ public interface GlTextureProcessor {
*/
void signalEndOfCurrentInputStream();
/**
* Flushes the {@code GlTextureProcessor}.
*
* <p>The texture processor should reclaim the ownership of its allocated textures, {@linkplain
* InputListener#onFlush notify} its {@link InputListener} about the flush event, and {@linkplain
* InputListener#onReadyToAcceptInputFrame report its availability} if necessary.
*/
void flush();
/**
* Releases all resources.
*

View File

@ -174,6 +174,14 @@ public abstract class SingleFrameGlTextureProcessor implements GlTextureProcesso
outputListener.onCurrentOutputStreamEnded();
}
@Override
@CallSuper
public void flush() {
outputTextureInUse = false;
inputListener.onFlush();
inputListener.onReadyToAcceptInputFrame();
}
@Override
@CallSuper
public void release() throws FrameProcessingException {

View File

@ -563,6 +563,9 @@ public class MediaCodecVideoRenderer extends MediaCodecRenderer {
@Override
protected void onPositionReset(long positionUs, boolean joining) throws ExoPlaybackException {
super.onPositionReset(positionUs, joining);
if (frameProcessorManager.isEnabled()) {
frameProcessorManager.flush();
}
clearRenderedFirstFrame();
frameReleaseHelper.onPositionReset();
lastBufferPresentationTimeUs = C.TIME_UNSET;
@ -1910,6 +1913,25 @@ public class MediaCodecVideoRenderer extends MediaCodecRenderer {
return releasedLastFrame;
}
/**
* Flushes the {@link FrameProcessor}.
*
* <p>Caller must ensure frame processing {@linkplain #isEnabled() is enabled} before calling
* this method.
*/
public void flush() {
checkStateNotNull(frameProcessor);
frameProcessor.flush();
processedFramesTimestampsUs.clear();
handler.removeCallbacksAndMessages(/* token= */ null);
if (registeredLastFrame) {
registeredLastFrame = false;
processedLastFrame = false;
releasedLastFrame = false;
}
}
/**
* Tries to enable frame processing.
*