Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor state format to use incremental state IDs #10316

Merged
merged 1 commit into from
Mar 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 65 additions & 54 deletions src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,19 @@ public abstract class MetaDataStateFormat<T> {
private static final int STATE_FILE_VERSION = 0;
private static final int BUFFER_SIZE = 4096;
private final XContentType format;
private final boolean deleteOldFiles;
private final String prefix;
private final Pattern stateFilePattern;


/**
* Creates a new {@link MetaDataStateFormat} instance
* @param format the format of the x-content
* @param deleteOldFiles if <code>true</code> write operations will
* clean up old files written with this format.
*/
protected MetaDataStateFormat(XContentType format, boolean deleteOldFiles) {
protected MetaDataStateFormat(XContentType format, String prefix) {
this.format = format;
this.deleteOldFiles = deleteOldFiles;
this.prefix = prefix;
this.stateFilePattern = Pattern.compile(Pattern.quote(prefix) + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");

}

/**
Expand All @@ -83,15 +85,16 @@ public XContentType format() {
* it's target filename of the pattern <tt>{prefix}{version}.st</tt>.
*
* @param state the state object to write
* @param prefix the state names prefix used to compose the file name.
* @param version the version of the state
* @param locations the locations where the state should be written to.
* @throws IOException if an IOException occurs
*/
public final void write(final T state, final String prefix, final long version, final Path... locations) throws IOException {
public final void write(final T state, final long version, final Path... locations) throws IOException {
Preconditions.checkArgument(locations != null, "Locations must not be null");
Preconditions.checkArgument(locations.length > 0, "One or more locations required");
String fileName = prefix + version + STATE_FILE_EXTENSION;
final long maxStateId = findMaxStateId(prefix, locations)+1;
assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]";
final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION;
Path stateLocation = locations[0].resolve(STATE_DIR_NAME);
Files.createDirectories(stateLocation);
final Path tmpStatePath = stateLocation.resolve(fileName + ".tmp");
Expand Down Expand Up @@ -136,9 +139,7 @@ public void close() throws IOException {
} finally {
Files.deleteIfExists(tmpStatePath);
}
if (deleteOldFiles) {
cleanupOldFiles(prefix, fileName, locations);
}
cleanupOldFiles(prefix, fileName, locations);
}

protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream ) throws IOException {
Expand All @@ -161,17 +162,14 @@ protected XContentBuilder newXContentBuilder(XContentType type, OutputStream str
* Reads the state from a given file and compares the expected version against the actual version of
* the state.
*/
public final T read(Path file, long expectedVersion) throws IOException {
public final T read(Path file) throws IOException {
try (Directory dir = newDirectory(file.getParent())) {
try (final IndexInput indexInput = dir.openInput(file.getFileName().toString(), IOContext.DEFAULT)) {
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, STATE_FILE_CODEC, STATE_FILE_VERSION, STATE_FILE_VERSION);
final XContentType xContentType = XContentType.values()[indexInput.readInt()];
final long version = indexInput.readLong();
if (version != expectedVersion) {
throw new CorruptStateException("State version mismatch expected: " + expectedVersion + " but was: " + version);
}
indexInput.readLong(); // version currently unused
long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
try (IndexInput slice = indexInput.slice("state_xcontent", filePointer, contentSize)) {
Expand Down Expand Up @@ -210,25 +208,38 @@ public boolean accept(Path entry) throws IOException {
}
}

long findMaxStateId(final String prefix, Path... locations) throws IOException {
long maxId = -1;
for (Path dataLocation : locations) {
final Path resolve = dataLocation.resolve(STATE_DIR_NAME);
if (Files.exists(resolve)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(resolve, prefix + "*")) {
for (Path stateFile : stream) {
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
if (matcher.matches()) {
final long id = Long.parseLong(matcher.group(1));
maxId = Math.max(maxId, id);
}
}
}
}
}
return maxId;
}

/**
* Tries to load the latest state from the given data-locations. It tries to load the latest state determined by
* the states version from one or more data directories and if none of the latest states can be loaded an exception
* is thrown to prevent accidentally loading a previous state and silently omitting the latest state.
*
* @param logger an elasticsearch logger instance
* @param format the actual metastate format to use
* @param pattern the file name pattern to identify files belonging to this pattern and to read the version from.
* The first capture group should return the version of the file. If the second capture group is has a
* null value the files is considered a legacy file and will be treated as if the file contains a plain
* x-content payload.
* @param stateType the state type we are loading. used for logging contenxt only.
* @param dataLocations the data-locations to try.
* @return the latest state or <code>null</code> if no state was found.
*/
public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> format, Pattern pattern, String stateType, Path... dataLocations) throws IOException {
List<PathAndVersion> files = new ArrayList<>();
long maxVersion = -1;
boolean maxVersionIsLegacy = true;
public T loadLatestState(ESLogger logger, Path... dataLocations) throws IOException {
List<PathAndStateId> files = new ArrayList<>();
long maxStateId = -1;
boolean maxStateIdIsLegacy = true;
if (dataLocations != null) { // select all eligable files first
for (Path dataLocation : dataLocations) {
final Path stateDir = dataLocation.resolve(STATE_DIR_NAME);
Expand All @@ -238,13 +249,13 @@ public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> form
// now, iterate over the current versions, and find latest one
try (DirectoryStream<Path> paths = Files.newDirectoryStream(stateDir)) { // we don't pass a glob since we need the group part for parsing
for (Path stateFile : paths) {
final Matcher matcher = pattern.matcher(stateFile.getFileName().toString());
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
if (matcher.matches()) {
final long version = Long.parseLong(matcher.group(1));
maxVersion = Math.max(maxVersion, version);
final long stateId = Long.parseLong(matcher.group(1));
maxStateId = Math.max(maxStateId, stateId);
final boolean legacy = MetaDataStateFormat.STATE_FILE_EXTENSION.equals(matcher.group(2)) == false;
maxVersionIsLegacy &= legacy; // on purpose, see NOTE below
PathAndVersion pav = new PathAndVersion(stateFile, version, legacy);
maxStateIdIsLegacy &= legacy; // on purpose, see NOTE below
PathAndStateId pav = new PathAndStateId(stateFile, stateId, legacy);
logger.trace("found state file: {}", pav);
files.add(pav);
}
Expand All @@ -259,30 +270,30 @@ public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> form
// new format (ie. legacy == false) then we know that the latest version state ought to use this new format.
// In case the state file with the latest version does not use the new format while older state files do,
// the list below will be empty and loading the state will fail
for (PathAndVersion pathAndVersion : Collections2.filter(files, new VersionAndLegacyPredicate(maxVersion, maxVersionIsLegacy))) {
for (PathAndStateId pathAndStateId : Collections2.filter(files, new StateIdAndLegacyPredicate(maxStateId, maxStateIdIsLegacy))) {
try {
final Path stateFile = pathAndVersion.file;
final long version = pathAndVersion.version;
final Path stateFile = pathAndStateId.file;
final long id = pathAndStateId.id;
final XContentParser parser;
if (pathAndVersion.legacy) { // read the legacy format -- plain XContent
if (pathAndStateId.legacy) { // read the legacy format -- plain XContent
final byte[] data = Files.readAllBytes(stateFile);
if (data.length == 0) {
logger.debug("{}: no data for [{}], ignoring...", stateType, stateFile.toAbsolutePath());
logger.debug("{}: no data for [{}], ignoring...", prefix, stateFile.toAbsolutePath());
continue;
}
parser = XContentHelper.createParser(data, 0, data.length);
state = format.fromXContent(parser);
state = fromXContent(parser);
if (state == null) {
logger.debug("{}: no data for [{}], ignoring...", stateType, stateFile.toAbsolutePath());
logger.debug("{}: no data for [{}], ignoring...", prefix, stateFile.toAbsolutePath());
}
} else {
state = format.read(stateFile, version);
logger.trace("state version [{}] read from [{}]", version, stateFile.getFileName());
state = read(stateFile);
logger.trace("state id [{}] read from [{}]", id, stateFile.getFileName());
}
return state;
} catch (Throwable e) {
exceptions.add(e);
logger.debug("{}: failed to read [{}], ignoring...", e, pathAndVersion.file.toAbsolutePath(), stateType);
logger.debug("{}: failed to read [{}], ignoring...", e, pathAndStateId.file.toAbsolutePath(), prefix);
}
}
// if we reach this something went wrong
Expand All @@ -295,42 +306,42 @@ public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> form
}

/**
* Filters out all {@link MetaDataStateFormat.PathAndVersion} instances with a different version than
* Filters out all {@link org.elasticsearch.gateway.MetaDataStateFormat.PathAndStateId} instances with a different id than
* the given one.
*/
private static final class VersionAndLegacyPredicate implements Predicate<PathAndVersion> {
private final long version;
private static final class StateIdAndLegacyPredicate implements Predicate<PathAndStateId> {
private final long id;
private final boolean legacy;

VersionAndLegacyPredicate(long version, boolean legacy) {
this.version = version;
StateIdAndLegacyPredicate(long id, boolean legacy) {
this.id = id;
this.legacy = legacy;
}

@Override
public boolean apply(PathAndVersion input) {
return input.version == version && input.legacy == legacy;
public boolean apply(PathAndStateId input) {
return input.id == id && input.legacy == legacy;
}
}

/**
* Internal struct-like class that holds the parsed state version, the file
* Internal struct-like class that holds the parsed state id, the file
* and a flag if the file is a legacy state ie. pre 1.5
*/
private static class PathAndVersion {
private static class PathAndStateId {
final Path file;
final long version;
final long id;
final boolean legacy;

private PathAndVersion(Path file, long version, boolean legacy) {
private PathAndStateId(Path file, long id, boolean legacy) {
this.file = file;
this.version = version;
this.id = id;
this.legacy = legacy;
}

@Override
public String toString() {
return "[version:" + version + ", legacy:" + legacy + ", file:" + file.toAbsolutePath() + "]";
return "[id:" + id + ", legacy:" + legacy + ", file:" + file.toAbsolutePath() + "]";
}
}

Expand Down
34 changes: 15 additions & 19 deletions src/main/java/org/elasticsearch/gateway/MetaStateService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/**
* Handles writing and loading both {@link MetaData} and {@link IndexMetaData}
Expand All @@ -47,22 +46,20 @@ public class MetaStateService extends AbstractComponent {

static final String GLOBAL_STATE_FILE_PREFIX = "global-";
private static final String INDEX_STATE_FILE_PREFIX = "state-";
static final Pattern GLOBAL_STATE_FILE_PATTERN = Pattern.compile(GLOBAL_STATE_FILE_PREFIX + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
static final Pattern INDEX_STATE_FILE_PATTERN = Pattern.compile(INDEX_STATE_FILE_PREFIX + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
private static final String GLOBAL_STATE_LOG_TYPE = "[_global]";

private final NodeEnvironment nodeEnv;

private final XContentType format;
private final ToXContent.Params formatParams;
private final ToXContent.Params gatewayModeFormatParams;
private final MetaDataStateFormat<IndexMetaData> indexStateFormat;
private final MetaDataStateFormat<MetaData> globalStateFormat;

@Inject
public MetaStateService(Settings settings, NodeEnvironment nodeEnv) {
super(settings);
this.nodeEnv = nodeEnv;
this.format = XContentType.fromRestContentType(settings.get(FORMAT_SETTING, "smile"));

if (this.format == XContentType.SMILE) {
Map<String, String> params = Maps.newHashMap();
params.put("binary", "true");
Expand All @@ -77,6 +74,9 @@ public MetaStateService(Settings settings, NodeEnvironment nodeEnv) {
gatewayModeParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_GATEWAY);
gatewayModeFormatParams = new ToXContent.MapParams(gatewayModeParams);
}
indexStateFormat = indexStateFormat(format, formatParams);
globalStateFormat = globalStateFormat(format, gatewayModeFormatParams);

}

/**
Expand Down Expand Up @@ -109,26 +109,23 @@ MetaData loadFullState() throws Exception {
*/
@Nullable
IndexMetaData loadIndexState(String index) throws IOException {
return MetaDataStateFormat.loadLatestState(logger, indexStateFormat(format, formatParams, true),
INDEX_STATE_FILE_PATTERN, "[" + index + "]", nodeEnv.indexPaths(new Index(index)));
return indexStateFormat.loadLatestState(logger, nodeEnv.indexPaths(new Index(index)));
}

/**
* Loads the global state, *without* index state, see {@link #loadFullState()} for that.
*/
MetaData loadGlobalState() throws IOException {
return MetaDataStateFormat.loadLatestState(logger, globalStateFormat(format, gatewayModeFormatParams, true), GLOBAL_STATE_FILE_PATTERN, GLOBAL_STATE_LOG_TYPE, nodeEnv.nodeDataPaths());
return globalStateFormat.loadLatestState(logger, nodeEnv.nodeDataPaths());
}

/**
* Writes the index state.
*/
void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaData previousIndexMetaData) throws Exception {
logger.trace("[{}] writing state, reason [{}]", indexMetaData.index(), reason);
final boolean deleteOldFiles = previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version();
final MetaDataStateFormat<IndexMetaData> writer = indexStateFormat(format, formatParams, deleteOldFiles);
try {
writer.write(indexMetaData, INDEX_STATE_FILE_PREFIX, indexMetaData.version(),
indexStateFormat.write(indexMetaData, indexMetaData.version(),
nodeEnv.indexPaths(new Index(indexMetaData.index())));
} catch (Throwable ex) {
logger.warn("[{}]: failed to write index state", ex, indexMetaData.index());
Expand All @@ -140,21 +137,20 @@ void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaD
* Writes the global state, *without* the indices states.
*/
void writeGlobalState(String reason, MetaData metaData) throws Exception {
logger.trace("{} writing state, reason [{}]", GLOBAL_STATE_LOG_TYPE, reason);
final MetaDataStateFormat<MetaData> writer = globalStateFormat(format, gatewayModeFormatParams, true);
logger.trace("[_global] writing state, reason [{}]", reason);
try {
writer.write(metaData, GLOBAL_STATE_FILE_PREFIX, metaData.version(), nodeEnv.nodeDataPaths());
globalStateFormat.write(metaData, metaData.version(), nodeEnv.nodeDataPaths());
} catch (Throwable ex) {
logger.warn("{}: failed to write global state", ex, GLOBAL_STATE_LOG_TYPE);
logger.warn("[_global]: failed to write global state", ex);
throw new IOException("failed to write global state", ex);
}
}

/**
* Returns a StateFormat that can read and write {@link MetaData}
*/
static MetaDataStateFormat<MetaData> globalStateFormat(XContentType format, final ToXContent.Params formatParams, final boolean deleteOldFiles) {
return new MetaDataStateFormat<MetaData>(format, deleteOldFiles) {
static MetaDataStateFormat<MetaData> globalStateFormat(XContentType format, final ToXContent.Params formatParams) {
return new MetaDataStateFormat<MetaData>(format, GLOBAL_STATE_FILE_PREFIX) {

@Override
public void toXContent(XContentBuilder builder, MetaData state) throws IOException {
Expand All @@ -171,8 +167,8 @@ public MetaData fromXContent(XContentParser parser) throws IOException {
/**
* Returns a StateFormat that can read and write {@link IndexMetaData}
*/
static MetaDataStateFormat<IndexMetaData> indexStateFormat(XContentType format, final ToXContent.Params formatParams, boolean deleteOldFiles) {
return new MetaDataStateFormat<IndexMetaData>(format, deleteOldFiles) {
static MetaDataStateFormat<IndexMetaData> indexStateFormat(XContentType format, final ToXContent.Params formatParams) {
return new MetaDataStateFormat<IndexMetaData>(format, INDEX_STATE_FILE_PREFIX) {

@Override
public void toXContent(XContentBuilder builder, IndexMetaData state) throws IOException {
Expand Down
Loading