From dee80788e4fc5e82739ae014a5d2759b77f03daa Mon Sep 17 00:00:00 2001 From: tonihei Date: Tue, 19 Jul 2022 08:59:30 +0000 Subject: [PATCH] Use Futures for MediaSession command queue instead of Runnables Some commands may be asynchronous and subsequent commands need to wait for them to complete before running. This change updates the queue to use (and listen to) Futures instead of calling Runnables directly. The commands are currently still added as Runanbles though, so this change is a no-op. Also moves the permission check in MediaSessionImpl to before queueing the command because the permission should be check at the time of calling the method. When executing the comamnds in the queue, we need to be careful to avoid recursion in the same thread (which happens when both the Future is immediate and running on the correct thread already). To avoid recursion, we detect this case and loop the commands instead. Issue: androidx/media#85 PiperOrigin-RevId: 461827264 --- .../session/ConnectedControllersManager.java | 80 +++++++++++++++---- .../media3/session/MediaSessionStub.java | 66 +++++++-------- 2 files changed, 94 insertions(+), 52 deletions(-) diff --git a/libraries/session/src/main/java/androidx/media3/session/ConnectedControllersManager.java b/libraries/session/src/main/java/androidx/media3/session/ConnectedControllersManager.java index 8e557953d2..01f2c9a3e0 100644 --- a/libraries/session/src/main/java/androidx/media3/session/ConnectedControllersManager.java +++ b/libraries/session/src/main/java/androidx/media3/session/ConnectedControllersManager.java @@ -24,8 +24,12 @@ import androidx.collection.ArrayMap; import androidx.media3.common.Player; import androidx.media3.session.MediaSession.ControllerInfo; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayDeque; import java.util.Deque; +import java.util.concurrent.atomic.AtomicBoolean; import org.checkerframework.checker.nullness.qual.NonNull; /** @@ -39,6 +43,17 @@ import org.checkerframework.checker.nullness.qual.NonNull; */ /* package */ final class ConnectedControllersManager { + /** An asynchronous controller command function. */ + public interface AsyncCommand { + + /** + * Runs the asynchronous command. + * + * @return A {@link ListenableFuture} to listen for the command completion. + */ + ListenableFuture run(); + } + private final Object lock; @GuardedBy("lock") @@ -213,34 +228,69 @@ import org.checkerframework.checker.nullness.qual.NonNull; } public void addToCommandQueue(ControllerInfo controllerInfo, Runnable commandRunnable) { - @Nullable ConnectedControllerRecord info; - synchronized (lock) { - info = controllerRecords.get(controllerInfo); - } - if (info != null) { - info.commandQueue.add(commandRunnable); - } - } - - public Deque getAndClearCommandQueue(ControllerInfo controllerInfo) { - Deque commandQueue = new ArrayDeque<>(); synchronized (lock) { @Nullable ConnectedControllerRecord info = controllerRecords.get(controllerInfo); if (info != null) { - commandQueue.addAll(info.commandQueue); - info.commandQueue.clear(); + info.commandQueue.add( + () -> { + commandRunnable.run(); + return Futures.immediateVoidFuture(); + }); } } - return commandQueue; + } + + public void flushCommandQueue(ControllerInfo controllerInfo) { + synchronized (lock) { + @Nullable ConnectedControllerRecord info = controllerRecords.get(controllerInfo); + if (info == null || info.commandQueueIsFlushing || info.commandQueue.isEmpty()) { + return; + } + info.commandQueueIsFlushing = true; + flushCommandQueue(info); + } + } + + @GuardedBy("lock") + private void flushCommandQueue(ConnectedControllerRecord info) { + AtomicBoolean continueRunning = new AtomicBoolean(true); + while (continueRunning.get()) { + continueRunning.set(false); + @Nullable AsyncCommand asyncCommand = info.commandQueue.poll(); + if (asyncCommand == null) { + info.commandQueueIsFlushing = false; + return; + } + AtomicBoolean commandExecuting = new AtomicBoolean(true); + postOrRun( + sessionImpl.getApplicationHandler(), + () -> + asyncCommand + .run() + .addListener( + () -> { + synchronized (lock) { + if (!commandExecuting.get()) { + flushCommandQueue(info); + } else { + continueRunning.set(true); + } + } + }, + MoreExecutors.directExecutor())); + commandExecuting.set(false); + } } private static final class ConnectedControllerRecord { public final T controllerKey; public final SequencedFutureManager sequencedFutureManager; + public final Deque commandQueue; + public SessionCommands sessionCommands; public Player.Commands playerCommands; - public Deque commandQueue; + public boolean commandQueueIsFlushing; public ConnectedControllerRecord( T controllerKey, diff --git a/libraries/session/src/main/java/androidx/media3/session/MediaSessionStub.java b/libraries/session/src/main/java/androidx/media3/session/MediaSessionStub.java index 1b5e554298..d1606270c7 100644 --- a/libraries/session/src/main/java/androidx/media3/session/MediaSessionStub.java +++ b/libraries/session/src/main/java/androidx/media3/session/MediaSessionStub.java @@ -81,7 +81,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.lang.ref.WeakReference; import java.util.Collections; -import java.util.Deque; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -225,17 +224,34 @@ import java.util.concurrent.ExecutionException; if (controller == null) { return; } - if (command == COMMAND_SET_VIDEO_SURFACE) { - postOrRun( - sessionImpl.getApplicationHandler(), - getSessionTaskWithPlayerCommandRunnable( - controller, seq, command, sessionImpl, task, postTask)); - } else { - connectedControllersManager.addToCommandQueue( - controller, - getSessionTaskWithPlayerCommandRunnable( - controller, seq, command, sessionImpl, task, postTask)); - } + postOrRun( + sessionImpl.getApplicationHandler(), + () -> { + if (!connectedControllersManager.isPlayerCommandAvailable(controller, command)) { + sendSessionResult( + sessionImpl, + controller, + seq, + new SessionResult(SessionResult.RESULT_ERROR_PERMISSION_DENIED)); + return; + } + @SessionResult.Code + int resultCode = sessionImpl.onPlayerCommandRequestOnHandler(controller, command); + if (resultCode != SessionResult.RESULT_SUCCESS) { + // Don't run rejected command. + sendSessionResult(sessionImpl, controller, seq, new SessionResult(resultCode)); + return; + } + if (command == COMMAND_SET_VIDEO_SURFACE) { + getSessionTaskWithPlayerCommandRunnable(controller, seq, sessionImpl, task, postTask) + .run(); + } else { + connectedControllersManager.addToCommandQueue( + controller, + getSessionTaskWithPlayerCommandRunnable( + controller, seq, sessionImpl, task, postTask)); + } + }); } finally { Binder.restoreCallingIdentity(token); } @@ -244,26 +260,10 @@ import java.util.concurrent.ExecutionException; private Runnable getSessionTaskWithPlayerCommandRunnable( ControllerInfo controller, int seq, - @Player.Command int command, K sessionImpl, SessionTask task, PostSessionTask postTask) { return () -> { - if (!connectedControllersManager.isPlayerCommandAvailable(controller, command)) { - sendSessionResult( - sessionImpl, - controller, - seq, - new SessionResult(SessionResult.RESULT_ERROR_PERMISSION_DENIED)); - return; - } - @SessionResult.Code - int resultCode = sessionImpl.onPlayerCommandRequestOnHandler(controller, command); - if (resultCode != SessionResult.RESULT_SUCCESS) { - // Don't run rejected command. - sendSessionResult(sessionImpl, controller, seq, new SessionResult(resultCode)); - return; - } T result = task.run(sessionImpl, controller); postTask.run(sessionImpl, controller, seq, result); }; @@ -1450,17 +1450,9 @@ import java.util.concurrent.ExecutionException; } ControllerInfo controllerInfo = connectedControllersManager.getController(caller.asBinder()); if (controllerInfo != null) { - Deque queue = connectedControllersManager.getAndClearCommandQueue(controllerInfo); postOrRun( sessionImpl.getApplicationHandler(), - () -> { - while (!queue.isEmpty()) { - Runnable runnable = queue.poll(); - if (runnable != null) { - runnable.run(); - } - } - }); + () -> connectedControllersManager.flushCommandQueue(controllerInfo)); } } finally { Binder.restoreCallingIdentity(token);