Showing posts with label Message Driven Bean. Show all posts

Wednesday, June 1, 2011

Creating and consuming a EJB 3 Message Driven Bean Part II

In Part I we did all the dirty work: we set up a JMS connection factory and a JMS destination in the application server, and then created a stateless session bean that sends a message to our XYZ Warehouse application. One thing I forgot to mention: you are probably asking, "Where is the messaging server?" You might have heard of ActiveMQ or WebSphere MQ. We're using the messaging server bundled with the AS. So let's get to creating our MDB.

Create our Message Driven Bean
A message-driven bean is just like our stateless bean in the sense that it does not retain state. For you and me, this means: do not write transaction-handling code in it. If an exception is thrown, you will not be able to recover from it on the caller's side. A little confused? Simply understand that it is not your shopper who started this transaction, so there is no client to return the exception to and no client to handle it. Maybe I should write an article on transactions. :)

The first thing to do is annotate the class with @MessageDriven, which tells the AS that this bean is an MDB. You also have to specify the destination you want to listen on, "jms/InvoiceQueue". Refer to the source below. The class then implements the MessageListener interface, which means implementing the void onMessage(Message message) method. This method holds your business logic: which type of message to consume and what to do with it. As with any EJB that touches the database, you will need an injected EntityManager to do your CRUDing, but that is not where I want to focus. What you should get from the code below is that we are consuming the ObjectMessage that the producer app sent. As I pointed out in Part I, we need an object similar to, or at least capable of storing, the properties the producer sends about the Item. We have ItemOrder in this client app to do so.

import com.factory.entities.ItemOrder;
import java.util.ArrayList;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

@MessageDriven(mappedName = "jms/InvoiceQueue", activationConfig = {
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class InvoiceMessageBean implements MessageListener {

    @PersistenceContext
    EntityManager em;

    public InvoiceMessageBean() {
    }

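    public void onMessage(Message message) {
        // The producer only sends ObjectMessage; skip anything else instead of
        // failing with a ClassCastException.
        if (!(message instanceof ObjectMessage)) {
            System.err.println("Ignoring unexpected message type: " + message);
            return;
        }
        try {
            ObjectMessage objectMessage = (ObjectMessage) message;
            // Payload layout set by the producer: [barcode, itemsToOrder].
            ArrayList<?> list = (ArrayList<?>) objectMessage.getObject();
            ItemOrder itemOrder = new ItemOrder();
            itemOrder.setBarcode(Integer.parseInt(list.get(0).toString()));
            itemOrder.setTotalItemsToOrder(Integer.parseInt(list.get(1).toString()));
            // companyID is described as a String property of ItemOrder in Part I.
            itemOrder.setCompanyID("19284");
            em.persist(itemOrder);
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
    }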
}
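
The onMessage method above trusts that the payload is a two-element list of numbers. A minimal sketch of checking that before anything reaches the EntityManager is shown here. It uses only the JDK so it can be run on its own; OrderPayload and parse are hypothetical names that are not part of the original application, and the [barcode, itemsToOrder] layout is assumed from the producer code in Part I.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: validates the [barcode, totalItemsToOrder] payload. */
final class OrderPayload {
    final int barcode;
    final int totalItemsToOrder;

    private OrderPayload(int barcode, int totalItemsToOrder) {
        this.barcode = barcode;
        this.totalItemsToOrder = totalItemsToOrder;
    }

    /**
     * Converts the object taken from the message into typed fields.
     * Throws IllegalArgumentException when the payload is not the expected
     * two-element list of integers.
     */
    static OrderPayload parse(Object payload) {
        if (!(payload instanceof List)) {
            throw new IllegalArgumentException("Expected a List payload but got: " + payload);
        }
        List<?> list = (List<?>) payload;
        if (list.size() != 2) {
            throw new IllegalArgumentException("Expected 2 elements but got " + list.size());
        }
        try {
            int barcode = Integer.parseInt(String.valueOf(list.get(0)));
            int quantity = Integer.parseInt(String.valueOf(list.get(1)));
            return new OrderPayload(barcode, quantity);
        } catch (NumberFormatException nfe) {
            throw new IllegalArgumentException("Payload elements must be integers: " + list, nfe);
        }
    }

    public static void main(String[] args) {
        // Same shape as the list the producer puts into the ObjectMessage.
        OrderPayload order = parse(new ArrayList<Object>(Arrays.asList(3884994, 25)));
        System.out.println(order.barcode + " x " + order.totalItemsToOrder);
    }
}
```

Inside the MDB, one option would be to call parse(objectMessage.getObject()) and log a rejected payload rather than persist a half-filled ItemOrder. Whether a bad message should be dropped or redelivered depends on your setup.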


So that's it. You're now ready to create and consume your EJB 3 MDB. You should now be able to appreciate the asynchronous nature of MDBs! Enterprise Java rules!

Creating and consuming a EJB 3 Message Driven Bean Part I

"Finally, The Rock has come back to...." More like: finally, I am going to write an article (albeit a two-part one) about programming, if you don't regard SQL as programming. In the first part I will cover how to send a message to a messaging server, including creating the JMS resources it needs. I used GlassFish v2.1 for this; you can do the same on your preferred application server. In the next part I will write about consuming the message via the MDB. So let's begin. I have created a checklist of the steps we need to perform in order to deploy our MDB and consume messages with it.

Checklist:


  1. Create a new JMS connection factory to allow creation of JMS objects in the application server. (Messaging server)

  2. Create a new JMS Destination that will be the repository for messages sent. (Messaging server)

  3. Use/Create a  session bean to send the message. (Producer Application)

  4. Create our MessageDrivenBean to consume it. (Consumer Application)


Create a new JMS connection factory to allow creation of JMS objects in the application server.
Before we start creating our MDB and consumer, the first thing I suggest doing is creating a new JMS connection factory in your application server. Think of a JMS connection factory as a JDBC connection pool. Your JDBC connection pool creates a pool of connections, and whenever your application asks for a connection object, the pool serves one. When you're done, the connection object is returned to the pool and, if it is still in a valid state, it can serve another request. Using a pool is just a lot faster than opening a fresh connection each time, and the application server is responsible for managing it. Of course, you are responsible for closing the connection so that it can be returned to the pool. We have the option of using javax.jms.TopicConnectionFactory, javax.jms.QueueConnectionFactory or simply javax.jms.ConnectionFactory. Even though we're going to be consuming from a queue (why? I explain the difference between a topic and a queue below), staying true to the Abstract Factory pattern, we will use the ConnectionFactory interface. Have a look at the image below. I've created a new JMS connection factory with the JNDI name jms/InvoiceQueueFactory. I've left the pool settings at the AS defaults.

[Figure: JMS Connection Factory]
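
To make the pool analogy concrete, here is a toy sketch of the borrow-and-return cycle described above. It is not how GlassFish implements its pools; SimplePool and PooledConnection are made-up names for illustration, and a real pool would also validate connections and block or grow when it runs out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy pool: hands out pre-created objects and takes them back for reuse. */
final class SimplePool {

    /** Stand-in for a pooled connection; a real one would wrap a network resource. */
    static final class PooledConnection {
        final int id;
        PooledConnection(int id) { this.id = id; }
    }

    private final Deque<PooledConnection> idle = new ArrayDeque<PooledConnection>();

    SimplePool(int size) {
        // Connections are created once, up front, instead of per request.
        for (int i = 0; i < size; i++) {
            idle.add(new PooledConnection(i));
        }
    }

    /** Returns an idle connection, or null when the pool is exhausted. */
    PooledConnection borrow() {
        return idle.poll();
    }

    /** "Closing" a pooled connection really means handing it back. */
    void giveBack(PooledConnection connection) {
        idle.add(connection);
    }

    int available() {
        return idle.size();
    }

    public static void main(String[] args) {
        SimplePool pool = new SimplePool(1);
        PooledConnection first = pool.borrow();
        System.out.println("available while borrowed: " + pool.available());
        pool.giveBack(first);
        PooledConnection second = pool.borrow();
        System.out.println("same object reused: " + (first == second));
    }
}
```

This is why forgetting to close a connection hurts: the object never goes back to the idle list, and later callers find the pool empty.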

Create a new JMS Destination that will be repository for messages sent
Now that you've created the connection factory, we are ready to create a JMS destination. A JMS destination is where messages sent by the producer are stored until the MDB consumes them. There are two types of destinations:

  • Queue - For point-to-point communication.

  • Topic - For publish-subscribe communication.


Which messaging paradigm you use depends on your business model. In the sample MDB, whenever the inventory level of an item in an ABC store falls below a certain threshold, we are going to send an order request to an XYZ warehouse for the item.

Let's say an instance of the ItemOrder class/entity in the Warehouse client app has the following properties:

  • int barcode;

  • int totalItemsToOrder;

  • String companyID;


And an instance of the Item class/entity in the ABC producer app has the following fields:

  • int barcode;

  • String name;

  • double price;

  • int minQuantity;

  • int totalItemsInStock;


Now, when a shopper buys an item with barcode 3884994 from the shop, totalItemsInStock drops below minQuantity. What we want to do is send a message to the messaging server so that at a later time (maybe 5 seconds from now, maybe 2 days from now) the Warehouse application consumes it and sends a packaging and shipping order to its distribution vendor. What is important here is that one and only one message is delivered to the warehouse. Otherwise, they'd end up sending more than your store needs. We would also like the message to survive until it is delivered. Strictly speaking, durable subscriptions are a topic concept configured on the MDB; for a queue, what matters is that the message is sent with the persistent delivery mode, which is the JMS default. I will discuss this later. Anyway, the point is: we need to send a point-to-point message with the specific item barcode and our company ID (their system requirement, apparently) and make sure that the message stays there until it is consumed (hopefully the message does not expire).

You'd use publish-subscribe when this is not a requirement. Generally, there is a one-to-many relationship between a publisher and its subscribers. That won't make sense in our scenario here. So Queue it is!
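
The difference between the two destination types can be sketched with plain in-memory collections. This is only an illustration of the delivery semantics, not the JMS API; SimpleQueue and SimpleTopic are made-up stand-ins, and the message text is invented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory stand-ins for the two JMS destination types (not the JMS API). */
final class DestinationDemo {

    /** Point-to-point: each message is taken by exactly one consumer. */
    static final class SimpleQueue {
        private final Deque<String> messages = new ArrayDeque<String>();

        void send(String message) {
            messages.add(message);
        }

        /** Returns the next message, or null when nothing is waiting. */
        String receive() {
            return messages.poll();
        }
    }

    /** Publish-subscribe: every subscriber gets its own copy of each message. */
    static final class SimpleTopic {
        private final List<List<String>> inboxes = new ArrayList<List<String>>();

        /** Registers a subscriber and returns the inbox its copies land in. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<String>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.send("reorder barcode 3884994");
        System.out.println("first consumer:  " + queue.receive());
        System.out.println("second consumer: " + queue.receive());

        SimpleTopic topic = new SimpleTopic();
        List<String> subscriberA = topic.subscribe();
        List<String> subscriberB = topic.subscribe();
        topic.publish("price change for barcode 3884994");
        System.out.println("subscriber A: " + subscriberA);
        System.out.println("subscriber B: " + subscriberB);
    }
}
```

With the queue, the second consumer finds nothing because the first one already took the order, which is exactly the "one and only one" behaviour the warehouse scenario needs. With the topic, both subscribers see the message.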

Creating this is easy. Just use another JNDI name (jms/InvoiceQueue), like I've done below, and specify the type as Queue.

[Figure: JMS Destination Resource]

Use/Create a  session bean to send the message. (Producer Application)
As discussed above, whenever the inventory level drops below the threshold for an Item, we need to send a message to the Warehouse system via our messaging server to request a new order. What we want to send is the data for an ItemOrder. JMS defines five message body types: TextMessage, BytesMessage, StreamMessage, ObjectMessage and MapMessage. Please check the API for more info on these. As you may have guessed, we're going to use ObjectMessage. The caveat here is that the Warehouse system needs a similar object, or at least one with the properties in the ItemOrder object. You get the point.
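
ObjectMessage.setObject takes a java.io.Serializable, so whatever you put in the message has to survive Java serialization, and the consumer must be able to load the same classes. The sketch below shows that round trip using only the JDK. PayloadRoundTrip and its methods are hypothetical names, the quantity 25 is an invented value, and the actual wire format is up to the JMS provider. It sends a list of Integers, as the bean in this article does, which avoids sharing a custom class between the two applications.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

/** Hypothetical demo of the serialization an ObjectMessage payload goes through. */
final class PayloadRoundTrip {

    /** Producer side: turns the payload into bytes. */
    static byte[] toBytes(Serializable payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(buffer);
            try {
                out.writeObject(payload);
            } finally {
                out.close();
            }
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("Payload could not be serialized", e);
        }
    }

    /** Consumer side: rebuilds the object; its classes must be on the classpath. */
    static Object fromBytes(byte[] bytes) {
        try {
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
            try {
                return in.readObject();
            } finally {
                in.close();
            }
        } catch (IOException e) {
            throw new IllegalStateException("Payload could not be read", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Consumer is missing a payload class", e);
        }
    }

    public static void main(String[] args) {
        ArrayList<Integer> payload = new ArrayList<Integer>();
        payload.add(3884994); // barcode
        payload.add(25);      // quantity to order (illustrative value)
        Object received = fromBytes(toBytes(payload));
        System.out.println(received);
    }
}
```

If you sent an ItemOrder instance instead, both applications would need a compatible, Serializable ItemOrder class, which is the caveat mentioned above.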

So without further ado, below is a stateless bean that sends a request to the messaging server when the invoice is saved (saveInvoice), if the inventory level drops below the min threshold.

import com.store.entities.Customer;
import com.store.entities.Invoice;
import com.store.entities.Item;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Resource;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.jms.JMSException;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.ejb.Remove;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

@Stateless
public class InvoiceBean implements InvoiceRemote {

    private Invoice invoice = new Invoice();
    @EJB
    private ItemRemote itemService;
    @EJB
    private CustomerRemote customerService;
    @PersistenceContext
    EntityManager em;
    @Resource(name = "jms/InvoiceQueueFactory")
    private ConnectionFactory connectionFactory;
    @Resource(name = "jms/InvoiceQueue")
    private Destination destination;
    private int cartTotal;

    public void addItem(int barcode) {
        Item it = new Item();
        it = (Item) itemService.findItem(barcode);
        if (it.getQuantity() < 1) {
            System.out.println("No item available..........");
        } else {
            it.setQuantity(it.getQuantity() - 1);
            it = (Item) itemService.updateItem(it.getId(), it.getName(), it.getQuantity(), it.getPrice(), it.getBarcode(),
                    it.getMinQuantity(), it.getImage(), it.getItemsToOrder(), it.getShippingCost());
            if(getInvoice().getItems()==null){
                List<Item> items = new ArrayList<Item>();
                items.add(it);
                getInvoice().setItems(items);
                this.setCartTotal(1);
            }else{
                getInvoice().getItems().add(it);
                this.setCartTotal(this.getCartTotal()+1);
            }
            getInvoice().setTotalCost(it.getPrice()+getInvoice().getTotalCost());
        }

    }

    public void removeItem(int barcode) {
        Item it = itemService.findItem(barcode);
        it.setQuantity(it.getQuantity() + 1);
        itemService.updateItem(it.getId(), it.getName(), it.getQuantity(), it.getPrice(), it.getBarcode(),
                it.getMinQuantity(), it.getImage(), it.getItemsToOrder(), it.getShippingCost());
        getInvoice().getItems().remove(it);
        getInvoice().setTotalCost(getInvoice().getTotalCost()- it.getPrice());
        this.setCartTotal(this.getCartTotal()-1);
    }

    @Remove
    public void saveInvoice() {
        em.persist(getInvoice());
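        for (Item i : getInvoice().getItems()) {
            // Re-read the item so we compare against the current stock level.
            Item it = itemService.findItem(i.getBarcode());
            if (it.getQuantity() <= it.getMinQuantity()) {
                Connection connection = null;
                try {
                    connection = connectionFactory.createConnection();
                    // Inside an EJB with container-managed transactions these two
                    // arguments are generally ignored by the container.
                    Session session = connection.createSession(true,
                            Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = session.createProducer(destination);
                    ObjectMessage message = session.createObjectMessage();
                    // Payload layout read by the MDB: [barcode, itemsToOrder].
                    ArrayList<Object> list = new ArrayList<Object>();
                    list.add(it.getBarcode());
                    list.add(it.getItemsToOrder());
                    message.setObject(list);
                    producer.send(message);
                } catch (JMSException ex) {
                    Logger.getLogger(InvoiceBean.class.getName()).log(Level.SEVERE, null, ex);
                } finally {
                    // Always hand the connection back to the pool, even after a failure.
                    // Closing the connection also closes its sessions and producers.
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (JMSException ex) {
                            Logger.getLogger(InvoiceBean.class.getName()).log(Level.WARNING, null, ex);
                        }
                    }
                }
            }
        }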
    }

    @Remove
    public void cancelInvoice() {
        this.setCartTotal(0);
        for (Item i : getInvoice().getItems()) {
            Item it = itemService.findItem(i.getBarcode());
            it.setQuantity(it.getQuantity() + 1);
            itemService.updateItem(it.getId(), it.getName(), it.getQuantity(), it.getPrice(),
                    it.getBarcode(), it.getMinQuantity(), it.getImage(), it.getItemsToOrder(), it.getShippingCost());
        }
        setInvoice(new Invoice());
        setCartTotal(0);
    }

    public void addCustomer(int customerID) {

        Customer cust = new Customer();
        cust = (Customer) customerService.findCustomer(customerID);
        System.out.println(cust.getEmail());
        getInvoice().setCustomer(cust);
    }

    /**
     * @return the invoice
     */
    public Invoice getInvoice() {
        return invoice;
    }

    /**
     * @param invoice the invoice to set
     */
    public void setInvoice(Invoice invoice) {
        this.invoice = invoice;
    }

    /**
     * @return the cartTotal
     */
    public int getCartTotal() {
        return cartTotal;
    }

    /**
     * @param cartTotal the cartTotal to set
     */
    public void setCartTotal(int cartTotal) {
        this.cartTotal = cartTotal;
    }
}


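As you can see, the bean performs its other business methods, and when it finally saves the invoice (saveInvoice), it sends a message to the messaging server, but only if the inventory level is at or below minQuantity. A note on @Remove: the annotation is defined for stateful session beans, where it tells the container to discard the instance once the method completes. Because this bean keeps an Invoice and a cart total between calls, it would normally be declared @Stateful rather than @Stateless; stateless instances simply go back to the pool after every call.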

So this takes care of part I. In Part II I will write the Warehouse's MDB.