-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathSBasic.scala
67 lines (51 loc) · 1.86 KB
/
SBasic.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package examples
import java.io.IOException
import edb.client.DBClient
import edb.common.{ExistingTableException, Row, Schema, UnknownTableException}
import edb.server.DBServer
import org.apache.spark.sql.SparkSession
object SBasic {

  /** Example entry point: starts an in-process database server, creates and
    * populates a two-column table through the client API, then reads the table
    * back via a custom Spark data source and prints the schema, the rows, and
    * the partition count.
    *
    * Fixes over the original: `: Unit =` replaces deprecated procedure syntax
    * (removed in Scala 3), and both the server and the SparkSession are shut
    * down in `finally` blocks so they are released even if a step throws.
    */
  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  @throws(classOf[ExistingTableException])
  @throws(classOf[UnknownTableException])
  def main(args: Array[String]): Unit = {
    val serverHost = "localhost"
    val serverPort = 50199

    // Start the example database server the data source will connect to.
    val server = new DBServer(serverPort)
    server.start()
    System.out.println("*** Example database server started")

    try {
      // Table schema: two 64-bit integer columns named "i" and "j".
      val schema = new Schema
      schema.addColumn("i", Schema.ColumnType.INT64)
      schema.addColumn("j", Schema.ColumnType.INT64)

      val client = new DBClient(serverHost, serverPort)
      client.createTable("theTable", schema)

      // Populate the table with two rows via a bulk insert.
      val toInsert = new java.util.ArrayList[Row]
      val r1 = new Row
      r1.addField(new Row.Int64Field("i", 100))
      r1.addField(new Row.Int64Field("j", 200))
      toInsert.add(r1)
      val r2 = new Row
      r2.addField(new Row.Int64Field("i", 300))
      r2.addField(new Row.Int64Field("j", 400))
      toInsert.add(r2)
      client.bulkInsert("theTable", toInsert)
      System.out.println("*** Example database server populated with data")

      // Fully-qualified class name of the custom Spark data source to load.
      val dataSourceName = "datasources.SimpleRowDataSource"
      val spark =
        SparkSession.builder().appName("SBasic").master("local[4]").getOrCreate()
      try {
        // The data source reads its connection endpoint from these options.
        val data = spark.read
          .format(dataSourceName)
          .option("host", serverHost)
          .option("port", serverPort)
          .load()
        System.out.println("*** Schema: ")
        data.printSchema()
        System.out.println("*** Data: ")
        data.show()
        System.out.println("*** Number of partitions: " + data.rdd.partitions.length)
      } finally {
        // Always release the SparkContext, even if a read step fails.
        spark.stop()
      }
    } finally {
      // Always shut the example server down so its thread does not linger.
      server.stop()
    }
  }
}