Skip to content

Commit

Permalink
Move class definitions outside packages
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Dec 19, 2023
1 parent 1671db7 commit bf26cf1
Show file tree
Hide file tree
Showing 41 changed files with 1,615 additions and 1,433 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.spotify.scio.avro.types

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.typelevel.scalaccompat.annotation.unused

import scala.annotation.{compileTimeOnly, nowarn, StaticAnnotation}
import scala.reflect.runtime.universe._
Expand Down Expand Up @@ -243,6 +244,22 @@ object AvroType {
def apply[T: TypeTag]: AvroType[T] = new AvroType[T]
}

/**
 * Annotation carrying documentation text for Avro records and fields.
 *
 * Apply it to a case class annotated with [[AvroType.toSchema]] to document the record, and to
 * that case class's constructor arguments to document the individual fields.
 *
 * Example:
 *
 * {{{
 * @AvroType.toSchema
 * @doc("User Record")
 * case class User(@doc("user name") name: String,
 *                 @doc("user age") age: Int)
 * }}}
 *
 * @param value
 *   the documentation text. It is not referenced by the class itself (hence `@unused`);
 *   presumably it is read from the annotation by the `AvroType` schema derivation — TODO confirm.
 */
class doc(@unused value: String) extends StaticAnnotation

/**
* Type class for case class `T` annotated for Avro IO.
*
Expand Down
40 changes: 0 additions & 40 deletions scio-avro/src/main/scala/com/spotify/scio/avro/types/types.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ import com.spotify.scio.ScioContext
import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap}
import com.spotify.scio.io.TapT

/**
 * Options handed to [[CassandraIO]] (and to `saveAsCassandra`) describing a Cassandra target.
 *
 * NOTE(review): the field descriptions below are inferred from the field names only — confirm
 * them against the code that consumes these options.
 *
 * @param keyspace
 *   presumably the keyspace that contains `table`
 * @param table
 *   presumably the table to write to
 * @param cql
 *   presumably the CQL statement the written values are bound to
 * @param seedNodeHost
 *   presumably the host name of a seed node used to reach the cluster
 * @param seedNodePort
 *   presumably the port of the seed node; defaults to `-1` (looks like a "not set" sentinel —
 *   TODO confirm)
 * @param username
 *   defaults to `null`; presumably only needed when the cluster requires authentication
 * @param password
 *   defaults to `null`; presumably only needed when the cluster requires authentication
 */
case class CassandraOptions(
keyspace: String,
table: String,
cql: String,
seedNodeHost: String,
seedNodePort: Int = -1,
username: String = null,
password: String = null
)

