diff --git a/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessingTask.java b/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessingTask.java new file mode 100644 index 0000000000..e5e12dc14c --- /dev/null +++ b/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessingTask.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package androidx.media3.transformer; + +import androidx.media3.common.util.GlUtil; + +/** + * Interface for tasks that may throw a {@link GlUtil.GlException} or {@link + * FrameProcessingException}. + */ +/* package */ interface FrameProcessingTask { + /** Runs the task. */ + void run() throws FrameProcessingException, GlUtil.GlException; +} diff --git a/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessingTaskExecutor.java b/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessingTaskExecutor.java new file mode 100644 index 0000000000..d228494831 --- /dev/null +++ b/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessingTaskExecutor.java @@ -0,0 +1,123 @@ +/* + * Copyright 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package androidx.media3.transformer; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import androidx.media3.common.util.GlUtil; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Wrapper around a single thread {@link ExecutorService} for executing {@link FrameProcessingTask} + * instances. + * + *

The wrapper handles calling {@link + * FrameProcessorChain.Listener#onFrameProcessingError(FrameProcessingException)} for errors that + * occur during these tasks. + */ +/* package */ final class FrameProcessingTaskExecutor { + + private final ExecutorService singleThreadExecutorService; + private final FrameProcessorChain.Listener listener; + private final ConcurrentLinkedQueue> futures; + private final AtomicBoolean shouldCancelTasks; + + /** Creates a new instance. */ + public FrameProcessingTaskExecutor( + ExecutorService singleThreadExecutorService, FrameProcessorChain.Listener listener) { + this.singleThreadExecutorService = singleThreadExecutorService; + this.listener = listener; + + futures = new ConcurrentLinkedQueue<>(); + shouldCancelTasks = new AtomicBoolean(); + } + + /** + * Submits the given {@link FrameProcessingTask} to be executed after any pending tasks have + * completed. + */ + public void submit(FrameProcessingTask task) { + if (shouldCancelTasks.get()) { + return; + } + try { + futures.add(submitTask(task)); + } catch (RejectedExecutionException e) { + if (!shouldCancelTasks.getAndSet(true)) { + listener.onFrameProcessingError(new FrameProcessingException(e)); + } + } + } + + /** + * Cancels remaining tasks, runs the given release task, and shuts down the background thread. + * + * @param releaseTask A {@link FrameProcessingTask} to execute before shutting down the background + * thread. + * @param releaseWaitTimeMs How long to wait for the release task to terminate, in milliseconds. + * @throws InterruptedException If interrupted while releasing resources. + */ + public void release(FrameProcessingTask releaseTask, long releaseWaitTimeMs) + throws InterruptedException { + shouldCancelTasks.getAndSet(true); + while (!futures.isEmpty()) { + futures.remove().cancel(/* mayInterruptIfRunning= */ false); + } + Future releaseFuture = submitTask(releaseTask); + singleThreadExecutorService.shutdown(); + try { + if (!singleThreadExecutorService.awaitTermination(releaseWaitTimeMs, MILLISECONDS)) { + listener.onFrameProcessingError(new FrameProcessingException("Release timed out")); + } + releaseFuture.get(); + } catch (ExecutionException e) { + listener.onFrameProcessingError(new FrameProcessingException(e)); + } + } + + private Future submitTask(FrameProcessingTask glTask) { + return singleThreadExecutorService.submit( + () -> { + try { + glTask.run(); + removeFinishedFutures(); + } catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) { + listener.onFrameProcessingError(FrameProcessingException.from(e)); + } + }); + } + + private void removeFinishedFutures() { + while (!futures.isEmpty()) { + if (!futures.element().isDone()) { + return; + } + try { + futures.remove().get(); + } catch (ExecutionException e) { + listener.onFrameProcessingError(new FrameProcessingException(e)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.onFrameProcessingError(new FrameProcessingException(e)); + } + } + } +} diff --git a/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessorChain.java b/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessorChain.java index 2167da9744..510160e3f1 100644 --- a/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessorChain.java +++ b/libraries/transformer/src/main/java/androidx/media3/transformer/FrameProcessorChain.java @@ -16,10 +16,8 @@ package androidx.media3.transformer; import static androidx.media3.common.util.Assertions.checkArgument; -import static androidx.media3.common.util.Assertions.checkNotNull; import static androidx.media3.common.util.Assertions.checkState; import static com.google.common.collect.Iterables.getLast; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import android.content.Context; import android.graphics.SurfaceTexture; @@ -42,11 +40,9 @@ import androidx.media3.common.util.Util; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -323,15 +319,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private final boolean enableExperimentalHdrEditing; private final EGLDisplay eglDisplay; private final EGLContext eglContext; - /** Some OpenGL commands may block, so all OpenGL commands are run on a background thread. */ - private final ExecutorService singleThreadExecutorService; + private final FrameProcessingTaskExecutor frameProcessingTaskExecutor; /** * Offset compared to original media presentation time that has been added to incoming frame * timestamps, in microseconds. */ private final long streamOffsetUs; - /** Futures corresponding to the executor service's pending tasks. */ - private final ConcurrentLinkedQueue> futures; /** Number of frames {@linkplain #registerInputFrame() registered} but not fully processed. */ private final AtomicInteger pendingFrameCount; /** Wraps the {@link #inputSurfaceTexture}. */ @@ -399,7 +392,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; this.eglDisplay = eglDisplay; this.eglContext = eglContext; - this.singleThreadExecutorService = singleThreadExecutorService; this.inputExternalTexId = inputExternalTexId; this.streamOffsetUs = streamOffsetUs; this.intermediateTextures = intermediateTextures; @@ -411,7 +403,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; this.stopProcessing = new AtomicBoolean(); this.enableExperimentalHdrEditing = enableExperimentalHdrEditing; - futures = new ConcurrentLinkedQueue<>(); + frameProcessingTaskExecutor = + new FrameProcessingTaskExecutor(singleThreadExecutorService, listener); pendingFrameCount = new AtomicInteger(); inputSurfaceTexture = new SurfaceTexture(inputExternalTexId); inputSurface = new Surface(inputSurfaceTexture); @@ -425,20 +418,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; public Surface getInputSurface() { // TODO(b/227625423): Allow input surface to be recreated for input size change. inputSurfaceTexture.setOnFrameAvailableListener( - surfaceTexture -> { - if (stopProcessing.get()) { - // Frames can still become available after a transformation is cancelled but they can be - // ignored. - return; - } - try { - futures.add(singleThreadExecutorService.submit(this::processFrame)); - } catch (RejectedExecutionException e) { - if (!stopProcessing.get()) { - throw e; - } - } - }); + surfaceTexture -> frameProcessingTaskExecutor.submit(this::processFrame)); return inputSurface; } @@ -470,7 +450,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; public void signalEndOfInputStream() { checkState(!inputStreamEnded); inputStreamEnded = true; - futures.add(singleThreadExecutorService.submit(this::signalEndOfOutputStream)); + frameProcessingTaskExecutor.submit(this::signalEndOfOutputStream); } /** @@ -485,19 +465,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; */ public void release() { stopProcessing.set(true); - while (!futures.isEmpty()) { - checkNotNull(futures.poll()).cancel(/* mayInterruptIfRunning= */ false); - } - futures.add( - singleThreadExecutorService.submit(this::releaseTextureProcessorsAndDestroyGlContext)); - singleThreadExecutorService.shutdown(); try { - if (!singleThreadExecutorService.awaitTermination(RELEASE_WAIT_TIME_MS, MILLISECONDS)) { - Log.d(TAG, "Failed to release FrameProcessorChain"); - } - } catch (InterruptedException e) { - Log.d(TAG, "FrameProcessorChain release was interrupted", e); + frameProcessingTaskExecutor.release( + /* releaseTask= */ this::releaseTextureProcessorsAndDestroyGlContext, + RELEASE_WAIT_TIME_MS); + } catch (InterruptedException unexpected) { Thread.currentThread().interrupt(); + throw new IllegalStateException(unexpected); } inputSurfaceTexture.release(); inputSurface.release(); @@ -509,24 +483,19 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; *

This method must be called on the {@linkplain #THREAD_NAME background thread}. */ @WorkerThread - private void processFrame() { - if (stopProcessing.get()) { - return; - } + private void processFrame() throws FrameProcessingException { + checkState(Thread.currentThread().getName().equals(THREAD_NAME)); + + inputSurfaceTexture.updateTexImage(); + long inputFrameTimeNs = inputSurfaceTexture.getTimestamp(); + // Correct for the stream offset so processors see original media presentation timestamps. + long presentationTimeUs = inputFrameTimeNs / 1000 - streamOffsetUs; + inputSurfaceTexture.getTransformMatrix(textureTransformMatrix); + ((ExternalTextureProcessor) textureProcessors.get(0)) + .setTextureTransformMatrix(textureTransformMatrix); + int inputTexId = inputExternalTexId; - long presentationTimeUs = C.TIME_UNSET; try { - checkState(Thread.currentThread().getName().equals(THREAD_NAME)); - - inputSurfaceTexture.updateTexImage(); - long inputFrameTimeNs = inputSurfaceTexture.getTimestamp(); - // Correct for the stream offset so processors see original media presentation timestamps. - presentationTimeUs = inputFrameTimeNs / 1000 - streamOffsetUs; - inputSurfaceTexture.getTransformMatrix(textureTransformMatrix); - ((ExternalTextureProcessor) textureProcessors.get(0)) - .setTextureTransformMatrix(textureTransformMatrix); - int inputTexId = inputExternalTexId; - for (int i = 0; i < textureProcessors.size() - 1; i++) { if (stopProcessing.get()) { return; @@ -550,27 +519,25 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; EGLExt.eglPresentationTimeANDROID(eglDisplay, outputEglSurface, inputFrameTimeNs); EGL14.eglSwapBuffers(eglDisplay, outputEglSurface); + } catch (GlUtil.GlException e) { + throw new FrameProcessingException(e, presentationTimeUs); + } + try { if (debugSurfaceViewWrapper != null) { long finalPresentationTimeUs = presentationTimeUs; int finalInputTexId = inputTexId; debugSurfaceViewWrapper.maybeRenderToSurfaceView( () -> { - try { - GlUtil.clearOutputFrame(); - getLast(textureProcessors).drawFrame(finalInputTexId, finalPresentationTimeUs); - } catch (GlUtil.GlException | FrameProcessingException e) { - Log.d(TAG, "Error rendering to debug preview", e); - } + GlUtil.clearOutputFrame(); + getLast(textureProcessors).drawFrame(finalInputTexId, finalPresentationTimeUs); }); } - - checkState(pendingFrameCount.getAndDecrement() > 0); - } catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) { - if (!stopProcessing.getAndSet(true)) { - listener.onFrameProcessingError(FrameProcessingException.from(e, presentationTimeUs)); - } + } catch (FrameProcessingException | GlUtil.GlException e) { + Log.d(TAG, "Error rendering to debug preview", e); } + + checkState(pendingFrameCount.getAndDecrement() > 0); } /** Calls {@link Listener#onFrameProcessingEnded()} once no more frames are pending. */ @@ -579,7 +546,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; if (getPendingFrameCount() == 0) { listener.onFrameProcessingEnded(); } else { - futures.add(singleThreadExecutorService.submit(this::signalEndOfOutputStream)); + frameProcessingTaskExecutor.submit(this::signalEndOfOutputStream); } } @@ -590,15 +557,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; *

This method must be called on the {@linkplain #THREAD_NAME background thread}. */ @WorkerThread - private void releaseTextureProcessorsAndDestroyGlContext() { - try { - for (int i = 0; i < textureProcessors.size(); i++) { - textureProcessors.get(i).release(); - } - GlUtil.destroyEglContext(eglDisplay, eglContext); - } catch (FrameProcessingException | GlUtil.GlException | RuntimeException e) { - listener.onFrameProcessingError(FrameProcessingException.from(e)); + private void releaseTextureProcessorsAndDestroyGlContext() + throws GlUtil.GlException, FrameProcessingException { + for (int i = 0; i < textureProcessors.size(); i++) { + textureProcessors.get(i).release(); } + GlUtil.destroyEglContext(eglDisplay, eglContext); } /** @@ -627,12 +591,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** * Focuses the wrapped surface view's surface as an {@link EGLSurface}, renders using {@code - * renderRunnable} and swaps buffers, if the view's holder has a valid surface. Does nothing + * renderingTask} and swaps buffers, if the view's holder has a valid surface. Does nothing * otherwise. */ @WorkerThread - public synchronized void maybeRenderToSurfaceView(Runnable renderRunnable) - throws GlUtil.GlException { + public synchronized void maybeRenderToSurfaceView(FrameProcessingTask renderingTask) + throws GlUtil.GlException, FrameProcessingException { if (surface == null) { return; } @@ -646,7 +610,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } EGLSurface eglSurface = this.eglSurface; GlUtil.focusEglSurface(eglDisplay, eglContext, eglSurface, width, height); - renderRunnable.run(); + renderingTask.run(); EGL14.eglSwapBuffers(eglDisplay, eglSurface); }