-
Notifications
You must be signed in to change notification settings - Fork 3
/
app.js
151 lines (128 loc) · 3.87 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import Knex from "knex";
/**
 * Appends the `application_name` query parameter to a connection URL.
 *
 * Uses `?` when the URL has no query string yet and `&` otherwise, so the
 * result is well-formed either way.
 *
 * @param {string | undefined} baseUrl - Connection URL, e.g. the value of DATABASE_URL.
 * @param {string} appName - Value reported to the server as the application name.
 * @returns {string} The URL with `application_name=<appName>` appended.
 */
function withApplicationName(baseUrl, appName) {
  // Coerce instead of throwing: an unset DATABASE_URL keeps surfacing as a
  // connection failure when the client is used, not as a crash at load time.
  const url = `${baseUrl}`;
  let separator = "&";
  if (!url.includes("?")) {
    separator = "?";
  } else if (url.endsWith("?") || url.endsWith("&")) {
    separator = "";
  }
  return `${url}${separator}application_name=${encodeURIComponent(appName)}`;
}

// Knex configuration: CockroachDB dialect, connection taken from the
// DATABASE_URL environment variable, plus migration and seed locations.
const config = {
  client: "cockroachdb",
  connection: withApplicationName(
    process.env.DATABASE_URL,
    "docs_simplecrud_knex"
  ),
  migrations: {
    directory: "migration/migrations",
  },
  seeds: {
    directory: "migration/seeds",
  },
};
// Connect to database
// `client` is the Knex instance built from `config`; it is passed to every
// helper below and used to build queries and transactions.
const client = Knex(config);
// Latest `id`/`balance` rows read from "accounts", ordered by balance
// descending. Assigned by initTable, transferFunds and deleteAccounts, and
// read by the latter two to pick which rows to act on.
let accountValues;
/**
 * Prints a heading followed by each row, one `console.log` call per row.
 * Prints nothing at all when there are no rows.
 *
 * @param {Array<object>} rows - Rows to print.
 */
function logRows(rows) {
  const hasRows = rows.length > 0;
  if (!hasRows) {
    return;
  }

  console.log("New account balances:");
  for (const entry of rows) {
    console.log(entry);
  }
}
/**
 * Runs `operation` inside a transaction and commits it, re-running the whole
 * thing for as long as the database server asks for the transaction to be
 * retried (error code "40001").
 *
 * Every attempt gets its own fresh transaction: once a transaction has been
 * rolled back it cannot be reused. Any failed attempt is rolled back before
 * the error is handled, so a non-retryable failure does not leave a
 * transaction open.
 *
 * @param {number} n - Number of attempts already made (normally 0).
 * @param {number} max - Attempt counter value at which to give up; with
 *   `n = 0` at most `max - 1` attempts are made.
 * @param {object} client - Knex instance used to open transactions.
 * @param {(client: object, transaction: object) => Promise<void>} operation -
 *   Work to perform; receives the client and the attempt's transaction.
 * @returns {Promise<void>} Resolves once an attempt has been committed.
 * @throws {Error} "Max retry count reached." when the attempt budget is
 *   exhausted; otherwise the original error for any non-retryable failure.
 */
async function retryTxn(n, max, client, operation) {
  let attempt = n;
  while (true) {
    attempt++;
    // `>=` (not `===`) so a counter that starts past `max` cannot loop forever.
    if (attempt >= max) {
      throw new Error("Max retry count reached.");
    }

    // A provider hands back the same transaction on every call, so a new
    // provider is requested per attempt to obtain a new transaction.
    const transactionProvider = client.transactionProvider();
    const transaction = await transactionProvider();

    try {
      await operation(client, transaction);
      await transaction.commit();
      return;
    } catch (err) {
      // Release the failed attempt's transaction before deciding what to do.
      // Skip it when the transaction reports itself finished (e.g. the
      // failure came from the commit itself).
      const alreadyCompleted =
        typeof transaction.isCompleted === "function" &&
        transaction.isCompleted();
      if (!alreadyCompleted) {
        try {
          await transaction.rollback();
        } catch (rollbackErr) {
          // Report it, but keep the original error as the one that matters.
          console.error(`Rollback failed: ${rollbackErr.message}`);
        }
      }

      if (err.code !== "40001") {
        console.error(err.message);
        throw err;
      }

      console.log("Transaction failed. Retrying transaction.");
      console.log(err.message);
      // Exponential backoff: 2^attempt seconds before the next attempt.
      await new Promise((r) => setTimeout(r, 2 ** attempt * 1000));
    }
  }
}
/**
 * Prepares the "accounts" table: applies the latest migrations, runs the
 * seeds, then reads back every account's id and balance (highest balance
 * first), logs the rows and stores them in `accountValues`.
 *
 * Runs directly on the client rather than on a transaction object.
 *
 * @param {object} client - Knex instance.
 * @returns {Promise<void>}
 */
