Skip to content

Commit

Permalink
edge enhancement part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
derrickoswald committed May 21, 2016
1 parent 1941659 commit 35c60bc
Showing 1 changed file with 126 additions and 35 deletions.
161 changes: 126 additions & 35 deletions src/main/scala/ch/ninecode/cim/CIMRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ import org.apache.spark.sql.types.StructType

import ch.ninecode._

case class Pair (id_equ: String, var left: Terminal = null, var right: Terminal = null)
case class Edge (id_seq_1: String, id_seq_2: String, id_equ: String, container: String, length: Double, voltage: String, typ: String, normallyOpen: Boolean)
class Pair (val id_equ: String, var left: Terminal = null, var right: Terminal = null) extends Serializable
class PreEdge (var id_seq_1: String, var id_seq_2: String, var id_equ: String, var container: String, var length: Double, var voltage: String, var typ: String, var normalOpen: Boolean, var location: String) extends Serializable
class Extremum (val id_loc: String, var min_index: Int, var x1 : Double, var y1 : Double, var max_index: Int, var x2 : Double, var y2 : Double) extends Serializable
case class Edge (id_seq_1: String, id_seq_2: String, id_equ: String, container: String, length: Double, voltage: String, typ: String, normalOpen: Boolean, x1: Double, y1: Double, x2: Double, y2: Double)

