Examples
Complete Examples
Basic Send and Receive (AT_LEAST_ONCE)
Connects to an AMQP broker, sends a message to a queue, and receives it with explicit settlement (AT_LEAST_ONCE).
import com.swiftmq.amqp.v100.client.*;
import com.swiftmq.amqp.v100.messaging.*;
/**
 * Minimal round trip: sends one message to a queue and reads it back,
 * accepting the delivery explicitly when it has not been settled yet.
 */
public class BasicSendReceive {

    /**
     * Connects to the broker on localhost:5672, runs one send/receive exchange
     * on {@code myqueue@router1}, and releases every resource it opened.
     *
     * @param args unused
     * @throws Exception if connecting, sending, receiving, or closing fails
     */
    public static void main(String[] args) throws Exception {
        AMQPContext ctx = new AMQPContext();
        Connection connection = new Connection(ctx, "localhost", 5672, "user", "password");
        connection.connect();
        // Each resource is guarded by its own try/finally as soon as it exists, so a
        // failure while creating a later one still releases everything opened before it,
        // and a failing close() does not prevent the remaining closes from running.
        try {
            Session session = connection.createSession(100, 100);
            try {
                Producer producer = session.createProducer("myqueue@router1", QoS.AT_LEAST_ONCE);
                try {
                    Consumer consumer = session.createConsumer("myqueue@router1", 10, QoS.AT_LEAST_ONCE, false, null);
                    try {
                        sendAndReceive(producer, consumer);
                    } finally {
                        consumer.close();
                    }
                } finally {
                    producer.close();
                }
            } finally {
                session.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Sends a single "Hello World" message and waits up to five seconds for it to arrive.
     *
     * @param producer link used to send the message
     * @param consumer link used to receive the message
     * @throws Exception if sending, receiving, or settling fails
     */
    private static void sendAndReceive(Producer producer, Consumer consumer) throws Exception {
        AMQPMessage msg = new AMQPMessage();
        msg.setAmqpValue(new AmqpValue("Hello World"));
        producer.send(msg);

        AMQPMessage received = consumer.receive(5000);
        if (received == null) {
            System.out.println("No message received");
            return;
        }
        System.out.println("Received: " + received.getAmqpValue().getValue());
        // Only unsettled deliveries need an explicit disposition.
        if (!received.isSettled()) {
            received.accept();
        }
    }
}
Transactional Send and Receive
Demonstrates sending and receiving messages within a transaction, with commit and rollback.
import com.swiftmq.amqp.v100.client.*;
import com.swiftmq.amqp.v100.messaging.*;
/**
 * Sends a message under one transaction and receives it under a second one.
 * Each transaction is committed on success and rolled back on failure.
 */
public class TransactionalExample {

    /**
     * Connects to the broker on localhost:5672, performs a transactional send
     * followed by a transactional receive on {@code myqueue@router1}, and
     * releases every resource it opened.
     *
     * @param args unused
     * @throws Exception if connecting, the transactional work, or closing fails
     */
    public static void main(String[] args) throws Exception {
        AMQPContext ctx = new AMQPContext();
        Connection connection = new Connection(ctx, "localhost", 5672, "user", "password");
        connection.connect();
        // One try/finally per resource: a failure while creating a later resource
        // must not leak the ones that were already opened.
        try {
            Session session = connection.createSession(100, 100);
            try {
                TransactionController txCtrl = session.getTransactionController();
                Producer producer = session.createProducer("myqueue@router1", QoS.AT_LEAST_ONCE);
                try {
                    Consumer consumer = session.createConsumer("myqueue@router1", 10, QoS.AT_LEAST_ONCE, false, null);
                    try {
                        // The send is committed before the receive starts: a message posted
                        // under a transaction only becomes available at the queue once that
                        // transaction is discharged, so receiving inside the same open
                        // transaction would merely run into the timeout.
                        sendInTransaction(txCtrl, producer);
                        receiveInTransaction(txCtrl, consumer);
                    } finally {
                        consumer.close();
                    }
                } finally {
                    producer.close();
                }
            } finally {
                session.close();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Sends one message under a fresh transaction; commits on success, rolls back on failure.
     *
     * @param txCtrl   transaction controller of the session
     * @param producer link used to send the message
     * @throws Exception if sending or committing fails
     */
    private static void sendInTransaction(TransactionController txCtrl, Producer producer) throws Exception {
        TxnIdIF txnId = txCtrl.createTxnId();
        try {
            AMQPMessage msg = new AMQPMessage();
            msg.setAmqpValue(new AmqpValue("Transactional Message"));
            msg.setTxnIdIF(txnId);
            producer.send(msg);
        } catch (Exception e) {
            rollbackAfterFailure(txCtrl, txnId, e);
            throw e;
        }
        // Commit stays outside the try: after a failed commit the transaction state is
        // unknown, so no rollback is attempted for it.
        txCtrl.commit(txnId);
    }

    /**
     * Acquires and receives one message under a fresh transaction; commits on
     * success (also when nothing arrived within the timeout), rolls back on failure.
     *
     * @param txCtrl   transaction controller of the session
     * @param consumer link used to receive the message
     * @throws Exception if receiving, settling, or committing fails
     */
    private static void receiveInTransaction(TransactionController txCtrl, Consumer consumer) throws Exception {
        TxnIdIF txnId = txCtrl.createTxnId();
        try {
            consumer.acquire(1, txnId);
            AMQPMessage received = consumer.receive(5000);
            if (received != null) {
                System.out.println("Received in txn: " + received.getAmqpValue().getValue());
                if (!received.isSettled()) {
                    received.accept();
                }
            }
        } catch (Exception e) {
            rollbackAfterFailure(txCtrl, txnId, e);
            throw e;
        }
        txCtrl.commit(txnId);
    }

    /**
     * Rolls the transaction back without masking the failure that triggered the rollback;
     * a rollback error is attached to {@code cause} as a suppressed exception.
     *
     * @param txCtrl transaction controller of the session
     * @param txnId  transaction to roll back
     * @param cause  the original failure
     */
    private static void rollbackAfterFailure(TransactionController txCtrl, TxnIdIF txnId, Exception cause) {
        try {
            txCtrl.rollback(txnId);
        } catch (Exception rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        }
    }
}
Durable Consumer on Topic
Creates a durable consumer on a topic, ensuring messages are not lost across disconnects.
import com.swiftmq.amqp.v100.client.*;
import com.swiftmq.amqp.v100.messaging.*;
/**
 * Receives one message through a durable consumer link on a topic.
 */
public class DurableConsumerExample {

    /**
     * Connects with container id {@code my-client-id}, opens the durable link
     * {@code my-durable-link} on {@code mytopic}, waits up to five seconds for a
     * message, and releases every resource it opened.
     *
     * @param args unused
     * @throws Exception if connecting, receiving, settling, or closing fails
     */
    public static void main(String[] args) throws Exception {
        AMQPContext ctx = new AMQPContext();
        Connection connection = new Connection(ctx, "localhost", 5672, "user", "password");
        // The container id has to be set before connect().
        connection.setContainerId("my-client-id");
        connection.connect();
        // One try/finally per resource, so that a failure while creating the session or
        // the consumer still closes whatever was opened before it.
        try {
            Session session = connection.createSession(100, 100);
            try {
                DurableConsumer consumer = session.createDurableConsumer("my-durable-link", "mytopic", 10, QoS.AT_LEAST_ONCE, false, null);
                try {
                    AMQPMessage received = consumer.receive(5000);
                    if (received == null) {
                        System.out.println("No message received");
                    } else {
                        System.out.println("Durable received: " + received.getAmqpValue().getValue());
                        // Only unsettled deliveries need an explicit disposition.
                        if (!received.isSettled()) {
                            received.accept();
                        }
                    }
                    // To unsubscribe and destroy the durable link:
                    // consumer.unsubscribe();
                } finally {
                    consumer.close();
                }
            } finally {
                session.close();
            }
        } finally {
            connection.close();
        }
    }
}
Non-blocking Receive with Listener
Shows how to use a MessageAvailabilityListener for asynchronous notification of new messages.
import com.swiftmq.amqp.v100.client.*;
import com.swiftmq.amqp.v100.messaging.*;
public class NonBlockingReceive {
public static void main(String[] args) throws Exception {
AMQPContext ctx = new AMQPContext();
Connection connection = new Connection(ctx, "localhost", 5672, "user", "password");
connection.connect();
Session session = connection.createSession(100, 100);
Consumer consumer = session.createConsumer("myqueue@router1", 10, QoS.AT_LEAST_ONCE, false, null);
try {
consumer.receiveNoWait(new MessageAvailabilityListener() {
public void messageAvailable(Consumer c) {
try {
AMQPMessage msg = c.receiveNoWait();
if (msg != null) {
System.out.println("Async received: " + msg.getAmqpValue().getValue());
if (!msg.isSettled()) {
msg.accept();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// Simulate waiting for messages
Thread.sleep(10000);
} finally {
consumer.close();
session.close();
connection.close();
}
}
}