Allow high-priority tasks to be executed before other tasks.

This is needed as a pre-requisite for allowing MCVR to control
FrameProcessor frame release for previewing.

Submitting a high-priority task is conceptually different from
posting at the front of a single queue of tasks, as the high-priority
tasks are executed in FIFO order among themselves. This will ensure
that frame release tasks submitted in close succession are executed
in the order they are submitted but before any lower priority tasks.

PiperOrigin-RevId: 467675137
This commit is contained in:
Googler 2022-08-15 15:27:21 +00:00 committed by Marc Baechinger
parent b83b16eba7
commit 59be732230

View File

@ -34,12 +34,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
* <p>The wrapper handles calling {@link
* FrameProcessor.Listener#onFrameProcessingError(FrameProcessingException)} for errors that occur
* during these tasks.
*
* <p>{@linkplain #submitWithHighPriority(FrameProcessingTask) High priority tasks} are always
* executed before {@linkplain #submit(FrameProcessingTask) default priority tasks}. Tasks with
* equal priority are executed in FIFO order.
*/
/* package */ final class FrameProcessingTaskExecutor {
private final ExecutorService singleThreadExecutorService;
private final FrameProcessor.Listener listener;
private final ConcurrentLinkedQueue<Future<?>> futures;
private final ConcurrentLinkedQueue<FrameProcessingTask> highPriorityTasks;
private final AtomicBoolean shouldCancelTasks;
/** Creates a new instance. */
@ -49,11 +54,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
this.listener = listener;
futures = new ConcurrentLinkedQueue<>();
highPriorityTasks = new ConcurrentLinkedQueue<>();
shouldCancelTasks = new AtomicBoolean();
}
/**
* Submits the given {@link FrameProcessingTask} to be executed after any pending tasks have
* Submits the given {@link FrameProcessingTask} to be executed after all pending tasks have
* completed.
*/
public void submit(FrameProcessingTask task) {
@ -61,7 +67,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
return;
}
try {
futures.add(submitTask(task));
futures.add(wrapTaskAndSubmitToExecutorService(task));
} catch (RejectedExecutionException e) {
if (!shouldCancelTasks.getAndSet(true)) {
listener.onFrameProcessingError(new FrameProcessingException(e));
@ -69,6 +75,24 @@ import java.util.concurrent.atomic.AtomicBoolean;
}
}
/**
* Submits the given {@link FrameProcessingTask} to be executed after the currently running task
* and all previously submitted high-priority tasks have completed.
*
* <p>Tasks that were previously {@linkplain #submit(FrameProcessingTask) submitted} without
* high-priority and have not started executing will be executed after this task is complete.
*/
public void submitWithHighPriority(FrameProcessingTask task) {
if (shouldCancelTasks.get()) {
return;
}
highPriorityTasks.add(task);
// If the ExecutorService has non-started tasks, the first of these non-started tasks will run
// the task passed to this method. Just in case there are no non-started tasks, submit another
// task to run high-priority tasks.
submit(() -> {});
}
/**
* Cancels remaining tasks, runs the given release task, and shuts down the background thread.
*
@ -83,7 +107,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
while (!futures.isEmpty()) {
futures.remove().cancel(/* mayInterruptIfRunning= */ false);
}
Future<?> releaseFuture = submitTask(releaseTask);
Future<?> releaseFuture = wrapTaskAndSubmitToExecutorService(releaseTask);
singleThreadExecutorService.shutdown();
try {
if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) {
@ -95,11 +119,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
}
}
private Future<?> submitTask(FrameProcessingTask glTask) {
private Future<?> wrapTaskAndSubmitToExecutorService(FrameProcessingTask defaultPriorityTask) {
return singleThreadExecutorService.submit(
() -> {
try {
glTask.run();
while (!highPriorityTasks.isEmpty()) {
highPriorityTasks.remove().run();
}
defaultPriorityTask.run();
removeFinishedFutures();
} catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) {
listener.onFrameProcessingError(FrameProcessingException.from(e));