pgq seems to be a great alternative to RabbitMQ or Kafka for many reasons:

  1. it runs on postgresql and then is supported by every languages

  2. it can be used using only SQL queries

  3. it is simple enough to have some custom configuration

  4. it has been (and is still) used by Skype, dealing with billion of messages

# Installation

# from source
git clone https://github.com/pgq/pgq
make 
make install
CREATE DATABASE my_database;
\c my_database
CREATE EXTENSION pgq;

# Configuration and Optimization

-- list available consumers
SELECT * FROM pgq.consumer;

-- list all available queues
SELECT * FROM pgq.queue;

-- list event templates
SELECT * FROM pgq.event_template;

-- list retry queue
SELECT * FROM pgq.retry_queue;

-- list all subscription
SELECT * FROM pgq.subscription;

-- list all tick
SELECT * FROM pgq.tick;

# Ticker Configuration

-- set ticker_max_count (default 500)
SELECT pgq.set_queue_config('queue_test', 'ticker_max_count', '1');

-- set ticker_paused (default f)

-- set ticker_max_lag (default 00:00:03)

-- set ticker_idle_period (default 00:01:00)

# Procedures

Database and queue initialization procedure:

  1. create a connection to the database

  2. create a new queue

  3. create a new consumer

Publisher simple procedure:

  1. create a connection to the database

  2. create a new queue (optional)

  3. publish data to the selected queue

Consumer simple procedure:

  1. create a connection to the database

  2. create a new ticker

  3. create a loop

  4. get a batch id

  5. get events using batch id

  6. execute the batch. in case of success, ack.

  7. go to 3.1

# Usage

All queueing functions can easily be executed with pgq.* functions.

-- create a new queue
-- see: https://pgq.github.io/extension/pgq/files/external-sql.html#pgq.create_queue(1)
SELECT pgq.create_queue('queue_test');

-- create a new tick
SELECT gqq.ticker('queue_test');

-- insert a new event
-- see: https://pgq.github.io/extension/pgq/files/external-sql.html#pgq.insert_event(3)
SELECT pgq.insert_event('queue_test', 'json', '{test: 1}');

-- create a new consumer
SELECT pgq.register_consumer('queue_test', 'random_id');

-- get a batch id
SELECT pgq.next_batch('queue_test', 'random_id');

-- get batch content
SELECT pgq.get_batch_events(1);

-- finish batch
SELECT pgq.finish_batch(1)

-- remove the queue
-- see: https://pgq.github.io/extension/pgq/files/external-sql.html#pgq.drop_queue(1)
SELECT pgq.drop_queue('queue_test');

# Erlang (epgsql)

Using epgsql (opens new window).

{ok, C} = epgsql:connect(#{ host => "localhost"
                          , username => <<"user">>
                          , password => <<"password">>
                          , database => <<"my_database">> 
                          }).

% create a new queue
epgsql:equery(C, "SELECT pgq.create_queue($1);", ["erlang"]).

% create consumer
epgsql:equery(C, "SELECT pgq.register_consumer($1, $2);", ["erlang", "consumer"]).

% insert data
epgsql:equery(C, "SELECT pgq.insert_event($1, $2, $3);", ["erlang", "raw", "my data"]).

% create a new tick
{ok, _, [{BatchId}]} = epgsql:equery(C, "SELECT pgq.ticker($1);", ["erlang"]).

% get batch
% where:
% [{{ EventId, EventTime, EventTxId, EventRetry, EventType, 
%   , EventData, EventExtra1, EventExtra2, EventExtra3, EventExtra4}}
% |_] = _Events
{ok, _Columns, _Events} = epgsql:equery(C, "SELECT pgq.get_batch_events($1)", [BatchId]).

% in case of failure, put in retry queue
{ok, _, _} = epgsql:equery(C, "SELECT pgq.event_retry($1::bigint, $2::bigint, $3::integer)"
                            , [BatchId, EventId, Seconds]).

% finish batch
% return batch id
{ok, _, _} = epgsql:equery(C, "SELECT pgq.finish_batch($1::bigint)", [2]).

% unsubscribe
epgsql:equery(C, "SELECT pgq.unregister_consumer($1, $2);", ["erlang", "consumer"]).

% drop queue
epgsql:equery(C, "SELECT pgq.drop_queue($1);", ["erlang"]).

% close connection
epgsql:close(C).

The default method is to use a loop, but it could also be possible to use LISTEN/NOTIFY to be noticed when a batch is ready, it would be more efficient. This part is not working yet, but the process should be something like that:

  1. create a random channel name based on the queue/consumer

  2. create a (temporary?) trigger listening to all event related to this queue

  3. when a new event happens, a notification is sent to the random channel

  4. consumer fetch batch from the queue and treats events.

{ok, C} = epgsql:connect(Args#{ async => self() }).
% it can also be set with epgsql:set_notice_receiver(C, self()).

{ok, _, _} = epgsql:squery(C, "LISTEN virtual").

{ok, _, _} = epgsql:squery(C, "SELECT pg_notify('virtual', 'data')").

flush().
% {epgsql,<0.4610.0>,{notification,<<"virtual">>,42795,<<"data">>}}

# Elixir (ecto)

Using Ecto (opens new window).

# Python

Using psycopg (opens new window).

# NodeJS

Using node-postgres (opens new window).

# Clojure

Using hugsql (opens new window).

# Ruby

Using ruby-pg (opens new window).

# Implementing Pub/Sub Pattern

# Implementing Routing

# Implementing Topics

# Implementing RPC Pattern

# References and Resources

pgq official documentation (opens new window)

pgq official repository (opens new window)

PGQ, Pretty Darn Quick - PostgreSQL (opens new window)

pgq tutorial (opens new window)

skytools documentation (opens new window)

skytooks documentation (archive) (opens new window)

Skytools: PgQ - PGCon (opens new window)

Devious SQL: Message Queuing Using Native PostgreSQL (opens new window)

Skytools database scripting framework & PgQ (opens new window)

PgQ Generic high-performance queue for PostgreSQL (opens new window)

PGQ: Queuing for Long-Running Jobs in Go Written Atop Postgres (opens new window)

PGQ Coop Consumers (opens new window) (archive) (opens new window)

Clean PGQ Subconsumers (opens new window) (archive) (opens new window)

Queues in Postgres (opens new window)

SQL Maxis: Why We Ditched RabbitMQ And Replaced It With A Postgres Queue (opens new window) (reddit) (opens new window)

Skype Plans for PostgreSQL to Scale to 1 Billion Users (opens new window)

How we improved DNS record build speed by more than 4,000x (opens new window)

# LISTEN/NOTIFY

LISTEN (opens new window)

NOTIFY (opens new window)

PostgreSQL LISTEN/NOTIFY (opens new window)

Postgres Listen/Notify As Message Queue (opens new window)

RabbitMQ Exchange that publishes messages received from PostgreSQL Notifications. (opens new window)

Listening to generic JSON notifications from PostgreSQL in Go (opens new window)

LISTEN / NOTIFY: Automatic client notification in PostgreSQL (opens new window)

PostgreSQL Asynchronous Notification (opens new window)

Postgres Triggers with Listen / Notify (opens new window)

PostgreSQL LISTEN/NOTIFY (opens new window)