Skip to content

Commit

Permalink
Merge pull request #1274 from Graylog2/index-ranges-refactoring
Browse files Browse the repository at this point in the history
Refactor index ranges handling
  • Loading branch information
kroepke committed Jul 7, 2015
2 parents a5e0b2c + 7630647 commit 3bcee89
Show file tree
Hide file tree
Showing 36 changed files with 1,377 additions and 1,049 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.graylog2.restclient.models.api.responses.system.indices.ShardRoutingResponse;
import org.graylog2.restroutes.generated.routes;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -200,30 +199,28 @@ public ShardMeterResponse getRefreshMeter() {

public static class Range {

private final DateTime starts;
private final boolean providesCalculationInfo;

private long calculationTookMs = 0;
private DateTime calculatedAt = null;
private final DateTime begin;
private final DateTime end;
private final long calculationTookMs;
private final DateTime calculatedAt;

public Range(IndexRangeSummary ir) {
this.starts = new DateTime(ir.start(), DateTimeZone.UTC);

if (ir.calculatedAt() != null && ir.calculationTookMs() >= 0) {
this.providesCalculationInfo = true;
this.begin = ir.begin();
this.end = ir.end();
this.calculationTookMs = ir.calculationTookMs();
this.calculatedAt = new DateTime(ir.calculatedAt(), DateTimeZone.UTC);
} else {
this.providesCalculationInfo = false;
}
this.calculatedAt = ir.calculatedAt();
}

public DateTime getBegin() {
return begin;
}

public DateTime getStarts() {
return starts;
public DateTime getEnd() {
return end;
}

public boolean isProvidesCalculationInfo() {
return providesCalculationInfo;
return true;
}

public long getCalculationTookMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,27 @@
@AutoValue
@JsonAutoDetect
public abstract class IndexRangeSummary {
@JsonProperty("index")
@JsonProperty("index_name")
public abstract String indexName();

@JsonProperty("begin")
public abstract DateTime begin();

@JsonProperty("end")
public abstract DateTime end();

@Nullable @JsonProperty("calculated_at")
public abstract DateTime calculatedAt();

@JsonProperty("starts")
public abstract DateTime start();

@JsonProperty("calculation_took_ms")
@JsonProperty("took_ms")
public abstract int calculationTookMs();

@JsonCreator
public static IndexRangeSummary create(@JsonProperty("index") String indexName,
public static IndexRangeSummary create(@JsonProperty("index_name") String indexName,
@JsonProperty("begin") DateTime begin,
@JsonProperty("end") DateTime end,
@Nullable @JsonProperty("calculated_at") DateTime calculatedAt,
@JsonProperty("starts") DateTime start,
@JsonProperty("calculation_took_ms") int calculationTookMs) {
return new AutoValue_IndexRangeSummary(indexName, calculatedAt, start, calculationTookMs);
@JsonProperty("took_ms") int calculationTookMs) {
return new AutoValue_IndexRangeSummary(indexName, begin, end, calculatedAt, calculationTookMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@
package org.graylog2.bindings;

import com.google.inject.AbstractModule;
import org.graylog2.collectors.CollectorService;
import org.graylog2.collectors.CollectorServiceImpl;
import org.graylog2.alerts.AlertService;
import org.graylog2.alerts.AlertServiceImpl;
import org.graylog2.cluster.NodeService;
import org.graylog2.cluster.NodeServiceImpl;
import org.graylog2.collectors.CollectorService;
import org.graylog2.collectors.CollectorServiceImpl;
import org.graylog2.dashboards.DashboardService;
import org.graylog2.dashboards.DashboardServiceImpl;
import org.graylog2.indexer.IndexFailureService;
import org.graylog2.indexer.IndexFailureServiceImpl;
import org.graylog2.indexer.PersistedDeadLetterService;
import org.graylog2.indexer.PersistedDeadLetterServiceImpl;
import org.graylog2.indexer.ranges.EsIndexRangeService;
import org.graylog2.indexer.ranges.IndexRangeService;
import org.graylog2.indexer.ranges.IndexRangeServiceImpl;
import org.graylog2.inputs.InputService;
import org.graylog2.inputs.InputServiceImpl;
import org.graylog2.notifications.NotificationService;
Expand All @@ -52,9 +52,6 @@
import org.graylog2.system.activities.SystemMessageServiceImpl;
import org.graylog2.users.UserServiceImpl;

/**
* @author Dennis Oelkers <dennis@torch.sh>
*/
public class PersistenceServicesBindings extends AbstractModule {
@Override
protected void configure() {
Expand All @@ -65,7 +62,7 @@ protected void configure() {
bind(PersistedDeadLetterService.class).to(PersistedDeadLetterServiceImpl.class);
bind(IndexFailureService.class).to(IndexFailureServiceImpl.class);
bind(NodeService.class).to(NodeServiceImpl.class);
bind(IndexRangeService.class).to(IndexRangeServiceImpl.class);
bind(IndexRangeService.class).to(EsIndexRangeService.class);
bind(InputService.class).to(InputServiceImpl.class);
bind(StreamRuleService.class).to(StreamRuleServiceImpl.class);
bind(UserService.class).to(UserServiceImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@
package org.graylog2.bindings;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.graylog2.database.ObjectIdSerializer;
import org.graylog2.bindings.providers.ServerObjectMapperProvider;
import org.graylog2.shared.bindings.ObjectMapperModule;

public class ServerObjectMapperModule extends ObjectMapperModule {

@Override
protected ObjectMapper makeObjectMapper() {
final ObjectMapper objectMapper = super.makeObjectMapper();

objectMapper.registerModule(new SimpleModule().addSerializer(new ObjectIdSerializer()));

return objectMapper;
protected void configure() {
bind(ObjectMapper.class).toProvider(ServerObjectMapperProvider.class).asEagerSingleton();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.bindings.providers;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.graylog2.database.ObjectIdSerializer;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;

import javax.inject.Provider;
import javax.inject.Singleton;

@Singleton
public class ServerObjectMapperProvider extends ObjectMapperProvider implements Provider<ObjectMapper> {
public ServerObjectMapperProvider() {
super();
this.objectMapper.registerModule(new SimpleModule().addSerializer(new ObjectIdSerializer()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -91,8 +90,8 @@ public static Set<String> determineAffectedIndices(IndexRangeService indexRangeS
TimeRange range) {
Set<String> indices = Sets.newHashSet();

for (IndexRange indexRange : indexRangeService.getFrom((int) (range.getFrom().getMillis() / 1000))) {
indices.add(indexRange.getIndexName());
for (IndexRange indexRange : indexRangeService.find(range.getFrom(), range.getTo())) {
indices.add(indexRange.indexName());
}

// Always include the most recent index in some cases.
Expand All @@ -107,14 +106,9 @@ public static Set<String> determineAffectedIndices(IndexRangeService indexRangeS
public static Set<IndexRange> determineAffectedIndicesWithRanges(IndexRangeService indexRangeService,
Deflector deflector,
TimeRange range) {
Set<IndexRange> indices = Sets.newTreeSet(new Comparator<IndexRange>() {
@Override
public int compare(IndexRange o1, IndexRange o2) {
return o2.getStart().compareTo(o1.getStart());
}
});
Set<IndexRange> indices = Sets.newTreeSet(IndexRange.COMPARATOR);

for (IndexRange indexRange : indexRangeService.getFrom((int) (range.getFrom().getMillis() / 1000))) {
for (IndexRange indexRange : indexRangeService.find(range.getFrom(), range.getTo())) {
indices.add(indexRange);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,91 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.client.Client;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.plugin.Tools;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.Serializable;
import java.util.List;
import java.util.Map;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Representing the message type mapping in ElasticSearch. This is giving ES more
* information about what the fields look like and how it should analyze them.
*/
public class Mapping {
@Singleton
public class IndexMapping {
public static final String TYPE_MESSAGE = "message";
public static final String TYPE_INDEX_RANGE = "index_range";

public static PutMappingRequest getPutMappingRequest(final Client client,
final String index,
final String analyzer,
boolean storeTimestampsAsDocValues) {
final PutMappingRequestBuilder builder = client.admin().indices().preparePutMapping(index);
builder.setType(Messages.TYPE);
private final Client client;

final Map<String, Object> mapping = ImmutableMap.of(
@Inject
public IndexMapping(Client client) {
this.client = checkNotNull(client);
}

public ActionFuture<PutMappingResponse> createMapping(final String index, final String type, final Map<String, Object> mapping) {
return client.admin().indices().putMapping(mappingRequest(index, type, mapping));
}

private PutMappingRequest mappingRequest(final String index, final String type, final Map<String, Object> mapping) {
return client.admin().indices().preparePutMapping(index)
.setType(type)
.setSource(ImmutableMap.of(type, mapping))
.request();
}

public Map<String, Object> metaMapping() {
final ImmutableMap<String, ? extends Serializable> stringProperty = ImmutableMap.of(
"type", "string",
"index", "not_analyzed",
"doc_values", true);
final ImmutableMap<String, ? extends Serializable> dateProperty = ImmutableMap.of(
"type", "date",
"format", "date_time",
"index", "not_analyzed",
"doc_values", true);
final ImmutableMap<String, ? extends Serializable> intProperty = ImmutableMap.of(
"type", "integer",
"index", "no",
"doc_values", true);
final Map<String, ? extends Serializable> properties = ImmutableMap.of(
"index_name", stringProperty,
"begin", dateProperty,
"end", dateProperty,
"calculated_at", dateProperty,
"took_ms", intProperty
);

return ImmutableMap.<String, Object>of(
"properties", properties,
"_source", enabled(),
"_timestamp", ImmutableMap.of(
"enabled", true,
"format", "date_time"));
}

public Map<String, Object> messageMapping(final String analyzer, boolean storeTimestampsAsDocValues) {
return ImmutableMap.of(
"properties", partFieldProperties(analyzer, storeTimestampsAsDocValues),
"dynamic_templates", partDefaultAllInDynamicTemplate(),
// Compress source field
"_source", enabledAndCompressed(),
// Enable purging by TTL
"_ttl", enabled());

final Map<String, Map<String, Object>> completeMapping = ImmutableMap.of(Messages.TYPE, mapping);

builder.setSource(completeMapping);
return builder.request();
}

/*
* Disable analyzing for every field by default.
*/
private static List<Map<String, Map<String, Object>>> partDefaultAllInDynamicTemplate() {
private List<Map<String, Map<String, Object>>> partDefaultAllInDynamicTemplate() {
final Map<String, String> notAnalyzed = ImmutableMap.of("index", "not_analyzed");
final Map<String, Object> defaultAll = ImmutableMap.of(
// Match all
Expand All @@ -74,8 +117,8 @@ private static List<Map<String, Map<String, Object>>> partDefaultAllInDynamicTem
/*
* Enable analyzing for some fields again. Like for message and full_message.
*/
private static Map<String, Map<String, ? extends Serializable>> partFieldProperties(String analyzer,
boolean storeTimestampsAsDocValues) {
private Map<String, Map<String, ? extends Serializable>> partFieldProperties(String analyzer,
boolean storeTimestampsAsDocValues) {
return ImmutableMap.of(
"message", analyzedString(analyzer),
"full_message", analyzedString(analyzer),
Expand All @@ -86,33 +129,32 @@ private static List<Map<String, Map<String, Object>>> partDefaultAllInDynamicTem
"source", analyzedString("analyzer_keyword"));
}

private static Map<String, String> analyzedString(String analyzer) {
private Map<String, String> analyzedString(String analyzer) {
return ImmutableMap.of(
"index", "analyzed",
"type", "string",
"analyzer", analyzer);
}

private static Map<String, Serializable> typeTimeWithMillis(boolean storeTimestampsAsDocValues) {
final ImmutableMap.Builder<String, Serializable> builder = ImmutableMap.builder();
builder.put("type", "date")
private Map<String, Serializable> typeTimeWithMillis(boolean storeTimestampsAsDocValues) {
final ImmutableMap.Builder<String, Serializable> builder = ImmutableMap.<String, Serializable>builder()
.put("type", "date")
.put("format", Tools.ES_DATE_FORMAT);

if (storeTimestampsAsDocValues) {
builder.put("doc_values", true);
}

return builder.build();
}

private static Map<String, Boolean> enabled() {
private Map<String, Boolean> enabled() {
return ImmutableMap.of("enabled", true);
}


private static Map<String, Boolean> enabledAndCompressed() {
private Map<String, Boolean> enabledAndCompressed() {
return ImmutableMap.of(
"enabled", true,
"compress", true);
}

}
Loading

0 comments on commit 3bcee89

Please sign in to comment.