diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java index ee3d6e0d86..f47eaba0a7 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java +++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java @@ -734,6 +734,14 @@ public void deleteStoreFiles() throws StoreException, IOException { logger.info("All files of store {} are deleted", storeId); } + /** + * @return return absolute end position of last PUT in current store when this method is invoked. + * @throws StoreException + */ + public long getEndPositionOfLastPut() throws StoreException { + return index.getAbsoluteEndPositionOfLastPut(); + } + /** * @return {@link ReplicaStatusDelegate} associated with this store */ diff --git a/ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java b/ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java index c5683c776a..d472540859 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java +++ b/ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java @@ -760,6 +760,46 @@ void seal() throws StoreException { persistBloomFilter(); } + /** + * @return index value of last PUT record in this index segment. Return {@code null} if no PUT is found + */ + IndexValue getIndexValueOfLastPut() throws StoreException { + IndexValue indexValueOfLastPut = null; + if (sealed.get()) { + ByteBuffer readBuf = serEntries.duplicate(); + int numOfIndexEntries = numberOfEntries(readBuf); + NavigableSet values = new TreeSet<>(); + for (int i = 0; i < numOfIndexEntries; i++) { + StoreKey key = getKeyAt(readBuf, i); + values.clear(); + getAllValuesFromMmap(readBuf, key, i, numOfIndexEntries, values); + for (IndexValue indexValue : values) { + // FLAGS_DEFAULT_VALUE means PUT record + if (indexValue.getFlags() == IndexValue.FLAGS_DEFAULT_VALUE && (indexValueOfLastPut == null + || indexValue.compareTo(indexValueOfLastPut) > 0)) { + indexValueOfLastPut = indexValue; + // note that values set contains all entries associated with specific key, so there are at most 3 entries in + // this set (one PUT, one TTL Update and one DELETE). Due to nature of log, PUT always comes first. And if we + // already find PUT, we can jump out of the inner loop. + break; + } + } + } + } else { + for (Map.Entry> entry : index.entrySet()) { + for (IndexValue indexValue : entry.getValue()) { + // FLAGS_DEFAULT_VALUE means PUT record + if (indexValue.getFlags() == IndexValue.FLAGS_DEFAULT_VALUE && (indexValueOfLastPut == null + || indexValue.compareTo(indexValueOfLastPut) > 0)) { + indexValueOfLastPut = indexValue; + break; + } + } + } + } + return indexValueOfLastPut; + } + /** * Maps the segment of index either as a memory map or a in memory buffer depending on config. * @throws StoreException if there are problems with the index diff --git a/ambry-store/src/main/java/com.github.ambry.store/Journal.java b/ambry-store/src/main/java/com.github.ambry.store/Journal.java index cd0dd4eff5..4986fecff1 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/Journal.java +++ b/ambry-store/src/main/java/com.github.ambry.store/Journal.java @@ -16,6 +16,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -40,6 +42,23 @@ Offset getOffset() { StoreKey getKey() { return key; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JournalEntry entry = (JournalEntry) o; + return this.offset == entry.offset && this.key == entry.key; + } + + @Override + public int hashCode() { + return Objects.hash(offset, key); + } } /** @@ -154,6 +173,24 @@ List getEntriesSince(Offset offset, boolean inclusive) { return journalEntries; } + /** + * @return all journal entries at the time when this method is invoked. (Return empty list if journal is empty) + */ + List getAllEntries() { + List result = new ArrayList<>(); + // get current last Offset + Offset lastOffset = getLastOffset(); + journal.entrySet(); + if (lastOffset != null) { + // get portion view of journal whose key is less than or equal to lastOffset + NavigableMap journalView = journal.headMap(lastOffset, true); + for (Map.Entry entry : journalView.entrySet()) { + result.add(new JournalEntry(entry.getKey(), entry.getValue())); + } + } + return result; + } + /** * @return the first/smallest offset in the journal or {@code null} if no such entry exists. */ @@ -202,6 +239,14 @@ void finishBootstrap() { inBootstrapMode = false; } + boolean isInBootstrapMode() { + return inBootstrapMode; + } + + int getMaxEntriesToJournal() { + return maxEntriesToJournal; + } + /** * @return the number of entries that is currently in the {@link Journal}. */ diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java index 553a8c4776..9743fe86aa 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java +++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java @@ -1063,6 +1063,48 @@ long getLogUsedCapacity() { return getAbsolutePositionInLogForOffset(getCurrentEndOffset(indexSegments), indexSegments); } + /** + * @return absolute end position (in bytes) of latest PUT record when this method is invoked. If no PUT is found in + * current store, -1 will be returned. + * @throws StoreException + */ + long getAbsoluteEndPositionOfLastPut() throws StoreException { + IndexValue indexValueOfLastPut = null; + // 1. go through keys in journal in reverse order to see if there is a PUT index entry associated with the key + List journalEntries = journal.getAllEntries(); + ConcurrentSkipListMap indexSegments = validIndexSegments; + // check entry in reverse order (from the most recent one) to find last PUT record in index + for (int i = journalEntries.size() - 1; i >= 0; --i) { + JournalEntry entry = journalEntries.get(i); + indexValueOfLastPut = findKey(entry.getKey(), new FileSpan(entry.getOffset(), getCurrentEndOffset(indexSegments)), + EnumSet.of(IndexEntryType.PUT), indexSegments); + if (indexValueOfLastPut != null) { + break; + } + } + if (!journalEntries.isEmpty() && indexValueOfLastPut == null) { + // 2. if not find in the journal, check index segments starting from most recent one (until latest PUT is found) + JournalEntry firstJournalEntry = journalEntries.get(0); + // generate a segment map in reverse order + ConcurrentNavigableMap segmentsMapToSearch = + indexSegments.subMap(indexSegments.firstKey(), true, indexSegments.lowerKey(firstJournalEntry.getOffset()), + true).descendingMap(); + for (Map.Entry entry : segmentsMapToSearch.entrySet()) { + indexValueOfLastPut = entry.getValue().getIndexValueOfLastPut(); + if (indexValueOfLastPut != null) { + break; + } + } + } + if (indexValueOfLastPut == null) { + // if no PUT record is found in this store, return -1. This is possible when current store is a brand new store. + logger.info("No PUT record is found for store {}", dataDir); + return -1; + } + return getAbsolutePositionInLogForOffset(indexValueOfLastPut.getOffset(), indexSegments) + + indexValueOfLastPut.getSize(); + } + /** * @return the number of valid log segments starting from the first segment. */ diff --git a/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java b/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java index a3bf06dcbb..5fbd41a395 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java +++ b/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java @@ -69,8 +69,8 @@ class CuratedLogIndexState { private static final long SEGMENT_CAPACITY = 3000; private static final long HARD_DELETE_START_OFFSET = 11; private static final long HARD_DELETE_LAST_PART_SIZE = 13; - private static final int DEFAULT_MAX_IN_MEM_ELEMENTS = 5; + static final int DEFAULT_MAX_IN_MEM_ELEMENTS = 5; static final DiskIOScheduler DISK_IO_SCHEDULER = new DiskIOScheduler(null); static final long DELAY_BETWEEN_LAST_MODIFIED_TIMES_MS = 10 * Time.MsPerSec; static final StoreKeyFactory STORE_KEY_FACTORY; @@ -312,10 +312,9 @@ FileSpan makePermanent(MockId id, boolean forcePut) throws StoreException { * Adds a delete entry in the index (real and reference) for {@code idToDelete}. * @param idToDelete the id to be deleted. * @return the {@link FileSpan} of the added entries. - * @throws IOException * @throws StoreException */ - FileSpan addDeleteEntry(MockId idToDelete) throws IOException, StoreException { + FileSpan addDeleteEntry(MockId idToDelete) throws StoreException { return addDeleteEntry(idToDelete, null); } @@ -1004,11 +1003,10 @@ private void setupTestState(boolean isLogSegmented, long segmentCapacity, boolea * @param expectedLogSegmentCount the number of log segments that are expected to assist after the addition of the * first entry and at the end of the addition of all entries. * @param addTtlUpdates if {@code true}, adds entries that update TTL. - * @throws IOException * @throws StoreException */ private void addCuratedIndexEntriesToLogSegment(long sizeToMakeIndexEntriesFor, int expectedLogSegmentCount, - boolean addTtlUpdates) throws IOException, StoreException { + boolean addTtlUpdates) throws StoreException { // First Index Segment // 1 PUT Offset firstJournalEntryAddedNow = diff --git a/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java b/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java index 675eeb0248..afe2c8e468 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java @@ -19,6 +19,7 @@ import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.replication.FindToken; +import com.github.ambry.utils.MockTime; import com.github.ambry.utils.Pair; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; @@ -385,11 +386,10 @@ public void hardDeletePauseResumeRestartTest() throws InterruptedException, IOEx /** * Tests that expired values are correctly handled. - * @throws IOException * @throws StoreException */ @Test - public void expirationTest() throws IOException, StoreException { + public void expirationTest() throws StoreException { // add a PUT entry that will expire if time advances // advance time so that time moves to whole second with no residual milliseconds state.time.sleep(Time.MsPerSec - state.time.milliseconds()); @@ -463,7 +463,6 @@ public void setEndOffsetsTest() throws StoreException { /** * Tests {@link PersistentIndex#changeIndexSegments(List, Set)} for good and bad cases. - * @throws IOException * @throws StoreException */ @Test @@ -530,6 +529,63 @@ public void changeIndexSegmentsTest() throws StoreException { } } + /** + * Test getting absolute end position of last PUT under different circumstances (i.e. last PUT is out of Journal) + * @throws Exception + */ + @Test + public void getAbsoluteEndPositionOfLastPutTest() throws Exception { + File testDir = StoreTestUtils.createTempDirectory("indexDirTest-" + UtilsTest.getRandomString(10)); + CuratedLogIndexState indexState = new CuratedLogIndexState(isLogSegmented, testDir, false, false, true); + assertEquals("There should be no PUT record since index is empty", -1, + indexState.index.getAbsoluteEndPositionOfLastPut()); + MockTime time = new MockTime(); + long expiresAtMs = time.milliseconds() + TimeUnit.HOURS.toMillis(1); + // DEFAULT_MAX_IN_MEM_ELEMENTS = 5, here we put 5 entries into the log + List indexEntries = indexState.addPutEntries(5, PUT_RECORD_SIZE, expiresAtMs); + assertEquals("Number of index segments should be 1", 1, indexState.index.getIndexSegments().size()); + // update ttl for above 5 entries and the ttl entries fall in second index segment. + for (IndexEntry entry : indexEntries) { + indexState.makePermanent((MockId) entry.getKey(), false); + } + assertEquals("Number of index segments (after ttl update) should be 2", 2, + indexState.index.getIndexSegments().size()); + // default journal size should be 2 * DEFAULT_MAX_IN_MEM_ELEMENTS = 10 + assertEquals("Journal size not expected", 2 * DEFAULT_MAX_IN_MEM_ELEMENTS, + indexState.index.journal.getAllEntries().size()); + // delete above 5 entries, these delete entries should fall in third index segment + for (IndexEntry entry : indexEntries) { + indexState.addDeleteEntry((MockId) entry.getKey()); + } + assertEquals("Number of index segments (after deletion) should be 3", 3, + indexState.index.getIndexSegments().size()); + // although there are now 15 entries in total, journal size is still 10 (old entries are removed from journal) + assertEquals("Journal size (after deletion) shouldn't change", 2 * DEFAULT_MAX_IN_MEM_ELEMENTS, + indexState.index.journal.getAllEntries().size()); + // after deletion, the last PUT record falls in first segment (out of journal) + // for segmented log, there is a header size = 18 + assertEquals("Absolute end position of last PUT record not expected", + 5 * PUT_RECORD_SIZE + (isLogSegmented ? 18 : 0), indexState.index.getAbsoluteEndPositionOfLastPut()); + // close the index to seal all index segments + indexState.index.close(false); + // get end position of last PUT again, it should return same result (this is to test getting last PUT in sealed index segment) + assertEquals("Absolute end position of last PUT record not expected", + 5 * PUT_RECORD_SIZE + (isLogSegmented ? 18 : 0), indexState.index.getAbsoluteEndPositionOfLastPut()); + + // calculate current end position in log segment (note that there are 5 PUT, 5 TTL update and 5 DELETE entries) + long currentEndPosition = + 5 * PUT_RECORD_SIZE + 5 * TTL_UPDATE_RECORD_SIZE + 5 * DELETE_RECORD_SIZE + (isLogSegmented ? 18 : 0); + // add one more PUT entry and delete it afterwards + indexEntries = indexState.addPutEntries(1, PUT_RECORD_SIZE, Utils.Infinite_Time); + indexState.addDeleteEntry((MockId) indexEntries.get(0).getKey()); + assertEquals("Number of index segments after new PUT and delete should be 4", 4, + indexState.index.getIndexSegments().size()); + // now, the latest PUT entry should be in the journal + assertEquals("Absolute end position of last PUT record not expected", currentEndPosition + PUT_RECORD_SIZE, + indexState.index.getAbsoluteEndPositionOfLastPut()); + indexState.destroy(); + } + /** * Test that verifies that there are no concurrency issues with the execution of * {@link PersistentIndex#addToIndex(IndexEntry, FileSpan)} and {@link PersistentIndex#changeIndexSegments(List, Set)} diff --git a/ambry-store/src/test/java/com.github.ambry.store/JournalTest.java b/ambry-store/src/test/java/com.github.ambry.store/JournalTest.java index 669abbc5c1..aa1d98d031 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/JournalTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/JournalTest.java @@ -15,6 +15,8 @@ import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import org.junit.Assert; import org.junit.Test; @@ -29,24 +31,28 @@ public void testJournalOperation() { String firstLogSegmentName = LogSegmentNameHelper.getName(pos, gen); String secondLogSegmentName = LogSegmentNameHelper.getNextPositionName(firstLogSegmentName); Journal journal = new Journal("test", 10, 5); + // maintain a queue to keep track of entries in journal for verification purpose + List journalEntries = new LinkedList<>(); Assert.assertNull("First offset should be null", journal.getFirstOffset()); Assert.assertNull("Last offset should be null", journal.getLastOffset()); Assert.assertNull("Should not be able to get entries because there are none", journal.getEntriesSince(new Offset(firstLogSegmentName, 0), true)); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 0), new MockId("id1")); + Assert.assertTrue("Get all entries should return empty list", journal.getAllEntries().isEmpty()); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 0), new MockId("id1"), journalEntries); Assert.assertEquals("Did not get expected key at offset", new MockId("id1"), journal.getKeyAtOffset(new Offset(firstLogSegmentName, 0))); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 1000), new MockId("id2")); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 2000), new MockId("id3")); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 3000), new MockId("id4")); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 4000), new MockId("id5")); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 0), new MockId("id6")); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 1000), new MockId("id7")); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 2000), new MockId("id8")); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 3000), new MockId("id9")); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 4000), new MockId("id10")); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 1000), new MockId("id2"), journalEntries); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 2000), new MockId("id3"), journalEntries); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 3000), new MockId("id4"), journalEntries); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 4000), new MockId("id5"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 0), new MockId("id6"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 1000), new MockId("id7"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 2000), new MockId("id8"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 3000), new MockId("id9"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 4000), new MockId("id10"), journalEntries); Assert.assertEquals("First offset not as expected", new Offset(firstLogSegmentName, 0), journal.getFirstOffset()); Assert.assertEquals("Last offset not as expected", new Offset(secondLogSegmentName, 4000), journal.getLastOffset()); + Assert.assertEquals("Current entries in journal not expected", journalEntries, journal.getAllEntries()); List entries = journal.getEntriesSince(new Offset(firstLogSegmentName, 0), true); Assert.assertEquals(entries.get(0).getOffset(), new Offset(firstLogSegmentName, 0)); Assert.assertEquals(entries.get(0).getKey(), new MockId("id1")); @@ -73,7 +79,10 @@ public void testJournalOperation() { Assert.assertEquals(entries.size(), 5); Assert.assertNull("Should not be able to get entries because offset does not exist", journal.getEntriesSince(new Offset(firstLogSegmentName, 1), true)); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 5000), new MockId("id11")); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 5000), new MockId("id11"), journalEntries); + Assert.assertEquals("Current entries in journal not expected", journalEntries, journal.getAllEntries()); + Assert.assertEquals("Number of entries in journal not expected", journal.getMaxEntriesToJournal(), + journal.getAllEntries().size()); Assert.assertEquals("First offset not as expected", new Offset(firstLogSegmentName, 1000), journal.getFirstOffset()); Assert.assertEquals("Last offset not as expected", new Offset(secondLogSegmentName, 5000), journal.getLastOffset()); @@ -110,22 +119,26 @@ public void testJournalBootstrapMode() { offsets[i] = new Offset(logSegmentName, i * 1000); keys[i] = new MockId("id" + i); } + List journalEntries = new ArrayList<>(); // Bootstrap mode is off by default and journal entries should respect the max constraint Journal journal = new Journal("test", 1, 1); - addEntryAndVerify(journal, offsets[0], keys[0]); - addEntryAndVerify(journal, offsets[1], keys[1]); + addEntryAndVerify(journal, offsets[0], keys[0], journalEntries); + addEntryAndVerify(journal, offsets[1], keys[1], journalEntries); Assert.assertEquals("Unexpected journal size", 1, journal.getCurrentNumberOfEntries()); Assert.assertEquals("Oldest entry is not being replaced", offsets[1], journal.getFirstOffset()); + Assert.assertEquals("Entries in journal not expected", journalEntries, journal.getAllEntries()); // Bootstrap mode is turned on and journal entries should be able to exceed the max constraint journal.startBootstrap(); - addEntryAndVerify(journal, offsets[2], keys[2]); + addEntryAndVerify(journal, offsets[2], keys[2], journalEntries); Assert.assertEquals("Unexpected journal size", 2, journal.getCurrentNumberOfEntries()); Assert.assertEquals("Oldest entry should not be replaced", offsets[1], journal.getFirstOffset()); + Assert.assertEquals("Entries in journal not expected", journalEntries, journal.getAllEntries()); // Bootstrap mode is off and journal entries should respect the max constraint again journal.finishBootstrap(); - addEntryAndVerify(journal, offsets[3], keys[3]); + addEntryAndVerify(journal, offsets[3], keys[3], journalEntries); Assert.assertEquals("Unexpected journal size", 2, journal.getCurrentNumberOfEntries()); Assert.assertEquals("Oldest entry is not being replaced", offsets[2], journal.getFirstOffset()); + Assert.assertEquals("Entries in journal not expected", journalEntries, journal.getAllEntries()); } /** @@ -133,10 +146,18 @@ public void testJournalBootstrapMode() { * @param journal the {@link Journal} to add to * @param offset the {@link Offset} to add an entry for * @param id the {@link StoreKey} at {@code offset} + * @param journalEntries a list of {@link JournalEntry} to track entries in current journal. This is for verification + * purpose. */ - private void addEntryAndVerify(Journal journal, Offset offset, MockId id) { + private void addEntryAndVerify(Journal journal, Offset offset, MockId id, List journalEntries) { long crc = Utils.getRandomLong(TestUtils.RANDOM, Long.MAX_VALUE); journal.addEntry(offset, id, crc); + if (journalEntries != null) { + if (!journal.isInBootstrapMode() && journalEntries.size() >= journal.getMaxEntriesToJournal()) { + journalEntries.remove(0); + } + journalEntries.add(new JournalEntry(offset, id)); + } Assert.assertEquals("Unexpected key at offset", id, journal.getKeyAtOffset(offset)); Assert.assertEquals("Unexpected crc for key", crc, journal.getCrcOfKey(id).longValue()); }