forked from tigerbeetle/tigerbeetle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdemo.zig
138 lines (112 loc) · 3.38 KB
/
demo.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
const std = @import("std");
const assert = std.debug.assert;
const config = @import("config.zig");
const tb = @import("tigerbeetle.zig");
const Account = tb.Account;
const Transfer = tb.Transfer;
const Commit = tb.Commit;
const CreateAccountsResult = tb.CreateAccountsResult;
const CreateTransfersResult = tb.CreateTransfersResult;
const CommitTransfersResult = tb.CommitTransfersResult;
const IO = @import("io.zig").IO;
const MessageBus = @import("message_bus.zig").MessageBusClient;
const StateMachine = @import("state_machine.zig").StateMachine;
const vsr = @import("vsr.zig");
const Header = vsr.Header;
const Client = vsr.Client(StateMachine, MessageBus);
pub const log_level: std.log.Level = .alert;
/// Sends one `operation` request carrying `batch` to the replica at
/// 127.0.0.1:`config.port`, then keeps ticking the client and running the IO
/// loop until the client's request queue is empty.
///
/// `batch` is viewed as raw bytes (`std.mem.asBytes`) and copied into the
/// message buffer immediately after the first `@sizeOf(Header)` bytes.
/// `on_reply` is passed through to `client.request`.
///
/// Any error from address parsing, from initialising the IO, message bus or
/// client, or from `io.run_for_ns` is returned to the caller. Everything set
/// up here is torn down again (in reverse order) before returning.
pub fn request(
    operation: StateMachine.Operation,
    batch: anytype,
    on_reply: fn (
        user_data: u128,
        operation: StateMachine.Operation,
        results: Client.Error![]const u8,
    ) void,
) !void {
    const gpa = std.heap.page_allocator;
    // A fresh random id is drawn for every call.
    const id = std.crypto.random.int(u128);
    const cluster: u32 = 0;

    // The demo talks to exactly one replica on the loopback interface.
    const replica = try std.net.Address.parseIp4("127.0.0.1", config.port);
    var replicas = [_]std.net.Address{replica};

    var io = try IO.init(32, 0);
    defer io.deinit();

    var bus = try MessageBus.init(gpa, cluster, &replicas, id, &io);
    defer bus.deinit();

    const replica_count = @intCast(u8, replicas.len);
    var client = try Client.init(gpa, id, cluster, replica_count, &bus);
    defer client.deinit();

    // Route messages arriving on the bus to the client.
    bus.set_on_message(*Client, &client, Client.on_message);

    var message = client.get_message();
    defer client.unref(message);

    // The payload starts right after the region sized for the header.
    const payload = std.mem.asBytes(&batch);
    const payload_offset = @sizeOf(Header);
    std.mem.copy(u8, message.buffer[payload_offset..], payload);

    // NOTE(review): the leading 0 is presumably the `user_data` later handed
    // to `on_reply` — confirm against `Client.request`.
    client.request(0, on_reply, operation, message, payload.len);

    // Pump the client and the IO loop, one tick interval at a time, until no
    // request is left in the queue.
    while (client.request_queue.count > 0) {
        client.tick();
        try io.run_for_ns(config.tick_ms * std.time.ns_per_ms);
    }
}
/// Reply callback with the same signature as the `on_reply` parameter of
/// `request`. Forwards the reply to `print_results`, which treats the body as
/// `CreateAccountsResult` records; `user_data` and `operation` are ignored.
pub fn on_create_accounts(
    user_data: u128,
    operation: StateMachine.Operation,
    results: Client.Error![]const u8,
) void {
    // Explicit discards: neither parameter is needed to print the reply.
    _ = operation;
    _ = user_data;
    return print_results(CreateAccountsResult, results);
}
/// Reply callback with the same signature as the `on_reply` parameter of
/// `request`. Forwards the reply to `print_results`, which treats the body as
/// `Account` records; `user_data` and `operation` are ignored.
pub fn on_lookup_accounts(
    user_data: u128,
    operation: StateMachine.Operation,
    results: Client.Error![]const u8,
) void {
    // Explicit discards: neither parameter is needed to print the reply.
    _ = operation;
    _ = user_data;
    return print_results(Account, results);
}
/// Reply callback with the same signature as the `on_reply` parameter of
/// `request`. Forwards the reply to `print_results`, which treats the body as
/// `Transfer` records; `user_data` and `operation` are ignored.
pub fn on_lookup_transfers(
    user_data: u128,
    operation: StateMachine.Operation,
    results: Client.Error![]const u8,
) void {
    // Explicit discards: neither parameter is needed to print the reply.
    _ = operation;
    _ = user_data;
    return print_results(Transfer, results);
}
/// Reply callback with the same signature as the `on_reply` parameter of
/// `request`. Forwards the reply to `print_results`, which treats the body as
/// `CreateTransfersResult` records; `user_data` and `operation` are ignored.
pub fn on_create_transfers(
    user_data: u128,
    operation: StateMachine.Operation,
    results: Client.Error![]const u8,
) void {
    // Explicit discards: neither parameter is needed to print the reply.
    _ = operation;
    _ = user_data;
    return print_results(CreateTransfersResult, results);
}
/// Reply callback with the same signature as the `on_reply` parameter of
/// `request`. Forwards the reply to `print_results`, which treats the body as
/// `CommitTransfersResult` records; `user_data` and `operation` are ignored.
pub fn on_commit_transfers(
    user_data: u128,
    operation: StateMachine.Operation,
    results: Client.Error![]const u8,
) void {
    // Explicit discards: neither parameter is needed to print the reply.
    _ = operation;
    _ = user_data;
    return print_results(CommitTransfersResult, results);
}
/// Prints a reply to stderr.
///
/// The reply body is reinterpreted as a sequence of `Results` records and each
/// record is printed on its own line with the default `{}` formatter. When the
/// body holds no records, a single "OK" line is printed instead.
///
/// A `Client.Error` in `results` is fatal for the demo: it panics with the
/// error's name. This replaces a bare `catch unreachable`, which would be
/// undefined behaviour in ReleaseFast builds and gives no hint about the cause
/// in safe builds.
fn print_results(comptime Results: type, results: Client.Error![]const u8) void {
    const body = results catch |err|
        std.debug.panic("request failed: {s}", .{@errorName(err)});

    // NOTE(review): `bytesAsSlice` expects `body.len` to be a whole multiple of
    // `@sizeOf(Results)`; this relies on the server sending well-formed replies.
    const slice = std.mem.bytesAsSlice(Results, body);
    for (slice) |result| {
        std.debug.print("{}\n", .{result});
    }
    if (slice.len == 0) std.debug.print("OK\n", .{});
}