Skip to content

Commit

Permalink
bulk: Added experimental support for BULK INSERT. No docs yet.
Browse files Browse the repository at this point in the history
  • Loading branch information
bretcope committed Aug 12, 2014
1 parent 6f446eb commit eb8ae04
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 0 deletions.
139 changes: 139 additions & 0 deletions src/bulk-load.coffee
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
EventEmitter = require('events').EventEmitter
WritableTrackingBuffer = require('./tracking-buffer/tracking-buffer').WritableTrackingBuffer;

TOKEN_TYPE = require('./token/token').TYPE;

# Column metadata flag bits for the COLMETADATA token (see MS-TDS 2.2.7.4).
FLAGS =
  nullable: 1 << 0
  caseSen: 1 << 1
  updateableReadWrite: 1 << 2
  updateableUnknown: 1 << 3
  identity: 1 << 4
  computed: 1 << 5 # introduced in TDS 7.2
  fixedLenCLRType: 1 << 8 # introduced in TDS 7.2
  sparseColumnSet: 1 << 10 # introduced in TDS 7.3.B
  hidden: 1 << 13 # introduced in TDS 7.2
  key: 1 << 14 # introduced in TDS 7.2
  nullableUnknown: 1 << 15 # introduced in TDS 7.2

# Status bits for the DONE token (see MS-TDS 2.2.7.5).
DONE_STATUS =
  FINAL: 0x00
  MORE: 0x1
  ERROR: 0x2
  INXACT: 0x4
  COUNT: 0x10
  ATTN: 0x20
  SRVERROR: 0x100

# Represents a single BULK INSERT operation.
#
# Usage: construct with a table name and a completion callback, declare every
# destination column with addColumn, append rows with addRow, then hand the
# instance to Connection.execBulkLoad. getSql/getPayload/getColMetaData are
# consumed by the connection layer to build the wire request.
class BulkLoad extends EventEmitter
  error: null       # set by the connection if the preliminary "insert bulk" request fails
  canceled: false

  # table    - name of the destination table
  # callback - invoked as (error, rowCount) when the load completes
  constructor: (@table, @callback) ->
    @columns = []
    @columnsByName = {}
    @rowsData = new WritableTrackingBuffer(100) # todo: we should size the buffer better for performance

  # Declares a destination column. All columns must be declared before any
  # rows are added.
  # options: output, length, precision, scale, objName (row-object key,
  # defaults to name), nullable
  addColumn: (name, type, options = {}) ->
    column =
      type: type
      name: name
      value: null
      output: options.output ||= false
      length: options.length
      precision: options.precision
      scale: options.scale
      objName: options.objName || name
      nullable: options.nullable

    @columns.push(column)
    @columnsByName[name] = column

  # Appends one row of data. Accepts either an object keyed by objName,
  # an array of values in column order, or the values spread across
  # individual arguments.
  addRow: (row) ->
    if arguments.length > 1 || !row || typeof row != 'object'
      # convert arguments to array in a way the optimizer can handle
      arr = new Array(arguments.length)
      for c, i in arguments
        arr[i] = c
      # fix: the collected arguments ARE the row — without this assignment
      # the arguments form (and scalar first argument) wrote wrong values
      row = arr

    # write row token
    @rowsData.writeUInt8(TOKEN_TYPE.ROW)

    # write each column
    isArray = row instanceof Array
    for c, i in @columns
      c.value = row[if isArray then i else c.objName]
      c.type.writeParameterData(@rowsData, c)

  # Builds the "insert bulk <table>(...)" statement that must be executed
  # (as a SQL batch) before the BULK_LOAD packet is sent.
  getSql: () ->
    sql = 'insert bulk ' + @table + '('
    for c, i in @columns
      if i != 0
        sql += ', '
      sql += "[#{c.name}] #{c.type.declaration(c)}"
      # todo: include precision, length, and collation as necessary
    sql += ')'
    return sql

  # Assembles the complete BULK_LOAD payload:
  # COLMETADATA token + accumulated ROW tokens + a final DONE token.
  getPayload: (tdsVersion) ->
    # Create COLMETADATA token
    metaData = @getColMetaData(tdsVersion)
    length = metaData.length

    # row data
    rows = @rowsData.data
    length += rows.length

    # Create DONE token
    # It might be nice to make DoneToken a class if anything needs to create them, but for now, just do it here
    tBuf = new WritableTrackingBuffer(if tdsVersion < "7_2" then 9 else 13)
    tBuf.writeUInt8(TOKEN_TYPE.DONE)
    status = DONE_STATUS.FINAL
    tBuf.writeUInt16LE(status)
    tBuf.writeUInt16LE(0) # CurCmd (TDS ignores this)
    tBuf.writeUInt32LE(0) # row count - doesn't really matter
    if tdsVersion >= "7_2"
      tBuf.writeUInt32LE(0) # row count is 64 bits in >= TDS 7.2

    done = tBuf.data
    length += done.length

    # composite payload
    payload = new WritableTrackingBuffer(length)
    payload.copyFrom(metaData)
    payload.copyFrom(rows)
    payload.copyFrom(done)

    return payload

  # Serializes the COLMETADATA token describing all declared columns.
  # UserType width and the nullableUnknown flag vary by TDS version.
  getColMetaData: (tdsVersion) ->
    tBuf = new WritableTrackingBuffer(100) # todo: take a good guess at a correct buffer size
    # TokenType
    tBuf.writeUInt8(TOKEN_TYPE.COLMETADATA)
    # Count
    tBuf.writeUInt16LE(@columns.length)

    for c in @columns
      # UserType
      if tdsVersion < "7_2"
        tBuf.writeUInt16LE(0)
      else
        tBuf.writeUInt32LE(0)

      # Flags
      flags = FLAGS.updateableReadWrite
      if c.nullable
        flags |= FLAGS.nullable
      else if c.nullable == undefined && tdsVersion >= "7_2"
        flags |= FLAGS.nullableUnknown # this seems prudent to set, not sure if there are performance consequences
      tBuf.writeUInt16LE(flags)

      # TYPE_INFO
      c.type.writeTypeInfo(tBuf, c)

      # ColName
      tBuf.writeBVarchar(c.name, 'ucs2')

    return tBuf.data