final case class CassandraIO[T](opts: CassandraOptions) extends ScioIO[T] {
override type ReadP = Nothing
override type WriteP = CassandraIO.WriteParam[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,11 @@

package com.spotify.scio

import com.spotify.scio.io.ClosedTap
import com.spotify.scio.values.SCollection

/**
* Main package for Cassandra APIs. Import all.
*
* {{{
* import com.spotify.scio.cassandra._
* }}}
*/
package object cassandra {
case class CassandraOptions(
keyspace: String,
table: String,
cql: String,
seedNodeHost: String,
seedNodePort: Int = -1,
username: String = null,
password: String = null
)

/**
* Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Cassandra methods.
*/
implicit class CassandraSCollection[T](@transient private val self: SCollection[T])
extends AnyVal {

/**
* Save this SCollection as a Cassandra table.
*
* Cassandra `org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter` is used to perform bulk
* writes for better throughput. The [[com.spotify.scio.values.SCollection SCollection]] is
* grouped by the table partition key before written to the cluster. Therefore writes only occur
* at the end of each window in streaming mode. The bulk writer writes to all nodes in a cluster
* so remote nodes in a multi-datacenter cluster may become a bottleneck.
*
* '''NOTE: this module is optimized for throughput in batch mode and not recommended for
* streaming mode.'''
*
* @param opts
* Cassandra options
* @param parallelism
* number of concurrent bulk writers, default to number of Cassandra nodes
* @param f
* function to convert input data to values for the CQL statement
*/
def saveAsCassandra(
opts: CassandraOptions,
parallelism: Int = CassandraIO.WriteParam.DefaultParallelism
)(f: T => Seq[Any]): ClosedTap[Nothing] =
self.write(CassandraIO[T](opts))(CassandraIO.WriteParam(f, parallelism))
}
}
// Members are inherited from `syntax.SCollectionSyntax`, so `import com.spotify.scio.cassandra._`
// brings its implicit conversion into scope.
package object cassandra extends syntax.SCollectionSyntax
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2023 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.cassandra.syntax

import com.spotify.scio.cassandra.{CassandraIO, CassandraOptions}
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.values.SCollection

/**
 * Cassandra-specific operations for a [[com.spotify.scio.values.SCollection SCollection]].
 */
class CassandraSCollection[T](@transient private val self: SCollection[T]) extends AnyVal {

  /**
   * Write the elements of this SCollection to a Cassandra table.
   *
   * Bulk writes are performed with Cassandra's
   * `org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter` for better throughput. The
   * [[com.spotify.scio.values.SCollection SCollection]] is grouped by the table partition key
   * before it is written to the cluster, so in streaming mode writes only happen at the end of
   * each window. The bulk writer writes to all nodes in a cluster, which means remote nodes in a
   * multi-datacenter cluster may become a bottleneck.
   *
   * '''NOTE: this module is optimized for throughput in batch mode and not recommended for
   * streaming mode.'''
   *
   * @param opts
   *   Cassandra options
   * @param parallelism
   *   number of concurrent bulk writers, default to number of Cassandra nodes
   * @param f
   *   function to convert input data to values for the CQL statement
   * @return
   *   the [[com.spotify.scio.io.ClosedTap ClosedTap]] produced by the underlying `write` call
   */
  def saveAsCassandra(
    opts: CassandraOptions,
    parallelism: Int = CassandraIO.WriteParam.DefaultParallelism
  )(f: T => Seq[Any]): ClosedTap[Nothing] = {
    // Name the two inputs of `write` separately: the IO describing the target and its parameters.
    val cassandraIO = CassandraIO[T](opts)
    val writeParam: CassandraIO.WriteParam[T] = CassandraIO.WriteParam(f, parallelism)
    self.write(cassandraIO)(writeParam)
  }
}

/** Implicit conversions that add the Cassandra operations to `SCollection`. */
trait SCollectionSyntax {

  /** Wraps `sc` in a [[CassandraSCollection]] so that its methods can be called on `sc`. */
  implicit def cassandraSCollectionOps[T](sc: SCollection[T]): CassandraSCollection[T] =
    new CassandraSCollection(sc)
}
166 changes: 1 addition & 165 deletions scio-core/src/main/scala/com/spotify/scio/hash/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,175 +17,11 @@

package com.spotify.scio

import com.spotify.scio.values.{SCollection, SideInput}

/**
* Main package for hash APIs. Import all.
*
* {{{
* import com.spotify.scio.hash._
* }}}
*/
package object hash {
implicit class ApproxFilterIterable[T](private val self: Iterable[T]) extends AnyVal {

/**
* Creates an [[ApproxFilter]] from this [[Iterable]] with the collection size as
* `expectedInsertions` and default `fpp` of 0.03.
*
* Note that overflowing an [[ApproxFilter]] with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* @param c
* companion object of the [[ApproxFilter]] implementation, e.g. [[BloomFilter]].
*/
def asApproxFilter[C <: ApproxFilterCompanion](c: C)(implicit hash: c.Hash[T]): c.Filter[T] =
c.create(self)

/**
* Creates an [[ApproxFilter]] from this [[Iterable]] with the expected number of insertions and
* default `fpp` of 0.03.
*
* Note that overflowing an [[ApproxFilter]] with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* @param c
* companion object of the [[ApproxFilter]] implementation, e.g. [[BloomFilter]].
*/
def asApproxFilter[C <: ApproxFilterCompanion](c: C, expectedInsertions: Long)(implicit
hash: c.Hash[T]
): c.Filter[T] =
c.create(self, expectedInsertions)

/**
* Creates an [[ApproxFilter]] from this [[Iterable]] with the expected number of insertions and
* expected false positive probability.
*
* Note that overflowing an [[ApproxFilter]] with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* @param c
* companion object of the [[ApproxFilter]] implementation, e.g. [[BloomFilter]].
*/
def asApproxFilter[C <: ApproxFilterCompanion](c: C, expectedInsertions: Long, fpp: Double)(
implicit hash: c.Hash[T]
): c.Filter[T] =
c.create(self, expectedInsertions, fpp)
}

implicit class ApproxFilterSCollection[T](private val self: SCollection[T]) extends AnyVal {

/**
* Creates an [[ApproxFilter]] from this [[SCollection]] with the collection size as
* `expectedInsertions` and default `fpp` of 0.03.
*
* Note that overflowing an [[ApproxFilter]] with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* @param c
* companion object of the [[ApproxFilter]] implementation, e.g. [[BloomFilter]].
*/
def asApproxFilter[C <: ApproxFilterCompanion](
c: C
)(implicit hash: c.Hash[T]): SCollection[c.Filter[T]] =
c.create(self)

/**
* Creates an [[ApproxFilter]] from this [[SCollection]] with the expected number of insertions
* and default `fpp` of 0.03.
*
* Note that overflowing an [[ApproxFilter]] with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* @param c
* companion object of the [[ApproxFilter]] implementation, e.g. [[BloomFilter]].
*/
def asApproxFilter[C <: ApproxFilterCompanion](c: C, expectedInsertions: Long)(implicit
hash: c.Hash[T]
): SCollection[c.Filter[T]] =
c.create(self, expectedInsertions)

/**
* Creates an [[ApproxFilter]] from this [[SCollection]] with the expected number of insertions
* and expected false positive probability.
*
* Note that overflowing an [[ApproxFilter]] with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* @param c
* companion object of the [[ApproxFilter]] implementation, e.g. [[BloomFilter]].
*/
def asApproxFilter[C <: ApproxFilterCompanion](c: C, expectedInsertions: Long, fpp: Double)(
implicit hash: c.Hash[T]
): SCollection[c.Filter[T]] =
c.create(self, expectedInsertions, fpp)

/**
* Creates a `SideInput[ApproxFilter]` from an [[SCollection]] with the collection size as
* `expectedInsertions` and false positive probability of 0.03.
*
* Note that overflowing an [[ApproxFilter]] with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* Since this results in one filter as a [[SideInput]] care should be taken that the size of the
* filter does not exceed the runner recommended max size of Side Inputs (100 MB for Dataflow).
* This implies that `expectedInsertions` should not exceed 112 Million with an fpp of 0.03 on
* Dataflow.
*/
def asApproxFilterSideInput[C <: ApproxFilterCompanion](
c: C
)(implicit
hash: c.Hash[T]
): SideInput[c.Filter[T]] =
c.createSideInput(self)

/**
* Creates a `SideInput[ApproxFilter]` from an [[SCollection]] with the expected number of
* insertions and the default false positive probability.
*
* The `expectedInsertions` should be approximately the number of unique elements in the
* SCollection.
*
* The default false positive probability is 0.03
*
* Note that overflowing an [[ApproxFilter]] with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* Since this results in one filter as a [[SideInput]] care should be taken that the size of the
* filter does not exceed the runner recommended max size of Side Inputs (100 MB for Dataflow).
* This implies that `expectedInsertions` should not exceed 112 Million with an fpp of 0.03 on
* Dataflow.
*/
def asApproxFilterSideInput[C <: ApproxFilterCompanion](
c: C,
expectedInsertions: Long
)(implicit
hash: c.Hash[T]
): SideInput[c.Filter[T]] =
c.createSideInput(self, expectedInsertions)

/**
* Creates a `SideInput[ApproxFilter]` from an [[SCollection]] with the expected number of
* insertions and expected false positive probability.
*
* The `expectedInsertions` should be approximately the number of unique elements in the
* SCollection.
*
* Note that overflowing an [[ApproxFilter]] with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* Since this results in one filter as a [[SideInput]] care should be taken that the size of the
* filter does not exceed the runner recommended max size of Side Inputs (100 MB for Dataflow).
* This implies that `expectedInsertions` should not exceed 112 Million with an fpp of 0.03 on
* Dataflow.
*/
def asApproxFilterSideInput[C <: ApproxFilterCompanion](
c: C,
expectedInsertions: Long,
fpp: Double
)(implicit
hash: c.Hash[T]
): SideInput[c.Filter[T]] =
c.createSideInput(self, expectedInsertions, fpp)
}
}
// Members are inherited from `syntax.AllSyntax`, so `import com.spotify.scio.hash._` brings them
// into scope.
package object hash extends syntax.AllSyntax
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.spotify.scio.hash.syntax

/**
 * Combines the hash syntax types [[SCollectionSyntax]] and [[IterableSyntax]] into a single type;
 * `package object hash` extends it.
 *
 * NOTE(review): this is declared as a `class`, whereas the Cassandra `SCollectionSyntax` is a
 * `trait` — confirm that a concrete class is intended here.
 */
class AllSyntax extends SCollectionSyntax with IterableSyntax
Loading

0 comments on commit bf26cf1

Please sign in to comment.