class CIMRelation(
override val paths: Array[String],
Expand Down Expand Up @@ -152,7 +154,10 @@ class CIMRelation(
sqlContext.createDataFrame (rdd.collect ({ case x: Element if x.getClass () == classOf[BaseVoltage] => x.asInstanceOf[BaseVoltage]})).registerTempTable ("Voltage")
sqlContext.createDataFrame (rdd.collect ({ case x: Element if x.getClass () == classOf[CoordinateSystem] => x.asInstanceOf[CoordinateSystem]})).registerTempTable ("CoordinateSystem")
sqlContext.createDataFrame (rdd.collect ({ case x: Element if x.getClass () == classOf[Location] => x.asInstanceOf[Location]})).registerTempTable ("Location")
sqlContext.createDataFrame (rdd.collect ({ case x: Element if x.getClass () == classOf[PositionPoint] => x.asInstanceOf[PositionPoint]})).registerTempTable ("PositionPoint")
val points = rdd.collect ({ case x: Element if x.getClass () == classOf[PositionPoint] => x.asInstanceOf[PositionPoint]})
points.setName ("Points")
points.cache ()
sqlContext.createDataFrame (points).registerTempTable ("PositionPoint")
sqlContext.createDataFrame (rdd.collect ({ case x: Element if x.getClass () == classOf[Asset] => x.asInstanceOf[Asset]})).registerTempTable ("Asset")
sqlContext.createDataFrame (rdd.collect ({ case x: Element if x.getClass () == classOf[EnergyConsumer] => x.asInstanceOf[EnergyConsumer]})).registerTempTable ("Consumer")
val terminals = rdd.collect ({ case x: Element if x.getClass () == classOf[Terminal] => x.asInstanceOf[Terminal]})
Expand Down Expand Up @@ -196,7 +201,7 @@ class CIMRelation(
val pair_seq_op = (l: Pair /* null */, r: Terminal)
{
if (null == l)
Pair (r.equipment, r)
new Pair (r.equipment, r)
else
{
if (null != l.right)
Expand Down Expand Up @@ -232,7 +237,8 @@ class CIMRelation(
var length = 0.0
var voltage = ""
var typ = ""
var normallyOpen = true
var normalOpen = false
var location = ""
Some (e) match
{
case Some(o) if o.getClass () == classOf[PSRType] => { }
Expand All @@ -251,23 +257,27 @@ class CIMRelation(
{
val ec = o.asInstanceOf[EnergyConsumer]
voltage = ec.voltage
location = ec.location
}
case Some(o) if o.getClass () == classOf[Terminal] => { }
case Some(o) if o.getClass () == classOf[BusbarInfo] => { }
case Some(o) if o.getClass () == classOf[BusbarSection] =>
{
val bs = o.asInstanceOf[BusbarSection]
voltage = bs.voltage
location = bs.location
}
case Some(o) if o.getClass () == classOf[Connector] =>
{
val c = o.asInstanceOf[Connector]
voltage = c.voltage
location = c.location
}
case Some(o) if o.getClass () == classOf[Junction] => { }
{
val j = o.asInstanceOf[Junction]
voltage = j.voltage
location = j.location
}
case Some(o) if o.getClass () == classOf[CableInfo] => { }
case Some(o) if o.getClass () == classOf[ACLineSegment] =>
Expand All @@ -276,14 +286,16 @@ class CIMRelation(
length = ac.len.toDouble
voltage = ac.voltage
typ = ac.name
location = ac.location
}
case Some(o) if o.getClass () == classOf[ACLineSegmentPhase] => { }
case Some(o) if o.getClass () == classOf[SwitchInfo] => { }
case Some(o) if o.getClass () == classOf[Switch] =>
{
val s = o.asInstanceOf[Switch]
voltage = s.voltage
normallyOpen = s.normalOpen
normalOpen = s.normalOpen
location = s.location
}
case Some(o) if o.getClass () == classOf[PowerTransformerInfo] => { }
case Some(o) if o.getClass () == classOf[TransformerTankInfo] => { }
Expand All @@ -292,6 +304,7 @@ class CIMRelation(
{
val t = o.asInstanceOf[PowerTransformer]
typ = t.name
location = t.location
}
case Some(o) if o.getClass () == classOf[TransformerTank] => { }
case Some(o) if o.getClass () == classOf[TransformerTankEnd] =>
Expand All @@ -303,19 +316,22 @@ class CIMRelation(
{
val f = o.asInstanceOf[Fuse]
voltage = f.voltage
normallyOpen = f.normalOpen
normalOpen = f.normalOpen
location = f.location
}
case Some(o) if o.getClass () == classOf[Disconnector] => { }
{
val d = o.asInstanceOf[Disconnector]
voltage = d.voltage
normallyOpen = d.normalOpen
normalOpen = d.normalOpen
location = d.location
}
case Some(o) if o.getClass () == classOf[GroundDisconnector] =>
{
val gd = o.asInstanceOf[GroundDisconnector]
voltage = gd.voltage
normallyOpen = gd.normalOpen
normalOpen = gd.normalOpen
location = gd.location
}
case Some(o) if o.getClass () == classOf[ProtectionEquipment] => { }
case Some(o) if o.getClass () == classOf[CurrentTransformer] => { }
Expand All @@ -332,30 +348,32 @@ class CIMRelation(
{
val up = o.asInstanceOf[UsagePoint]
voltage = up.nominalvoltage
location = up.usagepointlocation
}

case Some(o) if o.getClass () == classOf[NameTypeAuthority] => { }
case Some(o) if o.getClass () == classOf[NameType] => { }
case Some(o) if o.getClass () == classOf[Name] => { }
case Some(o) if o.getClass () == classOf[UserAttribute] => { }
}
Edge (
new PreEdge (
p.left.id,
if (null != p.right) p.right.id else "",
p.left.equipment,
"",
length,
voltage,
typ,
normallyOpen)
normalOpen,
location)
}
case (s: String, (p: Pair, None))
// shouldn't happen of course:
=> Edge ("", "", "", "", 0.0, "", "", false)
case (s: String, (p: Pair, None)) =>
// shouldn't happen of course: if it does we have a terminal with an equipment reference to non-existant equipment
new PreEdge ("", "", "", "", 0.0, "", "", false, "")
}
}
}
var edges = terms.keyBy (_.id_equ).leftOuterJoin (rdd.keyBy (_.key)).map (term_op)
var preedges = terms.keyBy (_.id_equ).leftOuterJoin (rdd.keyBy (_.key)).map (term_op)

// change terminal id to node id
val left_op =
Expand All @@ -364,28 +382,34 @@ class CIMRelation(
{
j match
{
case (s: String, (e:Edge, Some (t:Terminal)))
=> Edge (if (t.connectivity != null) t.connectivity else e.id_seq_1, e.id_seq_2, e.id_equ, e.container, e.length, e.voltage, e.typ, e.normallyOpen) // ToDo: avoid reallocation here
case (s: String, (e:Edge, None))
=> e
case (s: String, (e:PreEdge, Some (t:Terminal))) =>
{
if (t.connectivity != null)
e.id_seq_1 = t.connectivity
e
}
case (s: String, (e:PreEdge, None)) => e
}
}
}
edges = edges.keyBy (_.id_seq_1).leftOuterJoin (terminals.keyBy (_.id)).map (left_op)
preedges = preedges.keyBy (_.id_seq_1).leftOuterJoin (terminals.keyBy (_.id)).map (left_op)
val right_op =
{
j: Any =>
{
j match
{
case (s: String, (e:Edge, Some (t:Terminal)))
=> Edge (e.id_seq_1, if (t.connectivity != null) t.connectivity else e.id_seq_2, e.id_equ, e.container, e.length, e.voltage, e.typ, e.normallyOpen) // ToDo: avoid reallocation here
case (s: String, (e:Edge, None))
=> e
case (s: String, (e:PreEdge, Some (t:Terminal))) =>
{
if (t.connectivity != null)
e.id_seq_2 = t.connectivity
e
}
case (s: String, (e:PreEdge, None)) => e
}
}
}
edges = edges.keyBy (_.id_seq_2).leftOuterJoin (terminals.keyBy (_.id)).map (right_op)
preedges = preedges.keyBy (_.id_seq_2).leftOuterJoin (terminals.keyBy (_.id)).map (right_op)

// change node id to node name
val left_op2 =
Expand All @@ -394,28 +418,95 @@ class CIMRelation(
{
j match
{
case (s: String, (e:Edge, Some (c:ConnectivityNode)))
=> Edge (if (c.name != null) c.name else e.id_seq_1, e.id_seq_2, e.id_equ, if (c.container != null) c.container else e.container, e.length, e.voltage, e.typ, e.normallyOpen) // ToDo: avoid reallocation here
case (s: String, (e:Edge, None))
=> e
case (s: String, (e:PreEdge, Some (c:ConnectivityNode))) =>
{
if (c.name != null)
e.id_seq_1 = c.name
if (c.container != null)
e.container = c.container
e
}
case (s: String, (e:PreEdge, None)) => e
}
}
}
edges = edges.keyBy (_.id_seq_1).leftOuterJoin (connectivitynodes.keyBy (_.id)).map (left_op2)
preedges = preedges.keyBy (_.id_seq_1).leftOuterJoin (connectivitynodes.keyBy (_.id)).map (left_op2)
val right_op2 = // ToDo: equipment with two containers should be deterministically assigned to the correct container
{
j: Any =>
{
j match
{
case (s: String, (e:Edge, Some (c:ConnectivityNode)))
=> Edge (e.id_seq_1, if (c.name != null) c.name else e.id_seq_2, e.id_equ, if (c.container != null) c.container else e.container, e.length, e.voltage, e.typ, e.normallyOpen) // ToDo: avoid reallocation here
case (s: String, (e:Edge, None))
=> e
case (s: String, (e:PreEdge, Some (c:ConnectivityNode))) =>
{
if (c.name != null)
e.id_seq_2 = c.name
if (c.container != null)
e.container = c.container
e
}
case (s: String, (e:PreEdge, None)) => e
}
}
}
preedges = preedges.keyBy (_.id_seq_2).leftOuterJoin (connectivitynodes.keyBy (_.id)).map (right_op2)

// get start and end coordinates of each location
val point_seq_op = (x: Extremum /* null */, p: PositionPoint)
{
if (null == x)
new Extremum (p.location, p.sequence, p.x, p.y, p.sequence, p.x, p.y)
else
{
if (p.sequence < x.min_index)
{
x.min_index = p.sequence
x.x1 = p.x
x.y1 = p.y
}
else if (p.sequence > x.max_index)
{
x.max_index = p.sequence
x.x2 = p.x
x.y2 = p.y
}
x
}
}
val point_comb_op = (l: Extremum, r: Extremum)
{
if (r.min_index < l.min_index)
{
l.min_index = r.min_index
l.x1 = r.x1
l.y1 = r.y1
}
if (r.max_index > l.max_index)
{
l.max_index = r.max_index
l.x2 = r.x2
l.y2 = r.y2
}
l
}
val extremum = points.keyBy (_.location).aggregateByKey (null: Extremum) (point_seq_op, point_comb_op).values

// join coordinates with edges using equipment
val edge_op =
{
j: Any =>
{
j match
{
case (l: String, (e:PreEdge, Some (x:Extremum))) =>
Edge (e.id_seq_1, e.id_seq_2, e.id_equ, e.container, e.length, e.voltage, e.typ, e.normalOpen, x.x1, x.y1, x.x2, x.y2)
case (l: String, (e:PreEdge, None)) =>
// shouldn't happen of course: if it does we have an equipment with a location reference to non-existant location
Edge (e.id_seq_1, e.id_seq_2, e.id_equ, e.container, e.length, e.voltage, e.typ, e.normalOpen, 0.0, 0.0, 0.0, 0.0)
}
}
}
edges = edges.keyBy (_.id_seq_2).leftOuterJoin (connectivitynodes.keyBy (_.id)).map (right_op2)
val edges = preedges.keyBy (_.location).leftOuterJoin (extremum.keyBy (_.id_loc)).map (edge_op)

// persist it so the sample can get at it
edges.setName ("Edges")
Expand Down

0 comments on commit 35c60bc

Please sign in to comment.