Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add des cluster when perform 'allter system add backend' #134

Merged
merged 2 commits into from
Nov 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions docs/help/Contents/Administration/admin_stmt.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@

该语句用于操作一个系统内的节点。(仅管理员使用!)
语法:
1) 增加节点
1) 增加节点(不使用多租户功能则按照此方法添加)
ALTER SYSTEM ADD BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
2) 增加空闲节点
2) 增加空闲节点(即添加不属于任何cluster的BACKEND)
ALTER SYSTEM ADD FREE BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
2) 删除节点
3) 增加节点到某个cluster
ALTER SYSTEM ADD BACKEND TO cluster_name "host:heartbeat_port"[,"host:heartbeat_port"...];
4) 删除节点
ALTER SYSTEM DROP BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
3) 节点下线
5) 节点下线
ALTER SYSTEM DECOMMISSION BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
4) 增加Broker
6) 增加Broker
ALTER SYSTEM ADD BROKER broker_name "host:port"[,"host:port"...];
5) 减少Broker
7) 减少Broker
ALTER SYSTEM DROP BROKER broker_name "host:port"[,"host:port"...];
6) 删除所有Broker
8) 删除所有Broker
ALTER SYSTEM DROP ALL BROKER broker_name

说明:
Expand Down
10 changes: 9 additions & 1 deletion fe/src/com/baidu/palo/alter/SystemHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.baidu.palo.thrift.TTabletInfo;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.NotImplementedException;
Expand Down Expand Up @@ -145,7 +146,14 @@ public synchronized void process(List<AlterClause> alterClauses, String clusterN

if (alterClause instanceof AddBackendClause) {
AddBackendClause addBackendClause = (AddBackendClause) alterClause;
Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(), addBackendClause.isFree());
final String destClusterName = addBackendClause.getDestCluster();

if (!Strings.isNullOrEmpty(destClusterName)
&& Catalog.getInstance().getCluster(destClusterName) == null) {
throw new DdlException("Cluster: " + destClusterName + " does not exist.");
}
Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(),
addBackendClause.isFree(), addBackendClause.getDestCluster());
} else if (alterClause instanceof DropBackendClause) {
DropBackendClause dropBackendClause = (DropBackendClause) alterClause;
if (!dropBackendClause.isForce()) {
Expand Down
37 changes: 30 additions & 7 deletions fe/src/com/baidu/palo/analysis/AddBackendClause.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,43 @@

import java.util.List;

import com.baidu.palo.catalog.Catalog;
import com.baidu.palo.common.AnalysisException;
import com.baidu.palo.common.InternalException;
import com.google.common.base.Strings;

public class AddBackendClause extends BackendClause {

// be in free state is not owned by any cluster
protected boolean isFree;

// cluster that backend will be added to
protected String destCluster;

public AddBackendClause(List<String> hostPorts, boolean isFree) {
super(hostPorts);
this.isFree = isFree;
this.destCluster = "";
}

public AddBackendClause(List<String> hostPorts, String destCluster) {
super(hostPorts);
this.isFree = false;
this.destCluster = destCluster;
}

@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("ADD BACKEND ");
sb.append("ADD ");
if (isFree) {
sb.append("FREE ");
}
sb.append("BACKEND ");

if (!Strings.isNullOrEmpty(destCluster)) {
sb.append("to").append(destCluster);
}

for (int i = 0; i < hostPorts.size(); i++) {
sb.append("\"").append(hostPorts.get(i)).append("\"");
if (i != hostPorts.size() - 1) {
Expand All @@ -40,12 +63,12 @@ public String toSql() {
return sb.toString();
}


public void setFree(boolean isFree) {
this.isFree = isFree;
}

public boolean isFree() {
return this.isFree;
}

public String getDestCluster() {
return destCluster;
}

}
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/analysis/AlterClusterStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, InternalExcepti
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_NO_PARAMETER);
}

if (instanceNum <= 0) {
if (instanceNum < 0) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_CREATE_ISTANCE_NUM_ERROR);
}
}
Expand Down
4 changes: 3 additions & 1 deletion fe/src/com/baidu/palo/analysis/AlterTableStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.baidu.palo.common.AnalysisException;
import com.baidu.palo.common.ErrorCode;
import com.baidu.palo.common.ErrorReport;
import com.baidu.palo.common.InternalException;
import com.baidu.palo.common.io.Writable;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -62,7 +63,8 @@ public void setTableName(String newTableName) {
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
public void analyze(Analyzer analyzer) throws AnalysisException, InternalException {
super.analyze(analyzer);
if (tbl == null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_TABLES_USED);
}
Expand Down
5 changes: 3 additions & 2 deletions fe/src/com/baidu/palo/analysis/CreateClusterStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ public void analyze(Analyzer analyzer) throws AnalysisException, InternalExcepti
} catch (NumberFormatException e) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_NO_PARAMETER);
}
if (instanceNum <= 0) {

if (instanceNum < 0) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_CREATE_ISTANCE_NUM_ERROR);
}