async function initTable(client) {
  await client.migrate.latest();
  await client.seed.run();

  const accountsQuery = client("accounts")
    .select(["id", "balance"])
    .orderBy("balance", "desc");
  const seededAccounts = await accountsQuery;

  logRows(seededAccounts);
  accountValues = seededAccounts;
}
/**
 * Moves a fixed amount (100) from the first account in `accountValues` to
 * the second one, simulating a "transfer" of funds, then re-reads all
 * accounts (highest balance first), logs them and stores them in
 * `accountValues`.
 *
 * All statements run on the supplied transaction. The transfer is aborted
 * before any row is modified if the source account is missing or cannot
 * cover the amount.
 *
 * @param {object} client - Knex instance.
 * @param {object} transaction - Transaction every statement is bound to.
 * @returns {Promise<void>}
 * @throws {Error} If the source account does not exist or has insufficient
 *   funds.
 */
async function transferFunds(client, transaction) {
  const from = accountValues[0].id;
  const to = accountValues[1].id;
  const amount = 100;

  const rows = await client("accounts")
    .transacting(transaction)
    .select("balance")
    .where({
      id: from,
    });
  if (rows.length === 0) {
    throw new Error(`account ${from} not found`);
  }

  // Explicit conversion: the driver may hand the balance back as a string.
  const acctBal = Number(rows[0].balance);
  if (acctBal < amount) {
    // Throwing (instead of only logging) stops the updates below from running.
    throw new Error(`insufficient funds for account ${from}`);
  }

  // The amount is passed as a binding rather than spliced into the SQL text.
  await client("accounts")
    .transacting(transaction)
    .update({
      balance: client.raw("balance - ?", [amount]),
    })
    .where({
      id: from,
    });
  await client("accounts")
    .transacting(transaction)
    .update({
      balance: client.raw("balance + ?", [amount]),
    })
    .where({
      id: to,
    });

  const updatedValues = await client("accounts")
    .transacting(transaction)
    .select(["id", "balance"])
    .orderBy("balance", "desc");
  logRows(updatedValues);
  accountValues = updatedValues;
}
/**
 * Deletes the account stored at index 1 of `accountValues` (the second
 * entry), then re-reads the remaining accounts (highest balance first),
 * logs them and stores them in `accountValues`.
 *
 * Both statements run on the supplied transaction.
 *
 * @param {object} client - Knex instance.
 * @param {object} transaction - Transaction both statements are bound to.
 * @returns {Promise<void>}
 */
async function deleteAccounts(client, transaction) {
  const removal = client("accounts").transacting(transaction).delete();
  await removal.where({ id: accountValues[1].id });

  const remainingQuery = client("accounts")
    .transacting(transaction)
    .select(["id", "balance"])
    .orderBy("balance", "desc");
  const remainingAccounts = await remainingQuery;

  logRows(remainingAccounts);
  accountValues = remainingAccounts;
}
// Entry point: initialize the table, then run the transfer and the delete,
// each through the transaction retry wrapper. Exits with status 0 on
// success and status 1 if any step fails.
(async () => {
  // Prevent knex migration framework from running
  // schema changes in a transaction
  // NOTE(review): SET is issued through the pooled client; confirm that the
  // migration statements run on the same session this setting applies to.
  await client.raw("SET autocommit_before_ddl = true");

  // Initialize the table without using a transaction,
  // since it involves schema changes.
  console.log("Initializing accounts table...");
  await initTable(client);

  // Transfer funds in transaction retry wrapper
  console.log("Transferring funds...");
  await retryTxn(0, 15, client, transferFunds);

  // Delete a row in transaction retry wrapper
  console.log("Deleting a row...");
  await retryTxn(0, 15, client, deleteAccounts);

  // Exit program
  process.exit();
})().catch((err) => {
  // Report the failure on stderr and terminate with a non-zero status so
  // callers (shells, CI) can tell that the run failed.
  console.error(err.stack);
  process.exit(1);
});