Skip to content

Commit

Permalink
Lazily read Prometheus response
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Sep 2, 2024
1 parent 391a7d9 commit 92d301c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import okhttp3.OkHttpClient.Builder;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -159,15 +160,21 @@ private String toRemoteTableName(String tableName)

private Map<String, Object> fetchMetrics(JsonCodec<Map<String, Object>> metricsCodec, URI metadataUri)
{
return metricsCodec.fromJson(fetchUri(metadataUri));
try (ResponseBody body = fetchUri(metadataUri)) {
return metricsCodec.fromJson(body.string());
}
catch (IOException e) {
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "Error reading metadata", e);
}
}

public byte[] fetchUri(URI uri)
public ResponseBody fetchUri(URI uri)
{
Request.Builder requestBuilder = new Request.Builder().url(uri.toString());
try (Response response = httpClient.newCall(requestBuilder.build()).execute()) {
try {
Response response = httpClient.newCall(requestBuilder.build()).execute();
if (response.isSuccessful() && response.body() != null) {
return response.body().bytes();
return response.body();
}
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "Bad response " + response.code() + " " + response.message());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.plugin.prometheus;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.common.io.CountingInputStream;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
Expand All @@ -30,6 +29,7 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeUtils;
import io.trino.spi.type.VarcharType;
import okhttp3.ResponseBody;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -64,20 +64,22 @@ public class PrometheusRecordCursor

private final Iterator<PrometheusStandardizedRow> metricsItr;
private final long totalBytes;
private final Runnable closeResponse;

private PrometheusStandardizedRow fields;

public PrometheusRecordCursor(List<PrometheusColumnHandle> columnHandles, ByteSource byteSource)
public PrometheusRecordCursor(List<PrometheusColumnHandle> columnHandles, ResponseBody responseBody)
{
this.columnHandles = columnHandles;
this.closeResponse = responseBody::close;

fieldToColumnIndex = new int[columnHandles.size()];
for (int i = 0; i < columnHandles.size(); i++) {
PrometheusColumnHandle columnHandle = columnHandles.get(i);
fieldToColumnIndex[i] = columnHandle.ordinalPosition();
}

try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
try (CountingInputStream input = new CountingInputStream(responseBody.byteStream())) {
metricsItr = prometheusResultsInStandardizedForm(new PrometheusQueryResponseParse(input).getResults()).iterator();
totalBytes = input.getCount();
}
Expand Down Expand Up @@ -284,5 +286,8 @@ private static List<Object> getArrayFromBlock(Type elementType, Block block)
}

@Override
public void close() {}
public void close()
{
closeResponse.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
package io.trino.plugin.prometheus;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordSet;
import io.trino.spi.type.Type;
import okhttp3.ResponseBody;

import java.net.URI;
import java.util.List;
Expand All @@ -29,7 +29,7 @@ public class PrometheusRecordSet
{
private final List<PrometheusColumnHandle> columnHandles;
private final List<Type> columnTypes;
private final ByteSource byteSource;
private final ResponseBody responseBody;

public PrometheusRecordSet(PrometheusClient prometheusClient, PrometheusSplit split, List<PrometheusColumnHandle> columnHandles)
{
Expand All @@ -43,7 +43,7 @@ public PrometheusRecordSet(PrometheusClient prometheusClient, PrometheusSplit sp
}
this.columnTypes = types.build();

this.byteSource = ByteSource.wrap(prometheusClient.fetchUri(URI.create(split.getUri())));
this.responseBody = prometheusClient.fetchUri(URI.create(split.getUri()));
}

@Override
Expand All @@ -55,6 +55,6 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new PrometheusRecordCursor(columnHandles, byteSource);
return new PrometheusRecordCursor(columnHandles, responseBody);
}
}

0 comments on commit 92d301c

Please sign in to comment.