Fun with Apache Beam.
Extends the Apache Beam library with convenience functions, mostly to better utilize type inference in Kotlin.
A convenience function to create a new transform with one or more steps.
val newTransform = transform {
    it.apply(SomeTransform())
}
A default implementation without this helper would look more like this:
val newTransform = object : PTransform<PCollection<T>, POutput>() {
    override fun expand(input: PCollection<T>) = input.apply(SomeTransform())
}
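For reference, such a helper could be implemented roughly like this (a sketch; the name parameter and exact generics are assumptions, not necessarily this library's signature):

import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.POutput

// Sketch of a possible implementation; type inference fills in T and O at the call site,
// so the caller only has to write the body.
fun <T, O : POutput> transform(
    name: String = "transform",
    block: (PCollection<T>) -> O
): PTransform<PCollection<T>, O> =
    object : PTransform<PCollection<T>, O>(name) {
        override fun expand(input: PCollection<T>): O = block(input)
    }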
A DSL that utilizes Generics, Scope Functions, and Type Inference to provide the transforms from the Apache Beam library.
val pipeline = Pipeline.create()
pipeline
    .sequence()
    .window(FixedWindows.of(Duration.standardSeconds(1)))
    .map { it * 2 }
    .filter { it % 2 == 0L }
    .key { it / 10 }
    .groupByKey()
    .sink(Log.info())
pipeline.run()
This demos batching all elements of a bundle, a very useful technique for IO-heavy DoFns.
Example: an upstream transform provides a collection of record keys and the downstream transform needs to fetch those records from a persistent datastore; rather than issuing one request per key, it is usually better to batch multiple keys into a single request.
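A sketch of the pattern, where Record, DatastoreClient and fetchAll are hypothetical stand-ins for a real backend (the demo's actual code may differ): keys are buffered as they arrive, flushed in batches during processing, and the remainder is flushed in @FinishBundle with each element's original timestamp and window:

import java.io.Serializable
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.joda.time.Instant

// Hypothetical record type and datastore client, stand-ins for a real backend.
data class Record(val key: String, val payload: String) : Serializable

interface DatastoreClient : Serializable {
    fun fetchAll(keys: List<String>): Map<String, Record>
}

// Buffers keys per bundle and fetches them in groups of `batchSize`
// instead of issuing one request per element.
class BatchedLookupFn(
    private val client: DatastoreClient,
    private val batchSize: Int = 50
) : DoFn<String, Record>() {

    // Buffered keys plus the timestamp and window they arrived in.
    private val buffer = mutableListOf<Triple<String, Instant, BoundedWindow>>()

    @ProcessElement
    fun process(context: ProcessContext, window: BoundedWindow) {
        buffer.add(Triple(context.element(), context.timestamp(), window))
        if (buffer.size >= batchSize) {
            // Mid-bundle flush: results are emitted with the current element's
            // timestamp and window, a simplification for this sketch.
            flush { record, _, _ -> context.output(record) }
        }
    }

    @FinishBundle
    fun finishBundle(context: FinishBundleContext) {
        // Flush whatever is left, restoring each element's timestamp and window.
        flush { record, ts, w -> context.output(record, ts, w) }
    }

    private fun flush(emit: (Record, Instant, BoundedWindow) -> Unit) {
        if (buffer.isEmpty()) return
        // One request for the whole batch instead of one per key.
        val records = client.fetchAll(buffer.map { it.first })
        buffer.forEach { (key, ts, w) -> records[key]?.let { record -> emit(record, ts, w) } }
        buffer.clear()
    }
}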
A Kotlin-specific Apache Beam DSL that allows applying transforms directly onto collections.
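Not this DSL's actual API, just a sketch of the underlying idea: Kotlin extension functions on PCollection wrap standard Beam transforms so they read like collection operations (the function names below are assumptions):

import org.apache.beam.sdk.transforms.Filter
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.TypeDescriptor

// Hypothetical extension functions; the real DSL may use different names and signatures.
inline fun <T, reified R> PCollection<T>.map(crossinline fn: (T) -> R): PCollection<R> =
    apply(
        MapElements.into(TypeDescriptor.of(R::class.java))
            .via(SerializableFunction<T, R> { fn(it) })
    )

fun <T> PCollection<T>.filter(fn: (T) -> Boolean): PCollection<T> =
    apply(Filter.by(SerializableFunction<T, Boolean> { fn(it) }))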
Adds metadata to schemas or to single fields, directly within the schema and field definitions. The metadata and hints are stored as schema options.
val schema = schema {
    version("1.0")
    // base
    field("id", INT64.withNullable(true)) {
        // adds the "version" extension
        version("1.0")
    }
    fields(
        Field.of("title", STRING),
        Field.of("description", STRING)
    ) {
        // adds the "compress" extension to all fields!
        compress()
    }
}
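Without the DSL, roughly the same schema could be built with the plain Beam schema API; the option keys "version" and "compress" below are assumptions about how the extensions are stored:

import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.schemas.Schema.Field
import org.apache.beam.sdk.schemas.Schema.FieldType

// Assumed option keys; the DSL may encode its extensions differently.
val versionOption = Schema.Options.builder()
    .setOption("version", FieldType.STRING, "1.0")
    .build()
val compressOption = Schema.Options.builder()
    .setOption("compress", FieldType.BOOLEAN, true)
    .build()

val schema = Schema.builder()
    .addField(Field.of("id", FieldType.INT64).withNullable(true).withOptions(versionOption))
    .addField(Field.of("title", FieldType.STRING).withOptions(compressOption))
    .addField(Field.of("description", FieldType.STRING).withOptions(compressOption))
    .build()
    .withOptions(versionOption) // schema-level "version" option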
Demos the usage of POJOs (beans) and field schemas in Beam with Kotlin.
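For reference, a minimal sketch of Beam's schema inference for a Kotlin class (the Movie class is made up): JavaFieldSchema reads public fields, which @JvmField exposes, and @SchemaCreate tells Beam how to construct instances:

import org.apache.beam.sdk.schemas.JavaFieldSchema
import org.apache.beam.sdk.schemas.annotations.DefaultSchema
import org.apache.beam.sdk.schemas.annotations.SchemaCreate

// Hypothetical example class; Beam infers a schema with fields "title" and "year".
@DefaultSchema(JavaFieldSchema::class)
class Movie @SchemaCreate constructor(
    @JvmField val title: String,
    @JvmField val year: Int
)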
A running version of the SDF example from the Apache Beam website.
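Not the website example verbatim, but the minimal shape of a splittable DoFn: an OffsetRange restriction (which brings its own default tracker) plus a tryClaim loop:

import org.apache.beam.sdk.io.range.OffsetRange
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
import org.apache.beam.sdk.values.KV

// Emits 0 until n for an input KV(name, n); the runner may split the OffsetRange.
@DoFn.BoundedPerElement
class EmitRangeFn : DoFn<KV<String, Long>, Long>() {

    @GetInitialRestriction
    fun initialRestriction(@Element element: KV<String, Long>) = OffsetRange(0, element.value)

    @ProcessElement
    fun process(context: ProcessContext, tracker: RestrictionTracker<OffsetRange, Long>) {
        var position = tracker.currentRestriction().from
        // Each position must be claimed before producing output for it.
        while (tracker.tryClaim(position)) {
            context.output(position)
            position++
        }
    }
}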
Inversion of Control and Dependency Injection with Apache Beam, Kotlin and Koin.
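A sketch of the pattern, assuming Koin 3.x (the module contents are made up): a Koin module provides PipelineOptions and the Pipeline, and the assembly code pulls them from the container instead of constructing them directly:

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.koin.core.component.KoinComponent
import org.koin.core.component.inject
import org.koin.core.context.startKoin
import org.koin.dsl.module

// Koin module wiring the pipeline dependencies.
val beamModule = module {
    single<PipelineOptions> { PipelineOptionsFactory.create() }
    single { Pipeline.create(get()) }
}

class PipelineAssembly : KoinComponent {
    // The pipeline is injected instead of being created inline.
    private val pipeline: Pipeline by inject()

    fun run() {
        // transforms would be applied to `pipeline` here
        pipeline.run().waitUntilFinish()
    }
}

fun main() {
    startKoin { modules(beamModule) }
    PipelineAssembly().run()
}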
A demo of the gleam-core Generator to create fake data for a pipeline.
A demo of the BigtableIO.write transform.
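For orientation, a minimal usage sketch (project, instance and table names are placeholders): BigtableIO.write expects pairs of row key and mutations as KV<ByteString, Iterable<Mutation>>:

import com.google.bigtable.v2.Mutation
import com.google.protobuf.ByteString
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection

// Placeholder project, instance and table names; the demo's values will differ.
fun writeToBigtable(rows: PCollection<KV<ByteString, Iterable<Mutation>>>) {
    rows.apply(
        BigtableIO.write()
            .withProjectId("my-project")
            .withInstanceId("my-instance")
            .withTableId("my-table")
    )
}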