Add Util helper methods to work with Futures

This adds two methods that are helpful when working with Futures.
One is a version of postOrRun that can indicate completion by a
Future and the other is a simplified version of Guava's
Futures.transformAsync (which can't be used as it's in Beta).

PiperOrigin-RevId: 461896598
This commit is contained in:
tonihei 2022-07-19 16:04:45 +00:00 committed by Rohit Singh
parent 5bf9e2fb31
commit f9eec0c0e8
2 changed files with 349 additions and 0 deletions

View File

@ -80,6 +80,11 @@ import androidx.media3.common.Player;
import androidx.media3.common.Player.Commands;
import com.google.common.base.Ascii;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
@ -102,6 +107,8 @@ import java.util.MissingResourceException;
import java.util.NoSuchElementException;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
@ -624,6 +631,94 @@ public final class Util {
}
}
/**
 * Runs the {@link Runnable} directly if the calling thread is the thread of the {@link Handler}'s
 * {@link Looper}, and posts it to the {@link Handler} if the calling thread differs from it. The
 * returned {@link ListenableFuture} reports when the {@link Runnable} has run.
 *
 * <p>If the returned future is cancelled before the {@link Runnable} gets to run, the {@link
 * Runnable} is skipped. Anything thrown by the {@link Runnable} is set as the exception of the
 * returned future.
 *
 * @param handler The handler to which the {@link Runnable} will be posted.
 * @param runnable The runnable to either post or run.
 * @param successValue The value to set in the {@link ListenableFuture} once the runnable
 *     completes.
 * @param <T> The type of {@code successValue}.
 * @return A {@link ListenableFuture} for when the {@link Runnable} has run.
 */
