Skip to content

Commit

Permalink
Simplify response and error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Oct 19, 2021
1 parent dae6fdd commit 6d18204
Show file tree
Hide file tree
Showing 13 changed files with 433 additions and 247 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.G
ClickHouseResponse resp = client.connect(server)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes).set("send_logs_level", "trace")
.query("select id, name from some_table where id in :ids and name like :name").params(Arrays.asList(1,2,3), "%key%").execute().get()) {
// you can also use resp.recordStream() as well
// you can also use resp.stream() as well
for (ClickHouseRecord record : resp.records()) {
int id = record.getValue(0).asInteger();
String name = record.getValue(1).asString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ static CompletableFuture<ClickHouseResponseSummary> dump(ClickHouseNode server,
}

try (ClickHouseResponse response = request.execute().get()) {
response.dump(output);
response.pipe(output, 8192);
return response.getSummary();
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected ClickHouseDataProcessor(ClickHouseConfig config, InputStream input, Ou
}

/**
* Get column list.
* Gets list of columns to process.
*
* @return list of columns to process
*/
Expand All @@ -100,9 +100,12 @@ public final List<ClickHouseColumn> getColumns() {
}

/**
* Return iterable to walk through all records in a for-each loop.
* Returns an iterable collection of records which can be walked through in a
* foreach loop. Please pay attention that: 1)
* {@link java.io.UncheckedIOException} might be thrown when iterating through
* the collection; and 2) it's not supposed to be called for more than once.
*
* @return iterable
* @return non-null iterable collection
*/
public abstract Iterable<ClickHouseRecord> records();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,248 +4,110 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import com.clickhouse.client.exception.ClickHouseException;
import com.clickhouse.client.exception.ClickHouseExceptionSpecifier;
import com.clickhouse.client.logging.Logger;
import com.clickhouse.client.logging.LoggerFactory;

/**
* This represents server reponse. To get data returned from server, depending
* on actual needs, you have 3 options:
* This encapsulates a server reponse. Depending on concrete implementation, it
* could be either an in-memory list or a wrapped input stream with
* {@link ClickHouseDataProcessor} attached for deserialization. To get data
* returned from server, depending on actual needs, you have 3 options:
*
* <ul>
* <li>use {@link #records()} or {@link #recordStream()} to get deserialized
* records(usually rows), a record is composed of one or more values</li>
* <li>use {@link #values()} or {@link #valueStream()} to get deserialized
* values</li>
* <li>use {@link #getInputStream()} for custom processing like dumping results
* into a file</li>
* <li>use {@link #records()} or {@link #stream()} to get deserialized
* {@link ClickHouseRecord} one at a time</li>
* <li>use {@link #firstRecord()} if you're certain that all you need is the
* first {@link ClickHouseRecord}</li>
* <li>use {@link #getInputStream()} or {@link #pipe(OutputStream, int)} if you
* prefer to handle stream instead of deserialized data</li>
* </ul>
*/
public class ClickHouseResponse implements AutoCloseable, Serializable {
private static final Logger log = LoggerFactory.getLogger(ClickHouseResponse.class);

private static final long serialVersionUID = 2271296998310082447L;

private static final ClickHouseResponseSummary emptySummary = new ClickHouseResponseSummary() {
};

protected static final List<ClickHouseColumn> defaultTypes = Collections
.singletonList(ClickHouseColumn.of("results", "Nullable(String)"));

protected final ClickHouseConfig config;
protected final ClickHouseNode server;
protected final InputStream input;
protected final ClickHouseDataProcessor processor;
protected final List<ClickHouseColumn> columns;
protected final Throwable error;

private boolean isClosed;

protected ClickHouseResponse(ClickHouseConfig config, ClickHouseNode server, Throwable error)
throws ClickHouseException {
this(config, server, null, null, null, error);
}

protected ClickHouseResponse(ClickHouseConfig config, ClickHouseNode server, Map<String, Object> settings,
InputStream input, List<ClickHouseColumn> columns, Throwable error) throws ClickHouseException {
try {
this.config = ClickHouseChecker.nonNull(config, "config");
this.server = ClickHouseChecker.nonNull(server, "server");

if (error != null) {
this.processor = null;
this.input = null;
// response object may be constructed in a separate thread
this.error = error;
} else if (input == null) {
throw new IllegalArgumentException("input cannot be null when there's no error");
} else {
this.input = input;
this.processor = ClickHouseDataStreamFactory.getInstance().getProcessor(config, input, null, settings,
columns);

this.error = null;
}

this.isClosed = input == null;
this.columns = columns != null ? columns
: (processor != null ? processor.getColumns() : Collections.emptyList());
} catch (IOException | RuntimeException e) { // TODO and Error?
if (input != null) {
log.warn("Failed to instantiate response object, will try to close the given input stream");
try {
input.close();
} catch (IOException exp) {
log.warn("Failed to close given input stream", exp);
}
}

throw ClickHouseExceptionSpecifier.specify(e, server);
}
}

protected void throwErrorIfAny() throws ClickHouseException {
if (error == null) {
return;
}

if (error instanceof ClickHouseException) {
throw (ClickHouseException) error;
} else {
throw ClickHouseExceptionSpecifier.specify(error, server);
}
}

public boolean isClosed() {
return isClosed;
}

@Override
public void close() {
if (input != null) {
long skipped = 0L;
try {
skipped = input.skip(Long.MAX_VALUE);
log.debug("%d bytes skipped before closing input stream", skipped);
} catch (Exception e) {
// ignore
log.debug("%d bytes skipped before closing input stream", skipped, e);
} finally {
try {
input.close();
} catch (Exception e) {
log.warn("Failed to close input stream", e);
}
isClosed = true;
}
}
}

public boolean hasError() {
return error != null;
}

public List<ClickHouseColumn> getColumns() throws ClickHouseException {
throwErrorIfAny();

return columns;
}

public ClickHouseFormat getFormat() throws ClickHouseException {
throwErrorIfAny();

return this.config.getFormat();
}

public ClickHouseNode getServer() {
return server;
}
public interface ClickHouseResponse extends AutoCloseable, Serializable {
/**
* Gets list of columns.
*
* @return non-null list of column
*/
List<ClickHouseColumn> getColumns();

public ClickHouseResponseSummary getSummary() {
return emptySummary;
}
/**
* Gets summary of this response. Keep in mind that the summary may change over
* time until response is closed.
*
* @return non-null summary of this response
*/
ClickHouseResponseSummary getSummary();

/**
* This is the most memory-efficient way for you to handle data returned from
* ClickHouse. However, this also means additional work is required for
* deserialization, especially when using a binary format.
* Gets input stream of the response. In general, this is the most
* memory-efficient way for streaming data from server to client. However, this
* also means additional work is required for deserialization, especially when
* using a binary format.
*
* @return input stream get raw data returned from server
* @throws ClickHouseException when failed to get input stream or read data
* @return input stream for getting raw data returned from server
*/
public InputStream getInputStream() throws ClickHouseException {
throwErrorIfAny();
InputStream getInputStream();

return input;
/**
* Gets the first record only. Please use {@link #records()} instead if you need
* to access the rest of records.
*
* @return the first record
* @throws NoSuchElementException when there's no record at all
* @throws UncheckedIOException when failed to read data(e.g. deserialization)
*/
default ClickHouseRecord firstRecord() {
return records().iterator().next();
}

/**
* Dump response into output stream.
* Returns an iterable collection of records which can be walked through in a
* foreach loop. Please pay attention that: 1) {@link UncheckedIOException}
* might be thrown when iterating through the collection; and 2) it's not
* supposed to be called for more than once.
*
* @param output output stream, which will remain open
* @throws ClickHouseException when error occurred dumping response and/or
* writing data into output stream
* @return non-null iterable collection
*/
public void dump(OutputStream output) throws ClickHouseException {
throwErrorIfAny();
Iterable<ClickHouseRecord> records();

/**
* Pipes the contents of this response into the given output stream.
*
* @param output non-null output stream, which will remain open
* @param bufferSize buffer size, 0 or negative value will be treated as 8192
* @throws IOException when error occurred reading or writing data
*/
default void pipe(OutputStream output, int bufferSize) throws IOException {
ClickHouseChecker.nonNull(output, "output");

// TODO configurable buffer size
int size = 8192;
byte[] buffer = new byte[size];
int counter = 0;
try {
while ((counter = input.read(buffer, 0, size)) >= 0) {
output.write(buffer, 0, counter);
}
} catch (IOException e) {
throw ClickHouseExceptionSpecifier.specify(e, server);
if (bufferSize <= 0) {
bufferSize = 8192;
}
}

public Iterable<ClickHouseRecord> records() throws ClickHouseException {
throwErrorIfAny();

if (processor == null) {
throw new UnsupportedOperationException(
"No data processor available for deserialization, please use getInputStream instead");
byte[] buffer = new byte[bufferSize];
int counter = 0;
while ((counter = getInputStream().read(buffer, 0, bufferSize)) >= 0) {
output.write(buffer, 0, counter);
}

return processor.records();
// caller's responsibility to call output.flush() as needed
}

public Stream<ClickHouseRecord> recordStream() throws ClickHouseException {
/**
* Gets stream of records to process.
*
* @return stream of records
*/
default Stream<ClickHouseRecord> stream() {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(records().iterator(),
Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED), false);
}

public Iterable<ClickHouseValue> values() throws ClickHouseException {
throwErrorIfAny();

if (processor == null) {
throw new UnsupportedOperationException(
"No data processor available for deserialization, please use getInputStream instead");
}

return new Iterable<ClickHouseValue>() {
@Override
public Iterator<ClickHouseValue> iterator() {
return new Iterator<ClickHouseValue>() {
Iterator<ClickHouseRecord> records = processor.records().iterator();
ClickHouseRecord current = null;
int index = 0;

@Override
public boolean hasNext() {
return records.hasNext() || (current != null && index < current.size());
}

@Override
public ClickHouseValue next() {
if (current == null || index == current.size()) {
current = records.next();
index = 0;
}

return current.getValue(index++);
}
};
}
};
}

public Stream<ClickHouseValue> valueStream() throws ClickHouseException {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(values().iterator(),
Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED), false);
}
@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
Expand Down Expand Up @@ -344,13 +345,13 @@ void readNextRow() {
if (index == 0) { // end of the stream, which is fine
values = null;
} else {
throw new IllegalStateException(
throw new UncheckedIOException(
ClickHouseUtils.format("Reached end of the stream when reading column #%d(total %d): %s",
index + 1, size, column),
e);
}
} catch (IOException e) {
throw new IllegalStateException(
throw new UncheckedIOException(
ClickHouseUtils.format("Failed to read column #%d(total %d): %s", index + 1, size, column), e);
}
}
Expand All @@ -360,7 +361,7 @@ public boolean hasNext() {
try {
return input.available() > 0;
} catch (IOException e) {
throw new IllegalStateException(e);
throw new UncheckedIOException(e);
}
}

Expand Down
Loading

0 comments on commit 6d18204

Please sign in to comment.