Streams Swiftlet
Overview
The SwiftMQ Streams Swiftlet provides an embedded, JavaScript-based stream processing engine for real-time message processing within the SwiftMQ router. Streams are defined as scripts that can consume, process, and produce messages using a rich, builder-style API, supporting stateful event processing, timers, JDBC lookups, and more. Scripts are managed and monitored from the router, with support for hot deployment of external libraries.
Features
Embedded JavaScript Engine
Runs JavaScript stream scripts directly inside the router using GraalVM, providing real-time, low-latency processing.
Stream Resource Model
Builder-style API to create and manage Inputs, Outputs, Memories (state), Timers, MailServers, JDBC lookups, and TempQueues.
Flexible Memory Management
State can be stored in heap, persistent queues, or temporary queues; supports time/count windows, grouping, indexing, and join operations.
Timers and Scheduling
Interval, at-time, and next-time timers for event-driven and scheduled processing.
Management and Monitoring
Streams can consume management events, dynamically react to configuration changes, and expose usage statistics.
Hot Deployable Libraries
External JARs can be loaded per-stream from a configurable directory for extending functionality.
Bundled System Streams
Includes ready-to-use stream scripts for common use cases (e.g., mail out, message scheduler, stream registry, last value cache).
Stream API Reference
Stream Object
- stream.onStart(callback): Registers a callback to be executed when the stream starts.
- stream.onStop(callback): Registers a callback to be executed when the stream stops.
- stream.onMessage(callback): Registers a callback to be executed for each incoming message.
- stream.onException(callback): Registers a callback to handle exceptions occurring in the stream.
- stream.create(): Returns a builder to create stream resources (inputs, outputs, memories, timers, etc.).
- stream.input(name): Returns the Input object with the given name.
- stream.output(name): Returns the Output object with the given name.
- stream.memory(name): Returns the Memory object with the given name.
- stream.memoryGroup(name): Returns the MemoryGroup object with the given name.
- stream.timer(name): Returns the Timer object with the given name.
- stream.mailserver(name): Returns the MailServer object with the given name.
- stream.jdbcLookup(name): Returns the JDBCLookup object with the given name.
- stream.tempQueue(name): Returns the TempQueue object with the given name.
- stream.purgeOutputs(): Closes all Outputs not used since the last call.
- stream.current(): Returns the currently processed Message.
- stream.routerName(): Returns the name of the local router.
- stream.domainName(): Returns the domain name of this stream.
- stream.packageName(): Returns the package name of this stream.
- stream.name(): Returns the name of this stream.
- stream.fullyQualifiedName(): Returns the FQN (domain.package.name) of this stream.
- stream.restartCount(): Returns the number of times the stream has been restarted.
- stream.log(): Returns the Log object for this stream.
- stream.cli(): Returns the CLI object for executing management commands.
- stream.async(interfaceClassName, callback): Wraps an async callback to run on the stream's event queue (for GraalVM/Nashorn compatibility).
- stream.executeCallback(callback, context): Executes a function callback in the stream's event queue (not for GraalVM).
- stream.lastException(): Returns the last exception that occurred on the stream.
- stream.lastStackTrace(): Returns the stack trace of the last exception.
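To experiment with callback logic outside the router, one option is a small hand-written stand-in for the `stream` object. The sketch below is an assumption for illustration only: `createStubStream`, `deliver`, and `loggedLines` are hypothetical helpers, the message is a plain object with a made-up `text` field, and only the method names `onMessage`, `current`, and `log().info` are taken from the reference above. It is not part of SwiftMQ and does not reproduce the router's event queue.

```javascript
// Hypothetical stand-in for the router-provided `stream` object.
// It mimics only the few documented method names used below.
function createStubStream() {
  var handlers = { message: null };
  var currentMessage = null;
  var logged = [];
  return {
    onMessage: function (callback) { handlers.message = callback; },
    current: function () { return currentMessage; },
    log: function () {
      return { info: function (text) { logged.push(text); } };
    },
    // Test helpers, not part of the Streams API:
    deliver: function (message) {
      currentMessage = message;
      if (handlers.message) handlers.message();
    },
    loggedLines: function () { return logged.slice(); }
  };
}

var stream = createStubStream();

// Script logic written in the documented callback style.
stream.onMessage(function () {
  stream.log().info("received: " + stream.current().text);
});

// Simulate two incoming messages.
stream.deliver({ text: "hello" });
stream.deliver({ text: "world" });
console.log(stream.loggedLines());
```

Inside the router the `stream` object is supplied by the Swiftlet, so the stub and the `deliver` calls would be dropped and only the `onMessage` registration would remain.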
Examples
Queue Statistics Aggregator
Demonstrates dynamic memory groups, management input, and time window aggregation for queue statistics.
// See management/queuestats.js
stream.create().input("sys$queuemanager/usage")
.management()
.selector("name not like '%$%' and name not like 'qs\_%' and messagecount is not null")
.onAdd(function (input) {
stream.memoryGroup("queues").add(input.current());
})
.onChange(function (input) {
stream.memoryGroup("queues").add(input.current());
});
stream.create().memoryGroup("queues", "name").inactivityTimeout().minutes(2).onCreate(function (key) {
var queueName = "qs_" + key;
stream.cli()
.execute("cc sys$queuemanager/queues")
.exceptionOff()
.execute("new " + queueName)
.execute("save");
stream.create().memory(queueName).queue(queueName).limit().time().tumbling().minutes(1).onRetire(function (retired) {
print("Queue " + key + ", avg backlog last minute = " + retired.average("messagecount"));
});
return stream.memory(queueName);
}).onRemove(function (key) {
stream.cli()
.execute("cc sys$queuemanager/queues")
.execute("delete qs_" + key)
.execute("save");
});
stream.create().timer("queues").interval().minutes(1).onTimer(function (timer) {
stream.memoryGroup("queues").checkLimit();
});
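The one-minute tumbling window used above can be pictured with a few lines of plain JavaScript. This is a conceptual model under stated assumptions, not SwiftMQ's implementation: `TumblingWindow` and `average` are hypothetical names, timestamps are passed in explicitly, and the window is assumed to open with its first value and to retire as a whole once `checkLimit` sees that the time span has elapsed.

```javascript
// Conceptual model of a tumbling time window; not SwiftMQ's implementation.
function TumblingWindow(windowMillis, onRetire) {
  this.windowMillis = windowMillis;
  this.onRetire = onRetire;
  this.windowStart = null;
  this.values = [];
}

// Adds a value observed at the given timestamp (milliseconds).
TumblingWindow.prototype.add = function (timestamp, value) {
  this.checkLimit(timestamp);
  if (this.windowStart === null) this.windowStart = timestamp;
  this.values.push(value);
};

// Retires the whole window once its time span has elapsed.
TumblingWindow.prototype.checkLimit = function (now) {
  if (this.windowStart !== null && now - this.windowStart >= this.windowMillis) {
    var retired = this.values;
    this.values = [];
    this.windowStart = null;
    this.onRetire(retired);
  }
};

// Arithmetic mean of an array of numbers (0 for an empty array).
function average(values) {
  var sum = 0;
  for (var i = 0; i < values.length; i++) sum += values[i];
  return values.length === 0 ? 0 : sum / values.length;
}

var averages = [];
var win = new TumblingWindow(60000, function (retired) {
  averages.push(average(retired));
});

win.add(0, 10);        // window opens at t=0
win.add(20000, 20);
win.add(40000, 30);
win.checkLimit(60000); // one minute elapsed: window retires with [10, 20, 30]
win.add(61000, 7);     // starts the next window
console.log(averages); // [ 20 ]
```

The explicit `checkLimit` call plays the role of the "queues" timer in the example: without a periodic check, a window that stops receiving values would never retire.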
Nuclear Powerplant Temp Monitor
Shows multi-memory pattern, warning/critical detection, and email alert integration.
// See nuclear/tempmonitor.js
stream.create().memory("all").heap().limit().time().tumbling().seconds(10).onRetire(function (retired) {
stream.log().info("AVERAGE TEMP LAST 10 SECS=" + retired.average("temp") +
", MIN=" + retired.min("temp").property("temp").value().toObject() +
", MAX=" + retired.max("temp").property("temp").value().toObject());
stream.log().info(retired.values("temp"));
});
stream.create().memory("warning").heap().limit().count(2).sliding();
stream.create().memory("critical").heap().limit().count(4).sliding();
stream.create().mailserver(mailHost).username(mailUser).password(mailPassword).connect();
stream.create().input(inputQueue).queue().onInput(function (input) {
input.current().property("temp").set(input.current().property("TEMP").value().toInteger());
});
stream.create().timer("monitor").interval().seconds(10).onTimer(function (timer) {
stream.memory("all").checkLimit();
});
stream.onMessage(function () {
stream.memory("all").add(stream.current());
stream.memory("warning").add(stream.current());
stream.memory("critical").add(stream.current());
if (stream.memory("warning").select("temp > 100").size() == 2)
stream.log().warning("LAST 2 TEMPS > 100!");
if (stream.memory("critical").size() == 4 &&
stream.memory("critical").first().property("temp").value().toDouble() > 400 &&
stream.memory("critical").ascendingSeries("temp") &&
stream.memory("critical").last().property("temp").value().toDouble() >
stream.memory("critical").first().property("temp").value().toDouble() * 1.5) {
stream.log().error("WE HAVE A SPIKE!!!");
stream.mailserver(mailHost)
.email()
.from(mailFrom)
.to(mailTo)
.bcc(mailBcc)
.subject("Nuclear Powerplant Monitor - CRITICAL!")
.body("Temp spike detected, last temps=" + stream.memory("critical").values("temp"))
.send();
}
});
Order Aggregator (EI Pattern)
Implements the Aggregator pattern by joining order head and order position messages, emitting XML when complete.
// See ordercollect/ordercollector.js
var orderHeadQueue = parameters.require("orderhead-queue");
var orderPosQueue = parameters.require("orderpos-queue");
var outputQueue = parameters.require("output-queue");
stream.create().memory("orderhead").queue("stream_ordercollector_memory_orderhead").createIndex("ORDERHEADID");
stream.create().memory("orderpos").queue("stream_ordercollector_memory_orderpos").createIndex("ORDERHEADID");
stream.memory("orderhead").clear();
stream.memory("orderpos").clear();
stream.create().input(orderHeadQueue).queue().onInput(function (input) {
stream.memory("orderhead").add(input.current());
});
stream.create().input(orderPosQueue).queue().onInput(function (input) {
input.current().property("ITEMNO").set(input.current().body().get("ITEMNO").toInteger());
input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
stream.memory("orderpos").add(input.current());
});
stream.create().timer("purger").interval().hours(1).onTimer(function (timer) {
stream.cli()
.exceptionOff()
.execute("cc sys$queuemanager/usage")
.execute("remove " + outputQueue + " *");
});
stream.create().output(outputQueue).queue();
stream.onMessage(function () {
var orderHeadId = stream.current().property("ORDERHEADID").value().toInteger();
var orderHeadMem = stream.memory("orderhead").index("ORDERHEADID").get(orderHeadId);
if (orderHeadMem.size() == 1) {
var orderPosMem = orderHeadMem.join(stream.memory("orderpos"), "ORDERHEADID");
if (orderHeadMem.first().property("NPOSITIONS").value().toInteger() == orderPosMem.size()) {
var orderHead = orderHeadMem.first();
var msg = "<order orderheadid=\"" + orderHead.body().get("ORDERHEADID").toStringValue() + "\"" +
" accountno=\"" + orderHead.body().get("ACCOUNTNO").toStringValue() + "\">\n";
orderPosMem.forEach(function (message) {
msg += " <position itemno=\"" + message.property("ITEMNO").value().toString() + "\"" +
" quantity=\"" + message.property("QTY").value().toString() + "\"/>\n";
});
msg += "</order>";
stream.log().info(msg);
stream.output(outputQueue).send(stream.create().message().textMessage().persistent().body(msg));
stream.memory("orderhead").index("ORDERHEADID").remove(orderHeadId);
stream.memory("orderpos").index("ORDERHEADID").remove(orderHeadId);
}
}
});
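The completion rule of the Aggregator (emit an order once its head and all announced positions are present) can be sketched in plain JavaScript, independent of the Streams API. Everything here is an assumption for illustration: `createAggregator` and `toXml` are hypothetical helpers, the records are plain objects that reuse the field names from the example, and the sample data is made up. Unlike the stream script, this sketch keeps state in ordinary objects rather than indexed queue Memories and simply overwrites a duplicate head.

```javascript
// Plain-JavaScript sketch of the Aggregator completion check.
// Field names follow the example; the data below is made up.
function createAggregator(onComplete) {
  var heads = {};      // ORDERHEADID -> head record
  var positions = {};  // ORDERHEADID -> array of position records

  function tryComplete(id) {
    var head = heads[id];
    var pos = positions[id] || [];
    // Complete only when the head is known and all announced positions arrived.
    if (head && pos.length === head.NPOSITIONS) {
      delete heads[id];
      delete positions[id];
      onComplete(head, pos);
    }
  }

  return {
    addHead: function (head) {
      heads[head.ORDERHEADID] = head;
      tryComplete(head.ORDERHEADID);
    },
    addPosition: function (position) {
      var id = position.ORDERHEADID;
      (positions[id] = positions[id] || []).push(position);
      tryComplete(id);
    }
  };
}

// Builds the XML document in the same shape as the example.
function toXml(head, pos) {
  var xml = '<order orderheadid="' + head.ORDERHEADID + '" accountno="' + head.ACCOUNTNO + '">\n';
  pos.forEach(function (p) {
    xml += '  <position itemno="' + p.ITEMNO + '" quantity="' + p.QTY + '"/>\n';
  });
  return xml + "</order>";
}

var completed = [];
var aggregator = createAggregator(function (head, pos) {
  completed.push(toXml(head, pos));
});

// Positions may arrive before their head; arrival order does not matter.
aggregator.addPosition({ ORDERHEADID: 1, ITEMNO: 4711, QTY: 2 });
aggregator.addHead({ ORDERHEADID: 1, ACCOUNTNO: 1513, NPOSITIONS: 2 });
aggregator.addPosition({ ORDERHEADID: 1, ITEMNO: 3318, QTY: 1 });
console.log(completed[0]);
```

Running the check on every arrival, whether head or position, mirrors the single `onMessage` callback in the example, which is invoked for messages from both inputs.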
Bundled System Streams
lastvalue.js
Implements a last-value cache for topics, storing the latest value per key and distributing updates to subscribers. Supports expiration and persistence.
Parameters: last-value-property, last-value-value, expiration, persistent, topic-name, verbose, store-queue
mailout.js
Receives messages from a queue and sends them as emails via a configured SMTP server.
Parameters: input-queue, servername, username, password, default-from, default-to, default-cc, default-bcc, default-subject
messagescheduler.js
Schedules messages for delayed delivery to a destination, supporting both new and legacy scheduler properties.
Parameters: input-queue, store-queue, delay-format, interval, purge-interval, max-batch-size
routeannouncer.js
Announces active routing table entries (routes) on a topic, supporting dynamic discovery and updates.
Parameters: topic
streammonitor.js
Monitors all streams registered at the stream registry, logging all messages received from them.
Parameters: registry-topic
streamregistry.js
Maintains a registry of available streams, handling registration, deregistration, and discovery requests.
Parameters: topic
streamrepository.js
Implements a repository for script files, allowing add/remove/list operations via messages.
Parameters: input-queue, store-queue
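As a rough illustration of the last-value cache idea described for lastvalue.js above (latest value per key, with optional expiration), here is a plain JavaScript sketch. It is not the code of the bundled stream: `LastValueCache`, its methods, the explicit `now` argument, and the sample key are all hypothetical, and persistence and subscriber distribution are left out.

```javascript
// Conceptual last-value cache with optional expiration.
// Not the code of lastvalue.js; names here are illustrative.
function LastValueCache(expirationMillis) {
  this.expirationMillis = expirationMillis; // 0 or less: entries never expire
  this.entries = new Map();                 // key -> { value, storedAt }
}

// Stores the newest value for a key, replacing any older one.
LastValueCache.prototype.put = function (key, value, now) {
  this.entries.set(key, { value: value, storedAt: now });
};

// Returns the last value for a key, or undefined if absent or expired.
LastValueCache.prototype.get = function (key, now) {
  var entry = this.entries.get(key);
  if (!entry) return undefined;
  if (this.expirationMillis > 0 && now - entry.storedAt >= this.expirationMillis) {
    this.entries.delete(key);
    return undefined;
  }
  return entry.value;
};

var lvc = new LastValueCache(1000);
lvc.put("EURUSD", 1.08, 0);
lvc.put("EURUSD", 1.09, 100);         // overwrites the earlier value
console.log(lvc.get("EURUSD", 500));  // 1.09
console.log(lvc.get("EURUSD", 1100)); // undefined (expired)
```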
Sample Applications
Queue Statistics Monitoring
Monitors queue statistics using management input.
queuestats.js:
/**
* Author: IIT Software GmbH, Muenster/Germany, (c) 2017, All Rights Reserved
*/
// Create a ManagementInput on sys$queuemanager/usage and receive adds and changes.
// There is no need to set an onRemove callback because these Memories time out via the inactivity timeout.
// The selector keeps only queues that are neither system queues nor our own queues (qs_),
// and the message must contain the messagecount property (other property changes are not of interest).
stream.create().input("sys$queuemanager/usage")
.management()
.selector("name not like '%$%' and name not like 'qs\_%' and messagecount is not null")
.onAdd(function (input) {
// A new queue has been created.
// Add it to the memory group.
stream.memoryGroup("queues").add(input.current());
})
.onChange(function (input) {
// Property "messagecount" has changed.
// Add it to the MemoryGroup
stream.memoryGroup("queues").add(input.current());
});
// Create a MemoryGroup for all queues and register a callback for new groups
stream.create().memoryGroup("queues", "name").inactivityTimeout().minutes(2).onCreate(function (key) {
// This is an example on how to use these callbacks. You can create whatever type of Memories.
// Here we create a QueueMemory
var queueName = "qs_" + key;
// We need to create a regular queue as a backstore for the QueueMemory
stream.cli()
.execute("cc sys$queuemanager/queues")
.exceptionOff() // Queue might be already defined
.execute("new " + queueName)
.execute("save");
// A new value of the group is received and we create a Memory for it
// with a 1 min tumbling time window
stream.create().memory(queueName).queue(queueName).limit().time().tumbling().minutes(1).onRetire(function (retired) {
// Window is closed, we print the average message count over the last minute
print("Queue " + key + ", avg backlog last minute = " + retired.average("messagecount"));
});
// Return the new memory
return stream.memory(queueName);
}).onRemove(function (key) {
// Memory has been removed. Delete the corresponding persistent queue
stream.cli()
.execute("cc sys$queuemanager/queues")
.execute("delete qs_" + key)
.execute("save");
});
// Create a timer that calls checkLimit on the memory group each minute
stream.create().timer("queues").interval().minutes(1).onTimer(function (timer) {
stream.memoryGroup("queues").checkLimit();
});
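The interplay of `onCreate`, `onRemove`, and the inactivity timeout in queuestats.js can be modelled in plain JavaScript. The sketch below is a conceptual stand-in, not the MemoryGroup implementation: `KeyedGroup` is a hypothetical name, each per-key "memory" is just an array, timestamps are passed in explicitly, and a key is assumed to be removed when `checkLimit` finds it idle for at least the timeout.

```javascript
// Conceptual keyed group with per-key creation and inactivity-based removal.
function KeyedGroup(inactivityMillis, onCreate, onRemove) {
  this.inactivityMillis = inactivityMillis;
  this.onCreate = onCreate;
  this.onRemove = onRemove;
  this.members = new Map(); // key -> { memory, lastSeen }
}

// Routes a value to the memory of its key, creating the memory on first sight.
KeyedGroup.prototype.add = function (key, value, now) {
  var member = this.members.get(key);
  if (!member) {
    member = { memory: this.onCreate(key), lastSeen: now };
    this.members.set(key, member);
  }
  member.memory.push(value);
  member.lastSeen = now;
};

// Removes every key that has been idle for at least the inactivity timeout.
KeyedGroup.prototype.checkLimit = function (now) {
  var self = this;
  Array.from(this.members.keys()).forEach(function (key) {
    if (now - self.members.get(key).lastSeen >= self.inactivityMillis) {
      self.members.delete(key);
      self.onRemove(key);
    }
  });
};

var events = [];
var group = new KeyedGroup(120000,
  function (key) { events.push("create " + key); return []; },
  function (key) { events.push("remove " + key); });

group.add("orders", 5, 0);
group.add("orders", 7, 30000);
group.add("invoices", 1, 100000);
group.checkLimit(160000); // "orders" idle for 130 s, "invoices" for 60 s
console.log(events);      // [ 'create orders', 'create invoices', 'remove orders' ]
```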
Temperature Producer/Monitor (CEP)
Complex event processing with temperature monitoring.
tempmonitor.js:
/**
* Author: IIT Software GmbH, Muenster/Germany, (c) 2016, All Rights Reserved
*/
// Checking parameters
var inputQueue = parameters.require("input-queue");
var mailHost = parameters.optional("mail-host", "localhost");
var mailUser = parameters.require("mail-user");
var mailPassword = parameters.require("mail-password");
var mailFrom = parameters.require("mail-from");
var mailTo = parameters.require("mail-to");
var mailBcc = parameters.require("mail-bcc");
// Create 3 Memories, one for monitoring (all), one to detect warning, one to detect critical conditions
stream.create().memory("all").heap().limit().time().tumbling().seconds(10).onRetire(function (retired) {
stream.log().info("AVERAGE TEMP LAST 10 SECS=" + retired.average("temp") +
", MIN=" + retired.min("temp").property("temp").value().toObject() +
", MAX=" + retired.max("temp").property("temp").value().toObject());
stream.log().info(retired.values("temp"));
});
stream.create().memory("warning").heap().limit().count(2).sliding();
stream.create().memory("critical").heap().limit().count(4).sliding();
// Create a mail server to send critical mails
stream.create().mailserver(mailHost).username(mailUser).password(mailPassword).connect();
// Create the Inputs
stream.create().input(inputQueue).queue().onInput(function (input) {
// We need a property "temp" instead of "TEMP"
input.current().property("temp").set(input.current().property("TEMP").value().toInteger());
});
// Create a timer to trigger the retirement of the "all" memory every 10 secs
stream.create().timer("monitor").interval().seconds(10).onTimer(function (timer) {
stream.memory("all").checkLimit();
});
// Set the onMessage callback
stream.onMessage(function () {
// Add current message to all memories
stream.memory("all").add(stream.current());
stream.memory("warning").add(stream.current());
stream.memory("critical").add(stream.current());
// Check if we have a warning condition (last 2 temps > 100)
if (stream.memory("warning").select("temp > 100").size() == 2)
stream.log().warning("LAST 2 TEMPS > 100!");
// Check if we have a critical condition (spike):
// At least 4 messages received,
// First temp > 400,
// All temps are greater than the temp before (ascendingSeries),
// Last temp is 1.5 times greater than the first temp.
if (stream.memory("critical").size() == 4 &&
stream.memory("critical").first().property("temp").value().toDouble() > 400 &&
stream.memory("critical").ascendingSeries("temp") &&
stream.memory("critical").last().property("temp").value().toDouble() >
stream.memory("critical").first().property("temp").value().toDouble() * 1.5) {
stream.log().error("WE HAVE A SPIKE!!!");
stream.mailserver(mailHost)
.email()
.from(mailFrom)
.to(mailTo)
.bcc(mailBcc)
.subject("Nuclear Powerplant Monitor - CRITICAL!")
.body("Temp spike detected, last temps=" + stream.memory("critical").values("temp"))
.send();
}
});
tempproducer.js:
/**
* Author: IIT Software GmbH, Muenster/Germany, (c) 2016, All Rights Reserved
*/
// Check parameters
var outputQueue = parameters.require("output-queue");
// Create the Output to send the temp messages
stream.create().output(outputQueue).queue();
// Static temp data that will generate a warning and critical condition
var temps = [90, 97, 101, 80, 90, 110, 120, 200, 250, 500, 450, 320, 401, 402, 450, 800];
var idx = 0;
// Timer to send temps in a 2 secs interval
stream.create().timer("ticker").interval().seconds(2).onTimer(function (timer) {
stream.log().info("Sending temp: " + temps[idx]);
var msg = stream.create().message().message().nonpersistent().property("TEMP").set(temps[idx++]);
if (idx == temps.length)
idx = 0;
stream.output(outputQueue).send(msg);
});
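To see which of the static readings would trip the warning and spike conditions in tempmonitor.js, the two checks can be replayed in plain JavaScript over one pass of the `temps` array. This is a sketch under assumptions: `lastN`, `isWarning`, `isAscending`, and `isSpike` are hypothetical helpers, the sliding count windows are modelled as array slices, and `ascendingSeries` is assumed to mean strictly increasing, as the comment in tempmonitor.js suggests.

```javascript
// Replays the static readings through plain-JavaScript versions of the two checks.
var temps = [90, 97, 101, 80, 90, 110, 120, 200, 250, 500, 450, 320, 401, 402, 450, 800];

// Returns the last `count` readings up to and including index `end`.
function lastN(values, end, count) {
  return values.slice(Math.max(0, end - count + 1), end + 1);
}

// Warning: the two most recent readings are both above 100.
function isWarning(window) {
  return window.length === 2 && window.every(function (t) { return t > 100; });
}

// True when every reading is greater than the one before it.
function isAscending(window) {
  for (var i = 1; i < window.length; i++) {
    if (window[i] <= window[i - 1]) return false;
  }
  return true;
}

// Spike: four readings, first above 400, strictly rising, last more than 1.5x the first.
function isSpike(window) {
  return window.length === 4 &&
    window[0] > 400 &&
    isAscending(window) &&
    window[3] > window[0] * 1.5;
}

var warningsAt = [];
var spikesAt = [];
for (var i = 0; i < temps.length; i++) {
  if (isWarning(lastN(temps, i, 2))) warningsAt.push(i);
  if (isSpike(lastN(temps, i, 4))) spikesAt.push(i);
}
console.log("warnings at indexes:", warningsAt); // 6 through 15
console.log("spikes at indexes:", spikesAt);     // [ 15 ]
```

In this single pass the only spike is the final window [401, 402, 450, 800], since 800 exceeds 401 * 1.5 = 601.5. Because the producer wraps around to the start of the array, later passes would also evaluate windows that span the end and the beginning of the data, which this sketch does not cover.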
Order Collector
Aggregator pattern collecting orders into batches.
itemstats.js:
/**
* Author: IIT Software GmbH, Muenster/Germany, (c) 2016, All Rights Reserved
*
* This example demonstrates: WireTap, Management Input, Timer reconfiguration.
* It creates a wiretap into the orderpos queue and generates a statistic.
*/
// Parameters
var orderPosQueue = parameters.require("orderpos-queue");
var statIntervalSec = parameters.require("statistic-interval-sec");
// Callback function for onRetire to print the statistics
function printStatistic(retired) {
// Generate statistics in the stream's log file
stream.log().info("Item Statistics:");
// Get and log the statistic data (items with summarized quantity in descending order)
retired.group("ITEMNO").sum("QTY").sort("QTY").reverse().forEach(function (message) {
stream.log().info(message.property("ITEMNO").value().toString() + " = " + message.property("QTY").value().toString());
});
}
// Create the itemstats memories for the item statistics
stream.create().memory("itemstats").heap().limit().time().tumbling().seconds(statIntervalSec).onRetire(printStatistic);
// Create a timer that triggers the item statistics every n secs
stream.create().timer("stats").interval().seconds(statIntervalSec).onTimer(function (timer) {
// Reduce the memory according to the limit
stream.memory("itemstats").checkLimit();
});
// Create a wiretap input on the orderpos queue.
stream.create().input(orderPosQueue).wiretap("w1").onInput(function (input) {
// We need ITEMNO and QTY as a property
input.current().property("ITEMNO").set(input.current().body().get("ITEMNO").toInteger());
input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
stream.memory("itemstats").add(input.current());
});
// Create a Management Input that receives changes on my own statistic-interval-sec parameter
// and reconfigures the Timer
stream.create().input("sys$streams/domains/swiftmq/packages/samples/streams/" + stream.name() + "/parameters/statistic-interval-sec").management().onChange(function (input) {
// Get the new value
var secs = input.current().property("value").value().toInteger();
// Reset and reconfigure the Timer with the new value
stream.timer("stats").reset().seconds(secs).reconfigure();
// Recreate the itemstats Memory
stream.memory("itemstats").close();
stream.create().memory("itemstats").heap().limit().time().tumbling().seconds(secs).onRetire(printStatistic);
// Log it into the Logfile
stream.log().info("Statistic interval reconfigured to " + secs + " seconds");
});
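The chained call `retired.group("ITEMNO").sum("QTY").sort("QTY").reverse()` in `printStatistic` can be read as "sum the quantities per item, then list the items by total quantity in descending order". A plain JavaScript equivalent of that reading is sketched below; `itemStatistics` is a hypothetical helper, the input is a made-up array of plain objects, and the exact semantics of the Memory operations are an assumption based on the comment in the sample.

```javascript
// Plain-JavaScript reading of group("ITEMNO").sum("QTY").sort("QTY").reverse().
function itemStatistics(positions) {
  var totals = new Map(); // ITEMNO -> summed QTY
  positions.forEach(function (p) {
    totals.set(p.ITEMNO, (totals.get(p.ITEMNO) || 0) + p.QTY);
  });
  var rows = [];
  totals.forEach(function (qty, itemNo) {
    rows.push({ ITEMNO: itemNo, QTY: qty });
  });
  // Descending by summed quantity.
  rows.sort(function (a, b) { return b.QTY - a.QTY; });
  return rows;
}

var stats = itemStatistics([
  { ITEMNO: 4711, QTY: 2 },
  { ITEMNO: 3318, QTY: 3 },
  { ITEMNO: 4711, QTY: 5 },
  { ITEMNO: 1715, QTY: 1 }
]);
stats.forEach(function (row) {
  console.log(row.ITEMNO + " = " + row.QTY);
});
// 4711 = 7
// 3318 = 3
// 1715 = 1
```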
ordercollector.js:
/**
* Author: IIT Software GmbH, Muenster/Germany, (c) 2016, All Rights Reserved
*
* This example implements the EI-Pattern "Aggregator". It receives OrderHead
* and OrderPos messages from different queues and generates an XML message
* of the full order once it has been completely received.
*/
// 3 queues are required: orderhead, orderpos, and the output
var orderHeadQueue = parameters.require("orderhead-queue");
var orderPosQueue = parameters.require("orderpos-queue");
var outputQueue = parameters.require("output-queue");
// Create the memories: orderhead, orderpos
stream.create().memory("orderhead").queue("stream_ordercollector_memory_orderhead").createIndex("ORDERHEADID");
stream.create().memory("orderpos").queue("stream_ordercollector_memory_orderpos").createIndex("ORDERHEADID");
stream.memory("orderhead").clear();
stream.memory("orderpos").clear();
// Create the inputs
stream.create().input(orderHeadQueue).queue().onInput(function (input) {
// Add message to orderhead memory
stream.memory("orderhead").add(input.current());
});
stream.create().input(orderPosQueue).queue().onInput(function (input) {
// We need ITEMNO and QTY as a property
input.current().property("ITEMNO").set(input.current().body().get("ITEMNO").toInteger());
input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
// Add message to orderpos memory
stream.memory("orderpos").add(input.current());
});
// Purge output queue every hour
stream.create().timer("purger").interval().hours(1).onTimer(function (timer) {
stream.cli()
.exceptionOff()
.execute("cc sys$queuemanager/usage")
.execute("remove " + outputQueue + " *");
});
// Create an Output to send the collected Orders
stream.create().output(outputQueue).queue();
// Create the onMessage callback
stream.onMessage(function () {
// Get the orderhead id from the current message (may be an orderhead or orderpos message)
var orderHeadId = stream.current().property("ORDERHEADID").value().toInteger();
// Get all orderheads we currently have for this orderhead id
var orderHeadMem = stream.memory("orderhead").index("ORDERHEADID").get(orderHeadId);
// Check if we have exactly one orderhead for this orderhead id
if (orderHeadMem.size() == 1) {
// Join it with the orderpos memory
var orderPosMem = orderHeadMem.join(stream.memory("orderpos"), "ORDERHEADID");
// Check if we have all orderpositions mentioned in the orderhead
if (orderHeadMem.first().property("NPOSITIONS").value().toInteger() == orderPosMem.size()) {
// Generate XML message with the completed order
var orderHead = orderHeadMem.first();
var msg = "<order orderheadid=\"" + orderHead.body().get("ORDERHEADID").toStringValue() + "\"" +
" accountno=\"" + orderHead.body().get("ACCOUNTNO").toStringValue() + "\">\n";
orderPosMem.forEach(function (message) {
msg += " <position itemno=\"" + message.property("ITEMNO").value().toString() + "\"" +
" quantity=\"" + message.property("QTY").value().toString() + "\"/>\n";
});
msg += "</order>";
// Log it into the stream's log file
stream.log().info(msg);
// Send it to the output queue as persistent text message
stream.output(outputQueue).send(stream.create().message().textMessage().persistent().body(msg));
// Remove the messages with this orderhead id from orderhead and orderpos memories
stream.memory("orderhead").index("ORDERHEADID").remove(orderHeadId);
stream.memory("orderpos").index("ORDERHEADID").remove(orderHeadId);
}
}
});
orderheadproducer.js:
/**
* Author: IIT Software GmbH, Muenster/Germany, (c) 2016, All Rights Reserved
*/
// We need 1 output queue
var outputQueue = parameters.require("output-queue");
// Create the Output to send Orderhead messages
stream.create().output(outputQueue).queue();
// Static data for the orderhead messages
var accounts = [1513, 271, 3300, 8800, 17777, 55167, 99220, 1513, 8800, 1900, 3344, 19220];
var orderheadids = [1011, 998, 1567, 3318, 90001, 44526, 33900, 1444, 9344, 1788, 9900, 14144];
var npositions = [3, 5, 1, 5, 6, 1, 1, 4, 8, 7, 3, 5];
var idx = 0;
// Create a timer that sends an orderhead map message every 5 secs
stream.create().timer("ticker").interval().seconds(5).onTimer(function (timer) {
if (idx < accounts.length) {
stream.output(outputQueue).send(stream.create().message().mapMessage().persistent()
.property("ACCOUNTNO").set(accounts[idx])
.property("ORDERHEADID").set(orderheadids[idx])
.property("NPOSITIONS").set(npositions[idx])
.body().set("ACCOUNTNO", accounts[idx])
.set("ORDERHEADID", orderheadids[idx])
.set("NPOSITIONS", npositions[idx]).message());
idx++;
} else {
stream.create().timer("restarter").next().beginOfHour().onTimer(function (timer) {
idx = 0;
}).start();
}
});
orderposproducer.js:
/**
* Author: IIT Software GmbH, Muenster/Germany, (c) 2016, All Rights Reserved
*/
// We need 1 output queue
var outputQueue = parameters.require("output-queue");
// Create the Output to send Orderpos messages
stream.create().output(outputQueue).queue();
// Static orderpos data
var orderpos = [[14144, 4711, 2],
[14144, 3318, 3],
[14144, 1715, 1],
[9900, 1522, 1],
[9900, 3318, 2],
[9900, 4711, 1],
[1788, 4355, 1],
[1788, 6722, 1],
[1011, 1677, 3],
[998, 1544, 1],
[998, 4711, 1],
[1788, 2011, 4],
[1788, 1988, 1],
[1788, 3789, 1],
[9344, 1988, 1],
[9344, 2011, 1],
[9344, 9211, 1],
[998, 9211, 1],
[998, 3318, 1],
[9344, 1031, 2],
[9344, 1544, 1],
[9344, 1677, 1],
[1444, 1988, 1],
[1444, 6722, 4],
[90001, 1677, 3],
[14144, 2011, 2],
[14144, 1988, 5],
[90001, 3318, 2],
[90001, 2011, 5],
[1788, 1566, 3],
[1788, 9913, 1],
[90001, 1677, 1],
[90001, 4355, 2],
[3318, 1988, 2],
[3318, 9913, 2],
[9344, 3318, 1],
[9344, 4417, 4],
[33900, 6722, 1],
[44526, 4711, 2],
[90001, 9211, 2],
[3318, 4355, 2],
[1444, 3318, 2],
[1444, 4355, 3],
[3318, 1544, 2],
[3318, 1677, 2],
[1567, 4711, 1],
[998, 2011, 10],
[1011, 1031, 2],
[1011, 4444, 11]];
var idx = 0;
// Create a timer that sends one orderpos every 5 secs
stream.create().timer("ticker").interval().seconds(5).onTimer(function (timer) {
if (idx < orderpos.length) {
stream.output(outputQueue).send(stream.create().message().mapMessage().persistent()
.property("ORDERHEADID").set(orderpos[idx][0])
.property("ITEMNO").set(orderpos[idx][1])
.property("QTY").set(orderpos[idx][2])
.body().set("ORDERHEADID", orderpos[idx][0])
.set("ITEMNO", orderpos[idx][1])
.set("QTY", orderpos[idx][2]).message());
idx++;
}
else {
stream.create().timer("restarter").next().beginOfHour().onTimer(function (timer) {
idx = 0;
}).start();
}
});
Request-Reply Echo Service
Echo service demonstrating request-reply pattern.
replier.js:
/**
* Author: IIT Software GmbH, Muenster/Germany, (c) 2016, All Rights Reserved
*/
// A simple echo service that creates an input on a temp queue where it receives its requests
// The temp queue is registered in JNDI so that JMS clients can look it up and send requests to it
stream.create().input(stream.create().tempQueue("requestQueue").registerJNDI()).queue();
stream.onMessage(function () {
stream.create().output(null).forAddress(stream.current().replyTo())
.send(stream.create().message().textMessage().correlationId(stream.current().messageId())
.body("RE: " + stream.current().body())).close();
});
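The echo service relies on two message fields: the reply is sent to the request's reply-to address, and its correlation id is set to the request's message id so the requester can match replies to requests. The plain JavaScript sketch below models only that bookkeeping. The message shape, the `tmp-reply-queue` address, and the helpers `echoReply`, `sendRequest`, and `onReply` are hypothetical and not part of the Streams or JMS APIs.

```javascript
// Minimal in-memory model of request-reply with correlation ids.
// The message shape (messageId, replyTo, correlationId, body) is illustrative.
function echoReply(request) {
  return {
    destination: request.replyTo,     // reply goes to the requester's address
    correlationId: request.messageId, // lets the requester match the reply
    body: "RE: " + request.body
  };
}

// Requester side: remember pending requests by message id.
var pending = new Map();

function sendRequest(id, body) {
  var request = { messageId: id, replyTo: "tmp-reply-queue", body: body };
  pending.set(id, body);
  return request;
}

// Looks up the original request for a reply and clears the pending entry.
function onReply(reply) {
  var original = pending.get(reply.correlationId);
  pending.delete(reply.correlationId);
  return { request: original, reply: reply.body };
}

var reply = echoReply(sendRequest("ID:1", "ping"));
var result = onReply(reply);
console.log(result); // { request: 'ping', reply: 'RE: ping' }
```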
Configuration Reference
The top-level entity in routerconfig.xml is <swiftlet name="sys$streams">.
<swiftlet name="sys$streams"> Properties
These properties are attributes of the <swiftlet name="sys$streams"> entity.
| Parameter | Type | Default | Mandatory | Reboot Required | Description |
|---|---|---|---|---|---|
| collect-interval | Long | 1000 | No | No | Interval for collecting Stream usage information |
| stream-grant-predicates | String | stream\_% | No | No | SQL-Like Predicates to grant Stream Topic access |
| stream-lib-directory | String | ../data/streamlib | No | Yes | Hot deploy directory for external jar files used from a stream |
<swiftlet name="sys$streams" collect-interval="1000" stream-grant-predicates="stream\_%" stream-lib-directory="../data/streamlib"/>
<domains> in <swiftlet name="sys$streams">
Stream Domain Names
Each <domain> entry is identified by its name attribute (the Domain).
<swiftlet name="sys$streams">
  <domains>
    <domain name="..."/>
  </domains>
</swiftlet>
<packages> in <domains>
Stream Packages
Each <package> entry is identified by its name attribute (the Package).
<swiftlet name="sys$streams">
  <domains>
    <domain name="...">
      <packages>
        <package name="..."/>
      </packages>
    </domain>
  </domains>
</swiftlet>
Changelog
13.2.0 (2025-11-03)
- Modified Stream
- StreamController: added timeout handling
- StreamController: added timeout handling, added validation, added null check, added error handling, fixed boundary condition; StreamProcessor.dispatch(): added null check, added timeout handling; StreamProcessor: added safeAccept()
13.1.0 (2025-01-16)
- StreamProcessor: removed process()
- Modified StreamContext; StreamController: fixed boundary condition, added error handling; StreamController: added loadClass(); StreamController: removed findClass(); Modified GraalSetup
- Modified StreamsSwiftlet; Modified StreamProcessor
- Modified StreamController