Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming support for JSON #2882

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.IO;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
Expand Down Expand Up @@ -1985,7 +1986,11 @@ override public TextReader GetTextReader(int i)
}

System.Text.Encoding encoding;
if (mt.IsNCharType)
if (mt.SqlDbType == SqlDbTypeExtensions.Json)
{
encoding = new UTF8Encoding();
}
else if (mt.IsNCharType)
{
// NChar types always use unicode
encoding = SqlUnicodeEncoding.SqlUnicodeEncodingInstance;
Expand All @@ -1994,7 +1999,6 @@ override public TextReader GetTextReader(int i)
{
encoding = _metaData[i].encoding;
}

_currentTextReader = new SqlSequentialTextReader(this, i, encoding);
_lastColumnWithDataChunkRead = i;
return _currentTextReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12177,7 +12177,8 @@ private Task WriteUnterminatedValue(object value, MetaType type, byte scale, int
}
else
{
return NullIfCompletedWriteTask(WriteTextFeed(tdf, null, IsBOMNeeded(type, value), stateObj, paramSize));
Encoding encoding = type.NullableType == TdsEnums.SQLJSON ? new UTF8Encoding() : null;
return NullIfCompletedWriteTask(WriteTextFeed(tdf, encoding, IsBOMNeeded(type, value), stateObj, paramSize));
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using System.IO;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
Expand Down Expand Up @@ -2273,7 +2274,11 @@ override public TextReader GetTextReader(int i)
}

System.Text.Encoding encoding;
if (mt.IsNCharType)
if (mt.SqlDbType == SqlDbTypeExtensions.Json)
{
encoding = new UTF8Encoding();
}
else if (mt.IsNCharType)
{
// NChar types always use unicode
encoding = SqlUnicodeEncoding.SqlUnicodeEncodingInstance;
Expand All @@ -2282,7 +2287,6 @@ override public TextReader GetTextReader(int i)
{
encoding = _metaData[i].encoding;
}

_currentTextReader = new SqlSequentialTextReader(this, i, encoding);
_lastColumnWithDataChunkRead = i;
return _currentTextReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13169,7 +13169,8 @@ private Task WriteUnterminatedValue(object value, MetaType type, byte scale, int
}
else
{
return NullIfCompletedWriteTask(WriteTextFeed(tdf, null, IsBOMNeeded(type, value), stateObj, paramSize));
Encoding encoding = type.NullableType == TdsEnums.SQLJSON ? new UTF8Encoding() : null;
return NullIfCompletedWriteTask(WriteTextFeed(tdf, encoding, IsBOMNeeded(type, value), stateObj, paramSize));
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ private static bool _IsCharType(SqlDbType type) =>
type == SqlDbType.Char ||
type == SqlDbType.VarChar ||
type == SqlDbType.Text ||
type == SqlDbTypeExtensions.Json ||
type == SqlDbType.Xml;

private static bool _IsNCharType(SqlDbType type) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,17 @@ public static string GetUniqueNameForSqlServer(string prefix, bool withBracket =
return name;
}

public static void CreateTable(SqlConnection sqlConnection, string tableName, string createBody)
{
DropTable(sqlConnection, tableName);
string tableCreate = "CREATE TABLE " + tableName + createBody;
using (SqlCommand command = sqlConnection.CreateCommand())
{
command.CommandText = tableCreate;
command.ExecuteNonQuery();
}
}

public static void DropTable(SqlConnection sqlConnection, string tableName)
{
ResurrectConnection(sqlConnection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@
<Compile Include="SQL\Common\SystemDataInternals\TdsParserStateObjectHelper.cs" />
<Compile Include="SQL\ConnectionTestWithSSLCert\CertificateTest.cs" />
<Compile Include="SQL\ConnectionTestWithSSLCert\CertificateTestWithTdsServer.cs" />
<Compile Include="SQL\JsonTest\JsonStreamTest.cs" />
<Compile Include="SQL\JsonTest\JsonTest.cs" />
<Compile Include="SQL\SqlCommand\SqlCommandStoredProcTest.cs" />
<Compile Include="TracingTests\TestTdsServer.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Xunit;
using Xunit.Abstractions;
using Newtonsoft.Json.Linq;


namespace Microsoft.Data.SqlClient.ManualTesting.Tests
{
public class JsonRecord
{
public int Id { get; set; }
public string Name { get; set; }
}

public class JsonStreamTest
{
private readonly ITestOutputHelper _output;
private static readonly string _jsonFile = "randomRecords.json";
private static readonly string _outputFile = "serverRecords.json";

public JsonStreamTest(ITestOutputHelper output)
{
_output = output;
}

private void GenerateJsonFile(int noOfRecords, string filename)
{
DeleteFile(filename);
var random = new Random();
var records = new List<JsonRecord>();
int recordCount = noOfRecords;

for (int i = 0; i < recordCount; i++)
{
records.Add(new JsonRecord
{
Id = i + 1,
Name = "𩸽json" + random.Next(1, noOfRecords),
});
}

string json = JsonConvert.SerializeObject(records, Formatting.Indented);
File.WriteAllText(filename, json);
Assert.True(File.Exists(filename));
_output.WriteLine("Generated JSON file "+filename);
}

private void CompareJsonFiles()
{
using (var stream1 = File.OpenText(_jsonFile))
using (var stream2 = File.OpenText(_outputFile))
using (var reader1 = new JsonTextReader(stream1))
using (var reader2 = new JsonTextReader(stream2))
{
var jToken1 = JToken.ReadFrom(reader1);
var jToken2 = JToken.ReadFrom(reader2);
Assert.True(JToken.DeepEquals(jToken1, jToken2));
}
}

private void PrintJsonDataToFile(SqlConnection connection)
{
DeleteFile(_outputFile);
using (SqlCommand command = new SqlCommand("SELECT [data] FROM [jsonTab]", connection))
{
using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SequentialAccess))
{
using (StreamWriter sw = new StreamWriter(_outputFile))
{
while (reader.Read())
{
char[] buffer = new char[4096];
int charsRead = 0;

using (TextReader data = reader.GetTextReader(0))
{
do
{
charsRead = data.Read(buffer, 0, buffer.Length);
sw.Write(buffer, 0, charsRead);

} while (charsRead > 0);
}
_output.WriteLine("Output written to " + _outputFile);
}
}
}
}
}

private async Task PrintJsonDataToFileAsync(SqlConnection connection)
{
DeleteFile(_outputFile);
using (SqlCommand command = new SqlCommand("SELECT [data] FROM [jsonTab]", connection))
{
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
using (StreamWriter sw = new StreamWriter(_outputFile))
{
while (await reader.ReadAsync())
{
char[] buffer = new char[4096];
int charsRead = 0;

using (TextReader data = reader.GetTextReader(0))
{
do
{
charsRead = await data.ReadAsync(buffer, 0, buffer.Length);
await sw.WriteAsync(buffer, 0, charsRead);

} while (charsRead > 0);
}
_output.WriteLine("Output written to file " + _outputFile);
}
}
}
}
}

private void StreamJsonFileToServer(SqlConnection connection)
{
using (SqlCommand cmd = new SqlCommand("INSERT INTO [jsonTab] (data) VALUES (@jsondata)", connection))
{
using (StreamReader jsonFile = File.OpenText(_jsonFile))
{
cmd.Parameters.Add("@jsondata", Microsoft.Data.SqlDbTypeExtensions.Json, -1).Value = jsonFile;
cmd.ExecuteNonQuery();
}
}
}

private async Task StreamJsonFileToServerAsync(SqlConnection connection)
{
using (SqlCommand cmd = new SqlCommand("INSERT INTO [jsonTab] (data) VALUES (@jsondata)", connection))
{
using (StreamReader jsonFile = File.OpenText(_jsonFile))
{
cmd.Parameters.Add("@jsondata", Microsoft.Data.SqlDbTypeExtensions.Json, -1).Value = jsonFile;
await cmd.ExecuteNonQueryAsync();
}
}
}

private void DeleteFile(string filename)
{
if (File.Exists(filename))
{
File.Delete(filename);
}
}

[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.IsJsonSupported))]
public void TestJsonStreaming()
{
GenerateJsonFile(10000, _jsonFile);
using (SqlConnection connection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
connection.Open();
DataTestUtility.CreateTable(connection, "jsonTab", "(data json)");
StreamJsonFileToServer(connection);
PrintJsonDataToFile(connection);
CompareJsonFiles();
DeleteFile(_jsonFile);
DeleteFile(_outputFile);
}
}

[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.IsJsonSupported))]
public async Task TestJsonStreamingAsync()
{
GenerateJsonFile(10000, _jsonFile);
using (SqlConnection connection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
await connection.OpenAsync();
DataTestUtility.CreateTable(connection, "jsonTab", "(data json)");
await StreamJsonFileToServerAsync(connection);
await PrintJsonDataToFileAsync(connection);
CompareJsonFiles();
DeleteFile(_jsonFile);
DeleteFile(_outputFile);
}
}
}
}

Loading