From b115593659596832b6fe1389d5fdbc0eb4b661a2 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 26 Jul 2022 14:25:38 +0530 Subject: [PATCH 1/3] Add RemoteSegmentStoreDirectory to interact with remote segment store Signed-off-by: Sachin Kale --- .../index/store/RemoteDirectory.java | 12 +- .../store/RemoteSegmentStoreDirectory.java | 412 ++++++++++++++++++ .../index/store/RemoteDirectoryTests.java | 20 + .../RemoteSegmentStoreDirectoryTests.java | 390 +++++++++++++++++ 4 files changed, 833 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 855457f275122..35c173da91183 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -33,7 +33,7 @@ * * @opensearch.internal */ -public final class RemoteDirectory extends Directory { +public class RemoteDirectory extends Directory { private final BlobContainer blobContainer; @@ -50,6 +50,16 @@ public String[] listAll() throws IOException { return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new); } + /** + * Returns names of files with given prefix in this directory. + * @param filenamePrefix The prefix to match against file names in the directory + * @return A list of the matching filenames in the directory + * @throws IOException IOException if there were any failures in reading from the blob container + */ + public Collection listFilesByPrefix(String filenamePrefix) throws IOException { + return blobContainer.listBlobsByPrefix(filenamePrefix).keySet(); + } + /** * Removes an existing file in the directory. * diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java new file mode 100644 index 0000000000000..9db7efe9891b8 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -0,0 +1,412 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.UUIDs; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * A RemoteDirectory extension for remote segment store. We need to make sure we don't overwrite a segment file once uploaded. 
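// Illustrative sketch (not part of this patch): how a local segment filename maps to its
// remote counterpart by appending the "__" separator and a UUID suffix, mirroring the
// getNewRemoteSegmentFilename(...) helper defined later in this class. The UUID value here
// is the hypothetical one used in the javadoc example.
String localFilename = "_0.cfe";
String remoteFilename = localFilename + "__" + "gX7bNIIBrs0AUNsR2yEG"; // "_0.cfe__gX7bNIIBrs0AUNsR2yEG"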
+ * In order to prevent segment overwrite which can occur due to two primary nodes for the same shard at the same time, + * a unique suffix is added to the uploaded segment file. This class keeps track of filename of segments stored + * in remote segment store vs filename in local filesystem and provides the consistent Directory interface so that + * caller will be accessing segment files in the same way as {@code FSDirectory}. Apart from storing actual segment files, + * remote segment store also keeps track of refresh and commit checkpoints in a separate path which is handled by + * another instance of {@code RemoteDirectory}. + * @opensearch.internal + */ +public final class RemoteSegmentStoreDirectory extends FilterDirectory { + /** + * Each segment file is uploaded with unique suffix. + * For example, _0.cfe in local filesystem will be uploaded to remote segment store as _0.cfe__gX7bNIIBrs0AUNsR2yEG + */ + public static final String SEGMENT_NAME_UUID_SEPARATOR = "__"; + + /** + * remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data + */ + private final RemoteDirectory remoteDataDirectory; + /** + * remoteMetadataDirectory is used to store metadata files at path: cluster_UUID/index_UUID/shardId/segments/metadata + */ + private final RemoteDirectory remoteMetadataDirectory; + + /** + * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation + * This is achieved by uploading refresh metadata file with the same suffix as last commit metadata file. + */ + private String refreshMetadataFileUniqueSuffix; + /** + * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. + * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. + * It is important to initialize this map on creation of RemoteSegmentStoreDirectory and update it on each upload and delete. + */ + private Map segmentsUploadedToRemoteStore; + + private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class); + + public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory) throws IOException { + super(remoteDataDirectory); + this.remoteDataDirectory = remoteDataDirectory; + this.remoteMetadataDirectory = remoteMetadataDirectory; + init(); + } + + /** + * Initializes the cache which keeps track of all the segment files uploaded to the remote segment store. + * As this cache is specific to an instance of RemoteSegmentStoreDirectory, it is possible that cache becomes stale + * if another instance of RemoteSegmentStoreDirectory is used to upload/delete segment files. + * It is caller's responsibility to call init() again to ensure that cache is properly updated. + * @throws IOException if there were any failures in reading the metadata file + */ + public void init() throws IOException { + this.refreshMetadataFileUniqueSuffix = UUIDs.base64UUID(); + this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readLatestMetadataFile()); + } + + /** + * Read latest metadata file to get the list of segments uploaded to the remote segment store. + * We upload a separate metadata file per commit, commit_metadata__PrimaryTerm__Generation__UUID + * We also keep a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit. 
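// Illustrative sketch (not part of this patch): example metadata filenames for a commit at
// primary term 5, generation 10 (generation is written in Character.MAX_RADIX, so 10 becomes "a"),
// with a hypothetical UUID "uuid1". The refresh metadata file reuses the UUID suffix of the last
// commit metadata file, so a newer refresh replaces the previous one for the same primary term
// and generation.
String commitMetadataFilename = "commit_metadata__5__a__uuid1";
String refreshMetadataFilename = "refresh_metadata__5__a__uuid1";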
+ * The format of refresh metadata filename is: refresh_metadata__PrimaryTerm__Generation__LastCommitUUID + * Commit metadata files keep track of all the segments of the given shard that are part of the commit. + * Refresh metadata files keep track of segments that were created since the last commit. + * In order to get the list of segment files uploaded to the remote segment store, we need to read the latest commit metadata file + * and corresponding refresh metadata file. + * Each metadata file contains a map where + * Key is - Segment local filename and + * Value is - local filename::uploaded filename::checksum + * @return Map of segment filename to uploaded filename with checksum + * @throws IOException if there were any failures in reading the metadata file + */ + private Map readLatestMetadataFile() throws IOException { + Map segmentMetadataMap = new HashMap<>(); + Collection commitMetadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.COMMIT_METADATA_PREFIX); + MetadataFilenameUtils.MetadataFilenameComparator metadataFilenameComparator = + new MetadataFilenameUtils.MetadataFilenameComparator(); + Optional latestCommitMetadataFile = commitMetadataFiles.stream().max(metadataFilenameComparator); + + if (latestCommitMetadataFile.isPresent()) { + logger.info("Reading latest commit Metadata file {}", latestCommitMetadataFile.get()); + segmentMetadataMap = readMetadataFile(latestCommitMetadataFile.get()); + } else { + logger.info("No commit metadata file found"); + } + + Collection refreshMetadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.REFRESH_METADATA_PREFIX); + Optional latestRefreshMetadataFile = refreshMetadataFiles.stream() + .filter( + file -> latestCommitMetadataFile.map( + s -> metadataFilenameComparator.comparePrimaryTermGeneration(file, s) == 0 + && MetadataFilenameUtils.getUuid(file).equals(MetadataFilenameUtils.getUuid(s)) + ).orElse(true) + ) + .max(metadataFilenameComparator); + + if (latestRefreshMetadataFile.isPresent()) { + logger.info("Reading latest refresh metadata file {}", latestRefreshMetadataFile.get()); + segmentMetadataMap.putAll(readMetadataFile(latestRefreshMetadataFile.get())); + this.refreshMetadataFileUniqueSuffix = MetadataFilenameUtils.getUuid(latestRefreshMetadataFile.get()); + } + return segmentMetadataMap; + } + + private Map readMetadataFile(String metadataFilename) throws IOException { + try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) { + Map segmentMetadata = indexInput.readMapOfStrings(); + return segmentMetadata.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> UploadedSegmentMetadata.fromString(entry.getValue()))); + } + } + + /** + * Metadata of a segment that is uploaded to remote segment store. 
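// Illustrative sketch (not part of this patch): shape of the map stored in a metadata file and
// parsed by readMetadataFile(...) above; each value is split on "::" into original filename,
// uploaded filename and checksum by UploadedSegmentMetadata.fromString(...). The UUID suffix
// and checksum are hypothetical.
Map<String, String> metadataFileContent = Map.of(
    "_0.cfe", "_0.cfe::_0.cfe__gX7bNIIBrs0AUNsR2yEG::2897031224"
);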
+ */ + static class UploadedSegmentMetadata { + private static final String SEPARATOR = "::"; + private final String originalFilename; + private final String uploadedFilename; + private final String checksum; + + UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum) { + this.originalFilename = originalFilename; + this.uploadedFilename = uploadedFilename; + this.checksum = checksum; + } + + @Override + public String toString() { + return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum); + } + + public static UploadedSegmentMetadata fromString(String uploadedFilename) { + String[] values = uploadedFilename.split(SEPARATOR); + return new UploadedSegmentMetadata(values[0], values[1], values[2]); + } + } + + /** + * Contains utility methods that provide various parts of metadata filename along with comparator + * Each metadata filename is of format: PREFIX__PrimaryTerm__Generation__UUID + */ + static class MetadataFilenameUtils { + public static final String SEPARATOR = "__"; + public static final String COMMIT_METADATA_PREFIX = "commit_metadata"; + public static final String REFRESH_METADATA_PREFIX = "refresh_metadata"; + + /** + * Comparator to sort the metadata filenames. The order of sorting is: Primary Term, Generation, UUID + * Even though UUID sort does not provide any info on recency, it provides a consistent way to sort the filenames. + */ + static class MetadataFilenameComparator implements Comparator { + @Override + public int compare(String first, String second) { + if (!first.split(SEPARATOR)[0].equals(second.split(SEPARATOR)[0])) { + return first.split(SEPARATOR)[0].compareTo(second.split(SEPARATOR)[0]); + } + int fixedSuffixComparison = comparePrimaryTermGeneration(first, second); + if (fixedSuffixComparison == 0) { + return getUuid(first).compareTo(getUuid(second)); + } else { + return fixedSuffixComparison; + } + } + + public int comparePrimaryTermGeneration(String first, String second) { + long firstPrimaryTerm = getPrimaryTerm(first); + long secondPrimaryTerm = getPrimaryTerm(second); + if (firstPrimaryTerm != secondPrimaryTerm) { + return firstPrimaryTerm > secondPrimaryTerm ? 1 : -1; + } else { + long firstGeneration = getGeneration(first); + long secondGeneration = getGeneration(second); + if (firstGeneration != secondGeneration) { + return firstGeneration > secondGeneration ? 1 : -1; + } else { + return 0; + } + } + } + } + + public static String getMetadataFilename(String prefix, long primaryTerm, long generation) { + return getMetadataFilename(prefix, primaryTerm, generation, UUIDs.base64UUID()); + } + + public static String getMetadataFilename(String prefix, long primaryTerm, long generation, String uuid) { + return String.join(SEPARATOR, prefix, Long.toString(primaryTerm), Long.toString(generation, Character.MAX_RADIX), uuid); + } + + public static long getPrimaryTerm(String filename) { + return Long.parseLong(filename.split(SEPARATOR)[1]); + } + + public static long getGeneration(String filename) { + return Long.parseLong(filename.split(SEPARATOR)[2], Character.MAX_RADIX); + } + + public static String getUuid(String filename) { + return filename.split(SEPARATOR)[3]; + } + } + + /** + * Returns list of all the segment files uploaded to remote segment store till the last refresh checkpoint. + * Any segment file that is uploaded without corresponding refresh/commit file will not be visible as part of listAll(). 
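// Illustrative sketch (not part of this patch): how the MetadataFilenameUtils helpers above
// decompose a metadata filename. Generation is encoded in Character.MAX_RADIX, so "n" decodes
// to 23; the filename and UUID are hypothetical.
String filename = "commit_metadata__12__n__uuid_xyz";
long primaryTerm = MetadataFilenameUtils.getPrimaryTerm(filename); // 12
long generation = MetadataFilenameUtils.getGeneration(filename);   // 23
String uuid = MetadataFilenameUtils.getUuid(filename);             // "uuid_xyz"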
+ * We chose not to return cache entries for listAll as cache can have entries for stale segments as well. + * Even if we plan to delete stale segments from remote segment store, it will be a periodic operation. + * @return segment filenames stored in remote segment store + * @throws IOException if there were any failures in reading the metadata file + */ + @Override + public String[] listAll() throws IOException { + return readLatestMetadataFile().keySet().toArray(new String[0]); + } + + /** + * Delete segment file from remote segment store. + * @param name the name of an existing segment file in local filesystem. + * @throws IOException if the file exists but could not be deleted. + */ + @Override + public void deleteFile(String name) throws IOException { + String remoteFilename = getExistingRemoteFilename(name); + if (remoteFilename != null) { + remoteDataDirectory.deleteFile(remoteFilename); + segmentsUploadedToRemoteStore.remove(name); + } + } + + /** + * Returns the byte length of a segment file in the remote segment store. + * @param name the name of an existing segment file in local filesystem. + * @throws IOException in case of I/O error + * @throws NoSuchFileException if the file does not exist in the cache or remote segment store + */ + @Override + public long fileLength(String name) throws IOException { + String remoteFilename = getExistingRemoteFilename(name); + if (remoteFilename != null) { + return remoteDataDirectory.fileLength(remoteFilename); + } else { + throw new NoSuchFileException(name); + } + } + + /** + * Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote + * segment store. + * @param name the name of the file to create. + * @throws IOException in case of I/O error + */ + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + return remoteDataDirectory.createOutput(getNewRemoteSegmentFilename(name), context); + } + + /** + * Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream. + * @param name the name of an existing file. + * @throws IOException in case of I/O error + * @throws NoSuchFileException if the file does not exist either in cache or remote segment store + */ + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + String remoteFilename = getExistingRemoteFilename(name); + if (remoteFilename != null) { + return remoteDataDirectory.openInput(remoteFilename, context); + } else { + throw new NoSuchFileException(name); + } + } + + /** + * Copies an existing src file from directory from to a non-existent file dest in this directory. + * Once the segment is uploaded to remote segment store, update the cache accordingly. + */ + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { + String remoteFilename = getNewRemoteSegmentFilename(dest); + remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + String checksum = getChecksumOfLocalFile(from, src); + UploadedSegmentMetadata metadata = new UploadedSegmentMetadata(src, remoteFilename, checksum); + segmentsUploadedToRemoteStore.put(src, metadata); + } + + /** + * Checks if the file exists in the uploadedSegments cache and the checksum matches. + * It is important to match the checksum as the same segment filename can be used for different + * segments due to a concurrency issue. 
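// Illustrative sketch (not part of this patch): a possible caller-side flow that uses
// containsFile(...) below to avoid re-uploading a segment that already exists in the remote
// store with a matching checksum. "localDirectory" and "localChecksum" are hypothetical.
if (remoteSegmentStoreDirectory.containsFile("_0.cfe", localChecksum) == false) {
    remoteSegmentStoreDirectory.copyFrom(localDirectory, "_0.cfe", "_0.cfe", IOContext.DEFAULT);
}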
+ * @param localFilename filename of segment stored in local filesystem + * @param checksum checksum of the segment file + * @return true if file exists in cache and checksum matches. + */ + public boolean containsFile(String localFilename, String checksum) { + return segmentsUploadedToRemoteStore.containsKey(localFilename) + && segmentsUploadedToRemoteStore.get(localFilename).checksum.equals(checksum); + } + + /** + * Upload commit metadata file + * @param committedFiles segment files that are part of the latest segments_N file + * @param storeDirectory instance of local directory to temporarily create metadata file before upload + * @param primaryTerm primary term to be used in the name of metadata file + * @param generation commit generation + * @throws IOException in case of I/O error while uploading the metadata file + */ + public void uploadCommitMetadata(Collection committedFiles, Directory storeDirectory, long primaryTerm, long generation) + throws IOException { + String commitFilename = MetadataFilenameUtils.getMetadataFilename( + MetadataFilenameUtils.COMMIT_METADATA_PREFIX, + primaryTerm, + generation + ); + uploadMetadataFile(committedFiles, storeDirectory, commitFilename); + this.refreshMetadataFileUniqueSuffix = MetadataFilenameUtils.getUuid(commitFilename); + } + + /** + * Upload commit metadata file + * @param refreshedFiles segment files that are created since last commit and part of the latest refresh + * @param storeDirectory instance of local directory to temporarily create metadata file before upload + * @param primaryTerm primary term to be used in the name of metadata file + * @param generation commit generation + * @throws IOException in case of I/O error while uploading the metadata file + */ + public void uploadRefreshMetadata(Collection refreshedFiles, Directory storeDirectory, long primaryTerm, long generation) + throws IOException { + String refreshFilename = MetadataFilenameUtils.getMetadataFilename( + MetadataFilenameUtils.REFRESH_METADATA_PREFIX, + primaryTerm, + generation, + this.refreshMetadataFileUniqueSuffix + ); + uploadMetadataFile(refreshedFiles, storeDirectory, refreshFilename); + } + + private void uploadMetadataFile(Collection files, Directory storeDirectory, String filename) throws IOException { + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeMapOfStrings( + segmentsUploadedToRemoteStore.entrySet() + .stream() + .filter(entry -> files.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString())) + ); + indexOutput.close(); + storeDirectory.sync(Collections.singleton(filename)); + remoteMetadataDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT); + storeDirectory.deleteFile(filename); + } + + private String getChecksumOfLocalFile(Directory directory, String file) throws IOException { + try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { + return Long.toString(CodecUtil.retrieveChecksum(indexInput)); + } + } + + private String getExistingRemoteFilename(String localFilename) { + if (segmentsUploadedToRemoteStore.containsKey(localFilename)) { + return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename; + } else { + return null; + } + } + + private String getNewRemoteSegmentFilename(String localFilename) { + return localFilename + SEGMENT_NAME_UUID_SEPARATOR + UUIDs.base64UUID(); + } + + private String getLocalSegmentFilename(String remoteFilename) { + return 
remoteFilename.split(SEGMENT_NAME_UUID_SEPARATOR)[0]; + } + + // Visible for testing + Map getSegmentsUploadedToRemoteStore() { + return this.segmentsUploadedToRemoteStore; + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index 2ded77d2cecfd..97575248b4ad3 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -15,11 +15,13 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.common.collect.Set; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -67,6 +69,24 @@ public void testListAllException() throws IOException { assertThrows(IOException.class, () -> remoteDirectory.listAll()); } + public void testListFilesByPrefix() throws IOException { + Map fileNames = Stream.of("abc", "abd", "abe", "abf", "abg") + .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100))); + + when(blobContainer.listBlobsByPrefix("ab")).thenReturn(fileNames); + + Collection actualFileNames = remoteDirectory.listFilesByPrefix("ab"); + Collection expectedFileName = Set.of("abc", "abd", "abe", "abf", "abg"); + assertEquals(expectedFileName, actualFileNames); + } + + public void testListFilesByPrefixException() throws IOException { + when(blobContainer.listBlobsByPrefix("abc")).thenThrow(new IOException("Error reading blob store")); + + assertThrows(IOException.class, () -> remoteDirectory.listFilesByPrefix("abc")); + verify(blobContainer).listBlobsByPrefix("abc"); + } + public void testDeleteFile() throws IOException { remoteDirectory.deleteFile("segment_1"); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java new file mode 100644 index 0000000000000..9fa5b62647a91 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -0,0 +1,390 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.store; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.junit.Before; +import org.opensearch.common.collect.Set; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.startsWith; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteSegmentStoreDirectoryTests extends OpenSearchTestCase { + private RemoteDirectory remoteDataDirectory; + private RemoteDirectory remoteMetadataDirectory; + + private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; + + @Before + public void setup() throws IOException { + remoteDataDirectory = mock(RemoteDirectory.class); + remoteMetadataDirectory = mock(RemoteDirectory.class); + + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory); + } + + public void testUploadedSegmentMetadataToString() { + RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = new RemoteSegmentStoreDirectory.UploadedSegmentMetadata( + "abc", + "pqr", + "123456" + ); + assertEquals("abc::pqr::123456", metadata.toString()); + } + + public void testUploadedSegmentMetadataFromString() { + RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString( + "_0.cfe::_0.cfe__uuidxyz::4567" + ); + assertEquals("_0.cfe::_0.cfe__uuidxyz::4567", metadata.toString()); + } + + public void testGetMetadataFilename() { + // Generation 23 is replaced by n due to radix 32 + assertTrue(RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename("abc", 12, 23).startsWith("abc__12__n__")); + assertEquals( + "abc__12__n__uuid_xyz", + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename("abc", 12, 23, "uuid_xyz") + ); + } + + public void testGetPrimaryTermGenerationUuid() { + assertEquals(12, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getPrimaryTerm("abc__12__n__uuid_xyz")); + assertEquals(23, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getGeneration("abc__12__n__uuid_xyz")); + assertEquals("uuid_xyz", RemoteSegmentStoreDirectory.MetadataFilenameUtils.getUuid("abc__12__n__uuid_xyz")); + } + + public void testMetadataFilenameComparator() { + List metadataFilenames = new ArrayList<>( + List.of( + "abc__10__20__uuid1", + "abc__12__2__uuid2", + "pqr__1__1__uuid0", + "abc__3__n__uuid3", + "abc__10__8__uuid8", + "abc__3__a__uuid4", + "abc__3__a__uuid5" + ) + ); + RemoteSegmentStoreDirectory.MetadataFilenameUtils.MetadataFilenameComparator metadataFilenameComparator = + new RemoteSegmentStoreDirectory.MetadataFilenameUtils.MetadataFilenameComparator(); + metadataFilenames.sort(metadataFilenameComparator); + assertEquals( + List.of( + "abc__3__a__uuid4", + "abc__3__a__uuid5", + "abc__3__n__uuid3", + "abc__10__8__uuid8", + "abc__10__20__uuid1", + "abc__12__2__uuid2", + "pqr__1__1__uuid0" + ), + 
metadataFilenames + ); + } + + public void testInitException() throws IOException { + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.COMMIT_METADATA_PREFIX)) + .thenReturn(List.of()); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.REFRESH_METADATA_PREFIX)) + .thenThrow(new IOException("Error")); + + assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.init()); + } + + public void testInitNoMetadataFile() throws IOException { + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.COMMIT_METADATA_PREFIX)) + .thenReturn(List.of()); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.REFRESH_METADATA_PREFIX)) + .thenReturn(List.of()); + + remoteSegmentStoreDirectory.init(); + Map actualCache = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + + assertEquals(Set.of(), actualCache.keySet()); + } + + private Map getDummyMetadata(String prefix, int commitGeneration) { + Map metadata = new HashMap<>(); + metadata.put(prefix + ".cfe", prefix + ".cfe::" + prefix + ".cfe__qrt::" + randomIntBetween(1000, 5000)); + metadata.put(prefix + ".cfs", prefix + ".cfs::" + prefix + ".cfs__zxd::" + randomIntBetween(1000, 5000)); + metadata.put(prefix + ".si", prefix + ".si::" + prefix + ".si__yui::" + randomIntBetween(1000, 5000)); + metadata.put( + "segments_" + commitGeneration, + "segments_" + commitGeneration + "::segments_" + commitGeneration + "__exv::" + randomIntBetween(1000, 5000) + ); + return metadata; + } + + private void populateCommitMetadata() throws IOException { + List commitFiles = List.of("commit_metadata__1__5__abc", "commit_metadata__1__6__pqr", "commit_metadata__2__1__zxv"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.COMMIT_METADATA_PREFIX)) + .thenReturn(commitFiles); + + IndexInput indexInput = mock(IndexInput.class); + Map commitMetadata = getDummyMetadata("_0", 1); + when(indexInput.readMapOfStrings()).thenReturn(commitMetadata); + when(remoteMetadataDirectory.openInput("commit_metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(indexInput); + } + + private void populateRefreshMetadata() throws IOException { + List refreshedFiles = List.of("refresh_metadata__1__5__abc", "refresh_metadata__1__6__pqr", "refresh_metadata__2__1__zxv"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.REFRESH_METADATA_PREFIX)) + .thenReturn(refreshedFiles); + + IndexInput indexInput = mock(IndexInput.class); + Map refreshMetadata = getDummyMetadata("_1", 2); + when(indexInput.readMapOfStrings()).thenReturn(refreshMetadata); + when(remoteMetadataDirectory.openInput("refresh_metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(indexInput); + } + + public void testInitOnlyCommit() throws IOException { + populateCommitMetadata(); + + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.REFRESH_METADATA_PREFIX)) + .thenReturn(List.of()); + + remoteSegmentStoreDirectory.init(); + + Map actualCache = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + + assertEquals(Set.of("_0.cfe", "_0.cfs", "_0.si", "segments_1"), actualCache.keySet()); + } + + public void testInitOnlyRefresh() throws IOException { + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.COMMIT_METADATA_PREFIX)) + 
.thenReturn(List.of()); + + populateRefreshMetadata(); + + remoteSegmentStoreDirectory.init(); + + Map actualCache = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + + assertEquals(Set.of("_1.cfe", "_1.cfs", "_1.si", "segments_2"), actualCache.keySet()); + } + + public void testInitBothCommitAndRefresh() throws IOException { + populateCommitMetadata(); + populateRefreshMetadata(); + + remoteSegmentStoreDirectory.init(); + + Map actualCache = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + + assertEquals(Set.of("_0.cfe", "_0.cfs", "_0.si", "segments_1", "_1.cfe", "_1.cfs", "_1.si", "segments_2"), actualCache.keySet()); + } + + public void testListAll() throws IOException { + populateCommitMetadata(); + populateRefreshMetadata(); + + assertEquals( + Set.of("_0.cfe", "_0.cfs", "_0.si", "segments_1", "_1.cfe", "_1.cfs", "_1.si", "segments_2"), + Set.of(remoteSegmentStoreDirectory.listAll()) + ); + } + + public void testDeleteFile() throws IOException { + populateCommitMetadata(); + remoteSegmentStoreDirectory.init(); + + Map uploadedSegments = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + + assertTrue(uploadedSegments.containsKey("_0.si")); + assertFalse(uploadedSegments.containsKey("_100.si")); + + remoteSegmentStoreDirectory.deleteFile("_0.si"); + remoteSegmentStoreDirectory.deleteFile("_100.si"); + + verify(remoteDataDirectory).deleteFile(startsWith("_0.si")); + verify(remoteDataDirectory, times(0)).deleteFile(startsWith("_100.si")); + assertFalse(uploadedSegments.containsKey("_0.si")); + } + + public void testDeleteFileException() throws IOException { + populateCommitMetadata(); + remoteSegmentStoreDirectory.init(); + + doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(any()); + assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteFile("_0.si")); + } + + public void testFileLenght() throws IOException { + populateCommitMetadata(); + remoteSegmentStoreDirectory.init(); + + Map uploadedSegments = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + + assertTrue(uploadedSegments.containsKey("_0.si")); + + when(remoteDataDirectory.fileLength(startsWith("_0.si"))).thenReturn(1234L); + + assertEquals(1234L, remoteSegmentStoreDirectory.fileLength("_0.si")); + } + + public void testFileLenghtNoSuchFile() throws IOException { + populateCommitMetadata(); + remoteSegmentStoreDirectory.init(); + + Map uploadedSegments = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + + assertFalse(uploadedSegments.containsKey("_100.si")); + assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.fileLength("_100.si")); + } + + public void testCreateOutput() throws IOException { + IndexOutput indexOutput = mock(IndexOutput.class); + when(remoteDataDirectory.createOutput(startsWith("abc"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + + assertEquals(indexOutput, remoteSegmentStoreDirectory.createOutput("abc", IOContext.DEFAULT)); + } + + public void testCreateOutputException() { + when(remoteDataDirectory.createOutput(startsWith("abc"), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error")); + + assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.createOutput("abc", IOContext.DEFAULT)); + } + + public void testOpenInput() throws IOException { + populateCommitMetadata(); + remoteSegmentStoreDirectory.init(); + + IndexInput indexInput = mock(IndexInput.class); + when(remoteDataDirectory.openInput(startsWith("_0.si"), 
eq(IOContext.DEFAULT))).thenReturn(indexInput); + + assertEquals(indexInput, remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT)); + } + + public void testOpenInputNoSuchFile() { + assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT)); + } + + public void testOpenInputException() throws IOException { + populateCommitMetadata(); + remoteSegmentStoreDirectory.init(); + + when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error")); + + assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT)); + } + + public void testCopyFrom() throws IOException { + String filename = "_100.si"; + populateCommitMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT); + assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + storeDirectory.close(); + } + + public void testCopyFromException() throws IOException { + String filename = "_100.si"; + Directory storeDirectory = LuceneTestCase.newDirectory(); + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + doThrow(new IOException("Error")).when(remoteDataDirectory).copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT); + + assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT)); + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + storeDirectory.close(); + } + + public void testContainsFile() throws IOException { + populateCommitMetadata(); + remoteSegmentStoreDirectory.init(); + + // This is not the correct way to add files but the other way is to open up access to fields in UploadedSegmentMetadata + Map uploadedSegmentMetadataMap = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + uploadedSegmentMetadataMap.put( + "_100.si", + new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234") + ); + + assertTrue(remoteSegmentStoreDirectory.containsFile("_100.si", "1234")); + assertFalse(remoteSegmentStoreDirectory.containsFile("_100.si", "2345")); + assertFalse(remoteSegmentStoreDirectory.containsFile("_200.si", "1234")); + } + + public void testUploadCommitMetadataEmpty() throws IOException { + Directory storeDirectory = mock(Directory.class); + IndexOutput indexOutput = mock(IndexOutput.class); + when(storeDirectory.createOutput(startsWith("commit_metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + + Collection commitFiles = List.of("s1", "s2", "s3"); + remoteSegmentStoreDirectory.uploadCommitMetadata(commitFiles, storeDirectory, 12L, 24L); + + verify(remoteMetadataDirectory).copyFrom( + eq(storeDirectory), + startsWith("commit_metadata__12__o"), + startsWith("commit_metadata__12__o"), + eq(IOContext.DEFAULT) + ); + verify(indexOutput).writeMapOfStrings(Map.of()); + } + + public void testUploadRefreshMetadataNonEmpty() 
throws IOException { + populateCommitMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = mock(Directory.class); + IndexOutput indexOutput = mock(IndexOutput.class); + when(storeDirectory.createOutput(startsWith("refresh_metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + + Collection commitFiles = List.of("_0.si"); + remoteSegmentStoreDirectory.uploadRefreshMetadata(commitFiles, storeDirectory, 12L, 24L); + + verify(remoteMetadataDirectory).copyFrom( + eq(storeDirectory), + startsWith("refresh_metadata__12__o"), + startsWith("refresh_metadata__12__o"), + eq(IOContext.DEFAULT) + ); + String metadataString = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().get("_0.si").toString(); + verify(indexOutput).writeMapOfStrings(Map.of("_0.si", metadataString)); + } +} From 02bc44dac4ca24391c1c31bf5d807b65583a06d1 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Sun, 31 Jul 2022 21:45:29 +0530 Subject: [PATCH 2/3] Use one type of metadata file Signed-off-by: Sachin Kale --- .../index/store/RemoteDirectory.java | 2 +- .../store/RemoteSegmentStoreDirectory.java | 176 +++++++----------- .../RemoteSegmentStoreDirectoryTests.java | 145 +++++---------- 3 files changed, 113 insertions(+), 210 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 35c173da91183..62e2b12896411 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -54,7 +54,7 @@ public String[] listAll() throws IOException { * Returns names of files with given prefix in this directory. * @param filenamePrefix The prefix to match against file names in the directory * @return A list of the matching filenames in the directory - * @throws IOException IOException if there were any failures in reading from the blob container + * @throws IOException if there were any failures in reading from the blob container */ public Collection listFilesByPrefix(String filenamePrefix) throws IOException { return blobContainer.listBlobsByPrefix(filenamePrefix).keySet(); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 9db7efe9891b8..6810535692c0a 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -35,7 +35,7 @@ * a unique suffix is added to the uploaded segment file. This class keeps track of filename of segments stored * in remote segment store vs filename in local filesystem and provides the consistent Directory interface so that * caller will be accessing segment files in the same way as {@code FSDirectory}. Apart from storing actual segment files, - * remote segment store also keeps track of refresh and commit checkpoints in a separate path which is handled by + * remote segment store also keeps track of refresh checkpoints as metadata in a separate path which is handled by * another instance of {@code RemoteDirectory}. 
* @opensearch.internal */ @@ -46,6 +46,9 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory { */ public static final String SEGMENT_NAME_UUID_SEPARATOR = "__"; + public static final MetadataFilenameUtils.MetadataFilenameComparator METADATA_FILENAME_COMPARATOR = + new MetadataFilenameUtils.MetadataFilenameComparator(); + /** * remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data */ @@ -57,9 +60,10 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory { /** * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation - * This is achieved by uploading refresh metadata file with the same suffix as last commit metadata file. + * This is achieved by uploading refresh metadata file with the same UUID suffix. */ - private String refreshMetadataFileUniqueSuffix; + private String metadataFileUniqueSuffix; + /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. @@ -84,19 +88,16 @@ public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDi * @throws IOException if there were any failures in reading the metadata file */ public void init() throws IOException { - this.refreshMetadataFileUniqueSuffix = UUIDs.base64UUID(); + this.metadataFileUniqueSuffix = UUIDs.base64UUID(); this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readLatestMetadataFile()); } /** - * Read latest metadata file to get the list of segments uploaded to the remote segment store. - * We upload a separate metadata file per commit, commit_metadata__PrimaryTerm__Generation__UUID - * We also keep a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit. - * The format of refresh metadata filename is: refresh_metadata__PrimaryTerm__Generation__LastCommitUUID - * Commit metadata files keep track of all the segments of the given shard that are part of the commit. - * Refresh metadata files keep track of segments that were created since the last commit. - * In order to get the list of segment files uploaded to the remote segment store, we need to read the latest commit metadata file - * and corresponding refresh metadata file. + * Read the latest metadata file to get the list of segments uploaded to the remote segment store. + * We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit. + * The format of refresh metadata filename is: refresh_metadata__PrimaryTerm__Generation__UUID + * Refresh metadata files keep track of active segments for the shard at the time of refresh. + * In order to get the list of segment files uploaded to the remote segment store, we need to read the latest metadata file. 
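// Illustrative sketch (not part of this patch): with a single metadata file type, the latest
// state is recovered by taking the maximum filename under METADATA_FILENAME_COMPARATOR, as the
// updated readLatestMetadataFile() below does. Filenames and UUIDs are hypothetical.
List<String> metadataFiles = List.of("metadata__1__5__abc", "metadata__2__1__zxv");
Optional<String> latestMetadataFile = metadataFiles.stream().max(METADATA_FILENAME_COMPARATOR); // "metadata__2__1__zxv"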
* Each metadata file contains a map where * Key is - Segment local filename and * Value is - local filename::uploaded filename::checksum @@ -105,33 +106,17 @@ public void init() throws IOException { */ private Map readLatestMetadataFile() throws IOException { Map segmentMetadataMap = new HashMap<>(); - Collection commitMetadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.COMMIT_METADATA_PREFIX); - MetadataFilenameUtils.MetadataFilenameComparator metadataFilenameComparator = - new MetadataFilenameUtils.MetadataFilenameComparator(); - Optional latestCommitMetadataFile = commitMetadataFiles.stream().max(metadataFilenameComparator); - - if (latestCommitMetadataFile.isPresent()) { - logger.info("Reading latest commit Metadata file {}", latestCommitMetadataFile.get()); - segmentMetadataMap = readMetadataFile(latestCommitMetadataFile.get()); + + Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); + Optional latestMetadataFile = metadataFiles.stream().max(METADATA_FILENAME_COMPARATOR); + + if (latestMetadataFile.isPresent()) { + logger.info("Reading latest Metadata file {}", latestMetadataFile.get()); + segmentMetadataMap = readMetadataFile(latestMetadataFile.get()); } else { - logger.info("No commit metadata file found"); + logger.info("No metadata file found"); } - Collection refreshMetadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.REFRESH_METADATA_PREFIX); - Optional latestRefreshMetadataFile = refreshMetadataFiles.stream() - .filter( - file -> latestCommitMetadataFile.map( - s -> metadataFilenameComparator.comparePrimaryTermGeneration(file, s) == 0 - && MetadataFilenameUtils.getUuid(file).equals(MetadataFilenameUtils.getUuid(s)) - ).orElse(true) - ) - .max(metadataFilenameComparator); - - if (latestRefreshMetadataFile.isPresent()) { - logger.info("Reading latest refresh metadata file {}", latestRefreshMetadataFile.get()); - segmentMetadataMap.putAll(readMetadataFile(latestRefreshMetadataFile.get())); - this.refreshMetadataFileUniqueSuffix = MetadataFilenameUtils.getUuid(latestRefreshMetadataFile.get()); - } return segmentMetadataMap; } @@ -176,8 +161,7 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) { */ static class MetadataFilenameUtils { public static final String SEPARATOR = "__"; - public static final String COMMIT_METADATA_PREFIX = "commit_metadata"; - public static final String REFRESH_METADATA_PREFIX = "refresh_metadata"; + public static final String METADATA_PREFIX = "metadata"; /** * Comparator to sort the metadata filenames. 
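// Illustrative sketch (not part of this patch): with the single METADATA_PREFIX, a refresh at
// primary term 12 and generation 24 with a hypothetical UUID "uuid1" is recorded as
// "metadata__12__o__uuid1" (generation 24 is written as "o" in Character.MAX_RADIX), using the
// getMetadataFilename(primaryTerm, generation, uuid) helper defined just below.
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(12, 24, "uuid1"); // "metadata__12__o__uuid1"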
The order of sorting is: Primary Term, Generation, UUID @@ -186,58 +170,53 @@ static class MetadataFilenameUtils { static class MetadataFilenameComparator implements Comparator { @Override public int compare(String first, String second) { - if (!first.split(SEPARATOR)[0].equals(second.split(SEPARATOR)[0])) { - return first.split(SEPARATOR)[0].compareTo(second.split(SEPARATOR)[0]); - } - int fixedSuffixComparison = comparePrimaryTermGeneration(first, second); - if (fixedSuffixComparison == 0) { - return getUuid(first).compareTo(getUuid(second)); - } else { - return fixedSuffixComparison; + String[] firstTokens = first.split(SEPARATOR); + String[] secondTokens = second.split(SEPARATOR); + if (!firstTokens[0].equals(secondTokens[0])) { + return firstTokens[0].compareTo(secondTokens[0]); } - } - - public int comparePrimaryTermGeneration(String first, String second) { - long firstPrimaryTerm = getPrimaryTerm(first); - long secondPrimaryTerm = getPrimaryTerm(second); + long firstPrimaryTerm = getPrimaryTerm(firstTokens); + long secondPrimaryTerm = getPrimaryTerm(secondTokens); if (firstPrimaryTerm != secondPrimaryTerm) { return firstPrimaryTerm > secondPrimaryTerm ? 1 : -1; } else { - long firstGeneration = getGeneration(first); - long secondGeneration = getGeneration(second); + long firstGeneration = getGeneration(firstTokens); + long secondGeneration = getGeneration(secondTokens); if (firstGeneration != secondGeneration) { return firstGeneration > secondGeneration ? 1 : -1; } else { - return 0; + return getUuid(firstTokens).compareTo(getUuid(secondTokens)); } } } } - public static String getMetadataFilename(String prefix, long primaryTerm, long generation) { - return getMetadataFilename(prefix, primaryTerm, generation, UUIDs.base64UUID()); - } - - public static String getMetadataFilename(String prefix, long primaryTerm, long generation, String uuid) { - return String.join(SEPARATOR, prefix, Long.toString(primaryTerm), Long.toString(generation, Character.MAX_RADIX), uuid); + public static String getMetadataFilename(long primaryTerm, long generation, String uuid) { + return String.join( + SEPARATOR, + METADATA_PREFIX, + Long.toString(primaryTerm), + Long.toString(generation, Character.MAX_RADIX), + uuid + ); } - public static long getPrimaryTerm(String filename) { - return Long.parseLong(filename.split(SEPARATOR)[1]); + public static long getPrimaryTerm(String[] filenameTokens) { + return Long.parseLong(filenameTokens[1]); } - public static long getGeneration(String filename) { - return Long.parseLong(filename.split(SEPARATOR)[2], Character.MAX_RADIX); + public static long getGeneration(String[] filenameTokens) { + return Long.parseLong(filenameTokens[2], Character.MAX_RADIX); } - public static String getUuid(String filename) { - return filename.split(SEPARATOR)[3]; + public static String getUuid(String[] filenameTokens) { + return filenameTokens[3]; } } /** * Returns list of all the segment files uploaded to remote segment store till the last refresh checkpoint. - * Any segment file that is uploaded without corresponding refresh/commit file will not be visible as part of listAll(). + * Any segment file that is uploaded without corresponding metadata file will not be visible as part of listAll(). * We chose not to return cache entries for listAll as cache can have entries for stale segments as well. * Even if we plan to delete stale segments from remote segment store, it will be a periodic operation. 
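// Illustrative sketch (not part of this patch): how a caller might record a refresh with the
// consolidated uploadMetadata(...) introduced further down. Every listed segment must already
// have been uploaded via copyFrom(...); otherwise uploadMetadata throws NoSuchFileException.
// "storeDirectory", the segment names, primary term and generation are hypothetical.
Collection<String> activeSegments = List.of("_0.cfe", "_0.cfs", "_0.si", "segments_1");
remoteSegmentStoreDirectory.uploadMetadata(activeSegments, storeDirectory, 12L, 24L);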
* @return segment filenames stored in remote segment store @@ -314,8 +293,8 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) String remoteFilename = getNewRemoteSegmentFilename(dest); remoteDataDirectory.copyFrom(from, src, remoteFilename, context); String checksum = getChecksumOfLocalFile(from, src); - UploadedSegmentMetadata metadata = new UploadedSegmentMetadata(src, remoteFilename, checksum); - segmentsUploadedToRemoteStore.put(src, metadata); + UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum); + segmentsUploadedToRemoteStore.put(src, segmentMetadata); } /** @@ -332,55 +311,30 @@ public boolean containsFile(String localFilename, String checksum) { } /** - * Upload commit metadata file - * @param committedFiles segment files that are part of the latest segments_N file + * Upload metadata file + * @param segmentFiles segment files that are part of the shard at the time of the latest refresh * @param storeDirectory instance of local directory to temporarily create metadata file before upload * @param primaryTerm primary term to be used in the name of metadata file * @param generation commit generation * @throws IOException in case of I/O error while uploading the metadata file */ - public void uploadCommitMetadata(Collection committedFiles, Directory storeDirectory, long primaryTerm, long generation) + public void uploadMetadata(Collection segmentFiles, Directory storeDirectory, long primaryTerm, long generation) throws IOException { - String commitFilename = MetadataFilenameUtils.getMetadataFilename( - MetadataFilenameUtils.COMMIT_METADATA_PREFIX, - primaryTerm, - generation - ); - uploadMetadataFile(committedFiles, storeDirectory, commitFilename); - this.refreshMetadataFileUniqueSuffix = MetadataFilenameUtils.getUuid(commitFilename); - } - - /** - * Upload commit metadata file - * @param refreshedFiles segment files that are created since last commit and part of the latest refresh - * @param storeDirectory instance of local directory to temporarily create metadata file before upload - * @param primaryTerm primary term to be used in the name of metadata file - * @param generation commit generation - * @throws IOException in case of I/O error while uploading the metadata file - */ - public void uploadRefreshMetadata(Collection refreshedFiles, Directory storeDirectory, long primaryTerm, long generation) - throws IOException { - String refreshFilename = MetadataFilenameUtils.getMetadataFilename( - MetadataFilenameUtils.REFRESH_METADATA_PREFIX, - primaryTerm, - generation, - this.refreshMetadataFileUniqueSuffix - ); - uploadMetadataFile(refreshedFiles, storeDirectory, refreshFilename); - } - - private void uploadMetadataFile(Collection files, Directory storeDirectory, String filename) throws IOException { - IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); - indexOutput.writeMapOfStrings( - segmentsUploadedToRemoteStore.entrySet() - .stream() - .filter(entry -> files.contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString())) - ); + String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix); + IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT); + Map uploadedSegments = new HashMap<>(); + for (String file : segmentFiles) { + if (segmentsUploadedToRemoteStore.containsKey(file)) { + uploadedSegments.put(file, 
segmentsUploadedToRemoteStore.get(file).toString()); + } else { + throw new NoSuchFileException(file); + } + } + indexOutput.writeMapOfStrings(uploadedSegments); indexOutput.close(); - storeDirectory.sync(Collections.singleton(filename)); - remoteMetadataDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT); - storeDirectory.deleteFile(filename); + storeDirectory.sync(Collections.singleton(metadataFilename)); + remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT); + storeDirectory.deleteFile(metadataFilename); } private String getChecksumOfLocalFile(Directory directory, String file) throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 9fa5b62647a91..4eabfa74625f2 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -67,17 +67,17 @@ public void testUploadedSegmentMetadataFromString() { public void testGetMetadataFilename() { // Generation 23 is replaced by n due to radix 32 - assertTrue(RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename("abc", 12, 23).startsWith("abc__12__n__")); assertEquals( - "abc__12__n__uuid_xyz", - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename("abc", 12, 23, "uuid_xyz") + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX + "__12__n__uuid1", + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, "uuid1") ); } public void testGetPrimaryTermGenerationUuid() { - assertEquals(12, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getPrimaryTerm("abc__12__n__uuid_xyz")); - assertEquals(23, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getGeneration("abc__12__n__uuid_xyz")); - assertEquals("uuid_xyz", RemoteSegmentStoreDirectory.MetadataFilenameUtils.getUuid("abc__12__n__uuid_xyz")); + String[] filenameTokens = "abc__12__n__uuid_xyz".split(RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR); + assertEquals(12, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getPrimaryTerm(filenameTokens)); + assertEquals(23, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getGeneration(filenameTokens)); + assertEquals("uuid_xyz", RemoteSegmentStoreDirectory.MetadataFilenameUtils.getUuid(filenameTokens)); } public void testMetadataFilenameComparator() { @@ -92,9 +92,7 @@ public void testMetadataFilenameComparator() { "abc__3__a__uuid5" ) ); - RemoteSegmentStoreDirectory.MetadataFilenameUtils.MetadataFilenameComparator metadataFilenameComparator = - new RemoteSegmentStoreDirectory.MetadataFilenameUtils.MetadataFilenameComparator(); - metadataFilenames.sort(metadataFilenameComparator); + metadataFilenames.sort(RemoteSegmentStoreDirectory.METADATA_FILENAME_COMPARATOR); assertEquals( List.of( "abc__3__a__uuid4", @@ -110,19 +108,17 @@ public void testMetadataFilenameComparator() { } public void testInitException() throws IOException { - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.COMMIT_METADATA_PREFIX)) - .thenReturn(List.of()); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.REFRESH_METADATA_PREFIX)) - .thenThrow(new IOException("Error")); + 
when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( + new IOException("Error") + ); assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.init()); } public void testInitNoMetadataFile() throws IOException { - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.COMMIT_METADATA_PREFIX)) - .thenReturn(List.of()); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.REFRESH_METADATA_PREFIX)) - .thenReturn(List.of()); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + List.of() + ); remoteSegmentStoreDirectory.init(); Map actualCache = remoteSegmentStoreDirectory @@ -143,33 +139,24 @@ private Map getDummyMetadata(String prefix, int commitGeneration return metadata; } - private void populateCommitMetadata() throws IOException { - List commitFiles = List.of("commit_metadata__1__5__abc", "commit_metadata__1__6__pqr", "commit_metadata__2__1__zxv"); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.COMMIT_METADATA_PREFIX)) - .thenReturn(commitFiles); - - IndexInput indexInput = mock(IndexInput.class); - Map commitMetadata = getDummyMetadata("_0", 1); - when(indexInput.readMapOfStrings()).thenReturn(commitMetadata); - when(remoteMetadataDirectory.openInput("commit_metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(indexInput); - } - - private void populateRefreshMetadata() throws IOException { - List refreshedFiles = List.of("refresh_metadata__1__5__abc", "refresh_metadata__1__6__pqr", "refresh_metadata__2__1__zxv"); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.REFRESH_METADATA_PREFIX)) - .thenReturn(refreshedFiles); + private void populateMetadata() throws IOException { + List metadataFiles = List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + metadataFiles + ); IndexInput indexInput = mock(IndexInput.class); - Map refreshMetadata = getDummyMetadata("_1", 2); - when(indexInput.readMapOfStrings()).thenReturn(refreshMetadata); - when(remoteMetadataDirectory.openInput("refresh_metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(indexInput); + Map dummyMetadata = getDummyMetadata("_0", 1); + when(indexInput.readMapOfStrings()).thenReturn(dummyMetadata); + when(remoteMetadataDirectory.openInput("metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(indexInput); } - public void testInitOnlyCommit() throws IOException { - populateCommitMetadata(); + public void testInit() throws IOException { + populateMetadata(); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.REFRESH_METADATA_PREFIX)) - .thenReturn(List.of()); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv") + ); remoteSegmentStoreDirectory.init(); @@ -179,44 +166,14 @@ public void testInitOnlyCommit() throws IOException { assertEquals(Set.of("_0.cfe", "_0.cfs", "_0.si", "segments_1"), actualCache.keySet()); } - public void testInitOnlyRefresh() throws IOException { - 
when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.COMMIT_METADATA_PREFIX)) - .thenReturn(List.of()); - - populateRefreshMetadata(); - - remoteSegmentStoreDirectory.init(); - - Map actualCache = remoteSegmentStoreDirectory - .getSegmentsUploadedToRemoteStore(); - - assertEquals(Set.of("_1.cfe", "_1.cfs", "_1.si", "segments_2"), actualCache.keySet()); - } - - public void testInitBothCommitAndRefresh() throws IOException { - populateCommitMetadata(); - populateRefreshMetadata(); - - remoteSegmentStoreDirectory.init(); - - Map actualCache = remoteSegmentStoreDirectory - .getSegmentsUploadedToRemoteStore(); - - assertEquals(Set.of("_0.cfe", "_0.cfs", "_0.si", "segments_1", "_1.cfe", "_1.cfs", "_1.si", "segments_2"), actualCache.keySet()); - } - public void testListAll() throws IOException { - populateCommitMetadata(); - populateRefreshMetadata(); + populateMetadata(); - assertEquals( - Set.of("_0.cfe", "_0.cfs", "_0.si", "segments_1", "_1.cfe", "_1.cfs", "_1.si", "segments_2"), - Set.of(remoteSegmentStoreDirectory.listAll()) - ); + assertEquals(Set.of("_0.cfe", "_0.cfs", "_0.si", "segments_1"), Set.of(remoteSegmentStoreDirectory.listAll())); } public void testDeleteFile() throws IOException { - populateCommitMetadata(); + populateMetadata(); remoteSegmentStoreDirectory.init(); Map uploadedSegments = remoteSegmentStoreDirectory @@ -234,7 +191,7 @@ public void testDeleteFile() throws IOException { } public void testDeleteFileException() throws IOException { - populateCommitMetadata(); + populateMetadata(); remoteSegmentStoreDirectory.init(); doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(any()); @@ -242,7 +199,7 @@ public void testDeleteFileException() throws IOException { } public void testFileLenght() throws IOException { - populateCommitMetadata(); + populateMetadata(); remoteSegmentStoreDirectory.init(); Map uploadedSegments = remoteSegmentStoreDirectory @@ -256,7 +213,7 @@ public void testFileLenght() throws IOException { } public void testFileLenghtNoSuchFile() throws IOException { - populateCommitMetadata(); + populateMetadata(); remoteSegmentStoreDirectory.init(); Map uploadedSegments = remoteSegmentStoreDirectory @@ -280,7 +237,7 @@ public void testCreateOutputException() { } public void testOpenInput() throws IOException { - populateCommitMetadata(); + populateMetadata(); remoteSegmentStoreDirectory.init(); IndexInput indexInput = mock(IndexInput.class); @@ -294,7 +251,7 @@ public void testOpenInputNoSuchFile() { } public void testOpenInputException() throws IOException { - populateCommitMetadata(); + populateMetadata(); remoteSegmentStoreDirectory.init(); when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error")); @@ -304,7 +261,7 @@ public void testOpenInputException() throws IOException { public void testCopyFrom() throws IOException { String filename = "_100.si"; - populateCommitMetadata(); + populateMetadata(); remoteSegmentStoreDirectory.init(); Directory storeDirectory = LuceneTestCase.newDirectory(); @@ -334,7 +291,7 @@ public void testCopyFromException() throws IOException { } public void testContainsFile() throws IOException { - populateCommitMetadata(); + populateMetadata(); remoteSegmentStoreDirectory.init(); // This is not the correct way to add files but the other way is to open up access to fields in UploadedSegmentMetadata @@ -350,38 +307,30 @@ public void testContainsFile() throws IOException { 
assertFalse(remoteSegmentStoreDirectory.containsFile("_200.si", "1234")); } - public void testUploadCommitMetadataEmpty() throws IOException { + public void testUploadMetadataEmpty() throws IOException { Directory storeDirectory = mock(Directory.class); IndexOutput indexOutput = mock(IndexOutput.class); - when(storeDirectory.createOutput(startsWith("commit_metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); - - Collection commitFiles = List.of("s1", "s2", "s3"); - remoteSegmentStoreDirectory.uploadCommitMetadata(commitFiles, storeDirectory, 12L, 24L); + when(storeDirectory.createOutput(startsWith("metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); - verify(remoteMetadataDirectory).copyFrom( - eq(storeDirectory), - startsWith("commit_metadata__12__o"), - startsWith("commit_metadata__12__o"), - eq(IOContext.DEFAULT) - ); - verify(indexOutput).writeMapOfStrings(Map.of()); + Collection segmentFiles = List.of("s1", "s2", "s3"); + assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, storeDirectory, 12L, 24L)); } - public void testUploadRefreshMetadataNonEmpty() throws IOException { - populateCommitMetadata(); + public void testUploadMetadataNonEmpty() throws IOException { + populateMetadata(); remoteSegmentStoreDirectory.init(); Directory storeDirectory = mock(Directory.class); IndexOutput indexOutput = mock(IndexOutput.class); - when(storeDirectory.createOutput(startsWith("refresh_metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + when(storeDirectory.createOutput(startsWith("metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); - Collection commitFiles = List.of("_0.si"); - remoteSegmentStoreDirectory.uploadRefreshMetadata(commitFiles, storeDirectory, 12L, 24L); + Collection segmentFiles = List.of("_0.si"); + remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, storeDirectory, 12L, 24L); verify(remoteMetadataDirectory).copyFrom( eq(storeDirectory), - startsWith("refresh_metadata__12__o"), - startsWith("refresh_metadata__12__o"), + startsWith("metadata__12__o"), + startsWith("metadata__12__o"), eq(IOContext.DEFAULT) ); String metadataString = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().get("_0.si").toString(); From 2470f4a310dfe06fbb5fef67fd33785d20a25ae6 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 12 Aug 2022 09:35:57 +0530 Subject: [PATCH 3/3] Wrap uploadMetadata implementation with synchronized Signed-off-by: Sachin Kale --- .../store/RemoteSegmentStoreDirectory.java | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 6810535692c0a..d7d6b29d08bfc 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -114,7 +114,7 @@ private Map readLatestMetadataFile() throws IOE logger.info("Reading latest Metadata file {}", latestMetadataFile.get()); segmentMetadataMap = readMetadataFile(latestMetadataFile.get()); } else { - logger.info("No metadata file found"); + logger.info("No metadata file found, this can happen for new index with no data uploaded to remote segment store"); } return segmentMetadataMap; @@ -191,7 +191,8 @@ public int compare(String first, String second) { } } - public static String getMetadataFilename(long 
primaryTerm, long generation, String uuid) { + // Visible for testing + static String getMetadataFilename(long primaryTerm, long generation, String uuid) { return String.join( SEPARATOR, METADATA_PREFIX, @@ -201,15 +202,18 @@ public static String getMetadataFilename(long primaryTerm, long generation, Stri ); } - public static long getPrimaryTerm(String[] filenameTokens) { + // Visible for testing + static long getPrimaryTerm(String[] filenameTokens) { return Long.parseLong(filenameTokens[1]); } - public static long getGeneration(String[] filenameTokens) { + // Visible for testing + static long getGeneration(String[] filenameTokens) { return Long.parseLong(filenameTokens[2], Character.MAX_RADIX); } - public static String getUuid(String[] filenameTokens) { + // Visible for testing + static String getUuid(String[] filenameTokens) { return filenameTokens[3]; } } @@ -320,21 +324,23 @@ public boolean containsFile(String localFilename, String checksum) { */ public void uploadMetadata(Collection segmentFiles, Directory storeDirectory, long primaryTerm, long generation) throws IOException { - String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix); - IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT); - Map uploadedSegments = new HashMap<>(); - for (String file : segmentFiles) { - if (segmentsUploadedToRemoteStore.containsKey(file)) { - uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString()); - } else { - throw new NoSuchFileException(file); + synchronized (this) { + String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix); + IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT); + Map uploadedSegments = new HashMap<>(); + for (String file : segmentFiles) { + if (segmentsUploadedToRemoteStore.containsKey(file)) { + uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString()); + } else { + throw new NoSuchFileException(file); + } } + indexOutput.writeMapOfStrings(uploadedSegments); + indexOutput.close(); + storeDirectory.sync(Collections.singleton(metadataFilename)); + remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT); + storeDirectory.deleteFile(metadataFilename); } - indexOutput.writeMapOfStrings(uploadedSegments); - indexOutput.close(); - storeDirectory.sync(Collections.singleton(metadataFilename)); - remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT); - storeDirectory.deleteFile(metadataFilename); } private String getChecksumOfLocalFile(Directory directory, String file) throws IOException {
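As a quick illustration of the metadata filename scheme exercised by the tests and utilities above: filenames follow metadata__<primaryTerm>__<generation>__<uuid>, where the generation is parsed with Character.MAX_RADIX (radix 36), which is why generation 23 shows up as "n" (23 encodes to "n" in both radix 32 and 36, so the existing test comment still holds). The standalone sketch below is illustrative only; the class and helper names are hypothetical, and the composition details are assumptions inferred from the parsing code above rather than a copy of MetadataFilenameUtils.

    import java.util.Comparator;
    import java.util.List;

    // Hypothetical, self-contained example of the assumed
    // metadata__<primaryTerm>__<generation>__<uuid> naming scheme.
    public class MetadataFilenameExample {
        static final String SEPARATOR = "__";
        static final String METADATA_PREFIX = "metadata";

        // Compose a filename; the generation is encoded in Character.MAX_RADIX (36), so 23 -> "n".
        static String compose(long primaryTerm, long generation, String uuid) {
            return String.join(
                SEPARATOR,
                METADATA_PREFIX,
                Long.toString(primaryTerm),
                Long.toString(generation, Character.MAX_RADIX),
                uuid
            );
        }

        // Parse the numeric parts back out; tokens are <prefix>, <primaryTerm>, <generation>, <uuid>.
        static long primaryTerm(String filename) {
            return Long.parseLong(filename.split(SEPARATOR)[1]);
        }

        static long generation(String filename) {
            return Long.parseLong(filename.split(SEPARATOR)[2], Character.MAX_RADIX);
        }

        public static void main(String[] args) {
            String name = compose(12, 23, "uuid1");
            System.out.println(name);              // metadata__12__n__uuid1
            System.out.println(primaryTerm(name)); // 12
            System.out.println(generation(name));  // 23

            // "Latest" metadata file selection: higher primary term wins, then higher generation.
            List<String> files = List.of(compose(1, 5, "a"), compose(1, 6, "b"), compose(2, 1, "c"));
            String latest = files.stream()
                .max(Comparator.<String>comparingLong(MetadataFilenameExample::primaryTerm)
                    .thenComparingLong(MetadataFilenameExample::generation))
                .orElseThrow();
            System.out.println(latest);            // metadata__2__1__c
        }
    }

The selection in main mirrors the intent of METADATA_FILENAME_COMPARATOR and readLatestMetadataFile (pick the file with the highest primary term, breaking ties on generation), but it is a simplification, not the comparator used in the patch. Wrapping uploadMetadata in synchronized, as done in the last hunk, presumably keeps the snapshot of segmentsUploadedToRemoteStore and the metadata file written from it consistent when concurrent refresh or commit uploads race for the same primary term and generation.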