Skip to content

Commit

Permalink
feat: use microseconds precision (#20)
Browse files Browse the repository at this point in the history
For the moment, the backend only supports microseconds precision (we
were using nanoseconds before). This PR changes the splitting logic to
only consider micros for now.
  • Loading branch information
thiagotnunes authored Jun 29, 2021
1 parent f373276 commit a960c6a
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,35 @@
// TODO: This class probably does not belong in the cdc change
public class TimestampConverter {

private static final BigDecimal TEN_TO_THE_NINETH = BigDecimal.valueOf(1_000_000_000L);
private static final BigDecimal TEN_TO_THE_THIRD = BigDecimal.ONE.scaleByPowerOfTen(3);
private static final BigDecimal TEN_TO_THE_SIXTH = BigDecimal.ONE.scaleByPowerOfTen(6);
private static final BigDecimal TEN_TO_THE_NINTH = BigDecimal.ONE.scaleByPowerOfTen(9);

public static BigDecimal timestampToNanos(Timestamp timestamp) {
final BigDecimal seconds = BigDecimal.valueOf(timestamp.getSeconds());
final BigDecimal nanos = BigDecimal.valueOf(timestamp.getNanos());

return seconds.multiply(TEN_TO_THE_NINETH).add(nanos);
return seconds.multiply(TEN_TO_THE_NINTH).add(nanos);
}

public static Timestamp timestampFromNanos(BigDecimal timestampAsNanos) {
final long seconds = timestampAsNanos.divide(TEN_TO_THE_NINETH, RoundingMode.FLOOR).longValue();
final int nanos = timestampAsNanos.remainder(TEN_TO_THE_NINETH).intValue();
final long seconds = timestampAsNanos.divide(TEN_TO_THE_NINTH, RoundingMode.FLOOR).longValue();
final int nanos = timestampAsNanos.remainder(TEN_TO_THE_NINTH).intValue();

return Timestamp.ofTimeSecondsAndNanos(seconds, nanos);
}

public static BigDecimal timestampToMicros(Timestamp timestamp) {
final BigDecimal seconds = BigDecimal.valueOf(timestamp.getSeconds());
final BigDecimal nanos = BigDecimal.valueOf(timestamp.getNanos());
final BigDecimal micros = nanos.divide(TEN_TO_THE_THIRD, RoundingMode.FLOOR);

return seconds
.multiply(TEN_TO_THE_SIXTH)
.add(micros);
}

public static Timestamp timestampFromMicros(BigDecimal timestampAsMicros) {
return Timestamp.ofTimeMicroseconds(timestampAsMicros.longValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc.restriction;

import static org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter.timestampFromNanos;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter.timestampToNanos;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter.timestampFromMicros;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter.timestampToMicros;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.QUERY_CHANGE_STREAM;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

Expand Down Expand Up @@ -106,16 +106,16 @@ private SplitResult<PartitionRestriction> splitQueryChangeStream(
final Timestamp startTimestamp = restriction.getStartTimestamp();
final Timestamp endTimestamp = restriction.getEndTimestamp();

final BigDecimal currentNanos = timestampToNanos(lastClaimedPosition.getTimestamp().get());
final BigDecimal endNanos = timestampToNanos(endTimestamp);
final BigDecimal splitPositionNanos =
currentNanos.add(
endNanos
.subtract(currentNanos)
.multiply(BigDecimal.valueOf(fractionOfRemainder))
.max(BigDecimal.ONE));
// FIXME: The backend only supports micros precision for now. Change this to nanos whenever possible
final BigDecimal currentMicros = timestampToMicros(lastClaimedPosition.getTimestamp().get());
final BigDecimal endMicros = timestampToMicros(endTimestamp);
final BigDecimal splitPositionMicros = currentMicros.add(
endMicros
.subtract(currentMicros)
.multiply(BigDecimal.valueOf(fractionOfRemainder))
.max(BigDecimal.ONE));

final Timestamp splitPositionTimestamp = timestampFromNanos(splitPositionNanos);
final Timestamp splitPositionTimestamp = timestampFromMicros(splitPositionMicros);

if (splitPositionTimestamp.compareTo(endTimestamp) > 0) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,71 @@ public void testConvertUnderflowNanosToTimestamp() {
public void testConvertOverflowNanosToTimestamp() {
TimestampConverter.timestampFromNanos(new BigDecimal("253402300800000000000"));
}

@Test
public void testConvertTimestampToMicros() {
final Timestamp timestamp = Timestamp.ofTimeMicroseconds(2_000_360L);

assertEquals(
BigDecimal.valueOf(2_000_360L), TimestampConverter.timestampToMicros(timestamp)
);
}

@Test
public void testConvertTimestampZeroToMicros() {
final Timestamp timestamp = Timestamp.ofTimeMicroseconds(0);

assertEquals(
BigDecimal.valueOf(0), TimestampConverter.timestampToMicros(timestamp)
);
}

@Test
public void testConvertTimestampMinToMicros() {
final Timestamp timestamp = Timestamp.MIN_VALUE;

assertEquals(
new BigDecimal("-62135596800000000"), TimestampConverter.timestampToMicros(timestamp)
);
}

@Test
public void testConvertTimestampMaxToMicros() {
final Timestamp timestamp = Timestamp.MAX_VALUE;

assertEquals(
new BigDecimal("253402300799999999"), TimestampConverter.timestampToMicros(timestamp)
);
}

@Test
public void testConvertMicrosToTimestamp() {
final Timestamp timestamp = Timestamp.ofTimeMicroseconds(2_000_360L);

assertEquals(
timestamp, TimestampConverter.timestampFromMicros(BigDecimal.valueOf(2_000_360L))
);
}

@Test
public void testConvertZeroMicrosToTimestamp() {
final Timestamp timestamp = Timestamp.ofTimeMicroseconds(0);

assertEquals(
timestamp, TimestampConverter.timestampFromMicros(BigDecimal.valueOf(0))
);
}

@Test
public void testConvertMicrosToMinTimestamp() {
assertEquals(
Timestamp.MIN_VALUE,
TimestampConverter.timestampFromMicros(new BigDecimal("-62135596800000000"))
);
}

@Test(expected = IllegalArgumentException.class)
public void testConvertUnderflowMicrosToTimestamp() {
TimestampConverter.timestampFromNanos(new BigDecimal("-62135596800000001"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,34 +61,34 @@ public void testLastClaimedPositionIsNull() {
@Test
public void testQueryChangeStreamWithZeroFractionOfRemainder() {
final PartitionPosition position =
PartitionPosition.queryChangeStream(Timestamp.ofTimeSecondsAndNanos(50L, 25));
PartitionPosition.queryChangeStream(Timestamp.ofTimeMicroseconds(50_000_250L));

final SplitResult<PartitionRestriction> splitResult =
splitter.trySplit(0D, true, position, restriction);

assertEquals(
SplitResult.of(
PartitionRestriction.queryChangeStream(
startTimestamp, Timestamp.ofTimeSecondsAndNanos(50L, 26)),
startTimestamp, Timestamp.ofTimeMicroseconds(50_000_251L)),
PartitionRestriction.queryChangeStream(
Timestamp.ofTimeSecondsAndNanos(50L, 26), endTimestamp)),
Timestamp.ofTimeMicroseconds(50_000_251L), endTimestamp)),
splitResult);
}

@Test
public void testQueryChangeStreamWithNonZeroFractionOfRemainder() {
final PartitionPosition position =
PartitionPosition.queryChangeStream(Timestamp.ofTimeSecondsAndNanos(50L, 25));
final PartitionPosition position = PartitionPosition
.queryChangeStream(Timestamp.ofTimeMicroseconds(50_000_250L));

final SplitResult<PartitionRestriction> splitResult =
splitter.trySplit(0.5D, true, position, restriction);

assertEquals(
SplitResult.of(
PartitionRestriction.queryChangeStream(
startTimestamp, Timestamp.ofTimeSecondsAndNanos(75L, 37)),
startTimestamp, Timestamp.ofTimeMicroseconds(75_000_125L)),
PartitionRestriction.queryChangeStream(
Timestamp.ofTimeSecondsAndNanos(75L, 37), endTimestamp)),
Timestamp.ofTimeMicroseconds(75_000_125L), endTimestamp)),
splitResult);
}

Expand Down

0 comments on commit a960c6a

Please sign in to comment.