diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs index 93bf6e119f..d368a25d50 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs @@ -596,6 +596,10 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i { AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary"); } + else if (metadata.type == SqlDbTypeExtensions.Json) + { + AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "json"); + } else { AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, metadata.type.ToString()); @@ -645,7 +649,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i } updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size); } - else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml) + else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml && metadata.metaType.SqlDbType != SqlDbTypeExtensions.Json) { // Partial length column prefix (max) updateBulkCommandText.Append("(max)"); @@ -1213,7 +1217,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) } } // Check for data streams - else if ((_enableStreaming) && (metadata.length == MAX_LENGTH)) + else if ((_enableStreaming) && ((metadata.length == MAX_LENGTH) || metadata.type == SqlDbTypeExtensions.Json)) { isSqlType = false; @@ -1229,7 +1233,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) method = ValueMethod.DataFeedStream; } // For text and XML there is memory gain from streaming on destination side even if reader is non-sequential - else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar)) && (mtSource.IsCharType) && (mtSource.SqlDbType != 
SqlDbType.Xml)) + else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar || metadata.type == SqlDbTypeExtensions.Json)) && (mtSource.IsCharType) && (mtSource.SqlDbType != SqlDbType.Xml)) { isDataFeed = true; method = ValueMethod.DataFeedText; @@ -1590,6 +1594,7 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re } break; case TdsEnums.SQLXMLTYPE: + case TdsEnums.SQLJSON: // Could be either string, SqlCachedBuffer, XmlReader or XmlDataFeed Debug.Assert((value is XmlReader) || (value is SqlCachedBuffer) || (value is string) || (value is SqlString) || (value is XmlDataFeed), "Invalid value type of Xml datatype"); if (value is XmlReader) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs index 6119676b6e..d99ca11429 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs @@ -164,6 +164,9 @@ internal void Stop() // XML metadata substitute sequence private static readonly byte[] s_xmlMetadataSubstituteSequence = { 0xe7, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00 }; + // JSON metadata substitute sequence + private static readonly byte[] s_jsonMetadataSubstituteSequence = { 0xa7, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00 }; + // size of Guid (e.g. 
_clientConnectionId, ActivityId.Id) private const int GUID_SIZE = 16; @@ -8072,6 +8075,11 @@ internal TdsOperationStatus TryGetTokenLength(byte token, TdsParserStateObject s Debug.Assert(tokenLength == TdsEnums.SQL_USHORTVARMAXLEN, "Invalid token stream for xml datatype"); return TdsOperationStatus.Done; } + else if (token == TdsEnums.SQLJSON) + { + tokenLength = -1; + return TdsOperationStatus.Done; + } } switch (token & TdsEnums.SQLLenMask) @@ -10788,6 +10796,9 @@ internal void WriteBulkCopyMetaData(_SqlMetaDataSet metadataCollection, int coun stateObj.WriteByte(md.tdsType); stateObj.WriteByte(md.scale); break; + case SqlDbTypeExtensions.Json: + stateObj.WriteByteArray(s_jsonMetadataSubstituteSequence, s_jsonMetadataSubstituteSequence.Length, 0); + break; default: stateObj.WriteByte(md.tdsType); WriteTokenLength(md.tdsType, md.length, stateObj); @@ -11042,7 +11053,11 @@ internal Task WriteBulkCopyValue(object value, SqlMetaDataPriv metadata, TdsPars } ccb = ((isSqlType) ? ((SqlString)value).Value.Length : ((string)value).Length) * 2; break; - + case TdsEnums.SQLJSON: + string strval = (isSqlType) ? ((SqlString)value).Value : (string)value; + ccb = isSqlType ? 
strval.Length : strval.Length * 2; + ccbStringBytes = Encoding.UTF8.GetByteCount(strval); + break; default: ccb = metadata.length; break; @@ -11052,9 +11067,9 @@ internal Task WriteBulkCopyValue(object value, SqlMetaDataPriv metadata, TdsPars { Debug.Assert(metatype.IsLong && ((metatype.SqlDbType == SqlDbType.VarBinary && value is StreamDataFeed) || - ((metatype.SqlDbType == SqlDbType.VarChar || metatype.SqlDbType == SqlDbType.NVarChar) && value is TextDataFeed) || + ((metatype.SqlDbType == SqlDbType.VarChar || metatype.SqlDbType == SqlDbType.NVarChar || metatype.SqlDbType == SqlDbTypeExtensions.Json) && value is TextDataFeed) || (metatype.SqlDbType == SqlDbType.Xml && value is XmlDataFeed)), - "Stream data feed should only be assigned to VarBinary(max), Text data feed should only be assigned to [N]VarChar(max), Xml data feed should only be assigned to XML(max)"); + "Stream data feed should only be assigned to VarBinary(max), Text data feed should only be assigned to [N]VarChar(max) or json, Xml data feed should only be assigned to XML(max)"); } @@ -11076,6 +11091,7 @@ internal Task WriteBulkCopyValue(object value, SqlMetaDataPriv metadata, TdsPars case SqlDbType.VarBinary: case SqlDbType.Xml: case SqlDbType.Udt: + case SqlDbTypeExtensions.Json: // plp data WriteUnsignedLong(TdsEnums.SQL_PLP_UNKNOWNLEN, stateObj); break; @@ -11956,7 +11972,7 @@ private async Task WriteXmlFeed(XmlDataFeed feed, TdsParserStateObject stateObj, } } - private async Task WriteTextFeed(TextDataFeed feed, Encoding encoding, bool needBom, TdsParserStateObject stateObj, int size) + private async Task WriteTextFeed(TextDataFeed feed, Encoding encoding, bool needBom, TdsParserStateObject stateObj, int size, bool useReadBlock) { Debug.Assert(encoding == null || !needBom); char[] inBuff = ArrayPool.Shared.Rent(constTextBufferSize); @@ -11984,11 +12000,25 @@ private async Task WriteTextFeed(TextDataFeed feed, Encoding encoding, bool need if (_asyncWrite) { - nRead = await 
feed._source.ReadBlockAsync(inBuff, 0, constTextBufferSize).ConfigureAwait(false); + if (useReadBlock) + { + nRead = await feed._source.ReadBlockAsync(inBuff, 0, constTextBufferSize).ConfigureAwait(false); + } + else + { + nRead = await feed._source.ReadAsync(inBuff, 0, constTextBufferSize).ConfigureAwait(false); + } } else { - nRead = feed._source.ReadBlock(inBuff, 0, constTextBufferSize); + if (useReadBlock) + { + nRead = feed._source.ReadBlock(inBuff, 0, constTextBufferSize); + } + else + { + nRead = feed._source.Read(inBuff, 0, constTextBufferSize); + } } if (nRead == 0) @@ -12189,7 +12219,7 @@ private Task WriteUnterminatedValue(object value, MetaType type, byte scale, int } else { - return NullIfCompletedWriteTask(WriteTextFeed(tdf, _defaultEncoding, false, stateObj, paramSize)); + return NullIfCompletedWriteTask(WriteTextFeed(tdf, _defaultEncoding, false, stateObj, paramSize, true)); } } else @@ -12228,7 +12258,8 @@ private Task WriteUnterminatedValue(object value, MetaType type, byte scale, int else { Encoding encoding = type.NullableType == TdsEnums.SQLJSON ? new UTF8Encoding() : null; - return NullIfCompletedWriteTask(WriteTextFeed(tdf, encoding, IsBOMNeeded(type, value), stateObj, paramSize)); + bool useReadBlock = type.NullableType == TdsEnums.SQLJSON ? 
false : true; + return NullIfCompletedWriteTask(WriteTextFeed(tdf, encoding, IsBOMNeeded(type, value), stateObj, paramSize, useReadBlock)); } } else diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs index 0f43f46da1..ac9e1a79c7 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs @@ -631,6 +631,10 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i { AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary"); } + else if (metadata.type == SqlDbTypeExtensions.Json) + { + AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "json"); + } else { AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, typeof(SqlDbType).GetEnumName(metadata.type)); @@ -680,7 +684,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i } updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size); } - else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml) + else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml && metadata.metaType.SqlDbType != SqlDbTypeExtensions.Json) { // Partial length column prefix (max) updateBulkCommandText.Append("(max)"); @@ -1254,7 +1258,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) } } // Check for data streams - else if ((_enableStreaming) && (metadata.length == MAX_LENGTH) && (!_rowSourceIsSqlDataReaderSmi)) + else if ((_enableStreaming) && ((metadata.length == MAX_LENGTH) || metadata.metaType.SqlDbType == SqlDbTypeExtensions.Json) && (!_rowSourceIsSqlDataReaderSmi)) { isSqlType = false; @@ -1270,7 +1274,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) method = ValueMethod.DataFeedStream; } // For text and 
XML there is memory gain from streaming on destination side even if reader is non-sequential - else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar)) && (mtSource.IsCharType) && (mtSource.SqlDbType != SqlDbType.Xml)) + else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar) || (metadata.type == SqlDbTypeExtensions.Json)) && (mtSource.IsCharType) && (mtSource.SqlDbType != SqlDbType.Xml)) { isDataFeed = true; method = ValueMethod.DataFeedText; @@ -1640,6 +1644,7 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re } break; case TdsEnums.SQLXMLTYPE: + case TdsEnums.SQLJSON: // Could be either string, SqlCachedBuffer, XmlReader or XmlDataFeed Debug.Assert((value is XmlReader) || (value is SqlCachedBuffer) || (value is string) || (value is SqlString) || (value is XmlDataFeed), "Invalid value type of Xml datatype"); if (value is XmlReader) diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs index 8267bc75e0..c5500a4da3 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs @@ -244,6 +244,9 @@ internal static void Assert(string message) // XML metadata substitue sequence private static readonly byte[] s_xmlMetadataSubstituteSequence = { 0xe7, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00 }; + // JSON metadata substitue sequence + private static readonly byte[] s_jsonMetadataSubstituteSequence = { 0xa7, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00 }; + // size of Guid (e.g. 
_clientConnectionId, ActivityId.Id) private const int GUID_SIZE = 16; private byte[] _tempGuidBytes; @@ -8781,6 +8784,11 @@ internal TdsOperationStatus TryGetTokenLength(byte token, TdsParserStateObject s Debug.Assert(tokenLength == TdsEnums.SQL_USHORTVARMAXLEN, "Invalid token stream for xml datatype"); return TdsOperationStatus.Done; } + else if (token == TdsEnums.SQLJSON) + { + tokenLength = -1; + return TdsOperationStatus.Done; + } } switch (token & TdsEnums.SQLLenMask) @@ -11589,6 +11597,9 @@ internal void WriteBulkCopyMetaData(_SqlMetaDataSet metadataCollection, int coun stateObj.WriteByte(md.tdsType); stateObj.WriteByte(md.scale); break; + case SqlDbTypeExtensions.Json: + stateObj.WriteByteArray(s_jsonMetadataSubstituteSequence, s_jsonMetadataSubstituteSequence.Length, 0); + break; default: stateObj.WriteByte(md.tdsType); WriteTokenLength(md.tdsType, md.length, stateObj); @@ -11842,6 +11853,11 @@ internal Task WriteBulkCopyValue(object value, SqlMetaDataPriv metadata, TdsPars } ccb = ((isSqlType) ? ((SqlString)value).Value.Length : ((string)value).Length) * 2; break; + case TdsEnums.SQLJSON: + string strval = (isSqlType) ? ((SqlString)value).Value : (string)value; + ccb = isSqlType ? 
strval.Length : strval.Length * 2; + ccbStringBytes = Encoding.UTF8.GetByteCount(strval); + break; default: ccb = metadata.length; @@ -11852,9 +11868,9 @@ internal Task WriteBulkCopyValue(object value, SqlMetaDataPriv metadata, TdsPars { Debug.Assert(metatype.IsLong && ((metatype.SqlDbType == SqlDbType.VarBinary && value is StreamDataFeed) || - ((metatype.SqlDbType == SqlDbType.VarChar || metatype.SqlDbType == SqlDbType.NVarChar) && value is TextDataFeed) || + ((metatype.SqlDbType == SqlDbType.VarChar || metatype.SqlDbType == SqlDbType.NVarChar || metatype.SqlDbType == SqlDbTypeExtensions.Json) && value is TextDataFeed) || (metatype.SqlDbType == SqlDbType.Xml && value is XmlDataFeed)), - "Stream data feed should only be assigned to VarBinary(max), Text data feed should only be assigned to [N]VarChar(max), Xml data feed should only be assigned to XML(max)"); + "Stream data feed should only be assigned to VarBinary(max), Text data feed should only be assigned to [N]VarChar(max) or json, Xml data feed should only be assigned to XML(max)"); } @@ -11876,6 +11892,7 @@ internal Task WriteBulkCopyValue(object value, SqlMetaDataPriv metadata, TdsPars case SqlDbType.VarBinary: case SqlDbType.Xml: case SqlDbType.Udt: + case SqlDbTypeExtensions.Json: // plp data WriteUnsignedLong(TdsEnums.SQL_PLP_UNKNOWNLEN, stateObj); break; @@ -12825,7 +12842,7 @@ private async Task WriteXmlFeed(XmlDataFeed feed, TdsParserStateObject stateObj, } } - private async Task WriteTextFeed(TextDataFeed feed, Encoding encoding, bool needBom, TdsParserStateObject stateObj, int size) + private async Task WriteTextFeed(TextDataFeed feed, Encoding encoding, bool needBom, TdsParserStateObject stateObj, int size, bool useReadBlock) { Debug.Assert(encoding == null || !needBom); char[] inBuff = new char[constTextBufferSize]; @@ -12852,11 +12869,25 @@ private async Task WriteTextFeed(TextDataFeed feed, Encoding encoding, bool need if (_asyncWrite) { - nRead = await feed._source.ReadBlockAsync(inBuff, 0, 
constTextBufferSize).ConfigureAwait(false); + if (useReadBlock) + { + nRead = await feed._source.ReadBlockAsync(inBuff, 0, constTextBufferSize).ConfigureAwait(false); + } + else + { + nRead = await feed._source.ReadAsync(inBuff, 0, constTextBufferSize).ConfigureAwait(false); + } } else { - nRead = feed._source.ReadBlock(inBuff, 0, constTextBufferSize); + if (useReadBlock) + { + nRead = feed._source.ReadBlock(inBuff, 0, constTextBufferSize); + } + else + { + nRead = feed._source.Read(inBuff, 0, constTextBufferSize); + } } if (nRead == 0) @@ -13052,7 +13083,7 @@ private Task WriteUnterminatedValue(object value, MetaType type, byte scale, int } else { - return NullIfCompletedWriteTask(WriteTextFeed(tdf, _defaultEncoding, false, stateObj, paramSize)); + return NullIfCompletedWriteTask(WriteTextFeed(tdf, _defaultEncoding, false, stateObj, paramSize, true)); } } else @@ -13091,7 +13122,8 @@ private Task WriteUnterminatedValue(object value, MetaType type, byte scale, int else { Encoding encoding = type.NullableType == TdsEnums.SQLJSON ? new UTF8Encoding() : null; - return NullIfCompletedWriteTask(WriteTextFeed(tdf, encoding, IsBOMNeeded(type, value), stateObj, paramSize)); + bool useReadBlock = type.NullableType == TdsEnums.SQLJSON ? 
false : true; + return NullIfCompletedWriteTask(WriteTextFeed(tdf, encoding, IsBOMNeeded(type, value), stateObj, paramSize, useReadBlock)); } } else diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/Microsoft.Data.SqlClient.ManualTesting.Tests.csproj b/src/Microsoft.Data.SqlClient/tests/ManualTests/Microsoft.Data.SqlClient.ManualTesting.Tests.csproj index 35a6b80fcd..27d8d6643b 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/Microsoft.Data.SqlClient.ManualTesting.Tests.csproj +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/Microsoft.Data.SqlClient.ManualTesting.Tests.csproj @@ -294,6 +294,7 @@ + diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/JsonTest/JsonBulkCopyTest.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/JsonTest/JsonBulkCopyTest.cs new file mode 100644 index 0000000000..02ca1ef172 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/JsonTest/JsonBulkCopyTest.cs @@ -0,0 +1,287 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Newtonsoft.Json.Linq; +using Newtonsoft.Json; +using Xunit.Abstractions; +using Xunit; + +namespace Microsoft.Data.SqlClient.ManualTesting.Tests.SQL.JsonTest +{ + public class JsonBulkCopyTest + { + private readonly ITestOutputHelper _output; + private static readonly string _jsonFile = "randomRecords.json"; + private static readonly string _outputFile = "serverRecords.json"; + private static readonly bool _isTestEnabled = DataTestUtility.IsJsonSupported; + + public JsonBulkCopyTest(ITestOutputHelper output) + { + _output = output; + } + + private void PopulateData(int noOfRecords) + { + using (SqlConnection connection = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + DataTestUtility.CreateTable(connection, "jsonTab", "(data json)"); + DataTestUtility.CreateTable(connection, "jsonTabCopy", "(data json)"); + GenerateJsonFile(noOfRecords, 
_jsonFile); + StreamJsonFileToServer(connection); + } + } + + private void GenerateJsonFile(int noOfRecords, string filename) + { + DeleteFile(filename); + var random = new Random(); + var records = new List(); + int recordCount = noOfRecords; + + for (int i = 0; i < recordCount; i++) + { + records.Add(new JsonRecord + { + Id = i + 1, + //Inclusion of 𩸽 and क is intentional to include 4byte and 3 byte UTF8character + Name = "𩸽jsonक" + random.Next(1, noOfRecords), + }); + } + + string json = JsonConvert.SerializeObject(records, Formatting.Indented); + File.WriteAllText(filename, json); + Assert.True(File.Exists(filename)); + _output.WriteLine("Generated JSON file " + filename); + } + + private void CompareJsonFiles() + { + using (var stream1 = File.OpenText(_jsonFile)) + using (var stream2 = File.OpenText(_outputFile)) + using (var reader1 = new JsonTextReader(stream1)) + using (var reader2 = new JsonTextReader(stream2)) + { + var jToken1 = JToken.ReadFrom(reader1); + var jToken2 = JToken.ReadFrom(reader2); + Assert.True(JToken.DeepEquals(jToken1, jToken2)); + } + } + + private void PrintJsonDataToFile(SqlConnection connection) + { + DeleteFile(_outputFile); + using (SqlCommand command = new SqlCommand("SELECT [data] FROM [jsonTabCopy]", connection)) + { + using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SequentialAccess)) + { + using (StreamWriter sw = new StreamWriter(_outputFile)) + { + while (reader.Read()) + { + char[] buffer = new char[4096]; + int charsRead = 0; + + using (TextReader data = reader.GetTextReader(0)) + { + do + { + charsRead = data.Read(buffer, 0, buffer.Length); + sw.Write(buffer, 0, charsRead); + + } while (charsRead > 0); + } + _output.WriteLine("Output written to " + _outputFile); + } + } + } + } + } + + private async Task PrintJsonDataToFileAsync(SqlConnection connection) + { + DeleteFile(_outputFile); + using (SqlCommand command = new SqlCommand("SELECT [data] FROM [jsonTab]", connection)) + { + using (SqlDataReader 
reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) + { + using (StreamWriter sw = new StreamWriter(_outputFile)) + { + while (await reader.ReadAsync()) + { + char[] buffer = new char[4096]; + int charsRead = 0; + + using (TextReader data = reader.GetTextReader(0)) + { + do + { + charsRead = await data.ReadAsync(buffer, 0, buffer.Length); + await sw.WriteAsync(buffer, 0, charsRead); + + } while (charsRead > 0); + } + _output.WriteLine("Output written to file " + _outputFile); + } + } + } + } + } + + private void StreamJsonFileToServer(SqlConnection connection) + { + using (SqlCommand cmd = new SqlCommand("INSERT INTO [jsonTab] (data) VALUES (@jsondata)", connection)) + { + using (StreamReader jsonFile = File.OpenText(_jsonFile)) + { + cmd.Parameters.Add("@jsondata", Microsoft.Data.SqlDbTypeExtensions.Json, -1).Value = jsonFile; + cmd.ExecuteNonQuery(); + } + } + } + + private async Task StreamJsonFileToServerAsync(SqlConnection connection) + { + using (SqlCommand cmd = new SqlCommand("INSERT INTO [jsonTab] (data) VALUES (@jsondata)", connection)) + { + using (StreamReader jsonFile = File.OpenText(_jsonFile)) + { + cmd.Parameters.Add("@jsondata", Microsoft.Data.SqlDbTypeExtensions.Json, -1).Value = jsonFile; + await cmd.ExecuteNonQueryAsync(); + } + } + } + + private void DeleteFile(string filename) + { + if (File.Exists(filename)) + { + File.Delete(filename); + } + } + + private void BulkCopyData(CommandBehavior cb, bool enableStraming) + { + using (SqlConnection sourceConnection = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + sourceConnection.Open(); + SqlCommand commandRowCount = new SqlCommand("SELECT COUNT(*) FROM " + "dbo.jsonTabCopy;", sourceConnection); + long countStart = System.Convert.ToInt32(commandRowCount.ExecuteScalar()); + _output.WriteLine("Starting row count = {0}", countStart); + SqlCommand commandSourceData = new SqlCommand("SELECT data FROM dbo.jsonTab;", sourceConnection); + SqlDataReader reader = 
commandSourceData.ExecuteReader(cb); + using (SqlConnection destinationConnection = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + destinationConnection.Open(); + using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnection)) + { + bulkCopy.EnableStreaming = enableStraming; + bulkCopy.DestinationTableName = "dbo.jsonTabCopy"; + try + { + bulkCopy.WriteToServer(reader); + } + catch (Exception ex) + { + Assert.Fail(ex.Message); + } + finally + { + reader.Close(); + } + } + long countEnd = System.Convert.ToInt32(commandRowCount.ExecuteScalar()); + _output.WriteLine("Ending row count = {0}", countEnd); + _output.WriteLine("{0} rows were added.", countEnd - countStart); + } + } + } + + private async Task BulkCopyDataAsync(CommandBehavior cb, bool enableStraming) + { + using (SqlConnection sourceConnection = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + await sourceConnection.OpenAsync(); + SqlCommand commandRowCount = new SqlCommand("SELECT COUNT(*) FROM " + "dbo.jsonTabCopy;", sourceConnection); + long countStart = System.Convert.ToInt32(await commandRowCount.ExecuteScalarAsync()); + _output.WriteLine("Starting row count = {0}", countStart); + SqlCommand commandSourceData = new SqlCommand("SELECT data FROM dbo.jsonTab;", sourceConnection); + SqlDataReader reader = await commandSourceData.ExecuteReaderAsync(cb); + using (SqlConnection destinationConnection = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + await destinationConnection.OpenAsync(); + using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnection)) + { + bulkCopy.EnableStreaming = enableStraming; + bulkCopy.DestinationTableName = "dbo.jsonTabCopy"; + try + { + await bulkCopy.WriteToServerAsync(reader); + } + catch (Exception ex) + { + Assert.Fail(ex.Message); + } + finally + { + reader.Close(); + } + } + long countEnd = System.Convert.ToInt32(await commandRowCount.ExecuteScalarAsync()); + _output.WriteLine("Ending row count = {0}", countEnd); + 
_output.WriteLine("{0} rows were added.", countEnd - countStart); + } + } + } + + [Theory] + [InlineData(CommandBehavior.Default, false)] + [InlineData(CommandBehavior.Default, true)] + [InlineData(CommandBehavior.SequentialAccess, false)] + [InlineData(CommandBehavior.SequentialAccess, true)] + public void TestJsonBulkCopy(CommandBehavior cb, bool enableStraming) + { + if (!_isTestEnabled) + { + return; + } + + PopulateData(10000); + using (SqlConnection connection = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + BulkCopyData(cb, enableStraming); + connection.Open(); + PrintJsonDataToFile(connection); + CompareJsonFiles(); + DeleteFile(_jsonFile); + DeleteFile(_outputFile); + } + } + + [Theory] + [InlineData(CommandBehavior.Default, false)] + [InlineData(CommandBehavior.Default, true)] + [InlineData(CommandBehavior.SequentialAccess, false)] + [InlineData(CommandBehavior.SequentialAccess, true)] + public async Task TestJsonBulkCopyAsync(CommandBehavior cb, bool enableStraming) + { + if (!_isTestEnabled) + { + return; + } + PopulateData(10000); + using (SqlConnection connection = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + await BulkCopyDataAsync(cb, enableStraming); + await connection.OpenAsync(); + await PrintJsonDataToFileAsync(connection); + CompareJsonFiles(); + DeleteFile(_jsonFile); + DeleteFile(_outputFile); + } + } + } +}