Fix thread-safety issue using DefaultTrackOutput.

Reading the format and/or a sample needs to be done as a
single operation. Else you can have a situation where the
queue is initially empty, and this happens:

1) Read downstream format X
2) Read downstream format X (unchanged)
3) Write format Y
4) Write first sample
5) Read first sample

The first sample then appears to be format X rather than Y.

Note that readData in the SampleSource implementations always
looks roughly the same. readReset is identical in all cases.
isReady is identical in all cases now I've fixed them to be
that way. So it should be pretty easy to get DefaultTrackOutput
to implement TrackStream directly, at which point a whole load
of duplication will disappear.
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=120546377
This commit is contained in:
olly 2016-04-22 07:41:13 -07:00 committed by Oliver Woodman
parent 90b7081824
commit 9a893e3003
4 changed files with 129 additions and 143 deletions

View File

@ -67,7 +67,6 @@ public class ChunkSampleSource implements SampleSource, TrackStream, Loader.Call
private TrackGroupArray trackGroups;
private boolean trackEnabled;
private boolean pendingReset;
private Format downstreamSampleFormat;
private long downstreamPositionUs;
private long lastSeekPositionUs;
@ -196,7 +195,7 @@ public class ChunkSampleSource implements SampleSource, TrackStream, Loader.Call
loadControl.register(this, bufferSizeContribution);
}
downstreamFormat = null;
downstreamSampleFormat = null;
sampleQueue.needDownstreamFormat();
downstreamPositionUs = positionUs;
lastSeekPositionUs = positionUs;
pendingReset = false;
@ -265,7 +264,7 @@ public class ChunkSampleSource implements SampleSource, TrackStream, Loader.Call
@Override
public boolean isReady() {
return loadingFinished || !sampleQueue.isEmpty();
return loadingFinished || (!isPendingReset() && !sampleQueue.isEmpty());
}
@Override
@ -289,44 +288,33 @@ public class ChunkSampleSource implements SampleSource, TrackStream, Loader.Call
return NOTHING_READ;
}
BaseMediaChunk currentChunk = mediaChunks.getFirst();
while (mediaChunks.size() > 1
&& mediaChunks.get(1).getFirstSampleIndex() <= sampleQueue.getReadIndex()) {
mediaChunks.removeFirst();
currentChunk = mediaChunks.getFirst();
}
if (downstreamFormat == null || !downstreamFormat.equals(currentChunk.format)) {
eventDispatcher.downstreamFormatChanged(currentChunk.format, currentChunk.trigger,
BaseMediaChunk currentChunk = mediaChunks.getFirst();
Format currentFormat = currentChunk.format;
if (downstreamFormat == null || !downstreamFormat.equals(currentFormat)) {
eventDispatcher.downstreamFormatChanged(currentFormat, currentChunk.trigger,
currentChunk.startTimeUs);
downstreamFormat = currentChunk.format;
downstreamFormat = currentFormat;
}
if (sampleQueue.isEmpty()) {
if (loadingFinished) {
buffer.addFlag(C.BUFFER_FLAG_END_OF_STREAM);
return BUFFER_READ;
}
return NOTHING_READ;
int result = sampleQueue.readData(formatHolder, buffer, loadingFinished);
switch (result) {
case FORMAT_READ:
formatHolder.drmInitData = currentChunk.getDrmInitData();
break;
case BUFFER_READ:
if (!buffer.isEndOfStream()) {
if (buffer.timeUs < lastSeekPositionUs) {
buffer.addFlag(C.BUFFER_FLAG_DECODE_ONLY);
}
onSampleRead(currentChunk, buffer);
}
break;
}
Format sampleFormat = sampleQueue.getDownstreamFormat();
if (!sampleFormat.equals(downstreamSampleFormat)) {
formatHolder.format = sampleFormat;
formatHolder.drmInitData = currentChunk.getDrmInitData();
downstreamSampleFormat = sampleFormat;
return FORMAT_READ;
}
if (sampleQueue.readSample(buffer)) {
if (buffer.timeUs < lastSeekPositionUs) {
buffer.addFlag(C.BUFFER_FLAG_DECODE_ONLY);
}
onSampleRead(currentChunk, buffer);
return BUFFER_READ;
}
return NOTHING_READ;
return result;
}
// Loader.Callback implementation.

View File

@ -18,6 +18,8 @@ package com.google.android.exoplayer.extractor;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.DecoderInputBuffer;
import com.google.android.exoplayer.Format;
import com.google.android.exoplayer.FormatHolder;
import com.google.android.exoplayer.TrackStream;
import com.google.android.exoplayer.upstream.Allocation;
import com.google.android.exoplayer.upstream.Allocator;
import com.google.android.exoplayer.util.Assertions;
@ -46,6 +48,7 @@ public final class DefaultTrackOutput implements TrackOutput {
// Accessed only by the consuming thread.
private long totalBytesDropped;
private Format downstreamFormat;
// Accessed only by the loading thread (or the consuming thread when there is no loading thread).
private long sampleOffsetUs;
@ -55,9 +58,6 @@ public final class DefaultTrackOutput implements TrackOutput {
private boolean needKeyframe;
private boolean pendingSplice;
// Accessed by both the loading and consuming threads.
private volatile Format upstreamFormat;
/**
* @param allocator An {@link Allocator} from which allocations for sample data can be obtained.
*/
@ -89,6 +89,14 @@ public final class DefaultTrackOutput implements TrackOutput {
needKeyframe = true;
}
/**
* Indicates that {@link #readData(FormatHolder, DecoderInputBuffer, boolean)} should provide
* the sample format before any samples, even if it has already been provided.
*/
public void needDownstreamFormat() {
downstreamFormat = null;
}
/**
* Indicates that samples subsequently queued to the buffer should be spliced into those already
* queued.
@ -157,18 +165,10 @@ public final class DefaultTrackOutput implements TrackOutput {
}
/**
* Returns the current upstream {@link Format}.
* Returns the upstream {@link Format} in which samples are being queued.
*/
public Format getUpstreamFormat() {
return upstreamFormat;
}
/**
* Returns the current downstream {@link Format}.
*/
public Format getDownstreamFormat() {
Format nextSampleFormat = infoQueue.peekFormat();
return nextSampleFormat != null ? nextSampleFormat : upstreamFormat;
return infoQueue.getUpstreamFormat();
}
/**
@ -212,28 +212,44 @@ public final class DefaultTrackOutput implements TrackOutput {
}
/**
* Reads the current sample, advancing the read index to the next sample.
* Attempts to read from the queue.
*
* @param buffer The buffer into which the current sample should be written.
* @return True if a sample was read. False if there is no current sample.
* @param formatHolder A {@link FormatHolder} to populate in the case of reading a format.
* @param buffer A {@link DecoderInputBuffer} to populate in the case of reading a sample or the
* end of the stream. If the caller requires the sample data then it must ensure that
* {@link DecoderInputBuffer#data} references a valid buffer. If the end of the stream has
* been reached, the {@link C#BUFFER_FLAG_END_OF_STREAM} flag will be set on the buffer.
* @param loadingFinished True if an empty queue should be considered the end of the stream.
* @return The result, which can be {@link TrackStream#NOTHING_READ},
* {@link TrackStream#FORMAT_READ} or {@link TrackStream#BUFFER_READ}.
*/
public boolean readSample(DecoderInputBuffer buffer) {
public int readData(FormatHolder formatHolder, DecoderInputBuffer buffer,
boolean loadingFinished) {
// Write the sample information into the buffer and extrasHolder.
boolean haveSample = infoQueue.readSample(buffer, extrasHolder);
if (!haveSample) {
return false;
int result = infoQueue.readData(formatHolder, buffer, downstreamFormat, extrasHolder);
switch (result) {
case TrackStream.NOTHING_READ:
if (loadingFinished) {
buffer.setFlags(C.BUFFER_FLAG_END_OF_STREAM);
return TrackStream.BUFFER_READ;
}
return TrackStream.NOTHING_READ;
case TrackStream.FORMAT_READ:
downstreamFormat = formatHolder.format;
break;
case TrackStream.BUFFER_READ:
// Read encryption data if the sample is encrypted.
if (buffer.isEncrypted()) {
readEncryptionData(buffer, extrasHolder);
}
// Write the sample data into the holder.
buffer.ensureSpaceForWrite(buffer.size);
readData(extrasHolder.offset, buffer.data, buffer.size);
// Advance the read head.
dropDownstreamTo(extrasHolder.nextOffset);
break;
}
// Read encryption data if the sample is encrypted.
if (buffer.isEncrypted()) {
readEncryptionData(buffer, extrasHolder);
}
// Write the sample data into the holder.
buffer.ensureSpaceForWrite(buffer.size);
readData(extrasHolder.offset, buffer.data, buffer.size);
// Advance the read head.
dropDownstreamTo(extrasHolder.nextOffset);
return true;
return result;
}
/**
@ -392,7 +408,7 @@ public final class DefaultTrackOutput implements TrackOutput {
@Override
public void format(Format format) {
upstreamFormat = getAdjustedSampleFormat(format, sampleOffsetUs);
infoQueue.format(getAdjustedSampleFormat(format, sampleOffsetUs));
}
@Override
@ -444,7 +460,7 @@ public final class DefaultTrackOutput implements TrackOutput {
}
timeUs += sampleOffsetUs;
long absoluteOffset = totalBytesWritten - size - offset;
infoQueue.commitSample(timeUs, flags, absoluteOffset, size, encryptionKey, upstreamFormat);
infoQueue.commitSample(timeUs, flags, absoluteOffset, size, encryptionKey);
}
/**
@ -500,6 +516,7 @@ public final class DefaultTrackOutput implements TrackOutput {
private long largestDequeuedTimestampUs;
private long largestQueuedTimestampUs;
private Format upstreamFormat;
public InfoQueue() {
capacity = SAMPLE_CAPACITY_INCREMENT;
@ -577,18 +594,18 @@ public final class DefaultTrackOutput implements TrackOutput {
return absoluteReadIndex;
}
/**
* Returns whether the queue is empty.
*/
public synchronized boolean isEmpty() {
return queueSize == 0;
}
/**
* Returns the {@link Format} of the next sample, or null of the queue is empty.
* Returns the upstream {@link Format} in which samples are being queued.
*/
public synchronized Format peekFormat() {
if (queueSize == 0) {
return null;
}
return formats[relativeReadIndex];
public synchronized Format getUpstreamFormat() {
return upstreamFormat;
}
/**
@ -606,23 +623,34 @@ public final class DefaultTrackOutput implements TrackOutput {
}
/**
* Fills {@code buffer} with information about the current sample, but does not write its data.
* The absolute position of the sample's data in the rolling buffer is stored in
* {@code extrasHolder}, along with an encryption id if present, and the absolute position of
* the first byte that may still be required after the current sample has been read.
* <p>
* Populates {@link DecoderInputBuffer#size}, {@link DecoderInputBuffer#timeUs}, the buffer
* flags and {@code extrasHolder}.
* Attempts to read from the queue.
*
* @param buffer The buffer into which the current sample information should be written.
* @param formatHolder A {@link FormatHolder} to populate in the case of reading a format.
* @param buffer A {@link DecoderInputBuffer} to populate in the case of reading a sample or the
* end of the stream. If a sample is read then the buffer is populated with information
* about the sample, but not its data. The absolute position of the data in the rolling
* buffer is stored in {@code extrasHolder}, along with an encryption id if present and the
* absolute position of the first byte that may still be required after the current sample
* has been read.
* @param extrasHolder The holder into which extra sample information should be written.
* @return True if the buffer and extras were filled. False if there is no current sample.
* @return The result, which can be {@link TrackStream#NOTHING_READ},
* {@link TrackStream#FORMAT_READ} or {@link TrackStream#BUFFER_READ}.
*/
public synchronized boolean readSample(DecoderInputBuffer buffer,
BufferExtrasHolder extrasHolder) {
public synchronized int readData(FormatHolder formatHolder, DecoderInputBuffer buffer,
Format downstreamFormat, BufferExtrasHolder extrasHolder) {
if (queueSize == 0) {
return false;
if (upstreamFormat != null && !upstreamFormat.equals(downstreamFormat)) {
formatHolder.format = upstreamFormat;
return TrackStream.FORMAT_READ;
}
return TrackStream.NOTHING_READ;
}
if (!formats[relativeReadIndex].equals(downstreamFormat)) {
formatHolder.format = formats[relativeReadIndex];
return TrackStream.FORMAT_READ;
}
buffer.timeUs = timesUs[relativeReadIndex];
buffer.size = sizes[relativeReadIndex];
buffer.setFlags(flags[relativeReadIndex]);
@ -640,7 +668,7 @@ public final class DefaultTrackOutput implements TrackOutput {
extrasHolder.nextOffset = queueSize > 0 ? offsets[relativeReadIndex]
: extrasHolder.offset + buffer.size;
return true;
return TrackStream.BUFFER_READ;
}
/**
@ -708,14 +736,18 @@ public final class DefaultTrackOutput implements TrackOutput {
// Called by the loading thread.
public synchronized void format(Format format) {
upstreamFormat = format;
}
public synchronized void commitSample(long timeUs, int sampleFlags, long offset, int size,
byte[] encryptionKey, Format format) {
byte[] encryptionKey) {
timesUs[relativeWriteIndex] = timeUs;
offsets[relativeWriteIndex] = offset;
sizes[relativeWriteIndex] = size;
flags[relativeWriteIndex] = sampleFlags;
encryptionKeys[relativeWriteIndex] = encryptionKey;
formats[relativeWriteIndex] = format;
formats[relativeWriteIndex] = upstreamFormat;
largestQueuedTimestampUs = Math.max(largestQueuedTimestampUs, timeUs);
// Increment the write index.
queueSize++;

View File

@ -212,7 +212,6 @@ public final class ExtractorSampleSource implements SampleSource, ExtractorOutpu
private DefaultTrackOutput[] sampleQueues;
private TrackGroupArray tracks;
private long durationUs;
private boolean[] pendingMediaFormat;
private boolean[] trackEnabledStates;
private long downstreamPositionUs;
@ -332,7 +331,6 @@ public final class ExtractorSampleSource implements SampleSource, ExtractorOutpu
int trackCount = sampleQueues.length;
TrackGroup[] trackArray = new TrackGroup[trackCount];
trackEnabledStates = new boolean[trackCount];
pendingMediaFormat = new boolean[trackCount];
durationUs = seekMap.getDurationUs();
for (int i = 0; i < trackCount; i++) {
trackArray[i] = new TrackGroup(sampleQueues[i].getUpstreamFormat());
@ -384,7 +382,7 @@ public final class ExtractorSampleSource implements SampleSource, ExtractorOutpu
Assertions.checkState(!trackEnabledStates[track]);
enabledTrackCount++;
trackEnabledStates[track] = true;
pendingMediaFormat[track] = true;
sampleQueues[track].needDownstreamFormat();
newStreams[i] = new TrackStreamImpl(track);
}
// Cancel or start requests as necessary.
@ -449,8 +447,7 @@ public final class ExtractorSampleSource implements SampleSource, ExtractorOutpu
// TrackStream methods.
/* package */ boolean isReady(int track) {
Assertions.checkState(trackEnabledStates[track]);
return sampleQueues[track].isEmpty();
return loadingFinished || (!isPendingReset() && !sampleQueues[track].isEmpty());
}
@ -466,27 +463,18 @@ public final class ExtractorSampleSource implements SampleSource, ExtractorOutpu
return TrackStream.NOTHING_READ;
}
DefaultTrackOutput sampleQueue = sampleQueues[track];
if (pendingMediaFormat[track]) {
formatHolder.format = sampleQueue.getUpstreamFormat();
formatHolder.drmInitData = drmInitData;
pendingMediaFormat[track] = false;
return TrackStream.FORMAT_READ;
int result = sampleQueues[track].readData(formatHolder, buffer, loadingFinished);
switch (result) {
case TrackStream.FORMAT_READ:
formatHolder.drmInitData = drmInitData;
break;
case TrackStream.BUFFER_READ:
if (!buffer.isEndOfStream() && buffer.timeUs < lastSeekPositionUs) {
buffer.addFlag(C.BUFFER_FLAG_DECODE_ONLY);
}
break;
}
if (sampleQueue.readSample(buffer)) {
if (buffer.timeUs < lastSeekPositionUs) {
buffer.addFlag(C.BUFFER_FLAG_DECODE_ONLY);
}
return TrackStream.BUFFER_READ;
}
if (loadingFinished) {
buffer.addFlag(C.BUFFER_FLAG_END_OF_STREAM);
return TrackStream.BUFFER_READ;
}
return TrackStream.NOTHING_READ;
return result;
}
// Loader.Callback implementation.

View File

@ -79,7 +79,6 @@ public final class HlsSampleSource implements SampleSource, Loader.Callback {
private int primaryTrackGroupIndex;
// Indexed by group.
private boolean[] groupEnabledStates;
private Format[] downstreamSampleFormats;
private long downstreamPositionUs;
private long lastSeekPositionUs;
@ -202,7 +201,7 @@ public final class HlsSampleSource implements SampleSource, Loader.Callback {
int group = selection.group;
int[] tracks = selection.getTracks();
setTrackGroupEnabledState(group, true);
downstreamSampleFormats[group] = null;
sampleQueues[group].needDownstreamFormat();
if (group == primaryTrackGroupIndex) {
primaryTracksDeselected |= chunkSource.selectTracks(tracks);
}
@ -291,7 +290,6 @@ public final class HlsSampleSource implements SampleSource, Loader.Callback {
// TrackStream implementation.
/* package */ boolean isReady(int group) {
Assertions.checkState(groupEnabledStates[group]);
return loadingFinished || (!isPendingReset() && !sampleQueues[group].isEmpty());
}
@ -305,6 +303,9 @@ public final class HlsSampleSource implements SampleSource, Loader.Callback {
return TrackStream.NOTHING_READ;
}
while (mediaChunks.size() > 1 && mediaChunks.get(1).startTimeUs <= downstreamPositionUs) {
mediaChunks.removeFirst();
}
HlsMediaChunk currentChunk = mediaChunks.getFirst();
Format currentFormat = currentChunk.format;
if (downstreamFormat == null || !downstreamFormat.equals(currentFormat)) {
@ -313,34 +314,12 @@ public final class HlsSampleSource implements SampleSource, Loader.Callback {
downstreamFormat = currentFormat;
}
DefaultTrackOutput sampleQueue = sampleQueues[group];
if (sampleQueue.isEmpty()) {
if (loadingFinished) {
buffer.addFlag(C.BUFFER_FLAG_END_OF_STREAM);
return TrackStream.BUFFER_READ;
}
return TrackStream.NOTHING_READ;
int result = sampleQueues[group].readData(formatHolder, buffer, loadingFinished);
if (result == TrackStream.BUFFER_READ && !buffer.isEndOfStream()
&& buffer.timeUs < lastSeekPositionUs) {
buffer.addFlag(C.BUFFER_FLAG_DECODE_ONLY);
}
Format sampleFormat = sampleQueue.getDownstreamFormat();
if (!sampleFormat.equals(downstreamSampleFormats[group])) {
formatHolder.format = sampleFormat;
downstreamSampleFormats[group] = sampleFormat;
return TrackStream.FORMAT_READ;
}
if (sampleQueue.readSample(buffer)) {
long sampleTimeUs = buffer.timeUs;
while (mediaChunks.size() > 1 && mediaChunks.get(1).startTimeUs <= sampleTimeUs) {
mediaChunks.removeFirst();
}
if (sampleTimeUs < lastSeekPositionUs) {
buffer.addFlag(C.BUFFER_FLAG_DECODE_ONLY);
}
return TrackStream.BUFFER_READ;
}
return TrackStream.NOTHING_READ;
return result;
}
// Loader.Callback implementation.
@ -463,7 +442,6 @@ public final class HlsSampleSource implements SampleSource, Loader.Callback {
// Instantiate the necessary internal data-structures.
primaryTrackGroupIndex = -1;
groupEnabledStates = new boolean[extractorTrackCount];
downstreamSampleFormats = new Format[extractorTrackCount];
// Construct the set of exposed track groups.
TrackGroup[] trackGroups = new TrackGroup[extractorTrackCount];