Skip to content

Commit

Permalink
#236 rollback config
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhuqing666 committed Aug 26, 2017
1 parent 281070f commit 115a743
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 44 deletions.
19 changes: 17 additions & 2 deletions src/main/java/io/mycat/config/MycatConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,24 @@ public void reload(Map<String, UserConfig> newUsers, Map<String, SchemaConfig> n
this.status = reloadAll ? RELOAD_ALL : RELOAD;
}

public boolean canRollbackAll() {
if (status != RELOAD_ALL) {
return false;
} else if (users2 == null || schemas2 == null || firewall2 == null || dataNodes2 == null || dataHosts2 == null) {
return false;
} else {
return true;
}
}

public boolean canRollback() {
return users2 != null && schemas2 != null && dataNodes2 != null && dataHosts2 != null &&
firewall2 != null && status != ROLLBACK;
if (status != RELOAD) {
return false;
} else if (users2 == null || schemas2 == null || firewall2 == null) {
return false;
} else {
return true;
}
}

public void rollback(Map<String, UserConfig> backupUsers, Map<String, SchemaConfig> backupSchemas,
Expand Down
77 changes: 40 additions & 37 deletions src/main/java/io/mycat/manager/response/RollbackConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,56 +129,59 @@ private static void writeOKResult(ManagerConnection c) {
}

private static void writeErrorResult(ManagerConnection c, String errorMsg) {
String sb = "Rollback config failure.The reason is " + errorMsg;
String sb = "Rollback config failure.The reason is that " + errorMsg;
LOGGER.warn(sb + "." + String.valueOf(c));
c.writeErrMessage(ErrorCode.ER_YES, sb);
}

public static void rollback() throws Exception {
MycatConfig conf = MycatServer.getInstance().getConfig();
Map<String, PhysicalDBPool> dataHosts = conf.getBackupDataHosts();

// 检查可回滚状态
if (!conf.canRollback()) {
throw new Exception("Conf can not be rollback because of no old version");
}

// 如果回滚已经存在的pool
boolean rollbackStatus = true;
String errorMsg = null;
for (PhysicalDBPool dn : dataHosts.values()) {
dn.init(dn.getActiveIndex());
if (!dn.isInitSuccess()) {
rollbackStatus = false;
errorMsg = "dataHost" + dn.getHostName() + " inited failure";
break;
}
}
// 如果回滚不成功,则清理已初始化的资源。
if (!rollbackStatus) {
for (PhysicalDBPool dn : dataHosts.values()) {
dn.clearDataSources("rollbackup config");
dn.stopHeartbeat();
}
throw new Exception(errorMsg);
}
// 应用回滚
Map<String, UserConfig> users = conf.getBackupUsers();
Map<String, SchemaConfig> schemas = conf.getBackupSchemas();
Map<String, PhysicalDBNode> dataNodes = conf.getBackupDataNodes();
FirewallConfig firewall = conf.getBackupFirewall();
Map<ERTable, Set<ERTable>> erRelations = conf.getBackupErRelations();
conf.rollback(users, schemas, dataNodes, dataHosts, erRelations, firewall);

// 处理旧的资源
Map<String, PhysicalDBPool> cNodes = conf.getDataHosts();
for (PhysicalDBPool dn : cNodes.values()) {
dn.clearDataSources("clear old config ");
dn.stopHeartbeat();
// 检查可回滚状态
if (conf.canRollback()) {
conf.rollback(users, schemas, dataNodes, dataHosts, erRelations, firewall);
//清理缓存
MycatServer.getInstance().getCacheService().clearCache();
MycatServer.getInstance().reloadMetaData();
} else if (conf.canRollbackAll()) {
Map<String, PhysicalDBPool> cNodes = conf.getDataHosts();
// 如果回滚已经存在的pool
boolean rollbackStatus = true;
String errorMsg = null;
for (PhysicalDBPool dn : dataHosts.values()) {
dn.init(dn.getActiveIndex());
if (!dn.isInitSuccess()) {
rollbackStatus = false;
errorMsg = "dataHost[" + dn.getHostName() + "] inited failure";
break;
}
}
// 如果回滚不成功,则清理已初始化的资源。
if (!rollbackStatus && dataHosts != null) {
for (PhysicalDBPool dn : dataHosts.values()) {
dn.clearDataSources("rollbackup config");
dn.stopHeartbeat();
}
throw new Exception(errorMsg);
}
// 应用回滚
conf.rollback(users, schemas, dataNodes, dataHosts, erRelations, firewall);
// 处理旧的资源
for (PhysicalDBPool dn : cNodes.values()) {
dn.clearDataSources("clear old config ");
dn.stopHeartbeat();
}
//清理缓存
MycatServer.getInstance().getCacheService().clearCache();
MycatServer.getInstance().reloadMetaData();
} else {
throw new Exception("there is no old version");
}

//清理缓存
MycatServer.getInstance().getCacheService().clearCache();
MycatServer.getInstance().reloadMetaData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ private String getFieldName(SQLSelectItem item) {
private Map<String, String> parseAggGroupCommon(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs,
MySqlSelectQueryBlock mysqlSelectQuery, TableConfig tc) throws SQLException {
Map<String, String> aliaColumns = new HashMap<>();
Map<String, Integer> aggrColumns = new HashMap<>();

parseAggExprCommon(schema, rrs, mysqlSelectQuery, aliaColumns, tc);
if (rrs.isNeedOptimizer()) {
Expand All @@ -307,10 +306,6 @@ private Map<String, String> parseAggGroupCommon(SchemaConfig schema, SQLStatemen
return aliaColumns;
}

if (aggrColumns.size() > 0) {
rrs.setMergeCols(aggrColumns);
}

// 通过优化转换成group by来实现
boolean isNeedChangeSql = (mysqlSelectQuery.getDistionOption() == SQLSetQuantifier.DISTINCT) || (mysqlSelectQuery.getDistionOption() == SQLSetQuantifier.DISTINCTROW);
if (isNeedChangeSql) {
Expand Down

0 comments on commit 115a743

Please sign in to comment.