Simplify EOS signaling in DefaultVideoFrameProcessor

Previously, TextureMangers have a method to signal ending of a current input
stream, and a method to end the **entire input**. The responsibility of both
methods are not easy to document, understand and read.

With the new design,

- Only `TextureManager.signalEndOfCurrentInputStream()` is kept
  - It's called for every MediaItem in the sequence, include the final one
- FinalWrapper now takes explicit signal that frame processing is ending,
  rather than relying on the return value of `onCurrentInputStreamProcessed()`
- On DVFP receiving EOS from the pipeline, it signals FinalWrapper the stream
  is ending, **before** signaling the input switcher, so that FinalWrapper is
  able to end the stream when the onCurrentInputStreamEnded signal eventually
  reaches FinalWrapper

PiperOrigin-RevId: 540856680
This commit is contained in:
claincly 2023-06-16 14:03:42 +01:00 committed by Marc Baechinger
parent 0d3082e6ad
commit fe33f0e390
7 changed files with 17 additions and 75 deletions

View File

@ -102,11 +102,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@Override @Override
public void signalEndOfCurrentInputStream() { public void signalEndOfCurrentInputStream() {
signalEndOfInput();
}
@Override
public void signalEndOfInput() {
videoFrameProcessingTaskExecutor.submit( videoFrameProcessingTaskExecutor.submit(
() -> { () -> {
if (framesToQueueForCurrentBitmap == 0 && pendingBitmaps.isEmpty()) { if (framesToQueueForCurrentBitmap == 0 && pendingBitmaps.isEmpty()) {

View File

@ -54,8 +54,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -298,9 +296,9 @@ public final class DefaultVideoFrameProcessor implements VideoFrameProcessor {
// Shader programs that apply Effects. // Shader programs that apply Effects.
private final List<GlShaderProgram> intermediateGlShaderPrograms; private final List<GlShaderProgram> intermediateGlShaderPrograms;
// A queue of input streams that have not been fully processed identified by their input types. // Whether DefaultVideoFrameProcessor is currently processing an input stream.
@GuardedBy("lock") @GuardedBy("lock")
private final Queue<@InputType Integer> unprocessedInputStreams; private boolean processingInput;
private final List<Effect> activeEffects; private final List<Effect> activeEffects;
private final Object lock; private final Object lock;
@ -334,7 +332,6 @@ public final class DefaultVideoFrameProcessor implements VideoFrameProcessor {
this.listener = listener; this.listener = listener;
this.listenerExecutor = listenerExecutor; this.listenerExecutor = listenerExecutor;
this.renderFramesAutomatically = renderFramesAutomatically; this.renderFramesAutomatically = renderFramesAutomatically;
this.unprocessedInputStreams = new ConcurrentLinkedQueue<>();
this.activeEffects = new ArrayList<>(); this.activeEffects = new ArrayList<>();
this.lock = new Object(); this.lock = new Object();
this.outputColorInfo = outputColorInfo; this.outputColorInfo = outputColorInfo;
@ -342,12 +339,18 @@ public final class DefaultVideoFrameProcessor implements VideoFrameProcessor {
this.finalShaderProgramWrapper = finalShaderProgramWrapper; this.finalShaderProgramWrapper = finalShaderProgramWrapper;
finalShaderProgramWrapper.setOnInputStreamProcessedListener( finalShaderProgramWrapper.setOnInputStreamProcessedListener(
() -> { () -> {
boolean inputEndedAfterThisInputStream;
synchronized (lock) { synchronized (lock) {
unprocessedInputStreams.remove(); processingInput = false;
// inputStreamEnded could be overwritten right after counting down the latch.
inputEndedAfterThisInputStream = this.inputStreamEnded;
if (latch != null) { if (latch != null) {
latch.countDown(); latch.countDown();
} }
return inputStreamEnded && unprocessedInputStreams.isEmpty(); }
if (inputEndedAfterThisInputStream) {
listenerExecutor.execute(listener::onEnded);
DebugTraceUtil.recordVideoFrameProcessorSignalEos();
} }
}); });
this.intermediateGlShaderPrograms = new ArrayList<>(intermediateGlShaderPrograms); this.intermediateGlShaderPrograms = new ArrayList<>(intermediateGlShaderPrograms);
@ -412,9 +415,9 @@ public final class DefaultVideoFrameProcessor implements VideoFrameProcessor {
@Override @Override
public void registerInputStream(@InputType int inputType, List<Effect> effects) { public void registerInputStream(@InputType int inputType, List<Effect> effects) {
synchronized (lock) { synchronized (lock) {
if (unprocessedInputStreams.isEmpty()) { if (!processingInput) {
inputSwitcher.switchToInput(inputType); inputSwitcher.switchToInput(inputType);
unprocessedInputStreams.add(inputType); processingInput = true;
activeEffects.clear(); activeEffects.clear();
activeEffects.addAll(effects); activeEffects.addAll(effects);
return; return;
@ -432,7 +435,7 @@ public final class DefaultVideoFrameProcessor implements VideoFrameProcessor {
} }
synchronized (lock) { synchronized (lock) {
unprocessedInputStreams.add(inputType); processingInput = true;
} }
if (!activeEffects.equals(effects)) { if (!activeEffects.equals(effects)) {
@ -526,15 +529,7 @@ public final class DefaultVideoFrameProcessor implements VideoFrameProcessor {
DebugTraceUtil.recordVideoFrameProcessorReceiveDecoderEos(); DebugTraceUtil.recordVideoFrameProcessorReceiveDecoderEos();
checkState(!inputStreamEnded); checkState(!inputStreamEnded);
inputStreamEnded = true; inputStreamEnded = true;
boolean allInputStreamsProcessed; inputSwitcher.signalEndOfCurrentInputStream();
synchronized (lock) {
allInputStreamsProcessed = unprocessedInputStreams.isEmpty();
}
if (allInputStreamsProcessed) {
inputSwitcher.signalEndOfInput();
} else {
inputSwitcher.signalEndOfCurrentInputStream();
}
} }
@Override @Override

View File

@ -15,7 +15,6 @@
*/ */
package androidx.media3.effect; package androidx.media3.effect;
import static androidx.media3.common.util.Assertions.checkState;
import static androidx.media3.common.util.Assertions.checkStateNotNull; import static androidx.media3.common.util.Assertions.checkStateNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
@ -74,9 +73,6 @@ import java.util.concurrent.atomic.AtomicInteger;
// Read and written only on GL thread. // Read and written only on GL thread.
private int availableFrameCount; private int availableFrameCount;
// Read and written on the GL thread only.
private boolean inputStreamEnded;
// Read and written on the GL thread only. // Read and written on the GL thread only.
private boolean currentInputStreamEnded; private boolean currentInputStreamEnded;
@ -198,7 +194,6 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
@Override @Override
public void registerInputFrame(FrameInfo frame) { public void registerInputFrame(FrameInfo frame) {
checkState(!inputStreamEnded);
pendingFrames.add(frame); pendingFrames.add(frame);
videoFrameProcessingTaskExecutor.submit(() -> shouldRejectIncomingFrames = false); videoFrameProcessingTaskExecutor.submit(() -> shouldRejectIncomingFrames = false);
} }
@ -229,12 +224,6 @@ import java.util.concurrent.atomic.AtomicInteger;
}); });
} }
@Override
public void signalEndOfInput() {
// TODO(b/274109008) Consider remove inputStreamEnded boolean.
videoFrameProcessingTaskExecutor.submit(() -> inputStreamEnded = true);
}
@Override @Override
public void release() { public void release() {
surfaceTexture.release(); surfaceTexture.release();

View File

@ -73,7 +73,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
* Returns whether {@link FinalShaderProgramWrapper} should invoke {@link * Returns whether {@link FinalShaderProgramWrapper} should invoke {@link
* VideoFrameProcessor.Listener#signalEndOfInput}. * VideoFrameProcessor.Listener#signalEndOfInput}.
*/ */
boolean onInputStreamProcessed(); void onInputStreamProcessed();
} }
private static final String TAG = "FinalShaderWrapper"; private static final String TAG = "FinalShaderWrapper";
@ -193,12 +193,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@Override @Override
public void signalEndOfCurrentInputStream() { public void signalEndOfCurrentInputStream() {
frameProcessingStarted = true; frameProcessingStarted = true;
boolean frameProcessingEnded = checkNotNull(onInputStreamProcessedListener).onInputStreamProcessed();
checkNotNull(onInputStreamProcessedListener).onInputStreamProcessed();
if (frameProcessingEnded) {
DebugTraceUtil.recordVideoFrameProcessorSignalEos();
videoFrameProcessorListenerExecutor.execute(videoFrameProcessorListener::onEnded);
}
} }
// Methods that must be called on the GL thread. // Methods that must be called on the GL thread.

View File

@ -45,7 +45,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private @MonotonicNonNull GlShaderProgram downstreamShaderProgram; private @MonotonicNonNull GlShaderProgram downstreamShaderProgram;
private @MonotonicNonNull TextureManager activeTextureManager; private @MonotonicNonNull TextureManager activeTextureManager;
private boolean inputEnded;
public InputSwitcher( public InputSwitcher(
Context context, Context context,
@ -185,16 +184,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
checkNotNull(activeTextureManager).signalEndOfCurrentInputStream(); checkNotNull(activeTextureManager).signalEndOfCurrentInputStream();
} }
/** Signals end of input to all {@linkplain #registerInput registered inputs}. */
public void signalEndOfInput() {
checkState(!inputEnded);
inputEnded = true;
for (int i = 0; i < inputs.size(); i++) {
@VideoFrameProcessor.InputType int inputType = inputs.keyAt(i);
inputs.get(inputType).signalEndOfInput();
}
}
/** Releases the resources. */ /** Releases the resources. */
public void release() throws VideoFrameProcessingException { public void release() throws VideoFrameProcessingException {
for (int i = 0; i < inputs.size(); i++) { for (int i = 0; i < inputs.size(); i++) {
@ -232,10 +221,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
gatedChainingListenerWrapper.setActive(active); gatedChainingListenerWrapper.setActive(active);
} }
public void signalEndOfInput() {
textureManager.signalEndOfInput();
}
public void release() throws VideoFrameProcessingException { public void release() throws VideoFrameProcessingException {
textureManager.release(); textureManager.release();
samplingGlShaderProgram.release(); samplingGlShaderProgram.release();

View File

@ -106,11 +106,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
videoFrameProcessingTaskExecutor.submit(frameConsumptionManager::signalEndOfCurrentStream); videoFrameProcessingTaskExecutor.submit(frameConsumptionManager::signalEndOfCurrentStream);
} }
@Override
public void signalEndOfInput() {
// Do nothing.
}
@Override @Override
public void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTaskExecutor.Task task) { public void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTaskExecutor.Task task) {
// Do nothing. // Do nothing.

View File

@ -95,21 +95,9 @@ import androidx.media3.common.VideoFrameProcessor;
/** See {@link VideoFrameProcessor#getPendingInputFrameCount}. */ /** See {@link VideoFrameProcessor#getPendingInputFrameCount}. */
int getPendingFrameCount(); int getPendingFrameCount();
/** /** Signals the end of the current input stream. */
* Signals the end of the current input stream.
*
* <p>This method must be called on the last input stream, before calling {@link
* #signalEndOfInput}.
*/
void signalEndOfCurrentInputStream(); void signalEndOfCurrentInputStream();
/**
* Signals the end of the input.
*
* @see VideoFrameProcessor#signalEndOfInput()
*/
void signalEndOfInput();
/** Sets the task to run on completing flushing, or {@code null} to clear any task. */ /** Sets the task to run on completing flushing, or {@code null} to clear any task. */
void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTaskExecutor.Task task); void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTaskExecutor.Task task);