AMQP 1.0 Client

Overview

The SwiftMQ AMQP 1.0 Client API provides a standalone, non-JMS AMQP 1.0 client for connecting to AMQP brokers, sending and receiving messages, and managing transactions. It supports advanced features such as configurable Quality of Service (QoS) levels, transactional message delivery, durable consumers, and detailed connection/session management. The API is designed for direct, programmatic use in Java applications, offering fine-grained control over AMQP protocol interactions.

Connection and Session Setup

To interact with an AMQP broker, first create a Connection, specifying the hostname, port, and optionally authentication credentials. The connection supports SASL authentication (mechanisms: PLAIN, CRAM-MD5, DIGEST-MD5, ANONYMOUS) and can be customized with buffer sizes, an idle timeout, and SSL/TLS (by setting a SocketFactory). Once configured, call connect() to establish the connection. Sessions are created from a connected Connection using createSession(long incomingWindowSize, long outgoingWindowSize). Each session manages its own window sizes for flow control and is the parent of its producers and consumers.

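import com.swiftmq.amqp.AMQPContext;
import com.swiftmq.amqp.v100.client.Connection;
import com.swiftmq.amqp.v100.client.Session;

// Client-side context passed to the Connection constructor
AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
Connection connection = new Connection(ctx, "broker-host", 5672, "user", "password");
connection.setIdleTimeout(30000);
connection.connect();
// Arguments are the incoming and outgoing window sizes
Session session = connection.createSession(100, 100);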

Producer and Consumer Creation

Producers and consumers are created from a Session. Use createProducer(String target, int qoS) to create a producer for a queue or topic, specifying the QoS level. For consumers, use createConsumer(String source, int linkCredit, int qoS, boolean noLocal, String selector). The linkCredit parameter controls the prefetch window (number of messages buffered client-side). Durable consumers are supported via createDurableConsumer(String linkName, String source, ...), requiring a unique container id set on the connection. Temporary destinations can be created for reply-to patterns.

Producer producer = session.createProducer("myqueue@router1", QoS.AT_LEAST_ONCE);
Consumer consumer = session.createConsumer("myqueue@router1", 100, QoS.AT_LEAST_ONCE, false, null);
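
The effect of linkCredit as a prefetch window can be illustrated with a small, self-contained model. This is only a sketch: LinkCreditSketch and its methods are hypothetical stand-ins written for illustration, not SwiftMQ classes, and the model assumes credit is replenished one message at a time, whereas a real client may top up credit in batches.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of credit-based prefetch; not the SwiftMQ implementation. */
class LinkCreditSketch {
    private final Deque<String> brokerQueue = new ArrayDeque<>();    // messages held by the broker
    private final Deque<String> prefetchBuffer = new ArrayDeque<>(); // messages buffered client-side
    private int credit;                                              // transfers the broker may still make

    LinkCreditSketch(int linkCredit) {
        this.credit = linkCredit;
    }

    /** A message arrives at the broker and is forwarded if credit allows. */
    void publish(String body) {
        brokerQueue.add(body);
        deliver();
    }

    /** The application takes one message; this model then returns one credit. */
    String receive() {
        String body = prefetchBuffer.poll();
        if (body != null) {
            credit++;
            deliver();
        }
        return body;
    }

    int buffered() { return prefetchBuffer.size(); }

    int waitingOnBroker() { return brokerQueue.size(); }

    /** Moves messages to the client until the credit is used up. */
    private void deliver() {
        while (credit > 0 && !brokerQueue.isEmpty()) {
            prefetchBuffer.add(brokerQueue.poll());
            credit--;
        }
    }
}
```

With a credit of 2 and five published messages, two sit in the client buffer and three stay on the broker until the application starts receiving. A larger linkCredit reduces round trips at the cost of more messages held by one consumer.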

Quality of Service (QoS) Levels

The API supports three QoS levels: QoS.AT_MOST_ONCE (fire-and-forget, settled before send), QoS.AT_LEAST_ONCE (settled after receive, requires explicit accept/reject), and QoS.EXACTLY_ONCE (two-phase settlement). Producers and consumers must be created with a valid QoS constant. For received messages, the client must check if the message is settled (isSettled()); if not, call accept() or reject() to complete settlement (required for AT_LEAST_ONCE and EXACTLY_ONCE).

Producer producer = session.createProducer("mytopic", QoS.EXACTLY_ONCE);
Consumer consumer = session.createConsumer("mytopic", 50, QoS.EXACTLY_ONCE, false, null);
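
The receive-side rule described above (check isSettled(), then accept or reject) can be sketched without the library. Everything in this block is an assumption made for illustration: Qos and Delivery are hypothetical stand-ins, not SwiftMQ types, and the model collapses the two-phase exchange of EXACTLY_ONCE into a single step.

```java
/** Toy model of receiver-side settlement; the types are stand-ins, not SwiftMQ classes. */
class SettlementSketch {
    enum Qos { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    /** Hypothetical stand-in for a received message. */
    static final class Delivery {
        private boolean settled;
        private String outcome = "none";

        Delivery(Qos qos) {
            // In this model, AT_MOST_ONCE deliveries arrive already settled by the sender.
            this.settled = (qos == Qos.AT_MOST_ONCE);
        }

        boolean isSettled() { return settled; }

        String outcome() { return outcome; }

        void accept() { settle("accepted"); }

        void reject() { settle("rejected"); }

        private void settle(String result) {
            if (settled) {
                throw new IllegalStateException("already settled");
            }
            outcome = result;
            settled = true;
        }
    }

    /** Settles only deliveries that are not yet settled, based on the processing result. */
    static String handle(Delivery delivery, boolean processedOk) {
        if (!delivery.isSettled()) {
            if (processedOk) {
                delivery.accept();
            } else {
                delivery.reject();
            }
        }
        return delivery.outcome();
    }
}
```

Guarding on isSettled() lets the same handler serve all three QoS levels: pre-settled messages pass through untouched, and the others always receive an explicit outcome.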

Transactional Messaging

Transactional message delivery is managed via the TransactionController obtained from a session. Create a transaction id with createTxnId(), then associate work with it: set the transaction id on each message before sending it, or pass it to the consumer's acquire call when receiving. Complete the transaction with commit(txnId) or rollback(txnId). The API supports local transactions and can query server capabilities for distributed or promotable transactions.

TransactionController txCtrl = session.getTransactionController();
TxnIdIF txnId = txCtrl.createTxnId();
// Set txnId on each message before producer.send(msg), or pass it to consumer.acquire(..., txnId)
txCtrl.commit(txnId);
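
The commit and rollback semantics can be pictured with a minimal in-memory model. This is a sketch under stated assumptions, not the broker's implementation: LocalTxnSketch, its integer transaction ids, and its send method are hypothetical and exist only to show that work done under a transaction id becomes visible on commit and is discarded on rollback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a local transaction; not the SwiftMQ TransactionController. */
class LocalTxnSketch {
    private final Map<Integer, List<String>> staged = new HashMap<>(); // uncommitted sends per txn
    private final List<String> queue = new ArrayList<>();              // messages visible to consumers
    private int nextId = 1;

    int createTxnId() {
        int id = nextId++;
        staged.put(id, new ArrayList<>());
        return id;
    }

    /** Stages a message under the transaction instead of publishing it. */
    void send(int txnId, String body) {
        pending(txnId).add(body);
    }

    /** Publishes everything staged under the transaction, then ends it. */
    void commit(int txnId) {
        queue.addAll(pending(txnId));
        staged.remove(txnId);
    }

    /** Discards everything staged under the transaction, then ends it. */
    void rollback(int txnId) {
        pending(txnId); // validates that the transaction is still open
        staged.remove(txnId);
    }

    List<String> visible() {
        return new ArrayList<>(queue);
    }

    private List<String> pending(int txnId) {
        List<String> list = staged.get(txnId);
        if (list == null) {
            throw new IllegalStateException("unknown or finished transaction " + txnId);
        }
        return list;
    }
}
```

In application code the same shape usually appears as a try block that commits at the end and a catch block that rolls back, so a failure partway through never leaves partial work visible.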

Error Handling and Exception Management

The API defines specific exceptions for connection, session, and link errors (e.g., ConnectionClosedException, SessionClosedException, LinkClosedException). An ExceptionListener can be registered on a Connection to receive asynchronous error notifications. Most API methods throw checked exceptions for protocol errors, authentication failures, or invalid states. Always close connections, sessions, and links to release resources.

connection.setExceptionListener(new ExceptionListener() {
    public void onException(Exception exception) {
        System.err.println("Connection error: " + exception.getMessage());
    }
});
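
The advice to always close links, sessions, and connections can be followed with a small helper that closes resources in a fixed order and keeps going when one close fails. This is a minimal sketch: CleanupSketch and closeAll are hypothetical names, and java.lang.AutoCloseable is used here only as a generic stand-in, without assuming that the SwiftMQ classes implement it.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of ordered cleanup that does not stop at the first failure. */
class CleanupSketch {
    /**
     * Closes the given resources in the order supplied (for example producer,
     * consumer, session, connection) and returns any exceptions raised on the way.
     */
    static List<Exception> closeAll(AutoCloseable... resources) {
        List<Exception> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // resource was never created, nothing to release
            }
            try {
                resource.close();
            } catch (Exception e) {
                failures.add(e); // remember the failure and keep closing the rest
            }
        }
        return failures;
    }
}
```

Calling such a helper from a finally block, with a small lambda wrapping each close() call, means a failed link close cannot leave the session or connection open.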

Message Structure and Handling

Messages are represented by AMQPMessage, which supports all AMQP 1.0 sections: header, properties, annotations, application properties, body (data, sequence, or value), and footer. Received messages can be inspected and modified. For QoS modes requiring settlement, use accept() or reject() on the message. For transactional messages, the transaction id is set on the message.

AMQPMessage msg = new AMQPMessage();
// AmqpValue wraps an AMQP type, so the Java String is wrapped in an AMQPString
msg.setAmqpValue(new AmqpValue(new AMQPString("Hello World")));
producer.send(msg);