Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SqlBulkCopy support for JSON datatype #2916

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,10 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary");
}
else if (metadata.type == SqlDbTypeExtensions.Json)
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "json");
}
else
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, metadata.type.ToString());
Expand Down Expand Up @@ -645,7 +649,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i
}
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
}
else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml)
else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml && metadata.metaType.SqlDbType != SqlDbTypeExtensions.Json)
{
// Partial length column prefix (max)
updateBulkCommandText.Append("(max)");
Expand Down Expand Up @@ -1213,7 +1217,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal)
}
}
// Check for data streams
else if ((_enableStreaming) && (metadata.length == MAX_LENGTH))
else if ((_enableStreaming) && ((metadata.length == MAX_LENGTH) || metadata.type == SqlDbTypeExtensions.Json))
{
isSqlType = false;

Expand All @@ -1229,7 +1233,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal)
method = ValueMethod.DataFeedStream;
}
// For text and XML there is memory gain from streaming on destination side even if reader is non-sequential
else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar)) && (mtSource.IsCharType) && (mtSource.SqlDbType != SqlDbType.Xml))
else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar || metadata.type == SqlDbTypeExtensions.Json)) && (mtSource.IsCharType) && (mtSource.SqlDbType != SqlDbType.Xml))
{
isDataFeed = true;
method = ValueMethod.DataFeedText;
Expand Down Expand Up @@ -1590,6 +1594,7 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re
}
break;
case TdsEnums.SQLXMLTYPE:
case TdsEnums.SQLJSON:
// Could be either string, SqlCachedBuffer, XmlReader or XmlDataFeed
Debug.Assert((value is XmlReader) || (value is SqlCachedBuffer) || (value is string) || (value is SqlString) || (value is XmlDataFeed), "Invalid value type of Xml datatype");
if (value is XmlReader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ internal void Stop()
// XML metadata substitute sequence
private static readonly byte[] s_xmlMetadataSubstituteSequence = { 0xe7, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00 };

// JSON metadata substitute sequence
private static readonly byte[] s_jsonMetadataSubstituteSequence = { 0xa7, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00 };

// size of Guid (e.g. _clientConnectionId, ActivityId.Id)
private const int GUID_SIZE = 16;

Expand Down Expand Up @@ -8072,6 +8075,11 @@ internal TdsOperationStatus TryGetTokenLength(byte token, TdsParserStateObject s
Debug.Assert(tokenLength == TdsEnums.SQL_USHORTVARMAXLEN, "Invalid token stream for xml datatype");
return TdsOperationStatus.Done;
}
else if (token == TdsEnums.SQLJSON)
{
tokenLength = -1;
return TdsOperationStatus.Done;
}
}

