Skip to content

Commit

Permalink
Fix NullPointerException and 'no more capacity' issues in blob store (#…
Browse files Browse the repository at this point in the history
…1168)

1. An NPE occurs when the error message of the caught IOException is null.
To avoid this, we use the null-safe Objects.equals to compare the error string.
2. The 'no more capacity' issue occurs when creation of a new log segment fails:
the code didn't restore the remainingUnallocatedSegments counter.
This patch catches such a failure, restores the counter, and cleans
up the allocated file.
  • Loading branch information
jsjtzyy authored and cgtz committed May 6, 2019
1 parent 7f82832 commit 359a910
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 49 deletions.
5 changes: 3 additions & 2 deletions ambry-api/src/test/java/com.github.ambry/store/MockWrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;


/**
Expand Down Expand Up @@ -53,8 +54,8 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep
} catch (IOException e) {
// The IOException message may vary in different java versions. As code evolves, we may need to update IO_ERROR_STR
// in StoreException (based on java version that is being employed) to correctly capture disk I/O related errors.
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into store", e, errorCode);
}
buf.limit(savedLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static com.github.ambry.replication.ReplicationTest.*;
Expand Down Expand Up @@ -123,8 +124,9 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep
sizeRead += channel.read(buf);
}
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into dummy log", e, errorCode);
}
buf.flip();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,8 +1128,8 @@ public void replicaTokenTest() throws InterruptedException {
Time time = new MockTime();
MockFindToken token1 = new MockFindToken(0, 0);
RemoteReplicaInfo remoteReplicaInfo = new RemoteReplicaInfo(new MockReplicaId(), new MockReplicaId(),
new InMemoryStore(null, Collections.emptyList(), Collections.emptyList(), null), token1, tokenPersistInterval, time,
new Port(5000, PortType.PLAINTEXT));
new InMemoryStore(null, Collections.emptyList(), Collections.emptyList(), null), token1, tokenPersistInterval,
time, new Port(5000, PortType.PLAINTEXT));

// The equality check is for the reference, which is fine.
// Initially, the current token and the token to persist are the same.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -234,8 +235,8 @@ void performRecovery() throws StoreException {
}
} catch (IOException e) {
metrics.hardDeleteExceptionsCount.inc();
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " occurred while performing hard delete ", e, errorCode);
}
/* Now that all the blobs in the range were successfully hard deleted, the next time hard deletes can be resumed
Expand Down Expand Up @@ -540,8 +541,8 @@ private void persistCleanupToken() throws IOException, StoreException {
fileStream.getChannel().force(true);
tempFile.renameTo(actual);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(
errorCode.toString() + " while persisting cleanup tokens to disk " + tempFile.getAbsoluteFile(), errorCode);
} finally {
Expand Down Expand Up @@ -621,8 +622,8 @@ private void performHardDeletes(List<MessageInfo> messageInfoList) throws StoreE
diskIOScheduler.getSlice(HARD_DELETE_CLEANUP_JOB_NAME, HARD_DELETE_CLEANUP_JOB_NAME, logWriteInfo.size);
}
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while performing hard delete ", e, errorCode);
}
logger.trace("Performed hard deletes from {} to {} for {}", startToken, endToken, dataDir);
Expand Down
22 changes: 12 additions & 10 deletions ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -400,8 +401,8 @@ private void persistBloomFilter() throws StoreException {
stream.writeLong(crcValue);
stream.close();
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while trying to persist bloom filter", e, errorCode);
}
}
Expand Down Expand Up @@ -448,12 +449,12 @@ private StoreKey getKeyAt(ByteBuffer mmap, int index) throws StoreException {
mmap.position(firstKeyRelativeOffset + index * persistedEntrySize);
storeKey = factory.getStoreKey(new DataInputStream(new ByteBufferInputStream(mmap)));
} catch (InternalError e) {
throw e.getMessage().equals(StoreException.INTERNAL_ERROR_STR) ? new StoreException(
throw Objects.equals(e.getMessage(), StoreException.INTERNAL_ERROR_STR) ? new StoreException(
"Internal error occurred due to unsafe memory access", e, StoreErrorCodes.IOError)
: new StoreException("Unknown internal error while trying to get store key", e,
StoreErrorCodes.Unknown_Error);
} catch (IOException e) {
throw e.getMessage().equals(StoreException.IO_ERROR_STR) ? new StoreException(
throw Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? new StoreException(
"IO error while trying to get store key", e, StoreErrorCodes.IOError)
: new StoreException("Unknown IO error while trying to get store key", e, StoreErrorCodes.Unknown_Error);
} catch (Throwable t) {
Expand Down Expand Up @@ -700,8 +701,9 @@ void writeIndexSegmentToFile(Offset safeEndPoint) throws FileNotFoundException,
// swap temp file with the original file
temp.renameTo(getFile());
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(
"IndexSegment : " + indexFile.getAbsolutePath() + " encountered " + errorCode.toString()
+ " while persisting index to disk", e, errorCode);
Expand Down Expand Up @@ -794,8 +796,8 @@ private void map() throws StoreException {
} catch (FileNotFoundException e) {
throw new StoreException("File not found while mapping the segment of index", e, StoreErrorCodes.File_Not_Found);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while mapping the segment of index", e, errorCode);
} finally {
rwLock.writeLock().unlock();
Expand Down Expand Up @@ -906,8 +908,8 @@ private void readFromFile(File fileToRead, Journal journal) throws StoreExceptio
StoreErrorCodes.Index_Creation_Failure);
}
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException("IndexSegment : " + indexFile.getAbsolutePath() + " encountered " + errorCode.toString()
+ " while reading from file ", e, errorCode);
}
Expand Down
35 changes: 27 additions & 8 deletions ambry-store/src/main/java/com.github.ambry.store/Log.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -326,7 +327,7 @@ private LogSegment checkArgsAndGetFirstSegment(long segmentCapacity) throws Stor
* Creates {@link LogSegment} instances from {@code segmentFiles}.
* @param segmentFiles the files that form the segments of the log.
* @return {@code List} of {@link LogSegment} instances corresponding to {@code segmentFiles}.
* @throws IOException if there is an I/O error loading the segment files or creating {@link LogSegment} instances.
* @throws StoreException if there is an I/O error loading the segment files or creating {@link LogSegment} instances.
*/
private List<LogSegment> getSegmentsToLoad(File[] segmentFiles) throws StoreException {
List<LogSegment> segments = new ArrayList<>(segmentFiles.length);
Expand Down Expand Up @@ -391,8 +392,9 @@ private File allocate(String filename, long size) throws StoreException {
try {
diskSpaceAllocator.allocate(segmentFile, size);
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while allocating the file", e, errorCode);
}
}
Expand All @@ -410,8 +412,8 @@ private void free(LogSegment logSegment) throws StoreException {
logSegment.close();
diskSpaceAllocator.free(segmentFile, logSegment.getCapacityInBytes());
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while freeing log segment", e, errorCode);
}
}
Expand Down Expand Up @@ -463,9 +465,26 @@ private void ensureCapacity(long writeSize) throws StoreException {
Pair<String, String> segmentNameAndFilename = getNextSegmentNameAndFilename();
logger.info("Allocating new segment with name: " + segmentNameAndFilename.getFirst());
File newSegmentFile = allocate(segmentNameAndFilename.getSecond(), segmentCapacity);
LogSegment newSegment =
new LogSegment(segmentNameAndFilename.getFirst(), newSegmentFile, segmentCapacity, metrics, true);
segmentsByName.put(segmentNameAndFilename.getFirst(), newSegment);
try {
LogSegment newSegment =
new LogSegment(segmentNameAndFilename.getFirst(), newSegmentFile, segmentCapacity, metrics, true);
segmentsByName.put(segmentNameAndFilename.getFirst(), newSegment);
} catch (StoreException e) {
logger.error("Failed to create new log segment {} with store exception: ", segmentNameAndFilename.getFirst(), e);
try {
diskSpaceAllocator.free(newSegmentFile, segmentCapacity);
remainingUnallocatedSegments.incrementAndGet();
} catch (IOException exception) {
StoreErrorCodes errorCode =
Objects.equals(exception.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(
errorCode.toString() + " while freeing log segment " + segmentNameAndFilename.getFirst(), exception,
errorCode);
} finally {
metrics.overflowWriteError.inc();
}
}
}

/**
Expand Down
22 changes: 12 additions & 10 deletions ambry-store/src/main/java/com.github.ambry.store/LogSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -135,8 +136,8 @@ class LogSegment implements Read, Write {
} catch (FileNotFoundException e) {
throw new StoreException("File not found while creating log segment", e, StoreErrorCodes.File_Not_Found);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while creating log segment", e, errorCode);
}
}
Expand Down Expand Up @@ -164,8 +165,8 @@ public int appendFrom(ByteBuffer buffer) throws StoreException {
} catch (ClosedChannelException e) {
throw new StoreException("Channel closed while writing into the log segment", e, StoreErrorCodes.Channel_Closed);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into the log segment", e, errorCode);
}
endOffset.addAndGet(bytesWritten);
Expand Down Expand Up @@ -209,8 +210,9 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep
bytesWritten += fileChannel.write(byteBufferForAppend, endOffset.get() + bytesWritten);
}
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into the log segment", e, errorCode);
}
}
Expand Down Expand Up @@ -239,8 +241,8 @@ int appendFromDirectly(byte[] byteArray, int offset, int length) throws StoreExc
directFile.write(byteArray, offset, length);
endOffset.addAndGet(length);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into segment via direct IO", e, errorCode);
}
return length;
Expand Down Expand Up @@ -422,8 +424,8 @@ void setEndOffset(long endOffset) throws StoreException {
fileChannel.position(endOffset);
this.endOffset.set(endOffset);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while setting end offset of segment", e, errorCode);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
Expand Down Expand Up @@ -828,8 +829,8 @@ private BlobReadOptions getDeletedBlobReadOptions(IndexValue value, StoreKey key
+ "]", StoreErrorCodes.ID_Deleted);
}
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " when reading delete blob info from the log " + dataDir, e,
errorCode);
}
Expand Down Expand Up @@ -1661,8 +1662,9 @@ public synchronized void write() throws StoreException {
} catch (FileNotFoundException e) {
throw new StoreException("File not found while writing index to file", e, StoreErrorCodes.File_Not_Found);
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while persisting index to disk", e, errorCode);
} finally {
context.stop();
Expand Down
Loading

0 comments on commit 359a910

Please sign in to comment.