
FileSystem extension skeleton #787

Merged: 33 commits, Jan 21, 2021

Commits
7c25fcb  Basic FileSystem implementation (Dec 2, 2020)
cc8237f  Move to extension (Dec 2, 2020)
f4e141b  Todo (Dec 2, 2020)
bcde071  Tests should run (Dec 2, 2020)
06d8b24  Don't use interface (Dec 2, 2020)
dfad910  Update src/csharp/Extensions/Microsoft.Spark.Extensions.FileSystem/Jv… (AFFogarty, Dec 3, 2020)
50bc611  Remove IsPackable (Dec 3, 2020)
00d68a6  Fixed: Line length 110 (Dec 3, 2020)
ec52232  Update src/csharp/Extensions/Microsoft.Spark.Extensions.FileSystem/Fi… (AFFogarty, Dec 3, 2020)
4adbb64  Update src/csharp/Extensions/Microsoft.Spark.Extensions.FileSystem/Fi… (AFFogarty, Dec 3, 2020)
16abe0a  Separate testing for signature and functionality (Dec 3, 2020)
3310897  Merge (Dec 3, 2020)
d79e702  Fixed: Test the type of FileSystem (Dec 3, 2020)
bd8579b  Fixed: Empty line (Dec 3, 2020)
a3bf508  Rename to Hadoop.FileSystem (Dec 4, 2020)
4fc9718  Rename references (Dec 4, 2020)
1bacdb2  Move to correct dir (Dec 4, 2020)
53351a6  Merge implementation with abstract class (Dec 4, 2020)
ac57039  Better comment (Dec 4, 2020)
8a85d60  Fixed: Line length (Dec 7, 2020)
9936e9f  Merge new files into existing project (Dec 11, 2020)
c896d63  Fix references (Dec 11, 2020)
e1f33e8  Fixed: return message (Dec 11, 2020)
a856829  Merge branch 'master' of https://github.com/dotnet/spark into anfog/1… (Jan 5, 2021)
6433b3c  Update src/csharp/Microsoft.Spark/Hadoop/FS/FileSystem.cs (AFFogarty, Jan 5, 2021)
4b906e0  Merge branch 'anfog/1201_filesystem' of https://github.com/AFFogarty/… (Jan 5, 2021)
dcf07d5  Compiles (Jan 5, 2021)
e41e1ef  Casing fix -- part 1 (Jan 5, 2021)
f798618  Casing fix -- part 2 (Jan 5, 2021)
8cc2378  Don't need to test assignable (Jan 5, 2021)
74ce69d  Formatting (Jan 5, 2021)
257ce6c  Fixed: Don't commi test app (Jan 5, 2021)
9f92c7d  Merge branch 'master' of https://github.com/dotnet/spark into anfog/1… (Jan 21, 2021)
@@ -2,7 +2,6 @@

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
-    <IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
@@ -3,7 +3,6 @@
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<RootNamespace>Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest</RootNamespace>
-    <IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
@@ -2,7 +2,6 @@

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
-    <IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
56 changes: 56 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/Hadoop/FileSystemTests.cs
@@ -0,0 +1,56 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.IO;
using Microsoft.Spark.Hadoop.Fs;
using Microsoft.Spark.Sql;
using Microsoft.Spark.UnitTest.TestUtils;
using Xunit;

namespace Microsoft.Spark.E2ETest.Hadoop
{
[Collection("Spark E2E Tests")]
public class FileSystemTests
{
private readonly SparkSession _spark;

public FileSystemTests(SparkFixture fixture)
{
_spark = fixture.Spark;
}

/// <summary>
/// Test that methods return the expected signature.
/// </summary>
[Fact]
public void TestSignatures()
{
using var tempDirectory = new TemporaryDirectory();

using FileSystem fs = FileSystem.Get(_spark.SparkContext.HadoopConfiguration());

Assert.IsType<bool>(fs.Delete(tempDirectory.Path, true));
}

/// <summary>
/// Test that Delete() deletes the file.
/// </summary>
[Fact]
public void TestDelete()
{
using FileSystem fs = FileSystem.Get(_spark.SparkContext.HadoopConfiguration());

using var tempDirectory = new TemporaryDirectory();
string path = Path.Combine(tempDirectory.Path, "temp-table");
_spark.Range(25).Write().Format("parquet").Save(path);

Assert.True(Directory.Exists(path));

Assert.True(fs.Delete(path, true));
Assert.False(fs.Delete(path, true));

Assert.False(Directory.Exists(path));
}
}
}
@@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using Microsoft.Spark.Hadoop.Conf;
using Microsoft.Spark.UnitTest.TestUtils;
using Xunit;

@@ -42,6 +43,8 @@ public void TestSignaturesV2_3_X()

using var tempDir = new TemporaryDirectory();
sc.SetCheckpointDir(TestEnvironment.ResourceDirectory);

Assert.IsType<Configuration>(sc.HadoopConfiguration());
}
}
}
23 changes: 23 additions & 0 deletions src/csharp/Microsoft.Spark/Hadoop/Conf/Configuration.cs
@@ -0,0 +1,23 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.Spark.Interop.Ipc;

namespace Microsoft.Spark.Hadoop.Conf
{
/// <summary>
/// Provides access to configuration parameters.
/// </summary>
public class Configuration : IJvmObjectReferenceProvider
{
private readonly JvmObjectReference _jvmObject;

internal Configuration(JvmObjectReference jvmObject)
{
_jvmObject = jvmObject;
}

JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
}
}
60 changes: 60 additions & 0 deletions src/csharp/Microsoft.Spark/Hadoop/Fs/FileSystem.cs
@@ -0,0 +1,60 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using Microsoft.Spark.Hadoop.Conf;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;

namespace Microsoft.Spark.Hadoop.Fs
{
/// <summary>
/// A fairly generic filesystem. It may be implemented as a distributed filesystem, or as a "local" one
/// that reflects the locally-connected disk. The local version exists for small Hadoop instances and for
/// testing.
///
/// All user code that may potentially use the Hadoop Distributed File System should be written to use a
/// FileSystem object. The Hadoop DFS is a multi-machine system that appears as a single disk. It's
/// useful because of its fault tolerance and potentially very large capacity.
/// </summary>
public class FileSystem : IJvmObjectReferenceProvider, IDisposable
{
private readonly JvmObjectReference _jvmObject;

internal FileSystem(JvmObjectReference jvmObject)
{
_jvmObject = jvmObject;
}

JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

/// <summary>
/// Returns the configured <see cref="FileSystem"/>.
/// </summary>
/// <param name="conf">The configuration to use.</param>
/// <returns>The FileSystem.</returns>
public static FileSystem Get(Configuration conf) =>
new FileSystem((JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
"org.apache.hadoop.fs.FileSystem",
"get",
conf));

/// <summary>
/// Delete a file.
/// </summary>
/// <param name="path">The path to delete.</param>
/// <param name="recursive">If path is a directory and set to true, the directory is deleted else
/// throws an exception. In case of a file the recursive can be set to either true or false.</param>
/// <returns>True if delete is successful else false.</returns>
public bool Delete(string path, bool recursive = true)
{
JvmObjectReference pathObject =
SparkEnvironment.JvmBridge.CallConstructor("org.apache.hadoop.fs.Path", path);

return (bool)_jvmObject.Invoke("delete", pathObject, recursive);
}

public void Dispose() => _jvmObject.Invoke("close");
}
}
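For context, here is a minimal usage sketch of the new API, mirroring the E2E test above. The session setup and path are illustrative only:

```csharp
using Microsoft.Spark.Hadoop.Fs;
using Microsoft.Spark.Sql;

class Example
{
    static void Main()
    {
        SparkSession spark = SparkSession.Builder().GetOrCreate();

        // FileSystem wraps org.apache.hadoop.fs.FileSystem on the JVM side;
        // Get() resolves the filesystem configured for this Spark context.
        using FileSystem fs = FileSystem.Get(spark.SparkContext.HadoopConfiguration());

        // Delete returns true if the path existed and was removed,
        // false if there was nothing to delete.
        bool deleted = fs.Delete("/tmp/example-output", recursive: true);
    }
}
```

Disposing the FileSystem invokes close() on the underlying JVM object, per the Dispose() implementation above.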
8 changes: 8 additions & 0 deletions src/csharp/Microsoft.Spark/SparkContext.cs
@@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Spark.Hadoop.Conf;
using Microsoft.Spark.Interop.Ipc;
using static Microsoft.Spark.Utils.CommandSerDe;

@@ -312,6 +313,13 @@ public Broadcast<T> Broadcast<T>(T value)
return new Broadcast<T>(this, value);
}

/// <summary>
/// A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
/// </summary>
/// <returns>The Hadoop Configuration.</returns>
public Configuration HadoopConfiguration() =>
    new Configuration((JvmObjectReference)_jvmObject.Invoke("hadoopConfiguration"));

Review comment (Contributor Author):
@imback82 Not sure if this should be a function or a property.

Reply (Contributor):
Since this is def on the Scala side, this will be a method on C# side. So the current approach is good.

/// <summary>
/// Returns JVM object reference to JavaRDD object transformed
/// from a Scala RDD object.