Add CronetDataSource.read(ByteBuffer) method that writes directly into caller's buffer.

PiperOrigin-RevId: 251915459
This commit is contained in:
olly 2019-06-06 21:31:11 +01:00 committed by Toni
parent 1fb105bbb2
commit e525c1c59e
2 changed files with 492 additions and 25 deletions

View File

@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -504,32 +505,9 @@ public class CronetDataSource extends BaseDataSource implements HttpDataSource {
// Fill readBuffer with more data from Cronet.
operation.close();
readBuffer.clear();
castNonNull(currentUrlRequest).read(readBuffer);
try {
if (!operation.block(readTimeoutMs)) {
throw new SocketTimeoutException();
}
} catch (InterruptedException e) {
// The operation is ongoing so replace readBuffer to avoid it being written to by this
// operation during a subsequent request.
this.readBuffer = null;
Thread.currentThread().interrupt();
throw new HttpDataSourceException(
new InterruptedIOException(e),
castNonNull(currentDataSpec),
HttpDataSourceException.TYPE_READ);
} catch (SocketTimeoutException e) {
// The operation is ongoing so replace readBuffer to avoid it being written to by this
// operation during a subsequent request.
this.readBuffer = null;
throw new HttpDataSourceException(
e, castNonNull(currentDataSpec), HttpDataSourceException.TYPE_READ);
}
readInternal(castNonNull(readBuffer));
if (exception != null) {
throw new HttpDataSourceException(
exception, castNonNull(currentDataSpec), HttpDataSourceException.TYPE_READ);
} else if (finished) {
if (finished) {
bytesRemaining = 0;
return C.RESULT_END_OF_INPUT;
} else {
@ -554,6 +532,115 @@ public class CronetDataSource extends BaseDataSource implements HttpDataSource {
return bytesRead;
}
/**
* Reads up to {@code buffer.remaining()} bytes of data and stores them into {@code buffer},
* starting at {@code buffer.position()}. Advances the position of the buffer by the number of
* bytes read and returns this length.
*
* <p>If there is an error, a {@link HttpDataSourceException} is thrown and the contents of {@code
* buffer} should be ignored. If the exception has error code {@code
* HttpDataSourceException.TYPE_READ}, note that Cronet may continue writing into {@code buffer}
* after the method has returned. Thus the caller should not attempt to reuse the buffer.
*
* <p>If {@code buffer.remaining()} is zero then 0 is returned. Otherwise, if no data is available
* because the end of the opened range has been reached, then {@link C#RESULT_END_OF_INPUT} is
* returned. Otherwise, the call will block until at least one byte of data has been read and the
* number of bytes read is returned.
*
* <p>Passed buffer must be direct ByteBuffer. If you have a non-direct ByteBuffer, consider the
* alternative read method with its backed array.
*
* @param buffer The ByteBuffer into which the read data should be stored. Must be a direct
* ByteBuffer.
* @return The number of bytes read, or {@link C#RESULT_END_OF_INPUT} if no data is available
* because the end of the opened range has been reached.
* @throws HttpDataSourceException If an error occurs reading from the source.
* @throws IllegalArgumentException If {@codes buffer} is not a direct ByteBuffer.
*/
public int read(ByteBuffer buffer) throws HttpDataSourceException {
Assertions.checkState(opened);
if (!buffer.isDirect()) {
throw new IllegalArgumentException("Passed buffer is not a direct ByteBuffer");
}
if (!buffer.hasRemaining()) {
return 0;
} else if (bytesRemaining == 0) {
return C.RESULT_END_OF_INPUT;
}
int readLength = buffer.remaining();
if (readBuffer != null) {
// Skip all the bytes we can from readBuffer if there are still bytes to skip.
if (bytesToSkip != 0) {
if (bytesToSkip >= readBuffer.remaining()) {
bytesToSkip -= readBuffer.remaining();
readBuffer.position(readBuffer.limit());
} else {
readBuffer.position(readBuffer.position() + (int) bytesToSkip);
bytesToSkip = 0;
}
}
// If there is existing data in the readBuffer, read as much as possible. Return if any read.
int copyBytes = copyByteBuffer(/* src= */ readBuffer, /* dst= */ buffer);
if (copyBytes != 0) {
if (bytesRemaining != C.LENGTH_UNSET) {
bytesRemaining -= copyBytes;
}
bytesTransferred(copyBytes);
return copyBytes;
}
}
boolean readMore = true;
while (readMore) {
// If bytesToSkip > 0, read into intermediate buffer that we can discard instead of caller's
// buffer. If we do not need to skip bytes, we may write to buffer directly.
final boolean useCallerBuffer = bytesToSkip == 0;
operation.close();
if (!useCallerBuffer) {
if (readBuffer == null) {
readBuffer = ByteBuffer.allocateDirect(READ_BUFFER_SIZE_BYTES);
} else {
readBuffer.clear();
}
if (bytesToSkip < READ_BUFFER_SIZE_BYTES) {
readBuffer.limit((int) bytesToSkip);
}
}
// Fill buffer with more data from Cronet.
readInternal(useCallerBuffer ? buffer : castNonNull(readBuffer));
if (finished) {
bytesRemaining = 0;
return C.RESULT_END_OF_INPUT;
} else {
// The operation didn't time out, fail or finish, and therefore data must have been read.
Assertions.checkState(
useCallerBuffer
? readLength > buffer.remaining()
: castNonNull(readBuffer).position() > 0);
// If we meant to skip bytes, subtract what was left and repeat, otherwise, continue.
if (useCallerBuffer) {
readMore = false;
} else {
bytesToSkip -= castNonNull(readBuffer).position();
}
}
}
final int bytesRead = readLength - buffer.remaining();
if (bytesRemaining != C.LENGTH_UNSET) {
bytesRemaining -= bytesRead;
}
bytesTransferred(bytesRead);
return bytesRead;
}
@Override
public synchronized void close() {
if (currentUrlRequest != null) {
@ -655,6 +742,47 @@ public class CronetDataSource extends BaseDataSource implements HttpDataSource {
currentConnectTimeoutMs = clock.elapsedRealtime() + connectTimeoutMs;
}
/**
* Reads up to {@code buffer.remaining()} bytes of data from {@code currentUrlRequest} and stores
* them into {@code buffer}. If there is an error and {@code buffer == readBuffer}, then it resets
* the current {@code readBuffer} object so that it is not reused in the future.
*
* @param buffer The ByteBuffer into which the read data is stored. Must be a direct ByteBuffer.
* @throws HttpDataSourceException If an error occurs reading from the source.
*/
private void readInternal(ByteBuffer buffer) throws HttpDataSourceException {
castNonNull(currentUrlRequest).read(buffer);
try {
if (!operation.block(readTimeoutMs)) {
throw new SocketTimeoutException();
}
} catch (InterruptedException e) {
// The operation is ongoing so replace buffer to avoid it being written to by this
// operation during a subsequent request.
if (Objects.equals(buffer, readBuffer)) {
readBuffer = null;
}
Thread.currentThread().interrupt();
throw new HttpDataSourceException(
new InterruptedIOException(e),
castNonNull(currentDataSpec),
HttpDataSourceException.TYPE_READ);
} catch (SocketTimeoutException e) {
// The operation is ongoing so replace buffer to avoid it being written to by this
// operation during a subsequent request.
if (Objects.equals(buffer, readBuffer)) {
readBuffer = null;
}
throw new HttpDataSourceException(
e, castNonNull(currentDataSpec), HttpDataSourceException.TYPE_READ);
}
if (exception != null) {
throw new HttpDataSourceException(
exception, castNonNull(currentDataSpec), HttpDataSourceException.TYPE_READ);
}
}
private static boolean isCompressed(UrlResponseInfo info) {
for (Map.Entry<String, String> entry : info.getAllHeadersAsList()) {
if (entry.getKey().equalsIgnoreCase("Content-Encoding")) {
@ -738,6 +866,17 @@ public class CronetDataSource extends BaseDataSource implements HttpDataSource {
return list == null || list.isEmpty();
}
// Copy as much as possible from the src buffer into dst buffer.
// Returns the number of bytes copied.
private static int copyByteBuffer(ByteBuffer src, ByteBuffer dst) {
int remaining = Math.min(src.remaining(), dst.remaining());
int limit = src.limit();
src.limit(src.position() + remaining);
dst.put(src);
src.limit(limit);
return remaining;
}
private final class UrlRequestCallback extends UrlRequest.Callback {
@Override

View File

@ -554,6 +554,260 @@ public final class CronetDataSourceTest {
assertThat(bytesRead).isEqualTo(16);
}
@Test
public void testRequestReadByteBufferTwice() throws HttpDataSourceException {
mockResponseStartSuccess();
mockReadSuccess(0, 16);
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(8);
int bytesRead = dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesRead).isEqualTo(8);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(0, 8));
// Use a wrapped ByteBuffer instead of direct for coverage.
returnedBuffer.rewind();
bytesRead = dataSourceUnderTest.read(returnedBuffer);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(8, 8));
assertThat(bytesRead).isEqualTo(8);
// Separate cronet calls for each read.
verify(mockUrlRequest, times(2)).read(any(ByteBuffer.class));
verify(mockTransferListener, times(2))
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 8);
}
@Test
public void testRequestIntermixRead() throws HttpDataSourceException {
mockResponseStartSuccess();
// Chunking reads into parts 6, 7, 8, 9.
mockReadSuccess(0, 30);
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(6);
int bytesRead = dataSourceUnderTest.read(returnedBuffer);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(0, 6));
assertThat(bytesRead).isEqualTo(6);
byte[] returnedBytes = new byte[7];
bytesRead += dataSourceUnderTest.read(returnedBytes, 0, 7);
assertThat(returnedBytes).isEqualTo(buildTestDataArray(6, 7));
assertThat(bytesRead).isEqualTo(6 + 7);
returnedBuffer = ByteBuffer.allocateDirect(8);
bytesRead += dataSourceUnderTest.read(returnedBuffer);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(13, 8));
assertThat(bytesRead).isEqualTo(6 + 7 + 8);
returnedBytes = new byte[9];
bytesRead += dataSourceUnderTest.read(returnedBytes, 0, 9);
assertThat(returnedBytes).isEqualTo(buildTestDataArray(21, 9));
assertThat(bytesRead).isEqualTo(6 + 7 + 8 + 9);
// First ByteBuffer call. The first byte[] call populates enough bytes for the rest.
verify(mockUrlRequest, times(2)).read(any(ByteBuffer.class));
verify(mockTransferListener, times(1))
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 6);
verify(mockTransferListener, times(1))
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 7);
verify(mockTransferListener, times(1))
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 8);
verify(mockTransferListener, times(1))
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 9);
}
@Test
public void testSecondRequestNoContentLengthReadByteBuffer() throws HttpDataSourceException {
mockResponseStartSuccess();
testResponseHeader.put("Content-Length", Long.toString(1L));
mockReadSuccess(0, 16);
// First request.
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(8);
dataSourceUnderTest.read(returnedBuffer);
dataSourceUnderTest.close();
testResponseHeader.remove("Content-Length");
mockReadSuccess(0, 16);
// Second request.
dataSourceUnderTest.open(testDataSpec);
returnedBuffer = ByteBuffer.allocateDirect(16);
returnedBuffer.limit(10);
int bytesRead = dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesRead).isEqualTo(10);
returnedBuffer.limit(returnedBuffer.capacity());
bytesRead = dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesRead).isEqualTo(6);
returnedBuffer.rewind();
bytesRead = dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesRead).isEqualTo(C.RESULT_END_OF_INPUT);
}
@Test
public void testRangeRequestWith206ResponseReadByteBuffer() throws HttpDataSourceException {
mockResponseStartSuccess();
mockReadSuccess(1000, 5000);
testUrlResponseInfo = createUrlResponseInfo(206); // Server supports range requests.
testDataSpec = new DataSpec(Uri.parse(TEST_URL), 1000, 5000, null);
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(16);
int bytesRead = dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesRead).isEqualTo(16);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(1000, 16));
verify(mockTransferListener)
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 16);
}
@Test
public void testRangeRequestWith200ResponseReadByteBuffer() throws HttpDataSourceException {
// Tests for skipping bytes.
mockResponseStartSuccess();
mockReadSuccess(0, 7000);
testUrlResponseInfo = createUrlResponseInfo(200); // Server does not support range requests.
testDataSpec = new DataSpec(Uri.parse(TEST_URL), 1000, 5000, null);
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(16);
int bytesRead = dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesRead).isEqualTo(16);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(1000, 16));
verify(mockTransferListener)
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 16);
}
@Test
public void testReadByteBufferWithUnsetLength() throws HttpDataSourceException {
testResponseHeader.remove("Content-Length");
mockResponseStartSuccess();
mockReadSuccess(0, 16);
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(16);
returnedBuffer.limit(8);
int bytesRead = dataSourceUnderTest.read(returnedBuffer);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(0, 8));
assertThat(bytesRead).isEqualTo(8);
verify(mockTransferListener)
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 8);
}
@Test
public void testReadByteBufferReturnsWhatItCan() throws HttpDataSourceException {
mockResponseStartSuccess();
mockReadSuccess(0, 16);
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(24);
int bytesRead = dataSourceUnderTest.read(returnedBuffer);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(0, 16));
assertThat(bytesRead).isEqualTo(16);
verify(mockTransferListener)
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 16);
}
@Test
public void testOverreadByteBuffer() throws HttpDataSourceException {
testDataSpec = new DataSpec(Uri.parse(TEST_URL), 0, 16, null);
testResponseHeader.put("Content-Length", Long.toString(16L));
mockResponseStartSuccess();
mockReadSuccess(0, 16);
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(8);
int bytesRead = dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesRead).isEqualTo(8);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(0, 8));
// The current buffer is kept if not completely consumed by DataSource reader.
returnedBuffer = ByteBuffer.allocateDirect(6);
bytesRead += dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesRead).isEqualTo(14);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(8, 6));
// 2 bytes left at this point.
returnedBuffer = ByteBuffer.allocateDirect(8);
bytesRead += dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesRead).isEqualTo(16);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(14, 2));
// Called on each.
verify(mockUrlRequest, times(3)).read(any(ByteBuffer.class));
verify(mockTransferListener, times(1))
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 8);
verify(mockTransferListener, times(1))
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 6);
verify(mockTransferListener, times(1))
.onBytesTransferred(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, 2);
// Now we already returned the 16 bytes initially asked.
// Try to read again even though all requested 16 bytes are already returned.
// Return C.RESULT_END_OF_INPUT
returnedBuffer = ByteBuffer.allocateDirect(16);
int bytesOverRead = dataSourceUnderTest.read(returnedBuffer);
assertThat(bytesOverRead).isEqualTo(C.RESULT_END_OF_INPUT);
assertThat(returnedBuffer.position()).isEqualTo(0);
// C.RESULT_END_OF_INPUT should not be reported though the TransferListener.
verify(mockTransferListener, never())
.onBytesTransferred(
dataSourceUnderTest, testDataSpec, /* isNetwork= */ true, C.RESULT_END_OF_INPUT);
// Number of calls to cronet should not have increased.
verify(mockUrlRequest, times(3)).read(any(ByteBuffer.class));
// Check for connection not automatically closed.
verify(mockUrlRequest, never()).cancel();
assertThat(bytesRead).isEqualTo(16);
}
@Test
public void testClosedMeansClosedReadByteBuffer() throws HttpDataSourceException {
mockResponseStartSuccess();
mockReadSuccess(0, 16);
int bytesRead = 0;
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(16);
returnedBuffer.limit(8);
bytesRead += dataSourceUnderTest.read(returnedBuffer);
returnedBuffer.flip();
assertThat(copyByteBufferToArray(returnedBuffer)).isEqualTo(buildTestDataArray(0, 8));
assertThat(bytesRead).isEqualTo(8);
dataSourceUnderTest.close();
verify(mockTransferListener)
.onTransferEnd(dataSourceUnderTest, testDataSpec, /* isNetwork= */ true);
try {
bytesRead += dataSourceUnderTest.read(returnedBuffer);
fail();
} catch (IllegalStateException e) {
// Expected.
}
// 16 bytes were attempted but only 8 should have been successfully read.
assertThat(bytesRead).isEqualTo(8);
}
@Test
public void testConnectTimeout() throws InterruptedException {
long startTimeMs = SystemClock.elapsedRealtime();
@ -855,6 +1109,36 @@ public final class CronetDataSourceTest {
}
}
@Test
public void testReadByteBufferFailure() throws HttpDataSourceException {
mockResponseStartSuccess();
mockReadFailure();
dataSourceUnderTest.open(testDataSpec);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(8);
try {
dataSourceUnderTest.read(returnedBuffer);
fail("dataSourceUnderTest.read() returned, but IOException expected");
} catch (IOException e) {
// Expected.
}
}
@Test
public void testReadNonDirectedByteBufferFailure() throws HttpDataSourceException {
mockResponseStartSuccess();
mockReadFailure();
dataSourceUnderTest.open(testDataSpec);
byte[] returnedBuffer = new byte[8];
try {
dataSourceUnderTest.read(ByteBuffer.wrap(returnedBuffer));
fail("dataSourceUnderTest.read() returned, but IllegalArgumentException expected");
} catch (IllegalArgumentException e) {
// Expected.
}
}
@Test
public void testReadInterrupted() throws HttpDataSourceException, InterruptedException {
mockResponseStartSuccess();
@ -886,6 +1170,37 @@ public final class CronetDataSourceTest {
timedOutLatch.await();
}
@Test
public void testReadByteBufferInterrupted() throws HttpDataSourceException, InterruptedException {
mockResponseStartSuccess();
dataSourceUnderTest.open(testDataSpec);
final ConditionVariable startCondition = buildReadStartedCondition();
final CountDownLatch timedOutLatch = new CountDownLatch(1);
ByteBuffer returnedBuffer = ByteBuffer.allocateDirect(8);
Thread thread =
new Thread() {
@Override
public void run() {
try {
dataSourceUnderTest.read(returnedBuffer);
fail();
} catch (HttpDataSourceException e) {
// Expected.
assertThat(e.getCause() instanceof CronetDataSource.InterruptedIOException).isTrue();
timedOutLatch.countDown();
}
}
};
thread.start();
startCondition.block();
assertNotCountedDown(timedOutLatch);
// Now we interrupt.
thread.interrupt();
timedOutLatch.await();
}
@Test
public void testAllowDirectExecutor() throws HttpDataSourceException {
testDataSpec = new DataSpec(Uri.parse(TEST_URL), 1000, 5000, null);
@ -1064,4 +1379,17 @@ public final class CronetDataSourceTest {
testBuffer.flip();
return testBuffer;
}
// Returns a copy of what is remaining in the src buffer from the current position to capacity.
private static byte[] copyByteBufferToArray(ByteBuffer src) {
if (src == null) {
return null;
}
byte[] copy = new byte[src.remaining()];
int index = 0;
while (src.hasRemaining()) {
copy[index++] = src.get();
}
return copy;
}
}