-
Notifications
You must be signed in to change notification settings - Fork 3
/
pg_zlog--0.1.sql
165 lines (142 loc) · 3.85 KB
/
pg_zlog--0.1.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
 * Catalog schema for pg_zlog. The tables form a chain of references:
 * replicated_tables -> log -> pool -> cluster. All four tables are created
 * as elements of this single CREATE SCHEMA statement, so they must stay in
 * dependency order and must not be separated by semicolons.
 */
CREATE SCHEMA pgzlog_metadata

    /* ceph cluster connections; fsid is filled in by pgzlog_add_cluster */
    CREATE TABLE "cluster" (
        name      text NOT NULL PRIMARY KEY,
        fsid      text NOT NULL,
        conf_path text DEFAULT NULL
    )

    /* storage pools, each belonging to one registered cluster */
    CREATE TABLE "pool" (
        name    text   NOT NULL PRIMARY KEY,
        id      bigint NOT NULL,
        cluster text   NOT NULL REFERENCES pgzlog_metadata.cluster (name)
    )

    /*
     * zlog instances, each stored in one registered pool;
     * last_applied_pos starts at -1, i.e. nothing has been applied yet
     */
    CREATE TABLE "log" (
        name             text   NOT NULL PRIMARY KEY,
        pool             text   NOT NULL REFERENCES pgzlog_metadata.pool (name),
        host             text   DEFAULT NULL,
        port             text   DEFAULT NULL,
        last_applied_pos bigint NOT NULL DEFAULT -1
    )

    /* tables that are replicated on logs; a table maps to at most one log */
    CREATE TABLE "replicated_tables" (
        table_oid regclass NOT NULL PRIMARY KEY,
        log       text     NOT NULL REFERENCES pgzlog_metadata.log (name)
    );
/*
 * pgzlog_add_cluster registers a cluster in pgzlog_metadata.cluster.
 *
 * name      - key under which the cluster is stored
 * conf_path - handed to ceph_cluster_fsid() to obtain the cluster's fsid,
 *             and stored in the new row alongside it
 *
 * The insert fails if the name is already registered or if no fsid is
 * returned (the fsid column is NOT NULL).
 */
CREATE FUNCTION pgzlog_add_cluster(name text, conf_path text)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    cluster_fsid text;
BEGIN
    cluster_fsid := ceph_cluster_fsid(conf_path);

    INSERT INTO pgzlog_metadata.cluster
        (name, fsid, conf_path)
    VALUES
        (name, cluster_fsid, conf_path);
END;
$$;
/*
 * pgzlog_add_pool registers a storage pool in pgzlog_metadata.pool.
 *
 * cluster_name - name of the cluster that owns the pool; must already exist
 *                in pgzlog_metadata.cluster (foreign key)
 * pool_name    - name of the pool, used as the key of the new row
 *
 * The pool's numeric id is looked up with ceph_pool_id(). The insert fails
 * if that lookup yields NULL (the id column is NOT NULL).
 */
CREATE FUNCTION pgzlog_add_pool(cluster_name text, pool_name text)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    resolved_id bigint;
BEGIN
    resolved_id := ceph_pool_id(cluster_name, pool_name);

    INSERT INTO pgzlog_metadata.pool
        (name, id, cluster)
    VALUES
        (pool_name, resolved_id, cluster_name);
END;
$$;
/*
 * pgzlog_add_log registers a zlog instance in pgzlog_metadata.log.
 *
 * pool_name - pool holding the log; must already exist in
 *             pgzlog_metadata.pool (foreign key)
 * log_name  - name of the log, used as the key of the new row
 * host,
 * port      - stored verbatim; both columns are nullable
 *
 * last_applied_pos is left at its column default.
 */
CREATE FUNCTION pgzlog_add_log(pool_name text, log_name text, host text, port text)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    /*
     * host and port are both parameter names and column names, so the
     * parameters are qualified with the function name to make the
     * references unambiguous.
     */
    INSERT INTO pgzlog_metadata.log
        (name, pool, host, port)
    VALUES (
        pgzlog_add_log.log_name,
        pgzlog_add_log.pool_name,
        pgzlog_add_log.host,
        pgzlog_add_log.port
    );
END;
$$;
/*
 * pgzlog_replicate_table marks a table as replicated on a log by adding a
 * row to pgzlog_metadata.replicated_tables.
 *
 * log_name  - log to replicate on; must already exist in
 *             pgzlog_metadata.log (foreign key)
 * table_oid - the table, as a regclass; it is the primary key, so a table
 *             can be registered only once
 */
CREATE FUNCTION pgzlog_replicate_table(log_name text, table_oid regclass)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    /* table_oid is also a column name; qualify the parameter explicitly */
    INSERT INTO pgzlog_metadata.replicated_tables
        (table_oid, log)
    VALUES (
        pgzlog_replicate_table.table_oid,
        pgzlog_replicate_table.log_name
    );
END;
$$;
/*
 * ceph_cluster_fsid retrieves the unique identifier (fsid) of a cluster.
 *
 * conf_path - configuration path handed to the C implementation
 *
 * Implemented by the C symbol ceph_cluster_fsid in the extension's shared
 * library. The function is not declared STRICT, so the C code is invoked
 * even when conf_path is NULL.
 */
