Understanding Messaging Pattern with JMS


22 Jun 2020  Sergio Martin Rubio  10 mins read.

The message design pattern is very common nowadays in distributed systems to decouple applications into smaller components, and it brings better performance, increased reliability and asynchronous communication.

Types Of Messaging Techniques

There are two types of messaging techniques:

  • Point-to-Point model.
  • Publisher/Subscriber model

The differences are:

Feature Point-to-Point Publishes/Subscriber
Middleware Queue Topic
Timing No timing dependency Timing dependency
Consumers Single consumer Multiple consumers
Mechanism Pull Push
Persistence Yes No
Ordered Messages are consumed in order Messages are NOT consumed in order
Destination Known Unknown
Point-to-Point Diagram
Point-to-Point Diagram

In a point-to-point model messages are usually stored in a staging area where they are waiting to be consumed by a single listener. Consumers use a pull mechanism to get the messages from the queue in order, so the listener does not need to be online all the time to receive or read messages from the queue. This model is usually used to decouple two applications and allow asynchronous processing.

Publisher/Subscriber Diagram
Publisher/Subscriber Diagram

On the other hand, a publisher/subscriber model does not store messages, and as a result if no consumers are available, the published message is lost, so it requires that the consumer is present at the time the message is ready to be delivered, unless it has durable subscription for inactive consumers. This model allows multiple clients to subscribe to a topic, but there is no guarantee that the messages are delivered in order. This technique is usually used in a fan-out strategy when we want to send a message to multiple applications.

JMS (Java Message Service)

JMS allows Java applications to communicate with messaging systems through a set of interfaces. JMS supports both messaging model, point-to-point and publisher/subscriber.

JMS Model

The JMS model consists of:

  • Provider: JMS system that implements the JMS specification.
  • Clients: Java applications that send and receive messages.
  • Administered objects: Preconfigured JMS objects that are created by an administrator for the use of JMS clients.
  • Messages: objects that are sent are received and contain header, properties and body:
    • Header: it’s mandatory and contains some metadata like priority, correlation ID, expiration or destination.
    • Properties: they are optional and you can set attributes by adding the prefix .jmx.
    • Body: the body is also optional and contains the data we want to exchange. JMS defines six different message types: Message, StreamMessage, MapMessage, TextMessage, ObjectMessage, BytesMessage.

JMS Architecture

The steps for producing and consuming messages are:

  1. Create a connection through the ConnectionFactory
  2. Create a session
  3. Create Message
  4. Use a producer or consumer to send message to destination or to receive message from destination

Java Implementation with ActiveMQ

Before getting started you need to spin up a standalone ActiveMQ server:

  1. Download ActiveMQ
  2. Unzip the file and go to apache-activemq-<version>/bin
  3. Execute ./activemq start

Now you can access the ActiveMQ dashboard: http://localhost:8161/admin/

You can stop the ActiveMQ Server at any time by executing ./activemq stop.

Also, assuming you have a Maven project you will need to add the following dependencies.

<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.13</version>
</dependency>

javax.jms-api is required to start a new context and perform naming operations. activemq-all includes all the dependencies required to talk to ActiveMQ.

Point to Point Implementation

You can create a producer as follows:



public class Producer {

    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    public static void main(String[] args) throws JMSException, NamingException {
        // log4j configuration
        BasicConfigurator.configure();

        // Obtain a JNDI connection
        Properties properties = new Properties();
        properties.setProperty(INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        properties.setProperty(PROVIDER_URL, DEFAULT_BROKER_URL);
        properties.setProperty("queue.MyQueue", "example.MyQueue");
        InitialContext jndi = new InitialContext(properties);

        // Look up a JMS connection factory
        ConnectionFactory connectionFactory = (ConnectionFactory) jndi.lookup("ConnectionFactory");

        try (Connection connection = connectionFactory.createConnection()) {
            connection.start();

            // Create session to send a receive messages. Set the first parameter to true
            // if you want to allow transactions
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination = (Destination) jndi.lookup("MyQueue");

            // To send messages
            MessageProducer producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage("Hello World!");
            producer.send(message);
            LOGGER.info("Message " + message.getText() + " was sent!");
        }
    }
}

The default ActiveMQ broker URL is tcp://localhost:61616.

The context properties can be also define in resources/jndi.properties.

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
queue.MyQueue = example.MyQueue
topic.MyTopic = example.MyTopic

and the consumer will be:

public class Consumer {

    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    public static void main(String[] args) throws JMSException {
        // log4j configuration
        BasicConfigurator.configure();

        // Getting JMS connection from the server
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);

        try (Connection connection = connectionFactory.createConnection()) {
            connection.start();

            // Create session for receiving messages
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Getting the queue
            Destination destination = session.createQueue("example.MyQueue");
            MessageConsumer consumer = session.createConsumer(destination);
            // This blocks indefinitely until a message is produced
            Message message = consumer.receive();

            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                LOGGER.info("Receive message: " + textMessage.getText());
            }
        }

    }
}

Alternatively, you can use ActiveMQConnectionFactory instead of manually creating the context as you can see on the consumer.

Publisher Subscriber Implementation

You can create a publisher as follows:

public class Publisher {

    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    public static void main(String[] args) throws NamingException, JMSException {
        // log4j configuration
        BasicConfigurator.configure();

        // Obtain a JNDI connection
        InitialContext jndi = new InitialContext();

        // Look up a JMS connection factory
        TopicConnectionFactory connectionFactory = (TopicConnectionFactory) jndi.lookup("TopicConnectionFactory");

        // Create a JMS connection and start the JMS connection; allows messages to be received
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.start();

        // Create JMS session publisher
        TopicSession publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

        // Look up a JMS topic
        Topic topic = (Topic) jndi.lookup("MyTopic");

        // Create JMS publisher
        TopicPublisher publisher = publisherSession.createPublisher(topic);

        // Create and send message using topic publisher
        TextMessage message = publisherSession.createTextMessage("How are you my friend?");
        publisher.publish(message);
        LOGGER.info("Message " + message.getText() + " was published to topic " + topic.getTopicName());
    }
}

and a subscriber like this:

public class Subscriber {

    public static void main(String[] args) throws NamingException, JMSException {
        // log4j configuration
        BasicConfigurator.configure();

        // Obtain a JNDI connection
        InitialContext jndi = new InitialContext();

        // Look up a JMS connection factory
        TopicConnectionFactory connectionFactory = (TopicConnectionFactory) jndi.lookup("TopicConnectionFactory");

        // Create a JMS connection and start the JMS connection; allows messages to be delivered
        TopicConnection connection = connectionFactory.createTopicConnection();
        connection.start();

        // Create JMS session publisher
        TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

        // Look up a JMS topic
        Topic topic = (Topic) jndi.lookup("MyTopic");

        // Create a JMS subscriber
        TopicSubscriber subscriber = subscriberSession.createSubscriber(topic);

        // Set a JMS message listener
        subscriber.setMessageListener(new MyListener());
    }

}

You also need to define a listener implementing MessageListener interface and provide an override onMessage method, where we can process the message received. Every time a new message is published to the topic the subscriber is listening to, the onMessage method is called.

public class MyListener implements MessageListener {

    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            LOGGER.info("Receive message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Examples

Image by Daria Nepriakhina from Pixabay