Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix some bugs #69

Merged
merged 5 commits into from
Sep 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/agent/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

namespace palo {

const uint32_t MASTER_CLIENT_TIMEOUT = 500;
const uint32_t MASTER_CLIENT_TIMEOUT = 3000;

// client cache
// All service client should be defined in client_cache.h
Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,13 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_build_tuple_row_size = num_build_tuples * sizeof(Tuple*);

// TODO: default buckets
const bool stores_nulls = _join_op == TJoinOp::RIGHT_OUTER_JOIN
|| _join_op == TJoinOp::FULL_OUTER_JOIN
|| _join_op == TJoinOp::RIGHT_ANTI_JOIN
|| _join_op == TJoinOp::RIGHT_SEMI_JOIN;
_hash_tbl.reset(new HashTable(
_build_expr_ctxs, _probe_expr_ctxs, _build_tuple_size,
false, id(), mem_tracker(), 1024));
stores_nulls, id(), mem_tracker(), 1024));

_probe_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));

Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/PaloFe.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static void main(String[] args) {
}

// pid file
if (!createAndLockPidFile(paloHome + "/bin/fe.pid")) {
if (!createAndLockPidFile(System.getenv("PID_DIR") + "/fe.pid")) {
throw new IOException("pid file is already locked.");
}

Expand Down
25 changes: 13 additions & 12 deletions fe/src/com/baidu/palo/alter/DecommissionBackendJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public synchronized int tryFinishJob() {
}
}
} else {
// Shrinking capacity in cluser
// Shrinking capacity in cluster
if (decommissionType == DecommissionType.ClusterDecommission) {
for (String clusterName : clusterBackendsMap.keySet()) {
final Map<Long, Backend> idToBackend = clusterBackendsMap.get(clusterName);
Expand Down Expand Up @@ -571,6 +571,17 @@ public void readFields(DataInput in) throws IOException {
}
clusterBackendsMap.put(cluster, backends);
}

if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_33) {
String str = Text.readString(in);
// this is only for rectify misspellings...
if (str.equals("SystemDecomission")) {
str = "SystemDecommission";
} else if (str.equals("ClusterDecomission")) {
str = "ClusterDecommission";
}
decommissionType = DecommissionType.valueOf(str);
}
} else {
int backendNum = in.readInt();
Map<Long, Backend> backends = Maps.newHashMap();
Expand All @@ -582,17 +593,6 @@ public void readFields(DataInput in) throws IOException {
}
clusterBackendsMap.put(SystemInfoService.DEFAULT_CLUSTER, backends);
}

if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_33) {
String str = Text.readString(in);
// this is only for rectify misspellings...
if (str.equals("SystemDecomission")) {
str = "SystemDecommission";
} else if (str.equals("ClusterDecomission")) {
str = "ClusterDecommission";
}
decommissionType = DecommissionType.valueOf(str);
}
}

@Override
Expand All @@ -608,6 +608,7 @@ public void write(DataOutput out) throws IOException {
out.writeLong(id);
}
}

Text.writeString(out, decommissionType.toString());
}