switch (token & TdsEnums.SQLLenMask)
Expand Down Expand Up @@ -10788,6 +10796,9 @@ internal void WriteBulkCopyMetaData(_SqlMetaDataSet metadataCollection, int coun
stateObj.WriteByte(md.tdsType);
stateObj.WriteByte(md.scale);
break;
case SqlDbTypeExtensions.Json:
stateObj.WriteByteArray(s_jsonMetadataSubstituteSequence, s_jsonMetadataSubstituteSequence.Length, 0);
break;
default:
stateObj.WriteByte(md.tdsType);
WriteTokenLength(md.tdsType, md.length, stateObj);
Expand Down Expand Up @@ -11042,7 +11053,11 @@ internal Task WriteBulkCopyValue(object value, SqlMetaDataPriv metadata, TdsPars
}
ccb = ((isSqlType) ? ((SqlString)value).Value.Length : ((string)value).Length) * 2;
break;

case TdsEnums.SQLJSON:
string strval = (isSqlType) ? ((SqlString)value).Value : (string)value;
ccb = isSqlType ? strval.Length : strval.Length * 2;
ccbStringBytes = Encoding.UTF8.GetByteCount(strval);
break;
default:
ccb = metadata.length;
break;
Expand All @@ -11052,9 +11067,9 @@ internal Task WriteBulkCopyValue(object value, SqlMetaDataPriv metadata, TdsPars
{
Debug.Assert(metatype.IsLong &&
((metatype.SqlDbType == SqlDbType.VarBinary && value is StreamDataFeed) ||
((metatype.SqlDbType == SqlDbType.VarChar || metatype.SqlDbType == SqlDbType.NVarChar) && value is TextDataFeed) ||
((metatype.SqlDbType == SqlDbType.VarChar || metatype.SqlDbType == SqlDbType.NVarChar || metatype.SqlDbType == SqlDbTypeExtensions.Json) && value is TextDataFeed) ||
(metatype.SqlDbType == SqlDbType.Xml && value is XmlDataFeed)),
"Stream data feed should only be assigned to VarBinary(max), Text data feed should only be assigned to [N]VarChar(max), Xml data feed should only be assigned to XML(max)");
"Stream data feed should only be assigned to VarBinary(max), Text data feed should only be assigned to [N]VarChar(max) or json, Xml data feed should only be assigned to XML(max)");
}


Expand All @@ -11076,6 +11091,7 @@ internal Task WriteBulkCopyValue(object value, SqlMetaDataPriv metadata, TdsPars
case SqlDbType.VarBinary:
case SqlDbType.Xml:
case SqlDbType.Udt:
case SqlDbTypeExtensions.Json:
// plp data
WriteUnsignedLong(TdsEnums.SQL_PLP_UNKNOWNLEN, stateObj);
break;
Expand Down Expand Up @@ -11956,7 +11972,7 @@ private async Task WriteXmlFeed(XmlDataFeed feed, TdsParserStateObject stateObj,
}
}

private async Task WriteTextFeed(TextDataFeed feed, Encoding encoding, bool needBom, TdsParserStateObject stateObj, int size)
private async Task WriteTextFeed(TextDataFeed feed, Encoding encoding, bool needBom, TdsParserStateObject stateObj, int size, bool useReadBlock)
{
Debug.Assert(encoding == null || !needBom);
char[] inBuff = ArrayPool<char>.Shared.Rent(constTextBufferSize);
Expand Down Expand Up @@ -11984,11 +12000,25 @@ private async Task WriteTextFeed(TextDataFeed feed, Encoding encoding, bool need

if (_asyncWrite)
{
nRead = await feed._source.ReadBlockAsync(inBuff, 0, constTextBufferSize).ConfigureAwait(false);
if (useReadBlock)
{
nRead = await feed._source.ReadBlockAsync(inBuff, 0, constTextBufferSize).ConfigureAwait(false);
}
else
{
nRead = await feed._source.ReadAsync(inBuff, 0, constTextBufferSize).ConfigureAwait(false);
}
}
else
{
nRead = feed._source.ReadBlock(inBuff, 0, constTextBufferSize);
if (useReadBlock)
{
nRead = feed._source.ReadBlock(inBuff, 0, constTextBufferSize);
}
else
{
nRead = feed._source.Read(inBuff, 0, constTextBufferSize);
}
}

if (nRead == 0)
Expand Down Expand Up @@ -12189,7 +12219,7 @@ private Task WriteUnterminatedValue(object value, MetaType type, byte scale, int
}
else
{
return NullIfCompletedWriteTask(WriteTextFeed(tdf, _defaultEncoding, false, stateObj, paramSize));
return NullIfCompletedWriteTask(WriteTextFeed(tdf, _defaultEncoding, false, stateObj, paramSize, true));
}
}
else
Expand Down Expand Up @@ -12228,7 +12258,8 @@ private Task WriteUnterminatedValue(object value, MetaType type, byte scale, int
else
{
Encoding encoding = type.NullableType == TdsEnums.SQLJSON ? new UTF8Encoding() : null;
return NullIfCompletedWriteTask(WriteTextFeed(tdf, encoding, IsBOMNeeded(type, value), stateObj, paramSize));
bool useReadBlock = type.NullableType == TdsEnums.SQLJSON ? false : true;
return NullIfCompletedWriteTask(WriteTextFeed(tdf, encoding, IsBOMNeeded(type, value), stateObj, paramSize, useReadBlock));
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,10 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary");
}
else if (metadata.type == SqlDbTypeExtensions.Json)
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "json");
}
else
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, typeof(SqlDbType).GetEnumName(metadata.type));
Expand Down Expand Up @@ -680,7 +684,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i
}
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
}
else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml)
else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml && metadata.metaType.SqlDbType != SqlDbTypeExtensions.Json)
{
// Partial length column prefix (max)
updateBulkCommandText.Append("(max)");
Expand Down Expand Up @@ -1254,7 +1258,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal)
}
}
// Check for data streams
else if ((_enableStreaming) && (metadata.length == MAX_LENGTH) && (!_rowSourceIsSqlDataReaderSmi))
else if ((_enableStreaming) && ((metadata.length == MAX_LENGTH) || metadata.metaType.SqlDbType == SqlDbTypeExtensions.Json) && (!_rowSourceIsSqlDataReaderSmi))
{
isSqlType = false;

Expand All @@ -1270,7 +1274,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal)
method = ValueMethod.DataFeedStream;
}
// For text and XML there is memory gain from streaming on destination side even if reader is non-sequential
else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar)) && (mtSource.IsCharType) && (mtSource.SqlDbType != SqlDbType.Xml))
else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar) || (metadata.type == SqlDbTypeExtensions.Json)) && (mtSource.IsCharType) && (mtSource.SqlDbType != SqlDbType.Xml))
{
isDataFeed = true;
method = ValueMethod.DataFeedText;
Expand Down Expand Up @@ -1640,6 +1644,7 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re
}
break;
case TdsEnums.SQLXMLTYPE:
case TdsEnums.SQLJSON:
// Could be either string, SqlCachedBuffer, XmlReader or XmlDataFeed
Debug.Assert((value is XmlReader) || (value is SqlCachedBuffer) || (value is string) || (value is SqlString) || (value is XmlDataFeed), "Invalid value type of Xml datatype");
if (value is XmlReader)
Expand Down
Loading
Loading