Messaging with Oracle DB: Consumer Groups and Message Retention
In this article, we’ll discuss messaging with Oracle Database, consumer groups, and the differences between queues and topics. By the end, you should have a solid idea of how consumers and message retention work when using Oracle Database for asynchronous, service-to-service messaging.
Messaging with Oracle Database
Before we dive into the details, let’s briefly describe Oracle Database Transactional Event Queues (TxEventQ).
TxEventQ is a messaging system built into Oracle Database — that is, TxEventQ runs within the database, and provides a messaging interface for applications to asynchronously produce and consume messages using database transactions.
TxEventQ was introduced in Oracle Database 19c as the evolution of classic queuing. TxEventQ is included as a free-of-charge Oracle Database feature.
Check out my article Introduction to event streaming with Oracle Database Transactional Event Queues for an in-depth description.
Subscribers and Consumer Groups
A Subscriber or Consumer Group (I’ll use the terms interchangeably) is a group of processes that subscribe to a queue or topic to consume messages. Consumer groups let us parallelize message consumption: by adding consumers to a group, we can read messages from a queue or topic concurrently.
After all consumer groups have consumed a message from a queue or topic, that message is scheduled for deletion. This is the default message retention behavior, and it may be modified during or after queue creation; we’ll explore message retention in a later section.
The number of active consumers in a consumer group is limited by the number of partitions, or shards, of the corresponding queue or topic. A shard is a sub-unit of a queue or topic that is assigned to at most one consumer from each consumer group. If a consumer group has fewer consumers than the queue has shards, a consumer may be assigned more than one shard. Conversely, if a consumer group has more consumers than the queue has shards, the excess consumers remain idle.
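To make the relationship between shards and consumers concrete, here is a minimal Java sketch that spreads shards across the consumers of a single group. The round-robin policy and the names ShardAssignmentSketch, assign, and idleConsumers are assumptions made for illustration; the database decides the real assignment. The sketch only demonstrates the two rules above: a consumer may own several shards, and surplus consumers own none.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of spreading a queue's shards across the consumers of one consumer group. */
final class ShardAssignmentSketch {

    /**
     * Returns, for each consumer, the shard ids it would own under a simple
     * round-robin policy. Round-robin is an illustrative assumption, not the
     * database's actual assignment algorithm.
     */
    static List<List<Integer>> assign(int shardCount, int consumerCount) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            owned.add(new ArrayList<>());
        }
        if (consumerCount == 0) {
            return owned;
        }
        for (int shard = 0; shard < shardCount; shard++) {
            // Each shard goes to exactly one consumer in the group.
            owned.get(shard % consumerCount).add(shard);
        }
        return owned;
    }

    /** Counts consumers that received no shard and would therefore sit idle. */
    static long idleConsumers(List<List<Integer>> owned) {
        return owned.stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 6 shards, 3 consumers: every consumer owns two shards.
        System.out.println(assign(6, 3));                // prints [[0, 3], [1, 4], [2, 5]]
        // 6 shards, 8 consumers: two consumers own nothing.
        System.out.println(idleConsumers(assign(6, 8))); // prints 2
    }
}
```

With six shards, scaling a group beyond six consumers adds no throughput in this model, which is why the shard count is worth choosing with the expected consumer count in mind.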
You can configure the number of shards of a queue or topic with the dbms_aqadm.set_queue_parameter procedure, setting the shard_num parameter to the desired value. This procedure should be run after a queue is created, but before it is started.
begin
  -- create the queue ...
  dbms_aqadm.set_queue_parameter('myqueue', 'shard_num', 6);
  -- then start queue ...
end;
/
Note: Users of Oracle Database 19.27 or earlier who use sticky dequeue should ensure there is a 1:1 mapping between consumers and queue shards to avoid idle shards. In database versions after 19.27 (including 23ai), this limitation is no longer present. The following snippet enables sticky dequeue and key-based enqueue on a topic:
begin
  -- create the queue ...
  -- sticky dequeue must be set to guarantee ordering at the consumer
  dbms_aqadm.set_queue_parameter('mytopic', 'sticky_dequeue', 1);
  -- key-based enqueue must be set to preserve the order of events per correlation
  dbms_aqadm.set_queue_parameter('mytopic', 'key_based_enqueue', 1);
  -- then start queue ...
end;
/
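The idea behind key-based enqueue can be illustrated with a small Java model: if every event with the same correlation key lands on the same shard, and a shard is read by a single consumer, then events for that key are seen in the order they were produced. The class KeyBasedEnqueueSketch and its hash-modulo mapping are hypothetical and written only for this illustration; they are not how the database maps keys to shards.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of routing events to shards by correlation key. */
final class KeyBasedEnqueueSketch {
    private final int shardCount;
    private final Map<Integer, List<String>> shards = new LinkedHashMap<>();

    KeyBasedEnqueueSketch(int shardCount) {
        this.shardCount = shardCount;
        for (int i = 0; i < shardCount; i++) {
            shards.put(i, new ArrayList<>());
        }
    }

    /** Hypothetical mapping: the same key always yields the same shard id. */
    int shardFor(String correlationKey) {
        return Math.floorMod(correlationKey.hashCode(), shardCount);
    }

    /** Appends the event to the shard selected by its correlation key. */
    void enqueue(String correlationKey, String payload) {
        shards.get(shardFor(correlationKey)).add(correlationKey + ":" + payload);
    }

    /** Returns the events held by one shard, in enqueue order. */
    List<String> shard(int id) {
        return shards.get(id);
    }

    public static void main(String[] args) {
        KeyBasedEnqueueSketch topic = new KeyBasedEnqueueSketch(6);
        topic.enqueue("order-1", "created");
        topic.enqueue("order-2", "created");
        topic.enqueue("order-1", "paid");
        topic.enqueue("order-1", "shipped");
        // All order-1 events sit on one shard, in production order.
        System.out.println(topic.shard(topic.shardFor("order-1")));
    }
}
```

Events for different keys may share a shard in this model, but the relative order of events for any one key is never disturbed.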
Queues
A Queue can have exactly one consumer group, with up to as many active consumers as the queue has shards.
We can use the DBMS_AQADM PL/SQL package to create a queue. In this example, we’ll specifically create a JMS queue using the dbms_aqadm.JMS_TYPE payload type. Note that the multiple_consumers parameter is set to false, which allows only one consumer group for this queue.
-- Create a Transactional Event Queue
begin
  dbms_aqadm.create_transactional_event_queue(
    queue_name         => 'myqueue',
    -- Payload can be RAW, JSON, DBMS_AQADM.JMS_TYPE, or an object type.
    -- Default is DBMS_AQADM.JMS_TYPE.
    queue_payload_type => DBMS_AQADM.JMS_TYPE,
    -- One consumer group per message (a queue)
    multiple_consumers => false
  );
  -- Start the queue
  dbms_aqadm.start_queue(
    queue_name => 'myqueue'
  );
end;
/
Find a write-up about producing and consuming messages using JMS Queues here. You can also find the sample on GitHub here.
Topics
A Topic is similar to a queue, with a few key differences. A topic may have any number of consumer groups. A queue’s consumer group is implicitly created, but we must create consumer groups for topics ourselves using the dbms_aqadm.add_subscriber PL/SQL procedure.
In the following PL/SQL snippet, we create a topic and two consumer groups, example_subscriber_1 and example_subscriber_2:
-- Create a Transactional Event Queue
begin
  dbms_aqadm.create_transactional_event_queue(
    queue_name         => 'mytopic',
    -- Payload can be RAW, JSON, DBMS_AQADM.JMS_TYPE, or an object type.
    -- Default is DBMS_AQADM.JMS_TYPE.
    queue_payload_type => DBMS_AQADM.JMS_TYPE,
    -- Multiple consumers for each message (a topic)
    multiple_consumers => true
  );
  -- Start the queue
  dbms_aqadm.start_queue(
    queue_name => 'mytopic'
  );
end;
/
-- Create two consumer groups for the topic
begin
  dbms_aqadm.add_subscriber(
    queue_name => 'mytopic',
    subscriber => sys.aq$_agent('example_subscriber_1', null, null)
  );
  dbms_aqadm.add_subscriber(
    queue_name => 'mytopic',
    subscriber => sys.aq$_agent('example_subscriber_2', null, null)
  );
end;
/
When subscribing to a topic from a consumer process, we must use the name of an existing consumer group — see the following Java snippet using the JMS API:
Topic jmsTopic = session.getTopic(username, topicName);
MessageConsumer consumer = session
.createDurableSubscriber(jmsTopic, "example_subscriber_1");
Note: If you’re using the OKafka Java API, the add_subscriber call is implicitly handled for you.
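To illustrate how a topic fans messages out to several consumer groups, here is a minimal in-memory Java model. It is a sketch under stated assumptions, not the TxEventQ implementation: the class TopicFanOutSketch and its methods are hypothetical, and each group is assumed to see only messages published after it subscribes. It shows three behaviors described in this article: every group reads the topic independently, consuming with an unknown group name fails, and a message only becomes eligible for deletion once all groups have consumed it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy in-memory model of a topic with independent consumer groups. */
final class TopicFanOutSketch {
    private final Set<String> groups = new LinkedHashSet<>();
    private final List<String> messages = new ArrayList<>();
    // Position of the next unread message, tracked separately per group.
    private final Map<String, Integer> nextIndex = new HashMap<>();

    /** Registers a consumer group; a simplified stand-in for add_subscriber. */
    void addSubscriber(String group) {
        groups.add(group);
        // Simplifying assumption: a group starts at the current end of the topic.
        nextIndex.putIfAbsent(group, messages.size());
    }

    void publish(String payload) {
        messages.add(payload);
    }

    /** Returns the next message for the group, or null when it is caught up. */
    String consume(String group) {
        Integer idx = nextIndex.get(group);
        if (idx == null) {
            throw new IllegalArgumentException("unknown consumer group: " + group);
        }
        if (idx >= messages.size()) {
            return null;
        }
        nextIndex.put(group, idx + 1);
        return messages.get(idx);
    }

    /** True once every registered group has read past the given message. */
    boolean consumedByAll(int messageIndex) {
        for (String g : groups) {
            if (nextIndex.get(g) <= messageIndex) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TopicFanOutSketch topic = new TopicFanOutSketch();
        topic.addSubscriber("example_subscriber_1");
        topic.addSubscriber("example_subscriber_2");
        topic.publish("hello");
        System.out.println(topic.consume("example_subscriber_1"));
        System.out.println(topic.consumedByAll(0)); // second group has not read it yet
        System.out.println(topic.consume("example_subscriber_2"));
        System.out.println(topic.consumedByAll(0));
    }
}
```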
Find a write-up on TxEventQ JMS Topics here, and a code sample on GitHub here.
Message Retention (since 21c)
We often want to retain messages in a queue or topic after consumption, for message replay or auditing. By default, TxEventQ messages are purged after consumption, but from Oracle Database 21c onward, we can configure a message retention period for seekable subscribers at the queue or topic level.
To configure retention time during queue creation, set the retention_time property of the dbms_aqadm.queue_props_t object to an integer number of seconds for which messages are retained after consumption:
declare
  props dbms_aqadm.queue_props_t;
begin
  props.retention_time := 300; -- retention time in seconds
  dbms_aqadm.create_transactional_event_queue(
    queue_name         => 'myqueue',
    multiple_consumers => TRUE,
    queue_properties   => props -- properties containing retention time
  );
end;
/
After queue creation, we can modify retention using the alter_sharded_queue PL/SQL procedure:
declare
  qprops dbms_aqadm.queue_props_t;
begin
  qprops.retention_time := 500; -- retention time in seconds
  dbms_aqadm.alter_sharded_queue(
    queue_name       => 'myqueue',
    queue_properties => qprops
  );
end;
/
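The retention rule described in this section can be expressed as a small Java predicate: a message becomes eligible for purging only after every consumer group has consumed it and the retention period has then elapsed. This is a sketch of the eligibility logic only. The class RetentionSketch and the method purgeable are hypothetical, and when the database actually removes an eligible message is up to the database.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of when a consumed message becomes eligible for purging. */
final class RetentionSketch {

    /**
     * @param consumedByAllAt when the last consumer group consumed the message,
     *                        or null if some group has not consumed it yet
     * @param retention       configured retention time (retention_time, in seconds)
     * @param now             the current time
     * @return true once the message is fully consumed and retention has elapsed
     */
    static boolean purgeable(Instant consumedByAllAt, Duration retention, Instant now) {
        if (consumedByAllAt == null) {
            return false; // still waiting for at least one consumer group
        }
        return !now.isBefore(consumedByAllAt.plus(retention));
    }

    public static void main(String[] args) {
        Instant consumed = Instant.parse("2024-01-01T00:00:00Z");
        Duration retention = Duration.ofSeconds(300);
        // One second before the retention window closes.
        System.out.println(purgeable(consumed, retention, consumed.plusSeconds(299)));
        // Exactly at the end of the retention window.
        System.out.println(purgeable(consumed, retention, consumed.plusSeconds(300)));
    }
}
```

A retention of zero in this model reproduces the default behavior, where a message is eligible for deletion as soon as all consumer groups have consumed it.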
Code Samples and Demo Apps
Life is better with code samples. I maintain a number of Java demo apps in the oracle-database-java-samples GitHub repo, whose modules provide sample code for developing Java event-streaming applications using Oracle Database Transactional Event Queues.
Have a use case that’s not already covered? Leave a comment or reach out directly and I’ll be happy to help you out.