diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala index 0620e287d0c7..0545c140bb3e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala @@ -73,6 +73,7 @@ object HoodieProcedures { mapBuilder.put(MetadataInitProcedure.NAME, MetadataInitProcedure.builder) mapBuilder.put(ShowMetadataStatsProcedure.NAME, ShowMetadataStatsProcedure.builder) mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder) + mapBuilder.put(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder) mapBuilder.build } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala new file mode 100644 index 000000000000..8aa56f27a315 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import com.google.common.collect.Lists +import org.apache.hadoop.fs.{ContentSummary, FileStatus, Path} +import org.apache.hudi.common.fs.FSUtils +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.text.DecimalFormat +import java.util.function.Supplier + +class ShowFsPathDetailProcedure extends BaseProcedure with ProcedureBuilder { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "path", DataTypes.StringType, None), + ProcedureParameter.optional(1, "is_sub", DataTypes.BooleanType, false), + ProcedureParameter.optional(2, "sort", DataTypes.BooleanType, true) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("path_num", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_num", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("storage_size", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("storage_size(unit)", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("storage_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("space_consumed", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("quota", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("space_quota", DataTypes.LongType, nullable = true, Metadata.empty)) + ) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String] + val isSub = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean] + val sort = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean] + + val path: Path = new Path(srcPath) + val fs = FSUtils.getFs(path, jsc.hadoopConfiguration()) + val status: Array[FileStatus] = if (isSub) fs.listStatus(path) else fs.globStatus(path) + val rows: java.util.List[Row] = Lists.newArrayList() + + if (status.nonEmpty) { + for (i <- status.indices) { + val summary: ContentSummary = fs.getContentSummary(status(i).getPath) + val storagePath: String = status(i).getPath.toString + rows.add(Row(summary.getDirectoryCount, summary.getFileCount, summary.getLength, + getFileSize(summary.getLength), storagePath, summary.getQuota, summary.getSpaceConsumed, + summary.getSpaceQuota)) + } + } + + val df = spark.sqlContext.createDataFrame(rows, OUTPUT_TYPE) + if (sort) { + df.orderBy(df("storage_size").desc).collect() + } else { + df.orderBy(df("file_num").desc).collect() + } + } + + def getFileSize(size: Long): String = { + val GB = 1024 * 1024 * 1024 + val MB = 1024 * 1024 + val KB = 1024 + val df: DecimalFormat = new DecimalFormat("0.00") + + val resultSize = if (size / GB >= 1) { + df.format(size / GB.toFloat) + "GB" + } else if (size / MB >= 1) { + df.format(size / MB.toFloat) + "MB" + } else if (size / KB >= 1) { + df.format(size / KB.toFloat) + "KB" + } else { + size + "B" + } + + resultSize + } + + override def build: Procedure = new ShowFsPathDetailProcedure() +} + +object ShowFsPathDetailProcedure { + val NAME = "show_fs_path_detail" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get(): ProcedureBuilder = new ShowFsPathDetailProcedure() + } +} + + + diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowFsPathDetailProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowFsPathDetailProcedure.scala new file mode 100644 index 000000000000..8940f7c4ee62 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowFsPathDetailProcedure.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.procedure + +import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase + +class TestShowFsPathDetailProcedure extends HoodieSparkSqlTestBase { + test("Test Call show_fs_path_detail Procedure") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = tmp.getCanonicalPath + "/" + tableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000") + + val result = spark.sql(s"""call show_fs_path_detail(path => '$tablePath')""").collect() + assertResult(1) { + result.length + } + } + } +}