Skip to content

Commit

Permalink
Make drop queue infer queue type (#319)
Browse files Browse the repository at this point in the history
* Make drop queue infer queue type

* Overload function to not break backwards compatibility

* Add migration file for drop_queue changes

* Add test for automatic is_partitioned check

* Drop function in migration script

* Avoid code duplication in drop_queue

* Refactor, add deprecation warning

* fix

* Include deprecation warnings in tests

---------

Co-authored-by: Adam Hendel <ChuckHend@users.noreply.github.com>
Co-authored-by: v0idpwn <v0idpwn@gmail.com>
  • Loading branch information
3 people authored Oct 22, 2024
1 parent 81f4a13 commit be39b73
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pgmq-extension/pgmq.control
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
comment = 'A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.'
default_version = '1.4.4'
default_version = '1.5.0'
module_pathname = '$libdir/pgmq'
schema = 'pgmq'
relocatable = false
Expand Down
87 changes: 87 additions & 0 deletions pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
DROP FUNCTION IF EXISTS pgmq.drop_queue(TEXT, BOOLEAN);

CREATE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN)
RETURNS BOOLEAN AS $$
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
fq_qtable TEXT := 'pgmq.' || qtable;
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_atable TEXT := 'pgmq.' || atable;
BEGIN
RAISE WARNING "drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead.";

PERFORM pgmq.drop_queue(queue_name);

RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION pgmq.drop_queue(queue_name TEXT)
RETURNS BOOLEAN AS $$
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
fq_qtable TEXT := 'pgmq.' || qtable;
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_atable TEXT := 'pgmq.' || atable;
partitioned BOOLEAN;
BEGIN
EXECUTE FORMAT(
$QUERY$
SELECT is_partitioned FROM pgmq.meta WHERE queue_name = %L
$QUERY$,
queue_name
) INTO partitioned;

EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
qtable
);

EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
atable
);

EXECUTE FORMAT(
$QUERY$
DROP TABLE IF EXISTS pgmq.%I
$QUERY$,
qtable
);

EXECUTE FORMAT(
$QUERY$
DROP TABLE IF EXISTS pgmq.%I
$QUERY$,
atable
);

IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = 'meta' and table_schema = 'pgmq'
) THEN
EXECUTE FORMAT(
$QUERY$
DELETE FROM pgmq.meta WHERE queue_name = %L
$QUERY$,
queue_name
);
END IF;

IF partitioned THEN
EXECUTE FORMAT(
$QUERY$
DELETE FROM %I.part_config where parent_table in (%L, %L)
$QUERY$,
pgmq._get_pg_partman_schema(), fq_qtable, fq_atable
);
END IF;

RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
26 changes: 25 additions & 1 deletion pgmq-extension/sql/pgmq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -466,14 +466,38 @@ RETURNS TEXT AS $$
extname = 'pg_partman';
$$ LANGUAGE SQL;
CREATE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN DEFAULT FALSE)
CREATE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN)
RETURNS BOOLEAN AS $$
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
fq_qtable TEXT := 'pgmq.' || qtable;
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_atable TEXT := 'pgmq.' || atable;
BEGIN
RAISE WARNING 'drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead';
PERFORM pgmq.drop_queue(queue_name);
RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION pgmq.drop_queue(queue_name TEXT)
RETURNS BOOLEAN AS $$
DECLARE
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
fq_qtable TEXT := 'pgmq.' || qtable;
atable TEXT := pgmq.format_table_name(queue_name, 'a');
fq_atable TEXT := 'pgmq.' || atable;
partitioned BOOLEAN;
BEGIN
EXECUTE FORMAT(
$QUERY$
SELECT is_partitioned FROM pgmq.meta WHERE queue_name = %L
$QUERY$,
queue_name
) INTO partitioned;
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
Expand Down
16 changes: 14 additions & 2 deletions pgmq-extension/test/expected/base.out
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,28 @@ LINE 1: SELECT * from {PGMQ_SCHEMA}.metrics_all();
-- delete all the queues
-- delete partitioned queues
SELECT pgmq.drop_queue(queue, true)
FROM unnest('{test_duration_queue, test_numeric_queue}'::text[]) AS queue;
FROM unnest('{test_numeric_queue}'::text[]) AS queue;
WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead
drop_queue
------------
t
(1 row)

-- test automatic partitioned status checking
SELECT pgmq.drop_queue(queue)
FROM unnest('{test_duration_queue}'::text[]) AS queue;
drop_queue
------------
t
(2 rows)
(1 row)

-- drop the rest of the queues
SELECT pgmq.drop_queue(q.queue_name, true)
FROM (SELECT queue_name FROM pgmq.list_queues()) AS q;
WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead
WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead
WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead
WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead
drop_queue
------------
t
Expand Down
8 changes: 6 additions & 2 deletions pgmq-extension/test/sql/base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ SELECT * from {PGMQ_SCHEMA}.metrics_all();
-- delete all the queues
-- delete partitioned queues
SELECT pgmq.drop_queue(queue, true)
FROM unnest('{test_duration_queue, test_numeric_queue}'::text[]) AS queue;
FROM unnest('{test_numeric_queue}'::text[]) AS queue;

-- test automatic partitioned status checking
SELECT pgmq.drop_queue(queue)
FROM unnest('{test_duration_queue}'::text[]) AS queue;

-- drop the rest of the queues
SELECT pgmq.drop_queue(q.queue_name, true)
Expand Down Expand Up @@ -303,4 +307,4 @@ SELECT pgmq.format_table_name($$single'quote-fail$$, 'a');
--Cleanup tests
DROP EXTENSION pgmq CASCADE;
DROP EXTENSION pg_partman CASCADE;
DROP EXTENSION pg_partman CASCADE;

0 comments on commit be39b73

Please sign in to comment.