Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support duration datatype & add test #28

Merged
merged 4 commits into from
Dec 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package com.vesoft.nebula.connector

import com.vesoft.nebula.PropertyType
import com.vesoft.nebula.client.graph.data.{DateTimeWrapper, DateWrapper, TimeWrapper}
import com.vesoft.nebula.client.graph.data.{DateTimeWrapper, DurationWrapper, TimeWrapper}
import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{
Expand All @@ -16,7 +16,6 @@ import org.apache.spark.sql.types.{
FloatType,
IntegerType,
LongType,
NullType,
StringType,
StructType,
TimestampType
Expand All @@ -43,7 +42,7 @@ object NebulaUtils {
case PropertyType.FLOAT | PropertyType.DOUBLE => DoubleType
case PropertyType.TIMESTAMP => LongType
case PropertyType.FIXED_STRING | PropertyType.STRING | PropertyType.DATE | PropertyType.TIME |
PropertyType.DATETIME | PropertyType.GEOGRAPHY =>
PropertyType.DATETIME | PropertyType.GEOGRAPHY | PropertyType.DURATION =>
StringType
case PropertyType.UNKNOWN => throw new IllegalArgumentException("unsupported data type")
}
Expand Down Expand Up @@ -99,6 +98,9 @@ object NebulaUtils {
UTF8String.fromString(prop.asInstanceOf[DateTimeWrapper].getUTCDateTimeStr))
} else if (prop.isInstanceOf[TimeWrapper]) {
row.update(pos, UTF8String.fromString(prop.asInstanceOf[TimeWrapper].getUTCTimeStr))
} else if (prop.isInstanceOf[DurationWrapper]) {
row.update(pos,
UTF8String.fromString(prop.asInstanceOf[DurationWrapper].getDurationString))
} else {
row.update(pos, UTF8String.fromString(String.valueOf(prop)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ abstract class NebulaPartitionReader extends InputPartitionReader[InternalRow] {
if (value.isGeography) {
getters(i).apply(value.asGeography(), mutableRow, i)
}
if (value.isDuration) {
getters(i).apply(value.asDuration(), mutableRow, i)
}
}
mutableRow
}
Expand Down
6 changes: 6 additions & 0 deletions nebula-spark-connector/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Global logging configuration: route everything at INFO and above to the console.
log4j.rootLogger=INFO, stdout
# Console appender writing to stdout with a simple pattern layout.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Pattern: right-padded level, [thread], message, newline — e.g. " INFO [main] - started".
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
28 changes: 14 additions & 14 deletions nebula-spark-connector/src/test/resources/vertex.csv
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14
1,Tom,tom,10,20,30,40,2021-01-27,2021-01-01T12:10:10,43535232,true,1.0,2.0,10:10:10,POINT(1 2)
2,Jina,Jina,11,21,31,41,2021-01-28,2021-01-02T12:10:10,43535232,false,1.1,2.1,11:10:10,POINT(3 4)
3,Tim,Tim,12,22,32,42,2021-01-29,2021-01-03T12:10:10,43535232,false,1.2,2.2,12:10:10,POINT(5 6)
4,张三,张三,13,23,33,43,2021-01-30,2021-01-04T12:10:10,43535232,true,1.3,2.3,13:10:10,POINT(6 7)
5,李四,李四,14,24,34,44,2021-02-01,2021-01-05T12:10:10,43535232,false,1.4,2.4,14:10:10,POINT(1 5)
6,王五,王五,15,25,35,45,2021-02-02,2021-01-06T12:10:10,0,false,1.5,2.5,15:10:10,"LINESTRING(1 3, 4.7 73.23)"
7,Jina,Jina,16,26,36,46,2021-02-03,2021-01-07T12:10:10,43535232,true,1.6,2.6,16:10:10,"LINESTRING(1 3, 4.7 73.23)"
8,Jina,Jina,17,27,37,47,2021-02-04,2021-01-08T12:10:10,43535232,false,1.7,2.7,17:10:10,"LINESTRING(1 3, 4.7 73.23)"
9,Jina,Jina,18,28,38,48,2021-02-05,2021-01-09T12:10:10,43535232,true,1.8,2.8,18:10:10,"LINESTRING(1 3, 4.7 73.23)"
10,Jina,Jina,19,29,39,49,2021-02-06,2021-01-10T12:10:10,43535232,false,1.9,2.9,19:10:10,"LINESTRING(1 3, 4.7 73.23)"
-1,Jina,Jina,20,30,40,50,2021-02-07,2021-02-11T12:10:10,43535232,false,2.0,3.0,20:10:10,"POLYGON((0 1, 1 2, 2 3, 0 1))"
-2,Jina,Jina,21,31,41,51,2021-02-08,2021-03-12T12:10:10,43535232,false,2.1,3.1,21:10:10,"POLYGON((0 1, 1 2, 2 3, 0 1))"
-3,Jina,Jina,22,32,42,52,2021-02-09,2021-04-13T12:10:10,43535232,false,2.2,3.2,22:10:10,"POLYGON((0 1, 1 2, 2 3, 0 1))"
id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15
1,Tom,tom,10,20,30,40,2021-01-27,2021-01-01T12:10:10,43535232,true,1.0,2.0,10:10:10,POINT(1 2),"duration({years:1,months:1,seconds:1})"
2,Jina,Jina,11,21,31,41,2021-01-28,2021-01-02T12:10:10,43535232,false,1.1,2.1,11:10:10,POINT(3 4),"duration({years:1,months:1,seconds:1})"
3,Tim,Tim,12,22,32,42,2021-01-29,2021-01-03T12:10:10,43535232,false,1.2,2.2,12:10:10,POINT(5 6),"duration({years:1,months:1,seconds:1})"
4,张三,张三,13,23,33,43,2021-01-30,2021-01-04T12:10:10,43535232,true,1.3,2.3,13:10:10,POINT(6 7),"duration({years:1,months:1,seconds:1})"
5,李四,李四,14,24,34,44,2021-02-01,2021-01-05T12:10:10,43535232,false,1.4,2.4,14:10:10,POINT(1 5),"duration({years:1,months:1,seconds:1})"
6,王五,王五,15,25,35,45,2021-02-02,2021-01-06T12:10:10,0,false,1.5,2.5,15:10:10,"LINESTRING(1 3, 4.7 73.23)","duration({years:1,months:1,seconds:1})"
7,Jina,Jina,16,26,36,46,2021-02-03,2021-01-07T12:10:10,43535232,true,1.6,2.6,16:10:10,"LINESTRING(1 3, 4.7 73.23)","duration({years:1,months:1,seconds:1})"
8,Jina,Jina,17,27,37,47,2021-02-04,2021-01-08T12:10:10,43535232,false,1.7,2.7,17:10:10,"LINESTRING(1 3, 4.7 73.23)","duration({years:1,months:1,seconds:1})"
9,Jina,Jina,18,28,38,48,2021-02-05,2021-01-09T12:10:10,43535232,true,1.8,2.8,18:10:10,"LINESTRING(1 3, 4.7 73.23)","duration({years:1,months:1,seconds:1})"
10,Jina,Jina,19,29,39,49,2021-02-06,2021-01-10T12:10:10,43535232,false,1.9,2.9,19:10:10,"LINESTRING(1 3, 4.7 73.23)","duration({years:1,months:1,seconds:1})"
-1,Jina,Jina,20,30,40,50,2021-02-07,2021-02-11T12:10:10,43535232,false,2.0,3.0,20:10:10,"POLYGON((0 1, 1 2, 2 3, 0 1))","duration({years:1,months:1,seconds:1})"
-2,Jina,Jina,21,31,41,51,2021-02-08,2021-03-12T12:10:10,43535232,false,2.1,3.1,21:10:10,"POLYGON((0 1, 1 2, 2 3, 0 1))","duration({years:1,months:1,seconds:1})"
-3,Jina,Jina,22,32,42,52,2021-02-09,2021-04-13T12:10:10,43535232,false,2.2,3.2,22:10:10,"POLYGON((0 1, 1 2, 2 3, 0 1))","duration({years:1,months:1,seconds:1})"
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class NebulaUtilsSuite extends AnyFunSuite {
assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.DATE)) == StringType)
assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.DATETIME)) == StringType)
assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.TIME)) == StringType)
assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.GEOGRAPHY)) == StringType)
assert(NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.DURATION)) == StringType)
assertThrows[IllegalArgumentException](
NebulaUtils.convertDataType(new ColumnTypeDef(PropertyType.UNKNOWN)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class NebulaGraphMock {
val createSpace = "CREATE SPACE IF NOT EXISTS test_int(partition_num=10, vid_type=int64);" +
"USE test_int;" + "CREATE TAG IF NOT EXISTS person(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);" +
"CREATE EDGE IF NOT EXISTS friend(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time);" +
"CREATE TAG IF NOT EXISTS geo_shape(geo geography);"
"CREATE TAG IF NOT EXISTS geo_shape(geo geography);" +
"CREATE TAG IF NOT EXISTS tag_duration(col duration);"
val createResp = session.execute(createSpace)
if (!createResp.isSucceeded) {
close()
Expand Down Expand Up @@ -124,7 +125,8 @@ class NebulaGraphMock {
" 19:(\"person19\", \"person22\", 25, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\"))," +
" 22:(\"person22\", \"person22\", 26, 1500, 14000, 1488888, date(\"2021-01-14\"), datetime(\"2021-01-12T12:00:00\"),timestamp(\"2021-01-12T12:00:00\"), true, 1.0, 2.0, time(\"12:01:01\")), " +
" 0:(null, null, null, null, null, null, null, null, null, null, null, null, null);" +
"INSERT VERTEX geo_shape(geo) VALUES 100:(ST_GeogFromText(\"POINT(1 2)\")), 101:(ST_GeogFromText(\"LINESTRING(1 2, 3 4)\")), 102:(ST_GeogFromText(\"POLYGON((0 1, 1 2, 2 3, 0 1))\"))"
"INSERT VERTEX geo_shape(geo) VALUES 100:(ST_GeogFromText(\"POINT(1 2)\")), 101:(ST_GeogFromText(\"LINESTRING(1 2, 3 4)\")), 102:(ST_GeogFromText(\"POLYGON((0 1, 1 2, 2 3, 0 1))\"));" +
"INSERT VERTEX tag_duration(col) VALUES 200:(duration({months:1, seconds:100, microseconds:20}))"

val insertTagResp = session.execute(insertTag)
if (!insertTagResp.isSucceeded) {
Expand Down Expand Up @@ -158,7 +160,8 @@ class NebulaGraphMock {
val session = pool.getSession("root", "nebula", true)

val createSpace = "CREATE SPACE IF NOT EXISTS test_write_string(partition_num=10,vid_type=fixed_string(8));" +
"USE test_write_string;" + "CREATE TAG IF NOT EXISTS person_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time, col14 geography);" +
"USE test_write_string;" +
"CREATE TAG IF NOT EXISTS person_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time, col14 geography, col15 duration);" +
"CREATE EDGE IF NOT EXISTS friend_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time, col14 geography);";
val createResp = session.execute(createSpace)
if (!createResp.isSucceeded) {
Expand All @@ -172,7 +175,8 @@ class NebulaGraphMock {
val session = pool.getSession("root", "nebula", true)

val createSpace = "CREATE SPACE IF NOT EXISTS test_write_int(partition_num=10, vid_type=int64);" +
"USE test_write_int;" + "CREATE TAG IF NOT EXISTS person_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time, col14 geography);" +
"USE test_write_int;" +
"CREATE TAG IF NOT EXISTS person_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time, col14 geography, col15 duration);" +
"CREATE EDGE IF NOT EXISTS friend_connector(col1 string, col2 fixed_string(8), col3 int8, col4 int16, col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double, col12 float, col13 time, col14 geography);";
val createResp = session.execute(createSpace)
if (!createResp.isSucceeded) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package com.vesoft.nebula.connector.nebula

import com.vesoft.nebula.connector.connector.Address
import com.vesoft.nebula.connector.mock.NebulaGraphMock
import org.apache.log4j.BasicConfigurator
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

class GraphProviderTest extends AnyFunSuite with BeforeAndAfterAll {
BasicConfigurator.configure()

var graphProvider: GraphProvider = null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import com.vesoft.nebula.connector.DataTypeEnum
import com.vesoft.nebula.connector.connector.Address
import com.vesoft.nebula.connector.mock.NebulaGraphMock
import com.vesoft.nebula.meta.Schema
import org.apache.log4j.BasicConfigurator
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

class MetaProviderTest extends AnyFunSuite with BeforeAndAfterAll {
BasicConfigurator.configure()
var metaProvider: MetaProvider = null

override def beforeAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import com.vesoft.nebula.connector.mock.NebulaGraphMock
import org.apache.log4j.BasicConfigurator
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoders, SparkSession}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

class ReadSuite extends AnyFunSuite with BeforeAndAfterAll {

BasicConfigurator.configure()
var sparkSession: SparkSession = null

override def beforeAll(): Unit = {
Expand Down Expand Up @@ -192,6 +193,39 @@ class ReadSuite extends AnyFunSuite with BeforeAndAfterAll {
})(Encoders.STRING)
}

test("read vertex for tag_duration") {
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test_int")
.withLabel("tag_duration")
.withNoColumn(false)
.withReturnCols(List("col"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = sparkSession.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show()
assert(vertex.count() == 1)
assert(vertex.schema.fields.length == 2)

vertex.map(row => {
row.getAs[Long]("_vertexId") match {
case 200L => {
assert(
row.getAs[String]("col").equals("duration({months:1, seconds:100, microseconds:20})"))
}
}
""
})(Encoders.STRING)
}

test("read edge with no properties") {
val config =
NebulaConnectionConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package com.vesoft.nebula.connector.writer

import com.vesoft.nebula.connector.KeyPolicy
import com.vesoft.nebula.connector.connector.{NebulaEdge, NebulaEdges, NebulaVertex, NebulaVertices}
import org.apache.log4j.BasicConfigurator
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.{
Expand All @@ -23,6 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
import scala.collection.mutable.ListBuffer

class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
BasicConfigurator.configure()
var schema: StructType = _
var row: InternalRow = _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import com.vesoft.nebula.client.graph.data.ResultSet
import com.vesoft.nebula.connector.connector.Address
import com.vesoft.nebula.connector.mock.{NebulaGraphMock, SparkMock}
import com.vesoft.nebula.connector.nebula.GraphProvider
import org.apache.log4j.BasicConfigurator
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

class WriteDeleteSuite extends AnyFunSuite with BeforeAndAfterAll {
BasicConfigurator.configure()

override def beforeAll(): Unit = {
val graphMock = new NebulaGraphMock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
package com.vesoft.nebula.connector.writer

import com.vesoft.nebula.client.graph.data.ResultSet
import com.vesoft.nebula.connector.connector.{Address}
import com.vesoft.nebula.connector.connector.Address
import com.vesoft.nebula.connector.mock.{NebulaGraphMock, SparkMock}
import com.vesoft.nebula.connector.nebula.GraphProvider
import org.apache.log4j.BasicConfigurator
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

class WriteInsertSuite extends AnyFunSuite with BeforeAndAfterAll {
BasicConfigurator.configure()

override def beforeAll(): Unit = {
val graphMock = new NebulaGraphMock
Expand Down