Support chunked requests.

This commit is contained in:
Oliver Woodman 2014-08-14 15:44:09 +01:00
parent af6e144adc
commit 4e7b333aee
8 changed files with 135 additions and 59 deletions

View File

@ -57,24 +57,27 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener {
* load is for initialization data.
* @param mediaEndTimeMs The media time of the end of the data being loaded, or -1 if this
* load is for initialization data.
* @param totalBytes The length of the data being loaded in bytes.
* @param length The length of the data being loaded in bytes, or {@link C#LENGTH_UNBOUNDED} if
* the length of the data has not yet been determined.
*/
void onLoadStarted(int sourceId, String formatId, int trigger, boolean isInitialization,
int mediaStartTimeMs, int mediaEndTimeMs, long totalBytes);
int mediaStartTimeMs, int mediaEndTimeMs, long length);
/**
* Invoked when the current load operation completes.
*
* @param sourceId The id of the reporting {@link SampleSource}.
* @param bytesLoaded The number of bytes that were loaded.
*/
void onLoadCompleted(int sourceId);
void onLoadCompleted(int sourceId, long bytesLoaded);
/**
* Invoked when the current upstream load operation is canceled.
*
* @param sourceId The id of the reporting {@link SampleSource}.
* @param bytesLoaded The number of bytes that were loaded prior to the cancellation.
*/
void onLoadCanceled(int sourceId);
void onLoadCanceled(int sourceId, long bytesLoaded);
/**
* Invoked when data is removed from the back of the buffer, typically so that it can be
@ -83,10 +86,10 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener {
* @param sourceId The id of the reporting {@link SampleSource}.
* @param mediaStartTimeMs The media time of the start of the discarded data.
* @param mediaEndTimeMs The media time of the end of the discarded data.
* @param totalBytes The length of the data being discarded in bytes.
* @param bytesDiscarded The length of the data being discarded in bytes.
*/
void onUpstreamDiscarded(int sourceId, int mediaStartTimeMs, int mediaEndTimeMs,
long totalBytes);
long bytesDiscarded);
/**
* Invoked when an error occurs loading media data.
@ -111,10 +114,10 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener {
* @param sourceId The id of the reporting {@link SampleSource}.
* @param mediaStartTimeMs The media time of the start of the discarded data.
* @param mediaEndTimeMs The media time of the end of the discarded data.
* @param totalBytes The length of the data being discarded in bytes.
* @param bytesDiscarded The length of the data being discarded in bytes.
*/
void onDownstreamDiscarded(int sourceId, int mediaStartTimeMs, int mediaEndTimeMs,
long totalBytes);
long bytesDiscarded);
/**
* Invoked when the downstream format changes (i.e. when the format being supplied to the
@ -409,6 +412,7 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener {
@Override
public void onLoaded() {
Chunk currentLoadable = currentLoadableHolder.chunk;
notifyLoadCompleted(currentLoadable.bytesLoaded());
try {
currentLoadable.consume();
} catch (IOException e) {
@ -424,7 +428,6 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener {
if (!currentLoadableExceptionFatal) {
clearCurrentLoadable();
}
notifyLoadCompleted();
updateLoadControl();
}
}
@ -432,11 +435,11 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener {
@Override
public void onCanceled() {
Chunk currentLoadable = currentLoadableHolder.chunk;
notifyLoadCanceled(currentLoadable.bytesLoaded());
if (!isMediaChunk(currentLoadable)) {
currentLoadable.release();
}
clearCurrentLoadable();
notifyLoadCanceled();
if (state == STATE_ENABLED) {
restartFrom(pendingResetTime);
} else {
@ -677,35 +680,35 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener {
private void notifyLoadStarted(final String formatId, final int trigger,
final boolean isInitialization, final long mediaStartTimeUs, final long mediaEndTimeUs,
final long totalBytes) {
final long length) {
if (eventHandler != null && eventListener != null) {
eventHandler.post(new Runnable() {
@Override
public void run() {
eventListener.onLoadStarted(eventSourceId, formatId, trigger, isInitialization,
usToMs(mediaStartTimeUs), usToMs(mediaEndTimeUs), totalBytes);
usToMs(mediaStartTimeUs), usToMs(mediaEndTimeUs), length);
}
});
}
}
private void notifyLoadCompleted() {
private void notifyLoadCompleted(final long bytesLoaded) {
if (eventHandler != null && eventListener != null) {
eventHandler.post(new Runnable() {
@Override
public void run() {
eventListener.onLoadCompleted(eventSourceId);
eventListener.onLoadCompleted(eventSourceId, bytesLoaded);
}
});
}
}
private void notifyLoadCanceled() {
private void notifyLoadCanceled(final long bytesLoaded) {
if (eventHandler != null && eventListener != null) {
eventHandler.post(new Runnable() {
@Override
public void run() {
eventListener.onLoadCanceled(eventSourceId);
eventListener.onLoadCanceled(eventSourceId, bytesLoaded);
}
});
}
@ -760,13 +763,13 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener {
}
private void notifyDownstreamDiscarded(final long mediaStartTimeUs, final long mediaEndTimeUs,
final long totalBytes) {
final long bytesDiscarded) {
if (eventHandler != null && eventListener != null) {
eventHandler.post(new Runnable() {
@Override
public void run() {
eventListener.onDownstreamDiscarded(eventSourceId, usToMs(mediaStartTimeUs),
usToMs(mediaEndTimeUs), totalBytes);
usToMs(mediaEndTimeUs), bytesDiscarded);
}
});
}

View File

@ -24,6 +24,28 @@ package com.google.android.exoplayer.upstream;
*/
public interface Allocation {
/**
* Ensures the allocation has a capacity greater than or equal to the specified size in bytes.
* <p>
* If {@code size} is greater than the current capacity of the allocation, then it will grow
* to have a capacity of at least {@code size}. The allocation is grown by adding new fragments.
* Existing fragments remain unchanged, and any data that has been written to them will be
* preserved.
* <p>
* If {@code size} is less than or equal to the capacity of the allocation, then the call is a
* no-op.
*
* @param size The minimum required capacity, in bytes.
*/
public void ensureCapacity(int size);
/**
* Gets the capacity of the allocation, in bytes.
*
* @return The capacity of the allocation, in bytes.
*/
public int capacity();
/**
* Gets the buffers in which the fragments are allocated.
*

View File

@ -67,15 +67,39 @@ public final class BufferPool implements Allocator {
@Override
public synchronized Allocation allocate(int size) {
return new AllocationImpl(allocate(size, null));
}
/**
* Allocates byte arrays whose combined length is at least {@code size}.
* <p>
* An existing array of byte arrays may be provided to form the start of the allocation.
*
* @param size The total size required, in bytes.
* @param existing Existing byte arrays to use as the start of the allocation. May be null.
* @return The allocated byte arrays.
*/
/* package */ synchronized byte[][] allocate(int size, byte[][] existing) {
int requiredBufferCount = requiredBufferCount(size);
allocatedBufferCount += requiredBufferCount;
if (existing != null && requiredBufferCount <= existing.length) {
// The existing buffers are sufficient.
return existing;
}
// We need to allocate additional buffers.
byte[][] buffers = new byte[requiredBufferCount][];
for (int i = 0; i < requiredBufferCount; i++) {
int firstNewBufferIndex = 0;
if (existing != null) {
firstNewBufferIndex = existing.length;
System.arraycopy(existing, 0, buffers, 0, firstNewBufferIndex);
}
// Allocate the new buffers
allocatedBufferCount += requiredBufferCount - firstNewBufferIndex;
for (int i = firstNewBufferIndex; i < requiredBufferCount; i++) {
// Use a recycled buffer if one is available. Else instantiate a new one.
buffers[i] = recycledBufferCount > 0 ? recycledBuffers[--recycledBufferCount] :
new byte[bufferLength];
}
return new AllocationImpl(buffers);
return buffers;
}
/**
@ -112,6 +136,16 @@ public final class BufferPool implements Allocator {
this.buffers = buffers;
}
@Override
public void ensureCapacity(int size) {
buffers = allocate(size, buffers);
}
@Override
public int capacity() {
return bufferLength * buffers.length;
}
@Override
public byte[][] getBuffers() {
return buffers;

View File

@ -37,8 +37,9 @@ public interface DataSource {
* @throws IOException If an error occurs opening the source.
* @return The number of bytes that can be read from the opened source. For unbounded requests
* (i.e. requests where {@link DataSpec#length} equals {@link C#LENGTH_UNBOUNDED}) this value
* is the resolved length of the request. For all other requests, the value returned will be
* equal to the request's {@link DataSpec#length}.
* is the resolved length of the request, or {@link C#LENGTH_UNBOUNDED} if the length is still
* unresolved. For all other requests, the value returned will be equal to the request's
* {@link DataSpec#length}.
*/
public long open(DataSpec dataSpec) throws IOException;

View File

@ -40,6 +40,8 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream
}
private static final int CHUNKED_ALLOCATION_INCREMENT = 256 * 1024;
private final DataSource dataSource;
private final DataSpec dataSpec;
private final Allocator allocator;
@ -58,7 +60,7 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream
/**
* @param dataSource The source from which the data should be loaded.
* @param dataSpec Defines the data to be loaded. {@code dataSpec.length} must not exceed
* {@link Integer#MAX_VALUE}. If {@code dataSpec.length == DataSpec.LENGTH_UNBOUNDED} then
* {@link Integer#MAX_VALUE}. If {@code dataSpec.length == C.LENGTH_UNBOUNDED} then
* the length resolved by {@code dataSource.open(dataSpec)} must not exceed
* {@link Integer#MAX_VALUE}.
* @param allocator Used to obtain an {@link Allocation} for holding the data.
@ -98,7 +100,8 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream
}
/**
* Returns the length of the streamin bytes.
* Returns the length of the stream in bytes, or {@value C#LENGTH_UNBOUNDED} if the length has
* yet to be determined.
*
* @return The length of the stream in bytes, or {@value C#LENGTH_UNBOUNDED} if the length has
* yet to be determined.
@ -124,7 +127,7 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream
* Note: The read methods provide a more efficient way of consuming the loaded data. Use this
* method only when a freshly allocated byte[] containing all of the loaded data is required.
*
* @return The loaded data or null.
* @return The loaded data, or null.
*/
public final byte[] getLoadedData() {
if (loadPosition == 0) {
@ -192,6 +195,11 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream
int bytesRead = 0;
byte[][] buffers = allocation.getBuffers();
while (bytesRead < bytesToRead) {
if (readHead.fragmentRemaining == 0) {
readHead.fragmentIndex++;
readHead.fragmentOffset = allocation.getFragmentOffset(readHead.fragmentIndex);
readHead.fragmentRemaining = allocation.getFragmentLength(readHead.fragmentIndex);
}
int bufferReadLength = Math.min(readHead.fragmentRemaining, bytesToRead - bytesRead);
if (target != null) {
target.put(buffers[readHead.fragmentIndex], readHead.fragmentOffset, bufferReadLength);
@ -204,11 +212,6 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream
bytesRead += bufferReadLength;
readHead.fragmentOffset += bufferReadLength;
readHead.fragmentRemaining -= bufferReadLength;
if (readHead.fragmentRemaining == 0 && readHead.position < resolvedLength) {
readHead.fragmentIndex++;
readHead.fragmentOffset = allocation.getFragmentOffset(readHead.fragmentIndex);
readHead.fragmentRemaining = allocation.getFragmentLength(readHead.fragmentIndex);
}
}
return bytesRead;
@ -232,23 +235,32 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream
// The load was canceled, or is already complete.
return;
}
try {
DataSpec loadDataSpec;
if (resolvedLength == C.LENGTH_UNBOUNDED) {
if (loadPosition == 0 && resolvedLength == C.LENGTH_UNBOUNDED) {
loadDataSpec = dataSpec;
resolvedLength = dataSource.open(loadDataSpec);
long resolvedLength = dataSource.open(loadDataSpec);
if (resolvedLength > Integer.MAX_VALUE) {
throw new DataSourceStreamLoadException(
new UnexpectedLengthException(dataSpec.length, resolvedLength));
}
this.resolvedLength = resolvedLength;
} else {
long remainingLength = resolvedLength != C.LENGTH_UNBOUNDED
? resolvedLength - loadPosition : C.LENGTH_UNBOUNDED;
loadDataSpec = new DataSpec(dataSpec.uri, dataSpec.position + loadPosition,
resolvedLength - loadPosition, dataSpec.key);
remainingLength, dataSpec.key);
dataSource.open(loadDataSpec);
}
if (allocation == null) {
allocation = allocator.allocate((int) resolvedLength);
int initialAllocationSize = resolvedLength != C.LENGTH_UNBOUNDED
? (int) resolvedLength : CHUNKED_ALLOCATION_INCREMENT;
allocation = allocator.allocate(initialAllocationSize);
}
int allocationCapacity = allocation.capacity();
if (loadPosition == 0) {
writeFragmentIndex = 0;
writeFragmentOffset = allocation.getFragmentOffset(0);
@ -257,22 +269,28 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream
int read = Integer.MAX_VALUE;
byte[][] buffers = allocation.getBuffers();
while (!loadCanceled && loadPosition < resolvedLength && read > 0) {
while (!loadCanceled && read > 0 && maybeMoreToLoad()) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
int writeLength = (int) Math.min(writeFragmentRemainingLength,
resolvedLength - loadPosition);
read = dataSource.read(buffers[writeFragmentIndex], writeFragmentOffset, writeLength);
read = dataSource.read(buffers[writeFragmentIndex], writeFragmentOffset,
writeFragmentRemainingLength);
if (read > 0) {
loadPosition += read;
writeFragmentOffset += read;
writeFragmentRemainingLength -= read;
if (writeFragmentRemainingLength == 0 && loadPosition < resolvedLength) {
if (writeFragmentRemainingLength == 0 && maybeMoreToLoad()) {
writeFragmentIndex++;
if (loadPosition == allocationCapacity) {
allocation.ensureCapacity(allocationCapacity + CHUNKED_ALLOCATION_INCREMENT);
allocationCapacity = allocation.capacity();
buffers = allocation.getBuffers();
}
writeFragmentOffset = allocation.getFragmentOffset(writeFragmentIndex);
writeFragmentRemainingLength = allocation.getFragmentLength(writeFragmentIndex);
}
} else if (resolvedLength == C.LENGTH_UNBOUNDED) {
resolvedLength = loadPosition;
} else if (resolvedLength != loadPosition) {
throw new DataSourceStreamLoadException(
new UnexpectedLengthException(resolvedLength, loadPosition));
@ -283,6 +301,10 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream
}
}
private boolean maybeMoreToLoad() {
return resolvedLength == C.LENGTH_UNBOUNDED || loadPosition < resolvedLength;
}
private static class ReadHead {
private int position;

View File

@ -260,13 +260,6 @@ public class HttpDataSource implements DataSource {
long contentLength = getContentLength(connection);
dataLength = dataSpec.length == C.LENGTH_UNBOUNDED ? contentLength : dataSpec.length;
if (dataLength == C.LENGTH_UNBOUNDED) {
// The DataSpec specified unbounded length and we failed to resolve a length from the
// response headers.
throw new HttpDataSourceException(
new UnexpectedLengthException(C.LENGTH_UNBOUNDED, C.LENGTH_UNBOUNDED),
dataSpec);
}
if (dataSpec.length != C.LENGTH_UNBOUNDED && contentLength != C.LENGTH_UNBOUNDED
&& contentLength != dataSpec.length) {
@ -306,9 +299,9 @@ public class HttpDataSource implements DataSource {
if (listener != null) {
listener.onBytesTransferred(read);
}
} else if (dataLength != bytesRead) {
} else if (dataLength != C.LENGTH_UNBOUNDED && dataLength != bytesRead) {
// Check for cases where the server closed the connection having not sent the correct amount
// of data.
// of data. We can only do this if we know the length of the data we were expecting.
throw new HttpDataSourceException(new UnexpectedLengthException(dataLength, bytesRead),
dataSpec);
}
@ -365,14 +358,15 @@ public class HttpDataSource implements DataSource {
}
/**
* Returns the number of bytes that are still to be read for the current {@link DataSpec}. This
* value is equivalent to {@code dataSpec.length - bytesRead()}, where dataSpec is the
* {@link DataSpec} that was passed to the most recent call of {@link #open(DataSpec)}.
* Returns the number of bytes that are still to be read for the current {@link DataSpec}.
* <p>
* If the total length of the data being read is known, then this length minus {@code bytesRead()}
* is returned. If the total length is unknown, {@link C#LENGTH_UNBOUNDED} is returned.
*
* @return The number of bytes remaining.
* @return The remaining length, or {@link C#LENGTH_UNBOUNDED}.
*/
protected final long bytesRemaining() {
return dataLength - bytesRead;
return dataLength == C.LENGTH_UNBOUNDED ? dataLength : dataLength - bytesRead;
}
private HttpURLConnection makeConnection(DataSpec dataSpec) throws IOException {
@ -436,10 +430,6 @@ public class HttpDataSource implements DataSource {
}
}
}
if (contentLength == C.LENGTH_UNBOUNDED) {
Log.w(TAG, "Unable to parse content length [" + contentLengthHeader + "] [" +
contentRangeHeader + "]");
}
return contentLength;
}

View File

@ -40,7 +40,7 @@ public final class TeeDataSource implements DataSource {
@Override
public long open(DataSpec dataSpec) throws IOException {
long dataLength = upstream.open(dataSpec);
if (dataSpec.length == C.LENGTH_UNBOUNDED) {
if (dataSpec.length == C.LENGTH_UNBOUNDED && dataLength != C.LENGTH_UNBOUNDED) {
// Reconstruct dataSpec in order to provide the resolved length to the sink.
dataSpec = new DataSpec(dataSpec.uri, dataSpec.absoluteStreamPosition, dataLength,
dataSpec.key, dataSpec.position, dataSpec.uriIsFullStream);

View File

@ -15,6 +15,7 @@
*/
package com.google.android.exoplayer.upstream.cache;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.upstream.DataSink;
import com.google.android.exoplayer.upstream.DataSpec;
import com.google.android.exoplayer.util.Assertions;
@ -63,6 +64,9 @@ public class CacheDataSink implements DataSink {
@Override
public DataSink open(DataSpec dataSpec) throws CacheDataSinkException {
// TODO: Support caching for unbounded requests. See TODO in {@link CacheDataSource} for
// more details.
Assertions.checkState(dataSpec.length != C.LENGTH_UNBOUNDED);
try {
this.dataSpec = dataSpec;
dataSpecBytesWritten = 0;