-
Notifications
You must be signed in to change notification settings - Fork 510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve async support for Cursors #4102
Conversation
parameters: Int, | ||
binders: (SqlPreparedStatement.() -> Unit)?, | ||
) = execute(identifier, { AndroidQuery(sql, database, parameters) }, binders) { executeQuery(mapper) } | ||
) = execute(identifier, { AndroidQuery(sql, database, parameters) }, binders) { executeQuery(mapper) }.value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we return: QueryResult.Value
here? (Only use GitHub Web, so maybe this is already the case).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I made a mistake here. The executeQuery()
call in the trailing lambda there expected a (SqlCursor) -> R
and returned R
so the whole thing was evaluating into a QueryResult<QueryResult<T>>
which isn't right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope we don't need to add transformers like map
and flatMap
to fix QueryResult<QueryResult<T>>
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I expected more problems :D
So we can finally implement a real async R2DBC driver, can't we?
Looks good, gonna wait to approve until tests go green because they might uncover some bigger change that needs to be made |
val first = cursor.next() | ||
val result = mutableListOf<T>() | ||
|
||
// If the cursor isn't async, we want to preserve the blocking semantics and execute it synchronously | ||
if (first is QueryResult.AsyncValue) { | ||
QueryResult.AsyncValue { | ||
if (first.await()) result.add(mapper(cursor)) else return@AsyncValue result | ||
while (cursor.next().value) result.add(mapper(cursor)) | ||
result | ||
} | ||
} else { | ||
if (first.value) result.add(mapper(cursor)) else return@execute QueryResult.Value(result) | ||
while (cursor.next().value) result.add(mapper(cursor)) | ||
QueryResult.Value(result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change deserves extra attention because of how... special it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merged cause green, I'm surprised this was needed, I assumed calling await
on a QueryResult.Value
would just block anyway and do the same thing as .value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explaining it here for posterity:
AsyncValue
s are evaluated lazily so that the getter
block can be run in the consumer's coroutine scope. Because of this, when awaitAsList()
(or awaitAsOne...()
) is called, the actual work in the AsyncValue
block is deferred until the final .await()
is called, which happens after the call to execute()
has completed. So by then, the driver may have closed the connection, and there will be no results in the cursor.
Basically the order of calls before:
awaitAs*()
execute()
mapper()
cursor.next()
(synchronously called in the mapper)onClose()
(driver closes the connection)await()
(returns the synchronous value)
and after:
awaitAs*()
execute()
mapper()
(returns anAsyncValue
which will be lazily evaluated)onClose()
(driver closes the connection)await()
(evaluates theAsyncValue
)cursor.next()
(connection has already been closed)
With drivers that implement async cursors, it's up to the driver to ensure that the connection (or whatever the underlying driver needs) stays alive until the coroutine is completed.
val next = cursor.next() | ||
|
||
// If the cursor isn't async, we want to preserve the blocking semantics and execute it synchronously | ||
if (next is QueryResult.AsyncValue) { | ||
QueryResult.AsyncValue { | ||
if (!next.await()) return@AsyncValue null | ||
val value = mapper(cursor) | ||
check(!cursor.next().value) { "ResultSet returned more than 1 row for $this" } | ||
value | ||
} | ||
} else { | ||
if (!next.value) return@execute QueryResult.Value(null) | ||
val value = mapper(cursor) | ||
check(!cursor.next().await()) { "ResultSet returned more than 1 row for $this" } | ||
value | ||
check(!cursor.next().value) { "ResultSet returned more than 1 row for $this" } | ||
QueryResult.Value(value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(This one too)
SqlCursor.next()
now returnsQueryResult<Boolean>
which can be anAsyncValue
for async drivers.I'll follow up with changes to the R2DBC driver that uses this new API.