Skip to content

Commit

Permalink
Support getting end position of last PUT in BlobStore (#1307)
Browse files Browse the repository at this point in the history
The PR introduces a new method in PersistentIndex to get the end position of
the last PUT record in the BlobStore. Note that the "last" PUT record is valid
only at the moment the method is invoked; that is, new PUTs may arrive
afterwards. The intention here is to help a replica check whether its peer
replicas have caught up with the latest PUT record once PUT is disabled on
this replica (this happens when the current replica is being decommissioned).
  • Loading branch information
jsjtzyy authored and zzmao committed Nov 26, 2019
1 parent 1b68691 commit f35f3ea
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,14 @@ public void deleteStoreFiles() throws StoreException, IOException {
logger.info("All files of store {} are deleted", storeId);
}

/**
 * @return the absolute end position of the last PUT in the current store at the time this method is invoked.
 *         Note that new PUTs may arrive after this call returns, so the value is only a point-in-time snapshot.
 * @throws StoreException if the underlying index lookup fails.
 */
public long getEndPositionOfLastPut() throws StoreException {
return index.getAbsoluteEndPositionOfLastPut();
}

/**
* @return {@link ReplicaStatusDelegate} associated with this store
*/
Expand Down
40 changes: 40 additions & 0 deletions ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,46 @@ void seal() throws StoreException {
persistBloomFilter();
}

/**
 * Finds the index value of the last (highest) PUT record in this index segment. Both sealed and unsealed
 * segments are handled: a sealed segment is scanned through its serialized entry buffer, an unsealed one
 * through its in-memory index map. All keys must be scanned because the last PUT may belong to any key.
 * @return index value of last PUT record in this index segment. Return {@code null} if no PUT is found
 * @throws StoreException if reading entries from the persisted segment fails
 */
IndexValue getIndexValueOfLastPut() throws StoreException {
IndexValue indexValueOfLastPut = null;
if (sealed.get()) {
// sealed segment: read from the serialized entries. Use a duplicate so the shared buffer's
// position/limit are not disturbed by this scan.
ByteBuffer readBuf = serEntries.duplicate();
int numOfIndexEntries = numberOfEntries(readBuf);
// reused per-key container for all values associated with one key
NavigableSet<IndexValue> values = new TreeSet<>();
for (int i = 0; i < numOfIndexEntries; i++) {
StoreKey key = getKeyAt(readBuf, i);
values.clear();
getAllValuesFromMmap(readBuf, key, i, numOfIndexEntries, values);
for (IndexValue indexValue : values) {
// FLAGS_DEFAULT_VALUE means PUT record
if (indexValue.getFlags() == IndexValue.FLAGS_DEFAULT_VALUE && (indexValueOfLastPut == null
|| indexValue.compareTo(indexValueOfLastPut) > 0)) {
indexValueOfLastPut = indexValue;
// note that values set contains all entries associated with specific key, so there are at most 3 entries in
// this set (one PUT, one TTL Update and one DELETE). Due to nature of log, PUT always comes first. And if we
// already find PUT, we can jump out of the inner loop.
break;
}
}
}
} else {
// unsealed segment: entries are still in the in-memory skip-list index
for (Map.Entry<StoreKey, ConcurrentSkipListSet<IndexValue>> entry : index.entrySet()) {
for (IndexValue indexValue : entry.getValue()) {
// FLAGS_DEFAULT_VALUE means PUT record
if (indexValue.getFlags() == IndexValue.FLAGS_DEFAULT_VALUE && (indexValueOfLastPut == null
|| indexValue.compareTo(indexValueOfLastPut) > 0)) {
indexValueOfLastPut = indexValue;
// at most one PUT per key and PUT always precedes TTL update/DELETE, so stop scanning this key
break;
}
}
}
}
return indexValueOfLastPut;
}

/**
* Maps the segment of index either as a memory map or a in memory buffer depending on config.
* @throws StoreException if there are problems with the index
Expand Down
45 changes: 45 additions & 0 deletions ambry-store/src/main/java/com.github.ambry.store/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand All @@ -40,6 +42,23 @@ Offset getOffset() {
StoreKey getKey() {
return key;
}

@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }
  JournalEntry entry = (JournalEntry) o;
  // use value equality (not reference equality) so equals() stays consistent with hashCode(),
  // which hashes by value via Objects.hash(offset, key)
  return Objects.equals(this.offset, entry.offset) && Objects.equals(this.key, entry.key);
}

@Override
// value-based hash over both fields; consistent with equals comparing offset and key
public int hashCode() {
return Objects.hash(offset, key);
}
}

/**
Expand Down Expand Up @@ -154,6 +173,24 @@ List<JournalEntry> getEntriesSince(Offset offset, boolean inclusive) {
return journalEntries;
}

/**
 * @return all journal entries present at the time this method is invoked (an empty list if the journal is
 *         empty). The last offset is captured first, so entries added concurrently after that point are
 *         excluded from the returned snapshot.
 */
List<JournalEntry> getAllEntries() {
  List<JournalEntry> result = new ArrayList<>();
  // capture the current last offset so concurrent additions do not extend the view
  Offset lastOffset = getLastOffset();
  if (lastOffset != null) {
    // get portion view of journal whose key is less than or equal to lastOffset
    NavigableMap<Offset, StoreKey> journalView = journal.headMap(lastOffset, true);
    for (Map.Entry<Offset, StoreKey> entry : journalView.entrySet()) {
      result.add(new JournalEntry(entry.getKey(), entry.getValue()));
    }
  }
  return result;
}

/**
* @return the first/smallest offset in the journal or {@code null} if no such entry exists.
*/
Expand Down Expand Up @@ -202,6 +239,14 @@ void finishBootstrap() {
inBootstrapMode = false;
}

/**
 * @return {@code true} if the journal is still in bootstrap mode, i.e. {@code finishBootstrap()} has not
 *         been called yet.
 */
boolean isInBootstrapMode() {
return inBootstrapMode;
}

/**
 * @return the maximum number of entries this journal is configured to hold.
 */
int getMaxEntriesToJournal() {
return maxEntriesToJournal;
}

/**
* @return the number of entries that is currently in the {@link Journal}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,48 @@ long getLogUsedCapacity() {
return getAbsolutePositionInLogForOffset(getCurrentEndOffset(indexSegments), indexSegments);
}

/**
 * @return absolute end position (in bytes) of the latest PUT record at the time this method is invoked. If no
 *         PUT is found in the current store, -1 is returned (possible when the store is brand new).
 * @throws StoreException if searching an index segment fails.
 */
long getAbsoluteEndPositionOfLastPut() throws StoreException {
  IndexValue indexValueOfLastPut = null;
  // 1. go through keys in journal in reverse order to see if there is a PUT index entry associated with the key
  List<JournalEntry> journalEntries = journal.getAllEntries();
  // take a stable reference to the segment map so all lookups below use a consistent view
  ConcurrentSkipListMap<Offset, IndexSegment> indexSegments = validIndexSegments;
  // check entry in reverse order (from the most recent one) to find last PUT record in index
  for (int i = journalEntries.size() - 1; i >= 0; --i) {
    JournalEntry entry = journalEntries.get(i);
    indexValueOfLastPut = findKey(entry.getKey(), new FileSpan(entry.getOffset(), getCurrentEndOffset(indexSegments)),
        EnumSet.of(IndexEntryType.PUT), indexSegments);
    if (indexValueOfLastPut != null) {
      break;
    }
  }
  if (!journalEntries.isEmpty() && indexValueOfLastPut == null) {
    // 2. if not found in the journal, check index segments starting from most recent one (until latest PUT is found)
    JournalEntry firstJournalEntry = journalEntries.get(0);
    // the start offset of the most recent segment that ends before the journal begins; null when the journal
    // starts at the very first segment, in which case there are no earlier segments to search
    Offset upperBound = indexSegments.lowerKey(firstJournalEntry.getOffset());
    if (upperBound != null) {
      // generate a segment map in reverse order
      ConcurrentNavigableMap<Offset, IndexSegment> segmentsMapToSearch =
          indexSegments.subMap(indexSegments.firstKey(), true, upperBound, true).descendingMap();
      for (Map.Entry<Offset, IndexSegment> entry : segmentsMapToSearch.entrySet()) {
        indexValueOfLastPut = entry.getValue().getIndexValueOfLastPut();
        if (indexValueOfLastPut != null) {
          break;
        }
      }
    }
  }
  if (indexValueOfLastPut == null) {
    // if no PUT record is found in this store, return -1. This is possible when current store is a brand new store.
    logger.info("No PUT record is found for store {}", dataDir);
    return -1;
  }
  // absolute start of the PUT record plus its size = absolute end position
  return getAbsolutePositionInLogForOffset(indexValueOfLastPut.getOffset(), indexSegments)
      + indexValueOfLastPut.getSize();
}

/**
* @return the number of valid log segments starting from the first segment.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class CuratedLogIndexState {
private static final long SEGMENT_CAPACITY = 3000;
private static final long HARD_DELETE_START_OFFSET = 11;
private static final long HARD_DELETE_LAST_PART_SIZE = 13;
private static final int DEFAULT_MAX_IN_MEM_ELEMENTS = 5;

static final int DEFAULT_MAX_IN_MEM_ELEMENTS = 5;
static final DiskIOScheduler DISK_IO_SCHEDULER = new DiskIOScheduler(null);
static final long DELAY_BETWEEN_LAST_MODIFIED_TIMES_MS = 10 * Time.MsPerSec;
static final StoreKeyFactory STORE_KEY_FACTORY;
Expand Down Expand Up @@ -312,10 +312,9 @@ FileSpan makePermanent(MockId id, boolean forcePut) throws StoreException {
* Adds a delete entry in the index (real and reference) for {@code idToDelete}.
* @param idToDelete the id to be deleted.
* @return the {@link FileSpan} of the added entries.
* @throws IOException
* @throws StoreException
*/
FileSpan addDeleteEntry(MockId idToDelete) throws IOException, StoreException {
FileSpan addDeleteEntry(MockId idToDelete) throws StoreException {
return addDeleteEntry(idToDelete, null);
}

Expand Down Expand Up @@ -1004,11 +1003,10 @@ private void setupTestState(boolean isLogSegmented, long segmentCapacity, boolea
* @param expectedLogSegmentCount the number of log segments that are expected to assist after the addition of the
* first entry and at the end of the addition of all entries.
* @param addTtlUpdates if {@code true}, adds entries that update TTL.
* @throws IOException
* @throws StoreException
*/
private void addCuratedIndexEntriesToLogSegment(long sizeToMakeIndexEntriesFor, int expectedLogSegmentCount,
boolean addTtlUpdates) throws IOException, StoreException {
boolean addTtlUpdates) throws StoreException {
// First Index Segment
// 1 PUT
Offset firstJournalEntryAddedNow =
Expand Down
62 changes: 59 additions & 3 deletions ambry-store/src/test/java/com.github.ambry.store/IndexTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.replication.FindToken;
import com.github.ambry.utils.MockTime;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.TestUtils;
Expand Down Expand Up @@ -385,11 +386,10 @@ public void hardDeletePauseResumeRestartTest() throws InterruptedException, IOEx

/**
* Tests that expired values are correctly handled.
* @throws IOException
* @throws StoreException
*/
@Test
public void expirationTest() throws IOException, StoreException {
public void expirationTest() throws StoreException {
// add a PUT entry that will expire if time advances
// advance time so that time moves to whole second with no residual milliseconds
state.time.sleep(Time.MsPerSec - state.time.milliseconds());
Expand Down Expand Up @@ -463,7 +463,6 @@ public void setEndOffsetsTest() throws StoreException {

/**
* Tests {@link PersistentIndex#changeIndexSegments(List, Set)} for good and bad cases.
* @throws IOException
* @throws StoreException
*/
@Test
Expand Down Expand Up @@ -530,6 +529,63 @@ public void changeIndexSegmentsTest() throws StoreException {
}
}

/**
 * Test getting absolute end position of last PUT under different circumstances (i.e. last PUT is out of Journal).
 * Covers: empty store (-1), last PUT found in an unsealed segment, last PUT found in a sealed segment after
 * close, and last PUT found in the journal.
 * @throws Exception on any store/index failure during the test
 */
@Test
public void getAbsoluteEndPositionOfLastPutTest() throws Exception {
File testDir = StoreTestUtils.createTempDirectory("indexDirTest-" + UtilsTest.getRandomString(10));
CuratedLogIndexState indexState = new CuratedLogIndexState(isLogSegmented, testDir, false, false, true);
assertEquals("There should be no PUT record since index is empty", -1,
indexState.index.getAbsoluteEndPositionOfLastPut());
MockTime time = new MockTime();
long expiresAtMs = time.milliseconds() + TimeUnit.HOURS.toMillis(1);
// DEFAULT_MAX_IN_MEM_ELEMENTS = 5, here we put 5 entries into the log
List<IndexEntry> indexEntries = indexState.addPutEntries(5, PUT_RECORD_SIZE, expiresAtMs);
assertEquals("Number of index segments should be 1", 1, indexState.index.getIndexSegments().size());
// update ttl for above 5 entries and the ttl entries fall in second index segment.
for (IndexEntry entry : indexEntries) {
indexState.makePermanent((MockId) entry.getKey(), false);
}
assertEquals("Number of index segments (after ttl update) should be 2", 2,
indexState.index.getIndexSegments().size());
// default journal size should be 2 * DEFAULT_MAX_IN_MEM_ELEMENTS = 10
assertEquals("Journal size not expected", 2 * DEFAULT_MAX_IN_MEM_ELEMENTS,
indexState.index.journal.getAllEntries().size());
// delete above 5 entries, these delete entries should fall in third index segment
for (IndexEntry entry : indexEntries) {
indexState.addDeleteEntry((MockId) entry.getKey());
}
assertEquals("Number of index segments (after deletion) should be 3", 3,
indexState.index.getIndexSegments().size());
// although there are now 15 entries in total, journal size is still 10 (old entries are removed from journal)
assertEquals("Journal size (after deletion) shouldn't change", 2 * DEFAULT_MAX_IN_MEM_ELEMENTS,
indexState.index.journal.getAllEntries().size());
// after deletion, the last PUT record falls in first segment (out of journal)
// for segmented log, there is a header size = 18
assertEquals("Absolute end position of last PUT record not expected",
5 * PUT_RECORD_SIZE + (isLogSegmented ? 18 : 0), indexState.index.getAbsoluteEndPositionOfLastPut());
// close the index to seal all index segments
indexState.index.close(false);
// get end position of last PUT again, it should return same result (this is to test getting last PUT in sealed index segment)
assertEquals("Absolute end position of last PUT record not expected",
5 * PUT_RECORD_SIZE + (isLogSegmented ? 18 : 0), indexState.index.getAbsoluteEndPositionOfLastPut());

// calculate current end position in log segment (note that there are 5 PUT, 5 TTL update and 5 DELETE entries)
long currentEndPosition =
5 * PUT_RECORD_SIZE + 5 * TTL_UPDATE_RECORD_SIZE + 5 * DELETE_RECORD_SIZE + (isLogSegmented ? 18 : 0);
// add one more PUT entry and delete it afterwards
indexEntries = indexState.addPutEntries(1, PUT_RECORD_SIZE, Utils.Infinite_Time);
indexState.addDeleteEntry((MockId) indexEntries.get(0).getKey());
assertEquals("Number of index segments after new PUT and delete should be 4", 4,
indexState.index.getIndexSegments().size());
// now, the latest PUT entry should be in the journal
assertEquals("Absolute end position of last PUT record not expected", currentEndPosition + PUT_RECORD_SIZE,
indexState.index.getAbsoluteEndPositionOfLastPut());
// clean up the temporary store directory
indexState.destroy();
}

/**
* Test that verifies that there are no concurrency issues with the execution of
* {@link PersistentIndex#addToIndex(IndexEntry, FileSpan)} and {@link PersistentIndex#changeIndexSegments(List, Set)}
Expand Down
Loading

0 comments on commit f35f3ea

Please sign in to comment.