fix bug
morningman-cmy authored and morningman committed May 23, 2020
1 parent d25ba8b commit 4a6a21c
Showing 2 changed files with 33 additions and 15 deletions.
14 changes: 11 additions & 3 deletions be/src/runtime/file_result_writer.cpp
@@ -45,7 +45,7 @@ FileResultWriter::FileResultWriter(
}

FileResultWriter::~FileResultWriter() {
close();
_close_file_writer(true);
}

Status FileResultWriter::init(RuntimeState* state) {
@@ -282,13 +282,15 @@ Status FileResultWriter::_create_new_file_if_exceed_size() {
}
// current file size exceed the max file size. close this file
// and create new one
RETURN_IF_ERROR(_close_file_writer(false));
{
SCOPED_TIMER(_writer_close_timer);
RETURN_IF_ERROR(_close_file_writer(false));
}
_current_written_bytes = 0;
return Status::OK();
}

Status FileResultWriter::_close_file_writer(bool done) {
SCOPED_TIMER(_writer_close_timer);
if (_parquet_writer != nullptr) {
_parquet_writer->close();
delete _parquet_writer;
@@ -310,7 +312,13 @@ Status FileResultWriter::_close_file_writer(bool done) {
}

Status FileResultWriter::close() {
// The following two profile entries, "_written_rows_counter" and "_writer_close_timer",
// must stay outside `_close_file_writer()`, because `_close_file_writer()` may be
// called from the destructor, at which point the RuntimeState, and the profile it
// owns, may already have been destructed.
COUNTER_SET(_written_rows_counter, _written_rows);
SCOPED_TIMER(_writer_close_timer);
RETURN_IF_ERROR(_close_file_writer(true));
return Status::OK();
}
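The new comment in `close()` captures the BE-side reasoning: `~FileResultWriter()` can run after the `RuntimeState` that owns the profile has been destroyed, so the destructor now calls `_close_file_writer(true)` directly and never touches `_written_rows_counter` or `_writer_close_timer`. A minimal C++ sketch of that ownership hazard, using simplified stand-in types rather than the real Doris classes:

```cpp
#include <iostream>

// Stand-in for the profile owned by RuntimeState; it may be destroyed before the writer.
struct Profile {
    long long written_rows = 0;
};

class Writer {
public:
    explicit Writer(Profile* profile) : _profile(profile) {}

    // Destructor path: only release the file, never touch the profile,
    // since the profile's owner may already be gone.
    ~Writer() { close_file(); }

    // Normal path: flush counters while the profile is guaranteed alive, then close.
    void close() {
        _profile->written_rows = _rows;
        close_file();
    }

    void write_row() { ++_rows; }

private:
    void close_file() {
        if (!_closed) {
            _closed = true;
            std::cout << "file writer closed" << std::endl;
        }
    }

    Profile* _profile;
    long long _rows = 0;
    bool _closed = false;
};

int main() {
    Profile profile;
    {
        Writer w(&profile);
        w.write_row();
        w.close();   // counters flushed here, while the profile is still valid
    }                // destructor runs; close_file() is a no-op and the profile is untouched
    std::cout << "rows recorded: " << profile.written_rows << std::endl;
    return 0;
}
```

In the real writer, the same split shows up as `_close_file_writer(bool done)` being reachable from both the destructor and `close()`, with the counter and timer updates kept in `close()`.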
34 changes: 22 additions & 12 deletions fe/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -25,12 +25,19 @@

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// For syntax select * from tbl INTO OUTFILE xxxx
public class OutFileClause {
private static final Logger LOG = LogManager.getLogger(OutFileClause.class);

private static final String BROKER_PROP_PREFIX = "broker.";
private static final String PROP_BROKER_NAME = "broker.name";
@@ -107,7 +114,8 @@ private void analyzeProperties() throws AnalysisException {
return;
}

getBrokerProperties();
Set<String> processedPropKeys = Sets.newHashSet();
getBrokerProperties(processedPropKeys);
if (brokerDesc == null) {
return;
}
@@ -117,44 +125,46 @@ private void analyzeProperties() throws AnalysisException {
throw new AnalysisException(PROP_COLUMN_SEPARATOR + " is only for CSV format");
}
columnSeparator = properties.get(PROP_COLUMN_SEPARATOR);
properties.remove(PROP_COLUMN_SEPARATOR);
processedPropKeys.add(PROP_COLUMN_SEPARATOR);
}

if (properties.containsKey(PROP_LINE_DELIMITER)) {
if (!isCsvFormat()) {
throw new AnalysisException(PROP_LINE_DELIMITER + " is only for CSV format");
}
lineDelimiter = properties.get(PROP_LINE_DELIMITER);
properties.remove(PROP_LINE_DELIMITER);
processedPropKeys.add(PROP_LINE_DELIMITER);
}

if (properties.containsKey(PROP_MAX_FILE_SIZE)) {
maxFileSizeBytes = ParseUtil.analyzeDataVolumn(properties.get(PROP_MAX_FILE_SIZE));
if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) {
throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes);
}
properties.remove(PROP_MAX_FILE_SIZE);
processedPropKeys.add(PROP_MAX_FILE_SIZE);
}

if (!properties.isEmpty()) {
throw new AnalysisException("Unknown properties: " + properties);
if (processedPropKeys.size() != properties.size()) {
LOG.debug("{} vs {}", processedPropKeys, properties);
throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
.filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList()));
}
}

private void getBrokerProperties() {
private void getBrokerProperties(Set<String> processedPropKeys) {
if (!properties.containsKey(PROP_BROKER_NAME)) {
return;
}
String brokerName = properties.get(PROP_BROKER_NAME);
properties.remove(PROP_BROKER_NAME);
processedPropKeys.add(PROP_BROKER_NAME);

Map<String, String> brokerProps = Maps.newHashMap();
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();
if (entry.getKey().startsWith(BROKER_PROP_PREFIX)) {
brokerProps.put(entry.getKey(), entry.getValue());
iter.remove();
if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
processedPropKeys.add(entry.getKey());
}
}

@@ -179,7 +189,6 @@ public OutFileClause clone() {
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ").append(format);
sb.append(brokerDesc.toSql());
if (properties != null && !properties.isEmpty()) {
sb.append(" PROPERTIES(");
sb.append(new PrintableMap<>(properties, " = ", true, false));
@@ -204,3 +213,4 @@ public TResultFileSinkOptions toSinkOptions() {
}
}
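On the FE side, instead of removing consumed entries from the shared `properties` map, the revised code records them in `processedPropKeys`, strips the `broker.` prefix when building the broker property map, and reports any key that was never consumed. This leaves the original map intact while still catching unknown keys. A small, self-contained Java sketch of that pattern, with hypothetical class and key names rather than the real `OutFileClause` fields:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class PropCheckSketch {
    private static final String BROKER_PROP_PREFIX = "broker.";
    private static final String PROP_BROKER_NAME = "broker.name";

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put(PROP_BROKER_NAME, "hdfs_broker");
        properties.put("broker.username", "root");
        properties.put("column_separator", ",");
        properties.put("typo_key", "oops");

        Set<String> processedPropKeys = new HashSet<>();
        Map<String, String> brokerProps = new HashMap<>();

        // Consume broker properties, stripping the "broker." prefix for the broker map,
        // without removing anything from the original properties map.
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (entry.getKey().equals(PROP_BROKER_NAME)) {
                processedPropKeys.add(entry.getKey());
            } else if (entry.getKey().startsWith(BROKER_PROP_PREFIX)) {
                brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
                processedPropKeys.add(entry.getKey());
            }
        }

        // Consume a format-specific property the same way.
        if (properties.containsKey("column_separator")) {
            processedPropKeys.add("column_separator");
        }

        // Whatever was never consumed is unknown; the original map stays intact for later use.
        Set<String> unknown = properties.keySet().stream()
                .filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toSet());
        System.out.println("broker props: " + brokerProps);      // {username=root}
        System.out.println("unknown properties: " + unknown);    // [typo_key]
    }
}
```

In the real class, unconsumed keys raise an `AnalysisException`, as shown in the diff above.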

