From 59be732230fe9bfdc27c27d6a6866abacbdda3fe Mon Sep 17 00:00:00 2001 From: Googler Date: Mon, 15 Aug 2022 15:27:21 +0000 Subject: [PATCH] Allow high-priority tasks to be executed before other tasks. This is needed as a pre-requisite for allowing MCVR to control FrameProcessor frame release for previewing. Submitting a high-priority task is conceptually different from posting at the front of a single queue of tasks, as the high-priority tasks are executed in FIFO order among themselves. This will ensure that frame release tasks submitted in close succession are executed in the order they are submitted but before any lower priority tasks. PiperOrigin-RevId: 467675137 --- .../effect/FrameProcessingTaskExecutor.java | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java b/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java index a87356e965..d5e08c7d95 100644 --- a/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java +++ b/libraries/effect/src/main/java/androidx/media3/effect/FrameProcessingTaskExecutor.java @@ -34,12 +34,17 @@ import java.util.concurrent.atomic.AtomicBoolean; *

The wrapper handles calling {@link * FrameProcessor.Listener#onFrameProcessingError(FrameProcessingException)} for errors that occur * during these tasks. + * + *

{@linkplain #submitWithHighPriority(FrameProcessingTask) High priority tasks} are always + * executed before {@linkplain #submit(FrameProcessingTask) default priority tasks}. Tasks with + * equal priority are executed in FIFO order. */ /* package */ final class FrameProcessingTaskExecutor { private final ExecutorService singleThreadExecutorService; private final FrameProcessor.Listener listener; private final ConcurrentLinkedQueue> futures; + private final ConcurrentLinkedQueue highPriorityTasks; private final AtomicBoolean shouldCancelTasks; /** Creates a new instance. */ @@ -49,11 +54,12 @@ import java.util.concurrent.atomic.AtomicBoolean; this.listener = listener; futures = new ConcurrentLinkedQueue<>(); + highPriorityTasks = new ConcurrentLinkedQueue<>(); shouldCancelTasks = new AtomicBoolean(); } /** - * Submits the given {@link FrameProcessingTask} to be executed after any pending tasks have + * Submits the given {@link FrameProcessingTask} to be executed after all pending tasks have * completed. */ public void submit(FrameProcessingTask task) { @@ -61,7 +67,7 @@ import java.util.concurrent.atomic.AtomicBoolean; return; } try { - futures.add(submitTask(task)); + futures.add(wrapTaskAndSubmitToExecutorService(task)); } catch (RejectedExecutionException e) { if (!shouldCancelTasks.getAndSet(true)) { listener.onFrameProcessingError(new FrameProcessingException(e)); @@ -69,6 +75,24 @@ import java.util.concurrent.atomic.AtomicBoolean; } } + /** + * Submits the given {@link FrameProcessingTask} to be executed after the currently running task + * and all previously submitted high-priority tasks have completed. + * + *

Tasks that were previously {@linkplain #submit(FrameProcessingTask) submitted} without + * high-priority and have not started executing will be executed after this task is complete. + */ + public void submitWithHighPriority(FrameProcessingTask task) { + if (shouldCancelTasks.get()) { + return; + } + highPriorityTasks.add(task); + // If the ExecutorService has non-started tasks, the first of these non-started tasks will run + // the task passed to this method. Just in case there are no non-started tasks, submit another + // task to run high-priority tasks. + submit(() -> {}); + } + /** * Cancels remaining tasks, runs the given release task, and shuts down the background thread. * @@ -83,7 +107,7 @@ import java.util.concurrent.atomic.AtomicBoolean; while (!futures.isEmpty()) { futures.remove().cancel(/* mayInterruptIfRunning= */ false); } - Future releaseFuture = submitTask(releaseTask); + Future releaseFuture = wrapTaskAndSubmitToExecutorService(releaseTask); singleThreadExecutorService.shutdown(); try { if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) { @@ -95,11 +119,14 @@ import java.util.concurrent.atomic.AtomicBoolean; } } - private Future submitTask(FrameProcessingTask glTask) { + private Future wrapTaskAndSubmitToExecutorService(FrameProcessingTask defaultPriorityTask) { return singleThreadExecutorService.submit( () -> { try { - glTask.run(); + while (!highPriorityTasks.isEmpty()) { + highPriorityTasks.remove().run(); + } + defaultPriorityTask.run(); removeFinishedFutures(); } catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) { listener.onFrameProcessingError(FrameProcessingException.from(e));