Skip to content

Commit

Permalink
update to Spark 2.2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
derrickoswald committed Jul 21, 2018
1 parent 9325653 commit ed591b8
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 8 deletions.
11 changes: 9 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>ch.ninecode.cim</groupId>
<artifactId>CIMReader</artifactId>
<!-- version>${version.dependency.scala}-${version.dependency.spark}-CIMREADER_VERSION</version -->
<version>2.11-2.2.1-2.8.0</version>
<version>2.11-2.2.2-2.9.0</version>
<name>${project.artifactId}</name>
<description>Expose CIM data files as Spark RDD</description>
<inceptionYear>2015</inceptionYear>
Expand Down Expand Up @@ -69,12 +69,19 @@
</repository>
</distributionManagement>
<properties>

<encoding>UTF-8</encoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

<!-- Scala versions -->
<version.dependency.scala>2.11</version.dependency.scala>
<version.dependency.scalalibrary>2.11.8</version.dependency.scalalibrary>
<version.dependency.spark>2.2.1</version.dependency.spark>

<!-- Spark version -->
<version.dependency.spark>2.2.2</version.dependency.spark>

<!-- test versions -->
<version.dependency.scopt>3.6.0</version.dependency.scopt>
<version.dependency.junit>4.12</version.dependency.junit>
<version.dependency.scalatest>3.0.3</version.dependency.scalatest>
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/ch/ninecode/cim/CIMEdgeData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package ch.ninecode.cim
* @param id_seq_2 the mRID of terminal 1 (or N in the case of multi-terminal devices)
* @param id_cn_2 the connectivity node of terminal 1 (or N in the case of multi-terminal devices)
* @param id_equ the [[ch.ninecode.model.ConductingEquipment]] object associated with the terminals
* @param voltage the nominal voltage of the edge
* @param isZero <code>true</code> if there is no electrical difference between the terminals, i.e. a closed switch,
* which means the terminals are the same topological node
* @param isConnected <code>true</code> if there is a connection between the terminals, i.e. a cable,
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,9 @@ with
// get the edges that have different topological nodes on each end
val nodes = graph.vertices.values.keyBy (_.node).distinct // distinct topological nodes
val pairs = graph.vertices.keyBy (_._2.node).join (nodes).map ( x => (x._2._1._1, x._2._2)) // get vertex-node pairs
val b1 = graph.edges.keyBy ((edge) => edge.dstId).join (pairs) // match edge end 1
val b2 = b1.values.keyBy ((edge) => edge._1.srcId).join (pairs) // match edge end 2
val b3 = b2.values.map ((edge) => (edge._1._1, edge._1._2, edge._2)) // simplify
val b1 = graph.edges.keyBy (edge => edge.dstId).join (pairs) // match edge end 1
val b2 = b1.values.keyBy (edge => edge._1.srcId).join (pairs) // match edge end 2
val b3 = b2.values.map (edge => (edge._1._1, edge._1._2, edge._2)) // simplify
val boundaries = b3.filter (boundary) // keep edges with different nodes on each end

// construct the topological graph from the edges
Expand Down Expand Up @@ -637,7 +637,7 @@ with
val elements = get[Element]("Elements").keyBy (_.id)
val terms = get[Terminal].keyBy (_.ConductingEquipment).join (elements).values
// map each graph vertex to the terminals
val vertices = get[ConnectivityNode].map ((x) => (vertex_id (x.id), x))
val vertices = get[ConnectivityNode].map (x => (vertex_id (x.id), x))
val td_plus = graph.vertices.join (vertices).values.filter (_._1.island != 0L).keyBy (_._2.id).leftOuterJoin (terms.keyBy (_._1.ConnectivityNode)).values
val islands = td_plus.groupBy (_._1._1.island).values.map (to_islands)

Expand All @@ -655,14 +655,14 @@ with
TopologicalIsland.subsetter.save (session.sqlContext, new_ti.asInstanceOf[TopologicalIsland.subsetter.rddtype], storage)

val nodes_with_islands = graph.vertices.values.keyBy (_.island).join (islands).values
val nodes = nodes_with_islands.groupBy (_._1.node).map ((x) => (x._1, x._2.head._1, Some (x._2.head._2))).map (to_nodes)
val nodes = nodes_with_islands.groupBy (_._1.node).map (x => (x._1, x._2.head._1, Some (x._2.head._2))).map (to_nodes)
if (debug && log.isDebugEnabled)
log.debug (nodes.count + " nodes")
(nodes, new_ti)
}
else
{
val nodes = graph.vertices.values.groupBy (_.node).map ((x) => (x._1, x._2.head, None)).map (to_nodes)
val nodes = graph.vertices.values.groupBy (_.node).map (x => (x._1, x._2.head, None)).map (to_nodes)
if (debug && log.isDebugEnabled)
log.debug (nodes.count + " nodes")
(nodes, spark.sparkContext.emptyRDD[TopologicalIsland])
Expand Down
35 changes: 35 additions & 0 deletions src/main/scala/ch/ninecode/cim/CIMTopologyOptions.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ch.ninecode.cim

trait State

case class ForceTrue () extends State
case class ForceFalse () extends State
case class Unforced () extends State

/**
* Topological processing options.
*
* This class determines some of the behaviour of the CIMNetworkTopologyProcessor.
* These options are passed to the processor constructor and cannot be altered dynamically.
*
* @param force_retain_switches Allows override of the behaviour when the processor encounters a Switch
* or a Switch derived class (e.g. Breaker) except for Fuse. The default behaviour of <code>Unforced<c/ode>
* will use the value of the <code>retained</code> attribute to identify an island boundary
* only if the attribute is present in the CIM file and the value is <code>true</code>.
* When set to <code>ForceTrue</code> the behaviour is equivalent to having a <code>retained</code>
* attribute with value <code>true</code> for every instance.
* When set to <code>ForecFalse</code> the behaviour is equivalent to having a <code>retained</code>
* attribute with value <code>false</code> for every instance.
* @param force_retain_fuses Allows override of the behaviour when a Fuse is encountered.
* The same effect as for Switch objects using <code>force_retain_switches</code> but for Fuse objects.
* @param default_switch_closed_state Allows changing the behaviour when the processor encounters a Switch
* that has neither an <code>open</code> attribute, nor <code>normalOpen</code> attribute.
* The default behaviour of <code>ForceTrue<c/ode> is the same as if <code>open</code> and <code>normalOpen</code>
* both specify <code>false</code>.
*/
case class CIMTopologyOptions
(
force_retain_switches: State = Unforced (),
force_retain_fuses: State = Unforced (),
default_switch_closed_state: State = ForceTrue ()
)
1 change: 1 addition & 0 deletions src/main/scala/ch/ninecode/cim/CIMVertexData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.spark.graphx.VertexId
* @param island_label a user friendly label for the island
* @param node the minimum (hash code) of equivalent ConnectivityNode (a single topological node)
* @param node_label a user friendly label for the node
* @param voltage the nominal voltage of the node
*/
case class CIMVertexData (var island: VertexId = Long.MaxValue, var island_label: String = "", var node: VertexId = Long.MaxValue, var node_label: String = "", var voltage: String = null) extends Serializable
{
Expand Down

0 comments on commit ed591b8

Please sign in to comment.