Finish cleaning DataSource implementations.

- Enfroce read returns 0 if readLength==0 everywhere.
- Fixes and simplifications for CronetDataSource.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=135138232
This commit is contained in:
olly 2016-10-04 12:30:50 -07:00 committed by Oliver Woodman
parent f8ed4cfdee
commit b3c6f1caae
4 changed files with 103 additions and 118 deletions

View File

@ -28,7 +28,6 @@ import com.google.android.exoplayer2.util.Assertions;
import com.google.android.exoplayer2.util.Clock; import com.google.android.exoplayer2.util.Clock;
import com.google.android.exoplayer2.util.Predicate; import com.google.android.exoplayer2.util.Predicate;
import com.google.android.exoplayer2.util.SystemClock; import com.google.android.exoplayer2.util.SystemClock;
import com.google.android.exoplayer2.util.TraceUtil;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
@ -99,7 +98,7 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
private final CronetEngine cronetEngine; private final CronetEngine cronetEngine;
private final Executor executor; private final Executor executor;
private final Predicate<String> contentTypePredicate; private final Predicate<String> contentTypePredicate;
private final TransferListener transferListener; private final TransferListener<? super CronetDataSource> listener;
private final int connectTimeoutMs; private final int connectTimeoutMs;
private final int readTimeoutMs; private final int readTimeoutMs;
private final boolean resetTimeoutOnRedirects; private final boolean resetTimeoutOnRedirects;
@ -127,12 +126,12 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
* @param contentTypePredicate An optional {@link Predicate}. If a content type is rejected by the * @param contentTypePredicate An optional {@link Predicate}. If a content type is rejected by the
* predicate then an {@link InvalidContentTypeException} is thrown from * predicate then an {@link InvalidContentTypeException} is thrown from
* {@link #open(DataSpec)}. * {@link #open(DataSpec)}.
* @param transferListener A listener. * @param listener An optional listener.
*/ */
public CronetDataSource(CronetEngine cronetEngine, Executor executor, public CronetDataSource(CronetEngine cronetEngine, Executor executor,
Predicate<String> contentTypePredicate, TransferListener transferListener) { Predicate<String> contentTypePredicate, TransferListener<? super CronetDataSource> listener) {
this(cronetEngine, executor, contentTypePredicate, transferListener, this(cronetEngine, executor, contentTypePredicate, listener, DEFAULT_CONNECT_TIMEOUT_MILLIS,
DEFAULT_CONNECT_TIMEOUT_MILLIS, DEFAULT_READ_TIMEOUT_MILLIS, false); DEFAULT_READ_TIMEOUT_MILLIS, false);
} }
/** /**
@ -141,25 +140,25 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
* @param contentTypePredicate An optional {@link Predicate}. If a content type is rejected by the * @param contentTypePredicate An optional {@link Predicate}. If a content type is rejected by the
* predicate then an {@link InvalidContentTypeException} is thrown from * predicate then an {@link InvalidContentTypeException} is thrown from
* {@link #open(DataSpec)}. * {@link #open(DataSpec)}.
* @param transferListener A listener. * @param listener An optional listener.
* @param connectTimeoutMs The connection timeout, in milliseconds. * @param connectTimeoutMs The connection timeout, in milliseconds.
* @param readTimeoutMs The read timeout, in milliseconds. * @param readTimeoutMs The read timeout, in milliseconds.
* @param resetTimeoutOnRedirects Whether the connect timeout is reset when a redirect occurs. * @param resetTimeoutOnRedirects Whether the connect timeout is reset when a redirect occurs.
*/ */
public CronetDataSource(CronetEngine cronetEngine, Executor executor, public CronetDataSource(CronetEngine cronetEngine, Executor executor,
Predicate<String> contentTypePredicate, TransferListener transferListener, Predicate<String> contentTypePredicate, TransferListener<? super CronetDataSource> listener,
int connectTimeoutMs, int readTimeoutMs, boolean resetTimeoutOnRedirects) { int connectTimeoutMs, int readTimeoutMs, boolean resetTimeoutOnRedirects) {
this(cronetEngine, executor, contentTypePredicate, transferListener, connectTimeoutMs, this(cronetEngine, executor, contentTypePredicate, listener, connectTimeoutMs,
readTimeoutMs, resetTimeoutOnRedirects, new SystemClock()); readTimeoutMs, resetTimeoutOnRedirects, new SystemClock());
} }
/* package */ CronetDataSource(CronetEngine cronetEngine, Executor executor, /* package */ CronetDataSource(CronetEngine cronetEngine, Executor executor,
Predicate<String> contentTypePredicate, TransferListener transferListener, Predicate<String> contentTypePredicate, TransferListener<? super CronetDataSource> listener,
int connectTimeoutMs, int readTimeoutMs, boolean resetTimeoutOnRedirects, Clock clock) { int connectTimeoutMs, int readTimeoutMs, boolean resetTimeoutOnRedirects, Clock clock) {
this.cronetEngine = Assertions.checkNotNull(cronetEngine); this.cronetEngine = Assertions.checkNotNull(cronetEngine);
this.executor = Assertions.checkNotNull(executor); this.executor = Assertions.checkNotNull(executor);
this.contentTypePredicate = contentTypePredicate; this.contentTypePredicate = contentTypePredicate;
this.transferListener = transferListener; this.listener = listener;
this.connectTimeoutMs = connectTimeoutMs; this.connectTimeoutMs = connectTimeoutMs;
this.readTimeoutMs = readTimeoutMs; this.readTimeoutMs = readTimeoutMs;
this.resetTimeoutOnRedirects = resetTimeoutOnRedirects; this.resetTimeoutOnRedirects = resetTimeoutOnRedirects;
@ -198,36 +197,31 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
@Override @Override
public long open(DataSpec dataSpec) throws HttpDataSourceException { public long open(DataSpec dataSpec) throws HttpDataSourceException {
TraceUtil.beginSection("CronetDataSource.open"); Assertions.checkNotNull(dataSpec);
try { synchronized (this) {
Assertions.checkNotNull(dataSpec); Assertions.checkState(connectionState == IDLE_CONNECTION, "Connection already open");
synchronized (this) { connectionState = OPENING_CONNECTION;
Assertions.checkState(connectionState == IDLE_CONNECTION, "Connection already open");
connectionState = OPENING_CONNECTION;
}
operation.close();
resetConnectTimeout();
startRequest(dataSpec);
boolean requestStarted = blockUntilConnectTimeout();
if (exception != null) {
// An error occurred opening the connection.
throw exception;
} else if (!requestStarted) {
// The timeout was reached before the connection was opened.
throw new OpenException(new SocketTimeoutException(), dataSpec, getCurrentRequestStatus());
}
// Connection was opened.
if (transferListener != null) {
transferListener.onTransferStart(this, dataSpec);
}
connectionState = OPEN_CONNECTION;
return contentLength;
} finally {
TraceUtil.endSection();
} }
operation.close();
resetConnectTimeout();
startRequest(dataSpec);
boolean requestStarted = blockUntilConnectTimeout();
if (exception != null) {
// An error occurred opening the connection.
throw exception;
} else if (!requestStarted) {
// The timeout was reached before the connection was opened.
throw new OpenException(new SocketTimeoutException(), dataSpec, getCurrentRequestStatus());
}
// Connection was opened.
if (listener != null) {
listener.onTransferStart(this, dataSpec);
}
connectionState = OPEN_CONNECTION;
return contentLength;
} }
private void startRequest(DataSpec dataSpec) throws HttpDataSourceException { private void startRequest(DataSpec dataSpec) throws HttpDataSourceException {
@ -274,8 +268,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
} }
@Override @Override
public synchronized void onFailed( public synchronized void onFailed(UrlRequest request, UrlResponseInfo info,
UrlRequest request, UrlResponseInfo info, UrlRequestException error) { UrlRequestException error) {
if (request != currentUrlRequest) { if (request != currentUrlRequest) {
return; return;
} }
@ -284,7 +278,6 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
? new UnknownHostException() : error; ? new UnknownHostException() : error;
exception = new OpenException(cause, currentDataSpec, getCurrentRequestStatus()); exception = new OpenException(cause, currentDataSpec, getCurrentRequestStatus());
} else if (connectionState == OPEN_CONNECTION) { } else if (connectionState == OPEN_CONNECTION) {
readBuffer.limit(0);
exception = new HttpDataSourceException(error, currentDataSpec, exception = new HttpDataSourceException(error, currentDataSpec,
HttpDataSourceException.TYPE_READ); HttpDataSourceException.TYPE_READ);
} }
@ -296,7 +289,6 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
if (request != currentUrlRequest) { if (request != currentUrlRequest) {
return; return;
} }
TraceUtil.beginSection("CronetDataSource.onResponseStarted");
try { try {
validateResponse(info); validateResponse(info);
responseInfo = info; responseInfo = info;
@ -327,7 +319,6 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
exception = e; exception = e;
} finally { } finally {
operation.open(); operation.open();
TraceUtil.endSection();
} }
} }
@ -415,58 +406,49 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
@Override @Override
public int read(byte[] buffer, int offset, int readLength) throws HttpDataSourceException { public int read(byte[] buffer, int offset, int readLength) throws HttpDataSourceException {
TraceUtil.beginSection("CronetDataSource.read"); synchronized (this) {
try { Assertions.checkState(connectionState == OPEN_CONNECTION);
synchronized (this) { }
if (connectionState != OPEN_CONNECTION) {
throw new IllegalStateException("Connection not ready");
}
}
// If being asked to read beyond the amount of bytes initially requested, return if (readLength == 0) {
// RESULT_END_OF_INPUT. return 0;
if (expectedBytesRemainingToRead != null && expectedBytesRemainingToRead.get() <= 0) { }
if (expectedBytesRemainingToRead != null && expectedBytesRemainingToRead.get() == 0) {
return C.RESULT_END_OF_INPUT;
}
if (!hasData) {
// Read more data from cronet.
operation.close();
readBuffer.clear();
currentUrlRequest.read(readBuffer);
if (!operation.block(readTimeoutMs)) {
throw new HttpDataSourceException(
new SocketTimeoutException(), currentDataSpec, HttpDataSourceException.TYPE_READ);
}
if (exception != null) {
throw exception;
}
// The expected response length is unknown, but cronet has indicated that the request
// already finished successfully.
if (responseFinished) {
return C.RESULT_END_OF_INPUT; return C.RESULT_END_OF_INPUT;
} }
if (!hasData) {
// Read more data from cronet.
operation.close();
currentUrlRequest.read(readBuffer);
if (!operation.block(readTimeoutMs)) {
throw new HttpDataSourceException(
new SocketTimeoutException(), currentDataSpec, HttpDataSourceException.TYPE_READ);
}
if (exception != null) {
throw exception;
}
// The expected response length is unknown, but cronet has indicated that the request
// already finished successfully.
if (responseFinished) {
return C.RESULT_END_OF_INPUT;
}
}
int bytesRead = Math.min(readBuffer.remaining(), readLength);
readBuffer.get(buffer, offset, bytesRead);
if (!readBuffer.hasRemaining()) {
readBuffer.clear();
hasData = false;
}
if (expectedBytesRemainingToRead != null) {
expectedBytesRemainingToRead.addAndGet(-bytesRead);
}
if (transferListener != null && bytesRead >= 0) {
transferListener.onBytesTransferred(this, bytesRead);
}
return bytesRead;
} finally {
TraceUtil.endSection();
} }
int bytesRead = Math.min(readBuffer.remaining(), readLength);
readBuffer.get(buffer, offset, bytesRead);
if (!readBuffer.hasRemaining()) {
hasData = false;
}
if (expectedBytesRemainingToRead != null) {
expectedBytesRemainingToRead.addAndGet(-bytesRead);
}
if (listener != null) {
listener.onBytesTransferred(this, bytesRead);
}
return bytesRead;
} }
@Override @Override
@ -499,9 +481,7 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
return; return;
} }
readBuffer.flip(); readBuffer.flip();
if (readBuffer.limit() > 0) { hasData = true;
hasData = true;
}
operation.open(); operation.open();
} }
@ -516,27 +496,24 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
@Override @Override
public synchronized void close() { public synchronized void close() {
TraceUtil.beginSection("CronetDataSource.close"); if (currentUrlRequest != null) {
currentUrlRequest.cancel();
currentUrlRequest = null;
}
currentDataSpec = null;
currentUrl = null;
exception = null;
contentLength = 0;
hasData = false;
responseInfo = null;
expectedBytesRemainingToRead = null;
responseFinished = false;
try { try {
if (currentUrlRequest != null) { if (listener != null && connectionState == OPEN_CONNECTION) {
currentUrlRequest.cancel(); listener.onTransferEnd(this);
currentUrlRequest = null;
}
readBuffer.clear();
currentDataSpec = null;
currentUrl = null;
exception = null;
contentLength = 0;
hasData = false;
responseInfo = null;
expectedBytesRemainingToRead = null;
responseFinished = false;
if (transferListener != null && connectionState == OPEN_CONNECTION) {
transferListener.onTransferEnd(this);
} }
} finally { } finally {
connectionState = IDLE_CONNECTION; connectionState = IDLE_CONNECTION;
TraceUtil.endSection();
} }
} }

