From f35f3ea3b533b19c1e4d503f6c89327dabc379e5 Mon Sep 17 00:00:00 2001 From: Yingyi Zhang Date: Tue, 26 Nov 2019 14:20:51 -0800 Subject: [PATCH] Support getting end position of last PUT in BlobStore (#1307) The PR introduces a new method in PersistentIndex to get end position of last PUT record in BlobStore. Note that, the "last" PUT record is valid only when the method is invoked. That is, there could be new PUTs afterwards. The intention here is to help replica to check if its peer replicas have caught up with latest PUT record when PUT is disabled on this replica (this happens when current replica is being decommissioned) --- .../com.github.ambry.store/BlobStore.java | 8 +++ .../com.github.ambry.store/IndexSegment.java | 40 ++++++++++++ .../java/com.github.ambry.store/Journal.java | 45 ++++++++++++++ .../PersistentIndex.java | 42 +++++++++++++ .../CuratedLogIndexState.java | 8 +-- .../com.github.ambry.store/IndexTest.java | 62 ++++++++++++++++++- .../com.github.ambry.store/JournalTest.java | 53 +++++++++++----- 7 files changed, 234 insertions(+), 24 deletions(-) diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java index ee3d6e0d86..f47eaba0a7 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java +++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java @@ -734,6 +734,14 @@ public void deleteStoreFiles() throws StoreException, IOException { logger.info("All files of store {} are deleted", storeId); } + /** + * @return return absolute end position of last PUT in current store when this method is invoked. 
+ * @throws StoreException + */ + public long getEndPositionOfLastPut() throws StoreException { + return index.getAbsoluteEndPositionOfLastPut(); + } + /** * @return {@link ReplicaStatusDelegate} associated with this store */ diff --git a/ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java b/ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java index c5683c776a..d472540859 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java +++ b/ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java @@ -760,6 +760,46 @@ void seal() throws StoreException { persistBloomFilter(); } + /** + * @return index value of last PUT record in this index segment. Return {@code null} if no PUT is found + */ + IndexValue getIndexValueOfLastPut() throws StoreException { + IndexValue indexValueOfLastPut = null; + if (sealed.get()) { + ByteBuffer readBuf = serEntries.duplicate(); + int numOfIndexEntries = numberOfEntries(readBuf); + NavigableSet values = new TreeSet<>(); + for (int i = 0; i < numOfIndexEntries; i++) { + StoreKey key = getKeyAt(readBuf, i); + values.clear(); + getAllValuesFromMmap(readBuf, key, i, numOfIndexEntries, values); + for (IndexValue indexValue : values) { + // FLAGS_DEFAULT_VALUE means PUT record + if (indexValue.getFlags() == IndexValue.FLAGS_DEFAULT_VALUE && (indexValueOfLastPut == null + || indexValue.compareTo(indexValueOfLastPut) > 0)) { + indexValueOfLastPut = indexValue; + // note that values set contains all entries associated with specific key, so there are at most 3 entries in + // this set (one PUT, one TTL Update and one DELETE). Due to nature of log, PUT always comes first. And if we + // already find PUT, we can jump out of the inner loop. 
+ break; + } + } + } + } else { + for (Map.Entry<StoreKey, ConcurrentSkipListSet<IndexValue>> entry : index.entrySet()) { + for (IndexValue indexValue : entry.getValue()) { + // FLAGS_DEFAULT_VALUE means PUT record + if (indexValue.getFlags() == IndexValue.FLAGS_DEFAULT_VALUE && (indexValueOfLastPut == null + || indexValue.compareTo(indexValueOfLastPut) > 0)) { + indexValueOfLastPut = indexValue; + break; + } + } + } + } + return indexValueOfLastPut; + } + /** * Maps the segment of index either as a memory map or a in memory buffer depending on config. * @throws StoreException if there are problems with the index diff --git a/ambry-store/src/main/java/com.github.ambry.store/Journal.java b/ambry-store/src/main/java/com.github.ambry.store/Journal.java index cd0dd4eff5..4986fecff1 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/Journal.java +++ b/ambry-store/src/main/java/com.github.ambry.store/Journal.java @@ -16,6 +16,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -40,6 +42,23 @@ Offset getOffset() { StoreKey getKey() { return key; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JournalEntry entry = (JournalEntry) o; + return Objects.equals(this.offset, entry.offset) && Objects.equals(this.key, entry.key); + } + + @Override + public int hashCode() { + return Objects.hash(offset, key); + } } /** @@ -154,6 +173,24 @@ List<JournalEntry> getEntriesSince(Offset offset, boolean inclusive) { return journalEntries; } + /** + * @return all journal entries at the time when this method is invoked. 
(Return empty list if journal is empty) + */ + List<JournalEntry> getAllEntries() { + List<JournalEntry> result = new ArrayList<>(); + // get current last Offset; entries added after this point are not part of the returned snapshot + Offset lastOffset = getLastOffset(); + // a null last offset means the journal is currently empty, in which case the empty list is returned + if (lastOffset != null) { + // get portion view of journal whose key is less than or equal to lastOffset + NavigableMap<Offset, StoreKey> journalView = journal.headMap(lastOffset, true); + for (Map.Entry<Offset, StoreKey> entry : journalView.entrySet()) { + result.add(new JournalEntry(entry.getKey(), entry.getValue())); + } + } + return result; + } + /** * @return the first/smallest offset in the journal or {@code null} if no such entry exists. */ @@ -202,6 +239,14 @@ void finishBootstrap() { inBootstrapMode = false; } + boolean isInBootstrapMode() { + return inBootstrapMode; + } + + int getMaxEntriesToJournal() { + return maxEntriesToJournal; + } + /** * @return the number of entries that is currently in the {@link Journal}. */ diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java index 553a8c4776..9743fe86aa 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java +++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java @@ -1063,6 +1063,48 @@ long getLogUsedCapacity() { return getAbsolutePositionInLogForOffset(getCurrentEndOffset(indexSegments), indexSegments); } + /** + * @return absolute end position (in bytes) of latest PUT record when this method is invoked. If no PUT is found in + * current store, -1 will be returned. + * @throws StoreException + */ + long getAbsoluteEndPositionOfLastPut() throws StoreException { + IndexValue indexValueOfLastPut = null; + // 1. 
go through keys in journal in reverse order to see if there is a PUT index entry associated with the key + List<JournalEntry> journalEntries = journal.getAllEntries(); + ConcurrentSkipListMap<Offset, IndexSegment> indexSegments = validIndexSegments; + // check entry in reverse order (from the most recent one) to find last PUT record in index + for (int i = journalEntries.size() - 1; i >= 0; --i) { + JournalEntry entry = journalEntries.get(i); + indexValueOfLastPut = findKey(entry.getKey(), new FileSpan(entry.getOffset(), getCurrentEndOffset(indexSegments)), + EnumSet.of(IndexEntryType.PUT), indexSegments); + if (indexValueOfLastPut != null) { + break; + } + } + if (!journalEntries.isEmpty() && indexValueOfLastPut == null) { + // 2. if not find in the journal, check index segments starting from most recent one (until latest PUT is found) + JournalEntry firstJournalEntry = journalEntries.get(0); + // generate a reverse-ordered view of the segments that start before the first journal entry. headMap (unlike + // subMap with a null bound from lowerKey) simply yields an empty view when no such segment exists + ConcurrentNavigableMap<Offset, IndexSegment> segmentsMapToSearch = + indexSegments.headMap(firstJournalEntry.getOffset(), false).descendingMap(); + for (Map.Entry<Offset, IndexSegment> entry : segmentsMapToSearch.entrySet()) { + indexValueOfLastPut = entry.getValue().getIndexValueOfLastPut(); + if (indexValueOfLastPut != null) { + break; + } + } + } + if (indexValueOfLastPut == null) { + // if no PUT record is found in this store, return -1. This is possible when current store is a brand new store. + logger.info("No PUT record is found for store {}", dataDir); + return -1; + } + return getAbsolutePositionInLogForOffset(indexValueOfLastPut.getOffset(), indexSegments) + + indexValueOfLastPut.getSize(); + } + /** * @return the number of valid log segments starting from the first segment. 
*/ diff --git a/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java b/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java index a3bf06dcbb..5fbd41a395 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java +++ b/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java @@ -69,8 +69,8 @@ class CuratedLogIndexState { private static final long SEGMENT_CAPACITY = 3000; private static final long HARD_DELETE_START_OFFSET = 11; private static final long HARD_DELETE_LAST_PART_SIZE = 13; - private static final int DEFAULT_MAX_IN_MEM_ELEMENTS = 5; + static final int DEFAULT_MAX_IN_MEM_ELEMENTS = 5; static final DiskIOScheduler DISK_IO_SCHEDULER = new DiskIOScheduler(null); static final long DELAY_BETWEEN_LAST_MODIFIED_TIMES_MS = 10 * Time.MsPerSec; static final StoreKeyFactory STORE_KEY_FACTORY; @@ -312,10 +312,9 @@ FileSpan makePermanent(MockId id, boolean forcePut) throws StoreException { * Adds a delete entry in the index (real and reference) for {@code idToDelete}. * @param idToDelete the id to be deleted. * @return the {@link FileSpan} of the added entries. - * @throws IOException * @throws StoreException */ - FileSpan addDeleteEntry(MockId idToDelete) throws IOException, StoreException { + FileSpan addDeleteEntry(MockId idToDelete) throws StoreException { return addDeleteEntry(idToDelete, null); } @@ -1004,11 +1003,10 @@ private void setupTestState(boolean isLogSegmented, long segmentCapacity, boolea * @param expectedLogSegmentCount the number of log segments that are expected to assist after the addition of the * first entry and at the end of the addition of all entries. * @param addTtlUpdates if {@code true}, adds entries that update TTL. 
- * @throws IOException * @throws StoreException */ private void addCuratedIndexEntriesToLogSegment(long sizeToMakeIndexEntriesFor, int expectedLogSegmentCount, - boolean addTtlUpdates) throws IOException, StoreException { + boolean addTtlUpdates) throws StoreException { // First Index Segment // 1 PUT Offset firstJournalEntryAddedNow = diff --git a/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java b/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java index 675eeb0248..afe2c8e468 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java @@ -19,6 +19,7 @@ import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.replication.FindToken; +import com.github.ambry.utils.MockTime; import com.github.ambry.utils.Pair; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; @@ -385,11 +386,10 @@ public void hardDeletePauseResumeRestartTest() throws InterruptedException, IOEx /** * Tests that expired values are correctly handled. - * @throws IOException * @throws StoreException */ @Test - public void expirationTest() throws IOException, StoreException { + public void expirationTest() throws StoreException { // add a PUT entry that will expire if time advances // advance time so that time moves to whole second with no residual milliseconds state.time.sleep(Time.MsPerSec - state.time.milliseconds()); @@ -463,7 +463,6 @@ public void setEndOffsetsTest() throws StoreException { /** * Tests {@link PersistentIndex#changeIndexSegments(List, Set)} for good and bad cases. - * @throws IOException * @throws StoreException */ @Test @@ -530,6 +529,63 @@ public void changeIndexSegmentsTest() throws StoreException { } } + /** + * Test getting absolute end position of last PUT under different circumstances (i.e. 
last PUT is out of Journal) + * @throws Exception + */ + @Test + public void getAbsoluteEndPositionOfLastPutTest() throws Exception { + File testDir = StoreTestUtils.createTempDirectory("indexDirTest-" + UtilsTest.getRandomString(10)); + CuratedLogIndexState indexState = new CuratedLogIndexState(isLogSegmented, testDir, false, false, true); + assertEquals("There should be no PUT record since index is empty", -1, + indexState.index.getAbsoluteEndPositionOfLastPut()); + MockTime time = new MockTime(); + long expiresAtMs = time.milliseconds() + TimeUnit.HOURS.toMillis(1); + // DEFAULT_MAX_IN_MEM_ELEMENTS = 5, here we put 5 entries into the log + List indexEntries = indexState.addPutEntries(5, PUT_RECORD_SIZE, expiresAtMs); + assertEquals("Number of index segments should be 1", 1, indexState.index.getIndexSegments().size()); + // update ttl for above 5 entries and the ttl entries fall in second index segment. + for (IndexEntry entry : indexEntries) { + indexState.makePermanent((MockId) entry.getKey(), false); + } + assertEquals("Number of index segments (after ttl update) should be 2", 2, + indexState.index.getIndexSegments().size()); + // default journal size should be 2 * DEFAULT_MAX_IN_MEM_ELEMENTS = 10 + assertEquals("Journal size not expected", 2 * DEFAULT_MAX_IN_MEM_ELEMENTS, + indexState.index.journal.getAllEntries().size()); + // delete above 5 entries, these delete entries should fall in third index segment + for (IndexEntry entry : indexEntries) { + indexState.addDeleteEntry((MockId) entry.getKey()); + } + assertEquals("Number of index segments (after deletion) should be 3", 3, + indexState.index.getIndexSegments().size()); + // although there are now 15 entries in total, journal size is still 10 (old entries are removed from journal) + assertEquals("Journal size (after deletion) shouldn't change", 2 * DEFAULT_MAX_IN_MEM_ELEMENTS, + indexState.index.journal.getAllEntries().size()); + // after deletion, the last PUT record falls in first segment (out of 
journal) + // for segmented log, there is a header size = 18 + assertEquals("Absolute end position of last PUT record not expected", + 5 * PUT_RECORD_SIZE + (isLogSegmented ? 18 : 0), indexState.index.getAbsoluteEndPositionOfLastPut()); + // close the index to seal all index segments + indexState.index.close(false); + // get end position of last PUT again, it should return same result (this is to test getting last PUT in sealed index segment) + assertEquals("Absolute end position of last PUT record not expected", + 5 * PUT_RECORD_SIZE + (isLogSegmented ? 18 : 0), indexState.index.getAbsoluteEndPositionOfLastPut()); + + // calculate current end position in log segment (note that there are 5 PUT, 5 TTL update and 5 DELETE entries) + long currentEndPosition = + 5 * PUT_RECORD_SIZE + 5 * TTL_UPDATE_RECORD_SIZE + 5 * DELETE_RECORD_SIZE + (isLogSegmented ? 18 : 0); + // add one more PUT entry and delete it afterwards + indexEntries = indexState.addPutEntries(1, PUT_RECORD_SIZE, Utils.Infinite_Time); + indexState.addDeleteEntry((MockId) indexEntries.get(0).getKey()); + assertEquals("Number of index segments after new PUT and delete should be 4", 4, + indexState.index.getIndexSegments().size()); + // now, the latest PUT entry should be in the journal + assertEquals("Absolute end position of last PUT record not expected", currentEndPosition + PUT_RECORD_SIZE, + indexState.index.getAbsoluteEndPositionOfLastPut()); + indexState.destroy(); + } + /** * Test that verifies that there are no concurrency issues with the execution of * {@link PersistentIndex#addToIndex(IndexEntry, FileSpan)} and {@link PersistentIndex#changeIndexSegments(List, Set)} diff --git a/ambry-store/src/test/java/com.github.ambry.store/JournalTest.java b/ambry-store/src/test/java/com.github.ambry.store/JournalTest.java index 669abbc5c1..aa1d98d031 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/JournalTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/JournalTest.java @@ -15,6 
+15,8 @@ import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import org.junit.Assert; import org.junit.Test; @@ -29,24 +31,28 @@ public void testJournalOperation() { String firstLogSegmentName = LogSegmentNameHelper.getName(pos, gen); String secondLogSegmentName = LogSegmentNameHelper.getNextPositionName(firstLogSegmentName); Journal journal = new Journal("test", 10, 5); + // maintain a queue to keep track of entries in journal for verification purpose + List journalEntries = new LinkedList<>(); Assert.assertNull("First offset should be null", journal.getFirstOffset()); Assert.assertNull("Last offset should be null", journal.getLastOffset()); Assert.assertNull("Should not be able to get entries because there are none", journal.getEntriesSince(new Offset(firstLogSegmentName, 0), true)); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 0), new MockId("id1")); + Assert.assertTrue("Get all entries should return empty list", journal.getAllEntries().isEmpty()); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 0), new MockId("id1"), journalEntries); Assert.assertEquals("Did not get expected key at offset", new MockId("id1"), journal.getKeyAtOffset(new Offset(firstLogSegmentName, 0))); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 1000), new MockId("id2")); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 2000), new MockId("id3")); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 3000), new MockId("id4")); - addEntryAndVerify(journal, new Offset(firstLogSegmentName, 4000), new MockId("id5")); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 0), new MockId("id6")); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 1000), new MockId("id7")); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 2000), new MockId("id8")); - addEntryAndVerify(journal, new 
Offset(secondLogSegmentName, 3000), new MockId("id9")); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 4000), new MockId("id10")); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 1000), new MockId("id2"), journalEntries); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 2000), new MockId("id3"), journalEntries); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 3000), new MockId("id4"), journalEntries); + addEntryAndVerify(journal, new Offset(firstLogSegmentName, 4000), new MockId("id5"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 0), new MockId("id6"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 1000), new MockId("id7"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 2000), new MockId("id8"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 3000), new MockId("id9"), journalEntries); + addEntryAndVerify(journal, new Offset(secondLogSegmentName, 4000), new MockId("id10"), journalEntries); Assert.assertEquals("First offset not as expected", new Offset(firstLogSegmentName, 0), journal.getFirstOffset()); Assert.assertEquals("Last offset not as expected", new Offset(secondLogSegmentName, 4000), journal.getLastOffset()); + Assert.assertEquals("Current entries in journal not expected", journalEntries, journal.getAllEntries()); List entries = journal.getEntriesSince(new Offset(firstLogSegmentName, 0), true); Assert.assertEquals(entries.get(0).getOffset(), new Offset(firstLogSegmentName, 0)); Assert.assertEquals(entries.get(0).getKey(), new MockId("id1")); @@ -73,7 +79,10 @@ public void testJournalOperation() { Assert.assertEquals(entries.size(), 5); Assert.assertNull("Should not be able to get entries because offset does not exist", journal.getEntriesSince(new Offset(firstLogSegmentName, 1), true)); - addEntryAndVerify(journal, new Offset(secondLogSegmentName, 5000), new MockId("id11")); + 
addEntryAndVerify(journal, new Offset(secondLogSegmentName, 5000), new MockId("id11"), journalEntries); + Assert.assertEquals("Current entries in journal not expected", journalEntries, journal.getAllEntries()); + Assert.assertEquals("Number of entries in journal not expected", journal.getMaxEntriesToJournal(), + journal.getAllEntries().size()); Assert.assertEquals("First offset not as expected", new Offset(firstLogSegmentName, 1000), journal.getFirstOffset()); Assert.assertEquals("Last offset not as expected", new Offset(secondLogSegmentName, 5000), journal.getLastOffset()); @@ -110,22 +119,26 @@ public void testJournalBootstrapMode() { offsets[i] = new Offset(logSegmentName, i * 1000); keys[i] = new MockId("id" + i); } + List journalEntries = new ArrayList<>(); // Bootstrap mode is off by default and journal entries should respect the max constraint Journal journal = new Journal("test", 1, 1); - addEntryAndVerify(journal, offsets[0], keys[0]); - addEntryAndVerify(journal, offsets[1], keys[1]); + addEntryAndVerify(journal, offsets[0], keys[0], journalEntries); + addEntryAndVerify(journal, offsets[1], keys[1], journalEntries); Assert.assertEquals("Unexpected journal size", 1, journal.getCurrentNumberOfEntries()); Assert.assertEquals("Oldest entry is not being replaced", offsets[1], journal.getFirstOffset()); + Assert.assertEquals("Entries in journal not expected", journalEntries, journal.getAllEntries()); // Bootstrap mode is turned on and journal entries should be able to exceed the max constraint journal.startBootstrap(); - addEntryAndVerify(journal, offsets[2], keys[2]); + addEntryAndVerify(journal, offsets[2], keys[2], journalEntries); Assert.assertEquals("Unexpected journal size", 2, journal.getCurrentNumberOfEntries()); Assert.assertEquals("Oldest entry should not be replaced", offsets[1], journal.getFirstOffset()); + Assert.assertEquals("Entries in journal not expected", journalEntries, journal.getAllEntries()); // Bootstrap mode is off and journal entries 
should respect the max constraint again journal.finishBootstrap(); - addEntryAndVerify(journal, offsets[3], keys[3]); + addEntryAndVerify(journal, offsets[3], keys[3], journalEntries); Assert.assertEquals("Unexpected journal size", 2, journal.getCurrentNumberOfEntries()); Assert.assertEquals("Oldest entry is not being replaced", offsets[2], journal.getFirstOffset()); + Assert.assertEquals("Entries in journal not expected", journalEntries, journal.getAllEntries()); } /** @@ -133,10 +146,18 @@ public void testJournalBootstrapMode() { * @param journal the {@link Journal} to add to * @param offset the {@link Offset} to add an entry for * @param id the {@link StoreKey} at {@code offset} + * @param journalEntries a list of {@link JournalEntry} to track entries in current journal. This is for verification + * purpose. */ - private void addEntryAndVerify(Journal journal, Offset offset, MockId id) { + private void addEntryAndVerify(Journal journal, Offset offset, MockId id, List journalEntries) { long crc = Utils.getRandomLong(TestUtils.RANDOM, Long.MAX_VALUE); journal.addEntry(offset, id, crc); + if (journalEntries != null) { + if (!journal.isInBootstrapMode() && journalEntries.size() >= journal.getMaxEntriesToJournal()) { + journalEntries.remove(0); + } + journalEntries.add(new JournalEntry(offset, id)); + } Assert.assertEquals("Unexpected key at offset", id, journal.getKeyAtOffset(offset)); Assert.assertEquals("Unexpected crc for key", crc, journal.getCrcOfKey(id).longValue()); }