module.exports = BulkLoad
11 changes: 11 additions & 0 deletions src/connection.coffee
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require('./buffertools')
BulkLoad = require('./bulk-load')
Debug = require('./debug')
EventEmitter = require('events').EventEmitter
instanceLookup = require('./instance-lookup').instanceLookup
Expand Down Expand Up @@ -643,6 +644,16 @@ set transaction isolation level #{@getIsolationLevelText @config.options.connect
# Executes the request's SQL text as an sp_executesql RPC call over the
# current connection/transaction.
execSql: (request) ->
  request.transformIntoExecuteSqlRpc()
  @makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload(request, @currentTransactionDescriptor(), @config.options))

# Executes a BulkLoad in two phases: first the generated "insert bulk"
# statement runs as a SQL batch; only if that succeeds is the BULK_LOAD
# packet containing the row data sent.
execBulkLoad: (bulkLoad) ->
  request = new Request(bulkLoad.getSql(), (error) =>
    if error
      # the preliminary statement failed; report to the bulk load's callback
      bulkLoad.error = error
      bulkLoad.callback(error)
    else
      # NOTE(review): the success-path invocation of bulkLoad.callback is not
      # visible here — presumably driven by token parsing of the server's
      # DONE response; confirm.
      @makeRequest(bulkLoad, TYPE.BULK_LOAD, bulkLoad.getPayload(@config.options.tdsVersion))
  )
  @execSqlBatch(request)

prepare: (request) ->
request.transformIntoPrepareRpc()
Expand Down
1 change: 1 addition & 0 deletions src/packet.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ TYPE =
RPC_REQUEST: 0x03
TABULAR_RESULT: 0x04
ATTENTION: 0x06
BULK_LOAD: 0x07
TRANSACTION_MANAGER: 0x0E
LOGIN7: 0x10
NTLMAUTH_PKT: 0x11
Expand Down
1 change: 1 addition & 0 deletions src/tedious.coffee
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
exports.statemachineLogLevel = 0

# Public API surface of the tedious module.
exports.BulkLoad = require('./bulk-load')
exports.Connection = require('./connection')
exports.Request = require('./request')
exports.library = require('./library')
Expand Down

1 comment on commit eb8ae04

@bretcope
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's still some stuff on the TODO list (cleanup, tests, docs), but we have an immediate need for this, so I added it.

Basic usage example:

var bulk = new tedious.BulkLoad('TableName', function (error, rowCount) {
  console.log('row count: ' + rowCount);
});

// must define all of the columns before adding rows
// it's also important to include length/precision as necessary
// and always indicate whether the column is nullable
bulk.addColumn('column1', TYPES.Int, { nullable: false });
bulk.addColumn('column2', TYPES.NVarChar, { length: 50, nullable: true });

// add as many rows as desired
// I haven't yet tested with enough rows to force the payload to split across multiple packets, but we will soon
// you can add rows using an object, an array, or multiple arguments
bulk.addRow({ column1: 101, column2: "one zero one" });
bulk.addRow(102, "one zero two");
bulk.addRow([ 103, "one zero three" ]);

// execute using an active connection object
conn.execBulkLoad(bulk);

Outputs

row count: 3

Please sign in to comment.