Expand Down
3 changes: 1 addition & 2 deletions fe/src/com/baidu/palo/alter/SchemaChangeJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -829,10 +829,9 @@ public void unprotectedReplayCancel(Database db) {
}
for (Tablet tablet : index.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
if (replica.getState() == ReplicaState.CLONE) {
if (replica.getState() == ReplicaState.CLONE || replica.getState() == ReplicaState.NORMAL) {
continue;
}
Preconditions.checkState(replica.getState() == ReplicaState.SCHEMA_CHANGE, replica.getState());
replica.setState(ReplicaState.NORMAL);
} // end for replicas
} // end for tablets
Expand Down
25 changes: 16 additions & 9 deletions fe/src/com/baidu/palo/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.baidu.palo.catalog.Catalog;
import com.baidu.palo.catalog.Column;
import com.baidu.palo.catalog.Database;
import com.baidu.palo.catalog.InfoSchemaDb;
import com.baidu.palo.catalog.Table;
import com.baidu.palo.cluster.ClusterNamespace;
import com.baidu.palo.catalog.Type;
Expand Down Expand Up @@ -526,6 +527,10 @@ public SlotDescriptor registerColumnRef(TableName tblName, String colName) throw
if (tblName == null) {
d = resolveColumnRef(colName);
} else {
if (InfoSchemaDb.isInfoSchemaDb(tblName.getDb()) ||
(tblName.getDb() == null && InfoSchemaDb.isInfoSchemaDb(getDefaultDb()))) {
tblName = new TableName(tblName.getDb(), tblName.getTbl().toLowerCase());
}
d = resolveColumnRef(tblName, colName);
}
if (d == null && hasAncestors() && isSubquery) {
Expand Down Expand Up @@ -728,15 +733,15 @@ private void registerConjunct(Expr e) {

e.setId(globalState.conjunctIdGenerator.getNextId());
globalState.conjuncts.put(e.getId(), e);

// LOG.info("registered conjunct " + p.getId().toString() + ": " + p.toSql());

ArrayList<TupleId> tupleIds = Lists.newArrayList();
ArrayList<SlotId> slotIds = Lists.newArrayList();
e.getIds(tupleIds, slotIds);

// register full join conjuncts
registerFullOuterJoinedConjunct(e);

// update tuplePredicates
for (TupleId id : tupleIds) {
if (!tuplePredicates.containsKey(id)) {
Expand Down Expand Up @@ -855,10 +860,10 @@ public List<Expr> getUnassignedConjuncts(
public List<Expr> getAllUnassignedConjuncts(List<TupleId> tupleIds) {
List<Expr> result = Lists.newArrayList();
for (Expr e : globalState.conjuncts.values()) {
if (!e.isAuxExpr()
&& e.isBoundByTupleIds(tupleIds)
&& !globalState.assignedConjuncts.contains(e.getId())
&& !globalState.ojClauseByConjunct.containsKey(e.getId())) {
if (!e.isAuxExpr()
&& e.isBoundByTupleIds(tupleIds)
&& !globalState.assignedConjuncts.contains(e.getId())
&& !globalState.ojClauseByConjunct.containsKey(e.getId())) {
result.add(e);
}
}
Expand Down Expand Up @@ -1265,8 +1270,10 @@ public Type castAllToCompatibleType(List<Expr> exprs) throws AnalysisException {
// TODO(zc)
compatibleType = Type.getCmpType(compatibleType, exprs.get(i).getType());
}
if (compatibleType == Type.VARCHAR && exprs.get(0).getType().isDateType()) {
compatibleType = Type.DATETIME;
if (compatibleType == Type.VARCHAR) {
if (exprs.get(0).getType().isDateType()) {
compatibleType = Type.DATETIME;
}
}
// Add implicit casts if necessary.
for (int i = 0; i < exprs.size(); ++i) {
Expand Down Expand Up @@ -1412,7 +1419,7 @@ public boolean canEvalPredicate(List<TupleId> tupleIds, Expr e) {
}

if (e.isOnClauseConjunct()) {

if (isAntiJoinedConjunct(e)) return canEvalAntiJoinedConjunct(e, tupleIds);
if (isIjConjunct(e) || isSjConjunct(e)) {
if (!containsOuterJoinedTid(tids)) return true;
Expand Down
12 changes: 11 additions & 1 deletion fe/src/com/baidu/palo/analysis/CreateTableStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.baidu.palo.catalog.Catalog;
import com.baidu.palo.catalog.Column;
import com.baidu.palo.catalog.KeysType;
import com.baidu.palo.catalog.PartitionType;
import com.baidu.palo.common.AnalysisException;
import com.baidu.palo.common.ErrorCode;
import com.baidu.palo.common.ErrorReport;
Expand Down Expand Up @@ -279,7 +280,16 @@ public void analyze(Analyzer analyzer) throws AnalysisException, InternalExcepti
if (engineName.equals("olap")) {
// analyze partition
if (partitionDesc != null) {
partitionDesc.analyze(columnSet, properties);
if (partitionDesc.getType() != PartitionType.RANGE) {
throw new AnalysisException("Currently only support range partition with engine type olap");
}

RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc;
if (rangePartitionDesc.getPartitionColNames().size() != 1) {
throw new AnalysisException("Only allow partitioned by one column");
}

rangePartitionDesc.analyze(columns, properties);
}

// analyze distribution
Expand Down
4 changes: 4 additions & 0 deletions fe/src/com/baidu/palo/analysis/DateLiteral.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@

import com.google.common.base.Preconditions;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Date;

public class DateLiteral extends LiteralExpr {
private static final Logger LOG = LogManager.getLogger(DateLiteral.class);
private Date date;

private DateLiteral() {
Expand Down
4 changes: 2 additions & 2 deletions fe/src/com/baidu/palo/analysis/ExprSubstitutionMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public ExprSubstitutionMap() {
}

// Only used to convert show statement to select statement
public ExprSubstitutionMap(boolean check_analyzed) {
public ExprSubstitutionMap(boolean checkAnalyzed) {
this(Lists.<Expr>newArrayList(), Lists.<Expr>newArrayList());
this.checkAnalyzed_ = false;
this.checkAnalyzed_ = checkAnalyzed;
}

public ExprSubstitutionMap(List<Expr> lhs, List<Expr> rhs) {
Expand Down
6 changes: 5 additions & 1 deletion fe/src/com/baidu/palo/analysis/PartitionDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@ public class PartitionDesc implements Writable {
public PartitionDesc() {
}

public void analyze(Set<String> colSet, Map<String, String> otherProperties) throws AnalysisException {
public void analyze(List<Column> cols, Map<String, String> otherProperties) throws AnalysisException {
throw new NotImplementedException();
}

public PartitionType getType() {
return type;
}

public String toSql() {
throw new NotImplementedException();
}
Expand Down
32 changes: 19 additions & 13 deletions fe/src/com/baidu/palo/analysis/RangePartitionDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,36 @@ public List<String> getPartitionColNames() {
}

@Override
public void analyze(Set<String> cols, Map<String, String> otherProperties) throws AnalysisException {
public void analyze(List<Column> cols, Map<String, String> otherProperties) throws AnalysisException {
if (partitionColNames == null || partitionColNames.isEmpty()) {
throw new AnalysisException("No partition columns.");
}

if (partitionColNames.size() != 1) {
throw new AnalysisException("Only allow partitioned by one column");
}

Set<String> partColNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (String partitionCol : partitionColNames) {
// use this to infer user which columns not exist
if (!cols.contains(partitionCol)) {
throw new AnalysisException("Partition column[" + partitionCol + "] does not exist.");
if (!partColNames.add(partitionCol)) {
throw new AnalysisException("Duplicated partition column " + partitionCol);
}

boolean found = false;
for (Column col : cols) {
if (col.getName().equals(partitionCol)) {
if (!col.isKey()) {
throw new AnalysisException("Only key column can be partition column");
}
found = true;
break;
}
}
if (partitionCol.equals(PrimitiveType.HLL.toString())) {
throw new AnalysisException("Partition column[" + partitionCol + "] can't be HLL.");

if (!found) {
throw new AnalysisException("Partition column[" + partitionCol + "] does not exist in column list.");
}
}

Set<String> nameSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (SingleRangePartitionDesc desc : singleRangePartitionDescs) {
if (nameSet.contains(desc.getPartitionName())) {
if (!nameSet.add(desc.getPartitionName())) {
throw new AnalysisException("Duplicated partition name: " + desc.getPartitionName());
}
// in create table stmt, we use given properties
Expand All @@ -102,7 +109,6 @@ public void analyze(Set<String> cols, Map<String, String> otherProperties) throw
givenProperties = Maps.newHashMap(otherProperties);
}
desc.analyze(cols.size(), givenProperties);
nameSet.add(desc.getPartitionName());
}
}

Expand Down
2 changes: 2 additions & 0 deletions fe/src/com/baidu/palo/analysis/SetStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public RedirectStatus getRedirectStatus() {
for (SetVar var : setVars) {
if (var instanceof SetPassVar) {
return RedirectStatus.FORWARD_WITH_SYNC;
} else if (var.getType() == SetType.GLOBAL) {
return RedirectStatus.FORWARD_WITH_SYNC;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/analysis/ShowVariablesStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public SelectStmt toSelectStmt(Analyzer analyzer) {
analyze(analyzer);
// Columns
SelectList selectList = new SelectList();
ExprSubstitutionMap aliasMap = new ExprSubstitutionMap();
ExprSubstitutionMap aliasMap = new ExprSubstitutionMap(false);
TableName tableName = null;
if (type == SetType.GLOBAL) {
tableName = new TableName(InfoSchemaDb.DATABASE_NAME, "GLOBAL_VARIABLES");
Expand Down
Loading