Enhance Cache/SimpleCache.

1. Fix SimpleCache startReadWrite asymmetry. Allow more concurrency.

- startReadWrite does not have the concept of a read lock. Once
  a cached span is returned, the caller can do whatever it likes
  for as long as it wants to. This allows a read to be performed
  in parallel with a write that starts after it.
- If there's an ongoing write, startReadWrite will block even if
  the return operation will be a read. So there's a weird asymmetry
  where reads can happen in parallel with writes, but only if the
  reads were started first.
- This CL removes the asymmetry, by allowing a read to start even
  if the write lock is held.
- Note that the reader needs to be prepared for the thing it's
  reading to disappear, but this was already the case, and will
  always be the case since the reader will need to handle disk
  read failures anyway.

2. Add isCached method.
This commit is contained in:
Oliver Woodman 2014-07-04 00:29:18 +01:00
parent 273fad845e
commit ea796f916c
2 changed files with 75 additions and 18 deletions

View File

@ -134,9 +134,8 @@ public interface Cache {
* @param key The key of the data being requested.
* @param position The position of the data being requested.
* @return The {@link CacheSpan}. Or null if the cache entry is locked.
* @throws InterruptedException
*/
CacheSpan startReadWriteNonBlocking(String key, long position) throws InterruptedException;
CacheSpan startReadWriteNonBlocking(String key, long position);
/**
* Obtains a cache file into which data can be written. Must only be called when holding a
@ -173,4 +172,14 @@ public interface Cache {
*/
void removeSpan(CacheSpan span);
/**
* Queries if a range is entirely available in the cache.
*
* @param key The cache key for the data.
* @param position The starting position of the data.
* @param length The length of the data.
* @return true if the data is available in the Cache otherwise false;
*/
boolean isCached(String key, long position, long length);
}

View File

@ -109,26 +109,29 @@ public class SimpleCache implements Cache {
public synchronized CacheSpan startReadWrite(String key, long position)
throws InterruptedException {
CacheSpan lookupSpan = CacheSpan.createLookup(key, position);
// Wait until no-one holds a lock for the key.
while (lockedSpans.containsKey(key)) {
while (true) {
CacheSpan span = startReadWriteNonBlocking(lookupSpan);
if (span != null) {
return span;
} else {
// Write case, lock not available. We'll be woken up when a locked span is released (if the
// released lock is for the requested key then we'll be able to make progress) or when a
// span is added to the cache (if the span is for the requested key and covers the requested
// position, then we'll become a read and be able to make progress).
wait();
}
return getSpanningRegion(key, lookupSpan);
}
}
@Override
public synchronized CacheSpan startReadWriteNonBlocking(String key, long position)
throws InterruptedException {
CacheSpan lookupSpan = CacheSpan.createLookup(key, position);
// Return null if key is locked
if (lockedSpans.containsKey(key)) {
return null;
}
return getSpanningRegion(key, lookupSpan);
public synchronized CacheSpan startReadWriteNonBlocking(String key, long position) {
return startReadWriteNonBlocking(CacheSpan.createLookup(key, position));
}
private CacheSpan getSpanningRegion(String key, CacheSpan lookupSpan) {
private synchronized CacheSpan startReadWriteNonBlocking(CacheSpan lookupSpan) {
CacheSpan spanningRegion = getSpan(lookupSpan);
// Read case.
if (spanningRegion.isCached) {
CacheSpan oldCacheSpan = spanningRegion;
// Remove the old span from the in-memory representation.
@ -139,12 +142,19 @@ public class SimpleCache implements Cache {
// Add the updated span back into the in-memory representation.
spansForKey.add(spanningRegion);
notifySpanTouched(oldCacheSpan, spanningRegion);
} else {
lockedSpans.put(key, spanningRegion);
}
return spanningRegion;
}
// Write case, lock available.
if (!lockedSpans.containsKey(lookupSpan.key)) {
lockedSpans.put(lookupSpan.key, spanningRegion);
return spanningRegion;
}
// Write case, lock not available.
return null;
}
@Override
public synchronized File startFile(String key, long position, long length) {
Assertions.checkState(lockedSpans.containsKey(key));
@ -173,6 +183,7 @@ public class SimpleCache implements Cache {
return;
}
addSpan(span);
notifyAll();
}
@Override
@ -330,4 +341,41 @@ public class SimpleCache implements Cache {
evictor.onSpanTouched(this, oldSpan, newSpan);
}
@Override
public synchronized boolean isCached(String key, long position, long length) {
TreeSet<CacheSpan> entries = cachedSpans.get(key);
if (entries == null) {
return false;
}
CacheSpan lookupSpan = CacheSpan.createLookup(key, position);
CacheSpan floorSpan = entries.floor(lookupSpan);
if (floorSpan == null || floorSpan.position + floorSpan.length <= position) {
// We don't have a span covering the start of the queried region.
return false;
}
long queryEndPosition = position + length;
long currentEndPosition = floorSpan.position + floorSpan.length;
if (currentEndPosition >= queryEndPosition) {
// floorSpan covers the queried region.
return true;
}
Iterator<CacheSpan> iterator = entries.tailSet(floorSpan, false).iterator();
while (iterator.hasNext()) {
CacheSpan next = iterator.next();
if (next.position > currentEndPosition) {
// There's a hole in the cache within the queried region.
return false;
}
// We expect currentEndPosition to always equal (next.position + next.length), but
// perform a max check anyway to guard against the existence of overlapping spans.
currentEndPosition = Math.max(currentEndPosition, next.position + next.length);
if (currentEndPosition >= queryEndPosition) {
// We've found spans covering the queried region.
return true;
}
}
// We ran out of spans before covering the queried region.
return false;
}
}