Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microsoft.Data.Sqlite: Delay statement execution until NextResult #15001

Merged
merged 1 commit into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 43 additions & 82 deletions src/Microsoft.Data.Sqlite.Core/SqliteCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class SqliteCommand : DbCommand
private readonly List<sqlite3_stmt> _preparedStatements = new List<sqlite3_stmt>();
private SqliteConnection _connection;
private string _commandText;
private bool _prepared;

/// <summary>
/// Initializes a new instance of the <see cref="SqliteCommand" /> class.
Expand Down Expand Up @@ -238,28 +239,19 @@ public override void Prepare()
throw new InvalidOperationException(Resources.CallRequiresSetCommandText(nameof(Prepare)));
}

if (_preparedStatements.Count != 0)
if (_prepared)
{
return;
}

var timer = Stopwatch.StartNew();
var timer = new Stopwatch();

try
using (var enumerator = PrepareAndEnumerateStatements(timer).GetEnumerator())
{
using (var enumerator = PrepareAndEnumerateStatements(timer).GetEnumerator())
while (enumerator.MoveNext())
{
while (enumerator.MoveNext())
{
}
}
}
catch
{
DisposePreparedStatements();

throw;
}
}

/// <summary>
Expand Down Expand Up @@ -306,88 +298,48 @@ public override void Prepare()
throw new InvalidOperationException(Resources.TransactionCompleted);
}

var hasChanges = false;
var changes = 0;
int rc;
var stmts = new Queue<(sqlite3_stmt, bool)>();
var unprepared = _preparedStatements.Count == 0;
var timer = Stopwatch.StartNew();

try
{
foreach (var stmt in unprepared
? PrepareAndEnumerateStatements(timer)
: _preparedStatements)
{
var boundParams = 0;
var timer = new Stopwatch();
var closeConnection = behavior.HasFlag(CommandBehavior.CloseConnection);

if (_parameters.IsValueCreated)
{
boundParams = _parameters.Value.Bind(stmt);
}
DataReader = new SqliteDataReader(this, timer, GetStatements(timer), closeConnection);
DataReader.NextResult();

var expectedParams = sqlite3_bind_parameter_count(stmt);
if (expectedParams != boundParams)
{
var unboundParams = new List<string>();
for (var i = 1; i <= expectedParams; i++)
{
var name = sqlite3_bind_parameter_name(stmt, i);
return DataReader = DataReader;
}

if (_parameters.IsValueCreated
&& !_parameters.Value.Cast<SqliteParameter>().Any(p => p.ParameterName == name))
{
unboundParams.Add(name);
}
}
private IEnumerable<sqlite3_stmt> GetStatements(Stopwatch timer)
{
foreach (var stmt in !_prepared
? PrepareAndEnumerateStatements(timer)
: _preparedStatements)
{
var boundParams = 0;

throw new InvalidOperationException(Resources.MissingParameters(string.Join(", ", unboundParams)));
}
if (_parameters.IsValueCreated)
{
boundParams = _parameters.Value.Bind(stmt);
}

while (IsBusy(rc = sqlite3_step(stmt)))
var expectedParams = sqlite3_bind_parameter_count(stmt);
if (expectedParams != boundParams)
{
var unboundParams = new List<string>();
for (var i = 1; i <= expectedParams; i++)
{
if (timer.ElapsedMilliseconds >= CommandTimeout * 1000L)
{
break;
}

sqlite3_reset(stmt);

// TODO: Consider having an async path that uses Task.Delay()
Thread.Sleep(150);
}

SqliteException.ThrowExceptionForRC(rc, _connection.Handle);
var name = sqlite3_bind_parameter_name(stmt, i);

// It's a SELECT statement
if (sqlite3_column_count(stmt) != 0)
{
stmts.Enqueue((stmt, rc != SQLITE_DONE));
}
else
{
while (rc != SQLITE_DONE)
if (_parameters.IsValueCreated
&& !_parameters.Value.Cast<SqliteParameter>().Any(p => p.ParameterName == name))
{
rc = sqlite3_step(stmt);
SqliteException.ThrowExceptionForRC(rc, _connection.Handle);
unboundParams.Add(name);
}

sqlite3_reset(stmt);
hasChanges = true;
changes += sqlite3_changes(_connection.Handle);
}

throw new InvalidOperationException(Resources.MissingParameters(string.Join(", ", unboundParams)));
}
}
catch when (unprepared)
{
DisposePreparedStatements();

throw;
yield return stmt;
}

var closeConnection = behavior.HasFlag(CommandBehavior.CloseConnection);

return DataReader = new SqliteDataReader(this, stmts, hasChanges ? changes : -1, closeConnection);
}

/// <summary>
Expand Down Expand Up @@ -520,11 +472,15 @@ public override void Cancel()

private IEnumerable<sqlite3_stmt> PrepareAndEnumerateStatements(Stopwatch timer)
{
DisposePreparedStatements(disposing: false);

int rc;
sqlite3_stmt stmt;
var tail = _commandText;
do
{
timer.Start();

string nextTail;
while (IsBusy(rc = sqlite3_prepare_v2(_connection.Handle, tail, out stmt, out nextTail)))
{
Expand All @@ -536,6 +492,7 @@ private IEnumerable<sqlite3_stmt> PrepareAndEnumerateStatements(Stopwatch timer)
Thread.Sleep(150);
}

timer.Stop();
tail = nextTail;

SqliteException.ThrowExceptionForRC(rc, _connection.Handle);
Expand All @@ -556,6 +513,8 @@ private IEnumerable<sqlite3_stmt> PrepareAndEnumerateStatements(Stopwatch timer)
yield return stmt;
}
while (!string.IsNullOrEmpty(tail));

_prepared = true;
}

private void DisposePreparedStatements(bool disposing = true)
Expand All @@ -576,6 +535,8 @@ private void DisposePreparedStatements(bool disposing = true)

_preparedStatements.Clear();
}