@UnstableApi
public static <T> ListenableFuture<T> postOrRunWithCompletion(
    Handler handler, Runnable runnable, T successValue) {
  SettableFuture<T> completionFuture = SettableFuture.create();
  Runnable completingRunnable =
      () -> {
        try {
          // Nothing to do if the caller cancelled before the runnable was started.
          if (completionFuture.isCancelled()) {
            return;
          }
          runnable.run();
          completionFuture.set(successValue);
        } catch (Throwable throwable) {
          // Report failures through the future.
          completionFuture.setException(throwable);
        }
      };
  postOrRun(handler, completingRunnable);
  return completionFuture;
}
/**
* Asynchronously transforms the result of a {@link ListenableFuture}.
*
* <p>The transformation function is called using a {@linkplain MoreExecutors#directExecutor()
* direct executor}.
*
* <p>The returned Future attempts to keep its cancellation state in sync with that of the input
* future and that of the future returned by the transform function. That is, if the returned
* Future is cancelled, it will attempt to cancel the other two, and if either of the other two is
* cancelled, the returned Future will also be cancelled. All forwarded cancellations will not
* attempt to interrupt.
*
* <p>If the input future fails, the returned Future fails with the cause of that failure (or with
* the {@link ExecutionException} itself if it has no cause). If the transform function throws,
* the returned Future fails with the thrown exception.
*
* @param future The input {@link ListenableFuture}.
* @param transformFunction The function transforming the result of the input future.
* @param <T> The result type of the transformation function and of the returned future.
* @param <U> The result type of the input future.
* @return A {@link ListenableFuture} for the transformed result.
*/
@UnstableApi
public static <T, U> ListenableFuture<T> transformFutureAsync(
ListenableFuture<U> future, AsyncFunction<U, T> transformFunction) {
// This is a simplified copy of Guava's Futures.transformAsync.
SettableFuture<T> outputFuture = SettableFuture.create();
// Forward a cancellation of the output future to the input future.
outputFuture.addListener(
() -> {
if (outputFuture.isCancelled()) {
future.cancel(/* mayInterruptIfRunning= */ false);
}
},
MoreExecutors.directExecutor());
// Once the input future is done, either forward its failure/cancellation or start the transform.
future.addListener(
() -> {
U inputFutureResult;
try {
inputFutureResult = Futures.getDone(future);
} catch (CancellationException cancellationException) {
// The input was cancelled, so cancel the output too.
outputFuture.cancel(/* mayInterruptIfRunning= */ false);
return;
} catch (ExecutionException exception) {
// Unwrap the cause so the output future reports the original failure.
@Nullable Throwable cause = exception.getCause();
outputFuture.setException(cause == null ? exception : cause);
return;
} catch (RuntimeException | Error error) {
outputFuture.setException(error);
return;
}
try {
// The output future takes its result from the future returned by the transform function.
outputFuture.setFuture(transformFunction.apply(inputFutureResult));
} catch (Throwable exception) {
// The transform function itself threw.
outputFuture.setException(exception);
}
},
MoreExecutors.directExecutor());
return outputFuture;
}
/**
* Returns the {@link Looper} associated with the current thread, or the {@link Looper} of the
* application's main thread if the current thread doesn't have a {@link Looper}.

View File

@ -28,10 +28,16 @@ import static androidx.media3.common.util.Util.parseXsDuration;
import static androidx.media3.common.util.Util.unescapeFileName;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import android.database.sqlite.SQLiteDatabase;
import android.database.sqlite.SQLiteOpenHelper;
import android.net.Uri;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.text.SpannableString;
import android.text.Spanned;
import android.text.style.StrikethroughSpan;
@ -41,6 +47,9 @@ import androidx.media3.common.C;
import androidx.media3.test.utils.TestUtil;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -49,16 +58,21 @@ import java.util.Arrays;
import java.util.Formatter;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.annotation.Config;
import org.robolectric.shadows.ShadowLooper;
/** Unit tests for {@link Util}. */
@RunWith(AndroidJUnit4.class)
public class UtilTest {
private static final int TIMEOUT_MS = 10000;
@Test
public void addWithOverflowDefault_withoutOverFlow_returnsSum() {
long res = Util.addWithOverflowDefault(5, 10, /* overflowResult= */ 0);
@ -1238,6 +1252,246 @@ public class UtilTest {
.isEqualTo(0);
}
/** Checks that {@code postOrRun} runs the runnable before returning on the handler's thread. */
@Test
public void postOrRun_withMatchingThread_runsInline() {
  Runnable runnable = mock(Runnable.class);
  Handler currentThreadHandler = new Handler(Looper.myLooper());

  Util.postOrRun(currentThreadHandler, runnable);

  // No looper idling is needed: the runnable must already have been invoked.
  verify(runnable).run();
}
/**
 * Checks that {@code postOrRun} posts the runnable instead of running it inline when called from
 * a thread other than the handler's thread.
 */
@Test
public void postOrRun_fromDifferentThread_posts() throws Exception {
  Runnable mockRunnable = mock(Runnable.class);
  HandlerThread handlerThread = new HandlerThread("TestThread");
  handlerThread.start();
  try {
    Handler handler = new Handler(handlerThread.getLooper());
    ConditionVariable postedCondition = new ConditionVariable();
    handler.post(
        () -> {
          Util.postOrRun(new Handler(Looper.getMainLooper()), mockRunnable);
          postedCondition.open();
        });
    // Fail with a clear message on timeout instead of continuing with an unmet precondition.
    assertThat(postedCondition.block(TIMEOUT_MS)).isTrue();
  } finally {
    // Always stop the helper thread, even if the setup above failed.
    handlerThread.quit();
  }

  // The runnable is only queued on the main looper until the looper is idled.
  verify(mockRunnable, never()).run();
  ShadowLooper.idleMainLooper();
  verify(mockRunnable).run();
}
/**
 * Checks that {@code postOrRunWithCompletion} runs the runnable inline on the handler's thread
 * and returns a future that is already completed with the success value.
 */
@Test
public void postOrRunWithCompletion_withMatchingThread_runsInline() throws Exception {
  Object successValue = new Object();
  Runnable runnable = mock(Runnable.class);
  Handler currentThreadHandler = new Handler(Looper.myLooper());

  ListenableFuture<Object> completion =
      Util.postOrRunWithCompletion(currentThreadHandler, runnable, successValue);

  assertThat(completion.isDone()).isTrue();
  assertThat(completion.get()).isEqualTo(successValue);
  verify(runnable).run();
}
/**
 * Checks that an exception thrown by the runnable is reported through the future returned by
 * {@code postOrRunWithCompletion}.
 */
@Test
public void postOrRunWithCompletion_withException_hasException() throws Exception {
  Object unusedSuccessValue = new Object();
  Runnable throwingRunnable =
      () -> {
        throw new IllegalStateException();
      };

  ListenableFuture<Object> completion =
      Util.postOrRunWithCompletion(
          new Handler(Looper.myLooper()), throwingRunnable, unusedSuccessValue);

  assertThat(completion.isDone()).isTrue();
  ExecutionException thrown = assertThrows(ExecutionException.class, completion::get);
  assertThat(thrown).hasCauseThat().isInstanceOf(IllegalStateException.class);
}
/**
 * Checks that {@code postOrRunWithCompletion} posts the runnable when called from a thread other
 * than the handler's thread, and that the future only completes once the runnable has run.
 */
@Test
public void postOrRunWithCompletion_fromDifferentThread_posts() throws Exception {
  Runnable mockRunnable = mock(Runnable.class);
  Object expectedResult = new Object();
  AtomicReference<ListenableFuture<Object>> futureReference = new AtomicReference<>();
  HandlerThread handlerThread = new HandlerThread("TestThread");
  handlerThread.start();
  try {
    Handler handler = new Handler(handlerThread.getLooper());
    ConditionVariable postedCondition = new ConditionVariable();
    handler.post(
        () -> {
          futureReference.set(
              Util.postOrRunWithCompletion(
                  new Handler(Looper.getMainLooper()), mockRunnable, expectedResult));
          postedCondition.open();
        });
    // Fail with a clear message on timeout rather than dereferencing an unset future below.
    assertThat(postedCondition.block(TIMEOUT_MS)).isTrue();
  } finally {
    // Always stop the helper thread, even if the setup above failed.
    handlerThread.quit();
  }
  ListenableFuture<Object> future = futureReference.get();

  // Until the main looper is idled, the runnable is queued and the future is pending.
  verify(mockRunnable, never()).run();
  assertThat(future.isDone()).isFalse();

  ShadowLooper.idleMainLooper();

  assertThat(future.isDone()).isTrue();
  assertThat(future.get()).isEqualTo(expectedResult);
  verify(mockRunnable).run();
}
/**
 * Checks that cancelling the future returned by {@code postOrRunWithCompletion} before the posted
 * runnable is executed prevents the runnable from running.
 */
@Test
public void postOrRunWithCompletion_withCancel_isNeverRun() throws Exception {
  Runnable mockRunnable = mock(Runnable.class);
  Object expectedResult = new Object();
  AtomicReference<ListenableFuture<Object>> futureReference = new AtomicReference<>();
  HandlerThread handlerThread = new HandlerThread("TestThread");
  handlerThread.start();
  try {
    Handler handler = new Handler(handlerThread.getLooper());
    ConditionVariable postedCondition = new ConditionVariable();
    handler.post(
        () -> {
          futureReference.set(
              Util.postOrRunWithCompletion(
                  new Handler(Looper.getMainLooper()), mockRunnable, expectedResult));
          postedCondition.open();
        });
    // Fail with a clear message on timeout rather than dereferencing an unset future below.
    assertThat(postedCondition.block(TIMEOUT_MS)).isTrue();
  } finally {
    // Always stop the helper thread, even if the setup above failed.
    handlerThread.quit();
  }
  ListenableFuture<Object> future = futureReference.get();

  // Cancel while the runnable is still queued on the main looper.
  future.cancel(/* mayInterruptIfRunning= */ false);
  ShadowLooper.idleMainLooper();

  verify(mockRunnable, never()).run();
}
/** Checks that transforming an already cancelled input yields a cancelled output future. */
@Test
public void transformFutureAsync_withCancelledInput_isCancelled() {
  SettableFuture<Object> cancelledInput = SettableFuture.create();
  cancelledInput.cancel(/* mayInterruptIfRunning= */ false);

  ListenableFuture<Object> result =
      Util.transformFutureAsync(cancelledInput, unused -> Futures.immediateFuture(new Object()));

  assertThat(result.isCancelled()).isTrue();
}
/** Checks that a failure of the input future is forwarded to the output future. */
@Test
public void transformFutureAsync_withExceptionInput_hasException() {
  Exception inputFailure = new Exception();
  SettableFuture<Object> failedInput = SettableFuture.create();
  failedInput.setException(inputFailure);

  ListenableFuture<Object> result =
      Util.transformFutureAsync(failedInput, unused -> Futures.immediateFuture(new Object()));

  assertThat(result.isDone()).isTrue();
  ExecutionException thrown = assertThrows(ExecutionException.class, result::get);
  assertThat(thrown).hasCauseThat().isEqualTo(inputFailure);
}
/**
 * Checks that cancelling the future returned by the transform function also cancels the output
 * future.
 */
@Test
public void transformFutureAsync_withCancelledTransform_isCancelled() {
  SettableFuture<Object> source = SettableFuture.create();
  SettableFuture<Object> transformed = SettableFuture.create();

  ListenableFuture<Object> result = Util.transformFutureAsync(source, unused -> transformed);
  assertThat(result.isDone()).isFalse();

  // Completing the input starts the transform, which is still pending.
  source.set(new Object());
  assertThat(result.isDone()).isFalse();

  transformed.cancel(/* mayInterruptIfRunning= */ false);
  assertThat(result.isCancelled()).isTrue();
}
/**
 * Checks that an exception thrown synchronously by the transform function fails the output
 * future with that exception.
 */
@Test
public void transformFutureAsync_withExceptionInTransformFunction_hasException() {
  Exception transformFailure = new Exception();
  SettableFuture<Object> source = SettableFuture.create();

  ListenableFuture<Object> result =
      Util.transformFutureAsync(
          source,
          unused -> {
            throw transformFailure;
          });
  assertThat(result.isDone()).isFalse();

  // The transform function only runs once the input completes.
  source.set(new Object());

  assertThat(result.isDone()).isTrue();
  ExecutionException thrown = assertThrows(ExecutionException.class, result::get);
  assertThat(thrown).hasCauseThat().isEqualTo(transformFailure);
}
/**
 * Checks that a failed future returned by the transform function fails the output future with
 * the same exception.
 */
@Test
public void transformFutureAsync_withExceptionDuringTransform_hasException() {
  Exception transformFailure = new Exception();
  SettableFuture<Object> source = SettableFuture.create();

  ListenableFuture<Object> result =
      Util.transformFutureAsync(
          source, unused -> Futures.immediateFailedFuture(transformFailure));
  assertThat(result.isDone()).isFalse();

  // The transform function only runs once the input completes.
  source.set(new Object());

  assertThat(result.isDone()).isTrue();
  ExecutionException thrown = assertThrows(ExecutionException.class, result::get);
  assertThat(thrown).hasCauseThat().isEqualTo(transformFailure);
}
/**
 * Checks that cancelling the output future while the input is still pending cancels the input
 * future.
 */
@Test
public void transformFutureAsync_cancelDuringInput_inputIsCancelled() {
  SettableFuture<Object> pendingInput = SettableFuture.create();
  ListenableFuture<Object> result =
      Util.transformFutureAsync(pendingInput, unused -> Futures.immediateFuture(new Object()));
  assertThat(result.isDone()).isFalse();

  result.cancel(/* mayInterruptIfRunning= */ true);

  assertThat(pendingInput.isCancelled()).isTrue();
}
/**
 * Checks that cancelling the output future while the transform is still pending cancels the
 * future returned by the transform function.
 */
@Test
public void transformFutureAsync_cancelDuringTransform_transformIsCancelled() {
  SettableFuture<Object> source = SettableFuture.create();
  SettableFuture<Object> pendingTransform = SettableFuture.create();
  ListenableFuture<Object> result =
      Util.transformFutureAsync(source, unused -> pendingTransform);
  assertThat(result.isDone()).isFalse();

  // Completing the input starts the transform, which is still pending.
  source.set(new Object());
  assertThat(result.isDone()).isFalse();

  result.cancel(/* mayInterruptIfRunning= */ true);

  assertThat(pendingTransform.isCancelled()).isTrue();
}
/**
 * Checks that the output future completes with the value of the future returned by the transform
 * function once both the input and the transform have completed.
 */
@Test
public void transformFutureAsync_withSuccessfulTransform_returnsTransformedResult()
    throws Exception {
  Object transformedValue = new Object();
  SettableFuture<Object> source = SettableFuture.create();
  SettableFuture<Object> pendingTransform = SettableFuture.create();

  ListenableFuture<Object> result =
      Util.transformFutureAsync(source, unused -> pendingTransform);
  assertThat(result.isDone()).isFalse();

  // Completing the input alone is not enough: the transform is still pending.
  source.set(new Object());
  assertThat(result.isDone()).isFalse();

  pendingTransform.set(transformedValue);

  assertThat(result.isDone()).isTrue();
  assertThat(result.get()).isEqualTo(transformedValue);
}
private static void assertEscapeUnescapeFileName(String fileName, String escapedFileName) {
assertThat(escapeFileName(fileName)).isEqualTo(escapedFileName);
assertThat(unescapeFileName(escapedFileName)).isEqualTo(fileName);