Add CacheDataSource.EventListener.

This commit is contained in:
Oliver Woodman 2014-08-01 15:54:32 +01:00
parent b1992c3848
commit 41ff1e4071

View File

@ -34,10 +34,26 @@ import java.io.IOException;
*/ */
public final class CacheDataSource implements DataSource { public final class CacheDataSource implements DataSource {
/**
 * Interface definition for a callback to be notified of {@link CacheDataSource} events.
 */
public interface EventListener {
  /**
   * Invoked to report bytes that have been read from the cache since the last invocation.
   *
   * @param cacheSizeBytes Current cache size in bytes.
   * @param cachedBytesRead Total bytes read from the cache since the last report.
   */
  void onCachedBytesRead(long cacheSizeBytes, long cachedBytesRead);
}
private final Cache cache; private final Cache cache;
private final DataSource cacheReadDataSource; private final DataSource cacheReadDataSource;
private final DataSource cacheWriteDataSource; private final DataSource cacheWriteDataSource;
private final DataSource upstreamDataSource; private final DataSource upstreamDataSource;
private final EventListener eventListener;
private final boolean blockOnCache; private final boolean blockOnCache;
private final boolean ignoreCacheOnError; private final boolean ignoreCacheOnError;
@ -49,6 +65,7 @@ public final class CacheDataSource implements DataSource {
private long bytesRemaining; private long bytesRemaining;
private CacheSpan lockedSpan; private CacheSpan lockedSpan;
private boolean ignoreCache; private boolean ignoreCache;
private long totalCachedBytesRead;
/** /**
* Constructs an instance with default {@link DataSource} and {@link DataSink} instances for * Constructs an instance with default {@link DataSource} and {@link DataSink} instances for
@ -67,7 +84,7 @@ public final class CacheDataSource implements DataSource {
public CacheDataSource(Cache cache, DataSource upstream, boolean blockOnCache, public CacheDataSource(Cache cache, DataSource upstream, boolean blockOnCache,
boolean ignoreCacheOnError, long maxCacheFileSize) { boolean ignoreCacheOnError, long maxCacheFileSize) {
this(cache, upstream, new FileDataSource(), new CacheDataSink(cache, maxCacheFileSize), this(cache, upstream, new FileDataSource(), new CacheDataSink(cache, maxCacheFileSize),
blockOnCache, ignoreCacheOnError); blockOnCache, ignoreCacheOnError, null);
} }
/** /**
@ -84,9 +101,11 @@ public final class CacheDataSource implements DataSource {
* @param ignoreCacheOnError Whether the cache is bypassed following any cache related error. If * @param ignoreCacheOnError Whether the cache is bypassed following any cache related error. If
* true, then cache related exceptions may be thrown for one cycle of open, read and close * true, then cache related exceptions may be thrown for one cycle of open, read and close
* calls. Subsequent cycles of these calls will then bypass the cache. * calls. Subsequent cycles of these calls will then bypass the cache.
* @param eventListener An optional {@link EventListener} to receive events.
*/ */
public CacheDataSource(Cache cache, DataSource upstream, DataSource cacheReadDataSource, public CacheDataSource(Cache cache, DataSource upstream, DataSource cacheReadDataSource,
DataSink cacheWriteDataSink, boolean blockOnCache, boolean ignoreCacheOnError) { DataSink cacheWriteDataSink, boolean blockOnCache, boolean ignoreCacheOnError,
EventListener eventListener) {
this.cache = cache; this.cache = cache;
this.cacheReadDataSource = cacheReadDataSource; this.cacheReadDataSource = cacheReadDataSource;
this.blockOnCache = blockOnCache; this.blockOnCache = blockOnCache;
@ -97,6 +116,7 @@ public final class CacheDataSource implements DataSource {
} else { } else {
this.cacheWriteDataSource = null; this.cacheWriteDataSource = null;
} }
this.eventListener = eventListener;
} }
@Override @Override
@ -121,10 +141,13 @@ public final class CacheDataSource implements DataSource {
@Override @Override
public int read(byte[] buffer, int offset, int max) throws IOException { public int read(byte[] buffer, int offset, int max) throws IOException {
try { try {
int num = currentDataSource.read(buffer, offset, max); int bytesRead = currentDataSource.read(buffer, offset, max);
if (num >= 0) { if (bytesRead >= 0) {
readPosition += num; if (currentDataSource == cacheReadDataSource) {
bytesRemaining -= num; totalCachedBytesRead += bytesRead;
}
readPosition += bytesRead;
bytesRemaining -= bytesRead;
} else { } else {
closeCurrentSource(); closeCurrentSource();
if (bytesRemaining > 0) { if (bytesRemaining > 0) {
@ -132,7 +155,7 @@ public final class CacheDataSource implements DataSource {
return read(buffer, offset, max); return read(buffer, offset, max);
} }
} }
return num; return bytesRead;
} catch (IOException e) { } catch (IOException e) {
handleBeforeThrow(e); handleBeforeThrow(e);
throw e; throw e;
@ -141,6 +164,7 @@ public final class CacheDataSource implements DataSource {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
notifyBytesRead();
try { try {
closeCurrentSource(); closeCurrentSource();
} catch (IOException e) { } catch (IOException e) {
@ -215,4 +239,11 @@ public final class CacheDataSource implements DataSource {
} }
} }
/**
 * Reports the number of bytes read from the cache since the previous report to the
 * {@link EventListener}, then resets that count. Does nothing if no listener was provided or if
 * no cached bytes have been read since the previous report.
 */
private void notifyBytesRead() {
  if (eventListener == null || totalCachedBytesRead <= 0) {
    // Nobody to notify, or nothing new to report; leave the running count untouched.
    return;
  }
  eventListener.onCachedBytesRead(cache.getCacheSpace(), totalCachedBytesRead);
  totalCachedBytesRead = 0;
}
} }