Add ability to discard from write-side of DefaultTrackOutput.

This commit is contained in:
Oliver Woodman 2015-04-11 02:03:43 +01:00
parent 4c8f9a8c6f
commit ad56490bde
2 changed files with 75 additions and 3 deletions

View File

@ -70,6 +70,17 @@ public final class DefaultTrackOutput implements TrackOutput {
return rollingBuffer.getWriteIndex();
}
/**
* Discards samples from the write side of the queue.
*
* @param discardFromIndex The absolute index of the first sample to be discarded.
*/
public void discardUpstreamSamples(int discardFromIndex) {
rollingBuffer.discardUpstreamSamples(discardFromIndex);
largestParsedTimestampUs = rollingBuffer.peekSample(sampleInfoHolder) ? sampleInfoHolder.timeUs
: Long.MIN_VALUE;
}
// Called by the consuming thread.
/**

View File

@ -19,11 +19,12 @@ import com.google.android.exoplayer.C;
import com.google.android.exoplayer.SampleHolder;
import com.google.android.exoplayer.upstream.BufferPool;
import com.google.android.exoplayer.upstream.DataSource;
import com.google.android.exoplayer.util.Assertions;
import com.google.android.exoplayer.util.ParsableByteArray;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
* A rolling buffer of sample data and corresponding sample information.
@ -36,7 +37,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
private final int fragmentLength;
private final InfoQueue infoQueue;
private final ConcurrentLinkedQueue<byte[]> dataQueue;
private final LinkedBlockingDeque<byte[]> dataQueue;
private final SampleExtrasHolder extrasHolder;
private final ParsableByteArray scratch;
@ -52,7 +53,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
this.fragmentPool = bufferPool;
fragmentLength = bufferPool.bufferLength;
infoQueue = new InfoQueue();
dataQueue = new ConcurrentLinkedQueue<byte[]>();
dataQueue = new LinkedBlockingDeque<byte[]>();
extrasHolder = new SampleExtrasHolder();
scratch = new ParsableByteArray(INITIAL_SCRATCH_SIZE);
lastFragmentOffset = fragmentLength;
@ -81,6 +82,42 @@ import java.util.concurrent.ConcurrentLinkedQueue;
return infoQueue.getWriteIndex();
}
/**
* Discards samples from the write side of the buffer.
*
* @param discardFromIndex The absolute index of the first sample to be discarded.
*/
public void discardUpstreamSamples(int discardFromIndex) {
totalBytesWritten = infoQueue.discardUpstreamSamples(discardFromIndex);
dropUpstreamFrom(totalBytesWritten);
}
/**
* Discards data from the write side of the buffer. Data is discarded from the specified absolute
* position. Any fragments that are fully discarded are returned to the allocator.
*
* @param absolutePosition The absolute position (inclusive) from which to discard data.
*/
private void dropUpstreamFrom(long absolutePosition) {
int relativePosition = (int) (absolutePosition - totalBytesDropped);
// Calculate the index of the fragment containing the position, and the offset within it.
int fragmentIndex = relativePosition / fragmentLength;
int fragmentOffset = relativePosition % fragmentLength;
// We want to discard any fragments after the one at fragmentIndex.
int fragmentDiscardCount = dataQueue.size() - fragmentIndex - 1;
if (fragmentOffset == 0) {
// If the fragment at fragmentIndex is empty, we should discard that one too.
fragmentDiscardCount++;
}
// Discard the fragments.
for (int i = 0; i < fragmentDiscardCount; i++) {
fragmentPool.releaseDirect(dataQueue.removeLast());
}
// Update lastFragment and lastFragmentOffset to reflect the new position.
lastFragment = dataQueue.peekLast();
lastFragmentOffset = fragmentOffset == 0 ? fragmentLength : fragmentOffset;
}
// Called by the consuming thread.
/**
@ -435,6 +472,30 @@ import java.util.concurrent.ConcurrentLinkedQueue;
return absoluteReadIndex + queueSize;
}
/**
* Discards samples from the write side of the buffer.
*
* @param discardFromIndex The absolute index of the first sample to be discarded.
* @return The reduced total number of bytes written, after the samples have been discarded.
*/
public long discardUpstreamSamples(int discardFromIndex) {
int discardCount = getWriteIndex() - discardFromIndex;
Assertions.checkArgument(0 <= discardCount && discardCount <= queueSize);
if (discardCount == 0) {
if (absoluteReadIndex == 0) {
// queueSize == absoluteReadIndex == 0, so nothing has been written to the queue.
return 0;
}
int lastWriteIndex = (relativeWriteIndex == 0 ? capacity : relativeWriteIndex) - 1;
return offsets[lastWriteIndex] + sizes[lastWriteIndex];
}
queueSize -= discardCount;
relativeWriteIndex = (relativeWriteIndex + capacity - discardCount) % capacity;
return offsets[relativeWriteIndex];
}
// Called by the consuming thread.
/**