Table of contents
FFMQ requires a JRE version 1.4 or above to run.
Default JVM parameters are set in the bin/setenv.sh script, which is read on startup.
If you want to change things like the default FFMQ heap size this is the file to edit.
Download the latest ffmq3-distribution-<version>-dist.zip archive from the Downloads page, and unpack it somewhere.
You will get the following directory tree :
The bin/ directory contains shell scripts to start, stop and manage the FFMQ server.
The bridges/ directory holds JMS bridges descriptors loaded by the server on startup.
The conf/ directory contains various configuration files used by the server (see Configuring below).
The conf/templates/ directory contains default templates used to create new destinations (queues or topics).
The data/ directory (empty upon installation) is the default data folder where queues store/index files will be stored.
The destinations/ directory (empty upon installation) is the default descriptor folder where queues and topics descriptors will be created.
The docs/ directory contains these documentation files.
The lib/ directory contains all the jar files used by the server.
The logs/ directory will hold the server log files
On the client-side, you need the ffmq3-core.jar in your classpath. (plus the jms-api and commons-logging if you don't already have them)
You will find those jars in the lib/ directory of the server package.
The main configuration file is conf/ffmq-server.properties.
This is a standard java properties file with key/pair values.
See inside the distributed file for details, the various configuration settings are described in comments.
FFMQ supports destination auto-creation based on templates.
When enabled, on first usage of a non-existing queue (or topic), a matching template will be looked up in the
templates.mapping file and used to generate a valid queue (or topic) descriptor.
This operation is transparent to clients and is useful to create auto-deployable queuers.
If the default XML-based security module is enabled in the main configuration file, users and permissions must
be described in the security.xml file.
See syntax inside the distributed file. Please note that security is *disabled* by default.
When a queue (or topic) is auto-created, the FFMQ engine looks up a template descriptor and
copy the template values into a real descriptor file (in the destinations/ folder)
Here is a sample queue template descriptor (which is actually a properties file) :
# # This is the default template for queues # name = DEFAULT_QUEUE_TEMPLATE persistentStore.initialBlockCount = 1000 persistentStore.autoExtendAmount = 1000 persistentStore.maxBlockCount = 10000 persistentStore.blockSize = 4096 persistentStore.dataFolder = ${FFMQ_BASE}/data persistentStore.useJournal = true memoryStore.maxMessages = 1000
Here is an exhaustive list of the supported properties and their default values :
Common Properties
Property | Default | Usage |
---|---|---|
name | No default | The template name (as referenced in the templates.mapping file) |
persistentStore.initialBlockCount | 0 | Initial number of available blocks in the persistent store |
persistentStore.maxBlockCount | 0 | Maximum number of blocks in the persistent store. The store file may auto-extend up to this size (see persistentStore.autoExtendAmount).0 means no persistent storage. |
persistentStore.autoExtendAmount | 0 | Amount to grow by when auto-extending the store. The store is auto-extended when full. |
persistentStore.blockSize | 4096 | Size of a storage block. Minimum size is 1024 (1KB). Using bigger values may improve performance with large messages but will waste more disk space. |
persistentStore.dataFolder | ${FFMQ_BASE}/data | Folder where to store persistent files for this destination. May be relative or absolute. May use system properties as variables. |
persistentStore.useJournal | true | Enables the use of a synced transaction log for the persistent store. Setting this to false disables transaction logs and filesystem syncing for this destination, which is a lot faster but no longer fail-safe. |
persistentStore.journal.dataFolder | same as persistentStore.dataFolder | Folder where to store journal files for this destination. May be relative or absolute. May use system properties as variables. This is useful if you want to place journal files on different hardware devices (or on faster devices like SSDs) to improve performance. |
persistentStore.journal.maxFileSize | 33554432 | Maximum size of a journal file before creating a new one. Default is 32MB. Actual journal files size may grow a little larger than this limit if it ends on a large transaction. |
persistentStore.journal.maxWriteBatchSize | 1000 | Maximum number of operations to process per asynchronous task. This setting has various implications on lock contention and commit barriers. |
persistentStore.journal.maxUnflushedJournalSize | 4194304 | Maximum size of journal data that may be bufferized in memory before starting to write to the actual journal file. Default is 4MB. |
persistentStore.journal.maxUncommittedStoreSize | 16777216 | Maximum size of store data that may be written to disk without syncing. Default is 16MB. |
persistentStore.journal.outputBufferSize | 16384 | Size of the I/O buffer to be used when writing to a journal file. Default is 16KB. |
persistentStore.journal.preAllocateFiles | true | If true, journal files are pre-allocated to improve performance. |
persistentStore.syncMethod | 2 |
Select the disk sync method to use :
|
memoryStore.maxMessages | 0 | Maximum number of non-persistent messages to store in memory. |
memoryStore.overflowToPersistent | false | Allow non-persistent messages to be stored in the persistent store if the memory store is full. Note that if you use this option, message ordering is no longer guaranteed. |
Topic Properties
Property | Default | Usage |
---|---|---|
subscriberFailurePolicy | 1 | Define this topic behavior when an exception occurs while pushing a message to a subscriber. (Bit mask)
|
subscriberOverflowPolicy | 1 | Define this topic behavior when a subscriber queue is full. (Bit mask)
|
Notes :
A destination descriptor is much like a template descriptor but it represents an actual existing destination.
Descriptors are automatically created from a template or created on demand from the administration utility (see below).
Syntax is the same as for a template, only the 'name' property refers to the actual destination name.
Notes :
There is a command-line utility to administrate the server. The utility must be configured
in the conf/ffmq-admin-client.properties properties file.
See usage below.
Here is the default implementation behavior regarding synchronous/asynchronous calls :
PERSISTENT | NON-PERSISTENT | |
---|---|---|
TRANSACTED | Async* | Async* |
DUPS_OK_ACKNOWLEDGE | Sync | Async |
AUTO_ACKNOWLEDGE | Sync | Async |
CLIENT_ACKNOWLEDGE | Sync | Async |
PERSISTENT | NON-PERSISTENT | |
---|---|---|
TRANSACTED | Sync | Sync |
DUPS_OK_ACKNOWLEDGE | Async | Async |
AUTO_ACKNOWLEDGE | Sync | Sync |
CLIENT_ACKNOWLEDGE | Sync | Sync |
The client-side driver looks for an optional resource file named ffmq3.properties in the classpath root.
If you need to override some default settings, you can create this file and add it to your classpath.
You can refer to net/timewalker/ffmq3/ffmq3.properties in ffmq-core.jar to
see available properties and their default values :
# Time to wait for an anwser from the server (seconds) transport.timeout=120 # Time to wait for a connection attempt (seconds) transport.tcp.connectTimeout=30 # SSL options transport.tcp.ssl.protocol=SSLv3 transport.tcp.ssl.ignoreCertificates=false # Indicate if a consumer should send message acknowledgements synchronously or not consumer.sendAcksAsync=true # Indicate if a producer should retry if a target queue is full producer.retryOnQueueFull=true # Indicate how much time a producer should wait if a target queue is full (milliseconds) # 0 = unlimited producer.retryTimeout=30000
Note that in most cases you won't have to change any of these default settings.
The FFMQ provider can be declared in spring like any other JMS client implementation.
Here is how you would define a JMS connection factory :
<bean id="JMSConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="factory/ConnectionFactory"/> <property name="jndiEnvironment"> <props> <prop key="java.naming.factory.initial">net.timewalker.ffmq3.jndi.FFMQInitialContextFactory</prop> <prop key="java.naming.provider.url">tcp://localhost:10002</prop> </props> </property> </bean>
And here is how to obtain a a JMS destination bean from JNDI :
<bean id="myJMSQueue" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="queue/myJMSQueue"/> <property name="jndiEnvironment"> <props> <prop key="java.naming.factory.initial">net.timewalker.ffmq3.jndi.FFMQInitialContextFactory</prop> </props> </property> </bean>
Then, you can declare standard spring JMS templates and listeners. See the spring documentation for details.
The FFMQ provider and destination can be declared as a resource reference in a Tomcat <Context>.
Here is an example definition :
<Resource name="jms/ConnectionFactory" type="net.timewalker.ffmq3.jndi.FFMQConnectionFactory" factory="net.timewalker.ffmq3.jndi.JNDIObjectFactory" providerURL="tcp://localhost:10002" userName="test" password="test" /> <Resource name="jms/myQueue" type="net.timewalker.ffmq3.common.destination.QueueRef" factory="net.timewalker.ffmq3.jndi.JNDIObjectFactory" queueName="MYQUEUE" /> <Resource name="jms/mytopic" type="net.timewalker.ffmq3.common.destination.TopicRef" factory="net.timewalker.ffmq3.jndi.JNDIObjectFactory" queueName="MYTOPIC" />
And here is how to retrieve thoses resources using the tomcat default InitialContext :
InitialContext init = new InitialContext(); // Retrieving the connection factory ConnectionFactory connectionFactory = (ConnectionFactory) init.lookup("java:comp/env/jms/ConnectionFactory"); // Retrieving the queue or topic reference Queue myQueue = (Queue)init.lookup("java:comp/env/jms/myQueue"); Topic myTopic = (Topic)init.lookup("java:comp/env/jms/myTopic");
- To start the server, use the ffmq-server.bat or ffmq-server.sh shell script in the server bin/ directory.
- To stop the server, use the ffmq-shutdown.bat or ffmq-shutdown.sh shell script in the server bin/ directory.
(If you want to remotely stop the server, you need to enable the administrative queues in the server configuration and use the ffmq-remote-shutdown.bat or ffmq-remote-shutdown.sh shell script)
Notes :
The persistent storage for a given destination is made of two or more files :
The required disk space for a queue is close to blockCount*(blockSize+13)+2*maxJournalSize
Note that messages are splitted over complete blocks, which means you can store between
blockCount / (maxMessageSize modulo blockSize) (worst case) and blockCount (best case)
in a given message store.
The client implementation can be accessed through JNDI like any other standard JMS implementation.
Here is the JNDI configuration to use :
Here is a sample source code if you need to do it yourself :
// Create and initialize a JNDI context Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY); env.put(Context.PROVIDER_URL, "tcp://localhost:10002"); Context context = new InitialContext(env); // Lookup a connection factory in the context ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME); // Obtain a JMS connection from the factory Connection conn = connFactory.createConnection(); conn.start(); ...
Note that to provide JMS 1.0.2 compliance, the following connection factories are also declared in the JNDI context :
The FFMQ server can be (remotely) administered through a command-line utility.
To execute this utility, use the ffmq-admin-client.bat or ffmq-admin-client.sh shell script in the server bin/ directory.
Executing the utility without command-line parameters will display usage information :
Command-line parameters ------------------------- -command: the command to execute (createQueue,createTopic,deleteQueue,deleteTopic,shutdown) -conf : path to a properties file (optional) All other variables should be passed as variable=value
When dealing with destinations you can pass all available properties (see 'Templates' above) with <propertyName>=<value>
Examples :
# Create queue FOO with a non-persistent message capacity of 1000 ffmq-admin-client -command createQueue name=FOO memoryStore.maxMessages=1000 # Delete topic BAR ffmq-admin-client -command deleteTopic name=BAR # Ask the server to shutdown ffmq-admin-client -command shutdown
The FFMQ server can be (remotely) monitored through JMX using generic tools or the provided command-line utility.
To execute this utility, use the ffmq-jmx-console.bat or ffmq-jmx-console.sh shell script in the server bin/ directory.
You can start one (or more) engine instance in a given ApplicationContext, using a provided helper bean :
net.timewalker.ffmq3.spring.FFMQServerBean
Here is a typical XML spring definition for this :
<bean id="FFMQServer" class="net.timewalker.ffmq3.spring.FFMQServerBean" init-method="start" destroy-method="stop"> <property name="engineName" value="myEngine"/> <property name="configLocation" value="../conf/ffmq-server.properties"/> </bean>
Notes :
A server instance can be created directly from Java code if needed.
Here is an example source code :
package net.timewalker.ffmq3.sample.embedded; import java.io.FileInputStream; import java.util.Properties; import net.timewalker.ffmq3.listeners.ClientListener; import net.timewalker.ffmq3.listeners.tcp.io.TcpListener; import net.timewalker.ffmq3.local.FFMQEngine; import net.timewalker.ffmq3.management.destination.definition.QueueDefinition; import net.timewalker.ffmq3.utils.Settings; /** * Embedded FFMQ sample */ public class EmbeddedFFMQSample implements Runnable { private FFMQEngine engine; /* * (non-Javadoc) * @see java.lang.Runnable#run() */ public void run() { try { // Create engine settings Settings settings = createEngineSettings(); // Create the engine itself engine = new FFMQEngine("myLocalEngineName", settings); // -> myLocalEngineName will be the engine name. // - It should be unique in a given JVM // - This is the name to be used by local clients to establish // an internal JVM connection (high performance) // Use the following URL for clients : vm://myLocalEngineName // // Deploy the engine System.out.println("Deploying engine : "+engine.getName()); engine.deploy(); // - The FFMQ engine is not functional until deployed. // - The deploy operation re-activates all persistent queues // and recovers them if the engine was not properly closed. // (May take some time for large queues) // Adding a TCP based client listener System.out.println("Starting listener ..."); ClientListener tcpListener = new TcpListener(engine,"0.0.0.0",10002,settings,null); tcpListener.start(); // This is how you can programmatically define a new queue if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo")) { QueueDefinition queueDef = new QueueDefinition(); queueDef.setName("foo"); queueDef.setUseJournal(false); queueDef.setMaxNonPersistentMessages(1000); queueDef.check(); engine.createQueue(queueDef); } // You could also define a queue using some java Properties if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo2")) { Properties queueProps = new Properties(); queueProps.put("name", "foo2"); queueProps.put("persistentStore.useJournal", "false"); queueProps.put("memoryStore.maxMessages", "1000"); QueueDefinition queueDef2 = new QueueDefinition(new Settings(queueProps)); engine.createQueue(queueDef2); } // Run for some time System.out.println("Running ..."); Thread.sleep(30*1000); // Stopping the listener System.out.println("Stopping listener ..."); tcpListener.stop(); // Undeploy the engine System.out.println("Undeploying engine ..."); engine.undeploy(); // - It is important to properly shutdown the engine // before stopping the JVM to make sure current transactions // are nicely completed and storages properly closed. System.out.println("Done."); } catch (Exception e) { // Oops e.printStackTrace(); } } private Settings createEngineSettings() { // Various ways of creating engine settings // 1 - From a properties file Properties externalProperties = new Properties(); try { FileInputStream in = new FileInputStream("../conf/ffmq-server.properties"); externalProperties.load(in); in.close(); } catch (Exception e) { throw new RuntimeException("Cannot load external properties",e); } Settings settings = new Settings(externalProperties); // 2 - Explicit Java code // Settings settings = new Settings(); // // settings.setStringProperty(FFMQCoreSettings.DESTINATION_DEFINITIONS_DIR, "."); // settings.setStringProperty(FFMQCoreSettings.BRIDGE_DEFINITIONS_DIR, "."); // settings.setStringProperty(FFMQCoreSettings.TEMPLATES_DIR, "."); // settings.setStringProperty(FFMQCoreSettings.DEFAULT_DATA_DIR, "."); // ... return settings; } /** * @param args */ public static void main(String[] args) { System.setProperty("FFMQ_BASE", ".."); new EmbeddedFFMQSample().run(); } }