Effect: Move VideoFrameProcessingTask to VFPTaskexecutor

Group things that are closely related. VFPTask is only ever
used alongside VFPTaskExecutor.

PiperOrigin-RevId: 540850948
This commit is contained in:
huangdarwin 2023-06-16 13:33:13 +01:00 committed by Marc Baechinger
parent 251fb013c3
commit 4178d61ccd
7 changed files with 33 additions and 63 deletions

View File

@ -118,7 +118,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
@Override
public void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTask task) {
public void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTaskExecutor.Task task) {
// Do nothing.
}

View File

@ -85,7 +85,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@Nullable private volatile FrameInfo currentFrame;
// TODO(b/238302341) Remove the use of after flush task, block the calling thread instead.
@Nullable private volatile VideoFrameProcessingTask onFlushCompleteTask;
@Nullable private volatile VideoFrameProcessingTaskExecutor.Task onFlushCompleteTask;
@Nullable private Future<?> forceSignalEndOfStreamFuture;
// Whether to reject frames from the SurfaceTexture. Accessed only on GL thread.
@ -180,7 +180,7 @@ import java.util.concurrent.atomic.AtomicInteger;
}
@Override
public void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTask task) {
public void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTaskExecutor.Task task) {
onFlushCompleteTask = task;
}

View File

@ -621,7 +621,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
* <p>Must be called on the GL thread.
*/
public synchronized void maybeRenderToSurfaceView(
VideoFrameProcessingTask renderingTask, GlObjectsProvider glObjectsProvider)
VideoFrameProcessingTaskExecutor.Task renderingTask, GlObjectsProvider glObjectsProvider)
throws GlUtil.GlException, VideoFrameProcessingException {
if (surface == null) {
return;

View File

@ -112,7 +112,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
@Override
public void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTask task) {
public void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTaskExecutor.Task task) {
// Do nothing.
}

View File

@ -111,7 +111,7 @@ import androidx.media3.common.VideoFrameProcessor;
void signalEndOfInput();
/** Sets the task to run on completing flushing, or {@code null} to clear any task. */
void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTask task);
void setOnFlushCompleteListener(@Nullable VideoFrameProcessingTaskExecutor.Task task);
/**
* Releases all resources.

View File

@ -1,30 +0,0 @@
/*
* Copyright 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.media3.effect;
import androidx.media3.common.VideoFrameProcessingException;
import androidx.media3.common.util.GlUtil;
import androidx.media3.common.util.UnstableApi;
/**
* Interface for tasks that may throw a {@link GlUtil.GlException} or {@link
* VideoFrameProcessingException}.
*/
@UnstableApi
/* package */ interface VideoFrameProcessingTask {
/** Runs the task. */
void run() throws VideoFrameProcessingException, GlUtil.GlException;
}

View File

