Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-29486][SQL] CalendarInterval should have 3 fields: months, days and microseconds #26134

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
44d66d6
modify class CalendarInterval and CalendarIntervalSuite
LinhongLiu Oct 11, 2019
4979e1e
modify DateTimeUtils, DateTimeUtilsSuite, TemporalSequenceImpl
LinhongLiu Oct 12, 2019
f09b529
modify CalendarInterval related classes in sql/catalyst
LinhongLiu Oct 12, 2019
0db5b7a
fix test in sql/catalyst
LinhongLiu Oct 12, 2019
ce879c2
fix remaining tests
LinhongLiu Oct 14, 2019
4ee8354
fix failed tests
LinhongLiu Oct 15, 2019
3d954d6
code clean and more bug fix
LinhongLiu Oct 16, 2019
05042e8
address comments and fix tests
LinhongLiu Oct 16, 2019
4c31401
Merge remote-tracking branch 'origin/master' into calendarinterval
LinhongLiu Oct 16, 2019
064be74
fix python
LinhongLiu Oct 18, 2019
211543f
Merge remote-tracking branch 'origin/master' into calendarinterval
LinhongLiu Oct 18, 2019
0778f9a
use 24 hours = 1 day assumption in window, trigger and watermark
LinhongLiu Oct 20, 2019
3a6518a
Merge remote-tracking branch 'origin/master' into calendarinterval
LinhongLiu Oct 20, 2019
4d8383c
streaming changes followup
LinhongLiu Oct 20, 2019
1ac157e
fix conflict
LinhongLiu Oct 20, 2019
aac92bd
Merge remote-tracking branch 'origin/master' into calendarinterval
LinhongLiu Oct 26, 2019
076ce42
fix code sytle and test cases
LinhongLiu Oct 28, 2019
3d62a24
Merge remote-tracking branch 'origin/master' into calendarinterval
LinhongLiu Oct 28, 2019
2edd8a0
change long,long,long to int,int,long when store CanlendarInterval in…
LinhongLiu Oct 28, 2019
4e97bc0
fix comment
LinhongLiu Oct 29, 2019
0e87e2d
Merge remote-tracking branch 'origin/master' into calendarinterval
LinhongLiu Oct 31, 2019
2f90189
address comments
LinhongLiu Nov 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;

import java.io.Serializable;
import java.util.Objects;

