diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala index 0558e0092..69659bd47 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala @@ -236,8 +236,8 @@ object OneDAL { matrixLabel } - private def vectorsToSparseNumericTable(vectors: Array[Vector], - nFeatures: Long): CSRNumericTable = { + def vectorsToSparseNumericTable(vectors: Array[Vector], + nFeatures: Long): CSRNumericTable = { require(vectors(0).isInstanceOf[SparseVector], "vectors should be sparse") println(s"Features row x column: ${vectors.length} x ${vectors(0).size}") @@ -250,10 +250,10 @@ object OneDAL { val columnIndices = Array.fill(ratingsNum) { 0L } + // First row index is 1 val rowOffsets = ArrayBuffer[Long](1L) var indexValues = 0 - var curRow = 0L // Converted to one CSRNumericTable for (row <- 0 until vectors.length) { @@ -263,20 +263,22 @@ object OneDAL { // one-based indexValues columnIndices(indexValues) = column + 1 - if (row > curRow) { - curRow = row - // one-based indexValues - rowOffsets += indexValues + 1 - } - indexValues = indexValues + 1 } + // one-based row indexValues + rowOffsets += indexValues + 1 } - // one-based row indexValues - rowOffsets += indexValues + 1 val contextLocal = new DaalContext() + // check CSR encoding + assert(values.length == ratingsNum, + "the length of values should be equal to the number of non-zero elements") + assert(columnIndices.length == ratingsNum, + "the length of columnIndices should be equal to the number of non-zero elements") + assert(rowOffsets.size == (csrRowNum + 1), + "the size of rowOffsets should be equal to the number of rows + 1") + val cTable = OneDAL.cNewCSRNumericTableDouble(values, columnIndices, rowOffsets.toArray, nFeatures, csrRowNum) val table = new CSRNumericTable(contextLocal, cTable) diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala new file mode 100644 index 000000000..f2c27fc2e --- /dev/null +++ b/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala @@ -0,0 +1,40 @@ +package org.apache.spark.ml + +import com.intel.oap.mllib.OneDAL +import org.apache.spark.internal.Logging +import org.apache.spark.ml.linalg.{Matrices, Vector, Vectors} +import org.apache.spark.sql.Row + +class oneDALSuite extends FunctionsSuite with Logging { + + import testImplicits._ + + test("test sparse vector to CSRNumericTable") { + val data = Seq( + Vectors.sparse(3, Seq((0, 1.0), (1, 2.0), (2, 3.0))), + Vectors.sparse(3, Seq((0, 10.0), (1, 20.0), (2, 30.0))), + Vectors.sparse(3, Seq.empty), + Vectors.sparse(3, Seq.empty), + Vectors.sparse(3, Seq((0, 1.0), (1, 2.0))), + Vectors.sparse(3, Seq((0, 10.0), (2, 20.0))), + ) + val df = data.map(Tuple1.apply).toDF("features") + df.show() + val rowsRDD = df.rdd.map { + case Row(features: Vector) => features + } + val results = rowsRDD.coalesce(1).mapPartitions { it: Iterator[Vector] => + val vectors: Array[Vector] = it.toArray + val numColumns = vectors(0).size + val CSRNumericTable = { + OneDAL.vectorsToSparseNumericTable(vectors, numColumns) + } + Iterator(CSRNumericTable.getCNumericTable) + }.collect() + val csr = OneDAL.makeNumericTable(results(0)) + val resultMatrix = OneDAL.numericTableToMatrix(csr) + val matrix = Matrices.fromVectors(data) + + assert((resultMatrix.toArray sameElements matrix.toArray) === true) + } +} diff --git a/mllib-dal/test.sh b/mllib-dal/test.sh index 6bb4f1652..d331cb324 100755 --- a/mllib-dal/test.sh +++ b/mllib-dal/test.sh @@ -53,7 +53,8 @@ suiteArray=( "classification.MLlibNaiveBayesSuite" \ "regression.MLlibLinearRegressionSuite" \ "stat.MLlibCorrelationSuite" \ - "stat.MultivariateOnlineSummarizerSuite" + "stat.MultivariateOnlineSummarizerSuite" \ + "oneDALSuite" ) MVN_NO_TRANSFER_PROGRESS=