Skip to content

Commit

Permalink
Expose Hadoop Configuration/FileSystem (#787)
Browse files Browse the repository at this point in the history
  • Loading branch information
AFFogarty authored Jan 21, 2021
1 parent d2822c3 commit 491de17
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<RootNamespace>Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest</RootNamespace>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
Expand Down
56 changes: 56 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/Hadoop/FileSystemTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.IO;
using Microsoft.Spark.Hadoop.Fs;
using Microsoft.Spark.Sql;
using Microsoft.Spark.UnitTest.TestUtils;
using Xunit;

namespace Microsoft.Spark.E2ETest.Hadoop
{
    [Collection("Spark E2E Tests")]
    public class FileSystemTests
    {
        private readonly SparkSession _spark;

        public FileSystemTests(SparkFixture fixture)
        {
            _spark = fixture.Spark;
        }

        /// <summary>
        /// Test that methods return the expected signature.
        /// </summary>
        [Fact]
        public void TestSignatures()
        {
            using var tempDir = new TemporaryDirectory();

            // Acquire the FileSystem backed by the session's Hadoop configuration.
            using FileSystem fileSystem =
                FileSystem.Get(_spark.SparkContext.HadoopConfiguration());

            Assert.IsType<bool>(fileSystem.Delete(tempDir.Path, true));
        }

        /// <summary>
        /// Test that Delete() deletes the file.
        /// </summary>
        [Fact]
        public void TestDelete()
        {
            using FileSystem fileSystem =
                FileSystem.Get(_spark.SparkContext.HadoopConfiguration());

            // Write a small parquet table into a temp directory so there is
            // something on disk for Delete() to remove.
            using var tempDir = new TemporaryDirectory();
            string tablePath = Path.Combine(tempDir.Path, "temp-table");
            _spark.Range(25).Write().Format("parquet").Save(tablePath);

            Assert.True(Directory.Exists(tablePath));

            // First delete succeeds; a second delete of the same path reports false.
            Assert.True(fileSystem.Delete(tablePath, true));
            Assert.False(fileSystem.Delete(tablePath, true));

            Assert.False(Directory.Exists(tablePath));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using Microsoft.Spark.Hadoop.Conf;
using Microsoft.Spark.UnitTest.TestUtils;
using Xunit;

Expand Down Expand Up @@ -42,6 +43,8 @@ public void TestSignaturesV2_3_X()

using var tempDir = new TemporaryDirectory();
sc.SetCheckpointDir(TestEnvironment.ResourceDirectory);

Assert.IsType<Configuration>(sc.HadoopConfiguration());
}
}
}
23 changes: 23 additions & 0 deletions src/csharp/Microsoft.Spark/Hadoop/Conf/Configuration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.Spark.Interop.Ipc;

namespace Microsoft.Spark.Hadoop.Conf
{
    /// <summary>
    /// Provides access to configuration parameters.
    /// </summary>
    public class Configuration : IJvmObjectReferenceProvider
    {
        private readonly JvmObjectReference _jvmObject;

        /// <summary>
        /// Wraps an existing JVM-side configuration object.
        /// </summary>
        /// <param name="jvmObject">Reference to the JVM object this instance proxies.</param>
        internal Configuration(JvmObjectReference jvmObject) =>
            _jvmObject = jvmObject;

        // Expose the underlying JVM reference to interop infrastructure only.
        JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
    }
}
60 changes: 60 additions & 0 deletions src/csharp/Microsoft.Spark/Hadoop/Fs/FileSystem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using Microsoft.Spark.Hadoop.Conf;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;

namespace Microsoft.Spark.Hadoop.Fs
{
    /// <summary>
    /// A fairly generic filesystem. It may be implemented as a distributed filesystem, or as a "local" one
    /// that reflects the locally-connected disk. The local version exists for small Hadoop instances and for
    /// testing.
    ///
    /// All user code that may potentially use the Hadoop Distributed File System should be written to use a
    /// FileSystem object. The Hadoop DFS is a multi-machine system that appears as a single disk. It's
    /// useful because of its fault tolerance and potentially very large capacity.
    /// </summary>
    public class FileSystem : IJvmObjectReferenceProvider, IDisposable
    {
        private readonly JvmObjectReference _jvmObject;

        internal FileSystem(JvmObjectReference jvmObject) =>
            _jvmObject = jvmObject;

        JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

        /// <summary>
        /// Returns the configured <see cref="FileSystem"/>.
        /// </summary>
        /// <param name="conf">The configuration to use.</param>
        /// <returns>The FileSystem.</returns>
        public static FileSystem Get(Configuration conf)
        {
            // Delegate to the static org.apache.hadoop.fs.FileSystem.get(Configuration)
            // factory on the JVM and wrap the resulting reference.
            var reference = (JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
                "org.apache.hadoop.fs.FileSystem",
                "get",
                conf);
            return new FileSystem(reference);
        }

        /// <summary>
        /// Delete a file.
        /// </summary>
        /// <param name="path">The path to delete.</param>
        /// <param name="recursive">If path is a directory and set to true, the directory is deleted else
        /// throws an exception. In case of a file the recursive can be set to either true or false.</param>
        /// <returns>True if delete is successful else false.</returns>
        public bool Delete(string path, bool recursive = true)
        {
            // The JVM API takes a org.apache.hadoop.fs.Path, so construct one
            // from the string before invoking delete.
            JvmObjectReference hadoopPath = SparkEnvironment.JvmBridge.CallConstructor(
                "org.apache.hadoop.fs.Path", path);

            return (bool)_jvmObject.Invoke("delete", hadoopPath, recursive);
        }

        /// <summary>
        /// Closes the underlying JVM-side filesystem object.
        /// </summary>
        public void Dispose() => _jvmObject.Invoke("close");
    }
}
8 changes: 8 additions & 0 deletions src/csharp/Microsoft.Spark/SparkContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Spark.Hadoop.Conf;
using Microsoft.Spark.Interop.Ipc;
using static Microsoft.Spark.Utils.CommandSerDe;

Expand Down Expand Up @@ -312,6 +313,13 @@ public Broadcast<T> Broadcast<T>(T value)
return new Broadcast<T>(this, value);
}

/// <summary>
/// A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
/// </summary>
/// <returns>The Hadoop Configuration.</returns>
public Configuration HadoopConfiguration() =>
new Configuration((JvmObjectReference)_jvmObject.Invoke("hadoopConfiguration"));

/// <summary>
/// Returns JVM object reference to JavaRDD object transformed
/// from a Scala RDD object.
Expand Down

0 comments on commit 491de17

Please sign in to comment.