From 4e7b333aee2af26376230c054eee0d25c443134d Mon Sep 17 00:00:00 2001 From: Oliver Woodman Date: Thu, 14 Aug 2014 15:44:09 +0100 Subject: [PATCH] Support chunked requests. --- .../exoplayer/chunk/ChunkSampleSource.java | 39 +++++++------ .../exoplayer/upstream/Allocation.java | 22 ++++++++ .../exoplayer/upstream/BufferPool.java | 40 ++++++++++++- .../exoplayer/upstream/DataSource.java | 5 +- .../exoplayer/upstream/DataSourceStream.java | 56 +++++++++++++------ .../exoplayer/upstream/HttpDataSource.java | 26 +++------ .../exoplayer/upstream/TeeDataSource.java | 2 +- .../upstream/cache/CacheDataSink.java | 4 ++ 8 files changed, 135 insertions(+), 59 deletions(-) diff --git a/library/src/main/java/com/google/android/exoplayer/chunk/ChunkSampleSource.java b/library/src/main/java/com/google/android/exoplayer/chunk/ChunkSampleSource.java index 069f5c69dc..980de4ec23 100644 --- a/library/src/main/java/com/google/android/exoplayer/chunk/ChunkSampleSource.java +++ b/library/src/main/java/com/google/android/exoplayer/chunk/ChunkSampleSource.java @@ -57,24 +57,27 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener { * load is for initialization data. * @param mediaEndTimeMs The media time of the end of the data being loaded, or -1 if this * load is for initialization data. - * @param totalBytes The length of the data being loaded in bytes. + * @param length The length of the data being loaded in bytes, or {@link C#LENGTH_UNBOUNDED} if + * the length of the data has not yet been determined. */ void onLoadStarted(int sourceId, String formatId, int trigger, boolean isInitialization, - int mediaStartTimeMs, int mediaEndTimeMs, long totalBytes); + int mediaStartTimeMs, int mediaEndTimeMs, long length); /** * Invoked when the current load operation completes. * * @param sourceId The id of the reporting {@link SampleSource}. + * @param bytesLoaded The number of bytes that were loaded. */ - void onLoadCompleted(int sourceId); + void onLoadCompleted(int sourceId, long bytesLoaded); /** * Invoked when the current upstream load operation is canceled. * * @param sourceId The id of the reporting {@link SampleSource}. + * @param bytesLoaded The number of bytes that were loaded prior to the cancellation. */ - void onLoadCanceled(int sourceId); + void onLoadCanceled(int sourceId, long bytesLoaded); /** * Invoked when data is removed from the back of the buffer, typically so that it can be @@ -83,10 +86,10 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener { * @param sourceId The id of the reporting {@link SampleSource}. * @param mediaStartTimeMs The media time of the start of the discarded data. * @param mediaEndTimeMs The media time of the end of the discarded data. - * @param totalBytes The length of the data being discarded in bytes. + * @param bytesDiscarded The length of the data being discarded in bytes. */ void onUpstreamDiscarded(int sourceId, int mediaStartTimeMs, int mediaEndTimeMs, - long totalBytes); + long bytesDiscarded); /** * Invoked when an error occurs loading media data. @@ -111,10 +114,10 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener { * @param sourceId The id of the reporting {@link SampleSource}. * @param mediaStartTimeMs The media time of the start of the discarded data. * @param mediaEndTimeMs The media time of the end of the discarded data. - * @param totalBytes The length of the data being discarded in bytes. + * @param bytesDiscarded The length of the data being discarded in bytes. */ void onDownstreamDiscarded(int sourceId, int mediaStartTimeMs, int mediaEndTimeMs, - long totalBytes); + long bytesDiscarded); /** * Invoked when the downstream format changes (i.e. when the format being supplied to the @@ -409,6 +412,7 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener { @Override public void onLoaded() { Chunk currentLoadable = currentLoadableHolder.chunk; + notifyLoadCompleted(currentLoadable.bytesLoaded()); try { currentLoadable.consume(); } catch (IOException e) { @@ -424,7 +428,6 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener { if (!currentLoadableExceptionFatal) { clearCurrentLoadable(); } - notifyLoadCompleted(); updateLoadControl(); } } @@ -432,11 +435,11 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener { @Override public void onCanceled() { Chunk currentLoadable = currentLoadableHolder.chunk; + notifyLoadCanceled(currentLoadable.bytesLoaded()); if (!isMediaChunk(currentLoadable)) { currentLoadable.release(); } clearCurrentLoadable(); - notifyLoadCanceled(); if (state == STATE_ENABLED) { restartFrom(pendingResetTime); } else { @@ -677,35 +680,35 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener { private void notifyLoadStarted(final String formatId, final int trigger, final boolean isInitialization, final long mediaStartTimeUs, final long mediaEndTimeUs, - final long totalBytes) { + final long length) { if (eventHandler != null && eventListener != null) { eventHandler.post(new Runnable() { @Override public void run() { eventListener.onLoadStarted(eventSourceId, formatId, trigger, isInitialization, - usToMs(mediaStartTimeUs), usToMs(mediaEndTimeUs), totalBytes); + usToMs(mediaStartTimeUs), usToMs(mediaEndTimeUs), length); } }); } } - private void notifyLoadCompleted() { + private void notifyLoadCompleted(final long bytesLoaded) { if (eventHandler != null && eventListener != null) { eventHandler.post(new Runnable() { @Override public void run() { - eventListener.onLoadCompleted(eventSourceId); + eventListener.onLoadCompleted(eventSourceId, bytesLoaded); } }); } } - private void notifyLoadCanceled() { + private void notifyLoadCanceled(final long bytesLoaded) { if (eventHandler != null && eventListener != null) { eventHandler.post(new Runnable() { @Override public void run() { - eventListener.onLoadCanceled(eventSourceId); + eventListener.onLoadCanceled(eventSourceId, bytesLoaded); } }); } @@ -760,13 +763,13 @@ public class ChunkSampleSource implements SampleSource, Loader.Listener { } private void notifyDownstreamDiscarded(final long mediaStartTimeUs, final long mediaEndTimeUs, - final long totalBytes) { + final long bytesDiscarded) { if (eventHandler != null && eventListener != null) { eventHandler.post(new Runnable() { @Override public void run() { eventListener.onDownstreamDiscarded(eventSourceId, usToMs(mediaStartTimeUs), - usToMs(mediaEndTimeUs), totalBytes); + usToMs(mediaEndTimeUs), bytesDiscarded); } }); } diff --git a/library/src/main/java/com/google/android/exoplayer/upstream/Allocation.java b/library/src/main/java/com/google/android/exoplayer/upstream/Allocation.java index 2b7619595b..b3fb38921a 100644 --- a/library/src/main/java/com/google/android/exoplayer/upstream/Allocation.java +++ b/library/src/main/java/com/google/android/exoplayer/upstream/Allocation.java @@ -24,6 +24,28 @@ package com.google.android.exoplayer.upstream; */ public interface Allocation { + /** + * Ensures the allocation has a capacity greater than or equal to the specified size in bytes. + *

+ * If {@code size} is greater than the current capacity of the allocation, then it will grow + * to have a capacity of at least {@code size}. The allocation is grown by adding new fragments. + * Existing fragments remain unchanged, and any data that has been written to them will be + * preserved. + *

+ * If {@code size} is less than or equal to the capacity of the allocation, then the call is a + * no-op. + * + * @param size The minimum required capacity, in bytes. + */ + public void ensureCapacity(int size); + + /** + * Gets the capacity of the allocation, in bytes. + * + * @return The capacity of the allocation, in bytes. + */ + public int capacity(); + /** * Gets the buffers in which the fragments are allocated. * diff --git a/library/src/main/java/com/google/android/exoplayer/upstream/BufferPool.java b/library/src/main/java/com/google/android/exoplayer/upstream/BufferPool.java index 979dc39e46..a7d847d5a1 100644 --- a/library/src/main/java/com/google/android/exoplayer/upstream/BufferPool.java +++ b/library/src/main/java/com/google/android/exoplayer/upstream/BufferPool.java @@ -67,15 +67,39 @@ public final class BufferPool implements Allocator { @Override public synchronized Allocation allocate(int size) { + return new AllocationImpl(allocate(size, null)); + } + + /** + * Allocates byte arrays whose combined length is at least {@code size}. + *

+ * An existing array of byte arrays may be provided to form the start of the allocation. + * + * @param size The total size required, in bytes. + * @param existing Existing byte arrays to use as the start of the allocation. May be null. + * @return The allocated byte arrays. + */ + /* package */ synchronized byte[][] allocate(int size, byte[][] existing) { int requiredBufferCount = requiredBufferCount(size); - allocatedBufferCount += requiredBufferCount; + if (existing != null && requiredBufferCount <= existing.length) { + // The existing buffers are sufficient. + return existing; + } + // We need to allocate additional buffers. byte[][] buffers = new byte[requiredBufferCount][]; - for (int i = 0; i < requiredBufferCount; i++) { + int firstNewBufferIndex = 0; + if (existing != null) { + firstNewBufferIndex = existing.length; + System.arraycopy(existing, 0, buffers, 0, firstNewBufferIndex); + } + // Allocate the new buffers + allocatedBufferCount += requiredBufferCount - firstNewBufferIndex; + for (int i = firstNewBufferIndex; i < requiredBufferCount; i++) { // Use a recycled buffer if one is available. Else instantiate a new one. buffers[i] = recycledBufferCount > 0 ? recycledBuffers[--recycledBufferCount] : new byte[bufferLength]; } - return new AllocationImpl(buffers); + return buffers; } /** @@ -112,6 +136,16 @@ public final class BufferPool implements Allocator { this.buffers = buffers; } + @Override + public void ensureCapacity(int size) { + buffers = allocate(size, buffers); + } + + @Override + public int capacity() { + return bufferLength * buffers.length; + } + @Override public byte[][] getBuffers() { return buffers; diff --git a/library/src/main/java/com/google/android/exoplayer/upstream/DataSource.java b/library/src/main/java/com/google/android/exoplayer/upstream/DataSource.java index eb269ef2dc..624e42a111 100644 --- a/library/src/main/java/com/google/android/exoplayer/upstream/DataSource.java +++ b/library/src/main/java/com/google/android/exoplayer/upstream/DataSource.java @@ -37,8 +37,9 @@ public interface DataSource { * @throws IOException If an error occurs opening the source. * @return The number of bytes that can be read from the opened source. For unbounded requests * (i.e. requests where {@link DataSpec#length} equals {@link C#LENGTH_UNBOUNDED}) this value - * is the resolved length of the request. For all other requests, the value returned will be - * equal to the request's {@link DataSpec#length}. + * is the resolved length of the request, or {@link C#LENGTH_UNBOUNDED} if the length is still + * unresolved. For all other requests, the value returned will be equal to the request's + * {@link DataSpec#length}. */ public long open(DataSpec dataSpec) throws IOException; diff --git a/library/src/main/java/com/google/android/exoplayer/upstream/DataSourceStream.java b/library/src/main/java/com/google/android/exoplayer/upstream/DataSourceStream.java index c786a05779..dc5227e426 100644 --- a/library/src/main/java/com/google/android/exoplayer/upstream/DataSourceStream.java +++ b/library/src/main/java/com/google/android/exoplayer/upstream/DataSourceStream.java @@ -40,6 +40,8 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream } + private static final int CHUNKED_ALLOCATION_INCREMENT = 256 * 1024; + private final DataSource dataSource; private final DataSpec dataSpec; private final Allocator allocator; @@ -58,7 +60,7 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream /** * @param dataSource The source from which the data should be loaded. * @param dataSpec Defines the data to be loaded. {@code dataSpec.length} must not exceed - * {@link Integer#MAX_VALUE}. If {@code dataSpec.length == DataSpec.LENGTH_UNBOUNDED} then + * {@link Integer#MAX_VALUE}. If {@code dataSpec.length == C.LENGTH_UNBOUNDED} then * the length resolved by {@code dataSource.open(dataSpec)} must not exceed * {@link Integer#MAX_VALUE}. * @param allocator Used to obtain an {@link Allocation} for holding the data. @@ -98,7 +100,8 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream } /** - * Returns the length of the streamin bytes. + * Returns the length of the stream in bytes, or {@value C#LENGTH_UNBOUNDED} if the length has + * yet to be determined. * * @return The length of the stream in bytes, or {@value C#LENGTH_UNBOUNDED} if the length has * yet to be determined. @@ -124,7 +127,7 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream * Note: The read methods provide a more efficient way of consuming the loaded data. Use this * method only when a freshly allocated byte[] containing all of the loaded data is required. * - * @return The loaded data or null. + * @return The loaded data, or null. */ public final byte[] getLoadedData() { if (loadPosition == 0) { @@ -192,6 +195,11 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream int bytesRead = 0; byte[][] buffers = allocation.getBuffers(); while (bytesRead < bytesToRead) { + if (readHead.fragmentRemaining == 0) { + readHead.fragmentIndex++; + readHead.fragmentOffset = allocation.getFragmentOffset(readHead.fragmentIndex); + readHead.fragmentRemaining = allocation.getFragmentLength(readHead.fragmentIndex); + } int bufferReadLength = Math.min(readHead.fragmentRemaining, bytesToRead - bytesRead); if (target != null) { target.put(buffers[readHead.fragmentIndex], readHead.fragmentOffset, bufferReadLength); @@ -204,11 +212,6 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream bytesRead += bufferReadLength; readHead.fragmentOffset += bufferReadLength; readHead.fragmentRemaining -= bufferReadLength; - if (readHead.fragmentRemaining == 0 && readHead.position < resolvedLength) { - readHead.fragmentIndex++; - readHead.fragmentOffset = allocation.getFragmentOffset(readHead.fragmentIndex); - readHead.fragmentRemaining = allocation.getFragmentLength(readHead.fragmentIndex); - } } return bytesRead; @@ -232,23 +235,32 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream // The load was canceled, or is already complete. return; } + try { DataSpec loadDataSpec; - if (resolvedLength == C.LENGTH_UNBOUNDED) { + if (loadPosition == 0 && resolvedLength == C.LENGTH_UNBOUNDED) { loadDataSpec = dataSpec; - resolvedLength = dataSource.open(loadDataSpec); + long resolvedLength = dataSource.open(loadDataSpec); if (resolvedLength > Integer.MAX_VALUE) { throw new DataSourceStreamLoadException( new UnexpectedLengthException(dataSpec.length, resolvedLength)); } + this.resolvedLength = resolvedLength; } else { + long remainingLength = resolvedLength != C.LENGTH_UNBOUNDED + ? resolvedLength - loadPosition : C.LENGTH_UNBOUNDED; loadDataSpec = new DataSpec(dataSpec.uri, dataSpec.position + loadPosition, - resolvedLength - loadPosition, dataSpec.key); + remainingLength, dataSpec.key); dataSource.open(loadDataSpec); } + if (allocation == null) { - allocation = allocator.allocate((int) resolvedLength); + int initialAllocationSize = resolvedLength != C.LENGTH_UNBOUNDED + ? (int) resolvedLength : CHUNKED_ALLOCATION_INCREMENT; + allocation = allocator.allocate(initialAllocationSize); } + int allocationCapacity = allocation.capacity(); + if (loadPosition == 0) { writeFragmentIndex = 0; writeFragmentOffset = allocation.getFragmentOffset(0); @@ -257,22 +269,28 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream int read = Integer.MAX_VALUE; byte[][] buffers = allocation.getBuffers(); - while (!loadCanceled && loadPosition < resolvedLength && read > 0) { + while (!loadCanceled && read > 0 && maybeMoreToLoad()) { if (Thread.interrupted()) { throw new InterruptedException(); } - int writeLength = (int) Math.min(writeFragmentRemainingLength, - resolvedLength - loadPosition); - read = dataSource.read(buffers[writeFragmentIndex], writeFragmentOffset, writeLength); + read = dataSource.read(buffers[writeFragmentIndex], writeFragmentOffset, + writeFragmentRemainingLength); if (read > 0) { loadPosition += read; writeFragmentOffset += read; writeFragmentRemainingLength -= read; - if (writeFragmentRemainingLength == 0 && loadPosition < resolvedLength) { + if (writeFragmentRemainingLength == 0 && maybeMoreToLoad()) { writeFragmentIndex++; + if (loadPosition == allocationCapacity) { + allocation.ensureCapacity(allocationCapacity + CHUNKED_ALLOCATION_INCREMENT); + allocationCapacity = allocation.capacity(); + buffers = allocation.getBuffers(); + } writeFragmentOffset = allocation.getFragmentOffset(writeFragmentIndex); writeFragmentRemainingLength = allocation.getFragmentLength(writeFragmentIndex); } + } else if (resolvedLength == C.LENGTH_UNBOUNDED) { + resolvedLength = loadPosition; } else if (resolvedLength != loadPosition) { throw new DataSourceStreamLoadException( new UnexpectedLengthException(resolvedLength, loadPosition)); @@ -283,6 +301,10 @@ public final class DataSourceStream implements Loadable, NonBlockingInputStream } } + private boolean maybeMoreToLoad() { + return resolvedLength == C.LENGTH_UNBOUNDED || loadPosition < resolvedLength; + } + private static class ReadHead { private int position; diff --git a/library/src/main/java/com/google/android/exoplayer/upstream/HttpDataSource.java b/library/src/main/java/com/google/android/exoplayer/upstream/HttpDataSource.java index 7f9bdd4d26..f9d3bf8f1a 100644 --- a/library/src/main/java/com/google/android/exoplayer/upstream/HttpDataSource.java +++ b/library/src/main/java/com/google/android/exoplayer/upstream/HttpDataSource.java @@ -260,13 +260,6 @@ public class HttpDataSource implements DataSource { long contentLength = getContentLength(connection); dataLength = dataSpec.length == C.LENGTH_UNBOUNDED ? contentLength : dataSpec.length; - if (dataLength == C.LENGTH_UNBOUNDED) { - // The DataSpec specified unbounded length and we failed to resolve a length from the - // response headers. - throw new HttpDataSourceException( - new UnexpectedLengthException(C.LENGTH_UNBOUNDED, C.LENGTH_UNBOUNDED), - dataSpec); - } if (dataSpec.length != C.LENGTH_UNBOUNDED && contentLength != C.LENGTH_UNBOUNDED && contentLength != dataSpec.length) { @@ -306,9 +299,9 @@ public class HttpDataSource implements DataSource { if (listener != null) { listener.onBytesTransferred(read); } - } else if (dataLength != bytesRead) { + } else if (dataLength != C.LENGTH_UNBOUNDED && dataLength != bytesRead) { // Check for cases where the server closed the connection having not sent the correct amount - // of data. + // of data. We can only do this if we know the length of the data we were expecting. throw new HttpDataSourceException(new UnexpectedLengthException(dataLength, bytesRead), dataSpec); } @@ -365,14 +358,15 @@ public class HttpDataSource implements DataSource { } /** - * Returns the number of bytes that are still to be read for the current {@link DataSpec}. This - * value is equivalent to {@code dataSpec.length - bytesRead()}, where dataSpec is the - * {@link DataSpec} that was passed to the most recent call of {@link #open(DataSpec)}. + * Returns the number of bytes that are still to be read for the current {@link DataSpec}. + *

+ * If the total length of the data being read is known, then this length minus {@code bytesRead()} + * is returned. If the total length is unknown, {@link C#LENGTH_UNBOUNDED} is returned. * - * @return The number of bytes remaining. + * @return The remaining length, or {@link C#LENGTH_UNBOUNDED}. */ protected final long bytesRemaining() { - return dataLength - bytesRead; + return dataLength == C.LENGTH_UNBOUNDED ? dataLength : dataLength - bytesRead; } private HttpURLConnection makeConnection(DataSpec dataSpec) throws IOException { @@ -436,10 +430,6 @@ public class HttpDataSource implements DataSource { } } } - if (contentLength == C.LENGTH_UNBOUNDED) { - Log.w(TAG, "Unable to parse content length [" + contentLengthHeader + "] [" + - contentRangeHeader + "]"); - } return contentLength; } diff --git a/library/src/main/java/com/google/android/exoplayer/upstream/TeeDataSource.java b/library/src/main/java/com/google/android/exoplayer/upstream/TeeDataSource.java index 6d05f8c344..cbb571f308 100644 --- a/library/src/main/java/com/google/android/exoplayer/upstream/TeeDataSource.java +++ b/library/src/main/java/com/google/android/exoplayer/upstream/TeeDataSource.java @@ -40,7 +40,7 @@ public final class TeeDataSource implements DataSource { @Override public long open(DataSpec dataSpec) throws IOException { long dataLength = upstream.open(dataSpec); - if (dataSpec.length == C.LENGTH_UNBOUNDED) { + if (dataSpec.length == C.LENGTH_UNBOUNDED && dataLength != C.LENGTH_UNBOUNDED) { // Reconstruct dataSpec in order to provide the resolved length to the sink. dataSpec = new DataSpec(dataSpec.uri, dataSpec.absoluteStreamPosition, dataLength, dataSpec.key, dataSpec.position, dataSpec.uriIsFullStream); diff --git a/library/src/main/java/com/google/android/exoplayer/upstream/cache/CacheDataSink.java b/library/src/main/java/com/google/android/exoplayer/upstream/cache/CacheDataSink.java index 02177f7d93..942a29f0c7 100644 --- a/library/src/main/java/com/google/android/exoplayer/upstream/cache/CacheDataSink.java +++ b/library/src/main/java/com/google/android/exoplayer/upstream/cache/CacheDataSink.java @@ -15,6 +15,7 @@ */ package com.google.android.exoplayer.upstream.cache; +import com.google.android.exoplayer.C; import com.google.android.exoplayer.upstream.DataSink; import com.google.android.exoplayer.upstream.DataSpec; import com.google.android.exoplayer.util.Assertions; @@ -63,6 +64,9 @@ public class CacheDataSink implements DataSink { @Override public DataSink open(DataSpec dataSpec) throws CacheDataSinkException { + // TODO: Support caching for unbounded requests. See TODO in {@link CacheDataSource} for + // more details. + Assertions.checkState(dataSpec.length != C.LENGTH_UNBOUNDED); try { this.dataSpec = dataSpec; dataSpecBytesWritten = 0;