From 44d66d6b531c91448d474fc9aac17076147d8825 Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Fri, 11 Oct 2019 14:09:38 +0800 Subject: [PATCH 01/16] modify class CalendarInterval and CalendarIntervalSuite --- .../spark/unsafe/types/CalendarInterval.java | 73 ++++++---- .../unsafe/types/CalendarIntervalSuite.java | 135 ++++++++++-------- 2 files changed, 121 insertions(+), 87 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index 28fb64f7cd0e0..1bcdfb2c7293d 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -111,14 +111,13 @@ public static CalendarInterval fromCaseInsensitiveString(String s) { } long months = toLong(m.group(1)) * 12 + toLong(m.group(2)); - long microseconds = toLong(m.group(3)) * MICROS_PER_WEEK; - microseconds += toLong(m.group(4)) * MICROS_PER_DAY; - microseconds += toLong(m.group(5)) * MICROS_PER_HOUR; + long days = toLong(m.group(3)) * 7 + toLong(m.group(4)); + long microseconds = toLong(m.group(5)) * MICROS_PER_HOUR; microseconds += toLong(m.group(6)) * MICROS_PER_MINUTE; microseconds += toLong(m.group(7)) * MICROS_PER_SECOND; microseconds += toLong(m.group(8)) * MICROS_PER_MILLI; microseconds += toLong(m.group(9)); - return new CalendarInterval((int) months, microseconds); + return new CalendarInterval((int) months, (int) days, microseconds); } public static long toLongWithRange(String fieldName, @@ -154,7 +153,7 @@ public static CalendarInterval fromYearMonthString(String s) throws IllegalArgum int sign = m.group(1) != null && m.group(1).equals("-") ? -1 : 1; int years = (int) toLongWithRange("year", m.group(2), 0, Integer.MAX_VALUE); int months = (int) toLongWithRange("month", m.group(3), 0, 11); - result = new CalendarInterval(sign * (years * 12 + months), 0); + result = new CalendarInterval(sign * (years * 12 + months), 0, 0); } catch (Exception e) { throw new IllegalArgumentException( "Error parsing interval year-month string: " + e.getMessage(), e); @@ -195,7 +194,7 @@ public static CalendarInterval fromDayTimeString(String s, String from, String t } else { try { int sign = m.group(1) != null && m.group(1).equals("-") ? -1 : 1; - long days = m.group(2) == null ? 0 : toLongWithRange("day", m.group(3), + int days = m.group(2) == null ? 
0 : (int) toLongWithRange("day", m.group(3), 0, Integer.MAX_VALUE); long hours = 0; long minutes; @@ -231,8 +230,8 @@ public static CalendarInterval fromDayTimeString(String s, String from, String t throw new IllegalArgumentException( String.format("Cannot support (interval '%s' %s to %s) expression", s, from, to)); } - result = new CalendarInterval(0, sign * ( - days * MICROS_PER_DAY + hours * MICROS_PER_HOUR + minutes * MICROS_PER_MINUTE + + result = new CalendarInterval(0, sign * days, sign * ( + hours * MICROS_PER_HOUR + minutes * MICROS_PER_MINUTE + seconds * MICROS_PER_SECOND + nanos / 1000L)); } catch (Exception e) { throw new IllegalArgumentException( @@ -260,46 +259,46 @@ public static CalendarInterval fromSingleUnitString(String unit, String s) case "year": int year = (int) toLongWithRange("year", m.group(1), Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12); - result = new CalendarInterval(year * 12, 0L); + result = new CalendarInterval(year * 12, 0, 0L); break; case "month": int month = (int) toLongWithRange("month", m.group(1), Integer.MIN_VALUE, Integer.MAX_VALUE); - result = new CalendarInterval(month, 0L); + result = new CalendarInterval(month, 0, 0L); break; case "week": - long week = toLongWithRange("week", m.group(1), - Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK); - result = new CalendarInterval(0, week * MICROS_PER_WEEK); + int week = (int) toLongWithRange("week", m.group(1), + Integer.MIN_VALUE / 7, Integer.MAX_VALUE / 7); + result = new CalendarInterval(0, week * 7, 0L); break; case "day": - long day = toLongWithRange("day", m.group(1), - Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY); - result = new CalendarInterval(0, day * MICROS_PER_DAY); + int day = (int) toLongWithRange("day", m.group(1), + Integer.MIN_VALUE, Integer.MAX_VALUE); + result = new CalendarInterval(0, day, 0L); break; case "hour": long hour = toLongWithRange("hour", m.group(1), Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR); - result = new CalendarInterval(0, hour * MICROS_PER_HOUR); + result = new CalendarInterval(0, 0, hour * MICROS_PER_HOUR); break; case "minute": long minute = toLongWithRange("minute", m.group(1), Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE); - result = new CalendarInterval(0, minute * MICROS_PER_MINUTE); + result = new CalendarInterval(0, 0, minute * MICROS_PER_MINUTE); break; case "second": { long micros = parseSecondNano(m.group(1)); - result = new CalendarInterval(0, micros); + result = new CalendarInterval(0, 0, micros); break; } case "millisecond": long millisecond = toLongWithRange("millisecond", m.group(1), Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI); - result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI); + result = new CalendarInterval(0, 0, millisecond * MICROS_PER_MILLI); break; case "microsecond": { long micros = Long.parseLong(m.group(1)); - result = new CalendarInterval(0, micros); + result = new CalendarInterval(0, 0, micros); break; } } @@ -332,31 +331,42 @@ public static long parseSecondNano(String secondNano) throws IllegalArgumentExce } public final int months; + public final int days; public final long microseconds; public long milliseconds() { return this.microseconds / MICROS_PER_MILLI; } + // TODO: Keep this temporarily to pass compile public CalendarInterval(int months, long microseconds) { this.months = months; + this.days = 0; + this.microseconds = microseconds; + } + + public CalendarInterval(int months, int days, long 
microseconds) { + this.months = months; + this.days = days; this.microseconds = microseconds; } public CalendarInterval add(CalendarInterval that) { int months = this.months + that.months; + int days = this.days + that.days; long microseconds = this.microseconds + that.microseconds; - return new CalendarInterval(months, microseconds); + return new CalendarInterval(months, days, microseconds); } public CalendarInterval subtract(CalendarInterval that) { int months = this.months - that.months; + int days = this.days - that.days; long microseconds = this.microseconds - that.microseconds; - return new CalendarInterval(months, microseconds); + return new CalendarInterval(months, days, microseconds); } public CalendarInterval negate() { - return new CalendarInterval(-this.months, -this.microseconds); + return new CalendarInterval(-this.months, -this.days, -this.microseconds); } @Override @@ -365,12 +375,12 @@ public boolean equals(Object other) { if (other == null || !(other instanceof CalendarInterval)) return false; CalendarInterval o = (CalendarInterval) other; - return this.months == o.months && this.microseconds == o.microseconds; + return this.months == o.months && this.days == o.days && this.microseconds == o.microseconds; } @Override public int hashCode() { - return 31 * months + (int) microseconds; + return 31 * (31 * months + days) + (int) microseconds; } @Override @@ -382,12 +392,13 @@ public String toString() { appendUnit(sb, months % 12, "month"); } + if (days != 0) { + appendUnit(sb, days / 7, "week"); + appendUnit(sb, days % 7, "day"); + } + if (microseconds != 0) { long rest = microseconds; - appendUnit(sb, rest / MICROS_PER_WEEK, "week"); - rest %= MICROS_PER_WEEK; - appendUnit(sb, rest / MICROS_PER_DAY, "day"); - rest %= MICROS_PER_DAY; appendUnit(sb, rest / MICROS_PER_HOUR, "hour"); rest %= MICROS_PER_HOUR; appendUnit(sb, rest / MICROS_PER_MINUTE, "minute"); @@ -397,7 +408,7 @@ public String toString() { appendUnit(sb, rest / MICROS_PER_MILLI, "millisecond"); rest %= MICROS_PER_MILLI; appendUnit(sb, rest, "microsecond"); - } else if (months == 0) { + } else if (months == 0 && days == 0) { sb.append(" 0 microseconds"); } diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java index 587071332ce47..c1f10fed7ad8e 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java @@ -28,53 +28,66 @@ public class CalendarIntervalSuite { @Test public void equalsTest() { - CalendarInterval i1 = new CalendarInterval(3, 123); - CalendarInterval i2 = new CalendarInterval(3, 321); - CalendarInterval i3 = new CalendarInterval(1, 123); - CalendarInterval i4 = new CalendarInterval(3, 123); + CalendarInterval i1 = new CalendarInterval(3, 2, 123); + CalendarInterval i2 = new CalendarInterval(3, 2,321); + CalendarInterval i3 = new CalendarInterval(3, 4,123); + CalendarInterval i4 = new CalendarInterval(1, 2, 123); + CalendarInterval i5 = new CalendarInterval(1, 4, 321); + CalendarInterval i6 = new CalendarInterval(3, 2, 123); assertNotSame(i1, i2); assertNotSame(i1, i3); + assertNotSame(i1, i4); assertNotSame(i2, i3); - assertEquals(i1, i4); + assertNotSame(i2, i4); + assertNotSame(i3, i4); + assertNotSame(i1, i5); + assertEquals(i1, i6); } @Test public void toStringTest() { CalendarInterval i; - i = new CalendarInterval(0, 0); + i = new 
CalendarInterval(0, 0, 0); assertEquals("interval 0 microseconds", i.toString()); - i = new CalendarInterval(34, 0); + i = new CalendarInterval(34, 0, 0); assertEquals("interval 2 years 10 months", i.toString()); - i = new CalendarInterval(-34, 0); + i = new CalendarInterval(-34, 0, 0); assertEquals("interval -2 years -10 months", i.toString()); - i = new CalendarInterval(0, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123); - assertEquals("interval 3 weeks 13 hours 123 microseconds", i.toString()); + i = new CalendarInterval(0, 31, 0); + assertEquals("interval 4 weeks 3 days", i.toString()); - i = new CalendarInterval(0, -3 * MICROS_PER_WEEK - 13 * MICROS_PER_HOUR - 123); - assertEquals("interval -3 weeks -13 hours -123 microseconds", i.toString()); + i = new CalendarInterval(0, -31, 0); + assertEquals("interval -4 weeks -3 days", i.toString()); - i = new CalendarInterval(34, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123); - assertEquals("interval 2 years 10 months 3 weeks 13 hours 123 microseconds", i.toString()); + i = new CalendarInterval(0, 0, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123); + assertEquals("interval 3 hours 13 minutes 123 microseconds", i.toString()); + + i = new CalendarInterval(0, 0, -3 * MICROS_PER_HOUR - 13 * MICROS_PER_MINUTE - 123); + assertEquals("interval -3 hours -13 minutes -123 microseconds", i.toString()); + + i = new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123); + assertEquals("interval 2 years 10 months 4 weeks 3 days 3 hours 13 minutes 123 microseconds", + i.toString()); } @Test public void fromStringTest() { - testSingleUnit("year", 3, 36, 0); - testSingleUnit("month", 3, 3, 0); - testSingleUnit("week", 3, 0, 3 * MICROS_PER_WEEK); - testSingleUnit("day", 3, 0, 3 * MICROS_PER_DAY); - testSingleUnit("hour", 3, 0, 3 * MICROS_PER_HOUR); - testSingleUnit("minute", 3, 0, 3 * MICROS_PER_MINUTE); - testSingleUnit("second", 3, 0, 3 * MICROS_PER_SECOND); - testSingleUnit("millisecond", 3, 0, 3 * MICROS_PER_MILLI); - testSingleUnit("microsecond", 3, 0, 3); - - CalendarInterval result = new CalendarInterval(-5 * 12 + 23, 0); + testSingleUnit("year", 3, 36, 0, 0); + testSingleUnit("month", 3, 3, 0, 0); + testSingleUnit("week", 3, 0, 21, 0); + testSingleUnit("day", 3, 0, 3, 0); + testSingleUnit("hour", 3, 0, 0, 3 * MICROS_PER_HOUR); + testSingleUnit("minute", 3, 0, 0, 3 * MICROS_PER_MINUTE); + testSingleUnit("second", 3, 0, 0, 3 * MICROS_PER_SECOND); + testSingleUnit("millisecond", 3, 0, 0, 3 * MICROS_PER_MILLI); + testSingleUnit("microsecond", 3, 0, 0, 3); + + CalendarInterval result = new CalendarInterval(-5 * 12 + 23, 0, 0); Arrays.asList( "interval -5 years 23 month", " -5 years 23 month", @@ -99,7 +112,8 @@ public void fromStringTest() { @Test public void fromCaseInsensitiveStringTest() { for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) { - assertEquals(fromCaseInsensitiveString(input), new CalendarInterval(0, 5L * 60 * 1_000_000)); + assertEquals(fromCaseInsensitiveString(input), + new CalendarInterval(0, 0, 5L * 60 * 1_000_000)); } for (String input : new String[]{null, "", " "}) { @@ -134,11 +148,11 @@ public void fromYearMonthStringTest() { CalendarInterval i; input = "99-10"; - i = new CalendarInterval(99 * 12 + 10, 0L); + i = new CalendarInterval(99 * 12 + 10, 0, 0L); assertEquals(fromYearMonthString(input), i); input = "-8-10"; - i = new CalendarInterval(-8 * 12 - 10, 0L); + i = new CalendarInterval(-8 * 12 - 10, 0, 0L); assertEquals(fromYearMonthString(input), i); try { @@ -156,17 +170,16 @@ 
public void fromDayTimeStringTest() { CalendarInterval i; input = "5 12:40:30.999999999"; - i = new CalendarInterval(0, 5 * MICROS_PER_DAY + 12 * MICROS_PER_HOUR + + i = new CalendarInterval(0, 5, 12 * MICROS_PER_HOUR + 40 * MICROS_PER_MINUTE + 30 * MICROS_PER_SECOND + 999999L); assertEquals(fromDayTimeString(input), i); input = "10 0:12:0.888"; - i = new CalendarInterval(0, 10 * MICROS_PER_DAY + 12 * MICROS_PER_MINUTE + - 888 * MICROS_PER_MILLI); + i = new CalendarInterval(0, 10, 12 * MICROS_PER_MINUTE + 888 * MICROS_PER_MILLI); assertEquals(fromDayTimeString(input), i); input = "-3 0:0:0"; - i = new CalendarInterval(0, -3 * MICROS_PER_DAY); + i = new CalendarInterval(0, -3, 0L); assertEquals(fromDayTimeString(input), i); try { @@ -200,11 +213,11 @@ public void fromSingleUnitStringTest() { CalendarInterval i; input = "12"; - i = new CalendarInterval(12 * 12, 0L); + i = new CalendarInterval(12 * 12, 0, 0L); assertEquals(fromSingleUnitString("year", input), i); input = "100"; - i = new CalendarInterval(0, 100 * MICROS_PER_DAY); + i = new CalendarInterval(0, 100, 0L); assertEquals(fromSingleUnitString("day", input), i); input = "1999.38888"; @@ -230,40 +243,41 @@ public void fromSingleUnitStringTest() { @Test public void addTest() { - String input = "interval 3 month 1 hour"; - String input2 = "interval 2 month 100 hour"; + String input = "interval 3 month 1 day 1 hour"; + String input2 = "interval 2 month 4 day 100 hour"; CalendarInterval interval = fromString(input); CalendarInterval interval2 = fromString(input2); - assertEquals(interval.add(interval2), new CalendarInterval(5, 101 * MICROS_PER_HOUR)); + assertEquals(interval.add(interval2), new CalendarInterval(5, 5, 101 * MICROS_PER_HOUR)); - input = "interval -10 month -81 hour"; - input2 = "interval 75 month 200 hour"; + input = "interval -10 month -30 day -81 hour"; + input2 = "interval 75 month 150 day 200 hour"; interval = fromString(input); interval2 = fromString(input2); - assertEquals(interval.add(interval2), new CalendarInterval(65, 119 * MICROS_PER_HOUR)); + assertEquals(interval.add(interval2), new CalendarInterval(65, 120, 119 * MICROS_PER_HOUR)); } @Test public void subtractTest() { - String input = "interval 3 month 1 hour"; - String input2 = "interval 2 month 100 hour"; + String input = "interval 3 month 1 day 1 hour"; + String input2 = "interval 2 month 4 day 100 hour"; CalendarInterval interval = fromString(input); CalendarInterval interval2 = fromString(input2); - assertEquals(interval.subtract(interval2), new CalendarInterval(1, -99 * MICROS_PER_HOUR)); + assertEquals(interval.subtract(interval2), new CalendarInterval(1, -3, -99 * MICROS_PER_HOUR)); - input = "interval -10 month -81 hour"; - input2 = "interval 75 month 200 hour"; + input = "interval -10 month -30 day -81 hour"; + input2 = "interval 75 month 150 day 200 hour"; interval = fromString(input); interval2 = fromString(input2); - assertEquals(interval.subtract(interval2), new CalendarInterval(-85, -281 * MICROS_PER_HOUR)); + assertEquals(interval.subtract(interval2), + new CalendarInterval(-85, -180, -281 * MICROS_PER_HOUR)); } private static void testSingleUnit(String unit, int number, int months, long microseconds) { @@ -276,22 +290,31 @@ private static void testSingleUnit(String unit, int number, int months, long mic }); } + private static void testSingleUnit( + String unit, int number, int months, int days, long microseconds) { + String input1 = "interval " + number + " " + unit; + String input2 = "interval " + number + " " + unit + "s"; + CalendarInterval 
result = new CalendarInterval(months, days, microseconds); + assertEquals(fromString(input1), result); + assertEquals(fromString(input2), result); + } + @Test public void fromStringCaseSensitivityTest() { - testSingleUnit("YEAR", 3, 36, 0); - testSingleUnit("Month", 3, 3, 0); - testSingleUnit("Week", 3, 0, 3 * MICROS_PER_WEEK); - testSingleUnit("DAY", 3, 0, 3 * MICROS_PER_DAY); - testSingleUnit("HouR", 3, 0, 3 * MICROS_PER_HOUR); - testSingleUnit("MiNuTe", 3, 0, 3 * MICROS_PER_MINUTE); - testSingleUnit("Second", 3, 0, 3 * MICROS_PER_SECOND); - testSingleUnit("MilliSecond", 3, 0, 3 * MICROS_PER_MILLI); - testSingleUnit("MicroSecond", 3, 0, 3); + testSingleUnit("YEAR", 3, 36, 0, 0); + testSingleUnit("Month", 3, 3, 0, 0); + testSingleUnit("Week", 3, 0, 21, 0); + testSingleUnit("DAY", 3, 0, 3, 0); + testSingleUnit("HouR", 3, 0, 0, 3 * MICROS_PER_HOUR); + testSingleUnit("MiNuTe", 3, 0, 0, 3 * MICROS_PER_MINUTE); + testSingleUnit("Second", 3, 0, 0, 3 * MICROS_PER_SECOND); + testSingleUnit("MilliSecond", 3, 0, 0, 3 * MICROS_PER_MILLI); + testSingleUnit("MicroSecond", 3, 0, 0, 3); String input; input = "INTERVAL -5 YEARS 23 MONTHS"; - CalendarInterval result = new CalendarInterval(-5 * 12 + 23, 0); + CalendarInterval result = new CalendarInterval(-5 * 12 + 23, 0, 0); assertEquals(fromString(input), result); assertNull(fromString("INTERVAL")); From 4979e1e373c105f698482d6ddd601a867ec8c199 Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Sat, 12 Oct 2019 15:03:34 +0800 Subject: [PATCH 02/16] modify DateTimeUtils, DateTimeUtilsSuite, TemporalSequenceImpl --- .../expressions/collectionOperations.scala | 25 +++++++++++------ .../sql/catalyst/util/DateTimeUtils.scala | 18 ++++++++++++ .../catalyst/util/DateTimeUtilsSuite.scala | 28 ++++++++++++++++++- 3 files changed, 61 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 57b692ac16068..f095ad33e63e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -2610,25 +2610,28 @@ object Sequence { override val defaultStep: DefaultStep = new DefaultStep( (dt.ordering.lteq _).asInstanceOf[LessThanOrEqualFn], CalendarIntervalType, - new CalendarInterval(0, MICROS_PER_DAY)) + new CalendarInterval(0, 1, 0)) private val backedSequenceImpl = new IntegralSequenceImpl[T](dt) - private val microsPerMonth = 28 * CalendarInterval.MICROS_PER_DAY + private val microsPerDay = 24 * CalendarInterval.MICROS_PER_HOUR + private val microsPerMonth = 28 * microsPerDay override def eval(input1: Any, input2: Any, input3: Any): Array[T] = { val start = input1.asInstanceOf[T] val stop = input2.asInstanceOf[T] val step = input3.asInstanceOf[CalendarInterval] val stepMonths = step.months + val stepDays = step.days val stepMicros = step.microseconds - if (stepMonths == 0) { + if (stepMonths == 0 && stepDays == 0) { backedSequenceImpl.eval(start, stop, fromLong(stepMicros / scale)) } else { // To estimate the resulted array length we need to make assumptions - // about a month length in microseconds - val intervalStepInMicros = stepMicros + stepMonths * microsPerMonth + // about a month length in days and a day length in microseconds + val intervalStepInMicros = + stepMicros + stepMonths * microsPerMonth + stepDays * microsPerDay val 
startMicros: Long = num.toLong(start) * scale val stopMicros: Long = num.toLong(stop) * scale val maxEstimatedArrayLength = @@ -2643,7 +2646,8 @@ object Sequence { while (t < exclusiveItem ^ stepSign < 0) { arr(i) = fromLong(t / scale) i += 1 - t = timestampAddInterval(startMicros, i * stepMonths, i * stepMicros, zoneId) + t = timestampAddInterval( + startMicros, i * stepMonths, i * stepDays, i * stepMicros, zoneId) } // truncate array to the correct length @@ -2659,6 +2663,7 @@ object Sequence { arr: String, elemType: String): String = { val stepMonths = ctx.freshName("stepMonths") + val stepDays = ctx.freshName("stepDays") val stepMicros = ctx.freshName("stepMicros") val stepScaled = ctx.freshName("stepScaled") val intervalInMicros = ctx.freshName("intervalInMicros") @@ -2673,15 +2678,17 @@ object Sequence { val sequenceLengthCode = s""" - |final long $intervalInMicros = $stepMicros + $stepMonths * ${microsPerMonth}L; + |final long $intervalInMicros = + | $stepMicros + $stepMonths * ${microsPerMonth}L + $stepDays * ${microsPerDay}L; |${genSequenceLengthCode(ctx, startMicros, stopMicros, intervalInMicros, arrLength)} """.stripMargin s""" |final int $stepMonths = $step.months; + |final int $stepDays = $step.days; |final long $stepMicros = $step.microseconds; | - |if ($stepMonths == 0) { + |if ($stepMonths == 0 && $stepDays == 0) { | final $elemType $stepScaled = ($elemType) ($stepMicros / ${scale}L); | ${backedSequenceImpl.genCode(ctx, start, stop, stepScaled, arr, elemType)}; | @@ -2702,7 +2709,7 @@ object Sequence { | $arr[$i] = ($elemType) ($t / ${scale}L); | $i += 1; | $t = org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampAddInterval( - | $startMicros, $i * $stepMonths, $i * $stepMicros, $zid); + | $startMicros, $i * $stepMonths, $i * $stepDays, $i * $stepMicros, $zid); | } | | if ($arr.length > $i) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 34e8012106bbe..3e29ab2757d51 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -584,6 +584,24 @@ object DateTimeUtils { instantToMicros(resultTimestamp.toInstant) } + /** + * Add timestamp and full interval. + * Returns a timestamp value, expressed in microseconds since 1.1.1970 00:00:00. + */ + def timestampAddInterval( + start: SQLTimestamp, + months: Int, + days: Int, + microseconds: Long, + zoneId: ZoneId): SQLTimestamp = { + val resultTimestamp = microsToInstant(start) + .atZone(zoneId) + .plusMonths(months) + .plusDays(days) + .plus(microseconds, ChronoUnit.MICROS) + instantToMicros(resultTimestamp.toInstant) + } + /** * Returns number of months between time1 and time2. time1 and time2 are expressed in * microseconds since 1.1.1970. If time1 is later than time2, the result is positive. 
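The hunk above introduces the day-aware overload timestampAddInterval(start, months, days, microseconds, zoneId). Below is a minimal, self-contained sketch of its semantics using only java.time; the helper name addInterval and the sample timestamp are illustrative assumptions, not part of this patch. It shows why adding "1 day" differs from adding 24 hours of microseconds across a DST transition, which is exactly what the new "timestamp add days" test later in this series exercises.

    // Sketch (assumption: mirrors the patched DateTimeUtils logic, without Spark classes).
    import java.time.{Instant, LocalDateTime, ZoneId}
    import java.time.temporal.ChronoUnit

    def addInterval(startMicros: Long, months: Int, days: Int, micros: Long, zone: ZoneId): Long = {
      val result = Instant.EPOCH.plus(startMicros, ChronoUnit.MICROS)
        .atZone(zone)
        .plusMonths(months)   // calendar-aware month arithmetic
        .plusDays(days)       // keeps the local time of day across DST changes
        .plus(micros, ChronoUnit.MICROS)
      ChronoUnit.MICROS.between(Instant.EPOCH, result.toInstant)
    }

    val pst = ZoneId.of("America/Los_Angeles")
    // 2019-03-09 12:00 local time, the day before the US spring-forward transition.
    val start = ChronoUnit.MICROS.between(
      Instant.EPOCH, LocalDateTime.of(2019, 3, 9, 12, 0).atZone(pst).toInstant)

    // "+ 1 day" lands on 2019-03-10 12:00 local time (only 23 real hours elapse),
    // while "+ 24 hours" of plain microseconds lands on 13:00 local time.
    val plusOneDay = addInterval(start, 0, 1, 0L, pst)
    val plus24h    = addInterval(start, 0, 0, 24L * 60 * 60 * 1000 * 1000, pst)

In this sketch plusOneDay equals start plus only 23 hours of real time, matching the assertions in the DateTimeUtilsSuite "timestamp add days" test added by this patch series.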
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index a9e3c9006a334..d106b95eeb2ae 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.Matchers import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} class DateTimeUtilsSuite extends SparkFunSuite with Matchers { @@ -382,6 +382,32 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers { assert(timestampAddInterval(ts3, 36, 123000, TimeZoneGMT.toZoneId) === ts5) } + test("timestamp add days") { + // 2019-3-9 is the end of Pacific Standard Time + val ts1 = date(2019, 3, 9, 12, 0, 0, 123000, TimeZonePST) + // 2019-3-10 is the start of Pacific Daylight Time + val ts2 = date(2019, 3, 10, 12, 0, 0, 123000, TimeZonePST) + val ts3 = date(2019, 5, 9, 12, 0, 0, 123000, TimeZonePST) + val ts4 = date(2019, 5, 10, 12, 0, 0, 123000, TimeZonePST) + // 2019-11-2 is the end of Pacific Daylight Time + val ts5 = date(2019, 11, 2, 12, 0, 0, 123000, TimeZonePST) + // 2019-11-3 is the start of Pacific Standard Time + val ts6 = date(2019, 11, 3, 12, 0, 0, 123000, TimeZonePST) + + // transit from Pacific Standard Time to Pacific Daylight Time + assert(timestampAddInterval( + ts1, 0, 0, 23 * CalendarInterval.MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts2) + assert(timestampAddInterval(ts1, 0, 1, 0, TimeZonePST.toZoneId) === ts2) + // just a normal day + assert(timestampAddInterval( + ts3, 0, 0, 24 * CalendarInterval.MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts4) + assert(timestampAddInterval(ts3, 0, 1, 0, TimeZonePST.toZoneId) === ts4) + // transit from Pacific Daylight Time to Pacific Standard Time + assert(timestampAddInterval( + ts5, 0, 0, 25 * CalendarInterval.MICROS_PER_HOUR, TimeZonePST.toZoneId) === ts6) + assert(timestampAddInterval(ts5, 0, 1, 0, TimeZonePST.toZoneId) === ts6) + } + test("monthsBetween") { val date1 = date(1997, 2, 28, 10, 30, 0) var date2 = date(1996, 10, 30) From f09b5296c1c966dc94cd9cf7aad18cc7f099fe21 Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Sat, 12 Oct 2019 17:45:21 +0800 Subject: [PATCH 03/16] modify CalendarInterval related classes in sql/catalyst --- .../catalyst/expressions/UnsafeArrayData.java | 5 +++-- .../sql/catalyst/expressions/UnsafeRow.java | 5 +++-- .../expressions/codegen/UnsafeWriter.java | 9 ++++---- .../spark/sql/vectorized/ColumnVector.java | 21 +++++++++++-------- .../analysis/StreamingJoinHelper.scala | 6 +++--- .../sql/catalyst/expressions/TimeWindow.scala | 4 ++-- .../expressions/datetimeExpressions.scala | 6 +++--- .../spark/sql/catalyst/expressions/hash.scala | 8 ++++--- .../sql/catalyst/expressions/literals.scala | 2 +- .../plans/logical/EventTimeWatermark.scala | 4 ++-- 10 files changed, 39 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java index db6401b18c0e4..4be665d7722c9 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java +++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java @@ -231,8 +231,9 @@ public CalendarInterval getInterval(int ordinal) { final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); final int months = (int) Platform.getLong(baseObject, baseOffset + offset); - final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8); - return new CalendarInterval(months, microseconds); + final int days = (int) Platform.getLong(baseObject, baseOffset + offset + 8); + final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 16); + return new CalendarInterval(months, days, microseconds); } @Override diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 8fd6029e976ee..812dfa6f6528d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -402,8 +402,9 @@ public CalendarInterval getInterval(int ordinal) { final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); final int months = (int) Platform.getLong(baseObject, baseOffset + offset); - final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8); - return new CalendarInterval(months, microseconds); + final int days = (int) Platform.getLong(baseObject, baseOffset + offset + 8); + final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 16); + return new CalendarInterval(months, days, microseconds); } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java index 95263a0da95a8..7f2d7c07d8f30 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java @@ -132,16 +132,17 @@ private void writeUnalignedBytes( public final void write(int ordinal, CalendarInterval input) { // grow the global buffer before writing data. - grow(16); + grow(24); // Write the months and microseconds fields of Interval to the variable length portion. Platform.putLong(getBuffer(), cursor(), input.months); - Platform.putLong(getBuffer(), cursor() + 8, input.microseconds); + Platform.putLong(getBuffer(), cursor() + 8, input.days); + Platform.putLong(getBuffer(), cursor() + 16, input.microseconds); - setOffsetAndSize(ordinal, 16); + setOffsetAndSize(ordinal, 24); // move the cursor forward. - increaseCursor(16); + increaseCursor(24); } public final void write(int ordinal, UnsafeRow row) { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java index f18d00359c90c..2158ef8324845 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java @@ -267,21 +267,24 @@ public final ColumnarRow getStruct(int rowId) { * Returns the calendar interval type value for rowId. If the slot for rowId is null, it should * return null. 
* - * In Spark, calendar interval type value is basically an integer value representing the number of - * months in this interval, and a long value representing the number of microseconds in this - * interval. An interval type vector is the same as a struct type vector with 2 fields: `months` - * and `microseconds`. + * In Spark, calendar interval type value is basically two integer values representing the number + * of months and days in this interval, and a long value representing the number of microseconds + * in this interval. An interval type vector is the same as a struct type vector with 3 fields: + * `months`, `days` and `microseconds`. * - * To support interval type, implementations must implement {@link #getChild(int)} and define 2 + * To support interval type, implementations must implement {@link #getChild(int)} and define 3 * child vectors: the first child vector is an int type vector, containing all the month values of - * all the interval values in this vector. The second child vector is a long type vector, - * containing all the microsecond values of all the interval values in this vector. + * all the interval values in this vector. The second child vector is an int type vector, + * containing all the day values of all the interval values in this vector. The third child vector + * is a long type vector, containing all the microsecond values of all the interval values in this + * vector. */ public final CalendarInterval getInterval(int rowId) { if (isNullAt(rowId)) return null; final int months = getChild(0).getInt(rowId); - final long microseconds = getChild(1).getLong(rowId); - return new CalendarInterval(months, microseconds); + final int days = getChild(1).getInt(rowId); + final long microseconds = getChild(2).getLong(rowId); + return new CalendarInterval(months, days, microseconds); } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala index c1d72f9b58a4b..38075786be642 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala @@ -256,12 +256,12 @@ object StreamingJoinHelper extends PredicateHelper with Logging { val castedLit = lit.dataType match { case CalendarIntervalType => val calendarInterval = lit.value.asInstanceOf[CalendarInterval] - if (calendarInterval.months > 0) { + if (calendarInterval.months > 0 || calendarInterval.days > 0) { invalid = true logWarning( s"Failed to extract state value watermark from condition $exprToCollectFrom " + - s"as imprecise intervals like months and years cannot be used for" + - s"watermark calculation. Use interval in terms of day instead.") + s"as imprecise intervals like days, weeks, months and years cannot be used" + + s"for watermark calculation. 
Use interval in terms of hour instead.") Literal(0.0) } else { Literal(calendarInterval.microseconds.toDouble) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index 9aae678deb4bc..610ed15a28e46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -103,9 +103,9 @@ object TimeWindow { */ private def getIntervalInMicroSeconds(interval: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(interval) - if (cal.months > 0) { + if (cal.months > 0 || cal.days > 0) { throw new IllegalArgumentException( - s"Intervals greater than a month is not supported ($interval).") + s"Intervals greater than a day is not supported ($interval).") } cal.microseconds } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 5aea884ad5003..560afd3fef533 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -2095,7 +2095,7 @@ case class DatePart(field: Expression, source: Expression, child: Expression) } /** - * Returns the interval from startTimestamp to endTimestamp in which the `months` field + * Returns the interval from startTimestamp to endTimestamp in which the `months` and `day` field * is set to 0 and the `microseconds` field is initialized to the microsecond difference * between the given timestamps. */ @@ -2108,11 +2108,11 @@ case class TimestampDiff(endTimestamp: Expression, startTimestamp: Expression) override def dataType: DataType = CalendarIntervalType override def nullSafeEval(end: Any, start: Any): Any = { - new CalendarInterval(0, end.asInstanceOf[Long] - start.asInstanceOf[Long]) + new CalendarInterval(0, 0, end.asInstanceOf[Long] - start.asInstanceOf[Long]) } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { defineCodeGen(ctx, ev, (end, start) => - s"new org.apache.spark.unsafe.types.CalendarInterval(0, $end - $start)") + s"new org.apache.spark.unsafe.types.CalendarInterval(0, 0, $end - $start)") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index 2aa1e6cc518cb..84beb19782692 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -495,7 +495,7 @@ abstract class InterpretedHashFunction { val bytes = d.toJavaBigDecimal.unscaledValue().toByteArray hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, seed) } - case c: CalendarInterval => hashInt(c.months, hashLong(c.microseconds, seed)) + case c: CalendarInterval => hashInt(c.months, hashInt(c.days, hashLong(c.microseconds, seed))) case a: Array[Byte] => hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed) case s: UTF8String => @@ -902,11 +902,13 @@ object HiveHashFunction extends InterpretedHashFunction { * with nanosecond values will lead to wrong output hashes (ie. 
non adherent with Hive output) */ def hashCalendarInterval(calendarInterval: CalendarInterval): Long = { - val totalSeconds = calendarInterval.microseconds / CalendarInterval.MICROS_PER_SECOND.toInt + val totalMicroSeconds = + calendarInterval.days * CalendarInterval.MICROS_PER_DAY + calendarInterval.microseconds + val totalSeconds = totalMicroSeconds / CalendarInterval.MICROS_PER_SECOND.toInt val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt val nanoSeconds = - (calendarInterval.microseconds - + (totalMicroSeconds - (totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000 (result * 37) + nanoSeconds } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index f03174babcd9d..02fcca1635fc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -162,7 +162,7 @@ object Literal { case TimestampType => create(0L, TimestampType) case StringType => Literal("") case BinaryType => Literal("".getBytes(StandardCharsets.UTF_8)) - case CalendarIntervalType => Literal(new CalendarInterval(0, 0)) + case CalendarIntervalType => Literal(new CalendarInterval(0, 0, 0)) case arr: ArrayType => create(Array(), arr) case map: MapType => create(Map(), map) case struct: StructType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 8441c2c481ec5..a21476b67b65b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -28,9 +28,9 @@ object EventTimeWatermark { val delayKey = "spark.watermarkDelayMs" def getDelayMs(delay: CalendarInterval): Long = { + val millisPerDay = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) // We define month as `31 days` to simplify calculation. 
- val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 - delay.milliseconds + delay.months * millisPerMonth + delay.milliseconds + delay.days * millisPerDay + delay.months * millisPerDay * 31 } } From 0db5b7a879494af3a7374bc269fed15a0d9096eb Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Sat, 12 Oct 2019 17:45:46 +0800 Subject: [PATCH 04/16] fix test in sql/catalyst --- .../expressions/collectionOperations.scala | 14 ++++--- .../expressions/datetimeExpressions.scala | 8 ++-- .../spark/sql/RandomDataGenerator.scala | 3 +- .../catalyst/analysis/TypeCoercionSuite.scala | 2 +- .../sql/catalyst/expressions/CastSuite.scala | 12 +++--- .../expressions/DateExpressionsSuite.scala | 40 +++++++++++++------ .../expressions/LiteralExpressionSuite.scala | 2 +- .../expressions/LiteralGenerator.scala | 4 +- .../optimizer/FilterPushdownSuite.scala | 8 ++-- .../LeftSemiAntiJoinPushDownSuite.scala | 1 - .../parser/ExpressionParserSuite.scala | 7 ++-- .../sql/catalyst/util/UnsafeArraySuite.scala | 3 +- 12 files changed, 62 insertions(+), 42 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index f095ad33e63e4..f93b9c8071ef4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -2624,8 +2624,11 @@ object Sequence { val stepDays = step.days val stepMicros = step.microseconds - if (stepMonths == 0 && stepDays == 0) { - backedSequenceImpl.eval(start, stop, fromLong(stepMicros / scale)) + if (stepMonths == 0 && stepMicros == 0 && scale == MICROS_PER_DAY) { + backedSequenceImpl.eval(start, stop, fromLong(stepDays)) + + } else if (stepMonths == 0 && stepDays == 0 && scale == 1) { + backedSequenceImpl.eval(start, stop, fromLong(stepMicros)) } else { // To estimate the resulted array length we need to make assumptions @@ -2688,10 +2691,11 @@ object Sequence { |final int $stepDays = $step.days; |final long $stepMicros = $step.microseconds; | - |if ($stepMonths == 0 && $stepDays == 0) { - | final $elemType $stepScaled = ($elemType) ($stepMicros / ${scale}L); - | ${backedSequenceImpl.genCode(ctx, start, stop, stepScaled, arr, elemType)}; + |if ($stepMonths == 0 && $stepMicros == 0 && ${scale}L == ${MICROS_PER_DAY}L) { + | ${backedSequenceImpl.genCode(ctx, start, stop, stepDays, arr, elemType)}; | + |} else if ($stepMonths == 0 && $stepDays == 0 && ${scale}L == 1) { + | ${backedSequenceImpl.genCode(ctx, start, stop, stepMicros, arr, elemType)}; |} else { | final long $startMicros = $start * ${scale}L; | final long $stopMicros = $stop * ${scale}L; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 560afd3fef533..977c244480b86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -1090,14 +1090,14 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S override def nullSafeEval(start: Any, interval: Any): Any = { val itvl = interval.asInstanceOf[CalendarInterval] DateTimeUtils.timestampAddInterval( - 
start.asInstanceOf[Long], itvl.months, itvl.microseconds, zoneId) + start.asInstanceOf[Long], itvl.months, itvl.days, itvl.microseconds, zoneId) } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") defineCodeGen(ctx, ev, (sd, i) => { - s"""$dtu.timestampAddInterval($sd, $i.months, $i.microseconds, $zid)""" + s"""$dtu.timestampAddInterval($sd, $i.months, $i.days, $i.microseconds, $zid)""" }) } } @@ -1205,14 +1205,14 @@ case class TimeSub(start: Expression, interval: Expression, timeZoneId: Option[S override def nullSafeEval(start: Any, interval: Any): Any = { val itvl = interval.asInstanceOf[CalendarInterval] DateTimeUtils.timestampAddInterval( - start.asInstanceOf[Long], 0 - itvl.months, 0 - itvl.microseconds, zoneId) + start.asInstanceOf[Long], 0 - itvl.months, 0 - itvl.days, 0 - itvl.microseconds, zoneId) } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") defineCodeGen(ctx, ev, (sd, i) => { - s"""$dtu.timestampAddInterval($sd, 0 - $i.months, 0 - $i.microseconds, $zid)""" + s"""$dtu.timestampAddInterval($sd, 0 - $i.months, 0 - $i.days, 0 - $i.microseconds, $zid)""" }) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala index d361e6248e2f5..8971e1b501ae6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala @@ -193,8 +193,9 @@ object RandomDataGenerator { Some(generator) case CalendarIntervalType => Some(() => { val months = rand.nextInt(1000) + val days = rand.nextInt(10000) val ns = rand.nextLong() - new CalendarInterval(months, ns) + new CalendarInterval(months, days, ns) }) case DecimalType.Fixed(precision, scale) => Some( () => BigDecimal.apply( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala index f60e0f2bfee6a..9e7261969a7ee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala @@ -1405,7 +1405,7 @@ class TypeCoercionSuite extends AnalysisTest { val dateTimeOperations = TypeCoercion.DateTimeOperations val date = Literal(new java.sql.Date(0L)) val timestamp = Literal(new Timestamp(0L)) - val interval = Literal(new CalendarInterval(0, 0)) + val interval = Literal(new CalendarInterval(0, 0, 0)) val str = Literal("2015-01-01") val intValue = Literal(0, IntegerType) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index fc7a0d3af4e28..abeeab6dc7cf4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -664,16 +664,16 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper { import org.apache.spark.unsafe.types.CalendarInterval checkEvaluation(Cast(Literal(""), 
CalendarIntervalType), null) - checkEvaluation(Cast(Literal("interval -3 month 7 hours"), CalendarIntervalType), - new CalendarInterval(-3, 7 * CalendarInterval.MICROS_PER_HOUR)) + checkEvaluation(Cast(Literal("interval -3 month 1 day 7 hours"), CalendarIntervalType), + new CalendarInterval(-3, 1, 7 * CalendarInterval.MICROS_PER_HOUR)) checkEvaluation(Cast(Literal.create( - new CalendarInterval(15, -3 * CalendarInterval.MICROS_PER_DAY), CalendarIntervalType), + new CalendarInterval(15, 9, -3 * CalendarInterval.MICROS_PER_HOUR), CalendarIntervalType), StringType), - "interval 1 years 3 months -3 days") + "interval 1 years 3 months 1 weeks 2 days -3 hours") checkEvaluation(Cast(Literal("INTERVAL 1 Second 1 microsecond"), CalendarIntervalType), - new CalendarInterval(0, 1000001)) + new CalendarInterval(0, 0, 1000001)) checkEvaluation(Cast(Literal("1 MONTH 1 Microsecond"), CalendarIntervalType), - new CalendarInterval(1, 1)) + new CalendarInterval(1, 0, 1)) } test("cast string to boolean") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index 8680a15ee1cd7..d095f3b5f6075 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -377,15 +377,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation( TimeAdd( Literal(new Timestamp(sdf.parse("2016-01-29 10:00:00.000").getTime)), - Literal(new CalendarInterval(1, 123000L)), + Literal(new CalendarInterval(1, 2, 123000L)), timeZoneId), DateTimeUtils.fromJavaTimestamp( - new Timestamp(sdf.parse("2016-02-29 10:00:00.123").getTime))) + new Timestamp(sdf.parse("2016-03-02 10:00:00.123").getTime))) checkEvaluation( TimeAdd( Literal.create(null, TimestampType), - Literal(new CalendarInterval(1, 123000L)), + Literal(new CalendarInterval(1, 2, 123000L)), timeZoneId), null) checkEvaluation( @@ -415,22 +415,36 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation( TimeSub( Literal(new Timestamp(sdf.parse("2016-03-31 10:00:00.000").getTime)), - Literal(new CalendarInterval(1, 0)), + Literal(new CalendarInterval(1, 0, 0)), timeZoneId), DateTimeUtils.fromJavaTimestamp( new Timestamp(sdf.parse("2016-02-29 10:00:00.000").getTime))) + checkEvaluation( + TimeSub( + Literal(new Timestamp(sdf.parse("2016-03-31 10:00:00.000").getTime)), + Literal(new CalendarInterval(1, 1, 0)), + timeZoneId), + DateTimeUtils.fromJavaTimestamp( + new Timestamp(sdf.parse("2016-02-28 10:00:00.000").getTime))) checkEvaluation( TimeSub( Literal(new Timestamp(sdf.parse("2016-03-30 00:00:01.000").getTime)), - Literal(new CalendarInterval(1, 2000000.toLong)), + Literal(new CalendarInterval(1, 0, 2000000.toLong)), timeZoneId), DateTimeUtils.fromJavaTimestamp( new Timestamp(sdf.parse("2016-02-28 23:59:59.000").getTime))) + checkEvaluation( + TimeSub( + Literal(new Timestamp(sdf.parse("2016-03-30 00:00:01.000").getTime)), + Literal(new CalendarInterval(1, 1, 2000000.toLong)), + timeZoneId), + DateTimeUtils.fromJavaTimestamp( + new Timestamp(sdf.parse("2016-02-27 23:59:59.000").getTime))) checkEvaluation( TimeSub( Literal.create(null, TimestampType), - Literal(new CalendarInterval(1, 123000L)), + Literal(new CalendarInterval(1, 2, 123000L)), timeZoneId), null) checkEvaluation( @@ -1073,18 +1087,18 @@ 
class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("timestamps difference") { val end = Instant.parse("2019-10-04T11:04:01.123456Z") checkEvaluation(TimestampDiff(Literal(end), Literal(end)), - new CalendarInterval(0, 0)) + new CalendarInterval(0, 0, 0)) checkEvaluation(TimestampDiff(Literal(end), Literal(Instant.EPOCH)), - CalendarInterval.fromString("interval 18173 days " + - "11 hours 4 minutes 1 seconds 123 milliseconds 456 microseconds")) + CalendarInterval.fromString("interval " + + "436163 hours 4 minutes 1 seconds 123 milliseconds 456 microseconds")) checkEvaluation(TimestampDiff(Literal(Instant.EPOCH), Literal(end)), - CalendarInterval.fromString("interval -18173 days " + - "-11 hours -4 minutes -1 seconds -123 milliseconds -456 microseconds")) + CalendarInterval.fromString("interval " + + "-436163 hours -4 minutes -1 seconds -123 milliseconds -456 microseconds")) checkEvaluation( TimestampDiff( Literal(Instant.parse("9999-12-31T23:59:59.999999Z")), Literal(Instant.parse("0001-01-01T00:00:00Z"))), - CalendarInterval.fromString("interval 521722 weeks 4 days " + - "23 hours 59 minutes 59 seconds 999 milliseconds 999 microseconds")) + CalendarInterval.fromString("interval " + + "87649415 hours 59 minutes 59 seconds 999 milliseconds 999 microseconds")) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala index 269f1a09ac533..18385f964930b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala @@ -75,7 +75,7 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Literal.default(DateType), LocalDate.ofEpochDay(0)) checkEvaluation(Literal.default(TimestampType), Instant.ofEpochSecond(0)) } - checkEvaluation(Literal.default(CalendarIntervalType), new CalendarInterval(0, 0L)) + checkEvaluation(Literal.default(CalendarIntervalType), new CalendarInterval(0, 0, 0L)) checkEvaluation(Literal.default(ArrayType(StringType)), Array()) checkEvaluation(Literal.default(MapType(IntegerType, StringType)), Map()) checkEvaluation(Literal.default(StructType(StructField("a", StringType) :: Nil)), Row("")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala index b111797c3588e..82bc38ebe037f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala @@ -135,10 +135,12 @@ object LiteralGenerator { Instant.parse("0001-01-01T00:00:00.000000Z"), Instant.parse("9999-12-31T23:59:59.999999Z")).getSeconds val maxMicros = TimeUnit.SECONDS.toMicros(maxDurationInSec) + val maxDays = TimeUnit.SECONDS.toDays(maxDurationInSec).toInt for { months <- Gen.choose(-1 * maxIntervalInMonths, maxIntervalInMonths) micros <- Gen.choose(-1 * maxMicros, maxMicros) - } yield Literal.create(new CalendarInterval(months, micros), CalendarIntervalType) + days <- Gen.choose(-1 * maxDays, maxDays) + } yield Literal.create(new CalendarInterval(months, days, micros), CalendarIntervalType) } diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 3ec8d18bc871d..70e29dca46e9e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -1154,7 +1154,7 @@ class FilterPushdownSuite extends PlanTest { } test("watermark pushdown: no pushdown on watermark attribute #1") { - val interval = new CalendarInterval(2, 2000L) + val interval = new CalendarInterval(2, 2, 2000L) // Verify that all conditions except the watermark touching condition are pushed down // by the optimizer and others are not. @@ -1169,7 +1169,7 @@ class FilterPushdownSuite extends PlanTest { } test("watermark pushdown: no pushdown for nondeterministic filter") { - val interval = new CalendarInterval(2, 2000L) + val interval = new CalendarInterval(2, 2, 2000L) // Verify that all conditions except the watermark touching condition are pushed down // by the optimizer and others are not. @@ -1184,7 +1184,7 @@ class FilterPushdownSuite extends PlanTest { } test("watermark pushdown: full pushdown") { - val interval = new CalendarInterval(2, 2000L) + val interval = new CalendarInterval(2, 2, 2000L) // Verify that all conditions except the watermark touching condition are pushed down // by the optimizer and others are not. @@ -1198,7 +1198,7 @@ class FilterPushdownSuite extends PlanTest { } test("watermark pushdown: no pushdown on watermark attribute #2") { - val interval = new CalendarInterval(2, 2000L) + val interval = new CalendarInterval(2, 2, 2000L) val originalQuery = EventTimeWatermark('a, interval, testRelation) .where('a === 5 && 'b === 10) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala index f6d1898dc64a8..a3da9f73ebd40 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types.IntegerType -import org.apache.spark.unsafe.types.CalendarInterval class LeftSemiPushdownSuite extends PlanTest { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala index c2e80c639f43b..e8dc15a5aaed6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala @@ -673,12 +673,11 @@ class ExpressionParserSuite extends AnalysisTest { // Composed intervals. 
checkIntervals( - "3 months 22 seconds 1 millisecond", - Literal(new CalendarInterval(3, 22001000L))) + "3 months 4 days 22 seconds 1 millisecond", + Literal(new CalendarInterval(3, 4, 22001000L))) checkIntervals( "3 years '-1-10' year to month 3 weeks '1 0:0:2' day to second", - Literal(new CalendarInterval(14, - 22 * CalendarInterval.MICROS_PER_DAY + 2 * CalendarInterval.MICROS_PER_SECOND))) + Literal(new CalendarInterval(14, 22, 2 * CalendarInterval.MICROS_PER_SECOND))) } test("SPARK-23264 Interval Compatibility tests") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala index 41adf845a6fab..e7b1c0810a033 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala @@ -53,7 +53,8 @@ class UnsafeArraySuite extends SparkFunSuite { BigDecimal("1.2345678901234567890123456").setScale(21, BigDecimal.RoundingMode.FLOOR), BigDecimal("2.3456789012345678901234567").setScale(21, BigDecimal.RoundingMode.FLOOR)) - val calenderintervalArray = Array(new CalendarInterval(3, 321), new CalendarInterval(1, 123)) + val calenderintervalArray = Array( + new CalendarInterval(3, 2, 321), new CalendarInterval(1, 2, 123)) val intMultiDimArray = Array(Array(1), Array(2, 20), Array(3, 30, 300)) val doubleMultiDimArray = Array( From ce879c2cf53d83152563fbdb6225b458d8a2b42b Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Mon, 14 Oct 2019 09:41:01 +0800 Subject: [PATCH 05/16] fix remaining tests --- .../vectorized/ColumnVectorUtils.java | 3 ++- .../vectorized/WritableColumnVector.java | 7 ++++--- .../vectorized/ColumnarBatchSuite.scala | 20 ++++++++++++++----- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 829f3ce750fe6..bce6aa28c42a1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -165,7 +165,8 @@ private static void appendValue(WritableColumnVector dst, DataType t, Object o) CalendarInterval c = (CalendarInterval)o; dst.appendStruct(false); dst.getChild(0).appendInt(c.months); - dst.getChild(1).appendLong(c.microseconds); + dst.getChild(1).appendInt(c.days); + dst.getChild(2).appendLong(c.microseconds); } else if (t instanceof DateType) { dst.appendInt(DateTimeUtils.fromJavaDate((Date)o)); } else { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index 14fac72847af2..951ca39f1bed0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -736,10 +736,11 @@ protected WritableColumnVector(int capacity, DataType type) { this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType()); this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType()); } else if (type instanceof CalendarIntervalType) { - // Two columns. Months as int. Microseconds as Long. 
- this.childColumns = new WritableColumnVector[2]; + // Three columns. Months as int. Days as Int. Microseconds as Long. + this.childColumns = new WritableColumnVector[3]; this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType); - this.childColumns[1] = reserveNewColumn(capacity, DataTypes.LongType); + this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType); + this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType); } else { this.childColumns = null; } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index c0f4bb4372bbf..3841516933235 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -636,30 +636,40 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(column.arrayData().elementsAppended == 0) } - testVector("CalendarInterval APIs", 4, CalendarIntervalType) { + testVector("CalendarInterval APIs", 5, CalendarIntervalType) { column => val reference = mutable.ArrayBuffer.empty[CalendarInterval] val months = column.getChild(0) - val microseconds = column.getChild(1) + val days = column.getChild(1) + val microseconds = column.getChild(2) assert(months.dataType() == IntegerType) + assert(days.dataType() == IntegerType) assert(microseconds.dataType() == LongType) months.putInt(0, 1) + days.putInt(0, 10) microseconds.putLong(0, 100) - reference += new CalendarInterval(1, 100) + reference += new CalendarInterval(1, 10, 100) months.putInt(1, 0) + days.putInt(1, 0) microseconds.putLong(1, 2000) - reference += new CalendarInterval(0, 2000) + reference += new CalendarInterval(0, 0, 2000) column.putNull(2) assert(column.getInterval(2) == null) reference += null months.putInt(3, 20) + days.putInt(3, 0) microseconds.putLong(3, 0) - reference += new CalendarInterval(20, 0) + reference += new CalendarInterval(20, 0, 0) + + months.putInt(4, 0) + days.putInt(4, 200) + microseconds.putLong(4, 0) + reference += new CalendarInterval(0, 200, 0) reference.zipWithIndex.foreach { case (v, i) => val errMsg = "VectorType=" + column.getClass.getSimpleName From 4ee83546ec9f8401d21e72e5d0988c8bbad1aa08 Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Tue, 15 Oct 2019 23:26:37 +0800 Subject: [PATCH 06/16] fix failed tests --- .../sql/catalyst/util/DateTimeUtilsSuite.scala | 6 +++--- .../main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../org/apache/spark/sql/execution/Columnar.scala | 3 ++- .../sql/execution/streaming/GroupStateImpl.scala | 6 +++--- .../spark/sql/execution/streaming/Triggers.scala | 5 +++-- .../sql-tests/results/ansi/interval.sql.out | 4 ++-- .../resources/sql-tests/results/datetime.sql.out | 4 ++-- .../sql-tests/results/postgreSQL/timestamp.sql.out | 12 ++++++------ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 +++++++------- .../execution/vectorized/ColumnarBatchSuite.scala | 8 ++++---- .../sql/streaming/StreamingAggregationSuite.scala | 14 +++++++------- 11 files changed, 40 insertions(+), 38 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index d106b95eeb2ae..0981531e64a4b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -373,13 +373,13 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers { test("timestamp add months") { val ts1 = date(1997, 2, 28, 10, 30, 0) val ts2 = date(2000, 2, 28, 10, 30, 0, 123000) - assert(timestampAddInterval(ts1, 36, 123000, defaultZoneId) === ts2) + assert(timestampAddInterval(ts1, 36, 0, 123000, defaultZoneId) === ts2) val ts3 = date(1997, 2, 27, 16, 0, 0, 0, TimeZonePST) val ts4 = date(2000, 2, 27, 16, 0, 0, 123000, TimeZonePST) val ts5 = date(2000, 2, 28, 0, 0, 0, 123000, TimeZoneGMT) - assert(timestampAddInterval(ts3, 36, 123000, TimeZonePST.toZoneId) === ts4) - assert(timestampAddInterval(ts3, 36, 123000, TimeZoneGMT.toZoneId) === ts5) + assert(timestampAddInterval(ts3, 36, 0, 123000, TimeZonePST.toZoneId) === ts4) + assert(timestampAddInterval(ts3, 36, 0, 123000, TimeZoneGMT.toZoneId) === ts5) } test("timestamp add days") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 076270a9f1c6b..5c1ce16ecfe94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -731,7 +731,7 @@ class Dataset[T] private[sql]( s"Unable to parse time delay '$delayThreshold'", cause = Some(e)) } - require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0, + require(parsedDelay.milliseconds >= 0 && parsedDelay.days >= 0 && parsedDelay.months >= 0, s"delay threshold ($delayThreshold) should not be negative.") EliminateEventTimeWatermark( EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 9d1636ccf2718..1f10fe238cbfd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -323,7 +323,8 @@ private object RowToColumnConverter { val c = row.getInterval(column) cv.appendStruct(false) cv.getChild(0).appendInt(c.months) - cv.getChild(1).appendLong(c.microseconds) + cv.getChild(1).appendInt(c.days) + cv.getChild(2).appendLong(c.microseconds) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index dda9d41f630e6..584c7df107255 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -160,12 +160,12 @@ private[sql] class GroupStateImpl[S] private( private def parseDuration(duration: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(duration) - if (cal.milliseconds < 0 || cal.months < 0) { + if (cal.milliseconds < 0 || cal.days < 0 || cal.months < 0) { throw new IllegalArgumentException(s"Provided duration ($duration) is not positive") } - val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 - cal.milliseconds + cal.months * millisPerMonth + val millisPerDay = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) + cal.milliseconds + cal.days * millisPerDay + cal.months * millisPerDay * 31 } private def checkTimeoutTimestampAllowed(): Unit = { diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala index 2bdb3402c14b1..a761138551988 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala @@ -31,8 +31,9 @@ private object Triggers { def convert(interval: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(interval) - if (cal.months > 0) { - throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") + if (cal.months > 0 || cal.days > 0) { + throw new IllegalArgumentException( + s"Doesn't support day, week, month or year interval: $interval") } TimeUnit.MICROSECONDS.toMillis(cal.microseconds) } diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out index 43ad3c3f539f1..fcf6b6b10c8a1 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out @@ -238,7 +238,7 @@ from interval_arithmetic -- !query 17 schema struct -- !query 17 output -2012-01-01 00:00:00 2011-09-23 13:37:26.876544 2012-04-09 12:22:33.123456 2012-04-09 12:22:33.123456 2011-09-23 13:37:26.876544 2011-09-23 13:37:26.876544 2012-04-09 12:22:33.123456 +2012-01-01 00:00:00 2011-09-23 12:37:26.876544 2012-04-09 11:22:33.123456 2012-04-09 11:22:33.123456 2011-09-23 12:37:26.876544 2011-09-23 12:37:26.876544 2012-04-09 11:22:33.123456 -- !query 18 @@ -254,7 +254,7 @@ from interval_arithmetic -- !query 18 schema struct -- !query 18 output -2012-01-01 00:00:00 2011-09-23 13:37:26.876544 2012-04-09 12:22:33.123456 2012-04-09 12:22:33.123456 2011-09-23 13:37:26.876544 2011-09-23 13:37:26.876544 2012-04-09 12:22:33.123456 +2012-01-01 00:00:00 2011-09-23 12:37:26.876544 2012-04-09 11:22:33.123456 2012-04-09 11:22:33.123456 2011-09-23 12:37:26.876544 2011-09-23 12:37:26.876544 2012-04-09 11:22:33.123456 -- !query 19 diff --git a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out index c3c131d22d0fb..4dbf00c3dc7dd 100644 --- a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out @@ -136,7 +136,7 @@ select date'2020-01-01' - timestamp'2019-10-06 10:11:12.345678' -- !query 15 schema struct -- !query 15 output -interval 12 weeks 2 days 14 hours 48 minutes 47 seconds 654 milliseconds 322 microseconds +interval 2078 hours 48 minutes 47 seconds 654 milliseconds 322 microseconds -- !query 16 @@ -144,4 +144,4 @@ select timestamp'2019-10-06 10:11:12.345678' - date'2020-01-01' -- !query 16 schema struct -- !query 16 output -interval -12 weeks -2 days -14 hours -48 minutes -47 seconds -654 milliseconds -322 microseconds +interval -2078 hours -48 minutes -47 seconds -654 milliseconds -322 microseconds diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out index 80322fb562895..5a7318023f9e0 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out @@ -219,11 +219,11 @@ SELECT '' AS `54`, d1 - timestamp '1997-01-02' AS diff -- !query 24 schema 
struct<54:string,diff:interval> -- !query 24 output - interval -1409 weeks -8 hours + interval -236720 hours interval 0 microseconds - interval 246 weeks 2 days 17 hours 19 minutes 20 seconds + interval 41393 hours 19 minutes 20 seconds interval 3 hours 4 minutes 5 seconds - interval 5 weeks 4 days 17 hours 32 minutes 1 seconds + interval 953 hours 32 minutes 1 seconds -- !query 25 @@ -242,11 +242,11 @@ SELECT '' AS `54`, d1 - timestamp '1997-01-02' AS diff -- !query 26 schema struct<54:string,diff:interval> -- !query 26 output - interval -1409 weeks -8 hours + interval -236720 hours interval 0 microseconds - interval 246 weeks 2 days 17 hours 19 minutes 20 seconds + interval 41393 hours 19 minutes 20 seconds interval 3 hours 4 minutes 5 seconds - interval 5 weeks 4 days 17 hours 32 minutes 1 seconds + interval 953 hours 32 minutes 1 seconds -- !query 27 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1afe3976b2a1a..5ca9cab57e4ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1554,7 +1554,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { import org.apache.spark.unsafe.types.CalendarInterval val df = sql("select interval 3 years -3 month 7 week 123 microseconds") - checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7L * 1000 * 1000 * 3600 * 24 * 7 + 123 ))) + checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7 * 7, 123 ))) withTempPath(f => { // Currently we don't yet support saving out values of interval data type. val e = intercept[AnalysisException] { @@ -1580,17 +1580,17 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { import org.apache.spark.unsafe.types.CalendarInterval.MICROS_PER_WEEK val df = sql("select interval 3 years -3 month 7 week 123 microseconds as i") - checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7L * MICROS_PER_WEEK + 123))) + checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7 * 7, 123))) - checkAnswer(df.select(df("i") + new CalendarInterval(2, 123)), - Row(new CalendarInterval(12 * 3 - 3 + 2, 7L * MICROS_PER_WEEK + 123 + 123))) + checkAnswer(df.select(df("i") + new CalendarInterval(2, 1, 123)), + Row(new CalendarInterval(12 * 3 - 3 + 2, 7 * 7 + 1, 123 + 123))) - checkAnswer(df.select(df("i") - new CalendarInterval(2, 123)), - Row(new CalendarInterval(12 * 3 - 3 - 2, 7L * MICROS_PER_WEEK + 123 - 123))) + checkAnswer(df.select(df("i") - new CalendarInterval(2, 1, 123)), + Row(new CalendarInterval(12 * 3 - 3 - 2, 7 * 7 - 1, 123 - 123))) // unary minus checkAnswer(df.select(-df("i")), - Row(new CalendarInterval(-(12 * 3 - 3), -(7L * MICROS_PER_WEEK + 123)))) + Row(new CalendarInterval(-(12 * 3 - 3), -7 * 7, -123))) } test("aggregation with codegen updates peak execution memory") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 3841516933235..7c7b5b70323b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1321,7 +1321,7 @@ class ColumnarBatchSuite extends SparkFunSuite { Decimal("1234.23456"), DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")), 
DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")), - new CalendarInterval(1, 0), + new CalendarInterval(1, 0, 0), new GenericArrayData(Array(1, 2, 3, 4, null)), new GenericInternalRow(Array[Any](5.asInstanceOf[Any], 10)), mapBuilder.build() @@ -1342,7 +1342,7 @@ class ColumnarBatchSuite extends SparkFunSuite { Decimal("0.01000"), DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("1875-12-12")), DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("1880-01-05 12:45:21.321")), - new CalendarInterval(-10, -100), + new CalendarInterval(-10, -50, -100), new GenericArrayData(Array(5, 10, -100)), new GenericInternalRow(Array[Any](20.asInstanceOf[Any], null)), mapBuilder.build() @@ -1434,8 +1434,8 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(columns(10).isNullAt(2)) assert(columns(11).dataType() == CalendarIntervalType) - assert(columns(11).getInterval(0) == new CalendarInterval(1, 0)) - assert(columns(11).getInterval(1) == new CalendarInterval(-10, -100)) + assert(columns(11).getInterval(0) == new CalendarInterval(1, 0, 0)) + assert(columns(11).getInterval(1) == new CalendarInterval(-10, -50, -100)) assert(columns(11).isNullAt(2)) assert(columns(12).dataType() == ArrayType(IntegerType)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 134e61ed12a21..328b63ee6faf3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -355,18 +355,18 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { .select( ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)") testStream(aggregated, Complete)( - StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock), + StartStream(Trigger.ProcessingTime("240 hour"), triggerClock = clock), // advance clock to 10 days, should retain all keys AddData(inputData, 0L, 5L, 5L, 10L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_HOUR * 240), CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), // advance clock to 20 days, should retain keys >= 10 AddData(inputData, 15L, 15L, 20L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_HOUR * 240), CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), // advance clock to 30 days, should retain keys >= 20 AddData(inputData, 85L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_HOUR * 240), CheckLastBatch((20L, 1), (85L, 1)), // bounce stream and ensure correct batch timestamp is used @@ -376,16 +376,16 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { q.sink.asInstanceOf[MemorySink].clear() q.commitLog.purge(3) // advance by 60 days i.e., 90 days total - clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60) + clock.advance(DateTimeUtils.MILLIS_PER_HOUR * 24 * 60) true }, - StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock), + StartStream(Trigger.ProcessingTime("240 hour"), triggerClock = clock), // Commit log blown, causing a re-run of the last batch CheckLastBatch((20L, 1), (85L, 1)), // advance clock to 100 days, should retain keys >= 90 AddData(inputData, 85L, 90L, 100L, 105L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), + 
AdvanceManualClock(DateTimeUtils.MILLIS_PER_HOUR * 240), CheckLastBatch((90L, 1), (100L, 1), (105L, 1)) ) } From 3d954d6dbf72ae164e873b33f72d57c08361fead Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Wed, 16 Oct 2019 10:14:16 +0800 Subject: [PATCH 07/16] code clean and more bug fix --- .../spark/unsafe/types/CalendarInterval.java | 7 ------- .../unsafe/types/CalendarIntervalSuite.java | 16 ++++------------ .../spark/sql/catalyst/util/DateTimeUtils.scala | 16 ---------------- .../results/postgreSQL/timestamp.sql.out | 4 ++-- .../apache/spark/sql/DateFunctionsSuite.scala | 16 ++++++++-------- .../sql/catalyst/ExpressionSQLBuilderSuite.scala | 6 +++--- 6 files changed, 17 insertions(+), 48 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index 1bcdfb2c7293d..4b841fdd5cc63 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -338,13 +338,6 @@ public long milliseconds() { return this.microseconds / MICROS_PER_MILLI; } - // TODO: Keep this temporarily to pass compile - public CalendarInterval(int months, long microseconds) { - this.months = months; - this.days = 0; - this.microseconds = microseconds; - } - public CalendarInterval(int months, int days, long microseconds) { this.months = months; this.days = days; diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java index c1f10fed7ad8e..24da2480a2302 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java @@ -221,7 +221,7 @@ public void fromSingleUnitStringTest() { assertEquals(fromSingleUnitString("day", input), i); input = "1999.38888"; - i = new CalendarInterval(0, 1999 * MICROS_PER_SECOND + 38); + i = new CalendarInterval(0, 0, 1999 * MICROS_PER_SECOND + 38); assertEquals(fromSingleUnitString("second", input), i); try { @@ -280,25 +280,17 @@ public void subtractTest() { new CalendarInterval(-85, -180, -281 * MICROS_PER_HOUR)); } - private static void testSingleUnit(String unit, int number, int months, long microseconds) { + private static void testSingleUnit( + String unit, int number, int months, int days, long microseconds) { Arrays.asList("interval ", "").forEach(prefix -> { String input1 = prefix + number + " " + unit; String input2 = prefix + number + " " + unit + "s"; - CalendarInterval result = new CalendarInterval(months, microseconds); + CalendarInterval result = new CalendarInterval(months, days, microseconds); assertEquals(fromString(input1), result); assertEquals(fromString(input2), result); }); } - private static void testSingleUnit( - String unit, int number, int months, int days, long microseconds) { - String input1 = "interval " + number + " " + unit; - String input2 = "interval " + number + " " + unit + "s"; - CalendarInterval result = new CalendarInterval(months, days, microseconds); - assertEquals(fromString(input1), result); - assertEquals(fromString(input2), result); - } - @Test public void fromStringCaseSensitivityTest() { testSingleUnit("YEAR", 3, 36, 0, 0); diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 3e29ab2757d51..1dfde57ec415c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -568,22 +568,6 @@ object DateTimeUtils { LocalDate.ofEpochDay(days).plusMonths(months).toEpochDay.toInt } - /** - * Add timestamp and full interval. - * Returns a timestamp value, expressed in microseconds since 1.1.1970 00:00:00. - */ - def timestampAddInterval( - start: SQLTimestamp, - months: Int, - microseconds: Long, - zoneId: ZoneId): SQLTimestamp = { - val resultTimestamp = microsToInstant(start) - .atZone(zoneId) - .plusMonths(months) - .plus(microseconds, ChronoUnit.MICROS) - instantToMicros(resultTimestamp.toInstant) - } - /** * Add timestamp and full interval. * Returns a timestamp value, expressed in microseconds since 1.1.1970 00:00:00. diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out index 5a7318023f9e0..f8bce0c97c8c5 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/timestamp.sql.out @@ -221,8 +221,8 @@ struct<54:string,diff:interval> -- !query 24 output interval -236720 hours interval 0 microseconds - interval 41393 hours 19 minutes 20 seconds interval 3 hours 4 minutes 5 seconds + interval 41393 hours 19 minutes 20 seconds interval 953 hours 32 minutes 1 seconds @@ -244,8 +244,8 @@ struct<54:string,diff:interval> -- !query 26 output interval -236720 hours interval 0 microseconds - interval 41393 hours 19 minutes 20 seconds interval 3 hours 4 minutes 5 seconds + interval 41393 hours 19 minutes 20 seconds interval 953 hours 32 minutes 1 seconds diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 99189a96b2995..2b7cc6f182c9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -290,15 +290,15 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { val t2 = Timestamp.valueOf("2015-12-31 00:00:00") val d1 = Date.valueOf("2015-07-31") val d2 = Date.valueOf("2015-12-31") - val i = new CalendarInterval(2, 2000000L) + val i = new CalendarInterval(2, 2, 2000000L) val df = Seq((1, t1, d1), (3, t2, d2)).toDF("n", "t", "d") checkAnswer( df.selectExpr(s"d + $i"), - Seq(Row(Date.valueOf("2015-09-30")), Row(Date.valueOf("2016-02-29")))) + Seq(Row(Date.valueOf("2015-10-02")), Row(Date.valueOf("2016-03-02")))) checkAnswer( df.selectExpr(s"t + $i"), - Seq(Row(Timestamp.valueOf("2015-10-01 00:00:01")), - Row(Timestamp.valueOf("2016-02-29 00:00:02")))) + Seq(Row(Timestamp.valueOf("2015-10-03 00:00:01")), + Row(Timestamp.valueOf("2016-03-02 00:00:02")))) } test("time_sub") { @@ -306,15 +306,15 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { val t2 = Timestamp.valueOf("2016-02-29 00:00:02") val d1 = Date.valueOf("2015-09-30") val d2 = Date.valueOf("2016-02-29") - val i = new CalendarInterval(2, 2000000L) + val i = new CalendarInterval(2, 2, 2000000L) val df = Seq((1, t1, d1), (3, t2, d2)).toDF("n", "t", "d") checkAnswer( df.selectExpr(s"d - $i"), - Seq(Row(Date.valueOf("2015-07-29")), Row(Date.valueOf("2015-12-28")))) + 
Seq(Row(Date.valueOf("2015-07-27")), Row(Date.valueOf("2015-12-26")))) checkAnswer( df.selectExpr(s"t - $i"), - Seq(Row(Timestamp.valueOf("2015-07-31 23:59:59")), - Row(Timestamp.valueOf("2015-12-29 00:00:00")))) + Seq(Row(Timestamp.valueOf("2015-07-29 23:59:59")), + Row(Timestamp.valueOf("2015-12-27 00:00:00")))) } test("function add_months") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala index ae701f266bf45..fbf77a50c3b1c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala @@ -141,16 +141,16 @@ class ExpressionSQLBuilderSuite extends QueryTest with TestHiveSingleton { } test("interval arithmetic") { - val interval = Literal(new CalendarInterval(0, CalendarInterval.MICROS_PER_DAY)) + val interval = Literal(new CalendarInterval(0, 0, CalendarInterval.MICROS_PER_HOUR)) checkSQL( TimeAdd('a, interval), - "`a` + interval 1 days" + "`a` + interval 1 hours" ) checkSQL( TimeSub('a, interval), - "`a` - interval 1 days" + "`a` - interval 1 hours" ) } } From 05042e89a9a596c9ef93acfc9c282d4f794a177d Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Wed, 16 Oct 2019 22:25:57 +0800 Subject: [PATCH 08/16] address comments and fix tests --- .../spark/unsafe/types/CalendarInterval.java | 16 +++++++++------- .../expressions/codegen/UnsafeWriter.java | 2 +- .../results/postgreSQL/interval.sql.out | 4 ++-- .../spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index 4b841fdd5cc63..237943eabfc1c 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -18,6 +18,7 @@ package org.apache.spark.unsafe.types; import java.io.Serializable; +import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -363,17 +364,18 @@ public CalendarInterval negate() { } @Override - public boolean equals(Object other) { - if (this == other) return true; - if (other == null || !(other instanceof CalendarInterval)) return false; - - CalendarInterval o = (CalendarInterval) other; - return this.months == o.months && this.days == o.days && this.microseconds == o.microseconds; + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CalendarInterval that = (CalendarInterval) o; + return months == that.months && + days == that.days && + microseconds == that.microseconds; } @Override public int hashCode() { - return 31 * (31 * months + days) + (int) microseconds; + return Objects.hash(months, days, microseconds); } @Override diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java index 7f2d7c07d8f30..2580f9f5b6d92 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java @@ -134,7 +134,7 @@ public final void write(int 
ordinal, CalendarInterval input) { // grow the global buffer before writing data. grow(24); - // Write the months and microseconds fields of Interval to the variable length portion. + // Write the months, days and microseconds fields of Interval to the variable length portion. Platform.putLong(getBuffer(), cursor(), input.months); Platform.putLong(getBuffer(), cursor() + 8, input.days); Platform.putLong(getBuffer(), cursor() + 16, input.microseconds); diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/interval.sql.out index bed5d7a56c1f8..31e0d65e9a512 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/interval.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/interval.sql.out @@ -21,9 +21,9 @@ interval 16 hours 39 minutes -- !query 2 SELECT interval '999' hour -- !query 2 schema -struct +struct -- !query 2 output -interval 5 weeks 6 days 15 hours +interval 999 hours -- !query 3 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 736a2dcfad297..e445ed28fc4b0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1215,7 +1215,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { checkAnswer(sql("select interval '-100' day"), Row(CalendarInterval.fromString("interval -14 weeks -2 days"))) checkAnswer(sql("select interval '40' hour"), - Row(CalendarInterval.fromString("interval 1 days 16 hours"))) + Row(CalendarInterval.fromString("interval 40 hours"))) checkAnswer(sql("select interval '80' minute"), Row(CalendarInterval.fromString("interval 1 hour 20 minutes"))) checkAnswer(sql("select interval '299.889987299' second"), From 064be74359280d92b32cb8ec176900fafe0cee03 Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Fri, 18 Oct 2019 22:20:06 +0800 Subject: [PATCH 09/16] fix python --- python/pyspark/sql/tests/test_pandas_udf_grouped_map.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py index e9a970b581f13..14db441167120 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py @@ -539,7 +539,7 @@ def f(pdf): pdf['result'] = [pdf['id']] * len(pdf) return pdf - result = df.groupby('group', window('ts', '5 days')).apply(f)\ + result = df.groupby('group', window('ts', '120 hours')).apply(f)\ .select('id', 'result').collect() for r in result: self.assertListEqual(expected[r[0]], r[1]) @@ -583,7 +583,7 @@ def f(key, pdf): expected[id][1] == window_range)) return pdf.assign(result=is_expected) - result = df.groupby('group', window('ts', '5 days')).apply(f).select('result').collect() + result = df.groupby('group', window('ts', '120 hours')).apply(f).select('result').collect() # Check that all group and window_range values from udf matched expected self.assertTrue(all([r[0] for r in result])) From 0778f9a7c399575906efd316a3bc7ba9446acb79 Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Sun, 20 Oct 2019 11:01:33 +0800 Subject: [PATCH 10/16] use 24 hours = 1 day assumption in window, trigger and watermark --- .../sql/tests/test_pandas_udf_grouped_map.py | 4 ++-- 
.../catalyst/analysis/StreamingJoinHelper.scala | 7 ++++--- .../sql/catalyst/expressions/TimeWindow.scala | 6 +++--- .../spark/sql/execution/streaming/Triggers.scala | 4 ++-- .../sql/streaming/StreamingAggregationSuite.scala | 14 +++++++------- 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py index 14db441167120..e9a970b581f13 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py @@ -539,7 +539,7 @@ def f(pdf): pdf['result'] = [pdf['id']] * len(pdf) return pdf - result = df.groupby('group', window('ts', '120 hours')).apply(f)\ + result = df.groupby('group', window('ts', '5 days')).apply(f)\ .select('id', 'result').collect() for r in result: self.assertListEqual(expected[r[0]], r[1]) @@ -583,7 +583,7 @@ def f(key, pdf): expected[id][1] == window_range)) return pdf.assign(result=is_expected) - result = df.groupby('group', window('ts', '120 hours')).apply(f).select('result').collect() + result = df.groupby('group', window('ts', '5 days')).apply(f).select('result').collect() # Check that all group and window_range values from udf matched expected self.assertTrue(all([r[0] for r in result])) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala index 38075786be642..d78a10e1eb2f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala @@ -260,11 +260,12 @@ object StreamingJoinHelper extends PredicateHelper with Logging { invalid = true logWarning( s"Failed to extract state value watermark from condition $exprToCollectFrom " + - s"as imprecise intervals like days, weeks, months and years cannot be used" + - s"for watermark calculation. Use interval in terms of hour instead.") + s"as imprecise intervals like weeks, months and years cannot be used for" + + s"for watermark calculation. 
Use interval in terms of day instead.") Literal(0.0) } else { - Literal(calendarInterval.microseconds.toDouble) + Literal(calendarInterval.days * CalendarInterval.MICROS_PER_DAY.toDouble + + calendarInterval.microseconds.toDouble) } case DoubleType => Multiply(lit, Literal(1000000.0)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index 610ed15a28e46..a6ac8eae5edd2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -103,11 +103,11 @@ object TimeWindow { */ private def getIntervalInMicroSeconds(interval: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(interval) - if (cal.months > 0 || cal.days > 0) { + if (cal.months > 0) { throw new IllegalArgumentException( - s"Intervals greater than a day is not supported ($interval).") + s"Intervals greater than a month is not supported ($interval).") } - cal.microseconds + cal.days * CalendarInterval.MICROS_PER_DAY + cal.microseconds } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala index a761138551988..141c35ba4a3c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala @@ -31,11 +31,11 @@ private object Triggers { def convert(interval: String): Long = { val cal = CalendarInterval.fromCaseInsensitiveString(interval) - if (cal.months > 0 || cal.days > 0) { + if (cal.months > 0) { throw new IllegalArgumentException( s"Doesn't support day, week, month or year interval: $interval") } - TimeUnit.MICROSECONDS.toMillis(cal.microseconds) + TimeUnit.MICROSECONDS.toMillis(cal.microseconds + cal.days * CalendarInterval.MICROS_PER_DAY) } def convert(interval: Duration): Long = interval.toMillis diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 328b63ee6faf3..134e61ed12a21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -355,18 +355,18 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { .select( ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)") testStream(aggregated, Complete)( - StartStream(Trigger.ProcessingTime("240 hour"), triggerClock = clock), + StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock), // advance clock to 10 days, should retain all keys AddData(inputData, 0L, 5L, 5L, 10L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_HOUR * 240), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), // advance clock to 20 days, should retain keys >= 10 AddData(inputData, 15L, 15L, 20L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_HOUR * 240), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), // advance clock to 30 days, should retain keys >= 20 AddData(inputData, 85L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_HOUR * 240), + 
AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((20L, 1), (85L, 1)), // bounce stream and ensure correct batch timestamp is used @@ -376,16 +376,16 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { q.sink.asInstanceOf[MemorySink].clear() q.commitLog.purge(3) // advance by 60 days i.e., 90 days total - clock.advance(DateTimeUtils.MILLIS_PER_HOUR * 24 * 60) + clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60) true }, - StartStream(Trigger.ProcessingTime("240 hour"), triggerClock = clock), + StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock), // Commit log blown, causing a re-run of the last batch CheckLastBatch((20L, 1), (85L, 1)), // advance clock to 100 days, should retain keys >= 90 AddData(inputData, 85L, 90L, 100L, 105L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_HOUR * 240), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((90L, 1), (100L, 1), (105L, 1)) ) } From 4d8383c3430db4a0ef2bff3eb93bf80b0c8cdd1c Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Sun, 20 Oct 2019 11:13:27 +0800 Subject: [PATCH 11/16] streaming changes followup --- .../spark/sql/catalyst/analysis/StreamingJoinHelper.scala | 6 +++--- .../org/apache/spark/sql/execution/streaming/Triggers.scala | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala index d78a10e1eb2f9..d9690252a8d63 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala @@ -256,12 +256,12 @@ object StreamingJoinHelper extends PredicateHelper with Logging { val castedLit = lit.dataType match { case CalendarIntervalType => val calendarInterval = lit.value.asInstanceOf[CalendarInterval] - if (calendarInterval.months > 0 || calendarInterval.days > 0) { + if (calendarInterval.months > 0) { invalid = true logWarning( s"Failed to extract state value watermark from condition $exprToCollectFrom " + - s"as imprecise intervals like weeks, months and years cannot be used for" + - s"for watermark calculation. Use interval in terms of day instead.") + s"as imprecise intervals like months and years cannot be used for" + + s"watermark calculation. 
Use interval in terms of day instead.")
             Literal(0.0)
           } else {
             Literal(calendarInterval.days * CalendarInterval.MICROS_PER_DAY.toDouble +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
index 141c35ba4a3c0..7e90fd1f51971 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -32,8 +32,7 @@ private object Triggers {
   def convert(interval: String): Long = {
     val cal = CalendarInterval.fromCaseInsensitiveString(interval)
     if (cal.months > 0) {
-      throw new IllegalArgumentException(
-        s"Doesn't support day, week, month or year interval: $interval")
+      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
     }
     TimeUnit.MICROSECONDS.toMillis(cal.microseconds + cal.days * CalendarInterval.MICROS_PER_DAY)
   }

From 1ac157eab2467b1b467d92ad35a60ddc0cf8dc11 Mon Sep 17 00:00:00 2001
From: "Liu,Linhong"
Date: Sun, 20 Oct 2019 22:23:15 +0800
Subject: [PATCH 12/16] fix conflict

---
 .../org/apache/spark/sql/catalyst/util/IntervalUtils.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
index 78d188f81f628..0678924df8352 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
@@ -58,7 +58,7 @@ object IntervalUtils {
   }
 
   def getDays(interval: CalendarInterval): Long = {
-    interval.microseconds / DateTimeUtils.MICROS_PER_DAY
+    interval.days
   }
 
   def getHours(interval: CalendarInterval): Byte = {
@@ -84,6 +84,7 @@
   // Returns total number of seconds with microseconds fractional part in the given interval.
   def getEpoch(interval: CalendarInterval): Decimal = {
     var result = interval.microseconds
+    result += DateTimeUtils.MICROS_PER_DAY * interval.days
     result += MICROS_PER_YEAR * (interval.months / MONTHS_PER_YEAR)
     result += MICROS_PER_MONTH * (interval.months % MONTHS_PER_YEAR)
     Decimal(result, 18, 6)

From 076ce427d804993820f92e52f1fb82db4783e4e3 Mon Sep 17 00:00:00 2001
From: "Liu,Linhong"
Date: Mon, 28 Oct 2019 16:39:26 +0800
Subject: [PATCH 13/16] fix code style and test cases

---
 .../apache/spark/sql/catalyst/util/DateTimeUtils.scala | 10 +++++-----
 .../test/resources/sql-tests/results/literals.sql.out  |  4 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 437fdb799e9fd..649fca38a682b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -573,11 +573,11 @@ object DateTimeUtils {
    * Returns a timestamp value, expressed in microseconds since 1.1.1970 00:00:00. 
*/
   def timestampAddInterval(
-      start: SQLTimestamp,
-      months: Int,
-      days: Int,
-      microseconds: Long,
-      zoneId: ZoneId): SQLTimestamp = {
+    start: SQLTimestamp,
+    months: Int,
+    days: Int,
+    microseconds: Long,
+    zoneId: ZoneId): SQLTimestamp = {
     val resultTimestamp = microsToInstant(start)
       .atZone(zoneId)
       .plusMonths(months)
diff --git a/sql/core/src/test/resources/sql-tests/results/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
index fd6e51b2385de..aab2c1e04076a 100644
--- a/sql/core/src/test/resources/sql-tests/results/literals.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
@@ -339,9 +339,9 @@ interval 1 years 2 months 3 weeks 4 days 5 hours 6 minutes 7 seconds 8 milliseco
 -- !query 36
 select interval '30' year '25' month '-100' day '40' hour '80' minute '299.889987299' second
 -- !query 36 schema
-struct
+struct
 -- !query 36 output
-interval 32 years 1 months -14 weeks -6 hours -35 minutes -110 milliseconds -13 microseconds
+interval 32 years 1 months -14 weeks -2 days 41 hours 24 minutes 59 seconds 889 milliseconds 987 microseconds
 
 -- !query 37

From 2edd8a0cffaf004b1c9c3f5ecfa5346736107b82 Mon Sep 17 00:00:00 2001
From: "Liu,Linhong"
Date: Mon, 28 Oct 2019 17:22:38 +0800
Subject: [PATCH 14/16] change long,long,long to int,int,long when storing
 CalendarInterval in UnsafeRow

---
 .../sql/catalyst/expressions/UnsafeArrayData.java   |  6 +++---
 .../spark/sql/catalyst/expressions/UnsafeRow.java   |  6 +++---
 .../catalyst/expressions/codegen/UnsafeWriter.java  | 12 ++++++------
 .../catalyst/expressions/collectionOperations.scala |  2 ++
 4 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
index 4be665d7722c9..9e686985b0607 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
@@ -230,9 +230,9 @@ public CalendarInterval getInterval(int ordinal) {
     if (isNullAt(ordinal)) return null;
     final long offsetAndSize = getLong(ordinal);
     final int offset = (int) (offsetAndSize >> 32);
-    final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
-    final int days = (int) Platform.getLong(baseObject, baseOffset + offset + 8);
-    final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 16);
+    final int months = Platform.getInt(baseObject, baseOffset + offset);
+    final int days = Platform.getInt(baseObject, baseOffset + offset + 4);
+    final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
     return new CalendarInterval(months, days, microseconds);
   }
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 812dfa6f6528d..dc6fa30e14f4d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -401,9 +401,9 @@ public CalendarInterval getInterval(int ordinal) {
     } else {
       final long offsetAndSize = getLong(ordinal);
       final int offset = (int) (offsetAndSize >> 32);
-      final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
-      final int days = (int) Platform.getLong(baseObject, baseOffset + offset 
+ 8); - final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 16); + final int months = Platform.getInt(baseObject, baseOffset + offset); + final int days = Platform.getInt(baseObject, baseOffset + offset + 4); + final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8); return new CalendarInterval(months, days, microseconds); } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java index 2580f9f5b6d92..4e4392734184a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java @@ -132,17 +132,17 @@ private void writeUnalignedBytes( public final void write(int ordinal, CalendarInterval input) { // grow the global buffer before writing data. - grow(24); + grow(16); // Write the months, days and microseconds fields of Interval to the variable length portion. - Platform.putLong(getBuffer(), cursor(), input.months); - Platform.putLong(getBuffer(), cursor() + 8, input.days); - Platform.putLong(getBuffer(), cursor() + 16, input.microseconds); + Platform.putInt(getBuffer(), cursor(), input.months); + Platform.putInt(getBuffer(), cursor() + 4, input.days); + Platform.putLong(getBuffer(), cursor() + 8, input.microseconds); - setOffsetAndSize(ordinal, 24); + setOffsetAndSize(ordinal, 16); // move the cursor forward. - increaseCursor(24); + increaseCursor(16); } public final void write(int ordinal, UnsafeRow row) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index f93b9c8071ef4..84cd90ae60da8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -2633,6 +2633,8 @@ object Sequence { } else { // To estimate the resulted array length we need to make assumptions // about a month length in days and a day length in microseconds + // We choose a minimum days(28) in one month to make sure the estimated + // array length is long enough val intervalStepInMicros = stepMicros + stepMonths * microsPerMonth + stepDays * microsPerDay val startMicros: Long = num.toLong(start) * scale From 4e97bc05f35fc9ddb05e945d1fe1a83f66abd0c7 Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Tue, 29 Oct 2019 11:00:20 +0800 Subject: [PATCH 15/16] fix comment --- .../expressions/collectionOperations.scala | 4 +-- .../expressions/intervalExpressions.scala | 4 +-- .../sql/catalyst/util/IntervalUtils.scala | 6 ++-- .../IntervalExpressionsSuite.scala | 32 ++++++++++--------- .../sql-tests/results/date_part.sql.out | 16 +++++----- 5 files changed, 32 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 84cd90ae60da8..a348f7cd6b699 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -2614,6 +2614,8 @@ object Sequence { private 
val backedSequenceImpl = new IntegralSequenceImpl[T](dt) private val microsPerDay = 24 * CalendarInterval.MICROS_PER_HOUR + // We choose a minimum days(28) in one month to calculate the `intervalStepInMicros` + // in order to make sure the estimated array length is long enough private val microsPerMonth = 28 * microsPerDay override def eval(input1: Any, input2: Any, input3: Any): Array[T] = { @@ -2633,8 +2635,6 @@ object Sequence { } else { // To estimate the resulted array length we need to make assumptions // about a month length in days and a day length in microseconds - // We choose a minimum days(28) in one month to make sure the estimated - // array length is long enough val intervalStepInMicros = stepMicros + stepMonths * microsPerMonth + stepDays * microsPerDay val startMicros: Long = num.toLong(start) * scale diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala index 08360c75a474b..c3a3b3cb58f4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala @@ -63,10 +63,10 @@ case class ExtractIntervalMonths(child: Expression) extends ExtractIntervalPart(child, ByteType, getMonths, "getMonths") case class ExtractIntervalDays(child: Expression) - extends ExtractIntervalPart(child, LongType, getDays, "getDays") + extends ExtractIntervalPart(child, IntegerType, getDays, "getDays") case class ExtractIntervalHours(child: Expression) - extends ExtractIntervalPart(child, ByteType, getHours, "getHours") + extends ExtractIntervalPart(child, LongType, getHours, "getHours") case class ExtractIntervalMinutes(child: Expression) extends ExtractIntervalPart(child, ByteType, getMinutes, "getMinutes") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala index a848e7589d39b..2b953ab1adc64 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala @@ -60,12 +60,12 @@ object IntervalUtils { (getMonths(interval) / MONTHS_PER_QUARTER + 1).toByte } - def getDays(interval: CalendarInterval): Long = { + def getDays(interval: CalendarInterval): Int = { interval.days } - def getHours(interval: CalendarInterval): Byte = { - ((interval.microseconds % DateTimeUtils.MICROS_PER_DAY) / MICROS_PER_HOUR).toByte + def getHours(interval: CalendarInterval): Long = { + interval.microseconds / MICROS_PER_HOUR } def getMinutes(interval: CalendarInterval): Byte = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/IntervalExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/IntervalExpressionsSuite.scala index 818ee239dbbf8..0c292e11485aa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/IntervalExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/IntervalExpressionsSuite.scala @@ -103,25 +103,27 @@ class IntervalExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { "31 days 11 hours 59 minutes 59 seconds 999 milliseconds 999 microseconds" test("days") { - checkEvaluation(ExtractIntervalDays("0 days"), 0L) - 
checkEvaluation(ExtractIntervalDays("1 days 100 seconds"), 1L) - checkEvaluation(ExtractIntervalDays("-1 days -100 seconds"), -1L) - checkEvaluation(ExtractIntervalDays("-365 days"), -365L) - checkEvaluation(ExtractIntervalDays("365 days"), 365L) + checkEvaluation(ExtractIntervalDays("0 days"), 0) + checkEvaluation(ExtractIntervalDays("1 days 100 seconds"), 1) + checkEvaluation(ExtractIntervalDays("-1 days -100 seconds"), -1) + checkEvaluation(ExtractIntervalDays("-365 days"), -365) + checkEvaluation(ExtractIntervalDays("365 days"), 365) // Years and months must not be taken into account - checkEvaluation(ExtractIntervalDays("100 year 10 months 5 days"), 5L) - checkEvaluation(ExtractIntervalDays(largeInterval), 31L) + checkEvaluation(ExtractIntervalDays("100 year 10 months 5 days"), 5) + checkEvaluation(ExtractIntervalDays(largeInterval), 31) } test("hours") { - checkEvaluation(ExtractIntervalHours("0 hours"), 0.toByte) - checkEvaluation(ExtractIntervalHours("1 hour"), 1.toByte) - checkEvaluation(ExtractIntervalHours("-1 hour"), -1.toByte) - checkEvaluation(ExtractIntervalHours("23 hours"), 23.toByte) - checkEvaluation(ExtractIntervalHours("-23 hours"), -23.toByte) - // Years and months must not be taken into account - checkEvaluation(ExtractIntervalHours("100 year 10 months 10 hours"), 10.toByte) - checkEvaluation(ExtractIntervalHours(largeInterval), 11.toByte) + checkEvaluation(ExtractIntervalHours("0 hours"), 0L) + checkEvaluation(ExtractIntervalHours("1 hour"), 1L) + checkEvaluation(ExtractIntervalHours("-1 hour"), -1L) + checkEvaluation(ExtractIntervalHours("23 hours"), 23L) + checkEvaluation(ExtractIntervalHours("-23 hours"), -23L) + // Years, months and days must not be taken into account + checkEvaluation(ExtractIntervalHours("100 year 10 months 10 days 10 hours"), 10L) + // Minutes should be taken into account + checkEvaluation(ExtractIntervalHours("10 hours 100 minutes"), 11L) + checkEvaluation(ExtractIntervalHours(largeInterval), 11L) } test("minutes") { diff --git a/sql/core/src/test/resources/sql-tests/results/date_part.sql.out b/sql/core/src/test/resources/sql-tests/results/date_part.sql.out index 8f4edf1960755..a10e25540bcc0 100644 --- a/sql/core/src/test/resources/sql-tests/results/date_part.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/date_part.sql.out @@ -615,7 +615,7 @@ struct -- !query 76 select date_part('day', c) from t2 -- !query 76 schema -struct +struct -- !query 76 output 8 @@ -623,7 +623,7 @@ struct -- !query 77 select date_part('d', c) from t2 -- !query 77 schema -struct +struct -- !query 77 output 8 @@ -631,7 +631,7 @@ struct -- !query 78 select date_part('days', c) from t2 -- !query 78 schema -struct +struct -- !query 78 output 8 @@ -639,7 +639,7 @@ struct -- !query 79 select date_part('hour', c) from t2 -- !query 79 schema -struct +struct -- !query 79 output 7 @@ -647,7 +647,7 @@ struct -- !query 80 select date_part('h', c) from t2 -- !query 80 schema -struct +struct -- !query 80 output 7 @@ -655,7 +655,7 @@ struct -- !query 81 select date_part('hours', c) from t2 -- !query 81 schema -struct +struct -- !query 81 output 7 @@ -663,7 +663,7 @@ struct -- !query 82 select date_part('hr', c) from t2 -- !query 82 schema -struct +struct -- !query 82 output 7 @@ -671,7 +671,7 @@ struct -- !query 83 select date_part('hrs', c) from t2 -- !query 83 schema -struct +struct -- !query 83 output 7 From 2f901894eb24d7deec31947708b9afdaae2a4866 Mon Sep 17 00:00:00 2001 From: "Liu,Linhong" Date: Fri, 1 Nov 2019 10:14:25 +0800 Subject: [PATCH 16/16] address 
comments --- .../org/apache/spark/sql/catalyst/util/IntervalUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala index 6d8e02b2463c6..73e9f37c94528 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala @@ -202,7 +202,7 @@ object IntervalUtils { val days = if (m.group(2) == null) { 0 } else { - toLongWithRange("day", m.group(3), 0, Integer.MAX_VALUE) + toLongWithRange("day", m.group(3), 0, Integer.MAX_VALUE).toInt } var hours: Long = 0L var minutes: Long = 0L @@ -238,7 +238,7 @@ object IntervalUtils { micros = Math.addExact(micros, Math.multiplyExact(hours, MICROS_PER_HOUR)) micros = Math.addExact(micros, Math.multiplyExact(minutes, MICROS_PER_MINUTE)) micros = Math.addExact(micros, Math.multiplyExact(seconds, DateTimeUtils.MICROS_PER_SECOND)) - new CalendarInterval(0, sign * days.toInt, sign * micros) + new CalendarInterval(0, sign * days, sign * micros) } catch { case e: Exception => throw new IllegalArgumentException(