Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change PaloMetrics' name and Catalog's Id generator #329

Merged
merged 4 commits into from
Nov 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ public class Catalog {
private int masterHttpPort;
private String masterIp;

// For metadata persistence
private AtomicLong nextId = new AtomicLong(NEXT_ID_INIT_VALUE);
private CatalogIdGenerator idGenerator = new CatalogIdGenerator(NEXT_ID_INIT_VALUE);

private String metaDir;
private EditLog editLog;
private int clusterId;
Expand Down Expand Up @@ -565,6 +565,7 @@ public void initialize(String[] args) throws Exception {
loadImage(IMAGE_DIR); // load image file
editLog.open(); // open bdb env or local output stream
this.globalTransactionMgr.setEditLog(editLog);
this.idGenerator.setEditLog(editLog);

// 4. start load label cleaner thread
createCleaner();
Expand Down Expand Up @@ -1281,7 +1282,7 @@ public long loadHeader(DataInputStream dis, long checksum) throws IOException {
newChecksum ^= replayedJournalId;
long id = dis.readLong();
newChecksum ^= id;
nextId.set(id);
idGenerator.setId(id);

if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_32) {
isDefaultClusterCreated = dis.readBoolean();
Expand Down Expand Up @@ -1685,7 +1686,7 @@ public long saveHeader(DataOutputStream dos, long replayedJournalId, long checks
dos.writeLong(replayedJournalId);

// Write id
long id = nextId.getAndIncrement();
long id = idGenerator.getBatchEndId();
checksum ^= id;
dos.writeLong(id);

Expand Down Expand Up @@ -4093,8 +4094,7 @@ public EditLog getEditLog() {

// Get the next available, need't lock because of nextId is atomic.
public long getNextId() {
long id = nextId.getAndIncrement();
editLog.logSaveNextId(id);
long id = idGenerator.getNextId();
return id;
}

Expand Down Expand Up @@ -4371,9 +4371,7 @@ public void setEditLog(EditLog editLog) {
}

public void setNextId(long id) {
if (nextId.get() < id) {
nextId.set(id);
}
idGenerator.setId(id);
}

public void setHaProtocol(HAProtocol protocol) {
Expand Down
64 changes: 64 additions & 0 deletions fe/src/main/java/org/apache/doris/catalog/CatalogIdGenerator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.persist.EditLog;

// This new Id generator is just same as TransactionIdGenerator.
// But we can't just use TransactionIdGenerator to replace the old catalog's 'nextId' for compatibility reason.
// cause they are using different edit log operation type.
public class CatalogIdGenerator {
private static final int BATCH_ID_INTERVAL = 1000;

private long nextId;
private long batchEndId;

private EditLog editLog;

public CatalogIdGenerator(long initValue) {
nextId = initValue + 1;
batchEndId = initValue;
}

public void setEditLog(EditLog editLog) {
this.editLog = editLog;
}

// performance is more quickly
public synchronized long getNextId() {
if (nextId < batchEndId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nextId is set to 10000. Some successive id is ignored by this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

return nextId++;
} else {
batchEndId = batchEndId + BATCH_ID_INTERVAL;
editLog.logSaveNextId(batchEndId);
return nextId++;
}
}

public synchronized void setId(long id) {
if (id > batchEndId) {
batchEndId = id;
nextId = id;
}
}

// just for checkpoint, so no need to synchronize
public long getBatchEndId() {
return batchEndId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
/*
* Counter metric can only be increased
*/
public abstract class PaloCounterMetric<T> extends PaloMetric<T> {
public abstract class CounterMetric<T> extends Metric<T> {

public PaloCounterMetric(String name, String description) {
public CounterMetric(String name, String description) {
super(name, MetricType.COUNTER, description);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@
import java.util.List;
import java.util.stream.Collectors;

public class PaloMetricRegistry {
public class DorisMetricRegistry {

private List<PaloMetric> paloMetrics = Lists.newArrayList();
private List<Metric> paloMetrics = Lists.newArrayList();

public PaloMetricRegistry() {
public DorisMetricRegistry() {

}

public synchronized void addPaloMetrics(PaloMetric paloMetric) {
public synchronized void addPaloMetrics(Metric paloMetric) {
paloMetrics.add(paloMetric);
}

public synchronized List<PaloMetric> getPaloMetrics() {
public synchronized List<Metric> getPaloMetrics() {
return Lists.newArrayList(paloMetrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import com.google.common.util.concurrent.AtomicDouble;

public class PaloDoubleCounterMetric extends PaloCounterMetric<Double> {
public class DoubleCounterMetric extends CounterMetric<Double> {

public PaloDoubleCounterMetric(String name, String description) {
public DoubleCounterMetric(String name, String description) {
super(name, description);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
/*
* Gauge metric is updated every time it is visited
*/
public abstract class PaloGaugeMetric<T> extends PaloMetric<T> {
public abstract class GaugeMetric<T> extends Metric<T> {

public PaloGaugeMetric(String name, String description) {
public GaugeMetric(String name, String description) {
super(name, MetricType.GAUGE, description);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import java.util.concurrent.atomic.AtomicLong;

public class PaloLongCounterMetric extends PaloCounterMetric<Long> {
public class LongCounterMetric extends CounterMetric<Long> {

public PaloLongCounterMetric(String name, String description) {
public LongCounterMetric(String name, String description) {
super(name, description);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.util.List;

public abstract class PaloMetric<T> {
public abstract class Metric<T> {
public enum MetricType {
GAUGE, COUNTER
}
Expand All @@ -31,7 +31,7 @@ public enum MetricType {
protected List<MetricLabel> labels = Lists.newArrayList();
protected String description;

public PaloMetric(String name, MetricType type, String description) {
public Metric(String name, MetricType type, String description) {
this.name = name;
this.type = type;
this.description = description;
Expand All @@ -49,7 +49,7 @@ public String getDescription() {
return description;
}

public PaloMetric<T> addLabel(MetricLabel label) {
public Metric<T> addLabel(MetricLabel label) {
if (labels.contains(label)) {
return this;
}
Expand Down
65 changes: 33 additions & 32 deletions fe/src/main/java/org/apache/doris/metric/MetricRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.doris.metric;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;

import org.apache.doris.alter.Alter;
import org.apache.doris.alter.AlterJob.JobType;
import org.apache.doris.catalog.Catalog;
Expand All @@ -30,10 +33,6 @@
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -45,20 +44,21 @@ public final class MetricRepo {
private static final Logger LOG = LogManager.getLogger(MetricRepo.class);

private static final MetricRegistry METRIC_REGISTER = new MetricRegistry();
private static final PaloMetricRegistry PALO_METRIC_REGISTER = new PaloMetricRegistry();
private static final DorisMetricRegistry PALO_METRIC_REGISTER = new DorisMetricRegistry();

public static AtomicBoolean isInit = new AtomicBoolean(false);

public static PaloLongCounterMetric COUNTER_REQUEST_ALL;
public static PaloLongCounterMetric COUNTER_QUERY_ALL;
public static PaloLongCounterMetric COUNTER_QUERY_ERR;
public static PaloLongCounterMetric COUNTER_LOAD_ADD;
public static PaloLongCounterMetric COUNTER_LOAD_FINISHED;
public static PaloLongCounterMetric COUNTER_EDIT_LOG_WRITE;
public static PaloLongCounterMetric COUNTER_EDIT_LOG_READ;
public static PaloLongCounterMetric COUNTER_IMAGE_WRITE;
public static PaloLongCounterMetric COUNTER_IMAGE_PUSH;
public static LongCounterMetric COUNTER_REQUEST_ALL;
public static LongCounterMetric COUNTER_QUERY_ALL;
public static LongCounterMetric COUNTER_QUERY_ERR;
public static LongCounterMetric COUNTER_LOAD_ADD;
public static LongCounterMetric COUNTER_LOAD_FINISHED;
public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
public static LongCounterMetric COUNTER_IMAGE_WRITE;
public static LongCounterMetric COUNTER_IMAGE_PUSH;
public static Histogram HISTO_QUERY_LATENCY;
public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;

public static synchronized void init() {
if (isInit.get()) {
Expand All @@ -70,7 +70,7 @@ public static synchronized void init() {
Load load = Catalog.getInstance().getLoadInstance();
for (EtlJobType jobType : EtlJobType.values()) {
for (JobState state : JobState.values()) {
PaloGaugeMetric<Integer> gauge = (PaloGaugeMetric<Integer>) new PaloGaugeMetric<Integer>("job",
GaugeMetric<Integer> gauge = (GaugeMetric<Integer>) new GaugeMetric<Integer>("job",
"job statistics") {
@Override
public Integer getValue() {
Expand All @@ -94,7 +94,7 @@ public Integer getValue() {
continue;
}

PaloGaugeMetric<Integer> gauge = (PaloGaugeMetric<Integer>) new PaloGaugeMetric<Integer>("job",
GaugeMetric<Integer> gauge = (GaugeMetric<Integer>) new GaugeMetric<Integer>("job",
"job statistics") {
@Override
public Integer getValue() {
Expand All @@ -118,7 +118,7 @@ public Integer getValue() {
generateCapacityMetrics();

// connections
PaloGaugeMetric<Integer> conections = (PaloGaugeMetric<Integer>) new PaloGaugeMetric<Integer>(
GaugeMetric<Integer> conections = (GaugeMetric<Integer>) new GaugeMetric<Integer>(
"connection_total", "total connections") {
@Override
public Integer getValue() {
Expand All @@ -128,7 +128,7 @@ public Integer getValue() {
PALO_METRIC_REGISTER.addPaloMetrics(conections);

// journal id
PaloGaugeMetric<Long> maxJournalId = (PaloGaugeMetric<Long>) new PaloGaugeMetric<Long>(
GaugeMetric<Long> maxJournalId = (GaugeMetric<Long>) new GaugeMetric<Long>(
"max_journal_id", "max journal id of this frontends") {
@Override
public Long getValue() {
Expand All @@ -142,31 +142,32 @@ public Long getValue() {
PALO_METRIC_REGISTER.addPaloMetrics(maxJournalId);

// 2. counter
COUNTER_REQUEST_ALL = new PaloLongCounterMetric("request_total", "total request");
COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", "total request");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_REQUEST_ALL);
COUNTER_QUERY_ALL = new PaloLongCounterMetric("query_total", "total query");
COUNTER_QUERY_ALL = new LongCounterMetric("query_total", "total query");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ALL);
COUNTER_QUERY_ERR = new PaloLongCounterMetric("query_err", "total error query");
COUNTER_QUERY_ERR = new LongCounterMetric("query_err", "total error query");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ERR);
COUNTER_LOAD_ADD = new PaloLongCounterMetric("load_add", "total laod submit");
COUNTER_LOAD_ADD = new LongCounterMetric("load_add", "total laod submit");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_LOAD_ADD);
COUNTER_LOAD_FINISHED = new PaloLongCounterMetric("load_finished", "total laod finished");
COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished", "total laod finished");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_LOAD_FINISHED);
COUNTER_EDIT_LOG_WRITE = new PaloLongCounterMetric("edit_log_write", "counter of edit log write into bdbje");
COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log_write", "counter of edit log write into bdbje");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_WRITE);
COUNTER_EDIT_LOG_READ = new PaloLongCounterMetric("edit_log_read", "counter of edit log read from bdbje");
COUNTER_EDIT_LOG_READ = new LongCounterMetric("edit_log_read", "counter of edit log read from bdbje");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_READ);
COUNTER_IMAGE_WRITE = new PaloLongCounterMetric("image_write", "counter of image generated");
COUNTER_IMAGE_WRITE = new LongCounterMetric("image_write", "counter of image generated");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE);
COUNTER_IMAGE_PUSH = new PaloLongCounterMetric("image_push",
COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push",
"counter of image succeeded in pushing to other frontends");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);

// 3. histogram
HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms"));
HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("editlog", "write", "latency",
"ms"));

isInit.set(true);
;
}

// this metric is reentrant, so that we can add or remove metric along with the backend add or remove
Expand All @@ -187,7 +188,7 @@ public static void generateCapacityMetrics() {
LOG.debug("get backend: {}", be);
for (DiskInfo diskInfo : be.getDisks().values()) {
LOG.debug("get disk: {}", diskInfo);
PaloGaugeMetric<Long> total = (PaloGaugeMetric<Long>) new PaloGaugeMetric<Long>(CAPACITY,
GaugeMetric<Long> total = (GaugeMetric<Long>) new GaugeMetric<Long>(CAPACITY,
"disk capacity") {
@Override
public Long getValue() {
Expand All @@ -202,7 +203,7 @@ public Long getValue() {
.addLabel(new MetricLabel("type", "total"));
PALO_METRIC_REGISTER.addPaloMetrics(total);

PaloGaugeMetric<Long> used = (PaloGaugeMetric<Long>) new PaloGaugeMetric<Long>(CAPACITY,
GaugeMetric<Long> used = (GaugeMetric<Long>) new GaugeMetric<Long>(CAPACITY,
"disk capacity") {
@Override
public Long getValue() {
Expand All @@ -221,7 +222,7 @@ public Long getValue() {
}
}

public static synchronized String getMetric(PaloMetricVisitor visitor) {
public static synchronized String getMetric(MetricVisitor visitor) {
if (!isInit.get()) {
return "";
}
Expand All @@ -232,7 +233,7 @@ public static synchronized String getMetric(PaloMetricVisitor visitor) {
sb.append(visitor.visitJvm(jvmStats)).append("\n");

// palo metrics
for (PaloMetric metric : PALO_METRIC_REGISTER.getPaloMetrics()) {
for (Metric metric : PALO_METRIC_REGISTER.getPaloMetrics()) {
sb.append(visitor.visit(metric)).append("\n");
}

Expand Down
Loading