Simplify concurrency in FrameProcessingTaskExecutor

This CL replaces concurrent collections and atomic primitives with a single
lock, this way the code is easier to reason about.

PiperOrigin-RevId: 496718057
This commit is contained in:
claincly 2022-12-20 20:06:54 +00:00 committed by Tianyi Feng
parent 84c81b8575
commit cb0bc28af2

View File

@ -17,24 +17,27 @@ package androidx.media3.effect;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import androidx.annotation.GuardedBy;
import androidx.annotation.Nullable;
import androidx.media3.common.FrameProcessingException; import androidx.media3.common.FrameProcessingException;
import androidx.media3.common.FrameProcessor; import androidx.media3.common.FrameProcessor;
import androidx.media3.common.util.GlUtil; import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Wrapper around a single thread {@link ExecutorService} for executing {@link FrameProcessingTask} * Wrapper around a single thread {@link ExecutorService} for executing {@link FrameProcessingTask}
* instances. * instances.
* *
* <p>Public methods can be called from any thread.
*
* <p>The wrapper handles calling {@link * <p>The wrapper handles calling {@link
* FrameProcessor.Listener#onFrameProcessingError(FrameProcessingException)} for errors that occur * FrameProcessor.Listener#onFrameProcessingError(FrameProcessingException)} for errors that occur
* during these tasks. Errors are assumed to be non-recoverable, so the {@code * during these tasks. The listener is invoked from the {@link ExecutorService}. Errors are assumed
* FrameProcessingTaskExecutor} should be released if an error occurs. * to be non-recoverable, so the {@code FrameProcessingTaskExecutor} should be released if an error
* occurs.
* *
* <p>{@linkplain #submitWithHighPriority(FrameProcessingTask) High priority tasks} are always * <p>{@linkplain #submitWithHighPriority(FrameProcessingTask) High priority tasks} are always
* executed before {@linkplain #submit(FrameProcessingTask) default priority tasks}. Tasks with * executed before {@linkplain #submit(FrameProcessingTask) default priority tasks}. Tasks with
@ -44,33 +47,43 @@ import java.util.concurrent.atomic.AtomicBoolean;
private final ExecutorService singleThreadExecutorService; private final ExecutorService singleThreadExecutorService;
private final FrameProcessor.Listener listener; private final FrameProcessor.Listener listener;
private final ConcurrentLinkedQueue<Future<?>> futures; private final Object lock;
private final ConcurrentLinkedQueue<FrameProcessingTask> highPriorityTasks;
private final AtomicBoolean shouldCancelTasks; @GuardedBy("lock")
private final ArrayDeque<FrameProcessingTask> highPriorityTasks;
@GuardedBy("lock")
private boolean shouldCancelTasks;
/** Creates a new instance. */ /** Creates a new instance. */
public FrameProcessingTaskExecutor( public FrameProcessingTaskExecutor(
ExecutorService singleThreadExecutorService, FrameProcessor.Listener listener) { ExecutorService singleThreadExecutorService, FrameProcessor.Listener listener) {
this.singleThreadExecutorService = singleThreadExecutorService; this.singleThreadExecutorService = singleThreadExecutorService;
this.listener = listener; this.listener = listener;
lock = new Object();
futures = new ConcurrentLinkedQueue<>(); highPriorityTasks = new ArrayDeque<>();
highPriorityTasks = new ConcurrentLinkedQueue<>();
shouldCancelTasks = new AtomicBoolean();
} }
/** /**
* Submits the given {@link FrameProcessingTask} to be executed after all pending tasks have * Submits the given {@link FrameProcessingTask} to be executed after all pending tasks have
* completed. * completed.
*/ */
@SuppressWarnings("FutureReturnValueIgnored")
public void submit(FrameProcessingTask task) { public void submit(FrameProcessingTask task) {
if (shouldCancelTasks.get()) { @Nullable RejectedExecutionException executionException = null;
synchronized (lock) {
if (shouldCancelTasks) {
return; return;
} }
try { try {
futures.add(wrapTaskAndSubmitToExecutorService(task)); wrapTaskAndSubmitToExecutorService(task, /* isReleaseTask= */ false);
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
handleException(e); executionException = e;
}
}
if (executionException != null) {
handleException(executionException);
} }
} }
@ -82,10 +95,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
* high-priority and have not started executing will be executed after this task is complete. * high-priority and have not started executing will be executed after this task is complete.
*/ */
public void submitWithHighPriority(FrameProcessingTask task) { public void submitWithHighPriority(FrameProcessingTask task) {
if (shouldCancelTasks.get()) { synchronized (lock) {
if (shouldCancelTasks) {
return; return;
} }
highPriorityTasks.add(task); highPriorityTasks.add(task);
}
// If the ExecutorService has non-started tasks, the first of these non-started tasks will run // If the ExecutorService has non-started tasks, the first of these non-started tasks will run
// the task passed to this method. Just in case there are no non-started tasks, submit another // the task passed to this method. Just in case there are no non-started tasks, submit another
// task to run high-priority tasks. // task to run high-priority tasks.
@ -102,9 +117,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
public void release(FrameProcessingTask releaseTask, long releaseWaitTimeMs) public void release(FrameProcessingTask releaseTask, long releaseWaitTimeMs)
throws InterruptedException { throws InterruptedException {
shouldCancelTasks.getAndSet(true); synchronized (lock) {
cancelNonStartedTasks(); shouldCancelTasks = true;
Future<?> releaseFuture = wrapTaskAndSubmitToExecutorService(releaseTask); highPriorityTasks.clear();
}
Future<?> releaseFuture =
wrapTaskAndSubmitToExecutorService(releaseTask, /* isReleaseTask= */ true);
singleThreadExecutorService.shutdown(); singleThreadExecutorService.shutdown();
try { try {
if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) { if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) {
@ -116,51 +134,44 @@ import java.util.concurrent.atomic.AtomicBoolean;
} }
} }
private Future<?> wrapTaskAndSubmitToExecutorService(FrameProcessingTask defaultPriorityTask) { private Future<?> wrapTaskAndSubmitToExecutorService(
FrameProcessingTask defaultPriorityTask, boolean isReleaseTask) {
return singleThreadExecutorService.submit( return singleThreadExecutorService.submit(
() -> { () -> {
try { try {
while (!highPriorityTasks.isEmpty()) { synchronized (lock) {
highPriorityTasks.remove().run(); if (shouldCancelTasks && !isReleaseTask) {
return;
}
}
@Nullable FrameProcessingTask nextHighPriorityTask;
while (true) {
synchronized (lock) {
// Lock only polling to prevent blocking the public method calls.
nextHighPriorityTask = highPriorityTasks.poll();
}
if (nextHighPriorityTask == null) {
break;
}
nextHighPriorityTask.run();
} }
defaultPriorityTask.run(); defaultPriorityTask.run();
removeFinishedFutures(); } catch (Exception e) {
} catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) {
handleException(e); handleException(e);
} }
}); });
} }
private void cancelNonStartedTasks() {
while (!futures.isEmpty()) {
futures.remove().cancel(/* mayInterruptIfRunning= */ false);
}
}
private void handleException(Exception exception) { private void handleException(Exception exception) {
if (shouldCancelTasks.getAndSet(true)) { synchronized (lock) {
if (shouldCancelTasks) {
// Ignore exception after cancelation as it can be caused by a previously reported exception // Ignore exception after cancelation as it can be caused by a previously reported exception
// that is the reason for the cancelation. // that is the reason for the cancelation.
return; return;
} }
shouldCancelTasks = true;
}
listener.onFrameProcessingError(FrameProcessingException.from(exception)); listener.onFrameProcessingError(FrameProcessingException.from(exception));
cancelNonStartedTasks();
}
private void removeFinishedFutures() {
while (!futures.isEmpty()) {
if (!futures.element().isDone()) {
return;
}
try {
futures.remove().get();
} catch (ExecutionException impossible) {
// All exceptions are already caught in wrapTaskAndSubmitToExecutorService.
handleException(new IllegalStateException("Unexpected error", impossible));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
handleException(e);
}
}
} }
} }