  The Coherence Incubator
  Messaging Pattern 2.3.0
compared with
Current by Rob Misek
on Aug 12, 2009 10:21.


 

  {note}This documentation applies to the Messaging Pattern 2.3.0. The latest Messaging Pattern documentation is available [here|Messaging Pattern].{note}
  
 {section}
 {column:width=50%}
 h3. The Messaging Pattern
 This is an {color:green}*example implementation*{color} of store and forward messaging built on top of Coherence.
  
 *The Messaging Pattern advocates that:*
 # _Payload_, typically represented as a _Message_ object, may be sent to a _Destination_ from a _Sender_ (also commonly known as a _Publisher_).
 # It is the responsibility of the infrastructure managing the _Destination_ to ensure that _Messages_ (arriving at the said _Destination_) are then stored (in some manner) and consequently forwarded (in the order in which they arrived at the said _Destination_) to one or more _Receivers_ (also commonly known as _Subscribers_).
 # The _Subscribers_ appropriately _consume_ (receive and acknowledge receipt of) the said _Messages_ from the _Destination_ in the order in which they were forwarded to the said _Subscribers_.
 # The infrastructure managing the _Messages_ appropriately cleans up (removes and garbage collects) the said _Messages_ that have been consumed by _Subscribers_.
 # The type of the _Destination_ determines the method of delivery to the _Subscribers_ on that _Destination_.
 # A _Topic Destination_ (or _Topic_) will store and forward _Messages_ to all of the _Subscribers_ of the said _Topic Destination_. This form of _Message_ delivery is often called "publish-and-subscribe messaging", "one-to-many messaging" or "the observer pattern".
 # A _Queue Destination_ (or _Queue_) will store and forward _Messages_ to at most one of the _Subscribers_ of the said _Queue Destination_. For each _Message_ a different _Subscriber_ may be used, but this is implementation and runtime dependent. This form of _Message_ delivery is often called "point-to-point messaging" or "one-to-one messaging".
 # A _Message_ may be _Persistent_ or _Non-Persistent_. In the case of _Persistent Messages_, the infrastructure managing the _Destination_ must safely store the said _Messages_ to a persistent (and recoverable) storage device so that in the case of infrastructure failure, _Messages_ may be recovered (not lost).
 # A _Subscriber_ to a _Topic_ is either _Durable_ or _Non-Durable_. _Durable Subscriptions_ allow the system implementing the _Subscriber_ to terminate and return without losing _Messages_ that may have been delivered during the outage (or disconnection).
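
The difference between _Topic_ and _Queue_ delivery described above can be illustrated with a minimal in-memory sketch. It is not the Coherence-based implementation: the class {{DestinationSketch}}, its {{Receiver}} type and the round-robin choice of _Subscriber_ for a _Queue_ are all assumptions made purely for illustration (the pattern itself leaves that choice to the implementation).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory illustration only; all names here are hypothetical. */
final class DestinationSketch {

    /** Hypothetical receiver that records what it was handed, in order. */
    static final class Receiver {
        final List<String> received = new ArrayList<>();
    }

    /** Topic semantics: every message is forwarded to every subscriber. */
    static void forwardAsTopic(List<String> messages, List<Receiver> subscribers) {
        for (String message : messages) {
            for (Receiver subscriber : subscribers) {
                subscriber.received.add(message);
            }
        }
    }

    /**
     * Queue semantics: every message is forwarded to at most one subscriber.
     * Round-robin selection is an arbitrary choice made for this sketch.
     */
    static void forwardAsQueue(List<String> messages, List<Receiver> subscribers) {
        if (subscribers.isEmpty()) {
            return; // a real destination would keep the messages stored
        }
        int next = 0;
        for (String message : messages) {
            subscribers.get(next).received.add(message);
            next = (next + 1) % subscribers.size();
        }
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        Receiver first = new Receiver();
        Receiver second = new Receiver();
        forwardAsQueue(messages, Arrays.asList(first, second));
        System.out.println(first.received + " " + second.received);
    }
}
```

With a _Topic_, both receivers end up with all three messages; with a _Queue_, each message appears in exactly one receiver's list.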
  
 {column}
  
 {column:width=50%}
 h3. Outline
 || *Release Name:* | *{color:red}Version 2.3.0: March 30th, 2009{color}* |
 || *Target Platforms:* | Java Standard Edition 5+ |
 || *Requires Coherence Version:* | 3.3.1+ or 3.4.+ |
 || *Dependencies:* | [Coherence Common 1.3.0] \\ [Command Pattern 2.3.0] |
 || *Download:* | [^coherence-messagingpattern-2.3.0.jar] \\MD5:752d139830c8d42e6066b49eb1a45f0e |
 || *Source Code and Documentation:* | [^coherence-messagingpattern-2.3.0-src.zip] \\MD5:de0b1171eb6d3e4d4802e819d6cd8e6c1 |
 || *Previous Releases:* | [Messaging Pattern 2.2.0] \\ [Messaging Pattern 2.1.1] |
  
 h3. Dependencies
 This project (like other Coherence Incubator projects) uses [Apache Ivy|http://ant.apache.org/ivy/] for dependency specification and management. While a standard {{ivy.xml}} definition file ships with the source and documentation distribution, the following diagram visually indicates the current dependencies.
  
 {center}
 !Incubator-Dependencies-Messaging.png!
 {center}
 {column}
 {section}
  
 h3. Rationale
 While it's rare that an architecture making extensive use of Coherence will require Store and Forward Messaging (due to the ability to use MapListeners, Continuous Queries and events for notifications), there are arguably some circumstances (when clients may be disconnected) where the pattern is particularly useful.
  
 Although providing an implementation of store and forward Messaging on-top-of-Coherence has always been possible (as demonstrated by this implementation), most application-level requirements for messaging are often satisfied using existing, off-the-shelf and standardized corporate messaging infrastructure, like JMS. In the cases where such infrastructure is not available or not appropriate, this implementation may provide some benefits.
  
 While it is not the intention nor is it the purpose of this implementation to replace existing messaging infrastructure, this implementation is specifically designed to provide a flexible framework for application-specific, high-performance messaging on a Data Grid.
  
 More specifically, this implementation has been designed as a {color:green}minimal framework{color} to support multi-directional and multi-point {color:gray}(no single point of failure){color} push replication between multiple Coherence clusters that are deployed and interconnected by high-latency, high-bandwidth but unreliable wide-area-networks (WANs)... aka: The [Push Replication Pattern].
  
 h3. Benefits of Datagrid-based Hub-less Messaging
 The implementation of the Store and Forward Messaging Pattern on Oracle Coherence provides many unique advantages over traditional hub-and-spoke messaging solutions.
  
 * The messaging infrastructure becomes part of, and (by default) shares, the same Coherence Datagrid infrastructure as your application. Consequently, there is no need for separately installed and maintained "messaging servers".
  
 * Being "part" of your application, there is no reason to wrap application objects as Messages (unlike JMS). You can simply publish and consume your application and domain objects.
  
 * As all stateful objects in the pattern are managed using standard distributed caches (including Topics, Queues, Subscriptions, and Messages themselves), you may use standard Oracle Coherence features to monitor, manage, query and observe the state of the messaging infrastructure.
  
 * Given that the source code is publicly available, you are free to extend and enhance the messaging solution to suit your needs.
  
 * Being based on Oracle Coherence, you can rest assured that the solution will provide highly available, scalable (scale-out) and resilient messaging infrastructure.
  
 h3. What's New?
 Significant effort has been made in this release to bring the Messaging Pattern, its internal and external terminology, and the API in line with commonly used messaging infrastructure, like JMS. This includes adding support for:
  
 * Queue Destinations (one to one messaging semantics).
  
 * Subscribers for Topics and Queues.
  
 * Durable Subscribers to Topics.
  
 * Transaction support for Subscribers, including auto and manual commit modes, commit and rollback.
  
 While every effort has been made to ensure compatibility with the previous API, it is likely that some applications based on the previous release(s) of this pattern will be broken when you upgrade to this release. However, the new API will dramatically simplify the said applications and provide greater functionality and flexibility.
  
 h3. Implementation Discussion
  
 {tip:title=Before you start}
 The Store and Forward Messaging pattern makes extensive use of the [Command Pattern] for managing Destinations and Message delivery. In fact the entire messaging infrastructure is modeled as a set of {{Commands}} and a few {{EntryProcessors}}. Consequently it is advised that you make yourself familiar with the [Command Pattern] (and its implementation) to understand the implementation of this pattern. Note, however, that not knowing how this pattern is implemented does not preclude you from using it.
 {tip}
  
 Internally all of the standard messaging concepts, like {{Destinations}} ({{Topics}} and {{Queues}}), {{Subscriptions}} and {{Messages}}, with the exception of Subscribers, are represented as objects placed in one or more Coherence managed distributed caches. {{Destinations}} themselves are actually represented as [Command Pattern] {{Contexts}}, which are managed in the "contexts" distributed cache. Almost all messaging interactions, for example creating subscriptions for a {{Destination}} or publishing messages to a {{Destination}}, are modeled as [Command Pattern] {{Commands}} (or Entry Processors) submitted to the said {{Destinations}} for execution. The major consequences and benefits of this approach are that the implementation:
  
 # Is naturally geared for accepting as many messages as physically possible (until the network capacity is exceeded or you run out of storage), off-loading the actual delivery to another thread (a {{CommandExecutor}}).
 # Is completely JMX enabled and monitorable. Simply enable Coherence JMX monitoring and the {{Contexts}} representing the {{Destinations}} will appear in the JMX-tree.
 # Guarantees that message delivery occurs in-order (due to the underlying use of the [Command Pattern] for executing {{Commands}}).
 # Remains available after server-loss (i.e.: cluster member loss), as it's built using Coherence distributed caching.
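
The first and third points (accept quickly, deliver later and in order on another thread) can be sketched with nothing more than a single-threaded executor from the JDK. This is only an analogy for the role a {{CommandExecutor}} plays: the class {{OrderedDeliverySketch}} and its methods are hypothetical, and nothing here is distributed or fault tolerant.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical single-JVM analogy; not the Command Pattern implementation. */
final class OrderedDeliverySketch {

    // One worker thread: tasks run sequentially, in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List<String> delivered =
            Collections.synchronizedList(new ArrayList<String>());

    /** Returns as soon as the message is queued; delivery happens on the worker thread. */
    void publish(String message) {
        executor.execute(() -> delivered.add(message));
    }

    /** Stops accepting messages, waits for queued deliveries, and returns them in delivery order. */
    List<String> drain() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("delivery did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delivery", e);
        }
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        OrderedDeliverySketch sketch = new OrderedDeliverySketch();
        for (int i = 0; i < 5; i++) {
            sketch.publish("message-" + i);
        }
        System.out.println(sketch.drain());
    }
}
```

Because a single publishing thread hands work to a single worker thread, the delivery order matches the publishing order, while {{publish}} itself never waits for delivery to complete.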
  
 The most significant interface (from an application-level perspective) is the {{MessagingSession}}. Implementations of this interface, namely the {{DefaultMessagingSession}}, are how you control the messaging infrastructure, including:
 # Creating {{Topic}} and {{Queue}} {{Destinations}}
 # Subscribing to and unsubscribing from {{Destinations}} (creating {{Subscribers}})
 # Publishing _Payload_ to {{Destinations}}, and
 # Subscribing and reading (ie: consuming) _Payload_ from {{Subscriptions}}
  
 The following section steps you through a simple example demonstrating how to publish and consume a message from a {{Topic}}.
  
 h3. Using Topics
 The following examples outline the use of Topics (one to many messaging), where each published message is delivered to all subscribers.
  
 !Incubator-Messaging-One-To-Many.png!
  
 *Step 1: Creating a {{MessagingSession}}*
 Once you've installed and appropriately configured the implementation (see below), the first thing you need to do is create a {{MessagingSession}}. The simplest way to achieve this is to use the statically defined {{DefaultMessagingSession.getInstance()}} method. {color:gray}(Alternatively you may construct your own instance by providing {{ContextManager}} and {{CommandSubmitter}} instances to the {{DefaultMessagingSession}} constructor.){color}
  
 {code:java}
  MessagingSession messagingSession = DefaultMessagingSession.getInstance();
 {code}
  
 *Step 2: Creating a {{Topic}}*
 Once you have a {{MessagingSession}} you can create a {{Topic}}. As {{Topics}} are [Command Pattern] {{Contexts}}, here you have the ability to determine how the underlying infrastructure is configured to manage the {{Commands}} submitted to the said {{Topic}}. Just like registering a {{Context}} with the [Command Pattern], you may (optionally) provide a {{ContextConfiguration}} and {{Command}} {{ManagementStrategy}}. For example, you can choose how the underlying commands are managed, either colocated with the {{Topic}} or distributed across the cluster. For more information, refer to the [Command Pattern].
  
 {code:java}
  Identifier topicIdentifier = messagingSession.createTopic("my-topic");
 {code}
  
 or alternatively (specifying a specific configuration)
  
 {code:java}
  DefaultContextConfiguration contextConfiguration = new DefaultContextConfiguration(ManagementStrategy.DISTRIBUTED);
  Identifier topicIdentifier = messagingSession.createTopic("my-topic", contextConfiguration);
 {code}
  
 {tip:title=Note}If the {{Topic}} with the specified name (or {{Identifier}}) already exists, the {{Identifier}} of the existing {{Topic}} is returned.{tip}
  
 *Step 3a: Subscribing to a {{Topic}} (non-durable subscribers)*
 Once you have created a {{Topic}} you can subscribe to it by creating a {{Subscriber}}. By default, {{Subscribers}} to {{Topics}} (and {{Queues}}) are non-durable. That is, when a {{Subscriber}} becomes disconnected from the messaging infrastructure, all of the visible messages for the said {{Subscriber}} are appropriately rolled back and the subscription for the {{Subscriber}} is removed.
  
 {code:java}
  Subscriber subscriber = messagingSession.subscribe(topicIdentifier);
 {code}
  
 If you don't have the {{Identifier}} of the {{Destination}} to which you would like to subscribe, you can simply use its name (as a String). For example:
  
 {code:java}
  Subscriber subscriber = messagingSession.subscribe("my-topic");
 {code}
  
 *Step 3b: Subscribing to a {{Topic}} (durable subscribers)*
 To create a durable subscription to a {{Topic}}, you need to provide an appropriate {{TopicSubscriptionConfiguration}} when subscribing.
  
 {code:java}
  Subscriber subscriber = messagingSession.subscribe("my-topic", TopicSubscriptionConfiguration.newDurableConfiguration("my-durable-subscription"));
 {code}
  
 {tip}To resubscribe to a previously created durable subscription, simply call the subscribe method again with the same durable configuration (including the same subscription name){tip}
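
To make the difference between durable and non-durable subscriptions concrete, here is a small in-memory sketch. It is not how the pattern stores subscriptions in Coherence caches: the class {{DurableSubscriptionSketch}}, its methods, and the idea of a per-subscription backlog list are assumptions chosen only to illustrate the behaviour described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory topic; illustrates durability semantics only. */
final class DurableSubscriptionSketch {

    // Messages waiting for each subscription, keyed by subscription name.
    private final Map<String, List<String>> backlogs = new LinkedHashMap<>();
    private final Set<String> durableNames = new HashSet<>();

    /** Creates the subscription, or re-attaches to an existing one with the same name. */
    void subscribe(String name, boolean durable) {
        backlogs.putIfAbsent(name, new ArrayList<String>());
        if (durable) {
            durableNames.add(name);
        }
    }

    /** A non-durable subscription is removed on disconnect; a durable one is kept. */
    void disconnect(String name) {
        if (!durableNames.contains(name)) {
            backlogs.remove(name);
        }
    }

    /** Stores the message for every subscription that currently exists. */
    void publish(String message) {
        for (List<String> backlog : backlogs.values()) {
            backlog.add(message);
        }
    }

    /** Returns and clears the waiting messages; empty if the subscription is unknown. */
    List<String> drain(String name) {
        List<String> backlog = backlogs.get(name);
        if (backlog == null) {
            return new ArrayList<>();
        }
        List<String> copy = new ArrayList<>(backlog);
        backlog.clear();
        return copy;
    }

    public static void main(String[] args) {
        DurableSubscriptionSketch topic = new DurableSubscriptionSketch();
        topic.subscribe("my-durable-subscription", true);
        topic.disconnect("my-durable-subscription");
        topic.publish("sent during the outage");
        topic.subscribe("my-durable-subscription", true);
        System.out.println(topic.drain("my-durable-subscription"));
    }
}
```

A message published while a durable subscriber is away is still waiting when it resubscribes under the same name, whereas a non-durable subscriber that reconnects starts with an empty backlog.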
  
 *Step 4: Publishing _payload_ to a {{Destination}}*
  
 To publish message _payload_ to a {{Destination}}, either a {{Topic}} or a {{Queue}}, you need to again use the {{MessagingSession}}. Simply call {{publishMessage(...)}} with the {{Identifier}} (or name) of the {{Destination}} and the corresponding _payload_.
  
 {code:java}
  messagingSession.publishMessage(topicIdentifier, "Hello World");
 {code}
  
 or
  
 {code:java}
  messagingSession.publishMessage("my-topic", "Hello World");
 {code}
  
 {note}The only thing you need to ensure is that the _payload_ being published is serializable in some way, either by implementing the standard Java {{java.io.Serializable}} interface or Coherence's {{ExternalizableLite}} or {{PortableObject}}.{note}
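
As a quick way to check that an application object can be used as _payload_, the following sketch round-trips a hypothetical {{OrderPlaced}} class through standard Java serialization. It covers only the {{java.io.Serializable}} option; {{ExternalizableLite}} and {{PortableObject}} are not shown, and the class and method names are illustrative assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper for checking that a payload survives Java serialization. */
final class PayloadSketch {

    /** Hypothetical domain object published directly as message payload. */
    static final class OrderPlaced implements Serializable {
        private static final long serialVersionUID = 1L;
        final String orderId;
        final int quantity;

        OrderPlaced(String orderId, int quantity) {
            this.orderId = orderId;
            this.quantity = quantity;
        }
    }

    /** Serializes the payload to bytes and reads it back as a new object. */
    static <T extends Serializable> T roundTrip(T payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(payload);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("payload is not serializable", e);
        }
    }

    public static void main(String[] args) {
        OrderPlaced copy = roundTrip(new OrderPlaced("A-1", 3));
        System.out.println(copy.orderId + " x " + copy.quantity);
    }
}
```

A payload that passes this check could then be handed to {{publishMessage(...)}} in place of the "Hello World" string used above.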
  
 *Step 5: Consuming a message with a {{Subscriber}}*
 In order to consume a message using a {{Subscriber}} you need to use the {{Subscriber}} {{getMessage()}} method. This method will request, block and wait for a message to be delivered to the Subscriber and then returned to your application.
  
 {code:java}
  String message = (String)subscriber.getMessage();
 {code}
  
 By default newly created {{Subscribers}} are in "auto-commit" mode. This means that any message received from a {{Subscriber}} will be automatically acknowledged and removed from the underlying messaging infrastructure. If however you'd like the opportunity to *rollback* received messages, simply set the {{Subscriber}} auto commit mode to false. For example:
  
 {code:java}
  subscriber.setAutoCommit(false);
 {code}
  
 Once you have done this you may make use of the {{rollback}} or {{commit}} methods on the {{Subscriber}} interface to manually control message acknowledgement in the underlying messaging infrastructure.
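
The interplay of auto-commit, {{commit}} and {{rollback}} can be pictured with the following in-memory sketch. The class {{TransactedSubscriberSketch}} is hypothetical and simplified: its {{getMessage()}} returns {{null}} instead of blocking, and the assumption that rolled back messages become receivable again in their original order is made for illustration rather than taken from the implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-threaded model of commit and rollback on a subscriber. */
final class TransactedSubscriberSketch {

    private final Deque<String> pending = new ArrayDeque<>();   // stored, not yet received
    private final List<String> uncommitted = new ArrayList<>(); // received, not yet acknowledged
    private boolean autoCommit = true;

    /** Simulates the infrastructure forwarding a message to this subscriber. */
    void deliver(String message) {
        pending.addLast(message);
    }

    void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    /** Returns the next message, or null if none is waiting (the real method blocks instead). */
    String getMessage() {
        String message = pending.pollFirst();
        if (message != null && !autoCommit) {
            uncommitted.add(message); // remembered until commit() or rollback()
        }
        return message;
    }

    /** Acknowledges everything received since the last commit or rollback. */
    void commit() {
        uncommitted.clear();
    }

    /** Makes unacknowledged messages receivable again, in their original order. */
    void rollback() {
        for (int i = uncommitted.size() - 1; i >= 0; i--) {
            pending.addFirst(uncommitted.get(i));
        }
        uncommitted.clear();
    }

    public static void main(String[] args) {
        TransactedSubscriberSketch subscriber = new TransactedSubscriberSketch();
        subscriber.deliver("first");
        subscriber.setAutoCommit(false);
        System.out.println(subscriber.getMessage());
        subscriber.rollback();
        System.out.println(subscriber.getMessage());
        subscriber.commit();
    }
}
```

In auto-commit mode nothing is remembered after {{getMessage()}} returns, so a later {{rollback()}} has nothing to restore.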
  
 {note}Like most messaging systems, including JMS, subscribers should only be used by a single thread. If you require concurrent subscriptions to a {{Destination}}, simply create more {{Subscribers}}.{note}
  
 h3. Using Queues
 The following examples outline the use of Queues (one to one messaging), where each message published is delivered to one and only one subscriber.
  
 !Incubator-Messaging-One-To-One.png!
  
 *Step 1: Creating a {{MessagingSession}}*
 Once you've installed and appropriately configured the implementation (see below), the first thing you need to do is create a {{MessagingSession}}. The simplest way to achieve this is to use the statically defined {{DefaultMessagingSession.getInstance()}} method. {color:gray}(Alternatively you may construct your own instance by providing {{ContextManager}} and {{CommandSubmitter}} instances to the {{DefaultMessagingSession}} constructor.){color}
  
 {code:java}
  MessagingSession messagingSession = DefaultMessagingSession.getInstance();
 {code}
  
 *Step 2: Creating a {{Queue}}*
 Much like creating {{Topics}}, to create a {{Queue}} you use a {{MessagingSession}}.
  
 {code:java}
  Identifier queueIdentifier = messagingSession.createQueue("my-queue");
 {code}
  
 or alternatively, like a {{Topic}}, you may specify a configuration:
  
 {code:java}
  DefaultContextConfiguration contextConfiguration = new DefaultContextConfiguration(ManagementStrategy.DISTRIBUTED);
  Identifier queueIdentifier = messagingSession.createQueue("my-queue", contextConfiguration);
 {code}
  
 {tip:title=Note}If the {{Queue}} with the specified name (or {{Identifier}}) already exists, the {{Identifier}} of the existing {{Queue}} is returned.{tip}
  
 *Step 3: Subscribing to a {{Queue}}*
 Once you have created a {{Queue}} you can subscribe to it by creating a {{Subscriber}}. {{Subscribers}} to {{Queues}} are always non-durable. That is, when a {{Subscriber}} becomes disconnected from the messaging infrastructure, all of the visible messages for the said {{Subscriber}} are appropriately rolled back and the subscription for the {{Subscriber}} is removed.
  
 {code:java}
  Subscriber subscriber = messagingSession.subscribe(queueIdentifier);
 {code}
  
 If you don't have the {{Identifier}} of the {{Destination}} to which you would like to subscribe, you can simply use its name (as a String). For example:
  
 {code:java}
  Subscriber subscriber = messagingSession.subscribe("my-queue");
 {code}
  
 h3. Frequently Asked Questions
  
 (*b) *Why don't you support the Java Message Service (ie: JMS) or feature X of JMS?*
  
 While it is theoretically possible for this implementation to be an SPI (Service Provider Implementation) for the Java Message Service (JMS), this implementation has been explicitly designed to support the development of the [Push Replication Pattern] and other WAN-based architectures, which do not necessarily require JMS.
  
 (*b) *How can I monitor the infrastructure?*
  
 There are two ways. Firstly, by enabling JMX, you'll find that all {{Destinations}} (ie: {{Topics}}) are automatically registered into the clustered JMX tree (as {{ContextExecutors}}). Secondly, as all of the infrastructure state is represented using Coherence distributed caches, you may examine, listen to and mutate the appropriately named caches: "contexts", "commands", "messages" and "subscriptions".
  
 h3. References and Additional Information
  
 * The Coherence Incubator - [Command Pattern]