_prepared = false;
}

private static bool IsBusy(int rc)
Expand Down
129 changes: 104 additions & 25 deletions src/Microsoft.Data.Sqlite.Core/SqliteDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using Microsoft.Data.Sqlite.Properties;
using SQLitePCL;

Expand All @@ -22,25 +24,21 @@ public class SqliteDataReader : DbDataReader
{
private readonly SqliteCommand _command;
private readonly bool _closeConnection;
private readonly Queue<(sqlite3_stmt stmt, bool)> _stmtQueue;
private readonly Stopwatch _timer;
private IEnumerator<sqlite3_stmt> _stmtEnumerator;
private SqliteDataRecord _record;
private bool _closed;
private int _recordsAffected = -1;

internal SqliteDataReader(
SqliteCommand command,
Queue<(sqlite3_stmt, bool)> stmtQueue,
int recordsAffected,
Stopwatch timer,
IEnumerable<sqlite3_stmt> stmts,
bool closeConnection)
{
if (stmtQueue.Count != 0)
{
(var stmt, var hasRows) = stmtQueue.Dequeue();
_record = new SqliteDataRecord(stmt, hasRows, command.Connection);
}

_command = command;
_stmtQueue = stmtQueue;
RecordsAffected = recordsAffected;
_timer = timer;
_stmtEnumerator = stmts.GetEnumerator();
_closeConnection = closeConnection;
}

Expand Down Expand Up @@ -86,7 +84,8 @@ public override bool IsClosed
/// Gets the number of rows inserted, updated, or deleted. -1 for SELECT statements.
/// </summary>
/// <value>The number of rows inserted, updated, or deleted.</value>
public override int RecordsAffected { get; }
public override int RecordsAffected
=> _recordsAffected;

/// <summary>
/// Gets the value of the specified column.
Expand Down Expand Up @@ -126,19 +125,92 @@ public override bool Read()
/// <returns>true if there are more result sets; otherwise, false.</returns>
public override bool NextResult()
{
if (_stmtQueue.Count == 0)
if (_closed)
{
return false;
throw new InvalidOperationException(Resources.DataReaderClosed(nameof(NextResult)));
}

_record.Dispose();
if (_record != null)
{
_record.Dispose();
_record = null;
}

sqlite3_stmt stmt;
int rc;

while (_stmtEnumerator.MoveNext())
{
try
{
stmt = _stmtEnumerator.Current;

_timer.Start();

while (IsBusy(rc = sqlite3_step(stmt)))
{
if (_timer.ElapsedMilliseconds >= _command.CommandTimeout * 1000L)
{
break;
}

sqlite3_reset(stmt);

// TODO: Consider having an async path that uses Task.Delay()
Thread.Sleep(150);
}

_timer.Stop();

SqliteException.ThrowExceptionForRC(rc, _command.Connection.Handle);

// It's a SELECT statement
if (sqlite3_column_count(stmt) != 0)
{
_record = new SqliteDataRecord(stmt, rc != SQLITE_DONE, _command.Connection);

(var stmt, var hasRows) = _stmtQueue.Dequeue();
_record = new SqliteDataRecord(stmt, hasRows, _command.Connection);
return true;
}
else
{
while (rc != SQLITE_DONE)
{
rc = sqlite3_step(stmt);
SqliteException.ThrowExceptionForRC(rc, _command.Connection.Handle);
}

sqlite3_reset(stmt);

var changes = sqlite3_changes(_command.Connection.Handle);
if (_recordsAffected == -1)
{
_recordsAffected = changes;
}
else
{
_recordsAffected += changes;
}
}
}
catch
{
sqlite3_reset(_stmtEnumerator.Current);
_stmtEnumerator.Dispose();
_stmtEnumerator = null;
Dispose();

return true;
throw;
}
}

return false;
}

private static bool IsBusy(int rc)
=> rc == SQLITE_LOCKED
|| rc == SQLITE_BUSY
|| rc == SQLITE_LOCKED_SHAREDCACHE;

/// <summary>
/// Closes the data reader.
/// </summary>
Expand All @@ -160,17 +232,24 @@ protected override void Dispose(bool disposing)

_command.DataReader = null;

if (_record != null)
{
_record.Dispose();
_record = null;
}
_record?.Dispose();

while (_stmtQueue.Count != 0)
if (_stmtEnumerator != null)
{
sqlite3_reset(_stmtQueue.Dequeue().stmt);
try
{
while (NextResult())
{
_record.Dispose();
}
}
catch
{
}
}

_stmtEnumerator?.Dispose();

_closed = true;

if (_closeConnection)
Expand Down
Loading