Skip to content

Commit

Permalink
fix bug
Browse files — browse the repository at this point in the history
  • Loading branch information
patrick-schultz committed Mar 19, 2024
1 parent 375fead commit 5d534f0
Showing 1 changed file with 28 additions and 21 deletions.
49 changes: 28 additions & 21 deletions hail/src/main/scala/is/hail/rvd/AbstractRVDSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -484,31 +484,37 @@ case class IndexedRVDSpec2(
): IR => TableStage = newPartitioner match {
case Some(np) =>
val part = partitioner(ctx.stateManager)
/* ensure the old and new partitioners have the same key, and ensure the new partitioner is
* strict */
val extendedNP = np.extendKey(part.kType)
val tmpPartitioner = part.intersect(extendedNP)

assert(key.nonEmpty)

val rSpec = typedCodecSpec
val reader =
ir.PartitionNativeReaderIndexed(rSpec, indexSpec, part.kType.fieldNames, uidFieldName)

val absPath = path
val partPaths = tmpPartitioner.rangeBounds.map(b => partFiles(part.lowerBoundInterval(b)))

val kSize = part.kType.size
absolutePartPaths(path)
assert(tmpPartitioner.rangeBounds.size == partPaths.length)
val contextsValues: IndexedSeq[Row] = tmpPartitioner.rangeBounds.map { interval =>
val partIdx = part.lowerBoundInterval(interval)
val partPath = partFiles(partIdx)
Row(
partIdx.toLong,
s"$absPath/parts/$partPath",
s"$absPath/${indexSpec.relPath}/$partPath.idx",
RVDPartitioner.intervalToIRRepresentation(interval, kSize),
val reader = ir.PartitionNativeReaderIndexed(
typedCodecSpec,
indexSpec,
part.kType.fieldNames,
uidFieldName,
)

val (contextsValues, tmpRangeBounds) = (for {
newInterval <- extendedNP.rangeBounds
oldPartIdx <- part.queryInterval(newInterval)
} yield {
val oldInterval = part.rangeBounds(oldPartIdx)
val intersectionInterval =
newInterval.intersect(extendedNP.kord.intervalEndpointOrdering, oldInterval).get

val partFile = partFiles(oldPartIdx)
val ctx = Row(
oldPartIdx.toLong,
s"$path/parts/$partFile",
s"$path/${indexSpec.relPath}/$partFile.idx",
RVDPartitioner.intervalToIRRepresentation(intersectionInterval, part.kType.size),
)
}
(ctx, intersectionInterval)
}).unzip
val tmpPartitioner = new RVDPartitioner(part.sm, part.kType, tmpRangeBounds)

assert(TArray(reader.contextType).typeCheck(contextsValues))

Expand All @@ -524,7 +530,8 @@ case class IndexedRVDSpec2(
contexts,
body,
)
if (filterIntervals) ts.repartitionNoShuffle(ctx, part, dropEmptyPartitions = true)
if (filterIntervals)
ts.repartitionNoShuffle(ctx, part.strictify(), dropEmptyPartitions = true)
else ts.repartitionNoShuffle(ctx, extendedNP)
}

Expand Down

0 comments on commit 5d534f0

Please sign in to comment.