 | This documentation applies to the Messaging Pattern 2.1.1. The latest Messaging Pattern documentation is available here. |
The Messaging Pattern
This is an example implementation of store and forward messaging built on top of Coherence.
The Messaging Pattern advocates that:
- Payload, typically represented as a Message object, may be sent to a Destination from a Sender (also commonly known as a Publisher).
- It is the responsibility of the infrastructure managing the Destination to ensure that Messages (arriving at the said Destination) are then stored (in some manner) and consequently forwarded (in the order in which they arrived at the said Destination) to one or more Receivers (also commonly known as Subscribers).
- The Subscribers appropriately consume (receive and acknowledge receipt of) the said Messages from the Destination in the order in which they were forwarded to the said Subscribers.
- The infrastructure managing the Messages appropriately cleans up (removes and garbage collects) the said Messages that have been consumed by Subscribers.
- The type of the Destination determines the method of delivery to the Subscribers on that Destination.
- A Topic Destination (or Topic) will store and forward Messages to all of the Subscribers of the said Topic Destination. This form of Message delivery is often called "publish-and-subscribe messaging", "one-to-many messaging" or "the observer pattern".
- A Queue Destination (or Queue) will store and forward Messages to at most one of the Subscribers of the said Queue Destination. For each Message a different Subscriber may be used, but this is implementation and runtime dependent. This form of Message delivery is often called "point-to-point messaging" or "one-to-one messaging".
- A Message may be Persistent or Non-Persistent. In the case of Persistent Messages, the infrastructure managing the Destination must safely store the said Messages to a persistent (and recoverable) storage device so that in the case of infrastructure failure, Messages may be recovered (not lost).
- A Subscriber to a Topic is either Durable or Non-Durable. Durable Subscriptions allow the system implementing the Subscriber to terminate and return without losing Messages that may have been delivered during the outage (or disconnection).
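The difference between the two Destination types can be illustrated with a small in-memory sketch. It is not part of the Messaging Pattern implementation: the class names DeliverySketch, TopicSketch and QueueSketch are hypothetical, and persistence, acknowledgement and clean-up are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of the two Destination types; not the Coherence implementation.
class DeliverySketch {

    // A Topic stores and forwards every message to every subscriber, in arrival order.
    static class TopicSketch {
        private final Map<String, Queue<String>> subscriptions = new LinkedHashMap<>();

        void subscribe(String subscriber) {
            subscriptions.putIfAbsent(subscriber, new ArrayDeque<>());
        }

        void publish(String message) {
            for (Queue<String> pending : subscriptions.values()) {
                pending.add(message);
            }
        }

        // Returns the next message for the subscriber, or null if none is pending.
        String consume(String subscriber) {
            Queue<String> pending = subscriptions.get(subscriber);
            return pending == null ? null : pending.poll();
        }
    }

    // A Queue stores and forwards each message to at most one subscriber.
    static class QueueSketch {
        private final Queue<String> pending = new ArrayDeque<>();

        void publish(String message) {
            pending.add(message);
        }

        // Whichever subscriber asks first receives the message; no other subscriber sees it.
        String consume() {
            return pending.poll();
        }
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        topic.subscribe("a");
        topic.subscribe("b");
        topic.publish("m1");
        System.out.println(topic.consume("a")); // prints m1
        System.out.println(topic.consume("b")); // prints m1

        QueueSketch queue = new QueueSketch();
        queue.publish("m1");
        System.out.println(queue.consume()); // prints m1
        System.out.println(queue.consume()); // prints null
    }
}
```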
The current implementation of the Messaging Pattern has the following limitations:
- Only Topic Destinations (one-to-many messaging) are supported.
- All Topic Subscriptions are Cluster Durable. That is, they will remain subscribed while the Coherence cluster is operational.
- All Messages are Non-Persistent. However, because Messages are managed by Coherence (in memory), server failure won't result in message loss.
|
Outline
Rationale
While it's rare that an architecture making extensive use of Coherence will require Store and Forward Messaging (due to the ability to use MapListeners and events for notifications), there are arguably some circumstances (when clients may be disconnected) where the pattern is particularly useful.
Although providing an implementation of Store and Forward Messaging on top of Coherence has always been possible (as demonstrated by this implementation), most application-level messaging requirements are better satisfied using existing, off-the-shelf and standardized corporate messaging infrastructure.
This implementation is not intended to replace existing messaging infrastructure. Instead, it is designed to provide a flexible framework for application-specific, high-performance messaging on a Data Grid.
More specifically, this implementation has been designed as a minimal framework to support multi-directional and multi-point (no single point of failure) push replication between multiple Coherence clusters that are interconnected by high-latency, high-bandwidth but unreliable wide-area networks (WANs), that is, the Push Replication Pattern. |
Implementation Discussion
 | Before you start
The Store and Forward Messaging pattern makes extensive use of the Command Pattern for managing Destinations and Message delivery. In fact, the entire messaging infrastructure is modeled as a set of Commands. Consequently, you are advised to familiarize yourself with the Command Pattern (and its implementation) in order to understand the implementation of this pattern. |
Internally, all of the standard messaging concepts (Destinations, Subscriptions and Messages), with the exception of Subscribers, are represented as objects placed in one or more Coherence-managed distributed caches. Destinations (Topics in this case) are actually represented as Command Pattern Contexts, which are themselves placed in the "contexts" distributed cache. All messaging interactions, for example creating subscriptions for a Destination or publishing messages to a Destination, are modeled as Command Pattern Commands submitted to the said Destinations for execution. The major consequences and benefits of this approach are that the implementation:
- Is naturally geared for accepting as many messages as physically possible (until the network capacity is exceeded or you run out of storage), off-loading the actual delivery to another thread (a CommandExecutor).
- Is completely JMX enabled and monitorable. Simply enable JMX monitoring and the Contexts representing the Destinations will appear in the JMX tree.
- Guarantees that Message delivery occurs in order (due to the underlying use of the Command Pattern for executing Commands).
- Remains available after server loss (ie: cluster member loss), as it is built using Coherence distributed caching.
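The off-loading and in-order properties listed above can be pictured with plain java.util.concurrent classes. The sketch below is only an analogy and does not use the Command Pattern classes: InOrderCommandSketch and its methods are hypothetical names, and a single-threaded executor stands in for the CommandExecutor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Destination modeled as a context that executes commands.
class InOrderCommandSketch {

    // State owned by the "context": the payloads delivered so far.
    private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    // A single worker thread executes submitted commands strictly in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Submitting returns immediately; the command itself runs later on the worker thread.
    void submitPublish(String payload) {
        executor.execute(() -> delivered.add(payload));
    }

    // Waits for all submitted commands to finish and returns what was delivered.
    List<String> shutdownAndGet() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("commands did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for commands", e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        InOrderCommandSketch topic = new InOrderCommandSketch();
        for (int i = 0; i < 5; i++) {
            topic.submitPublish("message-" + i);
        }
        // prints [message-0, message-1, message-2, message-3, message-4]
        System.out.println(topic.shutdownAndGet());
    }
}
```

The caller only pays the cost of enqueueing a command, while ordering follows from the single worker draining its queue first-in, first-out.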
The most significant interface (from an application-level perspective) is the MessagingSession. Implementations of this interface, namely the DefaultMessagingSession, are how you control the messaging infrastructure, including:
- Creating Topic Destinations
- Creating Subscriptions to Topics
- Publishing Payload to Topics, and
- Consuming Payload from Topic Subscriptions
The following section steps you through a simple example demonstrating how to publish and consume a message from a Topic.
Example
Step 1: Creating a MessagingSession
Once you've installed and appropriately configured the implementation (see below), the first thing you need to do is create a MessagingSession. The simplest way to achieve this is to use the statically defined DefaultMessagingSession.getInstance() method. (Alternatively, you may construct your own instance by providing ContextManager and CommandSubmitter instances to the DefaultMessagingSession constructor.)
MessagingSession messagingSession = DefaultMessagingSession.getInstance();
Step 2: Creating a Topic
Once you have a MessagingSession you can create a Topic. As Topics are Command Pattern Contexts, you can determine how the underlying infrastructure is configured to manage the Commands submitted to the said Topic. Just like registering a Context with the Command Pattern, you may (optionally) provide a ContextConfiguration and Command ManagementStrategy. For example, you can choose whether the underlying Commands are managed colocated with the Topic or distributed across the cluster. For more information, refer to the Command Pattern.
Identifier topicIdentifier = messagingSession.createTopic("my-topic");
or alternatively (providing a specific configuration):
DefaultContextConfiguration contextConfiguration = new DefaultContextConfiguration(ManagementStrategy.DISTRIBUTED);
Identifier topicIdentifier = messagingSession.createTopic("my-topic", contextConfiguration);
 | Note If the Topic with the specified name (or Identifier) already exists, the Identifier of the existing Topic is returned. |
Step 3: Creating a Subscription to a Topic
Once you have created a Topic you can create a TopicSubscription.
 | Warning This is where this pattern implementation differs significantly from traditional messaging solutions, including that of JMS. |
Rather than registering and being provided with a class that represents the Subscriber to a Topic (which in turn may receive notifications when Messages are delivered), you must register a class that is responsible for holding the state of a subscription to a Topic. Once you have done this, you are returned a SubscriptionIdentifier that you may then use to refer to the said Subscription.
 | Why?
Unlike other messaging solutions, this allows us to control how subscription state is represented and, further, to extend it to provide other information, features and functionality, as used in the Push Replication Pattern with PublishingSubscriptions. |
To support a more traditional use of the Store and Forward Messaging pattern, a class called TopicSubscription has been provided to represent the standard information required for a subscription to a Topic.
SubscriptionIdentifier subscriptionIdentifier = messagingSession.createTopicSubscription(new TopicSubscription(topicIdentifier, "my-subscription"));
Step 4: Publishing message payload to a Topic
To publish message payload to a Topic, you again use the MessagingSession. Simply call publishMessage(...) with the Identifier for the Destination and the corresponding payload.
messagingSession.publishMessage(topicIdentifier, "Hello World");
 | The only thing you need to ensure is that the payload being published is in some way serializable, either by implementing the standard Java java.io.Serializable interface or the Coherence ExternalizableLite or PortableObject interfaces. |
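As an illustration of the serializability requirement, the following sketch defines a payload class using standard Java serialization and round-trips it through a byte array. The OrderPlaced class and its fields are hypothetical and exist only for this example; the round trip is a quick local check and says nothing about how Coherence itself serializes the payload.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical payload type; any class implementing java.io.Serializable would do.
class OrderPlaced implements Serializable {
    private static final long serialVersionUID = 1L;

    final String orderId;
    final int quantity;

    OrderPlaced(String orderId, int quantity) {
        this.orderId = orderId;
        this.quantity = quantity;
    }

    // Writes the payload with Java serialization and reads it back again.
    static OrderPlaced roundTrip(OrderPlaced payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(payload);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (OrderPlaced) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("payload could not be serialized", e);
        }
    }

    public static void main(String[] args) {
        OrderPlaced copy = roundTrip(new OrderPlaced("A-17", 3));
        System.out.println(copy.orderId + " x " + copy.quantity); // prints A-17 x 3
    }
}
```

An instance of such a class could then be passed as the payload argument of publishMessage(...) in place of the "Hello World" string.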
Step 5: Consuming a message from a Topic Subscription
In order to consume a message from a Topic Subscription, you need to use the MessagingSession method called consumeMessageForTopicSubscription, providing it with the Identifier of the Subscription.
String message = (String)messagingSession.consumeMessageForTopicSubscription(subscriptionIdentifier);
 | null is returned if there are currently no messages available for the Subscription. |
 | Like most messaging systems, including JMS, you should be careful about having multiple threads attempting to consume from a single subscription. |
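Because null signals that nothing is currently available, a consumer typically loops until it sees null. The sketch below shows that loop in isolation: DrainSketch is a hypothetical helper, and a java.util.function.Supplier stands in for the call to consumeMessageForTopicSubscription so that the example runs without a cluster.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical helper showing the "consume until null" loop; not part of the pattern's API.
class DrainSketch {

    // Calls the supplier repeatedly, collecting messages until it returns null.
    static List<Object> drain(Supplier<Object> consume) {
        List<Object> messages = new ArrayList<>();
        Object message;
        while ((message = consume.get()) != null) {
            messages.add(message);
        }
        return messages;
    }

    public static void main(String[] args) {
        // An in-memory queue plays the role of the subscription for this example.
        Queue<Object> pending = new ArrayDeque<>(Arrays.asList("Hello", "World"));
        System.out.println(drain(pending::poll)); // prints [Hello, World]
    }
}
```

In real use the supplier would wrap messagingSession.consumeMessageForTopicSubscription(subscriptionIdentifier), and, in keeping with the caution above, the loop would run on a single thread per subscription.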
Frequently Asked Questions
Why don't you support the Java Message Service (ie: JMS) specification or feature X of JMS?
While it is theoretically possible for this implementation to be an SPI (Service Provider Implementation) for the Java Message Service (JMS) specification, this implementation has been explicitly designed to support the development of the Push Replication Pattern and other WAN-based architectures, which do not necessarily require JMS. As it stands, this implementation provides only minimal support for certain types of messaging semantics (as required by the Push Replication Pattern), but it is conceivable that in the future, based on demand and community contributions, we may add further support for other messaging semantics or APIs.
Why isn't Queuing (point-to-point messaging) provided?
As above. However, it's next on the list.
How can I monitor the infrastructure?
Two ways. Firstly, by enabling JMX, you'll find that all Destinations (ie: Topics) are automatically registered into the clustered JMX tree (as ContextExecutors). Secondly, as all of the infrastructure state is represented using Coherence distributed caches, you may examine, listen to and mutate the appropriately named caches: "contexts", "commands", "messages" and "subscriptions".
References and Additional Information