@ -21,6 +21,7 @@ import androidx.annotation.GuardedBy;
import androidx.annotation.Nullable;
import androidx.media3.common.VideoFrameProcessingException;
import androidx.media3.common.VideoFrameProcessor;
import androidx.media3.common.util.GlUtil;
import androidx.media3.common.util.UnstableApi;
import java.util.ArrayDeque;
import java.util.Queue;
@ -31,8 +32,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
/**
* Wrapper around a single thread {@link ExecutorService} for executing {@link
* VideoFrameProcessingTask} instances.
* Wrapper around a single thread {@link ExecutorService} for executing {@link Task} instances.
*
* <p>Public methods can be called from any thread.
*
@ -42,19 +42,27 @@ import java.util.concurrent.RejectedExecutionException;
* non-recoverable, so the {@code VideoFrameProcessingTaskExecutor} should be released if an error
* occurs.
*
* <p>{@linkplain #submitWithHighPriority(VideoFrameProcessingTask) High priority tasks} are always
* executed before {@linkplain #submit(VideoFrameProcessingTask) default priority tasks}. Tasks with
* equal priority are executed in FIFO order.
* <p>{@linkplain #submitWithHighPriority(Task) High priority tasks} are always executed before
* {@linkplain #submit(Task) default priority tasks}. Tasks with equal priority are executed in FIFO
* order.
*/
@UnstableApi
/* package */ final class VideoFrameProcessingTaskExecutor {
/**
* Interface for tasks that may throw a {@link GlUtil.GlException} or {@link
* VideoFrameProcessingException}.
*/
public interface Task {
/** Runs the task. */
void run() throws VideoFrameProcessingException, GlUtil.GlException;
}
private final ExecutorService singleThreadExecutorService;
private final VideoFrameProcessor.Listener listener;
private final Object lock;
@GuardedBy("lock")
private final Queue<VideoFrameProcessingTask> highPriorityTasks;
private final Queue<Task> highPriorityTasks;
@GuardedBy("lock")
private boolean shouldCancelTasks;
@ -68,12 +76,9 @@ import java.util.concurrent.RejectedExecutionException;
highPriorityTasks = new ArrayDeque<>();
}
/**
* Submits the given {@link VideoFrameProcessingTask} to be executed after all pending tasks have
* completed.
*/
/** Submits the given {@link Task} to be executed after all pending tasks have completed. */
@SuppressWarnings("FutureReturnValueIgnored")
public void submit(VideoFrameProcessingTask task) {
public void submit(Task task) {
@Nullable RejectedExecutionException executionException = null;
synchronized (lock) {
if (shouldCancelTasks) {
@ -91,11 +96,8 @@ import java.util.concurrent.RejectedExecutionException;
}
}
/**
* Submits the given {@link VideoFrameProcessingTask} to execute, and returns after the task is
* executed.
*/
public void submitAndBlock(VideoFrameProcessingTask task) {
/** Submits the given {@link Task} to execute, and returns after the task is executed. */
public void submitAndBlock(Task task) {
synchronized (lock) {
if (shouldCancelTasks) {
return;
@ -114,13 +116,13 @@ import java.util.concurrent.RejectedExecutionException;
}
/**
* Submits the given {@link VideoFrameProcessingTask} to be executed after the currently running
* task and all previously submitted high-priority tasks have completed.
* Submits the given {@link Task} to be executed after the currently running task and all
* previously submitted high-priority tasks have completed.
*
* <p>Tasks that were previously {@linkplain #submit(VideoFrameProcessingTask) submitted} without
* high-priority and have not started executing will be executed after this task is complete.
* <p>Tasks that were previously {@linkplain #submit(Task) submitted} without high-priority and
* have not started executing will be executed after this task is complete.
*/
public void submitWithHighPriority(VideoFrameProcessingTask task) {
public void submitWithHighPriority(Task task) {
synchronized (lock) {
if (shouldCancelTasks) {
return;
@ -162,13 +164,11 @@ import java.util.concurrent.RejectedExecutionException;
/**
* Cancels remaining tasks, runs the given release task, and shuts down the background thread.
*
* @param releaseTask A {@link VideoFrameProcessingTask} to execute before shutting down the
* background thread.
* @param releaseTask A {@link Task} to execute before shutting down the background thread.
* @param releaseWaitTimeMs How long to wait for the release task to terminate, in milliseconds.
* @throws InterruptedException If interrupted while releasing resources.
*/
public void release(VideoFrameProcessingTask releaseTask, long releaseWaitTimeMs)
throws InterruptedException {
public void release(Task releaseTask, long releaseWaitTimeMs) throws InterruptedException {
synchronized (lock) {
shouldCancelTasks = true;
highPriorityTasks.clear();
@ -184,7 +184,7 @@ import java.util.concurrent.RejectedExecutionException;
}
private Future<?> wrapTaskAndSubmitToExecutorService(
VideoFrameProcessingTask defaultPriorityTask, boolean isFlushOrReleaseTask) {
Task defaultPriorityTask, boolean isFlushOrReleaseTask) {
return singleThreadExecutorService.submit(
() -> {
try {
@ -194,7 +194,7 @@ import java.util.concurrent.RejectedExecutionException;
}
}
@Nullable VideoFrameProcessingTask nextHighPriorityTask;
@Nullable Task nextHighPriorityTask;
while (true) {
synchronized (lock) {
// Lock only polling to prevent blocking the public method calls.