/**
* The internal representation of interval type.
Expand All @@ -31,45 +32,50 @@ public final class CalendarInterval implements Serializable {
public static final long MICROS_PER_WEEK = MICROS_PER_DAY * 7;

public final int months;
public final int days;
public final long microseconds;

public long milliseconds() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method will return different values because you exclude days from microseconds

return this.microseconds / MICROS_PER_MILLI;
}

public CalendarInterval(int months, long microseconds) {
public CalendarInterval(int months, int days, long microseconds) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LinhongLiu Could you send out a follow up PR to document why we need days and how do we use it in this file? This is definitely worth to document. Otherwise, everyone reading this class may need to git blame and go through the long discussion in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LinhongLiu Could you send out a follow up PR to document why we need days and how do we use it in this file? This is definitely worth to document. Otherwise, everyone reading this class may need to git blame and go through the long discussion in this PR.

Sure, will do.

this.months = months;
this.days = days;
this.microseconds = microseconds;
}

public CalendarInterval add(CalendarInterval that) {
int months = this.months + that.months;
int days = this.days + that.days;
long microseconds = this.microseconds + that.microseconds;
return new CalendarInterval(months, microseconds);
return new CalendarInterval(months, days, microseconds);
}

public CalendarInterval subtract(CalendarInterval that) {
int months = this.months - that.months;
int days = this.days - that.days;
long microseconds = this.microseconds - that.microseconds;
return new CalendarInterval(months, microseconds);
return new CalendarInterval(months, days, microseconds);
}

public CalendarInterval negate() {
return new CalendarInterval(-this.months, -this.microseconds);
return new CalendarInterval(-this.months, -this.days, -this.microseconds);
}

@Override
public boolean equals(Object other) {
if (this == other) return true;
if (other == null || !(other instanceof CalendarInterval)) return false;

CalendarInterval o = (CalendarInterval) other;
return this.months == o.months && this.microseconds == o.microseconds;
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CalendarInterval that = (CalendarInterval) o;
return months == that.months &&
days == that.days &&
microseconds == that.microseconds;
}

@Override
public int hashCode() {
return 31 * months + (int) microseconds;
return Objects.hash(months, days, microseconds);
}

@Override
Expand All @@ -81,12 +87,13 @@ public String toString() {
appendUnit(sb, months % 12, "month");
}

if (days != 0) {
appendUnit(sb, days / 7, "week");
appendUnit(sb, days % 7, "day");
}

if (microseconds != 0) {
long rest = microseconds;
appendUnit(sb, rest / MICROS_PER_WEEK, "week");
rest %= MICROS_PER_WEEK;
appendUnit(sb, rest / MICROS_PER_DAY, "day");
rest %= MICROS_PER_DAY;
appendUnit(sb, rest / MICROS_PER_HOUR, "hour");
rest %= MICROS_PER_HOUR;
appendUnit(sb, rest / MICROS_PER_MINUTE, "minute");
Expand All @@ -96,7 +103,7 @@ public String toString() {
appendUnit(sb, rest / MICROS_PER_MILLI, "millisecond");
rest %= MICROS_PER_MILLI;
appendUnit(sb, rest, "microsecond");
} else if (months == 0) {
} else if (months == 0 && days == 0) {
sb.append(" 0 microseconds");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,59 +26,72 @@ public class CalendarIntervalSuite {

@Test
public void equalsTest() {
CalendarInterval i1 = new CalendarInterval(3, 123);
CalendarInterval i2 = new CalendarInterval(3, 321);
CalendarInterval i3 = new CalendarInterval(1, 123);
CalendarInterval i4 = new CalendarInterval(3, 123);
CalendarInterval i1 = new CalendarInterval(3, 2, 123);
CalendarInterval i2 = new CalendarInterval(3, 2,321);
CalendarInterval i3 = new CalendarInterval(3, 4,123);
CalendarInterval i4 = new CalendarInterval(1, 2, 123);
CalendarInterval i5 = new CalendarInterval(1, 4, 321);
CalendarInterval i6 = new CalendarInterval(3, 2, 123);

assertNotSame(i1, i2);
assertNotSame(i1, i3);
assertNotSame(i1, i4);
assertNotSame(i2, i3);
assertEquals(i1, i4);
assertNotSame(i2, i4);
assertNotSame(i3, i4);
assertNotSame(i1, i5);
assertEquals(i1, i6);
}

@Test
public void toStringTest() {
CalendarInterval i;

i = new CalendarInterval(0, 0);
i = new CalendarInterval(0, 0, 0);
assertEquals("interval 0 microseconds", i.toString());

i = new CalendarInterval(34, 0);
i = new CalendarInterval(34, 0, 0);
assertEquals("interval 2 years 10 months", i.toString());

i = new CalendarInterval(-34, 0);
i = new CalendarInterval(-34, 0, 0);
assertEquals("interval -2 years -10 months", i.toString());

i = new CalendarInterval(0, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
assertEquals("interval 3 weeks 13 hours 123 microseconds", i.toString());
i = new CalendarInterval(0, 31, 0);
assertEquals("interval 4 weeks 3 days", i.toString());

i = new CalendarInterval(0, -3 * MICROS_PER_WEEK - 13 * MICROS_PER_HOUR - 123);
assertEquals("interval -3 weeks -13 hours -123 microseconds", i.toString());
i = new CalendarInterval(0, -31, 0);
assertEquals("interval -4 weeks -3 days", i.toString());

i = new CalendarInterval(34, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
assertEquals("interval 2 years 10 months 3 weeks 13 hours 123 microseconds", i.toString());
i = new CalendarInterval(0, 0, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
assertEquals("interval 3 hours 13 minutes 123 microseconds", i.toString());

i = new CalendarInterval(0, 0, -3 * MICROS_PER_HOUR - 13 * MICROS_PER_MINUTE - 123);
assertEquals("interval -3 hours -13 minutes -123 microseconds", i.toString());

i = new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
assertEquals("interval 2 years 10 months 4 weeks 3 days 3 hours 13 minutes 123 microseconds",
i.toString());
}

@Test
public void addTest() {
CalendarInterval input1 = new CalendarInterval(3, 1 * MICROS_PER_HOUR);
CalendarInterval input2 = new CalendarInterval(2, 100 * MICROS_PER_HOUR);
assertEquals(input1.add(input2), new CalendarInterval(5, 101 * MICROS_PER_HOUR));
CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
assertEquals(input1.add(input2), new CalendarInterval(5, 5, 101 * MICROS_PER_HOUR));

input1 = new CalendarInterval(-10, -81 * MICROS_PER_HOUR);
input2 = new CalendarInterval(75, 200 * MICROS_PER_HOUR);
assertEquals(input1.add(input2), new CalendarInterval(65, 119 * MICROS_PER_HOUR));
input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
assertEquals(input1.add(input2), new CalendarInterval(65, 120, 119 * MICROS_PER_HOUR));
}

@Test
public void subtractTest() {
CalendarInterval input1 = new CalendarInterval(3, 1 * MICROS_PER_HOUR);
CalendarInterval input2 = new CalendarInterval(2, 100 * MICROS_PER_HOUR);
assertEquals(input1.subtract(input2), new CalendarInterval(1, -99 * MICROS_PER_HOUR));
CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
assertEquals(input1.subtract(input2), new CalendarInterval(1, -3, -99 * MICROS_PER_HOUR));

input1 = new CalendarInterval(-10, -81 * MICROS_PER_HOUR);
input2 = new CalendarInterval(75, 200 * MICROS_PER_HOUR);
assertEquals(input1.subtract(input2), new CalendarInterval(-85, -281 * MICROS_PER_HOUR));
input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
assertEquals(input1.subtract(input2), new CalendarInterval(-85, -180, -281 * MICROS_PER_HOUR));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,10 @@ public CalendarInterval getInterval(int ordinal) {
if (isNullAt(ordinal)) return null;
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
final int months = Platform.getInt(baseObject, baseOffset + offset);
final int days = Platform.getInt(baseObject, baseOffset + offset + 4);
final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
return new CalendarInterval(months, microseconds);
return new CalendarInterval(months, days, microseconds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,10 @@ public CalendarInterval getInterval(int ordinal) {
} else {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
final int months = Platform.getInt(baseObject, baseOffset + offset);
final int days = Platform.getInt(baseObject, baseOffset + offset + 4);
final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
return new CalendarInterval(months, microseconds);
return new CalendarInterval(months, days, microseconds);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ public final void write(int ordinal, CalendarInterval input) {
// grow the global buffer before writing data.
grow(16);

// Write the months and microseconds fields of Interval to the variable length portion.
Platform.putLong(getBuffer(), cursor(), input.months);
// Write the months, days and microseconds fields of Interval to the variable length portion.
Platform.putInt(getBuffer(), cursor(), input.months);
Platform.putInt(getBuffer(), cursor() + 4, input.days);
Platform.putLong(getBuffer(), cursor() + 8, input.microseconds);

setOffsetAndSize(ordinal, 16);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,21 +267,24 @@ public final ColumnarRow getStruct(int rowId) {
* Returns the calendar interval type value for rowId. If the slot for rowId is null, it should
* return null.
*
* In Spark, calendar interval type value is basically an integer value representing the number of
* months in this interval, and a long value representing the number of microseconds in this
* interval. An interval type vector is the same as a struct type vector with 2 fields: `months`
* and `microseconds`.
* In Spark, calendar interval type value is basically two integer values representing the number
* of months and days in this interval, and a long value representing the number of microseconds
* in this interval. An interval type vector is the same as a struct type vector with 3 fields:
* `months`, `days` and `microseconds`.
*
* To support interval type, implementations must implement {@link #getChild(int)} and define 2
* To support interval type, implementations must implement {@link #getChild(int)} and define 3
* child vectors: the first child vector is an int type vector, containing all the month values of
* all the interval values in this vector. The second child vector is a long type vector,
* containing all the microsecond values of all the interval values in this vector.
* all the interval values in this vector. The second child vector is an int type vector,
* containing all the day values of all the interval values in this vector. The third child vector
* is a long type vector, containing all the microsecond values of all the interval values in this
* vector.
*/
public final CalendarInterval getInterval(int rowId) {
if (isNullAt(rowId)) return null;
final int months = getChild(0).getInt(rowId);
final long microseconds = getChild(1).getLong(rowId);
return new CalendarInterval(months, microseconds);
final int days = getChild(1).getInt(rowId);
final long microseconds = getChild(2).getLong(rowId);
return new CalendarInterval(months, days, microseconds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
s"watermark calculation. Use interval in terms of day instead.")
Literal(0.0)
} else {
Literal(calendarInterval.microseconds.toDouble)
Literal(calendarInterval.days * CalendarInterval.MICROS_PER_DAY.toDouble +
calendarInterval.microseconds.toDouble)
}
case DoubleType =>
Multiply(lit, Literal(1000000.0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval

case class TimeWindow(
timeColumn: Expression,
Expand Down Expand Up @@ -107,7 +108,7 @@ object TimeWindow {
throw new IllegalArgumentException(
s"Intervals greater than a month is not supported ($interval).")
}
cal.microseconds
cal.days * CalendarInterval.MICROS_PER_DAY + cal.microseconds
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan here we still use a "1 day = 24 hours" assumption. is this OK?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we use interval as a duration, and it's safe to use 1 day = 24 hours (this is always true).

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2610,25 +2610,33 @@ object Sequence {
override val defaultStep: DefaultStep = new DefaultStep(
(dt.ordering.lteq _).asInstanceOf[LessThanOrEqualFn],
CalendarIntervalType,
new CalendarInterval(0, MICROS_PER_DAY))
new CalendarInterval(0, 1, 0))

private val backedSequenceImpl = new IntegralSequenceImpl[T](dt)
private val microsPerMonth = 28 * CalendarInterval.MICROS_PER_DAY
private val microsPerDay = 24 * CalendarInterval.MICROS_PER_HOUR
// We choose a minimum days(28) in one month to calculate the `intervalStepInMicros`
// in order to make sure the estimated array length is long enough
private val microsPerMonth = 28 * microsPerDay
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to the PR but it is interesting why 28 days per months here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code will use microsPerMonth to estimate the array length first. use the minimum days in one month can make sure the array long enough. (smaller days means smaller steps in micro and means longer array length)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related to this PR, but would be great if we can add a comment to explain it in the code.


override def eval(input1: Any, input2: Any, input3: Any): Array[T] = {
val start = input1.asInstanceOf[T]
val stop = input2.asInstanceOf[T]
val step = input3.asInstanceOf[CalendarInterval]
val stepMonths = step.months
val stepDays = step.days
val stepMicros = step.microseconds

if (stepMonths == 0) {
backedSequenceImpl.eval(start, stop, fromLong(stepMicros / scale))
if (stepMonths == 0 && stepMicros == 0 && scale == MICROS_PER_DAY) {
backedSequenceImpl.eval(start, stop, fromLong(stepDays))

} else if (stepMonths == 0 && stepDays == 0 && scale == 1) {
backedSequenceImpl.eval(start, stop, fromLong(stepMicros))

} else {
// To estimate the resulted array length we need to make assumptions
// about a month length in microseconds
val intervalStepInMicros = stepMicros + stepMonths * microsPerMonth
// about a month length in days and a day length in microseconds
val intervalStepInMicros =
stepMicros + stepMonths * microsPerMonth + stepDays * microsPerDay
val startMicros: Long = num.toLong(start) * scale
val stopMicros: Long = num.toLong(stop) * scale
val maxEstimatedArrayLength =
Expand All @@ -2643,7 +2651,8 @@ object Sequence {
while (t < exclusiveItem ^ stepSign < 0) {
arr(i) = fromLong(t / scale)
i += 1
t = timestampAddInterval(startMicros, i * stepMonths, i * stepMicros, zoneId)
t = timestampAddInterval(
startMicros, i * stepMonths, i * stepDays, i * stepMicros, zoneId)
}

// truncate array to the correct length
Expand All @@ -2659,6 +2668,7 @@ object Sequence {
arr: String,
elemType: String): String = {
val stepMonths = ctx.freshName("stepMonths")
val stepDays = ctx.freshName("stepDays")
val stepMicros = ctx.freshName("stepMicros")
val stepScaled = ctx.freshName("stepScaled")
val intervalInMicros = ctx.freshName("intervalInMicros")
Expand All @@ -2673,18 +2683,21 @@ object Sequence {

val sequenceLengthCode =
s"""
|final long $intervalInMicros = $stepMicros + $stepMonths * ${microsPerMonth}L;
|final long $intervalInMicros =
| $stepMicros + $stepMonths * ${microsPerMonth}L + $stepDays * ${microsPerDay}L;
|${genSequenceLengthCode(ctx, startMicros, stopMicros, intervalInMicros, arrLength)}
""".stripMargin

s"""
|final int $stepMonths = $step.months;
|final int $stepDays = $step.days;
|final long $stepMicros = $step.microseconds;
|
|if ($stepMonths == 0) {
| final $elemType $stepScaled = ($elemType) ($stepMicros / ${scale}L);
| ${backedSequenceImpl.genCode(ctx, start, stop, stepScaled, arr, elemType)};
|if ($stepMonths == 0 && $stepMicros == 0 && ${scale}L == ${MICROS_PER_DAY}L) {
| ${backedSequenceImpl.genCode(ctx, start, stop, stepDays, arr, elemType)};
|
|} else if ($stepMonths == 0 && $stepDays == 0 && ${scale}L == 1) {
| ${backedSequenceImpl.genCode(ctx, start, stop, stepMicros, arr, elemType)};
|} else {
| final long $startMicros = $start * ${scale}L;
| final long $stopMicros = $stop * ${scale}L;
Expand All @@ -2702,7 +2715,7 @@ object Sequence {
| $arr[$i] = ($elemType) ($t / ${scale}L);
| $i += 1;
| $t = org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampAddInterval(
| $startMicros, $i * $stepMonths, $i * $stepMicros, $zid);
| $startMicros, $i * $stepMonths, $i * $stepDays, $i * $stepMicros, $zid);
| }
|
| if ($arr.length > $i) {
Expand Down
Loading