Wrap processor chain executor service for better exception handling.
The wrapper * catches exceptions for each task and notifies the listener (this will be used more in follow-ups when processFrame is split into lots of listeners and callbacks), * removes finished tasks from the queue and signals any exceptions that occurred to the listener each time a new task is executed. PiperOrigin-RevId: 455345184
This commit is contained in:
parent
8f844b32fd
commit
ee847d92c5
@ -0,0 +1,27 @@
|
||||
/*
|
||||
* Copyright 2022 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package androidx.media3.transformer;
|
||||
|
||||
import androidx.media3.common.util.GlUtil;
|
||||
|
||||
/**
|
||||
* Interface for tasks that may throw a {@link GlUtil.GlException} or {@link
|
||||
* FrameProcessingException}.
|
||||
*/
|
||||
/* package */ interface FrameProcessingTask {
|
||||
/** Runs the task. */
|
||||
void run() throws FrameProcessingException, GlUtil.GlException;
|
||||
}
|
@ -0,0 +1,123 @@
|
||||
/*
|
||||
* Copyright 2022 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package androidx.media3.transformer;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
||||
import androidx.media3.common.util.GlUtil;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Wrapper around a single thread {@link ExecutorService} for executing {@link FrameProcessingTask}
|
||||
* instances.
|
||||
*
|
||||
* <p>The wrapper handles calling {@link
|
||||
* FrameProcessorChain.Listener#onFrameProcessingError(FrameProcessingException)} for errors that
|
||||
* occur during these tasks.
|
||||
*/
|
||||
/* package */ final class FrameProcessingTaskExecutor {
|
||||
|
||||
private final ExecutorService singleThreadExecutorService;
|
||||
private final FrameProcessorChain.Listener listener;
|
||||
private final ConcurrentLinkedQueue<Future<?>> futures;
|
||||
private final AtomicBoolean shouldCancelTasks;
|
||||
|
||||
/** Creates a new instance. */
|
||||
public FrameProcessingTaskExecutor(
|
||||
ExecutorService singleThreadExecutorService, FrameProcessorChain.Listener listener) {
|
||||
this.singleThreadExecutorService = singleThreadExecutorService;
|
||||
this.listener = listener;
|
||||
|
||||
futures = new ConcurrentLinkedQueue<>();
|
||||
shouldCancelTasks = new AtomicBoolean();
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits the given {@link FrameProcessingTask} to be executed after any pending tasks have
|
||||
* completed.
|
||||
*/
|
||||
public void submit(FrameProcessingTask task) {
|
||||
if (shouldCancelTasks.get()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
futures.add(submitTask(task));
|
||||
} catch (RejectedExecutionException e) {
|
||||
if (!shouldCancelTasks.getAndSet(true)) {
|
||||
listener.onFrameProcessingError(new FrameProcessingException(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels remaining tasks, runs the given release task, and shuts down the background thread.
|
||||
*
|
||||
* @param releaseTask A {@link FrameProcessingTask} to execute before shutting down the background
|
||||
* thread.
|
||||
* @param releaseWaitTimeMs How long to wait for the release task to terminate, in milliseconds.
|
||||
* @throws InterruptedException If interrupted while releasing resources.
|
||||
*/
|
||||
public void release(FrameProcessingTask releaseTask, long releaseWaitTimeMs)
|
||||
throws InterruptedException {
|
||||
shouldCancelTasks.getAndSet(true);
|
||||
while (!futures.isEmpty()) {
|
||||
futures.remove().cancel(/* mayInterruptIfRunning= */ false);
|
||||
}
|
||||
Future<?> releaseFuture = submitTask(releaseTask);
|
||||
singleThreadExecutorService.shutdown();
|
||||
try {
|
||||
if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) {
|
||||
listener.onFrameProcessingError(new FrameProcessingException("Release timed out"));
|
||||
}
|
||||
releaseFuture.get();
|
||||
} catch (ExecutionException e) {
|
||||
listener.onFrameProcessingError(new FrameProcessingException(e));
|
||||
}
|
||||
}
|
||||
|
||||
private Future<?> submitTask(FrameProcessingTask glTask) {
|
||||
return singleThreadExecutorService.submit(
|
||||
() -> {
|
||||
try {
|
||||
glTask.run();
|
||||
removeFinishedFutures();
|
||||
} catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) {
|
||||
listener.onFrameProcessingError(FrameProcessingException.from(e));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void removeFinishedFutures() {
|
||||
while (!futures.isEmpty()) {
|
||||
if (!futures.element().isDone()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
futures.remove().get();
|
||||
} catch (ExecutionException e) {
|
||||
listener.onFrameProcessingError(new FrameProcessingException(e));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
listener.onFrameProcessingError(new FrameProcessingException(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -16,10 +16,8 @@
|
||||
package androidx.media3.transformer;
|
||||
|
||||
import static androidx.media3.common.util.Assertions.checkArgument;
|
||||
import static androidx.media3.common.util.Assertions.checkNotNull;
|
||||
import static androidx.media3.common.util.Assertions.checkState;
|
||||
import static com.google.common.collect.Iterables.getLast;
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
||||
import android.content.Context;
|
||||
import android.graphics.SurfaceTexture;
|
||||
@ -42,11 +40,9 @@ import androidx.media3.common.util.Util;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
@ -323,15 +319,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
private final boolean enableExperimentalHdrEditing;
|
||||
private final EGLDisplay eglDisplay;
|
||||
private final EGLContext eglContext;
|
||||
/** Some OpenGL commands may block, so all OpenGL commands are run on a background thread. */
|
||||
private final ExecutorService singleThreadExecutorService;
|
||||
private final FrameProcessingTaskExecutor frameProcessingTaskExecutor;
|
||||
/**
|
||||
* Offset compared to original media presentation time that has been added to incoming frame
|
||||
* timestamps, in microseconds.
|
||||
*/
|
||||
private final long streamOffsetUs;
|
||||
/** Futures corresponding to the executor service's pending tasks. */
|
||||
private final ConcurrentLinkedQueue<Future<?>> futures;
|
||||
/** Number of frames {@linkplain #registerInputFrame() registered} but not fully processed. */
|
||||
private final AtomicInteger pendingFrameCount;
|
||||
/** Wraps the {@link #inputSurfaceTexture}. */
|
||||
@ -399,7 +392,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
|
||||
this.eglDisplay = eglDisplay;
|
||||
this.eglContext = eglContext;
|
||||
this.singleThreadExecutorService = singleThreadExecutorService;
|
||||
this.inputExternalTexId = inputExternalTexId;
|
||||
this.streamOffsetUs = streamOffsetUs;
|
||||
this.intermediateTextures = intermediateTextures;
|
||||
@ -411,7 +403,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
this.stopProcessing = new AtomicBoolean();
|
||||
this.enableExperimentalHdrEditing = enableExperimentalHdrEditing;
|
||||
|
||||
futures = new ConcurrentLinkedQueue<>();
|
||||
frameProcessingTaskExecutor =
|
||||
new FrameProcessingTaskExecutor(singleThreadExecutorService, listener);
|
||||
pendingFrameCount = new AtomicInteger();
|
||||
inputSurfaceTexture = new SurfaceTexture(inputExternalTexId);
|
||||
inputSurface = new Surface(inputSurfaceTexture);
|
||||
@ -425,20 +418,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
public Surface getInputSurface() {
|
||||
// TODO(b/227625423): Allow input surface to be recreated for input size change.
|
||||
inputSurfaceTexture.setOnFrameAvailableListener(
|
||||
surfaceTexture -> {
|
||||
if (stopProcessing.get()) {
|
||||
// Frames can still become available after a transformation is cancelled but they can be
|
||||
// ignored.
|
||||
return;
|
||||
}
|
||||
try {
|
||||
futures.add(singleThreadExecutorService.submit(this::processFrame));
|
||||
} catch (RejectedExecutionException e) {
|
||||
if (!stopProcessing.get()) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
});
|
||||
surfaceTexture -> frameProcessingTaskExecutor.submit(this::processFrame));
|
||||
return inputSurface;
|
||||
}
|
||||
|
||||
@ -470,7 +450,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
public void signalEndOfInputStream() {
|
||||
checkState(!inputStreamEnded);
|
||||
inputStreamEnded = true;
|
||||
futures.add(singleThreadExecutorService.submit(this::signalEndOfOutputStream));
|
||||
frameProcessingTaskExecutor.submit(this::signalEndOfOutputStream);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -485,19 +465,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
*/
|
||||
public void release() {
|
||||
stopProcessing.set(true);
|
||||
while (!futures.isEmpty()) {
|
||||
checkNotNull(futures.poll()).cancel(/* mayInterruptIfRunning= */ false);
|
||||
}
|
||||
futures.add(
|
||||
singleThreadExecutorService.submit(this::releaseTextureProcessorsAndDestroyGlContext));
|
||||
singleThreadExecutorService.shutdown();
|
||||
try {
|
||||
if (!singleThreadExecutorService.awaitTermination(RELEASE_WAIT_TIME_MS, MILLISECONDS)) {
|
||||
Log.d(TAG, "Failed to release FrameProcessorChain");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Log.d(TAG, "FrameProcessorChain release was interrupted", e);
|
||||
frameProcessingTaskExecutor.release(
|
||||
/* releaseTask= */ this::releaseTextureProcessorsAndDestroyGlContext,
|
||||
RELEASE_WAIT_TIME_MS);
|
||||
} catch (InterruptedException unexpected) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException(unexpected);
|
||||
}
|
||||
inputSurfaceTexture.release();
|
||||
inputSurface.release();
|
||||
@ -509,24 +483,19 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
* <p>This method must be called on the {@linkplain #THREAD_NAME background thread}.
|
||||
*/
|
||||
@WorkerThread
|
||||
private void processFrame() {
|
||||
if (stopProcessing.get()) {
|
||||
return;
|
||||
}
|
||||
private void processFrame() throws FrameProcessingException {
|
||||
checkState(Thread.currentThread().getName().equals(THREAD_NAME));
|
||||
|
||||
inputSurfaceTexture.updateTexImage();
|
||||
long inputFrameTimeNs = inputSurfaceTexture.getTimestamp();
|
||||
// Correct for the stream offset so processors see original media presentation timestamps.
|
||||
long presentationTimeUs = inputFrameTimeNs / 1000 - streamOffsetUs;
|
||||
inputSurfaceTexture.getTransformMatrix(textureTransformMatrix);
|
||||
((ExternalTextureProcessor) textureProcessors.get(0))
|
||||
.setTextureTransformMatrix(textureTransformMatrix);
|
||||
int inputTexId = inputExternalTexId;
|
||||
|
||||
long presentationTimeUs = C.TIME_UNSET;
|
||||
try {
|
||||
checkState(Thread.currentThread().getName().equals(THREAD_NAME));
|
||||
|
||||
inputSurfaceTexture.updateTexImage();
|
||||
long inputFrameTimeNs = inputSurfaceTexture.getTimestamp();
|
||||
// Correct for the stream offset so processors see original media presentation timestamps.
|
||||
presentationTimeUs = inputFrameTimeNs / 1000 - streamOffsetUs;
|
||||
inputSurfaceTexture.getTransformMatrix(textureTransformMatrix);
|
||||
((ExternalTextureProcessor) textureProcessors.get(0))
|
||||
.setTextureTransformMatrix(textureTransformMatrix);
|
||||
int inputTexId = inputExternalTexId;
|
||||
|
||||
for (int i = 0; i < textureProcessors.size() - 1; i++) {
|
||||
if (stopProcessing.get()) {
|
||||
return;
|
||||
@ -550,27 +519,25 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
|
||||
EGLExt.eglPresentationTimeANDROID(eglDisplay, outputEglSurface, inputFrameTimeNs);
|
||||
EGL14.eglSwapBuffers(eglDisplay, outputEglSurface);
|
||||
} catch (GlUtil.GlException e) {
|
||||
throw new FrameProcessingException(e, presentationTimeUs);
|
||||
}
|
||||
|
||||
try {
|
||||
if (debugSurfaceViewWrapper != null) {
|
||||
long finalPresentationTimeUs = presentationTimeUs;
|
||||
int finalInputTexId = inputTexId;
|
||||
debugSurfaceViewWrapper.maybeRenderToSurfaceView(
|
||||
() -> {
|
||||
try {
|
||||
GlUtil.clearOutputFrame();
|
||||
getLast(textureProcessors).drawFrame(finalInputTexId, finalPresentationTimeUs);
|
||||
} catch (GlUtil.GlException | FrameProcessingException e) {
|
||||
Log.d(TAG, "Error rendering to debug preview", e);
|
||||
}
|
||||
GlUtil.clearOutputFrame();
|
||||
getLast(textureProcessors).drawFrame(finalInputTexId, finalPresentationTimeUs);
|
||||
});
|
||||
}
|
||||
|
||||
checkState(pendingFrameCount.getAndDecrement() > 0);
|
||||
} catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) {
|
||||
if (!stopProcessing.getAndSet(true)) {
|
||||
listener.onFrameProcessingError(FrameProcessingException.from(e, presentationTimeUs));
|
||||
}
|
||||
} catch (FrameProcessingException | GlUtil.GlException e) {
|
||||
Log.d(TAG, "Error rendering to debug preview", e);
|
||||
}
|
||||
|
||||
checkState(pendingFrameCount.getAndDecrement() > 0);
|
||||
}
|
||||
|
||||
/** Calls {@link Listener#onFrameProcessingEnded()} once no more frames are pending. */
|
||||
@ -579,7 +546,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
if (getPendingFrameCount() == 0) {
|
||||
listener.onFrameProcessingEnded();
|
||||
} else {
|
||||
futures.add(singleThreadExecutorService.submit(this::signalEndOfOutputStream));
|
||||
frameProcessingTaskExecutor.submit(this::signalEndOfOutputStream);
|
||||
}
|
||||
}
|
||||
|
||||
@ -590,15 +557,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
* <p>This method must be called on the {@linkplain #THREAD_NAME background thread}.
|
||||
*/
|
||||
@WorkerThread
|
||||
private void releaseTextureProcessorsAndDestroyGlContext() {
|
||||
try {
|
||||
for (int i = 0; i < textureProcessors.size(); i++) {
|
||||
textureProcessors.get(i).release();
|
||||
}
|
||||
GlUtil.destroyEglContext(eglDisplay, eglContext);
|
||||
} catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) {
|
||||
listener.onFrameProcessingError(FrameProcessingException.from(e));
|
||||
private void releaseTextureProcessorsAndDestroyGlContext()
|
||||
throws GlUtil.GlException, FrameProcessingException {
|
||||
for (int i = 0; i < textureProcessors.size(); i++) {
|
||||
textureProcessors.get(i).release();
|
||||
}
|
||||
GlUtil.destroyEglContext(eglDisplay, eglContext);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -627,12 +591,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
|
||||
/**
|
||||
* Focuses the wrapped surface view's surface as an {@link EGLSurface}, renders using {@code
|
||||
* renderRunnable} and swaps buffers, if the view's holder has a valid surface. Does nothing
|
||||
* renderingTask} and swaps buffers, if the view's holder has a valid surface. Does nothing
|
||||
* otherwise.
|
||||
*/
|
||||
@WorkerThread
|
||||
public synchronized void maybeRenderToSurfaceView(Runnable renderRunnable)
|
||||
throws GlUtil.GlException {
|
||||
public synchronized void maybeRenderToSurfaceView(FrameProcessingTask renderingTask)
|
||||
throws GlUtil.GlException, FrameProcessingException {
|
||||
if (surface == null) {
|
||||
return;
|
||||
}
|
||||
@ -646,7 +610,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||
}
|
||||
EGLSurface eglSurface = this.eglSurface;
|
||||
GlUtil.focusEglSurface(eglDisplay, eglContext, eglSurface, width, height);
|
||||
renderRunnable.run();
|
||||
renderingTask.run();
|
||||
EGL14.eglSwapBuffers(eglDisplay, eglSurface);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user