
Get Microsoft.Spark and Microsoft.Spark.Worker assembly version information #715

Merged · 23 commits · Oct 8, 2020

Changes from 4 commits
37 changes: 37 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/SparkSession.cs
@@ -6,6 +6,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Spark.Utils;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Internal.Scala;
using Microsoft.Spark.Interop.Ipc;
@@ -389,6 +390,42 @@ public UdfRegistration Udf() =>
/// </summary>
public void Stop() => _jvmObject.Invoke("stop");

/// <summary>
/// Get the <see cref="VersionSensor.VersionInfo"/> for the Microsoft.Spark assembly
/// running on the Spark driver, and make a "best effort" attempt at determining the
/// version of the Microsoft.Spark.Worker assembly on the Spark executors.
/// </summary>
/// <returns>
/// A <see cref="DataFrame"/> containing the <see cref="VersionSensor.VersionInfo"/>.
/// </returns>
public DataFrame Version(int numPartitions = 10)
Collaborator:
Just curious, why have we chosen `numPartitions` as 10 here?

Member Author:
Any number is fine; it is just a safe default for local runs and small clusters. Do you have a recommendation on what value we could use?

Collaborator:
No, 10 is fine; I was just wondering if there was a specific reason.

Collaborator:
Do we want to add a test covering this, since it is a public API?

Collaborator:
Also, I am wondering whether asking for `numPartitions` is really necessary for finding the version, from the user's perspective. Would having it as a constant inside the function pose a problem?

Member Author:
What would be the pros and cons of hardcoding the number instead?

Collaborator:
The pro, I think, is usability: it may be hard for a user to intuitively understand how many partitions to enter and what that has to do with getting version information. The con is not being able to trigger every worker node, so a node with a different worker version could go uncaught. But that could happen even when taking `numPartitions` as an argument, so I am not sure what the benefit is there, unless we document it in more depth: advise the user on the importance of estimating a "correct" `numPartitions` value, explain in detail what "best effort" means, etc. Thoughts @imback82?
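A quick way to see why the coverage is only "best effort": if partitions land on executor hosts more or less arbitrarily, some hosts may receive no partition at all when `numPartitions` is small. This standalone Python sketch (a simulation under the simplifying assumption of uniformly random placement; not the Microsoft.Spark API) illustrates the effect:

```python
import random

def simulate_coverage(num_hosts, num_partitions, trials=2000, seed=7):
    """Fraction of trials in which every host receives at least one
    partition, assuming partitions land on hosts uniformly at random."""
    rng = random.Random(seed)
    covered = 0
    for _ in range(trials):
        # Which hosts were hit by at least one partition this trial?
        hit = {rng.randrange(num_hosts) for _ in range(num_partitions)}
        if len(hit) == num_hosts:
            covered += 1
    return covered / trials

# More partitions -> higher chance that every worker reports its version.
assert simulate_coverage(4, 4) < simulate_coverage(4, 40)
```

With 4 hosts and only 4 partitions, many trials miss at least one host, so a worker running a mismatched assembly version could go unreported.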

Member Author (@suhsteve, Oct 7, 2020):
Maybe we should name this `AssemblyVersion`, `AssemblyVersionInfo`, `VersionInfo`, or something unique to .NET, just in case Spark ever adds a method called `Version`?

Contributor:
How about making this an extension method under an `Experimental` namespace?

Member Author:
Keep it in the same Microsoft.Spark project, just in a different namespace, right?

Member Author:
Moved to `Microsoft.Spark.Experimental.Sql`.

Contributor:
How about the assembly version on the driver?

Member Author:
Which assembly version on the driver? We are getting the Microsoft.Spark assembly version on the driver, and the Microsoft.Spark.Worker version on whichever machines the Spark executors are spinning up on.

{
StructType schema = VersionSensor.VersionInfo.s_schema;

DataFrame sparkInfoDf =
CreateDataFrame(
new GenericRow[] { VersionSensor.MicrosoftSparkVersion().ToGenericRow() },
schema);

Func<Column, Column> workerInfoUdf =
Functions.Udf<int>(
i => VersionSensor.MicrosoftSparkWorkerVersion().ToGenericRow(),
schema);
DataFrame df = CreateDataFrame(Enumerable.Range(0, 10 * numPartitions));

string tempColName = "WorkerVersionInfo";
DataFrame workerInfoDf = df
.Repartition(numPartitions)
.WithColumn(tempColName, workerInfoUdf(df["_1"]))
.Select(schema.Fields.Select(
f => Functions.Col($"{tempColName}.{f.Name}")).ToArray());

return sparkInfoDf
.Union(workerInfoDf)
.DropDuplicates()
.Sort(schema.Fields.Select(f => Functions.Col(f.Name)).ToArray());
}
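Stripped of Spark, the `Version` method above amounts to: build one row for the driver, fan a UDF across `numPartitions` partitions so each executor that picks up work contributes a row, then union, deduplicate, and sort. A plain-Python sketch of that flow (the host names and version data here are hypothetical, and this is an illustration of the union/dedupe logic, not the Microsoft.Spark API):

```python
def version_report(driver_row, partition_hosts):
    """Mimic the shape of SparkSession.Version(): one driver row plus one
    row per partition, deduplicated and sorted.
    Rows are (AssemblyName, AssemblyVersion, HostName) tuples."""
    worker_rows = [("Microsoft.Spark.Worker", version, host)
                   for host, version in partition_hosts]
    # Union + DropDuplicates + Sort, as in the DataFrame pipeline.
    return sorted(set([driver_row] + worker_rows))

driver = ("Microsoft.Spark", "1.0.0", "driver-host")
# Ten partitions may land on only two distinct executor hosts.
partitions = [("exec-1", "1.0.0")] * 6 + [("exec-2", "1.0.0")] * 4
report = version_report(driver, partitions)
# Three distinct rows survive: the driver plus one per executor host.
```

The deduplication is what makes the report compact: however many partitions run, each (assembly, version, host) combination appears once, so a host running a stale worker assembly would stand out as an extra row.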

/// <summary>
/// Returns a single column schema of the given datatype.
/// </summary>
Expand Down
72 changes: 72 additions & 0 deletions src/csharp/Microsoft.Spark/Utils/VersionSensor.cs
@@ -0,0 +1,72 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Linq;
using System.Net;
using System.Reflection;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

namespace Microsoft.Spark.Utils
{
internal class VersionSensor
{
internal static VersionInfo MicrosoftSparkVersion() => s_microsoftSparkVersionInfo.Value;

internal static VersionInfo MicrosoftSparkWorkerVersion() =>
s_microsoftSparkWorkerVersionInfo.Value;

private static readonly Lazy<VersionInfo> s_microsoftSparkVersionInfo =
new Lazy<VersionInfo>(() =>
{
Assembly assembly =
AppDomain.CurrentDomain
.GetAssemblies()
.SingleOrDefault(asm => asm.GetName().Name == "Microsoft.Spark");

return CreateVersionInfo(assembly);
});

private static readonly Lazy<VersionInfo> s_microsoftSparkWorkerVersionInfo =
new Lazy<VersionInfo>(() =>
{
Assembly assembly =
AppDomain.CurrentDomain
.GetAssemblies()
.SingleOrDefault(asm => asm.GetName().Name == "Microsoft.Spark.Worker");

return CreateVersionInfo(assembly);
});

private static VersionInfo CreateVersionInfo(Assembly assembly)
{
AssemblyName asmName = assembly.GetName();
return new VersionInfo
{
AssemblyName = asmName.Name,
AssemblyVersion = asmName.Version.ToString(),
HostName = Dns.GetHostName()
};
}
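The `Lazy<VersionInfo>` fields above defer the assembly scan until first use and cache the result for the lifetime of the process. A rough Python equivalent of that lazy, memoized pattern (the `version_info` function and its arguments are hypothetical, for illustration only):

```python
import functools
import socket

@functools.lru_cache(maxsize=None)   # evaluated once per key, like Lazy<T>.Value
def version_info(assembly_name, assembly_version):
    """Build the (name, version, host) record a single time per assembly."""
    return {
        "AssemblyName": assembly_name,
        "AssemblyVersion": assembly_version,
        "HostName": socket.gethostname(),
    }

info = version_info("Microsoft.Spark", "1.0.0")
# Subsequent calls return the same cached object, not a fresh lookup.
assert info is version_info("Microsoft.Spark", "1.0.0")
```

Caching matters here because the C# lookup walks every loaded assembly in the `AppDomain`; doing that once and reusing the result is both cheaper and stable for the life of the process.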

internal class VersionInfo
{
internal static readonly StructType s_schema = new StructType(
new StructField[]
{
new StructField("AssemblyName", new StringType(), isNullable: false),
new StructField("AssemblyVersion", new StringType(), isNullable: false),
new StructField("HostName", new StringType(), isNullable: false)
});

internal string AssemblyName { get; set; }
internal string AssemblyVersion { get; set; }
internal string HostName { get; set; }
Member Author (@suhsteve, Oct 7, 2020):
We can add `BuildDate` (by getting the assembly file's creation date) if we think it would be useful/nice to have.

Contributor:
We can iterate on this as we get feedback.

internal GenericRow ToGenericRow() =>
new GenericRow(new object[] { AssemblyName, AssemblyVersion, HostName });
}
}
}