Skip to content

Commit

Permalink
Add support for obtaining raw SQL query results from the underlying S…
Browse files Browse the repository at this point in the history
…QL client (#3457)
  • Loading branch information
IMax153 authored Aug 15, 2024
1 parent f8d95a6 commit a07990d
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 29 deletions.
59 changes: 59 additions & 0 deletions .changeset/metal-geese-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
---
"@effect/sql-sqlite-react-native": minor
"@effect/sql-sqlite-node": minor
"@effect/sql-sqlite-wasm": minor
"@effect/sql-sqlite-bun": minor
"@effect/sql-mysql2": minor
"@effect/sql-mssql": minor
"@effect/sql-d1": minor
"@effect/sql-pg": minor
"@effect/sql": minor
---

Add support for executing raw SQL queries with the underlying SQL client.

This is primarily useful when the SQL client returns special results for certain
query types.

For example, because MySQL does not support the `RETURNING` clause, the `mysql2`
client will return a [`ResultSetHeader`](https://sidorares.github.io/node-mysql2/docs/documentation/typescript-examples#resultsetheader)
for `INSERT`, `UPDATE`, `DELETE`, and `TRUNCATE` operations.

To gain access to the raw results of a query, you can use the `.raw` property on
the `Statement`:

```ts
import * as Effect from "effect/Effect"
import * as SqlClient from "@effect/sql/SqlClient"
import * as MysqlClient from "@effect/sql/MysqlClient"

const DatabaseLive = MysqlClient.layer({
database: Config.succeed("database"),
username: Config.succeed("root"),
password: Config.succeed(Redacted.make("password")),
})

const program = Effect.gen(function*() {
const sql = yield* SqlClient.SqlClient

const result = yield* sql`INSERT INTO usernames VALUES ("Bob")`.raw

console.log(result)
/**
* ResultSetHeader {
* fieldCount: 0,
* affectedRows: 1,
* insertId: 0,
* info: '',
* serverStatus: 2,
* warningStatus: 0,
* changedRows: 0
* }
*/
})

program.pipe(
Effect.provide(DatabaseLive),
Effect.runPromise
)
```
25 changes: 17 additions & 8 deletions packages/sql-d1/src/D1Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,25 @@ export const make = (
catch: (cause) => new SqlError({ cause, message: `Failed to execute statement` })
})

const run = (
const runRaw = (
sql: string,
params: ReadonlyArray<Statement.Primitive> = []
) => runStatement(db.prepare(sql), params)

const runCached = (
sql: string,
params: ReadonlyArray<Statement.Primitive> = []
) => Effect.flatMap(prepareCache.get(sql), (s) => runStatement(s, params))

const runRaw = (
const runUncached = (
sql: string,
params: ReadonlyArray<Statement.Primitive> = []
) => Effect.map(runStatement(db.prepare(sql), params), transformRows)
) => Effect.map(runRaw(sql, params), transformRows)

const runTransform = options.transformResultNames
? (sql: string, params?: ReadonlyArray<Statement.Primitive>) => Effect.map(run(sql, params), transformRows)
: run
? (sql: string, params?: ReadonlyArray<Statement.Primitive>) =>
Effect.map(runCached(sql, params), transformRows)
: runCached

const runValues = (
sql: string,
Expand All @@ -139,14 +145,17 @@ export const make = (
execute(sql, params) {
return runTransform(sql, params)
},
executeRaw(sql, params) {
return runRaw(sql, params)
},
executeValues(sql, params) {
return runValues(sql, params)
},
executeWithoutTransform(sql, params) {
return run(sql, params)
return runCached(sql, params)
},
executeRaw(sql, params) {
return runRaw(sql, params)
executeUnprepared(sql, params) {
return runUncached(sql, params)
},
executeStream(_sql, _params) {
return Effect.dieMessage("executeStream not implemented")
Expand Down
5 changes: 4 additions & 1 deletion packages/sql-mssql/src/MssqlClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,16 @@ export const make = (
execute(sql, params) {
return run(sql, params)
},
executeRaw(sql, params) {
return run(sql, params, false)
},
executeWithoutTransform(sql, params) {
return run(sql, params, false)
},
executeValues(sql, params) {
return run(sql, params, true, true)
},
executeRaw(sql, params) {
executeUnprepared(sql, params) {
return run(sql, params)
},
executeStream() {
Expand Down
33 changes: 25 additions & 8 deletions packages/sql-mysql2/src/MysqlClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,40 +89,57 @@ export const make = (
class ConnectionImpl implements Connection {
constructor(private readonly conn: Mysql.PoolConnection | Mysql.Pool) {}

private run(
private runRaw(
sql: string,
values?: ReadonlyArray<any>,
transform = true,
rowsAsArray = false,
method: "execute" | "query" = "execute"
) {
return Effect.async<ReadonlyArray<any>, SqlError>((resume) => {
return Effect.async<unknown, SqlError>((resume) => {
;(this.conn as any)[method]({
sql,
values,
rowsAsArray
}, (cause: unknown | null, results: ReadonlyArray<any>, _fields: any) => {
}, (cause: unknown | null, results: unknown, _fields: any) => {
if (cause) {
resume(Effect.fail(new SqlError({ cause, message: "Failed to execute statement" })))
} else if (transform && !rowsAsArray && options.transformResultNames) {
resume(Effect.succeed(transformRows(results)))
} else {
resume(Effect.succeed(Array.isArray(results) ? results : []))
resume(Effect.succeed(results))
}
})
})
}

private run(
sql: string,
values?: ReadonlyArray<any>,
transform = true,
rowsAsArray = false,
method: "execute" | "query" = "execute"
) {
return this.runRaw(sql, values, rowsAsArray, method).pipe(
Effect.map((results) => {
if (transform && !rowsAsArray && options.transformResultNames) {
return transformRows(results as ReadonlyArray<any>)
}
return Array.isArray(results) ? results : []
})
)
}

execute(sql: string, params: ReadonlyArray<Statement.Primitive>) {
return this.run(sql, params)
}
executeRaw(sql: string, params: ReadonlyArray<Statement.Primitive>) {
return this.runRaw(sql, params, true)
}
executeWithoutTransform(sql: string, params: ReadonlyArray<Statement.Primitive>) {
return this.run(sql, params, false)
}
executeValues(sql: string, params: ReadonlyArray<Statement.Primitive>) {
return this.run(sql, params, true, true)
}
executeRaw(sql: string, params?: ReadonlyArray<Statement.Primitive>) {
executeUnprepared(sql: string, params?: ReadonlyArray<Statement.Primitive>) {
return this.run(sql, params, true, false, "query")
}
executeStream(sql: string, params: ReadonlyArray<Statement.Primitive>) {
Expand Down
5 changes: 4 additions & 1 deletion packages/sql-pg/src/PgClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,16 @@ export const make = (
execute(sql: string, params: ReadonlyArray<Primitive>) {
return this.runTransform(this.pg.unsafe(sql, params as any))
}
executeRaw(sql: string, params: ReadonlyArray<Primitive>) {
return this.run(this.pg.unsafe(sql, params as any))
}
executeWithoutTransform(sql: string, params: ReadonlyArray<Primitive>) {
return this.run(this.pg.unsafe(sql, params as any))
}
executeValues(sql: string, params: ReadonlyArray<Primitive>) {
return this.run(this.pg.unsafe(sql, params as any).values())
}
executeRaw(sql: string, params?: ReadonlyArray<Primitive>) {
executeUnprepared(sql: string, params?: ReadonlyArray<Primitive>) {
return this.runTransform(this.pg.unsafe(sql, params as any))
}
executeStream(sql: string, params: ReadonlyArray<Primitive>) {
Expand Down
5 changes: 4 additions & 1 deletion packages/sql-sqlite-bun/src/SqliteClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,16 @@ export const make = (
execute(sql, params) {
return runTransform(sql, params)
},
executeRaw(sql, params) {
return run(sql, params)
},
executeValues(sql, params) {
return runValues(sql, params)
},
executeWithoutTransform(sql, params) {
return run(sql, params)
},
executeRaw(sql, params) {
executeUnprepared(sql, params) {
return runTransform(sql, params)
},
executeStream(_sql, _params) {
Expand Down
9 changes: 6 additions & 3 deletions packages/sql-sqlite-node/src/SqliteClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export const make = (
params: ReadonlyArray<Statement.Primitive> = []
) => Effect.flatMap(prepareCache.get(sql), (s) => runStatement(s, params))

const runRaw = (
const runUnprepared = (
sql: string,
params: ReadonlyArray<Statement.Primitive> = []
) => Effect.map(runStatement(db.prepare(sql), params), transformRows)
Expand Down Expand Up @@ -172,14 +172,17 @@ export const make = (
execute(sql, params) {
return runTransform(sql, params)
},
executeRaw(sql, params) {
return run(sql, params)
},
executeValues(sql, params) {
return runValues(sql, params)
},
executeWithoutTransform(sql, params) {
return run(sql, params)
},
executeRaw(sql, params) {
return runRaw(sql, params)
executeUnprepared(sql, params) {
return runUnprepared(sql, params)
},
executeStream(_sql, _params) {
return Effect.dieMessage("executeStream not implemented")
Expand Down
5 changes: 4 additions & 1 deletion packages/sql-sqlite-react-native/src/SqliteClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ export const make = (
execute(sql, params) {
return runTransform(sql, params)
},
executeRaw(sql, params) {
return run(sql, params)
},
executeValues(sql, params) {
return Effect.map(run(sql, params), (results) => {
if (results.length === 0) {
Expand All @@ -161,7 +164,7 @@ export const make = (
executeWithoutTransform(sql, params) {
return run(sql, params)
},
executeRaw(sql, params) {
executeUnprepared(sql, params) {
return runTransform(sql, params)
},
executeStream() {
Expand Down
5 changes: 4 additions & 1 deletion packages/sql-sqlite-wasm/src/SqliteClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,16 @@ export const make = (
execute(sql, params) {
return runTransform(sql, params)
},
executeRaw(sql, params) {
return run(sql, params)
},
executeValues(sql, params) {
return run(sql, params, "array")
},
executeWithoutTransform(sql, params) {
return run(sql, params)
},
executeRaw(sql, params) {
executeUnprepared(sql, params) {
return runTransform(sql, params)
},
executeStream() {
Expand Down
11 changes: 10 additions & 1 deletion packages/sql/src/SqlConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ export interface Connection {
params: ReadonlyArray<Primitive>
) => Effect<ReadonlyArray<any>, SqlError>

/**
* Execute the specified SQL query and return the raw results directly from
* underlying SQL client.
*/
readonly executeRaw: (
sql: string,
params: ReadonlyArray<Primitive>
) => Effect<unknown, SqlError>

readonly executeStream: (
sql: string,
params: ReadonlyArray<Primitive>
Expand All @@ -33,7 +42,7 @@ export interface Connection {
params: ReadonlyArray<Primitive>
) => Effect<ReadonlyArray<ReadonlyArray<Primitive>>, SqlError>

readonly executeRaw: (
readonly executeUnprepared: (
sql: string,
params?: ReadonlyArray<Primitive> | undefined
) => Effect<ReadonlyArray<any>, SqlError>
Expand Down
1 change: 1 addition & 0 deletions packages/sql/src/Statement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export type Dialect = "sqlite" | "pg" | "mysql" | "mssql"
* @since 1.0.0
*/
export interface Statement<A> extends Fragment, Effect<ReadonlyArray<A>, SqlError>, Pipeable {
readonly raw: Effect<unknown, SqlError>
readonly withoutTransform: Effect<ReadonlyArray<A>, SqlError>
readonly stream: Stream.Stream<A, SqlError>
readonly values: Effect<ReadonlyArray<ReadonlyArray<Primitive>>, SqlError>
Expand Down
8 changes: 5 additions & 3 deletions packages/sql/src/internal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export function make({
(
[scope, conn]
) =>
conn.executeRaw(id === 0 ? beginTransaction : savepoint(`effect_sql_${id}`)).pipe(
conn.executeUnprepared(id === 0 ? beginTransaction : savepoint(`effect_sql_${id}`)).pipe(
Effect.zipRight(Effect.locally(
restore(effect),
FiberRef.currentContext,
Expand All @@ -90,15 +90,17 @@ export function make({
if (Exit.isSuccess(exit)) {
if (id === 0) {
span.event("db.transaction.commit", clock.unsafeCurrentTimeNanos())
effect = Effect.orDie(conn.executeRaw(commit))
effect = Effect.orDie(conn.executeUnprepared(commit))
} else {
span.event("db.transaction.savepoint", clock.unsafeCurrentTimeNanos())
effect = Effect.void
}
} else {
span.event("db.transaction.rollback", clock.unsafeCurrentTimeNanos())
effect = Effect.orDie(
id > 0 ? conn.executeRaw(rollbackSavepoint(`effect_sql_${id}`)) : conn.executeRaw(rollback)
id > 0
? conn.executeUnprepared(rollbackSavepoint(`effect_sql_${id}`))
: conn.executeUnprepared(rollback)
)
}
const withScope = scope !== undefined ? Effect.ensuring(effect, Scope.close(scope, exit)) : effect
Expand Down
13 changes: 12 additions & 1 deletion packages/sql/src/internal/statement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ export class StatementPrimitive<A> extends Effectable.Class<ReadonlyArray<A>, Er
)
}

get raw(): Effect.Effect<unknown, Error.SqlError> {
return this.withConnection(
"executeRaw",
(connection, sql, params) => connection.executeRaw(sql, params),
true
)
}

get stream(): Stream.Stream<A, Error.SqlError> {
return Stream.unwrapScoped(Effect.flatMap(
Effect.makeSpanScoped("sql.execute", { kind: "client", captureStackTrace: false }),
Expand All @@ -154,7 +162,10 @@ export class StatementPrimitive<A> extends Effectable.Class<ReadonlyArray<A>, Er
}

get unprepared(): Effect.Effect<ReadonlyArray<A>, Error.SqlError> {
return this.withConnection("executeRaw", (connection, sql, params) => connection.executeRaw(sql, params))
return this.withConnection(
"executeUnprepared",
(connection, sql, params) => connection.executeUnprepared(sql, params)
)
}

compile(withoutTransform?: boolean | undefined) {
Expand Down

0 comments on commit a07990d

Please sign in to comment.