Skip to content

Examples

Complete Examples

Basic Send and Receive (AT_LEAST_ONCE)

Connects to an AMQP broker, sends a message to a queue, and receives it with explicit settlement (AT_LEAST_ONCE).

import com.swiftmq.amqp.v100.client.*;
import com.swiftmq.amqp.v100.messaging.*;

/**
 * Sends one message to {@code myqueue@router1} and receives it back over the
 * same session, settling it explicitly (QoS {@code AT_LEAST_ONCE}).
 */
public class BasicSendReceive {
    /**
     * Runs the example against a broker on {@code localhost:5672}.
     *
     * @param args unused
     * @throws Exception if connecting, sending, receiving or closing fails
     */
    public static void main(String[] args) throws Exception {
        AMQPContext ctx = new AMQPContext();
        Connection connection = new Connection(ctx, "localhost", 5672, "user", "password");
        connection.connect();

        // Declared before the try so the finally block can release whatever
        // was already created when a later creation step throws.
        Session session = null;
        Producer producer = null;
        Consumer consumer = null;
        try {
            session = connection.createSession(100, 100);
            producer = session.createProducer("myqueue@router1", QoS.AT_LEAST_ONCE);
            consumer = session.createConsumer("myqueue@router1", 10, QoS.AT_LEAST_ONCE, false, null);

            AMQPMessage msg = new AMQPMessage();
            // NOTE(review): confirm AmqpValue accepts a plain String; it may
            // require an AMQP type wrapper around the payload.
            msg.setAmqpValue(new AmqpValue("Hello World"));
            producer.send(msg);

            // receive(5000) may return null, which is reported below
            // (presumably a timeout in milliseconds - confirm).
            AMQPMessage received = consumer.receive(5000);
            if (received != null) {
                System.out.println("Received: " + received.getAmqpValue().getValue());
                // Explicit settlement: accept only if not already settled.
                if (!received.isSettled()) {
                    received.accept();
                }
            } else {
                System.out.println("No message received");
            }
        } finally {
            try {
                // Close in reverse order of creation, skipping anything that
                // was never created.
                if (consumer != null) {
                    consumer.close();
                }
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
            } finally {
                // Always close the connection, even if a close above throws.
                connection.close();
            }
        }
    }
}

Transactional Send and Receive

Demonstrates sending and receiving messages within a transaction, with commit and rollback.

import com.swiftmq.amqp.v100.client.*;
import com.swiftmq.amqp.v100.messaging.*;

/**
 * Sends a message in one transaction and receives it in a second one,
 * committing on success and rolling back on failure.
 */
public class TransactionalExample {
    /**
     * Runs the example against a broker on {@code localhost:5672}.
     *
     * @param args unused
     * @throws Exception if connecting, sending, receiving, committing or
     *                   closing fails; a failed rollback is attached to the
     *                   original exception as a suppressed exception
     */
    public static void main(String[] args) throws Exception {
        AMQPContext ctx = new AMQPContext();
        Connection connection = new Connection(ctx, "localhost", 5672, "user", "password");
        connection.connect();

        // Declared before the try so the finally block can release whatever
        // was already created when a later creation step throws.
        Session session = null;
        Producer producer = null;
        Consumer consumer = null;
        try {
            session = connection.createSession(100, 100);
            TransactionController txCtrl = session.getTransactionController();
            producer = session.createProducer("myqueue@router1", QoS.AT_LEAST_ONCE);
            consumer = session.createConsumer("myqueue@router1", 10, QoS.AT_LEAST_ONCE, false, null);

            // Transaction 1: send. It is committed before receiving because a
            // message posted inside a transaction is expected to become
            // visible to consumers only after the commit.
            TxnIdIF sendTxn = txCtrl.createTxnId();
            try {
                AMQPMessage msg = new AMQPMessage();
                // NOTE(review): confirm AmqpValue accepts a plain String; it
                // may require an AMQP type wrapper around the payload.
                msg.setAmqpValue(new AmqpValue("Transactional Message"));
                msg.setTxnIdIF(sendTxn);
                producer.send(msg);
                txCtrl.commit(sendTxn);
            } catch (Exception e) {
                rollbackAfterFailure(txCtrl, sendTxn, e);
                throw e;
            }

            // Transaction 2: receive.
            TxnIdIF receiveTxn = txCtrl.createTxnId();
            try {
                consumer.acquire(1, receiveTxn);
                AMQPMessage received = consumer.receive(5000);
                if (received != null) {
                    System.out.println("Received in txn: " + received.getAmqpValue().getValue());
                    if (!received.isSettled()) {
                        received.accept();
                    }
                    txCtrl.commit(receiveTxn);
                } else {
                    // Nothing arrived: abandon the transaction instead of
                    // committing an empty one.
                    System.out.println("No message received, rolling back");
                    txCtrl.rollback(receiveTxn);
                }
            } catch (Exception e) {
                rollbackAfterFailure(txCtrl, receiveTxn, e);
                throw e;
            }
        } finally {
            try {
                // Close in reverse order of creation, skipping anything that
                // was never created.
                if (consumer != null) {
                    consumer.close();
                }
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
            } finally {
                // Always close the connection, even if a close above throws.
                connection.close();
            }
        }
    }

