Wednesday, June 1, 2011

Creating and consuming an EJB 3 Message Driven Bean Part I

"Finally, The Rock has come back to...." More like: finally, I am going to write an article (albeit a two-part article) about programming, if you don't regard SQL as programming. In the first part I am going to write about how to send a message to a messaging server, including creating the new JMS resources. I used GlassFish v2.1 for this, but you can do the same on your preferred application server. In the next part I will write about consuming the message via the MDB. So let's begin. I have created a checklist of the steps we need to perform in order to send our message and have an MDB consume it.

Checklist:


  1. Create a new JMS connection factory to allow creation of JMS objects in the application server. (Messaging server)

  2. Create a new JMS Destination that will be repository for messages sent. (Messaging server)

  3. Use/Create a session bean to send the message. (Producer Application)

  4. Create our MessageDrivenBean to consume it. (Consumer Application)


Create a new JMS connection factory to allow creation of JMS objects in the application server.
Before we start creating our producer and MDB, the first thing I suggest doing is to create a new JMS connection factory in your application server. Think of a JMS connection factory as a JDBC connection pool. Your JDBC connection pool creates a pool of connections, and whenever your application asks for a connection object, the pool serves one. When you're done, the connection object is returned to the pool and, if it is still valid, it can serve another request. Using a pool is a lot faster than opening a fresh connection every time, and the application server is responsible for managing it. Of course, you are responsible for closing the connection so that it can be returned to the pool. We have the option of using javax.jms.TopicConnectionFactory, javax.jms.QueueConnectionFactory or simply javax.jms.ConnectionFactory. Even though we're going to be using a queue (why? I will explain the difference between a topic and a queue below), staying true to the Abstract Factory pattern, we will use the ConnectionFactory interface. Have a look at the image below. I've created a new JMS Connection Factory with the JNDI name jms/InvoiceQueueFactory and left the Pool Settings at the application server defaults.

[Figure: JMS Connection Factory]
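
To make the pool analogy a bit more concrete, here is a minimal sketch in plain Java of the borrow/return cycle described above. It is only an illustration: SimplePool and PooledConnection are hypothetical names invented for this example, and this is not how GlassFish implements its pools.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PoolSketch {

    /** Hypothetical stand-in for a pooled connection; not a JMS or JDBC type. */
    static class PooledConnection {
        final int id;

        PooledConnection(int id) {
            this.id = id;
        }
    }

    /** A tiny fixed-size pool: borrow() hands out an idle connection, release() returns it. */
    static class SimplePool {
        private final Deque<PooledConnection> idle = new ArrayDeque<>();

        SimplePool(int size) {
            for (int i = 0; i < size; i++) {
                idle.push(new PooledConnection(i));
            }
        }

        synchronized PooledConnection borrow() {
            if (idle.isEmpty()) {
                throw new IllegalStateException("pool exhausted: did a caller forget to release?");
            }
            return idle.pop();
        }

        synchronized void release(PooledConnection connection) {
            idle.push(connection);
        }

        synchronized int idleCount() {
            return idle.size();
        }
    }

    public static void main(String[] args) {
        SimplePool pool = new SimplePool(2);

        PooledConnection first = pool.borrow();
        try {
            System.out.println("using connection " + first.id);
        } finally {
            // Plays the role of close() on a pooled connection: it goes back to the pool.
            pool.release(first);
        }

        // The released object is handed out again instead of a new one being created.
        PooledConnection second = pool.borrow();
        System.out.println("same object reused: " + (first == second));
    }
}
```

The try/finally in main is the part worth copying: if you skip the release, the pool eventually runs dry.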

Create a new JMS Destination that will be repository for messages sent
Now that you've created the connection factory, we are ready to create a JMS destination. A JMS destination is where the messages sent by the producer are stored until a consumer (our MDB) picks them up. There are two types of destinations:

  • Queue - For point-to-point communication.

  • Topic - For publish-subscribe communication.
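
The difference between the two is easiest to see side by side. The following is a rough in-memory sketch in plain Java, not JMS code: QueueSketch, TopicSketch and Consumer are hypothetical classes made up for illustration, and the round-robin delivery is just one simple way to guarantee that a queued message reaches a single consumer.

```java
import java.util.ArrayList;
import java.util.List;

public class DestinationSketch {

    /** Hypothetical consumer that simply records what it receives. */
    static class Consumer {
        final String name;
        final List<String> received = new ArrayList<>();

        Consumer(String name) {
            this.name = name;
        }
    }

    /** Point-to-point: each message is delivered to exactly one consumer. */
    static class QueueSketch {
        private final List<Consumer> consumers = new ArrayList<>();
        private int next = 0;

        void register(Consumer consumer) {
            consumers.add(consumer);
        }

        void send(String message) {
            if (consumers.isEmpty()) {
                throw new IllegalStateException("no consumer registered in this sketch");
            }
            // Round-robin: pick one consumer, and only that one gets the message.
            Consumer target = consumers.get(next % consumers.size());
            next++;
            target.received.add(message);
        }
    }

    /** Publish-subscribe: every subscriber gets its own copy of each message. */
    static class TopicSketch {
        private final List<Consumer> subscribers = new ArrayList<>();

        void subscribe(Consumer consumer) {
            subscribers.add(consumer);
        }

        void publish(String message) {
            for (Consumer subscriber : subscribers) {
                subscriber.received.add(message);
            }
        }
    }

    public static void main(String[] args) {
        Consumer queueA = new Consumer("warehouse-A");
        Consumer queueB = new Consumer("warehouse-B");
        QueueSketch queue = new QueueSketch();
        queue.register(queueA);
        queue.register(queueB);
        queue.send("order for barcode 3884994");
        System.out.println("queue deliveries: "
                + (queueA.received.size() + queueB.received.size()));

        Consumer topicA = new Consumer("subscriber-A");
        Consumer topicB = new Consumer("subscriber-B");
        TopicSketch topic = new TopicSketch();
        topic.subscribe(topicA);
        topic.subscribe(topicB);
        topic.publish("order for barcode 3884994");
        System.out.println("topic deliveries: "
                + (topicA.received.size() + topicB.received.size()));
    }
}
```

With two consumers on each side, the queue delivers the order once and the topic delivers it twice.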


Which messaging paradigm you use depends on your business model. In the sample application, whenever the inventory level of an item in an ABC store falls below a certain threshold, we are going to send an order request to an XYZ warehouse for the item.

Let's say an instance of ItemOrder class/entity in the Warehouse client app has the following properties

  • int barcode;

  • int totalItemsToOrder;

  • String companyID;


And an instance of Item class/entity in the ABC producer app has the following fields

  • int barcode;

  • String name;

  • double price;

  • int minQuantity;

  • int totalItemsInStock;
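
Put together, the two classes and the reorder rule could look something like the sketch below. The field names come from the lists above; everything else (the constructors, needsReorder, orderFor, the reorderAmount parameter and the sample values) is assumed purely for illustration and is not the actual entity code.

```java
public class ReorderSketch {

    /** Fields taken from the ItemOrder list above. */
    static class ItemOrder {
        final int barcode;
        final int totalItemsToOrder;
        final String companyID;

        ItemOrder(int barcode, int totalItemsToOrder, String companyID) {
            this.barcode = barcode;
            this.totalItemsToOrder = totalItemsToOrder;
            this.companyID = companyID;
        }
    }

    /** Fields taken from the Item list above. */
    static class Item {
        final int barcode;
        final String name;
        final double price;
        final int minQuantity;
        int totalItemsInStock;

        Item(int barcode, String name, double price, int minQuantity, int totalItemsInStock) {
            this.barcode = barcode;
            this.name = name;
            this.price = price;
            this.minQuantity = minQuantity;
            this.totalItemsInStock = totalItemsInStock;
        }

        /** Hypothetical helper: one unit leaves the shelf. */
        void sellOne() {
            totalItemsInStock--;
        }

        /** True once stock has fallen below the minimum threshold. */
        boolean needsReorder() {
            return totalItemsInStock < minQuantity;
        }
    }

    /** Hypothetical helper: the order to send to the warehouse, or null if none is needed. */
    static ItemOrder orderFor(Item item, int reorderAmount, String companyID) {
        if (!item.needsReorder()) {
            return null;
        }
        return new ItemOrder(item.barcode, reorderAmount, companyID);
    }

    public static void main(String[] args) {
        Item item = new Item(3884994, "sample item", 9.99, 5, 5);
        System.out.println("order before sale: " + orderFor(item, 20, "ABC"));

        item.sellOne(); // stock is now 4, which is below minQuantity of 5
        ItemOrder order = orderFor(item, 20, "ABC");
        System.out.println("order after sale for barcode: " + order.barcode);
    }
}
```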


Now what happens is that when a shopper buys an item with barcode 3884994 from the shop, the totalItemsInStock drops below the minQuantity. What we want to do now is send a message to the messaging server so that at a later time (maybe 5 seconds from now, maybe 2 days from now) the Warehouse application consumes it and sends a packaging and shipping order to its distribution vendor. What is important here is that one and only one message is delivered to the warehouse. Otherwise, they'd end up sending more than what you need for your store. We would also like the message to be durable, but this is a configuration in the MDB itself, which I will discuss later. Anyway, the point is, we need to send a point-to-point message with the specific item barcode and our company ID (their system requirement, apparently) and make sure that the message stays there until it is consumed (hopefully the message does not expire).

You'd use publish-subscribe when this is not a requirement. Generally, there is a one-to-many relationship between a publisher and its subscribers. That wouldn't make sense in our scenario. So Queue it is!

Creating the queue is easy. Just use another JNDI name (jms/InvoiceQueue in my case), like I've done below, and specify the type as Queue.

[Figure: JMS Destination Resource]

Use/Create a session bean to send the message. (Producer Application)
As discussed above, whenever the inventory level drops below the threshold for an Item, we need to send a message to the Warehouse system via our messaging server to request a new order. What we want to send is our ItemOrder data. JMS defines five message body types: TextMessage, BytesMessage, StreamMessage, ObjectMessage and MapMessage. Please check the API for more info on these. As you may have guessed, we're going to use ObjectMessage. The caveat here is that whatever you put in an ObjectMessage must be Serializable, and the Warehouse system must be able to deserialize it, so it needs a matching class on its side, or at least an agreement on the properties being sent. (The bean below simply sends an ArrayList holding the barcode and the number of items to order.) You get the point.
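
To see what the ObjectMessage caveat means in practice, here is a small stand-alone sketch that pushes an ItemOrder through Java serialization and back, which is roughly what has to work between the producer and the consumer. It uses plain java.io streams instead of a real JMS provider, and the ItemOrder class shown here (including its constructor and serialVersionUID) is an assumed shape based on the properties listed earlier, not the real entity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    /** Assumed shape of ItemOrder; it must implement Serializable to travel as an object payload. */
    static class ItemOrder implements Serializable {
        private static final long serialVersionUID = 1L;

        final int barcode;
        final int totalItemsToOrder;
        final String companyID;

        ItemOrder(int barcode, int totalItemsToOrder, String companyID) {
            this.barcode = barcode;
            this.totalItemsToOrder = totalItemsToOrder;
            this.companyID = companyID;
        }
    }

    /** Producer side of the sketch: turn the payload into bytes. */
    static byte[] toBytes(Serializable payload) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        }
        return buffer.toByteArray();
    }

    /** Consumer side of the sketch: this fails if the class is not available to the reader. */
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ItemOrder sent = new ItemOrder(3884994, 20, "ABC");
        ItemOrder received = (ItemOrder) fromBytes(toBytes(sent));
        System.out.println("barcode survived the round trip: "
                + (received.barcode == sent.barcode));
    }
}
```

If the consumer did not have ItemOrder on its classpath, the readObject call is where it would blow up with a ClassNotFoundException, which is one reason sending a standard collection of simple values can be the easier route.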

So without further ado, below is a stateless bean that sends a request to the messaging server when the invoice is saved (saveInvoice), if the inventory level drops below the min threshold.

import com.store.entities.Customer;
import com.store.entities.Invoice;
import com.store.entities.Item;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Resource;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.jms.JMSException;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.ejb.Remove;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

@Stateless
public class InvoiceBean implements InvoiceRemote {

    private Invoice invoice = new Invoice();
    @EJB
    private ItemRemote itemService;
    @EJB
    private CustomerRemote customerService;
    @PersistenceContext
    EntityManager em;
    @Resource(name = "jms/InvoiceQueueFactory")
    private ConnectionFactory connectionFactory;
    @Resource(name = "jms/InvoiceQueue")
    private Destination destination;
    private int cartTotal;

    public void addItem(int barcode) {
        // Look the item up directly; there is no need to allocate a throwaway Item first.
        Item it = itemService.findItem(barcode);
        if (it.getQuantity() < 1) {
            System.out.println("No item available..........");
        } else {
            it.setQuantity(it.getQuantity() - 1);
            it = (Item) itemService.updateItem(it.getId(), it.getName(), it.getQuantity(), it.getPrice(), it.getBarcode(),
                    it.getMinQuantity(), it.getImage(), it.getItemsToOrder(), it.getShippingCost());
            if(getInvoice().getItems()==null){
                List<Item> items = new ArrayList<Item>();
                items.add(it);
                getInvoice().setItems(items);
                this.setCartTotal(1);
            }else{
                getInvoice().getItems().add(it);
                this.setCartTotal(this.getCartTotal()+1);
            }
            getInvoice().setTotalCost(it.getPrice()+getInvoice().getTotalCost());
        }

    }

    public void removeItem(int barcode) {
        Item it = itemService.findItem(barcode);
        it.setQuantity(it.getQuantity() + 1);
        itemService.updateItem(it.getId(), it.getName(), it.getQuantity(), it.getPrice(), it.getBarcode(),
                it.getMinQuantity(), it.getImage(), it.getItemsToOrder(), it.getShippingCost());
        getInvoice().getItems().remove(it);
        getInvoice().setTotalCost(getInvoice().getTotalCost()- it.getPrice());
        this.setCartTotal(this.getCartTotal()-1);
    }

    @Remove
    public void saveInvoice() {
        em.persist(getInvoice());
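        for (Item i : getInvoice().getItems()) {
            Item it = itemService.findItem(i.getBarcode());
            // Reorder once stock is at or below the item's minimum threshold.
            if (it.getQuantity() <= it.getMinQuantity()) {
                Connection connection = null;
                try {
                    connection = connectionFactory.createConnection();
                    // Inside an EJB the container manages the transaction, so these
                    // two arguments are generally ignored by the application server.
                    Session session = connection.createSession(true,
                            Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = session.createProducer(destination);
                    ObjectMessage message = session.createObjectMessage();
                    // Payload layout: [barcode, itemsToOrder]. ArrayList is Serializable.
                    ArrayList<Object> list = new ArrayList<Object>();
                    list.add(it.getBarcode());
                    list.add(it.getItemsToOrder());
                    message.setObject(list);
                    producer.send(message);
                } catch (JMSException ex) {
                    Logger.getLogger(InvoiceBean.class.getName()).log(Level.SEVERE, null, ex);
                } finally {
                    // Always hand the connection back to the pool, even if send() failed.
                    // Closing the connection also closes its session and producer.
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (JMSException ex) {
                            Logger.getLogger(InvoiceBean.class.getName()).log(Level.WARNING, null, ex);
                        }
                    }
                }
            }
        }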
    }

    @Remove
    public void cancelInvoice() {
        this.setCartTotal(0);
        for (Item i : getInvoice().getItems()) {
            Item it = itemService.findItem(i.getBarcode());
            it.setQuantity(it.getQuantity() + 1);
            itemService.updateItem(it.getId(), it.getName(), it.getQuantity(), it.getPrice(),
                    it.getBarcode(), it.getMinQuantity(), it.getImage(), it.getItemsToOrder(), it.getShippingCost());
        }
        setInvoice(new Invoice());
        setCartTotal(0);
    }

    public void addCustomer(int customerID) {

        Customer cust = new Customer();
        cust = (Customer) customerService.findCustomer(customerID);
        System.out.println(cust.getEmail());
        getInvoice().setCustomer(cust);
    }

    /**
     * @return the invoice
     */
    public Invoice getInvoice() {
        return invoice;
    }

    /**
     * @param invoice the invoice to set
     */
    public void setInvoice(Invoice invoice) {
        this.invoice = invoice;
    }

    /**
     * @return the cartTotal
     */
    public int getCartTotal() {
        return cartTotal;
    }

    /**
     * @param cartTotal the cartTotal to set
     */
    public void setCartTotal(int cartTotal) {
        this.cartTotal = cartTotal;
    }
}


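As you can see, the bean performs its other business methods, and when saveInvoice is finally called it sends a message to the messaging server, but only if the item's inventory level has dropped to minQuantity or below. One caveat: @Remove only has an effect on stateful session beans, where it tells the container to discard the instance once the method completes. Since this bean keeps the invoice and the cart total between calls, it really wants to be @Stateful rather than @Stateless.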

So this takes care of part I. In Part II I will write the Warehouse's MDB.

