diff --git a/library/src/main/java/com/google/android/exoplayer/extractor/DefaultTrackOutput.java b/library/src/main/java/com/google/android/exoplayer/extractor/DefaultTrackOutput.java index 5e3cdbd829..3c80ecb12a 100644 --- a/library/src/main/java/com/google/android/exoplayer/extractor/DefaultTrackOutput.java +++ b/library/src/main/java/com/google/android/exoplayer/extractor/DefaultTrackOutput.java @@ -70,6 +70,17 @@ public final class DefaultTrackOutput implements TrackOutput { return rollingBuffer.getWriteIndex(); } + /** + * Discards samples from the write side of the queue. + * + * @param discardFromIndex The absolute index of the first sample to be discarded. + */ + public void discardUpstreamSamples(int discardFromIndex) { + rollingBuffer.discardUpstreamSamples(discardFromIndex); + largestParsedTimestampUs = rollingBuffer.peekSample(sampleInfoHolder) ? sampleInfoHolder.timeUs + : Long.MIN_VALUE; + } + // Called by the consuming thread. /** diff --git a/library/src/main/java/com/google/android/exoplayer/extractor/RollingSampleBuffer.java b/library/src/main/java/com/google/android/exoplayer/extractor/RollingSampleBuffer.java index 8aafa18385..124dc8b284 100644 --- a/library/src/main/java/com/google/android/exoplayer/extractor/RollingSampleBuffer.java +++ b/library/src/main/java/com/google/android/exoplayer/extractor/RollingSampleBuffer.java @@ -19,11 +19,12 @@ import com.google.android.exoplayer.C; import com.google.android.exoplayer.SampleHolder; import com.google.android.exoplayer.upstream.BufferPool; import com.google.android.exoplayer.upstream.DataSource; +import com.google.android.exoplayer.util.Assertions; import com.google.android.exoplayer.util.ParsableByteArray; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingDeque; /** * A rolling buffer of sample data and corresponding sample information. @@ -36,7 +37,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; private final int fragmentLength; private final InfoQueue infoQueue; - private final ConcurrentLinkedQueue dataQueue; + private final LinkedBlockingDeque dataQueue; private final SampleExtrasHolder extrasHolder; private final ParsableByteArray scratch; @@ -52,7 +53,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; this.fragmentPool = bufferPool; fragmentLength = bufferPool.bufferLength; infoQueue = new InfoQueue(); - dataQueue = new ConcurrentLinkedQueue(); + dataQueue = new LinkedBlockingDeque(); extrasHolder = new SampleExtrasHolder(); scratch = new ParsableByteArray(INITIAL_SCRATCH_SIZE); lastFragmentOffset = fragmentLength; @@ -81,6 +82,42 @@ import java.util.concurrent.ConcurrentLinkedQueue; return infoQueue.getWriteIndex(); } + /** + * Discards samples from the write side of the buffer. + * + * @param discardFromIndex The absolute index of the first sample to be discarded. + */ + public void discardUpstreamSamples(int discardFromIndex) { + totalBytesWritten = infoQueue.discardUpstreamSamples(discardFromIndex); + dropUpstreamFrom(totalBytesWritten); + } + + /** + * Discards data from the write side of the buffer. Data is discarded from the specified absolute + * position. Any fragments that are fully discarded are returned to the allocator. + * + * @param absolutePosition The absolute position (inclusive) from which to discard data. + */ + private void dropUpstreamFrom(long absolutePosition) { + int relativePosition = (int) (absolutePosition - totalBytesDropped); + // Calculate the index of the fragment containing the position, and the offset within it. + int fragmentIndex = relativePosition / fragmentLength; + int fragmentOffset = relativePosition % fragmentLength; + // We want to discard any fragments after the one at fragmentIndex. + int fragmentDiscardCount = dataQueue.size() - fragmentIndex - 1; + if (fragmentOffset == 0) { + // If the fragment at fragmentIndex is empty, we should discard that one too. + fragmentDiscardCount++; + } + // Discard the fragments. + for (int i = 0; i < fragmentDiscardCount; i++) { + fragmentPool.releaseDirect(dataQueue.removeLast()); + } + // Update lastFragment and lastFragmentOffset to reflect the new position. + lastFragment = dataQueue.peekLast(); + lastFragmentOffset = fragmentOffset == 0 ? fragmentLength : fragmentOffset; + } + // Called by the consuming thread. /** @@ -435,6 +472,30 @@ import java.util.concurrent.ConcurrentLinkedQueue; return absoluteReadIndex + queueSize; } + /** + * Discards samples from the write side of the buffer. + * + * @param discardFromIndex The absolute index of the first sample to be discarded. + * @return The reduced total number of bytes written, after the samples have been discarded. + */ + public long discardUpstreamSamples(int discardFromIndex) { + int discardCount = getWriteIndex() - discardFromIndex; + Assertions.checkArgument(0 <= discardCount && discardCount <= queueSize); + + if (discardCount == 0) { + if (absoluteReadIndex == 0) { + // queueSize == absoluteReadIndex == 0, so nothing has been written to the queue. + return 0; + } + int lastWriteIndex = (relativeWriteIndex == 0 ? capacity : relativeWriteIndex) - 1; + return offsets[lastWriteIndex] + sizes[lastWriteIndex]; + } + + queueSize -= discardCount; + relativeWriteIndex = (relativeWriteIndex + capacity - discardCount) % capacity; + return offsets[relativeWriteIndex]; + } + // Called by the consuming thread. /**