View File

@ -80,7 +80,9 @@ public final class FileDataSource implements DataSource {
@Override @Override
public int read(byte[] buffer, int offset, int readLength) throws FileDataSourceException { public int read(byte[] buffer, int offset, int readLength) throws FileDataSourceException {
if (bytesRemaining == 0) { if (readLength == 0) {
return 0;
} else if (bytesRemaining == 0) {
return C.RESULT_END_OF_INPUT; return C.RESULT_END_OF_INPUT;
} else { } else {
int bytesRead; int bytesRead;

View File

@ -129,6 +129,10 @@ public final class UdpDataSource implements DataSource {
@Override @Override
public int read(byte[] buffer, int offset, int readLength) throws UdpDataSourceException { public int read(byte[] buffer, int offset, int readLength) throws UdpDataSourceException {
if (readLength == 0) {
return 0;
}
if (packetRemaining == 0) { if (packetRemaining == 0) {
// We've read all of the data from the current packet. Get another. // We've read all of the data from the current packet. Get another.
try { try {
@ -136,7 +140,6 @@ public final class UdpDataSource implements DataSource {
} catch (IOException e) { } catch (IOException e) {
throw new UdpDataSourceException(e); throw new UdpDataSourceException(e);
} }
packetRemaining = packet.getLength(); packetRemaining = packet.getLength();
if (listener != null) { if (listener != null) {
listener.onBytesTransferred(this, packetRemaining); listener.onBytesTransferred(this, packetRemaining);

View File

@ -194,12 +194,15 @@ public final class CacheDataSource implements DataSource {
} }
@Override @Override
public int read(byte[] buffer, int offset, int max) throws IOException { public int read(byte[] buffer, int offset, int readLength) throws IOException {
if (readLength == 0) {
return 0;
}
if (bytesRemaining == 0) { if (bytesRemaining == 0) {
return C.RESULT_END_OF_INPUT; return C.RESULT_END_OF_INPUT;
} }
try { try {
int bytesRead = currentDataSource.read(buffer, offset, max); int bytesRead = currentDataSource.read(buffer, offset, readLength);
if (bytesRead >= 0) { if (bytesRead >= 0) {
if (currentDataSource == cacheReadDataSource) { if (currentDataSource == cacheReadDataSource) {
totalCachedBytesRead += bytesRead; totalCachedBytesRead += bytesRead;
@ -218,7 +221,7 @@ public final class CacheDataSource implements DataSource {
closeCurrentSource(); closeCurrentSource();
if (bytesRemaining > 0 || bytesRemaining == C.LENGTH_UNSET) { if (bytesRemaining > 0 || bytesRemaining == C.LENGTH_UNSET) {
if (openNextSource(false)) { if (openNextSource(false)) {
return read(buffer, offset, max); return read(buffer, offset, readLength);
} }
} }
} }