final String password = passwd;
if (!Strings.isNullOrEmpty(password)) {
scramblePassword = MysqlPassword.makeScrambledPassword(password);
Expand Down
1 change: 1 addition & 0 deletions fe/src/com/baidu/palo/analysis/CreateTableStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public void setTableName(String newTableName) {

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, InternalException {
super.analyze(analyzer);
tableName.analyze(analyzer);
FeNameFormat.checkTableName(tableName.getTbl());

Expand Down
32 changes: 18 additions & 14 deletions fe/src/com/baidu/palo/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -4505,7 +4505,9 @@ public void createCluster(CreateClusterStmt stmt) throws DdlException {
ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_HAS_EXIST, clusterName);
} else {
List<Long> backendList = systemInfo.createCluster(clusterName, stmt.getInstanceNum());
if (backendList != null) {
// 1: BE returned is less than requested, throws DdlException.
// 2: BE returned is more than or equal to 0, succeeds.
if (backendList != null || stmt.getInstanceNum() == 0) {
final long id = getNextId();
final Cluster cluster = new Cluster();
cluster.setName(clusterName);
Expand Down Expand Up @@ -4948,8 +4950,13 @@ public void linkDb(LinkDbStmt stmt) throws DdlException {
}
}

public Cluster getCluster(String cluster) {
return nameToCluster.get(cluster);
public Cluster getCluster(String clusterName) {
readLock();
try {
return nameToCluster.get(clusterName);
} finally {
readUnlock();
}
}

public List<String> getClusterNames() {
Expand Down Expand Up @@ -5029,19 +5036,16 @@ public long loadCluster(DataInputStream dis, long checksum) throws IOException,
cluster.readFields(dis);
checksum ^= cluster.getId();

// BE is in default_cluster when added , therefore it is possible that the BE
// in default_cluster are not the latest because cluster cant't be updated when
// loadCluster is after loadBackend. Because of forgeting to remove BE's id in
// cluster when drop BE or decommission in latest versions, need to update cluster's
// BE.
List<Long> latestBackendIds = systemInfo.getClusterBackendIds(cluster.getName());
if (cluster.getName().equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)
|| Catalog.getCurrentCatalogJournalVersion() <= FeMetaVersion.VERSION_34) {
cluster.setBackendIdList(latestBackendIds);
} else {
// The cluster has the same number of be as systeminfo recorded
Preconditions.checkState(latestBackendIds.size() == cluster.getBackendIdList().size());
if (latestBackendIds.size() != cluster.getBackendIdList().size()) {
LOG.warn("Cluster:" + cluster.getName() + ", backends in Cluster is "
+ cluster.getBackendIdList().size() + ", backends in SystemInfoService is "
+ cluster.getBackendIdList().size());
}
// The number of BE in cluster is not same as in SystemInfoService, when perform 'ALTER
// SYSTEM ADD BACKEND TO ...' or 'ALTER SYSTEM ADD BACKEND ...', because both of them are
// for adding BE to some Cluster, but loadCluster is after loadBackend.
cluster.setBackendIdList(latestBackendIds);

final InfoSchemaDb db = new InfoSchemaDb(cluster.getName());
db.setClusterName(cluster.getName());
Expand Down
3 changes: 3 additions & 0 deletions fe/src/com/baidu/palo/cluster/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ public List<Long> getBackendIdList() {
}

public void setBackendIdList(List<Long> backendIdList) {
if (backendIdList == null) {
return;
}
writeLock();
try {
this.backendIdSet = Sets.newHashSet(backendIdList);
Expand Down
41 changes: 31 additions & 10 deletions fe/src/com/baidu/palo/system/SystemInfoService.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ public void setMaster(String masterHost, int masterPort, int clusterId, String t
}

public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws DdlException {
addBackends(hostPortPairs, isFree, "");
}

/**
*
* @param hostPortPairs : backend's host and port
* @param isFree : if true the backend is not owned by any cluster
* @param destCluster : if not null or empty backend will be added to destCluster
* @throws DdlException
*/
public void addBackends(List<Pair<String, Integer>> hostPortPairs,
boolean isFree, String destCluster) throws DdlException {
for (Pair<String, Integer> pair : hostPortPairs) {
// check is already exist
if (getBackendWithHeartbeatPort(pair.first, pair.second) != null) {
Expand All @@ -140,7 +152,7 @@ public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFre
}

for (Pair<String, Integer> pair : hostPortPairs) {
addBackend(pair.first, pair.second, isFree);
addBackend(pair.first, pair.second, isFree, destCluster);
}
}

Expand All @@ -152,7 +164,15 @@ public void addBackend(Backend backend) {
idToBackendRef.set(newIdToBackend);
}

private void addBackend(String host, int heartbeatPort, boolean isFree) throws DdlException {
private void setBackendOwner(Backend backend, String clusterName) {
final Cluster cluster = Catalog.getInstance().getCluster(clusterName);
Preconditions.checkState(cluster != null);
cluster.addBackend(backend.getId());
backend.setOwnerClusterName(clusterName);
backend.setBackendState(BackendState.using);
}

private void addBackend(String host, int heartbeatPort, boolean isFree, String destCluster) throws DdlException {
Backend newBackend = new Backend(Catalog.getInstance().getNextId(), host, heartbeatPort);
// update idToBackend
Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef.get());
Expand All @@ -174,13 +194,14 @@ private void addBackend(String host, int heartbeatPort, boolean isFree) throws D
ImmutableMap<Long, HeartbeatHandler> newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap);
idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler);

// to add be to DEFAULT_CLUSTER
if (!isFree) {
final Cluster cluster = Catalog.getInstance().getCluster(DEFAULT_CLUSTER);
Preconditions.checkState(cluster != null);
cluster.addBackend(newBackend.getId());
newBackend.setOwnerClusterName(DEFAULT_CLUSTER);
newBackend.setBackendState(BackendState.using);
if (!Strings.isNullOrEmpty(destCluster)) {
// add backend to destCluster
setBackendOwner(newBackend, destCluster);
} else if (!isFree) {
// add backend to DEFAULT_CLUSTER
setBackendOwner(newBackend, DEFAULT_CLUSTER);
} else {
// backend is free
}

// log
Expand Down Expand Up @@ -327,7 +348,7 @@ public List<Long> getBackendIds(boolean needAlive) {
*
* @param clusterName
* @param instanceNum
* @return
* @return if BE avaliable is less than requested , return null.
*/
public List<Long> createCluster(String clusterName, int instanceNum) {
final List<Long> chosenBackendIds = Lists.newArrayList();
Expand Down
4 changes: 4 additions & 0 deletions gensrc/parser/sql_parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,10 @@ alter_system_clause ::=
{:
RESULT = new AddBackendClause(hostPorts, true);
:}
| KW_ADD KW_BACKEND KW_TO ident:clusterName string_list:hostPorts
{:
RESULT = new AddBackendClause(hostPorts, clusterName);
:}
| KW_DROP KW_BACKEND string_list:hostPorts
{:
RESULT = new DropBackendClause(hostPorts, false);
Expand Down