Skip to content

Commit

Permalink
Merge pull request #628 from fb64/improve-arrow-reading
Browse files Browse the repository at this point in the history
Allow any ArrowReader implementation to be use for reading Arrow data #627
  • Loading branch information
Jolanrensen authored Apr 3, 2024
2 parents ff139d7 + 56d27e6 commit 2e1986b
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 5 deletions.
2 changes: 2 additions & 0 deletions dataframe-arrow/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ dependencies {
testImplementation(libs.kotestAssertions) {
exclude("org.jetbrains.kotlin", "kotlin-stdlib-jdk8")
}
testImplementation(libs.arrow.c.data)
testImplementation(libs.duckdb.jdbc)
}

kotlinPublications {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ internal fun DataFrame.Companion.readArrowImpl(
add(df)
}
}
is ArrowStreamReader -> {
else -> {
val root = reader.vectorSchemaRoot
val schema = root.schema
while (reader.loadNextBatch()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.apache.arrow.vector.TimeStampSecVector
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowFileReader
import org.apache.arrow.vector.ipc.ArrowFileWriter
import org.apache.arrow.vector.ipc.ArrowReader
import org.apache.arrow.vector.ipc.ArrowStreamReader
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.types.FloatingPointPrecision
Expand All @@ -21,6 +22,9 @@ import org.apache.arrow.vector.types.pojo.FieldType
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
import org.apache.arrow.vector.util.Text
import org.duckdb.DuckDBConnection
import org.duckdb.DuckDBResultSet
import org.jetbrains.kotlinx.dataframe.AnyFrame
import org.jetbrains.kotlinx.dataframe.DataColumn
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.NullabilityOptions
Expand All @@ -29,17 +33,20 @@ import org.jetbrains.kotlinx.dataframe.api.columnOf
import org.jetbrains.kotlinx.dataframe.api.convertToBoolean
import org.jetbrains.kotlinx.dataframe.api.copy
import org.jetbrains.kotlinx.dataframe.api.dataFrameOf
import org.jetbrains.kotlinx.dataframe.api.describe
import org.jetbrains.kotlinx.dataframe.api.map
import org.jetbrains.kotlinx.dataframe.api.pathOf
import org.jetbrains.kotlinx.dataframe.api.remove
import org.jetbrains.kotlinx.dataframe.api.toColumn
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException
import org.junit.Assert
import org.junit.Test
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.File
import java.net.URL
import java.nio.channels.Channels
import java.sql.DriverManager
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneOffset
Expand Down Expand Up @@ -558,23 +565,26 @@ internal class ArrowKtTest {
}
}

@Test
fun testArrowReaderExtension() {
private fun expectedSimpleDataFrame(): AnyFrame {
val dates = listOf(
LocalDateTime.of(2023, 11, 23, 9, 30, 25),
LocalDateTime.of(2020, 11, 23, 9, 30, 25),
LocalDateTime.of(2015, 5, 25, 14, 20, 13),
LocalDateTime.of(2013, 6, 19, 11, 20, 13),
LocalDateTime.of(2000, 1, 1, 0, 0, 0)
)

val expected = dataFrameOf(
return dataFrameOf(
"string" to listOf("a", "b", "c", "d"),
"int" to listOf(1, 2, 3, 4),
"float" to listOf(1.0f, 2.0f, 3.0f, 4.0f),
"double" to listOf(1.0, 2.0, 3.0, 4.0),
"datetime" to dates
)
}

@Test
fun testArrowReaderExtension() {
val expected = expectedSimpleDataFrame()
val featherChannel = ByteArrayReadableSeekableByteChannel(expected.saveArrowFeatherToByteArray())
val arrowFileReader = ArrowFileReader(featherChannel, RootAllocator())
arrowFileReader.toDataFrame() shouldBe expected
Expand All @@ -583,4 +593,24 @@ internal class ArrowKtTest {
val arrowStreamReader = ArrowStreamReader(ipcInputStream, RootAllocator())
arrowStreamReader.toDataFrame() shouldBe expected
}

@Test
fun testDuckDBArrowIntegration() {
val expected = expectedSimpleDataFrame()
val query = """
select 'a' as string, 1 as int, CAST(1.0 as FLOAT) as float, CAST(1.0 as DOUBLE) as double, TIMESTAMP '2020-11-23 09:30:25' as datetime
UNION ALL SELECT 'b', 2, 2.0, 2.0, TIMESTAMP '2015-05-25 14:20:13'
UNION ALL SELECT 'c', 3, 3.0, 3.0, TIMESTAMP '2013-06-19 11:20:13'
UNION ALL SELECT 'd', 4, 4.0, 4.0, TIMESTAMP '2000-01-01 00:00:00'
""".trimIndent()

Class.forName("org.duckdb.DuckDBDriver")
val conn = DriverManager.getConnection("jdbc:duckdb:") as DuckDBConnection
conn.use {
val resultSet = it.createStatement().executeQuery(query) as DuckDBResultSet
val dbArrowReader = resultSet.arrowExportStream(RootAllocator(), 256) as ArrowReader
Assert.assertTrue(dbArrowReader.javaClass.name.equals("org.apache.arrow.c.ArrowArrayStreamReader"))
DataFrame.readArrow(dbArrowReader) shouldBe expected
}
}
}
4 changes: 4 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ shadow = "8.1.1"
android-gradle-api = "7.3.1" # Can't be updated to 7.4.0+ due to Java 8 compatibility
ktor-server-netty = "2.3.8"
kotlin-compile-testing = "1.5.0"
duckdb = "0.10.0"

[libraries]
ksp-gradle = { group = "com.google.devtools.ksp", name = "symbol-processing-gradle-plugin", version.ref = "ksp" }
Expand Down Expand Up @@ -97,6 +98,8 @@ jsoup = { group = "org.jsoup", name = "jsoup", version.ref = "jsoup" }
arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" }
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }


kotlinpoet = { group = "com.squareup", name = "kotlinpoet", version.ref = "kotlinpoet" }
swagger = { group = "io.swagger.parser.v3", name = "swagger-parser", version.ref = "openapi" }
Expand All @@ -120,6 +123,7 @@ kotlin-jupyter-test-kit = { group = "org.jetbrains.kotlinx", name = "kotlin-jupy

dataframe-symbol-processor = { group = "org.jetbrains.kotlinx.dataframe", name = "symbol-processor-all" }

duckdb-jdbc = { group = "org.duckdb", name = "duckdb_jdbc", version.ref= "duckdb"}

[plugins]
jupyter-api = { id = "org.jetbrains.kotlin.jupyter.api", version.ref = "kotlinJupyter" }
Expand Down

0 comments on commit 2e1986b

Please sign in to comment.