SpeedChangingAP: synchronize fields accessed from multiple threads

PiperOrigin-RevId: 643046385
This commit is contained in:
tofunmi 2024-06-13 10:33:37 -07:00 committed by Copybara-Service
parent df5352752f
commit 81f15dbd37
2 changed files with 239 additions and 71 deletions

View File

@ -35,6 +35,7 @@ import java.util.Queue;
import java.util.function.LongConsumer;
import org.checkerframework.checker.initialization.qual.UnknownInitialization;
import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;
/**
* An {@link AudioProcessor} that changes the speed of audio samples depending on their timestamp.
@ -44,6 +45,8 @@ import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
@UnstableApi
public final class SpeedChangingAudioProcessor extends BaseAudioProcessor {
private final Object lock;
/** The speed provider that provides the speed for each timestamp. */
private final SpeedProvider speedProvider;
@ -52,35 +55,47 @@ public final class SpeedChangingAudioProcessor extends BaseAudioProcessor {
* change required, the input buffer is copied to the output buffer and this processor is not
* used.
*/
private final SonicAudioProcessor sonicAudioProcessor;
private final Object pendingCallbacksLock;
private final SynchronizedSonicAudioProcessor sonicAudioProcessor;
// Elements in the same positions in the queues are associated.
@GuardedBy("pendingCallbacksLock")
@GuardedBy("lock")
private final LongArrayQueue pendingCallbackInputTimesUs;
@GuardedBy("pendingCallbacksLock")
@GuardedBy("lock")
private final Queue<TimestampConsumer> pendingCallbacks;
// Elements in the same positions in the arrays are associated.
@GuardedBy("lock")
private LongArray inputSegmentStartTimesUs;
@GuardedBy("lock")
private LongArray outputSegmentStartTimesUs;
private float currentSpeed;
private long bytesRead;
@GuardedBy("lock")
private long lastProcessedInputTimeUs;
private long lastSpeedAdjustedInputTimeUs;
private long lastSpeedAdjustedOutputTimeUs;
private boolean endOfStreamQueuedToSonic;
@GuardedBy("pendingCallbacksLock")
@GuardedBy("lock")
private long lastSpeedAdjustedInputTimeUs;
@GuardedBy("lock")
private long lastSpeedAdjustedOutputTimeUs;
@GuardedBy("lock")
private long speedAdjustedTimeAsyncInputTimeUs;
@GuardedBy("lock")
private float currentSpeed;
private long bytesRead;
private boolean endOfStreamQueuedToSonic;
public SpeedChangingAudioProcessor(SpeedProvider speedProvider) {
this.speedProvider = speedProvider;
sonicAudioProcessor = new SonicAudioProcessor();
pendingCallbacksLock = new Object();
lock = new Object();
sonicAudioProcessor = new SynchronizedSonicAudioProcessor(lock);
pendingCallbackInputTimesUs = new LongArrayQueue();
pendingCallbacks = new ArrayDeque<>();
speedAdjustedTimeAsyncInputTimeUs = C.TIME_UNSET;
@ -106,18 +121,8 @@ public final class SpeedChangingAudioProcessor extends BaseAudioProcessor {
/* multiplier= */ C.MICROS_PER_SECOND,
/* divisor= */ (long) inputAudioFormat.sampleRate * inputAudioFormat.bytesPerFrame);
float newSpeed = speedProvider.getSpeed(timeUs);
if (newSpeed != currentSpeed) {
updateSpeedChangeArrays(timeUs);
currentSpeed = newSpeed;
if (isUsingSonic()) {
sonicAudioProcessor.setSpeed(newSpeed);
sonicAudioProcessor.setPitch(newSpeed);
}
// Invalidate any previously created buffers in SonicAudioProcessor and the base class.
sonicAudioProcessor.flush();
endOfStreamQueuedToSonic = false;
super.getOutput();
}
updateSpeed(newSpeed, timeUs);
int inputBufferLimit = inputBuffer.limit();
long nextSpeedChangeTimeUs = speedProvider.getNextSpeedChangeTimeUs(timeUs);
@ -158,7 +163,7 @@ public final class SpeedChangingAudioProcessor extends BaseAudioProcessor {
buffer.flip();
}
bytesRead += inputBuffer.position() - startPosition;
lastProcessedInputTimeUs = updateLastProcessedInputTime();
updateLastProcessedInputTime();
inputBuffer.limit(inputBufferLimit);
}
@ -213,7 +218,7 @@ public final class SpeedChangingAudioProcessor extends BaseAudioProcessor {
* from the caller of this method.
*/
public void getSpeedAdjustedTimeAsync(long inputTimeUs, TimestampConsumer callback) {
synchronized (pendingCallbacksLock) {
synchronized (lock) {
checkArgument(speedAdjustedTimeAsyncInputTimeUs < inputTimeUs);
speedAdjustedTimeAsyncInputTimeUs = inputTimeUs;
if ((inputTimeUs <= lastProcessedInputTimeUs && pendingCallbackInputTimesUs.isEmpty())
@ -238,33 +243,36 @@ public final class SpeedChangingAudioProcessor extends BaseAudioProcessor {
* @return The corresponding input duration in microseconds.
*/
public long getMediaDurationUs(long playoutDurationUs) {
int floorIndex = outputSegmentStartTimesUs.size() - 1;
while (floorIndex > 0 && outputSegmentStartTimesUs.get(floorIndex) > playoutDurationUs) {
floorIndex--;
}
long lastSegmentOutputDurationUs =
playoutDurationUs - outputSegmentStartTimesUs.get(floorIndex);
long lastSegmentInputDurationUs;
if (floorIndex == outputSegmentStartTimesUs.size() - 1) {
lastSegmentInputDurationUs = getMediaDurationUsAtCurrentSpeed(lastSegmentOutputDurationUs);
synchronized (lock) {
int floorIndex = outputSegmentStartTimesUs.size() - 1;
while (floorIndex > 0 && outputSegmentStartTimesUs.get(floorIndex) > playoutDurationUs) {
floorIndex--;
}
long lastSegmentOutputDurationUs =
playoutDurationUs - outputSegmentStartTimesUs.get(floorIndex);
long lastSegmentInputDurationUs;
if (floorIndex == outputSegmentStartTimesUs.size() - 1) {
lastSegmentInputDurationUs = getMediaDurationUsAtCurrentSpeed(lastSegmentOutputDurationUs);
} else {
lastSegmentInputDurationUs =
round(
lastSegmentOutputDurationUs
* divide(
inputSegmentStartTimesUs.get(floorIndex + 1)
- inputSegmentStartTimesUs.get(floorIndex),
outputSegmentStartTimesUs.get(floorIndex + 1)
- outputSegmentStartTimesUs.get(floorIndex)));
} else {
lastSegmentInputDurationUs =
round(
lastSegmentOutputDurationUs
* divide(
inputSegmentStartTimesUs.get(floorIndex + 1)
- inputSegmentStartTimesUs.get(floorIndex),
outputSegmentStartTimesUs.get(floorIndex + 1)
- outputSegmentStartTimesUs.get(floorIndex)));
}
return inputSegmentStartTimesUs.get(floorIndex) + lastSegmentInputDurationUs;
}
return inputSegmentStartTimesUs.get(floorIndex) + lastSegmentInputDurationUs;
}
/**
* Assuming enough audio has been processed, calculates the time at which the {@code inputTimeUs}
* is outputted at after the speed changes has been applied.
*/
@SuppressWarnings("GuardedBy") // All call sites are guarded.
private long calculateSpeedAdjustedTime(long inputTimeUs) {
int floorIndex = inputSegmentStartTimesUs.size() - 1;
while (floorIndex > 0 && inputSegmentStartTimesUs.get(floorIndex) > inputTimeUs) {
@ -299,7 +307,7 @@ public final class SpeedChangingAudioProcessor extends BaseAudioProcessor {
}
private void processPendingCallbacks() {
synchronized (pendingCallbacksLock) {
synchronized (lock) {
while (!pendingCallbacks.isEmpty()
&& (pendingCallbackInputTimesUs.element() <= lastProcessedInputTimeUs || isEnded())) {
pendingCallbacks
@ -309,6 +317,24 @@ public final class SpeedChangingAudioProcessor extends BaseAudioProcessor {
}
}
private void updateSpeed(float newSpeed, long timeUs) {
synchronized (lock) {
if (newSpeed != currentSpeed) {
updateSpeedChangeArrays(timeUs);
currentSpeed = newSpeed;
if (isUsingSonic()) {
sonicAudioProcessor.setSpeed(newSpeed);
sonicAudioProcessor.setPitch(newSpeed);
}
// Invalidate any previously created buffers in SonicAudioProcessor and the base class.
sonicAudioProcessor.flush();
endOfStreamQueuedToSonic = false;
super.getOutput();
}
}
}
@SuppressWarnings("GuardedBy") // All call sites are guarded.
private void updateSpeedChangeArrays(long currentSpeedChangeInputTimeUs) {
long lastSpeedChangeOutputTimeUs =
outputSegmentStartTimesUs.get(outputSegmentStartTimesUs.size() - 1);
@ -334,39 +360,50 @@ public final class SpeedChangingAudioProcessor extends BaseAudioProcessor {
: playoutDurationUs;
}
private long updateLastProcessedInputTime() {
if (isUsingSonic()) {
// TODO - b/320242819: Investigate whether bytesRead can be used here rather than
// sonicAudioProcessor.getProcessedInputBytes().
long currentProcessedInputDurationUs =
Util.scaleLargeTimestamp(
/* timestamp= */ sonicAudioProcessor.getProcessedInputBytes(),
/* multiplier= */ C.MICROS_PER_SECOND,
/* divisor= */ (long) inputAudioFormat.sampleRate * inputAudioFormat.bytesPerFrame);
return inputSegmentStartTimesUs.get(inputSegmentStartTimesUs.size() - 1)
+ currentProcessedInputDurationUs;
private void updateLastProcessedInputTime() {
synchronized (lock) {
if (isUsingSonic()) {
// TODO - b/320242819: Investigate whether bytesRead can be used here rather than
// sonicAudioProcessor.getProcessedInputBytes().
long currentProcessedInputDurationUs =
Util.scaleLargeTimestamp(
/* timestamp= */ sonicAudioProcessor.getProcessedInputBytes(),
/* multiplier= */ C.MICROS_PER_SECOND,
/* divisor= */ (long) inputAudioFormat.sampleRate * inputAudioFormat.bytesPerFrame);
lastProcessedInputTimeUs =
inputSegmentStartTimesUs.get(inputSegmentStartTimesUs.size() - 1)
+ currentProcessedInputDurationUs;
} else {
lastProcessedInputTimeUs =
Util.scaleLargeTimestamp(
/* timestamp= */ bytesRead,
/* multiplier= */ C.MICROS_PER_SECOND,
/* divisor= */ (long) inputAudioFormat.sampleRate * inputAudioFormat.bytesPerFrame);
}
}
return Util.scaleLargeTimestamp(
/* timestamp= */ bytesRead,
/* multiplier= */ C.MICROS_PER_SECOND,
/* divisor= */ (long) inputAudioFormat.sampleRate * inputAudioFormat.bytesPerFrame);
}
private boolean isUsingSonic() {
return currentSpeed != 1f;
synchronized (lock) {
return currentSpeed != 1f;
}
}
@EnsuresNonNull({"inputSegmentStartTimesUs", "outputSegmentStartTimesUs"})
@RequiresNonNull("lock")
private void resetState(@UnknownInitialization SpeedChangingAudioProcessor this) {
currentSpeed = 1f;
synchronized (lock) {
inputSegmentStartTimesUs = new LongArray();
outputSegmentStartTimesUs = new LongArray();
inputSegmentStartTimesUs.add(0);
outputSegmentStartTimesUs.add(0);
lastProcessedInputTimeUs = 0;
lastSpeedAdjustedInputTimeUs = 0;
lastSpeedAdjustedOutputTimeUs = 0;
currentSpeed = 1f;
}
bytesRead = 0;
inputSegmentStartTimesUs = new LongArray();
outputSegmentStartTimesUs = new LongArray();
inputSegmentStartTimesUs.add(0);
outputSegmentStartTimesUs.add(0);
lastProcessedInputTimeUs = 0;
lastSpeedAdjustedInputTimeUs = 0;
lastSpeedAdjustedOutputTimeUs = 0;
endOfStreamQueuedToSonic = false;
// TODO: b/339842724 - This should ideally also reset speedAdjustedTimeAsyncInputTimeUs and
// clear pendingCallbacks and pendingCallbacksInputTimes. We can't do this at the moment

View File

@ -0,0 +1,131 @@
/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.media3.common.audio;
import java.nio.ByteBuffer;
/**
* A thread safe version {@link SonicAudioProcessor} that synchronizes calls before forwarding them
* to {@link SonicAudioProcessor}.
*/
/* package */ class SynchronizedSonicAudioProcessor implements AudioProcessor {
private final Object lock;
private final SonicAudioProcessor sonicAudioProcessor;
public SynchronizedSonicAudioProcessor(Object lock) {
this.lock = lock;
sonicAudioProcessor = new SonicAudioProcessor();
}
public final void setSpeed(float speed) {
synchronized (lock) {
sonicAudioProcessor.setSpeed(speed);
}
}
public final void setPitch(float pitch) {
synchronized (lock) {
sonicAudioProcessor.setPitch(pitch);
}
}
public final void setOutputSampleRateHz(int sampleRateHz) {
synchronized (lock) {
sonicAudioProcessor.setOutputSampleRateHz(sampleRateHz);
}
}
public final long getMediaDuration(long playoutDuration) {
synchronized (lock) {
return sonicAudioProcessor.getMediaDuration(playoutDuration);
}
}
public final long getPlayoutDuration(long mediaDuration) {
synchronized (lock) {
return sonicAudioProcessor.getPlayoutDuration(mediaDuration);
}
}
public final long getProcessedInputBytes() {
synchronized (lock) {
return sonicAudioProcessor.getProcessedInputBytes();
}
}
@Override
public long getDurationAfterProcessorApplied(long durationUs) {
return getPlayoutDuration(durationUs);
}
@Override
public final AudioFormat configure(AudioFormat inputAudioFormat)
throws UnhandledAudioFormatException {
synchronized (lock) {
return sonicAudioProcessor.configure(inputAudioFormat);
}
}
@Override
public final boolean isActive() {
synchronized (lock) {
return sonicAudioProcessor.isActive();
}
}
@Override
public final void queueInput(ByteBuffer inputBuffer) {
synchronized (lock) {
sonicAudioProcessor.queueInput(inputBuffer);
}
}
@Override
public final void queueEndOfStream() {
synchronized (lock) {
sonicAudioProcessor.queueEndOfStream();
}
}
@Override
public final ByteBuffer getOutput() {
synchronized (lock) {
return sonicAudioProcessor.getOutput();
}
}
@Override
public final boolean isEnded() {
synchronized (lock) {
return sonicAudioProcessor.isEnded();
}
}
@Override
public final void flush() {
synchronized (lock) {
sonicAudioProcessor.flush();
}
}
@Override
public final void reset() {
synchronized (lock) {
sonicAudioProcessor.reset();
}
}
}