diff --git a/library/core/src/main/java/com/google/android/exoplayer2/extractor/DefaultTrackOutput.java b/library/core/src/main/java/com/google/android/exoplayer2/extractor/DefaultTrackOutput.java index c879d8e695..09970aaff0 100644 --- a/library/core/src/main/java/com/google/android/exoplayer2/extractor/DefaultTrackOutput.java +++ b/library/core/src/main/java/com/google/android/exoplayer2/extractor/DefaultTrackOutput.java @@ -19,11 +19,10 @@ import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.Format; import com.google.android.exoplayer2.FormatHolder; import com.google.android.exoplayer2.decoder.DecoderInputBuffer; +import com.google.android.exoplayer2.extractor.SampleMetadataQueue.SampleExtrasHolder; import com.google.android.exoplayer2.upstream.Allocation; import com.google.android.exoplayer2.upstream.Allocator; -import com.google.android.exoplayer2.util.Assertions; import com.google.android.exoplayer2.util.ParsableByteArray; -import com.google.android.exoplayer2.util.Util; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -59,9 +58,9 @@ public final class DefaultTrackOutput implements TrackOutput { private final Allocator allocator; private final int allocationLength; - private final InfoQueue infoQueue; + private final SampleMetadataQueue metadataQueue; private final LinkedBlockingDeque dataQueue; - private final BufferExtrasHolder extrasHolder; + private final SampleExtrasHolder extrasHolder; private final ParsableByteArray scratch; private final AtomicInteger state; @@ -85,9 +84,9 @@ public final class DefaultTrackOutput implements TrackOutput { public DefaultTrackOutput(Allocator allocator) { this.allocator = allocator; allocationLength = allocator.getIndividualAllocationLength(); - infoQueue = new InfoQueue(); + metadataQueue = new SampleMetadataQueue(); dataQueue = new LinkedBlockingDeque<>(); - extrasHolder = new BufferExtrasHolder(); + extrasHolder = new SampleExtrasHolder(); scratch = new 
ParsableByteArray(INITIAL_SCRATCH_SIZE); state = new AtomicInteger(); lastAllocationOffset = allocationLength; @@ -103,7 +102,7 @@ public final class DefaultTrackOutput implements TrackOutput { public void reset(boolean enable) { int previousState = state.getAndSet(enable ? STATE_ENABLED : STATE_DISABLED); clearSampleData(); - infoQueue.resetLargestParsedTimestamps(); + metadataQueue.resetLargestParsedTimestamps(); if (previousState == STATE_DISABLED) { downstreamFormat = null; } @@ -115,7 +114,7 @@ public final class DefaultTrackOutput implements TrackOutput { * @param sourceId The source identifier. */ public void sourceId(int sourceId) { - infoQueue.sourceId(sourceId); + metadataQueue.sourceId(sourceId); } /** @@ -130,7 +129,7 @@ public final class DefaultTrackOutput implements TrackOutput { * Returns the current absolute write index. */ public int getWriteIndex() { - return infoQueue.getWriteIndex(); + return metadataQueue.getWriteIndex(); } /** @@ -139,7 +138,7 @@ public final class DefaultTrackOutput implements TrackOutput { * @param discardFromIndex The absolute index of the first sample to be discarded. */ public void discardUpstreamSamples(int discardFromIndex) { - totalBytesWritten = infoQueue.discardUpstreamSamples(discardFromIndex); + totalBytesWritten = metadataQueue.discardUpstreamSamples(discardFromIndex); dropUpstreamFrom(totalBytesWritten); } @@ -184,14 +183,14 @@ public final class DefaultTrackOutput implements TrackOutput { * Returns whether the buffer is empty. */ public boolean isEmpty() { - return infoQueue.isEmpty(); + return metadataQueue.isEmpty(); } /** * Returns the current absolute read index. */ public int getReadIndex() { - return infoQueue.getReadIndex(); + return metadataQueue.getReadIndex(); } /** @@ -201,14 +200,14 @@ public final class DefaultTrackOutput implements TrackOutput { * @return The source id. 
*/ public int peekSourceId() { - return infoQueue.peekSourceId(); + return metadataQueue.peekSourceId(); } /** * Returns the upstream {@link Format} in which samples are being queued. */ public Format getUpstreamFormat() { - return infoQueue.getUpstreamFormat(); + return metadataQueue.getUpstreamFormat(); } /** @@ -222,14 +221,14 @@ public final class DefaultTrackOutput implements TrackOutput { * samples have been queued. */ public long getLargestQueuedTimestampUs() { - return infoQueue.getLargestQueuedTimestampUs(); + return metadataQueue.getLargestQueuedTimestampUs(); } /** * Skips all samples currently in the buffer. */ public void skipAll() { - long nextOffset = infoQueue.skipAll(); + long nextOffset = metadataQueue.skipAll(); if (nextOffset != C.POSITION_UNSET) { dropDownstreamTo(nextOffset); } @@ -247,7 +246,7 @@ public final class DefaultTrackOutput implements TrackOutput { * @return Whether the skip was successful. */ public boolean skipToKeyframeBefore(long timeUs, boolean allowTimeBeyondBuffer) { - long nextOffset = infoQueue.skipToKeyframeBefore(timeUs, allowTimeBeyondBuffer); + long nextOffset = metadataQueue.skipToKeyframeBefore(timeUs, allowTimeBeyondBuffer); if (nextOffset == C.POSITION_UNSET) { return false; } @@ -273,7 +272,7 @@ public final class DefaultTrackOutput implements TrackOutput { */ public int readData(FormatHolder formatHolder, DecoderInputBuffer buffer, boolean formatRequired, boolean loadingFinished, long decodeOnlyUntilUs) { - int result = infoQueue.readData(formatHolder, buffer, formatRequired, loadingFinished, + int result = metadataQueue.readData(formatHolder, buffer, formatRequired, loadingFinished, downstreamFormat, extrasHolder); switch (result) { case C.RESULT_FORMAT_READ: @@ -306,13 +305,13 @@ public final class DefaultTrackOutput implements TrackOutput { * Reads encryption data for the current sample. *

* The encryption data is written into {@link DecoderInputBuffer#cryptoInfo}, and - * {@link BufferExtrasHolder#size} is adjusted to subtract the number of bytes that were read. The - * same value is added to {@link BufferExtrasHolder#offset}. + * {@link SampleExtrasHolder#size} is adjusted to subtract the number of bytes that were read. The + * same value is added to {@link SampleExtrasHolder#offset}. * * @param buffer The buffer into which the encryption data should be written. * @param extrasHolder The extras holder whose offset should be read and subsequently adjusted. */ - private void readEncryptionData(DecoderInputBuffer buffer, BufferExtrasHolder extrasHolder) { + private void readEncryptionData(DecoderInputBuffer buffer, SampleExtrasHolder extrasHolder) { long offset = extrasHolder.offset; // Read the signal byte. @@ -459,7 +458,7 @@ public final class DefaultTrackOutput implements TrackOutput { @Override public void format(Format format) { Format adjustedFormat = getAdjustedSampleFormat(format, sampleOffsetUs); - boolean formatChanged = infoQueue.format(adjustedFormat); + boolean formatChanged = metadataQueue.format(adjustedFormat); lastUnadjustedFormat = format; pendingFormatAdjustment = false; if (upstreamFormatChangeListener != null && formatChanged) { @@ -522,19 +521,19 @@ public final class DefaultTrackOutput implements TrackOutput { format(lastUnadjustedFormat); } if (!startWriteOperation()) { - infoQueue.commitSampleTimestamp(timeUs); + metadataQueue.commitSampleTimestamp(timeUs); return; } try { if (pendingSplice) { - if ((flags & C.BUFFER_FLAG_KEY_FRAME) == 0 || !infoQueue.attemptSplice(timeUs)) { + if ((flags & C.BUFFER_FLAG_KEY_FRAME) == 0 || !metadataQueue.attemptSplice(timeUs)) { return; } pendingSplice = false; } timeUs += sampleOffsetUs; long absoluteOffset = totalBytesWritten - size - offset; - infoQueue.commitSample(timeUs, flags, absoluteOffset, size, cryptoData); + metadataQueue.commitSample(timeUs, flags, absoluteOffset, size, 
cryptoData); } finally { endWriteOperation(); } @@ -553,7 +552,7 @@ public final class DefaultTrackOutput implements TrackOutput { } private void clearSampleData() { - infoQueue.clearSampleData(); + metadataQueue.clearSampleData(); allocator.release(dataQueue.toArray(new Allocation[dataQueue.size()])); dataQueue.clear(); allocator.trim(); @@ -593,406 +592,4 @@ public final class DefaultTrackOutput implements TrackOutput { return format; } - /** - * Holds information about the samples in the rolling buffer. - */ - private static final class InfoQueue { - - private static final int SAMPLE_CAPACITY_INCREMENT = 1000; - - private int capacity; - - private int[] sourceIds; - private long[] offsets; - private int[] sizes; - private int[] flags; - private long[] timesUs; - private CryptoData[] cryptoDatas; - private Format[] formats; - - private int queueSize; - private int absoluteReadIndex; - private int relativeReadIndex; - private int relativeWriteIndex; - - private long largestDequeuedTimestampUs; - private long largestQueuedTimestampUs; - private boolean upstreamKeyframeRequired; - private boolean upstreamFormatRequired; - private Format upstreamFormat; - private int upstreamSourceId; - - public InfoQueue() { - capacity = SAMPLE_CAPACITY_INCREMENT; - sourceIds = new int[capacity]; - offsets = new long[capacity]; - timesUs = new long[capacity]; - flags = new int[capacity]; - sizes = new int[capacity]; - cryptoDatas = new CryptoData[capacity]; - formats = new Format[capacity]; - largestDequeuedTimestampUs = Long.MIN_VALUE; - largestQueuedTimestampUs = Long.MIN_VALUE; - upstreamFormatRequired = true; - upstreamKeyframeRequired = true; - } - - public void clearSampleData() { - absoluteReadIndex = 0; - relativeReadIndex = 0; - relativeWriteIndex = 0; - queueSize = 0; - upstreamKeyframeRequired = true; - } - - // Called by the consuming thread, but only when there is no loading thread. 
- - public void resetLargestParsedTimestamps() { - largestDequeuedTimestampUs = Long.MIN_VALUE; - largestQueuedTimestampUs = Long.MIN_VALUE; - } - - /** - * Returns the current absolute write index. - */ - public int getWriteIndex() { - return absoluteReadIndex + queueSize; - } - - /** - * Discards samples from the write side of the buffer. - * - * @param discardFromIndex The absolute index of the first sample to be discarded. - * @return The reduced total number of bytes written, after the samples have been discarded. - */ - public long discardUpstreamSamples(int discardFromIndex) { - int discardCount = getWriteIndex() - discardFromIndex; - Assertions.checkArgument(0 <= discardCount && discardCount <= queueSize); - - if (discardCount == 0) { - if (absoluteReadIndex == 0) { - // queueSize == absoluteReadIndex == 0, so nothing has been written to the queue. - return 0; - } - int lastWriteIndex = (relativeWriteIndex == 0 ? capacity : relativeWriteIndex) - 1; - return offsets[lastWriteIndex] + sizes[lastWriteIndex]; - } - - queueSize -= discardCount; - relativeWriteIndex = (relativeWriteIndex + capacity - discardCount) % capacity; - // Update the largest queued timestamp, assuming that the timestamps prior to a keyframe are - // always less than the timestamp of the keyframe itself, and of subsequent frames. - largestQueuedTimestampUs = Long.MIN_VALUE; - for (int i = queueSize - 1; i >= 0; i--) { - int sampleIndex = (relativeReadIndex + i) % capacity; - largestQueuedTimestampUs = Math.max(largestQueuedTimestampUs, timesUs[sampleIndex]); - if ((flags[sampleIndex] & C.BUFFER_FLAG_KEY_FRAME) != 0) { - break; - } - } - return offsets[relativeWriteIndex]; - } - - public void sourceId(int sourceId) { - upstreamSourceId = sourceId; - } - - // Called by the consuming thread. - - /** - * Returns the current absolute read index. 
- */ - public int getReadIndex() { - return absoluteReadIndex; - } - - /** - * Peeks the source id of the next sample, or the current upstream source id if the queue is - * empty. - */ - public int peekSourceId() { - return queueSize == 0 ? upstreamSourceId : sourceIds[relativeReadIndex]; - } - - /** - * Returns whether the queue is empty. - */ - public synchronized boolean isEmpty() { - return queueSize == 0; - } - - /** - * Returns the upstream {@link Format} in which samples are being queued. - */ - public synchronized Format getUpstreamFormat() { - return upstreamFormatRequired ? null : upstreamFormat; - } - - /** - * Returns the largest sample timestamp that has been queued since the last {@link #reset}. - *

- * Samples that were discarded by calling {@link #discardUpstreamSamples(int)} are not - * considered as having been queued. Samples that were dequeued from the front of the queue are - * considered as having been queued. - * - * @return The largest sample timestamp that has been queued, or {@link Long#MIN_VALUE} if no - * samples have been queued. - */ - public synchronized long getLargestQueuedTimestampUs() { - return Math.max(largestDequeuedTimestampUs, largestQueuedTimestampUs); - } - - /** - * Attempts to read from the queue. - * - * @param formatHolder A {@link FormatHolder} to populate in the case of reading a format. - * @param buffer A {@link DecoderInputBuffer} to populate in the case of reading a sample or the - * end of the stream. If a sample is read then the buffer is populated with information - * about the sample, but not its data. The size and absolute position of the data in the - * rolling buffer is stored in {@code extrasHolder}, along with an encryption id if present - * and the absolute position of the first byte that may still be required after the current - * sample has been read. May be null if the caller requires that the format of the stream be - * read even if it's not changing. - * @param formatRequired Whether the caller requires that the format of the stream be read even - * if it's not changing. A sample will never be read if set to true, however it is still - * possible for the end of stream or nothing to be read. - * @param loadingFinished True if an empty queue should be considered the end of the stream. - * @param downstreamFormat The current downstream {@link Format}. If the format of the next - * sample is different to the current downstream format then a format will be read. - * @param extrasHolder The holder into which extra sample information should be written. - * @return The result, which can be {@link C#RESULT_NOTHING_READ}, {@link C#RESULT_FORMAT_READ} - * or {@link C#RESULT_BUFFER_READ}. 
- */ - @SuppressWarnings("ReferenceEquality") - public synchronized int readData(FormatHolder formatHolder, DecoderInputBuffer buffer, - boolean formatRequired, boolean loadingFinished, Format downstreamFormat, - BufferExtrasHolder extrasHolder) { - if (queueSize == 0) { - if (loadingFinished) { - buffer.setFlags(C.BUFFER_FLAG_END_OF_STREAM); - return C.RESULT_BUFFER_READ; - } else if (upstreamFormat != null - && (formatRequired || upstreamFormat != downstreamFormat)) { - formatHolder.format = upstreamFormat; - return C.RESULT_FORMAT_READ; - } else { - return C.RESULT_NOTHING_READ; - } - } - - if (formatRequired || formats[relativeReadIndex] != downstreamFormat) { - formatHolder.format = formats[relativeReadIndex]; - return C.RESULT_FORMAT_READ; - } - - if (buffer.isFlagsOnly()) { - return C.RESULT_NOTHING_READ; - } - - buffer.timeUs = timesUs[relativeReadIndex]; - buffer.setFlags(flags[relativeReadIndex]); - extrasHolder.size = sizes[relativeReadIndex]; - extrasHolder.offset = offsets[relativeReadIndex]; - extrasHolder.cryptoData = cryptoDatas[relativeReadIndex]; - - largestDequeuedTimestampUs = Math.max(largestDequeuedTimestampUs, buffer.timeUs); - queueSize--; - relativeReadIndex++; - absoluteReadIndex++; - if (relativeReadIndex == capacity) { - // Wrap around. - relativeReadIndex = 0; - } - - extrasHolder.nextOffset = queueSize > 0 ? offsets[relativeReadIndex] - : extrasHolder.offset + extrasHolder.size; - return C.RESULT_BUFFER_READ; - } - - /** - * Skips all samples in the buffer. - * - * @return The offset up to which data should be dropped, or {@link C#POSITION_UNSET} if no - * dropping of data is required. 
- */ - public synchronized long skipAll() { - if (queueSize == 0) { - return C.POSITION_UNSET; - } - - int lastSampleIndex = (relativeReadIndex + queueSize - 1) % capacity; - relativeReadIndex = (relativeReadIndex + queueSize) % capacity; - absoluteReadIndex += queueSize; - queueSize = 0; - return offsets[lastSampleIndex] + sizes[lastSampleIndex]; - } - - /** - * Attempts to locate the keyframe before or at the specified time. If - * {@code allowTimeBeyondBuffer} is {@code false} then it is also required that {@code timeUs} - * falls within the buffer. - * - * @param timeUs The seek time. - * @param allowTimeBeyondBuffer Whether the skip can succeed if {@code timeUs} is beyond the end - * of the buffer. - * @return The offset of the keyframe's data if the keyframe was present. - * {@link C#POSITION_UNSET} otherwise. - */ - public synchronized long skipToKeyframeBefore(long timeUs, boolean allowTimeBeyondBuffer) { - if (queueSize == 0 || timeUs < timesUs[relativeReadIndex]) { - return C.POSITION_UNSET; - } - - if (timeUs > largestQueuedTimestampUs && !allowTimeBeyondBuffer) { - return C.POSITION_UNSET; - } - - // This could be optimized to use a binary search, however in practice callers to this method - // often pass times near to the start of the buffer. Hence it's unclear whether switching to - // a binary search would yield any real benefit. - int sampleCount = 0; - int sampleCountToKeyframe = -1; - int searchIndex = relativeReadIndex; - while (searchIndex != relativeWriteIndex) { - if (timesUs[searchIndex] > timeUs) { - // We've gone too far. - break; - } else if ((flags[searchIndex] & C.BUFFER_FLAG_KEY_FRAME) != 0) { - // We've found a keyframe, and we're still before the seek position. 
- sampleCountToKeyframe = sampleCount; - } - searchIndex = (searchIndex + 1) % capacity; - sampleCount++; - } - - if (sampleCountToKeyframe == -1) { - return C.POSITION_UNSET; - } - - relativeReadIndex = (relativeReadIndex + sampleCountToKeyframe) % capacity; - absoluteReadIndex += sampleCountToKeyframe; - queueSize -= sampleCountToKeyframe; - return offsets[relativeReadIndex]; - } - - // Called by the loading thread. - - public synchronized boolean format(Format format) { - if (format == null) { - upstreamFormatRequired = true; - return false; - } - upstreamFormatRequired = false; - if (Util.areEqual(format, upstreamFormat)) { - // Suppress changes between equal formats so we can use referential equality in readData. - return false; - } else { - upstreamFormat = format; - return true; - } - } - - public synchronized void commitSample(long timeUs, @C.BufferFlags int sampleFlags, long offset, - int size, CryptoData cryptoData) { - if (upstreamKeyframeRequired) { - if ((sampleFlags & C.BUFFER_FLAG_KEY_FRAME) == 0) { - return; - } - upstreamKeyframeRequired = false; - } - Assertions.checkState(!upstreamFormatRequired); - commitSampleTimestamp(timeUs); - timesUs[relativeWriteIndex] = timeUs; - offsets[relativeWriteIndex] = offset; - sizes[relativeWriteIndex] = size; - flags[relativeWriteIndex] = sampleFlags; - cryptoDatas[relativeWriteIndex] = cryptoData; - formats[relativeWriteIndex] = upstreamFormat; - sourceIds[relativeWriteIndex] = upstreamSourceId; - // Increment the write index. - queueSize++; - if (queueSize == capacity) { - // Increase the capacity. 
- int newCapacity = capacity + SAMPLE_CAPACITY_INCREMENT; - int[] newSourceIds = new int[newCapacity]; - long[] newOffsets = new long[newCapacity]; - long[] newTimesUs = new long[newCapacity]; - int[] newFlags = new int[newCapacity]; - int[] newSizes = new int[newCapacity]; - CryptoData[] newCryptoDatas = new CryptoData[newCapacity]; - Format[] newFormats = new Format[newCapacity]; - int beforeWrap = capacity - relativeReadIndex; - System.arraycopy(offsets, relativeReadIndex, newOffsets, 0, beforeWrap); - System.arraycopy(timesUs, relativeReadIndex, newTimesUs, 0, beforeWrap); - System.arraycopy(flags, relativeReadIndex, newFlags, 0, beforeWrap); - System.arraycopy(sizes, relativeReadIndex, newSizes, 0, beforeWrap); - System.arraycopy(cryptoDatas, relativeReadIndex, newCryptoDatas, 0, beforeWrap); - System.arraycopy(formats, relativeReadIndex, newFormats, 0, beforeWrap); - System.arraycopy(sourceIds, relativeReadIndex, newSourceIds, 0, beforeWrap); - int afterWrap = relativeReadIndex; - System.arraycopy(offsets, 0, newOffsets, beforeWrap, afterWrap); - System.arraycopy(timesUs, 0, newTimesUs, beforeWrap, afterWrap); - System.arraycopy(flags, 0, newFlags, beforeWrap, afterWrap); - System.arraycopy(sizes, 0, newSizes, beforeWrap, afterWrap); - System.arraycopy(cryptoDatas, 0, newCryptoDatas, beforeWrap, afterWrap); - System.arraycopy(formats, 0, newFormats, beforeWrap, afterWrap); - System.arraycopy(sourceIds, 0, newSourceIds, beforeWrap, afterWrap); - offsets = newOffsets; - timesUs = newTimesUs; - flags = newFlags; - sizes = newSizes; - cryptoDatas = newCryptoDatas; - formats = newFormats; - sourceIds = newSourceIds; - relativeReadIndex = 0; - relativeWriteIndex = capacity; - queueSize = capacity; - capacity = newCapacity; - } else { - relativeWriteIndex++; - if (relativeWriteIndex == capacity) { - // Wrap around. 
- relativeWriteIndex = 0; - } - } - } - - public synchronized void commitSampleTimestamp(long timeUs) { - largestQueuedTimestampUs = Math.max(largestQueuedTimestampUs, timeUs); - } - - /** - * Attempts to discard samples from the tail of the queue to allow samples starting from the - * specified timestamp to be spliced in. - * - * @param timeUs The timestamp at which the splice occurs. - * @return Whether the splice was successful. - */ - public synchronized boolean attemptSplice(long timeUs) { - if (largestDequeuedTimestampUs >= timeUs) { - return false; - } - int retainCount = queueSize; - while (retainCount > 0 - && timesUs[(relativeReadIndex + retainCount - 1) % capacity] >= timeUs) { - retainCount--; - } - discardUpstreamSamples(absoluteReadIndex + retainCount); - return true; - } - - } - - /** - * Holds additional buffer information not held by {@link DecoderInputBuffer}. - */ - private static final class BufferExtrasHolder { - - public int size; - public long offset; - public long nextOffset; - public CryptoData cryptoData; - - } - } diff --git a/library/core/src/main/java/com/google/android/exoplayer2/extractor/SampleMetadataQueue.java b/library/core/src/main/java/com/google/android/exoplayer2/extractor/SampleMetadataQueue.java new file mode 100644 index 0000000000..9802392913 --- /dev/null +++ b/library/core/src/main/java/com/google/android/exoplayer2/extractor/SampleMetadataQueue.java @@ -0,0 +1,426 @@ +/* + * Copyright (C) 2017 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.extractor; + +import com.google.android.exoplayer2.C; +import com.google.android.exoplayer2.Format; +import com.google.android.exoplayer2.FormatHolder; +import com.google.android.exoplayer2.decoder.DecoderInputBuffer; +import com.google.android.exoplayer2.extractor.TrackOutput.CryptoData; +import com.google.android.exoplayer2.util.Assertions; +import com.google.android.exoplayer2.util.Util; + +/** + * A queue of metadata describing the contents of a media buffer. + */ +/* package */ final class SampleMetadataQueue { + + /** + * A holder for sample metadata not held by {@link DecoderInputBuffer}. + */ + public static final class SampleExtrasHolder { + + public int size; + public long offset; + public long nextOffset; + public CryptoData cryptoData; + + } + + private static final int SAMPLE_CAPACITY_INCREMENT = 1000; + + private int capacity; + private int[] sourceIds; + private long[] offsets; + private int[] sizes; + private int[] flags; + private long[] timesUs; + private CryptoData[] cryptoDatas; + private Format[] formats; + + private int queueSize; + private int absoluteReadIndex; + private int relativeReadIndex; + private int relativeWriteIndex; + + private long largestDequeuedTimestampUs; + private long largestQueuedTimestampUs; + private boolean upstreamKeyframeRequired; + private boolean upstreamFormatRequired; + private Format upstreamFormat; + private int upstreamSourceId; + + public SampleMetadataQueue() { + capacity = SAMPLE_CAPACITY_INCREMENT; + sourceIds = new int[capacity]; + offsets = new long[capacity]; + timesUs = new long[capacity]; + flags = new int[capacity]; + sizes = new int[capacity]; + cryptoDatas = new CryptoData[capacity]; + formats = new Format[capacity]; + largestDequeuedTimestampUs = Long.MIN_VALUE; + largestQueuedTimestampUs = Long.MIN_VALUE; + upstreamFormatRequired = true; + 
upstreamKeyframeRequired = true; + } + + public void clearSampleData() { + absoluteReadIndex = 0; + relativeReadIndex = 0; + relativeWriteIndex = 0; + queueSize = 0; + upstreamKeyframeRequired = true; + } + + // Called by the consuming thread, but only when there is no loading thread. + + public void resetLargestParsedTimestamps() { + largestDequeuedTimestampUs = Long.MIN_VALUE; + largestQueuedTimestampUs = Long.MIN_VALUE; + } + + /** + * Returns the current absolute write index. + */ + public int getWriteIndex() { + return absoluteReadIndex + queueSize; + } + + /** + * Discards samples from the write side of the buffer. + * + * @param discardFromIndex The absolute index of the first sample to be discarded. + * @return The reduced total number of bytes written, after the samples have been discarded. + */ + public long discardUpstreamSamples(int discardFromIndex) { + int discardCount = getWriteIndex() - discardFromIndex; + Assertions.checkArgument(0 <= discardCount && discardCount <= queueSize); + + if (discardCount == 0) { + if (absoluteReadIndex == 0) { + // queueSize == absoluteReadIndex == 0, so nothing has been written to the queue. + return 0; + } + int lastWriteIndex = (relativeWriteIndex == 0 ? capacity : relativeWriteIndex) - 1; + return offsets[lastWriteIndex] + sizes[lastWriteIndex]; + } + + queueSize -= discardCount; + relativeWriteIndex = (relativeWriteIndex + capacity - discardCount) % capacity; + // Update the largest queued timestamp, assuming that the timestamps prior to a keyframe are + // always less than the timestamp of the keyframe itself, and of subsequent frames. 
+ largestQueuedTimestampUs = Long.MIN_VALUE; + for (int i = queueSize - 1; i >= 0; i--) { + int sampleIndex = (relativeReadIndex + i) % capacity; + largestQueuedTimestampUs = Math.max(largestQueuedTimestampUs, timesUs[sampleIndex]); + if ((flags[sampleIndex] & C.BUFFER_FLAG_KEY_FRAME) != 0) { + break; + } + } + return offsets[relativeWriteIndex]; + } + + public void sourceId(int sourceId) { + upstreamSourceId = sourceId; + } + + // Called by the consuming thread. + + /** + * Returns the current absolute read index. + */ + public int getReadIndex() { + return absoluteReadIndex; + } + + /** + * Peeks the source id of the next sample, or the current upstream source id if the queue is + * empty. + */ + public int peekSourceId() { + return queueSize == 0 ? upstreamSourceId : sourceIds[relativeReadIndex]; + } + + /** + * Returns whether the queue is empty. + */ + public synchronized boolean isEmpty() { + return queueSize == 0; + } + + /** + * Returns the upstream {@link Format} in which samples are being queued. + */ + public synchronized Format getUpstreamFormat() { + return upstreamFormatRequired ? null : upstreamFormat; + } + + /** + * Returns the largest sample timestamp that has been queued since the last call to + * {@link #resetLargestParsedTimestamps()}. + *

+ * Samples that were discarded by calling {@link #discardUpstreamSamples(int)} are not + * considered as having been queued. Samples that were dequeued from the front of the queue are + * considered as having been queued. + * + * @return The largest sample timestamp that has been queued, or {@link Long#MIN_VALUE} if no + * samples have been queued. + */ + public synchronized long getLargestQueuedTimestampUs() { + return Math.max(largestDequeuedTimestampUs, largestQueuedTimestampUs); + } + + /** + * Attempts to read from the queue. + * + * @param formatHolder A {@link FormatHolder} to populate in the case of reading a format. + * @param buffer A {@link DecoderInputBuffer} to populate in the case of reading a sample or the + * end of the stream. If a sample is read then the buffer is populated with information + * about the sample, but not its data. The size and absolute position of the data in the + * rolling buffer is stored in {@code extrasHolder}, along with an encryption id if present + * and the absolute position of the first byte that may still be required after the current + * sample has been read. May be null if the caller requires that the format of the stream be + * read even if it's not changing. + * @param formatRequired Whether the caller requires that the format of the stream be read even + * if it's not changing. A sample will never be read if set to true, however it is still + * possible for the end of stream or nothing to be read. + * @param loadingFinished True if an empty queue should be considered the end of the stream. + * @param downstreamFormat The current downstream {@link Format}. If the format of the next + * sample is different to the current downstream format then a format will be read. + * @param extrasHolder The holder into which extra sample information should be written. + * @return The result, which can be {@link C#RESULT_NOTHING_READ}, {@link C#RESULT_FORMAT_READ} + * or {@link C#RESULT_BUFFER_READ}. 
+ */ + @SuppressWarnings("ReferenceEquality") + public synchronized int readData(FormatHolder formatHolder, DecoderInputBuffer buffer, + boolean formatRequired, boolean loadingFinished, Format downstreamFormat, + SampleExtrasHolder extrasHolder) { + if (queueSize == 0) { + if (loadingFinished) { + buffer.setFlags(C.BUFFER_FLAG_END_OF_STREAM); + return C.RESULT_BUFFER_READ; + } else if (upstreamFormat != null + && (formatRequired || upstreamFormat != downstreamFormat)) { + formatHolder.format = upstreamFormat; + return C.RESULT_FORMAT_READ; + } else { + return C.RESULT_NOTHING_READ; + } + } + + if (formatRequired || formats[relativeReadIndex] != downstreamFormat) { + formatHolder.format = formats[relativeReadIndex]; + return C.RESULT_FORMAT_READ; + } + + if (buffer.isFlagsOnly()) { + return C.RESULT_NOTHING_READ; + } + + buffer.timeUs = timesUs[relativeReadIndex]; + buffer.setFlags(flags[relativeReadIndex]); + extrasHolder.size = sizes[relativeReadIndex]; + extrasHolder.offset = offsets[relativeReadIndex]; + extrasHolder.cryptoData = cryptoDatas[relativeReadIndex]; + + largestDequeuedTimestampUs = Math.max(largestDequeuedTimestampUs, buffer.timeUs); + queueSize--; + relativeReadIndex++; + absoluteReadIndex++; + if (relativeReadIndex == capacity) { + // Wrap around. + relativeReadIndex = 0; + } + + extrasHolder.nextOffset = queueSize > 0 ? offsets[relativeReadIndex] + : extrasHolder.offset + extrasHolder.size; + return C.RESULT_BUFFER_READ; + } + + /** + * Skips all samples in the buffer. + * + * @return The offset up to which data should be dropped, or {@link C#POSITION_UNSET} if no + * dropping of data is required. 
+   */
+  public synchronized long skipAll() {
+    int skippedCount = queueSize;
+    if (skippedCount == 0) {
+      return C.POSITION_UNSET;
+    }
+
+    int firstIndex = relativeReadIndex;
+    int lastIndex = (firstIndex + skippedCount - 1) % capacity;
+    queueSize = 0;
+    absoluteReadIndex += skippedCount;
+    relativeReadIndex = (firstIndex + skippedCount) % capacity;
+    // The returned offset is the end of the last skipped sample.
+    return offsets[lastIndex] + sizes[lastIndex];
+  }
+
+  /**
+   * Advances the read position to the latest queued keyframe that precedes the first sample with
+   * a timestamp greater than {@code timeUs}. Unless {@code allowTimeBeyondBuffer} is
+   * {@code true}, {@code timeUs} must not exceed the largest queued timestamp.
+   *
+   * @param timeUs The seek time.
+   * @param allowTimeBeyondBuffer Whether the skip may succeed when {@code timeUs} lies beyond the
+   *     end of the buffer.
+   * @return The data offset of the keyframe that was skipped to, or {@link C#POSITION_UNSET} if
+   *     the queue is empty, {@code timeUs} is outside the permitted range, or no such keyframe
+   *     is queued.
+   */
+  public synchronized long skipToKeyframeBefore(long timeUs, boolean allowTimeBeyondBuffer) {
+    if (queueSize == 0 || timeUs < timesUs[relativeReadIndex]) {
+      return C.POSITION_UNSET;
+    }
+    if (!allowTimeBeyondBuffer && timeUs > largestQueuedTimestampUs) {
+      return C.POSITION_UNSET;
+    }
+
+    // A linear scan is used deliberately: callers tend to pass times close to the start of the
+    // buffer, so it is unclear that a binary search would bring any real benefit.
+    int keyframeDistance = -1;
+    int scanned = 0;
+    for (int index = relativeReadIndex; index != relativeWriteIndex;
+        index = (index + 1) % capacity) {
+      if (timesUs[index] > timeUs) {
+        // Gone past the seek position.
+        break;
+      }
+      if ((flags[index] & C.BUFFER_FLAG_KEY_FRAME) != 0) {
+        // A keyframe that is still at or before the seek position.
+        keyframeDistance = scanned;
+      }
+      scanned++;
+    }
+    if (keyframeDistance == -1) {
+      return C.POSITION_UNSET;
+    }
+
+    queueSize -= keyframeDistance;
+    absoluteReadIndex += keyframeDistance;
+    relativeReadIndex = (relativeReadIndex + keyframeDistance) % capacity;
+    return offsets[relativeReadIndex];
+  }
+
+  // Called by the loading thread.
+
+  /**
+   * Sets the upstream format that subsequently committed samples are tagged with.
+   *
+   * @param format The new upstream format. Passing {@code null} marks the upstream format as
+   *     required, so that a format has to be set before the next sample is committed.
+   * @return Whether the upstream format changed.
+   */
+  public synchronized boolean format(Format format) {
+    upstreamFormatRequired = format == null;
+    // Equal formats are not treated as a change, so readData can compare formats by reference.
+    if (format == null || Util.areEqual(format, upstreamFormat)) {
+      return false;
+    }
+    upstreamFormat = format;
+    return true;
+  }
+
+  /**
+   * Appends the metadata of a sample to the queue, growing the backing arrays when they fill up.
+   * While a keyframe is required upstream, samples without the keyframe flag are dropped.
+   *
+   * @param timeUs The sample timestamp.
+   * @param sampleFlags The sample flags.
+   * @param offset The offset recorded for the sample's data.
+   * @param size The sample size.
+   * @param cryptoData The crypto data stored with the sample.
+   */
+  public synchronized void commitSample(long timeUs, @C.BufferFlags int sampleFlags, long offset,
+      int size, CryptoData cryptoData) {
+    boolean isKeyframe = (sampleFlags & C.BUFFER_FLAG_KEY_FRAME) != 0;
+    if (upstreamKeyframeRequired && !isKeyframe) {
+      return;
+    }
+    upstreamKeyframeRequired = false;
+    Assertions.checkState(!upstreamFormatRequired);
+    commitSampleTimestamp(timeUs);
+
+    // Store the sample in the slot at the write index.
+    int slot = relativeWriteIndex;
+    sourceIds[slot] = upstreamSourceId;
+    formats[slot] = upstreamFormat;
+    cryptoDatas[slot] = cryptoData;
+    flags[slot] = sampleFlags;
+    sizes[slot] = size;
+    offsets[slot] = offset;
+    timesUs[slot] = timeUs;
+
+    queueSize++;
+    if (queueSize != capacity) {
+      // Still room: advance the write index, wrapping at the end of the arrays.
+      if (++relativeWriteIndex == capacity) {
+        relativeWriteIndex = 0;
+      }
+      return;
+    }
+
+    // The arrays are full. Allocate larger ones first, then copy the samples in queue order so
+    // that the oldest sample ends up at index 0.
+    int grownCapacity = capacity + SAMPLE_CAPACITY_INCREMENT;
+    long[] grownTimesUs = new long[grownCapacity];
+    long[] grownOffsets = new long[grownCapacity];
+    int[] grownSizes = new int[grownCapacity];
+    int[] grownFlags = new int[grownCapacity];
+    int[] grownSourceIds = new int[grownCapacity];
+    CryptoData[] grownCryptoDatas = new CryptoData[grownCapacity];
+    Format[] grownFormats = new Format[grownCapacity];
+
+    // Samples from the read index to the end of the old arrays come first, followed by the
+    // samples that had wrapped around to the start.
+    int headCount = capacity - relativeReadIndex;
+    int tailCount = relativeReadIndex;
+    System.arraycopy(timesUs, relativeReadIndex, grownTimesUs, 0, headCount);
+    System.arraycopy(timesUs, 0, grownTimesUs, headCount, tailCount);
+    System.arraycopy(offsets, relativeReadIndex, grownOffsets, 0, headCount);
+    System.arraycopy(offsets, 0, grownOffsets, headCount, tailCount);
+    System.arraycopy(sizes, relativeReadIndex, grownSizes, 0, headCount);
+    System.arraycopy(sizes, 0, grownSizes, headCount, tailCount);
+    System.arraycopy(flags, relativeReadIndex, grownFlags, 0, headCount);
+    System.arraycopy(flags, 0, grownFlags, headCount, tailCount);
+    System.arraycopy(sourceIds, relativeReadIndex, grownSourceIds, 0, headCount);
+    System.arraycopy(sourceIds, 0, grownSourceIds, headCount, tailCount);
+    System.arraycopy(cryptoDatas, relativeReadIndex, grownCryptoDatas, 0, headCount);
+    System.arraycopy(cryptoDatas, 0, grownCryptoDatas, headCount, tailCount);
+    System.arraycopy(formats, relativeReadIndex, grownFormats, 0, headCount);
+    System.arraycopy(formats, 0, grownFormats, headCount, tailCount);
+
+    timesUs = grownTimesUs;
+    offsets = grownOffsets;
+    sizes = grownSizes;
+    flags = grownFlags;
+    sourceIds = grownSourceIds;
+    cryptoDatas = grownCryptoDatas;
+    formats = grownFormats;
+
+    // The queue now occupies [0, old capacity), so the next write goes right after it.
+    relativeReadIndex = 0;
+    relativeWriteIndex = capacity;
+    queueSize = capacity;
+    capacity = grownCapacity;
+  }
+
+  /**
+   * Records {@code timeUs} as the largest queued timestamp if it exceeds the current value.
+   *
+   * @param timeUs The timestamp to record.
+   */
+  public synchronized void commitSampleTimestamp(long timeUs) {
+    if (timeUs > largestQueuedTimestampUs) {
+      largestQueuedTimestampUs = timeUs;
+    }
+  }
+
+  /**
+   * Tries to make room for a splice at {@code timeUs} by discarding queued samples, starting from
+   * the newest, whose timestamps are at or after {@code timeUs}.
+   *
+   * @param timeUs The timestamp at which the splice occurs.
+   * @return {@code false} if the largest dequeued timestamp is already at or after
+   *     {@code timeUs}, in which case nothing is discarded; {@code true} otherwise.
+   */
+  public synchronized boolean attemptSplice(long timeUs) {
+    if (largestDequeuedTimestampUs >= timeUs) {
+      return false;
+    }
+
+    // Walk backwards from the newest sample until one before the splice time is found.
+    int keepCount = queueSize;
+    for (; keepCount > 0; keepCount--) {
+      int newestKeptIndex = (relativeReadIndex + keepCount - 1) % capacity;
+      if (timesUs[newestKeptIndex] < timeUs) {
+        break;
+      }
+    }
+    discardUpstreamSamples(absoluteReadIndex + keepCount);
+    return true;
+  }
+
+}