    /**
     * Rolls back {@code txnId} after {@code cause} was thrown. A failure of
     * the rollback itself is recorded on {@code cause} as a suppressed
     * exception so it does not replace the original error.
     */
    private static void rollbackAfterFailure(TransactionController txCtrl, TxnIdIF txnId, Exception cause) {
        try {
            txCtrl.rollback(txnId);
        } catch (Exception rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        }
    }
}

Durable Consumer on Topic

Creates a durable consumer on a topic, ensuring messages are not lost across disconnects.

import com.swiftmq.amqp.v100.client.*;
import com.swiftmq.amqp.v100.messaging.*;

/**
 * Creates a durable consumer named {@code my-durable-link} on {@code mytopic}
 * and receives one message from it with explicit settlement.
 */
public class DurableConsumerExample {
    /**
     * Runs the example against a broker on {@code localhost:5672}.
     *
     * @param args unused
     * @throws Exception if connecting, receiving or closing fails
     */
    public static void main(String[] args) throws Exception {
        AMQPContext ctx = new AMQPContext();
        Connection connection = new Connection(ctx, "localhost", 5672, "user", "password");
        // The container id is set before connecting.
        connection.setContainerId("my-client-id");
        connection.connect();

        // Declared before the try so the finally block can release whatever
        // was already created when a later creation step throws.
        Session session = null;
        DurableConsumer consumer = null;
        try {
            session = connection.createSession(100, 100);
            consumer = session.createDurableConsumer("my-durable-link", "mytopic", 10, QoS.AT_LEAST_ONCE, false, null);

            // receive(5000) may return null, which is reported below
            // (presumably a timeout in milliseconds - confirm).
            AMQPMessage received = consumer.receive(5000);
            if (received != null) {
                System.out.println("Durable received: " + received.getAmqpValue().getValue());
                // Explicit settlement: accept only if not already settled.
                if (!received.isSettled()) {
                    received.accept();
                }
            } else {
                System.out.println("No message received");
            }
            // To unsubscribe and destroy the durable link:
            // consumer.unsubscribe();
        } finally {
            try {
                // Close in reverse order of creation, skipping anything that
                // was never created.
                if (consumer != null) {
                    consumer.close();
                }
                if (session != null) {
                    session.close();
                }
            } finally {
                // Always close the connection, even if a close above throws.
                connection.close();
            }
        }
    }
}

Non-blocking Receive with Listener

Shows how to use a MessageAvailabilityListener for asynchronous notification of new messages.

import com.swiftmq.amqp.v100.client.*;
import com.swiftmq.amqp.v100.messaging.*;

/**
 * Receives messages from {@code myqueue@router1} without blocking, using a
 * {@code MessageAvailabilityListener} to be notified when messages arrive.
 */
public class NonBlockingReceive {
    /**
     * Runs the example against a broker on {@code localhost:5672} for about
     * ten seconds.
     *
     * @param args unused
     * @throws Exception if connecting, receiving or closing fails, or if the
     *                   waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        AMQPContext ctx = new AMQPContext();
        Connection connection = new Connection(ctx, "localhost", 5672, "user", "password");
        connection.connect();

        // Declared before the try so the finally block can release whatever
        // was already created when a later creation step throws.
        Session session = null;
        Consumer consumer = null;
        try {
            session = connection.createSession(100, 100);
            consumer = session.createConsumer("myqueue@router1", 10, QoS.AT_LEAST_ONCE, false, null);

            MessageAvailabilityListener listener = new MessageAvailabilityListener() {
                public void messageAvailable(Consumer c) {
                    try {
                        // Handle what is available and re-arm this listener.
                        drain(c, this);
                    } catch (Exception e) {
                        // Runs on a callback thread; there is no caller to
                        // propagate to, so report the failure here.
                        e.printStackTrace();
                    }
                }
            };

            // Handle messages that are already available; the listener takes
            // over once none are left.
            drain(consumer, listener);

            // Simulate waiting for messages
            Thread.sleep(10000);
        } finally {
            try {
                // Close in reverse order of creation, skipping anything that
                // was never created.
                if (consumer != null) {
                    consumer.close();
                }
                if (session != null) {
                    session.close();
                }
            } finally {
                // Always close the connection, even if a close above throws.
                connection.close();
            }
        }
    }

    /**
     * Processes messages until {@code receiveNoWait(listener)} returns
     * {@code null}, printing each one and accepting it if unsettled.
     *
     * NOTE(review): this relies on receiveNoWait(listener) returning an
     * already-available message directly and registering the listener only
     * when none is available - confirm against the client API documentation.
     *
     * @param consumer the consumer to read from
     * @param listener the listener to register once no message is available
     * @throws Exception if receiving or accepting a message fails
     */
    private static void drain(Consumer consumer, MessageAvailabilityListener listener) throws Exception {
        AMQPMessage msg;
        while ((msg = consumer.receiveNoWait(listener)) != null) {
            System.out.println("Async received: " + msg.getAmqpValue().getValue());
            if (!msg.isSettled()) {
                msg.accept();
            }
        }
    }
}