pgq seems to be a great alternative to RabbitMQ or Kafka for many
reasons:
it runs on postgresql and then is supported by every languages
it can be used using only SQL queries
it is simple enough to have some custom configuration
it has been (and is still) used by Skype, dealing with billion of messages
# Installation
# from source
git clone https://github.com/pgq/pgq
make
make install
CREATE DATABASE my_database;
\c my_database
CREATE EXTENSION pgq;
# Configuration and Optimization
-- list available consumers
SELECT * FROM pgq.consumer;
-- list all available queues
SELECT * FROM pgq.queue;
-- list event templates
SELECT * FROM pgq.event_template;
-- list retry queue
SELECT * FROM pgq.retry_queue;
-- list all subscription
SELECT * FROM pgq.subscription;
-- list all tick
SELECT * FROM pgq.tick;
# Ticker Configuration
-- set ticker_max_count (default 500)
SELECT pgq.set_queue_config('queue_test', 'ticker_max_count', '1');
-- set ticker_paused (default f)
-- set ticker_max_lag (default 00:00:03)
-- set ticker_idle_period (default 00:01:00)
# Procedures
Database and queue initialization procedure:
create a connection to the database
create a new queue
create a new consumer
Publisher simple procedure:
create a connection to the database
create a new queue (optional)
publish data to the selected queue
Consumer simple procedure:
create a connection to the database
create a new ticker
create a loop
get a batch id
get events using batch id
execute the batch. in case of success, ack.
go to 3.1
# Usage
All queueing functions can easily be executed with pgq.* functions.
-- create a new queue
-- see: https://pgq.github.io/extension/pgq/files/external-sql.html#pgq.create_queue(1)
SELECT pgq.create_queue('queue_test');
-- create a new tick
SELECT gqq.ticker('queue_test');
-- insert a new event
-- see: https://pgq.github.io/extension/pgq/files/external-sql.html#pgq.insert_event(3)
SELECT pgq.insert_event('queue_test', 'json', '{test: 1}');
-- create a new consumer
SELECT pgq.register_consumer('queue_test', 'random_id');
-- get a batch id
SELECT pgq.next_batch('queue_test', 'random_id');
-- get batch content
SELECT pgq.get_batch_events(1);
-- finish batch
SELECT pgq.finish_batch(1)
-- remove the queue
-- see: https://pgq.github.io/extension/pgq/files/external-sql.html#pgq.drop_queue(1)
SELECT pgq.drop_queue('queue_test');
# Erlang (epgsql)
Using epgsql (opens new window).
{ok, C} = epgsql:connect(#{ host => "localhost"
, username => <<"user">>
, password => <<"password">>
, database => <<"my_database">>
}).
% create a new queue
epgsql:equery(C, "SELECT pgq.create_queue($1);", ["erlang"]).
% create consumer
epgsql:equery(C, "SELECT pgq.register_consumer($1, $2);", ["erlang", "consumer"]).
% insert data
epgsql:equery(C, "SELECT pgq.insert_event($1, $2, $3);", ["erlang", "raw", "my data"]).
% create a new tick
{ok, _, [{BatchId}]} = epgsql:equery(C, "SELECT pgq.ticker($1);", ["erlang"]).
% get batch
% where:
% [{{ EventId, EventTime, EventTxId, EventRetry, EventType,
% , EventData, EventExtra1, EventExtra2, EventExtra3, EventExtra4}}
% |_] = _Events
{ok, _Columns, _Events} = epgsql:equery(C, "SELECT pgq.get_batch_events($1)", [BatchId]).
% in case of failure, put in retry queue
{ok, _, _} = epgsql:equery(C, "SELECT pgq.event_retry($1::bigint, $2::bigint, $3::integer)"
, [BatchId, EventId, Seconds]).
% finish batch
% return batch id
{ok, _, _} = epgsql:equery(C, "SELECT pgq.finish_batch($1::bigint)", [2]).
% unsubscribe
epgsql:equery(C, "SELECT pgq.unregister_consumer($1, $2);", ["erlang", "consumer"]).
% drop queue
epgsql:equery(C, "SELECT pgq.drop_queue($1);", ["erlang"]).
% close connection
epgsql:close(C).
The default method is to use a loop, but it could also be possible to use LISTEN/NOTIFY to be noticed when a batch is ready, it would be more efficient. This part is not working yet, but the process should be something like that:
create a random channel name based on the queue/consumer
create a (temporary?) trigger listening to all event related to this queue
when a new event happens, a notification is sent to the random channel
consumer fetch batch from the queue and treats events.
{ok, C} = epgsql:connect(Args#{ async => self() }).
% it can also be set with epgsql:set_notice_receiver(C, self()).
{ok, _, _} = epgsql:squery(C, "LISTEN virtual").
{ok, _, _} = epgsql:squery(C, "SELECT pg_notify('virtual', 'data')").
flush().
% {epgsql,<0.4610.0>,{notification,<<"virtual">>,42795,<<"data">>}}
# Elixir (ecto)
Using Ecto (opens new window).
# Python
Using psycopg (opens new window).
# NodeJS
Using node-postgres (opens new window).
# Clojure
Using hugsql (opens new window).
# Ruby
Using ruby-pg (opens new window).
# Implementing Pub/Sub Pattern
# Implementing Routing
# Implementing Topics
# Implementing RPC Pattern
# References and Resources
pgq official documentation (opens new window)
pgq official repository (opens new window)
PGQ, Pretty Darn Quick - PostgreSQL (opens new window)
pgq tutorial (opens new window)
skytools documentation (opens new window)
skytooks documentation (archive) (opens new window)
Skytools: PgQ - PGCon (opens new window)
Devious SQL: Message Queuing Using Native PostgreSQL (opens new window)
Skytools database scripting framework & PgQ (opens new window)
PgQ Generic high-performance queue for PostgreSQL (opens new window)
PGQ: Queuing for Long-Running Jobs in Go Written Atop Postgres (opens new window)
PGQ Coop Consumers (opens new window) (archive) (opens new window)
Clean PGQ Subconsumers (opens new window) (archive) (opens new window)
Queues in Postgres (opens new window)
SQL Maxis: Why We Ditched RabbitMQ And Replaced It With A Postgres Queue (opens new window) (reddit) (opens new window)
Skype Plans for PostgreSQL to Scale to 1 Billion Users (opens new window)
How we improved DNS record build speed by more than 4,000x (opens new window)
# LISTEN/NOTIFY
PostgreSQL LISTEN/NOTIFY (opens new window)
Postgres Listen/Notify As Message Queue (opens new window)
RabbitMQ Exchange that publishes messages received from PostgreSQL Notifications. (opens new window)
Listening to generic JSON notifications from PostgreSQL in Go (opens new window)
LISTEN / NOTIFY: Automatic client notification in PostgreSQL (opens new window)
PostgreSQL Asynchronous Notification (opens new window)