Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix NullPointerException and 'no more capacity' issues in blob store #1168

Merged
merged 2 commits into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ambry-api/src/test/java/com.github.ambry/store/MockWrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;


/**
Expand Down Expand Up @@ -53,8 +54,8 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep
} catch (IOException e) {
// The IOException message may vary in different java versions. As code evolves, we may need to update IO_ERROR_STR
// in StoreException (based on java version that is being employed) to correctly capture disk I/O related errors.
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into store", e, errorCode);
}
buf.limit(savedLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static com.github.ambry.replication.ReplicationTest.*;
Expand Down Expand Up @@ -123,8 +124,9 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep
sizeRead += channel.read(buf);
}
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into dummy log", e, errorCode);
}
buf.flip();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,8 +1128,8 @@ public void replicaTokenTest() throws InterruptedException {
Time time = new MockTime();
MockFindToken token1 = new MockFindToken(0, 0);
RemoteReplicaInfo remoteReplicaInfo = new RemoteReplicaInfo(new MockReplicaId(), new MockReplicaId(),
new InMemoryStore(null, Collections.emptyList(), Collections.emptyList(), null), token1, tokenPersistInterval, time,
new Port(5000, PortType.PLAINTEXT));
new InMemoryStore(null, Collections.emptyList(), Collections.emptyList(), null), token1, tokenPersistInterval,
time, new Port(5000, PortType.PLAINTEXT));

// The equality check is for the reference, which is fine.
// Initially, the current token and the token to persist are the same.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -234,8 +235,8 @@ void performRecovery() throws StoreException {
}
} catch (IOException e) {
metrics.hardDeleteExceptionsCount.inc();
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " occurred while performing hard delete ", e, errorCode);
}
/* Now that all the blobs in the range were successfully hard deleted, the next time hard deletes can be resumed
Expand Down Expand Up @@ -540,8 +541,8 @@ private void persistCleanupToken() throws IOException, StoreException {
fileStream.getChannel().force(true);
tempFile.renameTo(actual);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(
errorCode.toString() + " while persisting cleanup tokens to disk " + tempFile.getAbsoluteFile(), errorCode);
} finally {
Expand Down Expand Up @@ -621,8 +622,8 @@ private void performHardDeletes(List<MessageInfo> messageInfoList) throws StoreE
diskIOScheduler.getSlice(HARD_DELETE_CLEANUP_JOB_NAME, HARD_DELETE_CLEANUP_JOB_NAME, logWriteInfo.size);
}
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while performing hard delete ", e, errorCode);
}
logger.trace("Performed hard deletes from {} to {} for {}", startToken, endToken, dataDir);
Expand Down
22 changes: 12 additions & 10 deletions ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -400,8 +401,8 @@ private void persistBloomFilter() throws StoreException {
stream.writeLong(crcValue);
stream.close();
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while trying to persist bloom filter", e, errorCode);
}
}
Expand Down Expand Up @@ -448,12 +449,12 @@ private StoreKey getKeyAt(ByteBuffer mmap, int index) throws StoreException {
mmap.position(firstKeyRelativeOffset + index * persistedEntrySize);
storeKey = factory.getStoreKey(new DataInputStream(new ByteBufferInputStream(mmap)));
} catch (InternalError e) {
throw e.getMessage().equals(StoreException.INTERNAL_ERROR_STR) ? new StoreException(
throw Objects.equals(e.getMessage(), StoreException.INTERNAL_ERROR_STR) ? new StoreException(
"Internal error occurred due to unsafe memory access", e, StoreErrorCodes.IOError)
: new StoreException("Unknown internal error while trying to get store key", e,
StoreErrorCodes.Unknown_Error);
} catch (IOException e) {
throw e.getMessage().equals(StoreException.IO_ERROR_STR) ? new StoreException(
throw Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? new StoreException(
"IO error while trying to get store key", e, StoreErrorCodes.IOError)
: new StoreException("Unknown IO error while trying to get store key", e, StoreErrorCodes.Unknown_Error);
} catch (Throwable t) {
Expand Down Expand Up @@ -700,8 +701,9 @@ void writeIndexSegmentToFile(Offset safeEndPoint) throws FileNotFoundException,
// swap temp file with the original file
temp.renameTo(getFile());
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(
"IndexSegment : " + indexFile.getAbsolutePath() + " encountered " + errorCode.toString()
+ " while persisting index to disk", e, errorCode);
Expand Down Expand Up @@ -794,8 +796,8 @@ private void map() throws StoreException {
} catch (FileNotFoundException e) {
throw new StoreException("File not found while mapping the segment of index", e, StoreErrorCodes.File_Not_Found);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while mapping the segment of index", e, errorCode);
} finally {
rwLock.writeLock().unlock();
Expand Down Expand Up @@ -906,8 +908,8 @@ private void readFromFile(File fileToRead, Journal journal) throws StoreExceptio
StoreErrorCodes.Index_Creation_Failure);
}
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for going from IOException to StoreErrorCode is repeated in a lot of places. Since this is a bugfix, you don't have to address it now, but it would be good to move this into a static method in StoreException in the future.

: StoreErrorCodes.Unknown_Error;
throw new StoreException("IndexSegment : " + indexFile.getAbsolutePath() + " encountered " + errorCode.toString()
+ " while reading from file ", e, errorCode);
}
Expand Down
35 changes: 27 additions & 8 deletions ambry-store/src/main/java/com.github.ambry.store/Log.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -326,7 +327,7 @@ private LogSegment checkArgsAndGetFirstSegment(long segmentCapacity) throws Stor
* Creates {@link LogSegment} instances from {@code segmentFiles}.
* @param segmentFiles the files that form the segments of the log.
* @return {@code List} of {@link LogSegment} instances corresponding to {@code segmentFiles}.
* @throws IOException if there is an I/O error loading the segment files or creating {@link LogSegment} instances.
* @throws StoreException if there is an I/O error loading the segment files or creating {@link LogSegment} instances.
*/
private List<LogSegment> getSegmentsToLoad(File[] segmentFiles) throws StoreException {
List<LogSegment> segments = new ArrayList<>(segmentFiles.length);
Expand Down Expand Up @@ -391,8 +392,9 @@ private File allocate(String filename, long size) throws StoreException {
try {
diskSpaceAllocator.allocate(segmentFile, size);
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while allocating the file", e, errorCode);
}
}
Expand All @@ -410,8 +412,8 @@ private void free(LogSegment logSegment) throws StoreException {
logSegment.close();
diskSpaceAllocator.free(segmentFile, logSegment.getCapacityInBytes());
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while freeing log segment", e, errorCode);
}
}
Expand Down Expand Up @@ -463,9 +465,26 @@ private void ensureCapacity(long writeSize) throws StoreException {
Pair<String, String> segmentNameAndFilename = getNextSegmentNameAndFilename();
logger.info("Allocating new segment with name: " + segmentNameAndFilename.getFirst());
File newSegmentFile = allocate(segmentNameAndFilename.getSecond(), segmentCapacity);
LogSegment newSegment =
new LogSegment(segmentNameAndFilename.getFirst(), newSegmentFile, segmentCapacity, metrics, true);
segmentsByName.put(segmentNameAndFilename.getFirst(), newSegment);
try {
LogSegment newSegment =
new LogSegment(segmentNameAndFilename.getFirst(), newSegmentFile, segmentCapacity, metrics, true);
segmentsByName.put(segmentNameAndFilename.getFirst(), newSegment);
} catch (StoreException e) {
logger.error("Failed to create new log segment {} with store exception: ", segmentNameAndFilename.getFirst(), e);
try {
diskSpaceAllocator.free(newSegmentFile, segmentCapacity);
remainingUnallocatedSegments.incrementAndGet();
} catch (IOException exception) {
StoreErrorCodes errorCode =
Objects.equals(exception.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(
errorCode.toString() + " while freeing log segment " + segmentNameAndFilename.getFirst(), exception,
errorCode);
} finally {
metrics.overflowWriteError.inc();
}
}
}

/**
Expand Down
22 changes: 12 additions & 10 deletions ambry-store/src/main/java/com.github.ambry.store/LogSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -135,8 +136,8 @@ class LogSegment implements Read, Write {
} catch (FileNotFoundException e) {
throw new StoreException("File not found while creating log segment", e, StoreErrorCodes.File_Not_Found);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while creating log segment", e, errorCode);
}
}
Expand Down Expand Up @@ -164,8 +165,8 @@ public int appendFrom(ByteBuffer buffer) throws StoreException {
} catch (ClosedChannelException e) {
throw new StoreException("Channel closed while writing into the log segment", e, StoreErrorCodes.Channel_Closed);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into the log segment", e, errorCode);
}
endOffset.addAndGet(bytesWritten);
Expand Down Expand Up @@ -209,8 +210,9 @@ public void appendFrom(ReadableByteChannel channel, long size) throws StoreExcep
bytesWritten += fileChannel.write(byteBufferForAppend, endOffset.get() + bytesWritten);
}
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into the log segment", e, errorCode);
}
}
Expand Down Expand Up @@ -239,8 +241,8 @@ int appendFromDirectly(byte[] byteArray, int offset, int length) throws StoreExc
directFile.write(byteArray, offset, length);
endOffset.addAndGet(length);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while writing into segment via direct IO", e, errorCode);
}
return length;
Expand Down Expand Up @@ -422,8 +424,8 @@ void setEndOffset(long endOffset) throws StoreException {
fileChannel.position(endOffset);
this.endOffset.set(endOffset);
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while setting end offset of segment", e, errorCode);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
Expand Down Expand Up @@ -828,8 +829,8 @@ private BlobReadOptions getDeletedBlobReadOptions(IndexValue value, StoreKey key
+ "]", StoreErrorCodes.ID_Deleted);
}
} catch (IOException e) {
StoreErrorCodes errorCode =
e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError : StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode = Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " when reading delete blob info from the log " + dataDir, e,
errorCode);
}
Expand Down Expand Up @@ -1661,8 +1662,9 @@ public synchronized void write() throws StoreException {
} catch (FileNotFoundException e) {
throw new StoreException("File not found while writing index to file", e, StoreErrorCodes.File_Not_Found);
} catch (IOException e) {
StoreErrorCodes errorCode = e.getMessage().equals(StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
StoreErrorCodes errorCode =
Objects.equals(e.getMessage(), StoreException.IO_ERROR_STR) ? StoreErrorCodes.IOError
: StoreErrorCodes.Unknown_Error;
throw new StoreException(errorCode.toString() + " while persisting index to disk", e, errorCode);
} finally {
context.stop();
Expand Down
Loading