remove tagless for sst file (#98)
Nicole00 authored Sep 26, 2022
1 parent 4edf6ce commit 775f7e7
Showing 6 changed files with 24 additions and 43 deletions.
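All six files carry the same change, duplicated across the Spark-version modules: `encodeVertex` used to return a triple `(orphanVertexKey, vertexKey, vertexValue)`, and the caller flattened each row into two SST entries by pairing the tagless orphan key with an empty value; it now returns just `(vertexKey, vertexValue)`, so no tagless key is written and the dataset encoder shrinks from a 3-tuple of `Encoders.BINARY` to a 2-tuple. A minimal Scala sketch of the shape change (stand-in byte arrays, not the real `NebulaCodecImpl` encoding):

object SstShapeSketch {
  type Key   = Array[Byte]
  type Value = Array[Byte]

  // Old behavior: each encoded row yielded two SST entries, the tagless
  // orphan-vertex key paired with an empty value plus the tagged key/value.
  def oldEntries(orphanKey: Key, vertexKey: Key, vertexValue: Value): List[(Key, Value)] = {
    val emptyValue = Array.emptyByteArray
    List((orphanKey, emptyValue), (vertexKey, vertexValue))
  }

  // New behavior: each encoded row yields exactly one SST entry; the
  // orphan (tagless) key is no longer generated or written.
  def newEntries(vertexKey: Key, vertexValue: Value): List[(Key, Value)] =
    List((vertexKey, vertexValue))
}

The diffs below repeat this change in each VerticesProcessor and its test suite.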
File 1 of 6
@@ -127,10 +127,7 @@ class VerticesProcessor(spark: SparkSession,
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
-        .flatMap(line => {
-          List((line._1, emptyValue), (line._2, line._3))
-        })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))

     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
@@ -226,7 +223,7 @@ class VerticesProcessor(spark: SparkSession,
       vidType: VidType.Value,
       spaceVidLen: Int,
       tagItem: TagItem,
-      fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
+      fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

@@ -260,15 +257,14 @@ class VerticesProcessor(spark: SparkSession,
     } else {
       vertexId.getBytes()
     }
-    val codec = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
-    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
+    val codec = new NebulaCodecImpl()
+    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (orphanVertexKey, vertexKey, vertexValue)
+    (vertexKey, vertexValue)
   }
 }
File 2 of 6
@@ -141,14 +141,12 @@ class VerticesProcessorSuite {
     val tagItem = new TagItem(1, "person".getBytes(), -1, schema)
     val map = getFieldType()

-    val (orphanKey, key, value) =
+    val (key, value) =
       processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)

-    val keyHex = Hex.encodeHexString(key)
-    val orphanKeyHex = Hex.encodeHexString(orphanKey)
-    val valueHex = Hex.encodeHexString(value)
+    val keyHex = Hex.encodeHexString(key)
+    val valueHex = Hex.encodeHexString(value)
     assert(keyHex.equals("010600003100000000000000000001000000"))
-    assert(orphanKeyHex.equals("0706000031000000000000000000"))
   }

   private def getRow(): Row = {
File 3 of 6
@@ -121,18 +121,14 @@ class VerticesProcessor(spark: SparkSession,

     val spaceVidLen = metaProvider.getSpaceVidLen(space)
     val tagItem = metaProvider.getTagItem(space, tagName)
-    val emptyValue = ByteBuffer.allocate(0).array()

     var sstKeyValueData = data
       .dropDuplicates(tagConfig.vertexField)
       .mapPartitions { iter =>
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
-        .flatMap(line => {
-          List((line._1, emptyValue), (line._2, line._3))
-        })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))

     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
@@ -245,7 +241,7 @@ class VerticesProcessor(spark: SparkSession,
       vidType: VidType.Value,
       spaceVidLen: Int,
       tagItem: TagItem,
-      fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
+      fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

@@ -279,15 +275,14 @@ class VerticesProcessor(spark: SparkSession,
     } else {
       vertexId.getBytes()
     }
-    val codec = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
-    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
+    val codec = new NebulaCodecImpl()
+    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (orphanVertexKey, vertexKey, vertexValue)
+    (vertexKey, vertexValue)
   }
 }
File 4 of 6
@@ -139,14 +139,12 @@ class VerticesProcessorSuite {
     val tagItem = new TagItem(1, "person".getBytes(), -1, schema)
     val map = getFieldType()

-    val (orphanKey, key, value) =
+    val (key, value) =
       processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)

-    val keyHex = Hex.encodeHexString(key)
-    val orphenKeyHex = Hex.encodeHexString(orphanKey)
-    val valueHex = Hex.encodeHexString(value)
+    val keyHex = Hex.encodeHexString(key)
+    val valueHex = Hex.encodeHexString(value)
     assert(keyHex.equals("010600003100000000000000000001000000"))
-    assert(orphenKeyHex.equals("0706000031000000000000000000"))
   }

   private def getRow(): Row = {
File 5 of 6
@@ -129,10 +129,7 @@ class VerticesProcessor(spark: SparkSession,
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
-        .flatMap(line => {
-          List((line._1, emptyValue), (line._2, line._3))
-        })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))

     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
@@ -245,7 +242,7 @@ class VerticesProcessor(spark: SparkSession,
       vidType: VidType.Value,
       spaceVidLen: Int,
       tagItem: TagItem,
-      fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
+      fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

@@ -279,15 +276,14 @@ class VerticesProcessor(spark: SparkSession,
     } else {
       vertexId.getBytes()
     }
-    val codec = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
-    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
+    val codec = new NebulaCodecImpl()
+    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (orphanVertexKey, vertexKey, vertexValue)
+    (vertexKey, vertexValue)
   }
 }
File 6 of 6
@@ -138,14 +138,12 @@ class VerticesProcessorSuite {
     val tagItem = new TagItem(1, "person".getBytes(), -1, schema)
     val map = getFieldType()

-    val (orphanKey, key, value) =
+    val (key, value) =
       processClazz.encodeVertex(row, 10, VidType.STRING, 10, tagItem, map)

-    val keyHex = Hex.encodeHexString(key)
-    val orphanKeyHex = Hex.encodeHexString(orphanKey)
-    val valueHex = Hex.encodeHexString(value)
+    val keyHex = Hex.encodeHexString(key)
+    val valueHex = Hex.encodeHexString(value)
     assert(keyHex.equals("010600003100000000000000000001000000"))
-    assert(orphanKeyHex.equals("0706000031000000000000000000"))
   }

   private def getRow(): Row = {