CREATE FUNCTION ceph_cluster_fsid(conf_path text)
RETURNS text
AS 'MODULE_PATHNAME', 'ceph_cluster_fsid'
LANGUAGE C;
/*
 * ceph_pool_id retrieves the identification number of the named pool.
 *
 * cluster_name - cluster in which to look up the pool
 * pool_name    - name of the pool
 *
 * Implemented by the C symbol ceph_pool_id in the extension's shared
 * library. Declared STRICT: if either argument is NULL the result is NULL
 * and the C code is not called.
 */
CREATE FUNCTION ceph_pool_id(cluster_name text, pool_name text)
RETURNS bigint
AS 'MODULE_PATHNAME', 'ceph_pool_id'
LANGUAGE C
STRICT;
/*
 * zlog_execute executes a query locally, by-passing the query logging logic.
 *
 * Implemented by the C symbol zlog_execute in the extension's shared
 * library. Its only argument has the pseudo-type INTERNAL, so it cannot be
 * invoked from an SQL statement; it can only be called from C code.
 */
CREATE FUNCTION zlog_execute(INTERNAL)
RETURNS void
AS 'MODULE_PATHNAME', 'zlog_execute'
LANGUAGE C;
/*
 * pgzlog_apply_log replays the entries of a log that have not yet been
 * applied locally.
 *
 * log_name     - name of a log registered in pgzlog_metadata.log
 * tail_log_pos - exclusive upper bound of the replay: positions
 *                last_applied_pos + 1 through tail_log_pos - 1 are read
 *                with pgzlog_read_log() and run with EXECUTE, in order
 *
 * After each entry, last_applied_pos is advanced to that entry's position
 * whether or not the entry succeeded. An entry that raises an error is
 * skipped, and the error is reported as a WARNING instead of being
 * discarded silently.
 *
 * Raises an error when log_name is not registered, and an assertion failure
 * when the log has already been applied beyond tail_log_pos.
 */
CREATE FUNCTION pgzlog_apply_log(log_name text, tail_log_pos bigint)
RETURNS void
AS $BODY$
DECLARE
    current_log_pos bigint;
    query text;
BEGIN
    /*
     * Transaction-scoped advisory lock keyed on the log name, so two
     * sessions cannot apply the same log at once. It is released
     * automatically at transaction end.
     * NOTE(review): 29030 looks like an arbitrary key namespace for
     * pg_zlog -- confirm nothing else uses it.
     */
    PERFORM pg_advisory_xact_lock(29030, hashtext(log_name));

    /*
     * Stays in effect for the rest of the transaction.
     * NOTE(review): presumably turns off pg_zlog's query logging so that
     * replayed statements are not appended to the log again -- confirm
     * against the C module.
     */
    SET LOCAL pg_zlog.enabled TO false;

    SELECT last_applied_pos INTO current_log_pos
    FROM pgzlog_metadata.log
    WHERE name = log_name;

    /*
     * Without this check an unknown log leaves current_log_pos NULL, which
     * surfaces only as a message-less assertion failure, or as a silent
     * no-op when plpgsql.check_asserts is off.
     */
    IF NOT FOUND THEN
        RAISE EXCEPTION 'log "%" is not registered in pgzlog_metadata.log',
            log_name;
    END IF;

    /* first position that has not been applied yet */
    current_log_pos := current_log_pos + 1;

    ASSERT current_log_pos <= tail_log_pos,
        format('log "%s": next position to apply (%s) is beyond the tail (%s)',
               log_name, current_log_pos, tail_log_pos);

    WHILE current_log_pos < tail_log_pos LOOP
        query := pgzlog_read_log(log_name, current_log_pos);

        RAISE DEBUG 'Executing: %', query;

        /*
         * Best effort: a failing entry must not stop the replay. The inner
         * block is a subtransaction, so only the failed entry's effects are
         * rolled back.
         */
        BEGIN
            EXECUTE query;
        EXCEPTION WHEN others THEN
            RAISE WARNING 'pg_zlog: skipping entry % of log "%": % (SQLSTATE %)',
                current_log_pos, log_name, SQLERRM, SQLSTATE;
        END;

        /* record progress even for skipped entries so they are not retried */
        UPDATE pgzlog_metadata.log
        SET last_applied_pos = current_log_pos
        WHERE name = log_name;

        current_log_pos := current_log_pos + 1;
    END LOOP;
END;
$BODY$ LANGUAGE 'plpgsql';
/*
 * pgzlog_read_log returns, as text, the entry stored at position pos of the
 * named log. pgzlog_apply_log() runs the returned text as a query.
 *
 * log_name - name of the log to read from
 * pos      - position of the entry within the log
 *
 * Implemented by the C symbol pgzlog_read_log in the extension's shared
 * library. Declared STRICT: if either argument is NULL the result is NULL
 * and the C code is not called.
 */
CREATE FUNCTION pgzlog_read_log(log_name text, pos bigint)
RETURNS text
AS 'MODULE_PATHNAME', 'pgzlog_read_log'
LANGUAGE C
STRICT;