fix: Remove stream TTL in client library, since there is no clearly defined TTL. #627

Merged: 4 commits, Oct 22, 2020
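For context: the v1alpha2 StreamWriter previously issued a GetWriteStream call at construction time, recorded the stream's create time, and assumed a hard-coded 24-hour TTL; JsonStreamWriter and the writer caches exposed and consulted an expired() method built on that assumption. Because the service does not define a clear stream TTL, this PR deletes that logic. A minimal sketch of the check being removed (simplified from the StreamWriter code in the diff below, not the full class):

```java
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

// Sketch of the assumption this PR removes: the client treated a stream as
// expired once 24 hours had passed since its create time, even though the
// service defines no such TTL.
final class RemovedTtlCheck {
  private static final Duration ASSUMED_STREAM_TTL = Duration.ofDays(1);

  static boolean expired(Instant createTime) {
    return createTime.plus(ASSUMED_STREAM_TTL).compareTo(Instant.now()) < 0;
  }
}
```

With the check gone, writers are no longer rejected at construction or evicted from the caches on an arbitrary schedule, and one fewer GetWriteStream round trip is made per writer.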
google-cloud-bigquerystorage/clirr-ignored-differences.xml (11 additions & 1 deletion)
@@ -33,4 +33,14 @@
<className>com/google/cloud/bigquery/storage/v1alpha2/BQTableSchemaToProtoDescriptor</className>
<method>com.google.protobuf.Descriptors$Descriptor ConvertBQTableSchemaToProtoDescriptor(com.google.cloud.bigquery.storage.v1alpha2.Table$TableSchema)</method>
</difference>
</differences>
<difference>
<className>com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter</className>
<differenceType>7002</differenceType>
<method>java.lang.Boolean expired()</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1alpha2/StreamWriter</className>
<differenceType>7002</differenceType>
<method>java.lang.Boolean expired()</method>
</difference>
</differences>
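Since expired() was public API on both JsonStreamWriter and StreamWriter, removing it is a binary-incompatible change, which the entries above whitelist for the Clirr check (differenceType 7002 marks a removed method). On the lookup side, the writer caches in this PR now hand back a cached writer directly instead of probing it for expiration first. Below is a minimal, self-contained sketch of that pattern; the class and field names are illustrative, not the library's API, and the real caches use getIfPresent on a size-bounded cache as the hunks further down show.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative-only sketch of the simplified lookup: a cached writer is reused
// as-is, with no expired() check and no close-and-recreate path.
final class TableWriterCache {
  private final ConcurrentHashMap<String, Writer> cache = new ConcurrentHashMap<>();

  Writer getTableWriter(String tableName) {
    // Previously: if the cached writer was "expired", it was closed and a new
    // stream plus writer were created. Now the cached instance is returned directly.
    return cache.computeIfAbsent(tableName, Writer::new);
  }

  static final class Writer {
    final String streamName;
    Writer(String streamName) { this.streamName = streamName; }
  }
}
```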
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java
@@ -260,11 +260,6 @@ public void close() {
this.streamWriter.close();
}

/** Returns if a stream has expired. */
public Boolean expired() {
return this.streamWriter.expired();
}

private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable {
private JsonStreamWriter jsonStreamWriter;
/**
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCache.java
@@ -113,11 +113,7 @@ public JsonStreamWriter getTableWriter(String tableName)
synchronized (this) {
writer = jsonWriterCache.getIfPresent(tableName);
if (writer != null) {
if (!writer.expired()) {
return writer;
} else {
writer.close();
}
return writer;
}
writeStream = CreateNewWriteStream(tableName);
writer = CreateNewWriter(writeStream);
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
@@ -57,7 +57,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
@@ -118,9 +117,6 @@ public class StreamWriter implements AutoCloseable {
private final AtomicBoolean activeAlarm;
private ScheduledFuture<?> currentAlarmFuture;

private Instant createTime;
private Duration streamTTL = Duration.ofDays(1);

private Integer currentRetries = 0;

// Used for schema updates
@@ -182,19 +178,6 @@ private StreamWriter(Builder builder)
}

refreshAppend();
Stream.WriteStream stream =
stub.getWriteStream(Storage.GetWriteStreamRequest.newBuilder().setName(streamName).build());
createTime =
Instant.ofEpochSecond(
stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos());
if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) {
throw new IllegalStateException(
"Cannot write to a stream that is already committed: " + streamName);
}
if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) {
throw new IllegalStateException(
"Cannot write to a stream that is already expired: " + streamName);
}
}

/** Stream name we are writing to. */
@@ -212,11 +195,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() {
return this.onSchemaUpdateRunnable;
}

/** Returns if a stream has expired. */
public Boolean expired() {
return createTime.plus(streamTTL).compareTo(Instant.now()) < 0;
}

private void setException(Throwable t) {
exceptionLock.lock();
if (this.streamException == null) {
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
@@ -135,11 +135,7 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
if (tableEntry != null) {
writer = tableEntry.getIfPresent(userSchema);
if (writer != null) {
if (!writer.expired()) {
return writer;
} else {
writer.close();
}
return writer;
}
compat.check(tableName, userSchema);
streamName = CreateNewStream(tableName);
google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
@@ -28,7 +28,6 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
import com.google.common.collect.Sets;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -50,7 +49,6 @@
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.threeten.bp.Instant;

@RunWith(JUnit4.class)
public class DirectWriterTest {
@@ -115,18 +113,6 @@ void WriterCreationResponseMock(String testStreamName, Set<Long> responseOffsets
Stream.WriteStream.newBuilder().setName(testStreamName).build();
mockBigQueryWrite.addResponse(expectedResponse);

// Response from GetWriteStream
Instant time = Instant.now();
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
Stream.WriteStream expectedResponse2 =
Stream.WriteStream.newBuilder()
.setName(testStreamName)
.setType(Stream.WriteStream.Type.COMMITTED)
.setCreateTime(timestamp)
.build();
mockBigQueryWrite.addResponse(expectedResponse2);

for (Long offset : responseOffsets) {
Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(offset).build();
@@ -144,18 +130,6 @@ void JsonWriterCreationResponseMock(String testStreamName, Set<Long> responseOff
.build();
mockBigQueryWrite.addResponse(expectedResponse);

// Response from GetWriteStream
Instant time = Instant.now();
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
Stream.WriteStream expectedResponse2 =
Stream.WriteStream.newBuilder()
.setName(testStreamName)
.setType(Stream.WriteStream.Type.COMMITTED)
.setCreateTime(timestamp)
.build();
mockBigQueryWrite.addResponse(expectedResponse2);

for (Long offset : responseOffsets) {
Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(offset).build();
@@ -183,19 +157,15 @@ public void testJsonWriteSuccess() throws Exception {
ApiFuture<Long> ret = DirectWriter.append(TEST_TABLE, jsonArr);
assertEquals(Long.valueOf(0L), ret.get());
List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
Assert.assertEquals(3, actualRequests.size());
Assert.assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
assertEquals(
m1.toByteString(),
((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(0));
((AppendRowsRequest) actualRequests.get(1)).getProtoRows().getRows().getSerializedRows(0));
assertEquals(
m2.toByteString(),
((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(1));
((AppendRowsRequest) actualRequests.get(1)).getProtoRows().getRows().getSerializedRows(1));

Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(2).build();
@@ -205,7 +175,7 @@
assertEquals(Long.valueOf(2L), ret.get());
assertEquals(
m1.toByteString(),
((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(0));
((AppendRowsRequest) actualRequests.get(2)).getProtoRows().getRows().getSerializedRows(0));
DirectWriter.clearCache();
}

@@ -220,13 +190,9 @@ public void testProtobufWriteSuccess() throws Exception {
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
Assert.assertEquals(3, actualRequests.size());
Assert.assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());

Storage.AppendRowsRequest.ProtoData.Builder dataBuilder =
Storage.AppendRowsRequest.ProtoData.newBuilder();
@@ -241,7 +207,7 @@
.setWriteStream(TEST_STREAM)
.setProtoRows(dataBuilder.build())
.build();
assertEquals(expectRequest.toString(), actualRequests.get(2).toString());
assertEquals(expectRequest.toString(), actualRequests.get(1).toString());

Storage.AppendRowsResponse response =
Storage.AppendRowsResponse.newBuilder().setOffset(2).build();
@@ -254,7 +220,7 @@
ProtoBufProto.ProtoRows.newBuilder().addSerializedRows(m1.toByteString()).build());
expectRequest =
Storage.AppendRowsRequest.newBuilder().setProtoRows(dataBuilder.build()).build();
assertEquals(expectRequest.toString(), actualRequests.get(3).toString());
assertEquals(expectRequest.toString(), actualRequests.get(2).toString());

// Write with a different schema.
WriterCreationResponseMock(TEST_STREAM_2, Sets.newHashSet(Long.valueOf(0L)));
@@ -271,14 +237,10 @@
.setWriteStream(TEST_STREAM_2)
.setProtoRows(dataBuilder.build())
.build();
Assert.assertEquals(7, actualRequests.size());
Assert.assertEquals(5, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType());
assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName());
assertEquals(expectRequest.toString(), actualRequests.get(6).toString());
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getParent());
assertEquals(expectRequest.toString(), actualRequests.get(4).toString());

DirectWriter.clearCache();
}
@@ -433,13 +395,9 @@ public void testJsonProtobufWrite() throws Exception {
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
Assert.assertEquals(3, actualRequests.size());
Assert.assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());

Storage.AppendRowsRequest.ProtoData.Builder dataBuilder =
Storage.AppendRowsRequest.ProtoData.newBuilder();
@@ -454,25 +412,19 @@
.setWriteStream(TEST_STREAM)
.setProtoRows(dataBuilder.build())
.build();
assertEquals(expectRequest.toString(), actualRequests.get(2).toString());
assertEquals(expectRequest.toString(), actualRequests.get(1).toString());

JsonWriterCreationResponseMock(TEST_STREAM, Sets.newHashSet(Long.valueOf(0L)));
ret = DirectWriter.append(TEST_TABLE, jsonArr);
assertEquals(Long.valueOf(0L), ret.get());
actualRequests = mockBigQueryWrite.getRequests();
Assert.assertEquals(6, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(3)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(4)).getName());
Assert.assertEquals(4, actualRequests.size());
assertEquals(
m1.toByteString(),
((AppendRowsRequest) actualRequests.get(5)).getProtoRows().getRows().getSerializedRows(0));
((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(0));
assertEquals(
m2.toByteString(),
((AppendRowsRequest) actualRequests.get(5)).getProtoRows().getRows().getSerializedRows(1));
((AppendRowsRequest) actualRequests.get(3)).getProtoRows().getRows().getSerializedRows(1));

DirectWriter.clearCache();
}
google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonWriterCacheTest.java
@@ -26,7 +26,6 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -40,8 +39,6 @@
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.ChronoUnit;

@RunWith(JUnit4.class)
public class JsonWriterCacheTest {
@@ -108,18 +105,6 @@ void WriterCreationResponseMock(String testStreamName) {
.setTableSchema(TABLE_SCHEMA)
.build();
mockBigQueryWrite.addResponse(expectedResponse);

// Response from GetWriteStream
Instant time = Instant.now();
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
Stream.WriteStream expectedResponse2 =
Stream.WriteStream.newBuilder()
.setName(testStreamName)
.setType(Stream.WriteStream.Type.COMMITTED)
.setCreateTime(timestamp)
.build();
mockBigQueryWrite.addResponse(expectedResponse2);
}

@After
@@ -144,50 +129,15 @@ public void testCreateNewWriter() throws Exception {
WriterCreationResponseMock(TEST_STREAM);
JsonStreamWriter writer = cache.getTableWriter(TEST_TABLE);
List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
assertEquals(2, actualRequests.size());
assertEquals(1, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(
Stream.WriteStream.Type.COMMITTED,
((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());

assertEquals(TEST_STREAM, writer.getStreamName());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}

@Test
public void testWriterExpired() throws Exception {
JsonWriterCache cache = JsonWriterCache.getTestInstance(client, 10);
// Response from CreateWriteStream
Stream.WriteStream expectedResponse =
Stream.WriteStream.newBuilder().setName(TEST_STREAM).build();
mockBigQueryWrite.addResponse(expectedResponse);

// Response from GetWriteStream
Instant time = Instant.now().minus(2, ChronoUnit.DAYS);
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
Stream.WriteStream expectedResponse2 =
Stream.WriteStream.newBuilder()
.setName(TEST_STREAM)
.setType(Stream.WriteStream.Type.COMMITTED)
.setCreateTime(timestamp)
.build();
mockBigQueryWrite.addResponse(expectedResponse2);

try {
JsonStreamWriter writer = cache.getTableWriter(TEST_TABLE);
fail("Should fail");
} catch (IllegalStateException e) {
assertEquals(
"Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s",
e.getMessage());
}
cache.clear();
}

@Test
public void testWriterWithDifferentTable() throws Exception {
JsonWriterCache cache = JsonWriterCache.getTestInstance(client, 2);
@@ -197,14 +147,11 @@ public void testWriterWithDifferentTable() throws Exception {
JsonStreamWriter writer2 = cache.getTableWriter(TEST_TABLE_2);

List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
assertEquals(4, actualRequests.size());
assertEquals(2, actualRequests.size());
assertEquals(
TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
assertEquals(
TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent());
Assert.assertEquals(
TEST_STREAM_21, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName());
TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(1)).getParent());
assertEquals(TEST_STREAM, writer1.getStreamName());
assertEquals(TEST_STREAM_21, writer2.getStreamName());
assertEquals(2, cache.cachedTableCount());