From fbce0a9f852c945f6bf63528500249c14f37799f Mon Sep 17 00:00:00 2001 From: Derrick Oswald Date: Mon, 23 May 2016 14:50:12 +0200 Subject: [PATCH] fixes for V3 --- .../scala/ch/ninecode/cim/CIMRelation.scala | 558 +++++++++--------- 1 file changed, 280 insertions(+), 278 deletions(-) diff --git a/src/main/scala/ch/ninecode/cim/CIMRelation.scala b/src/main/scala/ch/ninecode/cim/CIMRelation.scala index d62b0fc56..ac3ada3b2 100644 --- a/src/main/scala/ch/ninecode/cim/CIMRelation.scala +++ b/src/main/scala/ch/ninecode/cim/CIMRelation.scala @@ -128,7 +128,8 @@ class CIMRelation( { // make a config val configuration = new Configuration (sqlContext.sparkContext.hadoopConfiguration) - configuration.set ("mapreduce.input.fileinputformat.inputdir", inputFiles (0).getPath.toString); + val filename = inputFiles (0).getPath.toString + configuration.set ("mapreduce.input.fileinputformat.inputdir", filename); val rdd = sqlContext.sparkContext.newAPIHadoopRDD ( configuration, @@ -195,326 +196,327 @@ class CIMRelation( sqlContext.createDataFrame (rdd.collect ({ case x: Element if x.getClass () == classOf[CustomerAgreement] => x.asInstanceOf[CustomerAgreement]})).registerTempTable ("CustomerAgreement") sqlContext.createDataFrame (rdd.collect ({ case x: Element if x.getClass () == classOf[UsagePoint] => x.asInstanceOf[UsagePoint]})).registerTempTable ("UsagePoint") - // set up edge graph - - // first get the pairs of terminals keyed by equipment - val pair_seq_op = (l: Pair /* null */, r: Terminal) ⇒ + // set up edge graph if it's not an ISU file + if (!filename.contains ("ISU")) { - if (null == l) - new Pair (r.equipment, r) - else + // first get the pairs of terminals keyed by equipment + val pair_seq_op = (l: Pair /* null */, r: Terminal) ⇒ { - if (null != l.right) - throw new IllegalStateException ("three terminals") - l.right = r - l + if (null == l) + new Pair (r.equipment, r) + else + { +// if (null != l.right) +// throw new IllegalStateException ("three terminals") + l.right = r + l + } } - } - val pair_comb_op = (l: Pair, r: Pair) ⇒ - { - if ((null != l.right) || (null != r.right)) - throw new IllegalStateException ("three terminals") - if (1 == l.left.sequence) - l.right = r.left - else - { // swap so seq#1 is left - l.right = l.left - l.left = r.left + val pair_comb_op = (l: Pair, r: Pair) ⇒ + { +// if ((null != l.right) || (null != r.right)) +// throw new IllegalStateException ("three terminals") + if (1 == l.left.sequence) + l.right = r.left + else + { // swap so seq#1 is left + l.right = l.left + l.left = r.left + } + l } - l - } - val terms = terminals.keyBy (_.equipment).aggregateByKey (null: Pair) (pair_seq_op, pair_comb_op).values + val terms = terminals.keyBy (_.equipment).aggregateByKey (null: Pair) (pair_seq_op, pair_comb_op).values - // next, map the pairs to edges - val term_op = - { - j: Any => + // next, map the pairs to edges + val term_op = { - j match + j: Any => { - case (s: String, (p: Pair, Some (e:Element))) => - { - var length = 0.0 - var voltage = "" - var typ = "" - var normalOpen = false - var location = "" - Some (e) match + j match + { + case (s: String, (p: Pair, Some (e:Element))) => { - case Some(o) if o.getClass () == classOf[PSRType] => { } - case Some(o) if o.getClass () == classOf[SvStatus] => { } - case Some(o) if o.getClass () == classOf[Line] => { } - case Some(o) if o.getClass () == classOf[Substation] => { } - case Some(o) if o.getClass () == classOf[VoltageLevel] => { } - case Some(o) if o.getClass () == classOf[Bay] => { } - case Some(o) if o.getClass () == classOf[ConnectivityNode] => { } - case Some(o) if o.getClass () == classOf[BaseVoltage] => { } - case Some(o) if o.getClass () == classOf[CoordinateSystem] => { } - case Some(o) if o.getClass () == classOf[Location] => { } - case Some(o) if o.getClass () == classOf[PositionPoint] => { }; - case Some(o) if o.getClass () == classOf[Asset] => { } - case Some(o) if o.getClass () == classOf[EnergyConsumer] => - { - val ec = o.asInstanceOf[EnergyConsumer] - voltage = ec.voltage - location = ec.location - } - case Some(o) if o.getClass () == classOf[Terminal] => { } - case Some(o) if o.getClass () == classOf[BusbarInfo] => { } - case Some(o) if o.getClass () == classOf[BusbarSection] => - { - val bs = o.asInstanceOf[BusbarSection] - voltage = bs.voltage - location = bs.location - } - case Some(o) if o.getClass () == classOf[Connector] => - { - val c = o.asInstanceOf[Connector] - voltage = c.voltage - location = c.location - } - case Some(o) if o.getClass () == classOf[Junction] => { } - { - val j = o.asInstanceOf[Junction] - voltage = j.voltage - location = j.location - } - case Some(o) if o.getClass () == classOf[CableInfo] => { } - case Some(o) if o.getClass () == classOf[ACLineSegment] => - { - val ac = o.asInstanceOf[ACLineSegment] - length = ac.len.toDouble - voltage = ac.voltage - typ = ac.name - location = ac.location - } - case Some(o) if o.getClass () == classOf[ACLineSegmentPhase] => { } - case Some(o) if o.getClass () == classOf[SwitchInfo] => { } - case Some(o) if o.getClass () == classOf[Switch] => - { - val s = o.asInstanceOf[Switch] - voltage = s.voltage - normalOpen = s.normalOpen - location = s.location - } - case Some(o) if o.getClass () == classOf[PowerTransformerInfo] => { } - case Some(o) if o.getClass () == classOf[TransformerTankInfo] => { } - case Some(o) if o.getClass () == classOf[TransformerEndInfo] => { } - case Some(o) if o.getClass () == classOf[PowerTransformer] => - { - val t = o.asInstanceOf[PowerTransformer] - typ = t.name - location = t.location - } - case Some(o) if o.getClass () == classOf[TransformerTank] => { } - case Some(o) if o.getClass () == classOf[TransformerTankEnd] => - { - val te = o.asInstanceOf[TransformerTankEnd] - voltage = te.voltage - } - case Some(o) if o.getClass () == classOf[Fuse] => - { - val f = o.asInstanceOf[Fuse] - voltage = f.voltage - normalOpen = f.normalOpen - location = f.location - } - case Some(o) if o.getClass () == classOf[Disconnector] => { } - { - val d = o.asInstanceOf[Disconnector] - voltage = d.voltage - normalOpen = d.normalOpen - location = d.location - } - case Some(o) if o.getClass () == classOf[GroundDisconnector] => - { - val gd = o.asInstanceOf[GroundDisconnector] - voltage = gd.voltage - normalOpen = gd.normalOpen - location = gd.location - } - case Some(o) if o.getClass () == classOf[ProtectionEquipment] => { } - case Some(o) if o.getClass () == classOf[CurrentTransformer] => { } - case Some(o) if o.getClass () == classOf[CurrentRelay] => { } - case Some(o) if o.getClass () == classOf[SolarGeneratingUnit] => { } - - case Some(o) if o.getClass () == classOf[ServiceLocation] => { } - case Some(o) if o.getClass () == classOf[UsagePointLocation] => { } - case Some(o) if o.getClass () == classOf[ServiceCategory] => { } - case Some(o) if o.getClass () == classOf[PricingStructure] => { } - case Some(o) if o.getClass () == classOf[Customer] => { } - case Some(o) if o.getClass () == classOf[CustomerAgreement] => { } - case Some(o) if o.getClass () == classOf[UsagePoint] => - { - val up = o.asInstanceOf[UsagePoint] - voltage = up.nominalvoltage - location = up.usagepointlocation - } - - case Some(o) if o.getClass () == classOf[NameTypeAuthority] => { } - case Some(o) if o.getClass () == classOf[NameType] => { } - case Some(o) if o.getClass () == classOf[Name] => { } - case Some(o) if o.getClass () == classOf[UserAttribute] => { } + var length = 0.0 + var voltage = "" + var typ = "" + var normalOpen = false + var location = "" + Some (e) match + { + case Some(o) if o.getClass () == classOf[PSRType] => { } + case Some(o) if o.getClass () == classOf[SvStatus] => { } + case Some(o) if o.getClass () == classOf[Line] => { } + case Some(o) if o.getClass () == classOf[Substation] => { } + case Some(o) if o.getClass () == classOf[VoltageLevel] => { } + case Some(o) if o.getClass () == classOf[Bay] => { } + case Some(o) if o.getClass () == classOf[ConnectivityNode] => { } + case Some(o) if o.getClass () == classOf[BaseVoltage] => { } + case Some(o) if o.getClass () == classOf[CoordinateSystem] => { } + case Some(o) if o.getClass () == classOf[Location] => { } + case Some(o) if o.getClass () == classOf[PositionPoint] => { }; + case Some(o) if o.getClass () == classOf[Asset] => { } + case Some(o) if o.getClass () == classOf[EnergyConsumer] => + { + val ec = o.asInstanceOf[EnergyConsumer] + voltage = ec.voltage + location = ec.location + } + case Some(o) if o.getClass () == classOf[Terminal] => { } + case Some(o) if o.getClass () == classOf[BusbarInfo] => { } + case Some(o) if o.getClass () == classOf[BusbarSection] => + { + val bs = o.asInstanceOf[BusbarSection] + voltage = bs.voltage + location = bs.location + } + case Some(o) if o.getClass () == classOf[Connector] => + { + val c = o.asInstanceOf[Connector] + voltage = c.voltage + location = c.location + } + case Some(o) if o.getClass () == classOf[Junction] => { } + { + val j = o.asInstanceOf[Junction] + voltage = j.voltage + location = j.location + } + case Some(o) if o.getClass () == classOf[CableInfo] => { } + case Some(o) if o.getClass () == classOf[ACLineSegment] => + { + val ac = o.asInstanceOf[ACLineSegment] + length = ac.len.toDouble + voltage = ac.voltage + typ = ac.name + location = ac.location + } + case Some(o) if o.getClass () == classOf[ACLineSegmentPhase] => { } + case Some(o) if o.getClass () == classOf[SwitchInfo] => { } + case Some(o) if o.getClass () == classOf[Switch] => + { + val s = o.asInstanceOf[Switch] + voltage = s.voltage + normalOpen = s.normalOpen + location = s.location + } + case Some(o) if o.getClass () == classOf[PowerTransformerInfo] => { } + case Some(o) if o.getClass () == classOf[TransformerTankInfo] => { } + case Some(o) if o.getClass () == classOf[TransformerEndInfo] => { } + case Some(o) if o.getClass () == classOf[PowerTransformer] => + { + val t = o.asInstanceOf[PowerTransformer] + typ = t.name + location = t.location + } + case Some(o) if o.getClass () == classOf[TransformerTank] => { } + case Some(o) if o.getClass () == classOf[TransformerTankEnd] => + { + val te = o.asInstanceOf[TransformerTankEnd] + voltage = te.voltage + } + case Some(o) if o.getClass () == classOf[Fuse] => + { + val f = o.asInstanceOf[Fuse] + voltage = f.voltage + normalOpen = f.normalOpen + location = f.location + } + case Some(o) if o.getClass () == classOf[Disconnector] => { } + { + val d = o.asInstanceOf[Disconnector] + voltage = d.voltage + normalOpen = d.normalOpen + location = d.location + } + case Some(o) if o.getClass () == classOf[GroundDisconnector] => + { + val gd = o.asInstanceOf[GroundDisconnector] + voltage = gd.voltage + normalOpen = gd.normalOpen + location = gd.location + } + case Some(o) if o.getClass () == classOf[ProtectionEquipment] => { } + case Some(o) if o.getClass () == classOf[CurrentTransformer] => { } + case Some(o) if o.getClass () == classOf[CurrentRelay] => { } + case Some(o) if o.getClass () == classOf[SolarGeneratingUnit] => { } + + case Some(o) if o.getClass () == classOf[ServiceLocation] => { } + case Some(o) if o.getClass () == classOf[UsagePointLocation] => { } + case Some(o) if o.getClass () == classOf[ServiceCategory] => { } + case Some(o) if o.getClass () == classOf[PricingStructure] => { } + case Some(o) if o.getClass () == classOf[Customer] => { } + case Some(o) if o.getClass () == classOf[CustomerAgreement] => { } + case Some(o) if o.getClass () == classOf[UsagePoint] => + { + val up = o.asInstanceOf[UsagePoint] + voltage = up.nominalvoltage + location = up.usagepointlocation + } + + case Some(o) if o.getClass () == classOf[NameTypeAuthority] => { } + case Some(o) if o.getClass () == classOf[NameType] => { } + case Some(o) if o.getClass () == classOf[Name] => { } + case Some(o) if o.getClass () == classOf[UserAttribute] => { } + } + new PreEdge ( + p.left.id, + if (null != p.right) p.right.id else "", + p.left.equipment, + "", + length, + voltage, + typ, + normalOpen, + location) } - new PreEdge ( - p.left.id, - if (null != p.right) p.right.id else "", - p.left.equipment, - "", - length, - voltage, - typ, - normalOpen, - location) - } - case (s: String, (p: Pair, None)) => - // shouldn't happen of course: if it does we have a terminal with an equipment reference to non-existant equipment - new PreEdge ("", "", "", "", 0.0, "", "", false, "") + case (s: String, (p: Pair, None)) => + // shouldn't happen of course: if it does we have a terminal with an equipment reference to non-existant equipment + new PreEdge ("", "", "", "", 0.0, "", "", false, "") + } } } - } - var preedges = terms.keyBy (_.id_equ).leftOuterJoin (rdd.keyBy (_.key)).map (term_op) + var preedges = terms.keyBy (_.id_equ).leftOuterJoin (rdd.keyBy (_.key)).map (term_op) - // change terminal id to node id - val left_op = - { - j: Any => + // change terminal id to node id + val left_op = { - j match + j: Any => { - case (s: String, (e:PreEdge, Some (t:Terminal))) => - { - if (t.connectivity != null) - e.id_seq_1 = t.connectivity - e - } - case (s: String, (e:PreEdge, None)) => e + j match + { + case (s: String, (e:PreEdge, Some (t:Terminal))) => + { + if (t.connectivity != null) + e.id_seq_1 = t.connectivity + e + } + case (s: String, (e:PreEdge, None)) => e + } } } - } - preedges = preedges.keyBy (_.id_seq_1).leftOuterJoin (terminals.keyBy (_.id)).map (left_op) - val right_op = - { - j: Any => + preedges = preedges.keyBy (_.id_seq_1).leftOuterJoin (terminals.keyBy (_.id)).map (left_op) + val right_op = { - j match + j: Any => { - case (s: String, (e:PreEdge, Some (t:Terminal))) => - { - if (t.connectivity != null) - e.id_seq_2 = t.connectivity - e - } - case (s: String, (e:PreEdge, None)) => e + j match + { + case (s: String, (e:PreEdge, Some (t:Terminal))) => + { + if (t.connectivity != null) + e.id_seq_2 = t.connectivity + e + } + case (s: String, (e:PreEdge, None)) => e + } } } - } - preedges = preedges.keyBy (_.id_seq_2).leftOuterJoin (terminals.keyBy (_.id)).map (right_op) + preedges = preedges.keyBy (_.id_seq_2).leftOuterJoin (terminals.keyBy (_.id)).map (right_op) - // change node id to node name - val left_op2 = - { - j: Any => + // change node id to node name + val left_op2 = { - j match + j: Any => { - case (s: String, (e:PreEdge, Some (c:ConnectivityNode))) => - { - if (c.name != null) - e.id_seq_1 = c.name - if (c.container != null) - e.container = c.container - e - } - case (s: String, (e:PreEdge, None)) => e + j match + { + case (s: String, (e:PreEdge, Some (c:ConnectivityNode))) => + { + if (c.name != null) + e.id_seq_1 = c.name + if (c.container != null) + e.container = c.container + e + } + case (s: String, (e:PreEdge, None)) => e + } } } - } - preedges = preedges.keyBy (_.id_seq_1).leftOuterJoin (connectivitynodes.keyBy (_.id)).map (left_op2) - val right_op2 = // ToDo: equipment with two containers should be deterministically assigned to the correct container - { - j: Any => + preedges = preedges.keyBy (_.id_seq_1).leftOuterJoin (connectivitynodes.keyBy (_.id)).map (left_op2) + val right_op2 = // ToDo: equipment with two containers should be deterministically assigned to the correct container { - j match + j: Any => { - case (s: String, (e:PreEdge, Some (c:ConnectivityNode))) => - { - if (c.name != null) - e.id_seq_2 = c.name - if (c.container != null) - e.container = c.container - e - } - case (s: String, (e:PreEdge, None)) => e + j match + { + case (s: String, (e:PreEdge, Some (c:ConnectivityNode))) => + { + if (c.name != null) + e.id_seq_2 = c.name + if (c.container != null) + e.container = c.container + e + } + case (s: String, (e:PreEdge, None)) => e + } } } - } - preedges = preedges.keyBy (_.id_seq_2).leftOuterJoin (connectivitynodes.keyBy (_.id)).map (right_op2) + preedges = preedges.keyBy (_.id_seq_2).leftOuterJoin (connectivitynodes.keyBy (_.id)).map (right_op2) - // get start and end coordinates of each location - val point_seq_op = (x: Extremum /* null */, p: PositionPoint) ⇒ - { - if (null == x) - new Extremum (p.location, p.sequence, p.x, p.y, p.sequence, p.x, p.y) - else + // get start and end coordinates of each location + val point_seq_op = (x: Extremum /* null */, p: PositionPoint) ⇒ { - if (p.sequence < x.min_index) + if (null == x) + new Extremum (p.location, p.sequence, p.x, p.y, p.sequence, p.x, p.y) + else { - x.min_index = p.sequence - x.x1 = p.x - x.y1 = p.y + if (p.sequence < x.min_index) + { + x.min_index = p.sequence + x.x1 = p.x + x.y1 = p.y + } + else if (p.sequence > x.max_index) + { + x.max_index = p.sequence + x.x2 = p.x + x.y2 = p.y + } + x } - else if (p.sequence > x.max_index) - { - x.max_index = p.sequence - x.x2 = p.x - x.y2 = p.y - } - x - } - } - val point_comb_op = (l: Extremum, r: Extremum) ⇒ - { - if (r.min_index < l.min_index) - { - l.min_index = r.min_index - l.x1 = r.x1 - l.y1 = r.y1 } - if (r.max_index > l.max_index) + val point_comb_op = (l: Extremum, r: Extremum) ⇒ { - l.max_index = r.max_index - l.x2 = r.x2 - l.y2 = r.y2 + if (r.min_index < l.min_index) + { + l.min_index = r.min_index + l.x1 = r.x1 + l.y1 = r.y1 + } + if (r.max_index > l.max_index) + { + l.max_index = r.max_index + l.x2 = r.x2 + l.y2 = r.y2 + } + l } - l - } - val extremum = points.keyBy (_.location).aggregateByKey (null: Extremum) (point_seq_op, point_comb_op).values + val extremum = points.keyBy (_.location).aggregateByKey (null: Extremum) (point_seq_op, point_comb_op).values - // join coordinates with edges using equipment - val edge_op = - { - j: Any => + // join coordinates with edges using equipment + val edge_op = { - j match + j: Any => { - case (l: String, (e:PreEdge, Some (x:Extremum))) => - Edge (e.id_seq_1, e.id_seq_2, e.id_equ, e.container, e.length, e.voltage, e.typ, e.normalOpen, x.x1, x.y1, x.x2, x.y2) - case (l: String, (e:PreEdge, None)) => - // shouldn't happen of course: if it does we have an equipment with a location reference to non-existant location - Edge (e.id_seq_1, e.id_seq_2, e.id_equ, e.container, e.length, e.voltage, e.typ, e.normalOpen, 0.0, 0.0, 0.0, 0.0) + j match + { + case (l: String, (e:PreEdge, Some (x:Extremum))) => + Edge (e.id_seq_1, e.id_seq_2, e.id_equ, e.container, e.length, e.voltage, e.typ, e.normalOpen, x.x1, x.y1, x.x2, x.y2) + case (l: String, (e:PreEdge, None)) => + // shouldn't happen of course: if it does we have an equipment with a location reference to non-existant location + Edge (e.id_seq_1, e.id_seq_2, e.id_equ, e.container, e.length, e.voltage, e.typ, e.normalOpen, 0.0, 0.0, 0.0, 0.0) + } } } - } - val edges = preedges.keyBy (_.location).leftOuterJoin (extremum.keyBy (_.id_loc)).map (edge_op) + val edges = preedges.keyBy (_.location).leftOuterJoin (extremum.keyBy (_.id_loc)).map (edge_op) - // persist it so the sample can get at it - edges.setName ("Edges") - edges.cache () - - // expose it - sqlContext.createDataFrame (edges).registerTempTable ("edges") + // persist it so the sample can get at it + edges.setName ("Edges") + edges.cache () + // expose it + sqlContext.createDataFrame (edges).registerTempTable ("edges") + } } else logError ("ch.ninecode.cim.DefaultSource.buildScan was given an input list containing no files")