diff --git a/libraries/session/src/main/java/androidx/media3/session/ConnectedControllersManager.java b/libraries/session/src/main/java/androidx/media3/session/ConnectedControllersManager.java index 8e557953d2..01f2c9a3e0 100644 --- a/libraries/session/src/main/java/androidx/media3/session/ConnectedControllersManager.java +++ b/libraries/session/src/main/java/androidx/media3/session/ConnectedControllersManager.java @@ -24,8 +24,12 @@ import androidx.collection.ArrayMap; import androidx.media3.common.Player; import androidx.media3.session.MediaSession.ControllerInfo; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayDeque; import java.util.Deque; +import java.util.concurrent.atomic.AtomicBoolean; import org.checkerframework.checker.nullness.qual.NonNull; /** @@ -39,6 +43,17 @@ import org.checkerframework.checker.nullness.qual.NonNull; */ /* package */ final class ConnectedControllersManager { + /** An asynchronous controller command function. */ + public interface AsyncCommand { + + /** + * Runs the asynchronous command. + * + * @return A {@link ListenableFuture} to listen for the command completion. + */ + ListenableFuture run(); + } + private final Object lock; @GuardedBy("lock") @@ -213,34 +228,69 @@ import org.checkerframework.checker.nullness.qual.NonNull; } public void addToCommandQueue(ControllerInfo controllerInfo, Runnable commandRunnable) { - @Nullable ConnectedControllerRecord info; - synchronized (lock) { - info = controllerRecords.get(controllerInfo); - } - if (info != null) { - info.commandQueue.add(commandRunnable); - } - } - - public Deque getAndClearCommandQueue(ControllerInfo controllerInfo) { - Deque commandQueue = new ArrayDeque<>(); synchronized (lock) { @Nullable ConnectedControllerRecord info = controllerRecords.get(controllerInfo); if (info != null) { - commandQueue.addAll(info.commandQueue); - info.commandQueue.clear(); + info.commandQueue.add( + () -> { + commandRunnable.run(); + return Futures.immediateVoidFuture(); + }); } } - return commandQueue; + } + + public void flushCommandQueue(ControllerInfo controllerInfo) { + synchronized (lock) { + @Nullable ConnectedControllerRecord info = controllerRecords.get(controllerInfo); + if (info == null || info.commandQueueIsFlushing || info.commandQueue.isEmpty()) { + return; + } + info.commandQueueIsFlushing = true; + flushCommandQueue(info); + } + } + + @GuardedBy("lock") + private void flushCommandQueue(ConnectedControllerRecord info) { + AtomicBoolean continueRunning = new AtomicBoolean(true); + while (continueRunning.get()) { + continueRunning.set(false); + @Nullable AsyncCommand asyncCommand = info.commandQueue.poll(); + if (asyncCommand == null) { + info.commandQueueIsFlushing = false; + return; + } + AtomicBoolean commandExecuting = new AtomicBoolean(true); + postOrRun( + sessionImpl.getApplicationHandler(), + () -> + asyncCommand + .run() + .addListener( + () -> { + synchronized (lock) { + if (!commandExecuting.get()) { + flushCommandQueue(info); + } else { + continueRunning.set(true); + } + } + }, + MoreExecutors.directExecutor())); + commandExecuting.set(false); + } } private static final class ConnectedControllerRecord { public final T controllerKey; public final SequencedFutureManager sequencedFutureManager; + public final Deque commandQueue; + public SessionCommands sessionCommands; public Player.Commands playerCommands; - public Deque commandQueue; + public boolean commandQueueIsFlushing; public ConnectedControllerRecord( T controllerKey, diff --git a/libraries/session/src/main/java/androidx/media3/session/MediaSessionStub.java b/libraries/session/src/main/java/androidx/media3/session/MediaSessionStub.java index 1b5e554298..d1606270c7 100644 --- a/libraries/session/src/main/java/androidx/media3/session/MediaSessionStub.java +++ b/libraries/session/src/main/java/androidx/media3/session/MediaSessionStub.java @@ -81,7 +81,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.lang.ref.WeakReference; import java.util.Collections; -import java.util.Deque; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -225,17 +224,34 @@ import java.util.concurrent.ExecutionException; if (controller == null) { return; } - if (command == COMMAND_SET_VIDEO_SURFACE) { - postOrRun( - sessionImpl.getApplicationHandler(), - getSessionTaskWithPlayerCommandRunnable( - controller, seq, command, sessionImpl, task, postTask)); - } else { - connectedControllersManager.addToCommandQueue( - controller, - getSessionTaskWithPlayerCommandRunnable( - controller, seq, command, sessionImpl, task, postTask)); - } + postOrRun( + sessionImpl.getApplicationHandler(), + () -> { + if (!connectedControllersManager.isPlayerCommandAvailable(controller, command)) { + sendSessionResult( + sessionImpl, + controller, + seq, + new SessionResult(SessionResult.RESULT_ERROR_PERMISSION_DENIED)); + return; + } + @SessionResult.Code + int resultCode = sessionImpl.onPlayerCommandRequestOnHandler(controller, command); + if (resultCode != SessionResult.RESULT_SUCCESS) { + // Don't run rejected command. + sendSessionResult(sessionImpl, controller, seq, new SessionResult(resultCode)); + return; + } + if (command == COMMAND_SET_VIDEO_SURFACE) { + getSessionTaskWithPlayerCommandRunnable(controller, seq, sessionImpl, task, postTask) + .run(); + } else { + connectedControllersManager.addToCommandQueue( + controller, + getSessionTaskWithPlayerCommandRunnable( + controller, seq, sessionImpl, task, postTask)); + } + }); } finally { Binder.restoreCallingIdentity(token); } @@ -244,26 +260,10 @@ import java.util.concurrent.ExecutionException; private Runnable getSessionTaskWithPlayerCommandRunnable( ControllerInfo controller, int seq, - @Player.Command int command, K sessionImpl, SessionTask task, PostSessionTask postTask) { return () -> { - if (!connectedControllersManager.isPlayerCommandAvailable(controller, command)) { - sendSessionResult( - sessionImpl, - controller, - seq, - new SessionResult(SessionResult.RESULT_ERROR_PERMISSION_DENIED)); - return; - } - @SessionResult.Code - int resultCode = sessionImpl.onPlayerCommandRequestOnHandler(controller, command); - if (resultCode != SessionResult.RESULT_SUCCESS) { - // Don't run rejected command. - sendSessionResult(sessionImpl, controller, seq, new SessionResult(resultCode)); - return; - } T result = task.run(sessionImpl, controller); postTask.run(sessionImpl, controller, seq, result); }; @@ -1450,17 +1450,9 @@ import java.util.concurrent.ExecutionException; } ControllerInfo controllerInfo = connectedControllersManager.getController(caller.asBinder()); if (controllerInfo != null) { - Deque queue = connectedControllersManager.getAndClearCommandQueue(controllerInfo); postOrRun( sessionImpl.getApplicationHandler(), - () -> { - while (!queue.isEmpty()) { - Runnable runnable = queue.poll(); - if (runnable != null) { - runnable.run(); - } - } - }); + () -> connectedControllersManager.flushCommandQueue(controllerInfo)); } } finally { Binder.restoreCallingIdentity(token);