Skip to content

Commit

Permalink
Use Futures for MediaSession command queue instead of Runnables
Browse files Browse the repository at this point in the history
Some commands may be asynchronous and subsequent commands need to
wait for them to complete before running. This change updates the
queue to use (and listen to) Futures instead of calling Runnables
directly. The commands are currently still added as Runanbles
though, so this change is a no-op.

Also moves the permission check in MediaSessionImpl to before
queueing the command because the permission should be check at
the time of calling the method.

When executing the comamnds in the queue, we need to be careful
to avoid recursion in the same thread (which happens when both
the Future is immediate and running on the correct thread already).
To avoid recursion, we detect this case and loop the commands
instead.

Issue: #85
PiperOrigin-RevId: 461827264
  • Loading branch information
tonihei authored and rohitjoins committed Jul 21, 2022
1 parent ba9c9bb commit dee8078
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import androidx.media3.common.Player;
import androidx.media3.session.MediaSession.ControllerInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
Expand All @@ -39,6 +43,17 @@
*/
/* package */ final class ConnectedControllersManager<T extends @NonNull Object> {

/** An asynchronous controller command function. */
public interface AsyncCommand {

/**
* Runs the asynchronous command.
*
* @return A {@link ListenableFuture} to listen for the command completion.
*/
ListenableFuture<Void> run();
}

private final Object lock;

@GuardedBy("lock")
Expand Down Expand Up @@ -213,34 +228,69 @@ public ControllerInfo getController(T controllerKey) {
}

public void addToCommandQueue(ControllerInfo controllerInfo, Runnable commandRunnable) {
@Nullable ConnectedControllerRecord<T> info;
synchronized (lock) {
info = controllerRecords.get(controllerInfo);
}
if (info != null) {
info.commandQueue.add(commandRunnable);
@Nullable ConnectedControllerRecord<T> info = controllerRecords.get(controllerInfo);
if (info != null) {
info.commandQueue.add(
() -> {
commandRunnable.run();
return Futures.immediateVoidFuture();
});
}
}
}

public Deque<Runnable> getAndClearCommandQueue(ControllerInfo controllerInfo) {
Deque<Runnable> commandQueue = new ArrayDeque<>();
public void flushCommandQueue(ControllerInfo controllerInfo) {
synchronized (lock) {
@Nullable ConnectedControllerRecord<T> info = controllerRecords.get(controllerInfo);
if (info != null) {
commandQueue.addAll(info.commandQueue);
info.commandQueue.clear();
if (info == null || info.commandQueueIsFlushing || info.commandQueue.isEmpty()) {
return;
}
info.commandQueueIsFlushing = true;
flushCommandQueue(info);
}
}

@GuardedBy("lock")
private void flushCommandQueue(ConnectedControllerRecord<T> info) {
AtomicBoolean continueRunning = new AtomicBoolean(true);
while (continueRunning.get()) {
continueRunning.set(false);
@Nullable AsyncCommand asyncCommand = info.commandQueue.poll();
if (asyncCommand == null) {
info.commandQueueIsFlushing = false;
return;
}
AtomicBoolean commandExecuting = new AtomicBoolean(true);
postOrRun(
sessionImpl.getApplicationHandler(),
() ->
asyncCommand
.run()
.addListener(
() -> {
synchronized (lock) {
if (!commandExecuting.get()) {
flushCommandQueue(info);
} else {
continueRunning.set(true);
}
}
},
MoreExecutors.directExecutor()));
commandExecuting.set(false);
}
return commandQueue;
}

private static final class ConnectedControllerRecord<T> {

public final T controllerKey;
public final SequencedFutureManager sequencedFutureManager;
public final Deque<AsyncCommand> commandQueue;

public SessionCommands sessionCommands;
public Player.Commands playerCommands;
public Deque<Runnable> commandQueue;
public boolean commandQueueIsFlushing;

public ConnectedControllerRecord(
T controllerKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import com.google.common.util.concurrent.MoreExecutors;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -225,17 +224,34 @@ private <T, K extends MediaSessionImpl> void dispatchSessionTaskWithPlayerComman
if (controller == null) {
return;
}
if (command == COMMAND_SET_VIDEO_SURFACE) {
postOrRun(
sessionImpl.getApplicationHandler(),
getSessionTaskWithPlayerCommandRunnable(
controller, seq, command, sessionImpl, task, postTask));
} else {
connectedControllersManager.addToCommandQueue(
controller,
getSessionTaskWithPlayerCommandRunnable(
controller, seq, command, sessionImpl, task, postTask));
}
postOrRun(
sessionImpl.getApplicationHandler(),
() -> {
if (!connectedControllersManager.isPlayerCommandAvailable(controller, command)) {
sendSessionResult(
sessionImpl,
controller,
seq,
new SessionResult(SessionResult.RESULT_ERROR_PERMISSION_DENIED));
return;
}
@SessionResult.Code
int resultCode = sessionImpl.onPlayerCommandRequestOnHandler(controller, command);
if (resultCode != SessionResult.RESULT_SUCCESS) {
// Don't run rejected command.
sendSessionResult(sessionImpl, controller, seq, new SessionResult(resultCode));
return;
}
if (command == COMMAND_SET_VIDEO_SURFACE) {
getSessionTaskWithPlayerCommandRunnable(controller, seq, sessionImpl, task, postTask)
.run();
} else {
connectedControllersManager.addToCommandQueue(
controller,
getSessionTaskWithPlayerCommandRunnable(
controller, seq, sessionImpl, task, postTask));
}
});
} finally {
Binder.restoreCallingIdentity(token);
}
Expand All @@ -244,26 +260,10 @@ private <T, K extends MediaSessionImpl> void dispatchSessionTaskWithPlayerComman
private <T, K extends MediaSessionImpl> Runnable getSessionTaskWithPlayerCommandRunnable(
ControllerInfo controller,
int seq,
@Player.Command int command,
K sessionImpl,
SessionTask<T, K> task,
PostSessionTask<T, K> postTask) {
return () -> {
if (!connectedControllersManager.isPlayerCommandAvailable(controller, command)) {
sendSessionResult(
sessionImpl,
controller,
seq,
new SessionResult(SessionResult.RESULT_ERROR_PERMISSION_DENIED));
return;
}
@SessionResult.Code
int resultCode = sessionImpl.onPlayerCommandRequestOnHandler(controller, command);
if (resultCode != SessionResult.RESULT_SUCCESS) {
// Don't run rejected command.
sendSessionResult(sessionImpl, controller, seq, new SessionResult(resultCode));
return;
}
T result = task.run(sessionImpl, controller);
postTask.run(sessionImpl, controller, seq, result);
};
Expand Down Expand Up @@ -1450,17 +1450,9 @@ public void flushCommandQueue(@Nullable IMediaController caller) {
}
ControllerInfo controllerInfo = connectedControllersManager.getController(caller.asBinder());
if (controllerInfo != null) {
Deque<Runnable> queue = connectedControllersManager.getAndClearCommandQueue(controllerInfo);
postOrRun(
sessionImpl.getApplicationHandler(),
() -> {
while (!queue.isEmpty()) {
Runnable runnable = queue.poll();
if (runnable != null) {
runnable.run();
}
}
});
() -> connectedControllersManager.flushCommandQueue(controllerInfo));
}
} finally {
Binder.restoreCallingIdentity(token);
Expand Down

0 comments on commit dee8078

Please sign in to comment.