From 78656eb469ee178f1692cd6a8f95ccbd2be7cf18 Mon Sep 17 00:00:00 2001 From: Alexander Krotov Date: Tue, 6 Apr 2021 22:49:22 +0300 Subject: [PATCH] fix(sqlite): reset the statement when fetch_many() stream is dropped Unlike `Executor.fetch_optional()`, `Executor.fetch_many()` does not have a single exit. The stream can be dropped at any time. To catch this event, we create a `StatementResetter` structure inside the stream loop and reset the statement when it is dropped. A test case `it_resets_prepared_statement_after_fetch_many` is similar to `it_resets_prepared_statement_after_fetch_one` which tests `Executor.fetch_optional()`. --- sqlx-core/src/sqlite/connection/executor.rs | 24 +++++++++++++++++++++ sqlx-core/src/sqlite/statement/handle.rs | 9 ++++++-- tests/sqlite/sqlite.rs | 20 +++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/sqlx-core/src/sqlite/connection/executor.rs b/sqlx-core/src/sqlite/connection/executor.rs index 1194f9c46b..6e09fdc27a 100644 --- a/sqlx-core/src/sqlite/connection/executor.rs +++ b/sqlx-core/src/sqlite/connection/executor.rs @@ -59,6 +59,24 @@ fn bind( Ok(n) } +/// A structure holding sqlite statement handle and resetting the +/// statement when it is dropped. +struct StatementResetter { + handle: StatementHandle, +} + +impl StatementResetter { + fn new(handle: StatementHandle) -> Self { + Self { handle } + } +} + +impl Drop for StatementResetter { + fn drop(&mut self) { + self.handle.reset(); + } +} + impl<'c> Executor<'c> for &'c mut SqliteConnection { type Database = Sqlite; @@ -91,6 +109,12 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { let mut num_arguments = 0; while let Some((stmt, columns, column_names, last_row_values)) = stmt.prepare(conn)? { + // Prepare to reset raw SQLite statement when the handle + // is dropped. `StatementResetter` will reliably reset the + // statement even if the stream returned from `fetch_many` + // is dropped early. + let _resetter = StatementResetter::new(*stmt); + // bind values to the statement num_arguments += bind(stmt, &arguments, num_arguments)?; diff --git a/sqlx-core/src/sqlite/statement/handle.rs b/sqlx-core/src/sqlite/statement/handle.rs index 1a50423441..d1af117a7d 100644 --- a/sqlx-core/src/sqlite/statement/handle.rs +++ b/sqlx-core/src/sqlite/statement/handle.rs @@ -13,8 +13,9 @@ use libsqlite3_sys::{ sqlite3_column_count, sqlite3_column_database_name, sqlite3_column_decltype, sqlite3_column_double, sqlite3_column_int, sqlite3_column_int64, sqlite3_column_name, sqlite3_column_origin_name, sqlite3_column_table_name, sqlite3_column_type, - sqlite3_column_value, sqlite3_db_handle, sqlite3_sql, sqlite3_stmt, sqlite3_stmt_readonly, - sqlite3_table_column_metadata, sqlite3_value, SQLITE_OK, SQLITE_TRANSIENT, SQLITE_UTF8, + sqlite3_column_value, sqlite3_db_handle, sqlite3_reset, sqlite3_sql, sqlite3_stmt, + sqlite3_stmt_readonly, sqlite3_table_column_metadata, sqlite3_value, SQLITE_OK, + SQLITE_TRANSIENT, SQLITE_UTF8, }; use crate::error::{BoxDynError, Error}; @@ -278,4 +279,8 @@ impl StatementHandle { pub(crate) fn column_text(&self, index: usize) -> Result<&str, BoxDynError> { Ok(from_utf8(self.column_blob(index))?) } + + pub(crate) fn reset(&self) { + unsafe { sqlite3_reset(self.0.as_ptr()) }; + } } diff --git a/tests/sqlite/sqlite.rs b/tests/sqlite/sqlite.rs index 7f25d6ad5b..2100e4fbe7 100644 --- a/tests/sqlite/sqlite.rs +++ b/tests/sqlite/sqlite.rs @@ -504,3 +504,23 @@ async fn it_resets_prepared_statement_after_fetch_one() -> anyhow::Result<()> { Ok(()) } + +#[sqlx_macros::test] +async fn it_resets_prepared_statement_after_fetch_many() -> anyhow::Result<()> { + let mut conn = new::().await?; + + conn.execute("CREATE TEMPORARY TABLE foobar (id INTEGER)") + .await?; + conn.execute("INSERT INTO foobar VALUES (42)").await?; + conn.execute("INSERT INTO foobar VALUES (43)").await?; + + let mut rows = sqlx::query("SELECT id FROM foobar").fetch(&mut conn); + let row = rows.try_next().await?.unwrap(); + let x: i32 = row.try_get("id")?; + assert_eq!(x, 42); + drop(rows); + + conn.execute("DROP TABLE foobar").await?; + + Ok(()) +}