The Messaging Pattern
This is an example implementation of store and forward messaging built on top of Coherence. The Messaging Pattern advocates that;
Rationale
While it's rare that an architecture making extensive use of Coherence will require Store and Forward Messaging (due to the ability to use MapListeners, Continuous Queries and events for notifications), there are some circumstances, such as when clients may be disconnected, where the pattern is particularly useful. Although implementing Store and Forward Messaging on top of Coherence has always been possible (as demonstrated by this implementation), most application-level requirements for messaging are satisfied using existing, off-the-shelf and standardized corporate messaging infrastructure, like JMS. In the cases where such infrastructure is not available or not appropriate, this implementation may provide some benefits.
While this implementation is not intended to replace existing messaging infrastructure, it is specifically designed to provide a flexible framework for application-specific, high-performance messaging on a Data Grid. More specifically, it has been designed as a minimal framework to support multi-directional and multi-point (no single point of failure) push replication between multiple Coherence clusters that are deployed and interconnected by high-latency, high-bandwidth but unreliable wide-area networks (WANs), that is, the Push Replication Pattern.
Coherence 3.6 Support
Coherence 3.5 Support
Previous Releases
The following releases are provided for historical purposes only.
Dependencies
This project (like other Coherence Incubator projects) uses Apache Ivy for dependency specification and management. While a standard ivy.xml definition file ships with the source and documentation distribution, the following diagram visually indicates the current dependencies.
Benefits of Datagrid-based Hub-less Messaging
The implementation of the Store and Forward Messaging Pattern on Oracle Coherence provides many unique advantages over traditional hub-and-spoke messaging solutions.
- The messaging infrastructure becomes part of, and by default shares, the same Coherence Datagrid infrastructure as your application. Consequently, separately installed and maintained "messaging servers" are not required.
- Being "part" of your application, there is no reason to wrap application objects as Messages (unlike JMS). You can simply publish and consume your application and domain objects.
- As all stateful objects in the pattern are managed using standard distributed caches (including Topics, Queues, Subscriptions, and Messages themselves), you may use standard Oracle Coherence features to monitor, manage, query and observe the state of the messaging infrastructure.
- Given that the source code is publicly available, you are free to extend and enhance the messaging solution to suit your needs.
- Being based on Oracle Coherence, you can rest assured that the solution will provide highly available, scalable (scale-out) and resilient messaging infrastructure.
What's New?
Changes made as part of version 2.7.4:
- Resolved intermittent deadlock between message publishing and queue roll-back.
Changes made as part of version 2.7.3:
- Upgraded to use Coherence Common 1.7.3
- Resolved issue where the abnormal termination of a QueueSubscriber could result in another QueueSubscriber hanging when it calls getMessage.
Changes made as part of version 2.7.2:
- Upgraded to use Coherence Common 1.7.2
- Resolved issue where manually committing a previously rolled-back message could lead to a hang.
- Resolved a race condition where the lazy deletion of a subscription could cause a subsequently re-used durable subscription to be deleted when it shouldn't be.
- Resolved log formatting that omitted information.
- Resolved issue where MessageHeader.makeVisibleTo could lead to a POF failure during fail over.
- Resolved issue where subscribers intermittently hang during Queue fail over.
- Hardened concurrent publisher and subscriber implementation to prevent orphaned messages.
- Resolved issue where messagingPattern was calling ensureCluster inappropriately.
- Removed RemoveSubscriberFromMessageProcessor from coherence-messagingpattern-pof-config.xml
Changes made as part of version 2.7.1:
- Upgraded to use Coherence Common 1.7.1
Changes made as part of version 2.7.0:
- Upgraded to use Coherence Common 1.7.0
- Resolved intermittent hang when re-subscribing to a durable subscription.
- Resolved issue where messages might be dropped during message partition fail-over.
- Resolved issue where duplicate message detection by Subscription did not work if multiple message identifiers from multiple partitions were contained in a single message tracker.
Changes made as part of version 2.6.1:
- Upgraded to use Coherence Common 1.6.1
Changes made as part of version 2.6.0:
- Upgraded to use Coherence Common 1.6.0
- Removed dependency on the Command Pattern.
- Improved message publishing scalability such that messages are no longer routed through a single member.
- Messages are now ordered on a per-publisher basis rather than globally for all publishers.
- Resolved intermittent problem with lease expiration.
Changes made as part of version 2.5.0:
- Upgraded to use Coherence Common 1.5.0
- Upgraded to use Command Pattern 2.5.0
- Upgraded to depend on Coherence 3.5.2 or Coherence 3.4
- Resolved possible orphaning of Messages when rolling back messages that are held by a subscriber whose lease has expired.
Changes made as part of version 2.4.0:
- Upgraded to use Coherence Common 1.4.0
- Upgraded to use Command Pattern 2.4.0
- Resolved issue where message "draining" would not work with PublishingSubscription (as they are now special types of Subscriptions, not TopicSubscriptions).
- Allow TopicSubscriptionConfigurations to specify/override the default subscription lease duration.
- Default subscription lease durations are now set to 2 minutes (like a TCP/IP time-out) instead of 10 seconds. This helps avoid potential problems when long GCs occur and/or massive re-partitioning occurs.
- Non-durable Topic Subscriptions are no longer created as Durable Topic Subscriptions.
Changes made as part of version 2.3.0:
- Migrated to use Apache Ivy for dependency management and publishing artifacts (introduced ivy.xml, removed dependencies.info). Apart from now using standardized repositories and the potential to integrate with Maven, all Coherence Incubator projects should now (almost) have the same build.xml files!
- Upgraded to use coherence-common-1.3.0 and coherence-commandpattern-2.3.0
- Added the concept of Subscription Visibility to Messages. That is, Messages now hold information that details what Subscriptions can "see" a Message (ie: visibleTo), those that have "read" a Message (ie: deliveredTo) and those that have "consumed" a Message (ie: acknowledgedBy). Refactored previous implementation to support this. Public interfaces remain unchanged.
- Added support for Queues (one-to-one messaging).
- Added support for (configurable) Subscribers, including Queue subscribers, durable and non-durable Topic subscribers.
- Upgraded to use the coherence-common Logger
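The Subscription Visibility concept introduced in version 2.3.0 can be pictured as three sets of subscription identifiers attached to each message. The following is only a plain-Java sketch of that idea and not the project's code: the class name, the method signatures and the transition rules between the sets are assumptions made for illustration. Only the set names visibleTo, deliveredTo and acknowledgedBy come from the change note above.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative model only; not the Messaging Pattern's own message class.
class MessageVisibilitySketch {
    private final Set<String> visibleTo = new LinkedHashSet<>();      // subscriptions that can "see" the message
    private final Set<String> deliveredTo = new LinkedHashSet<>();    // subscriptions that have "read" it
    private final Set<String> acknowledgedBy = new LinkedHashSet<>(); // subscriptions that have "consumed" it

    void makeVisibleTo(String subscriptionId) {
        visibleTo.add(subscriptionId);
    }

    // Assumed rule: only a subscription that can see the message may read it.
    boolean deliverTo(String subscriptionId) {
        if (!visibleTo.contains(subscriptionId)) {
            return false;
        }
        deliveredTo.add(subscriptionId);
        return true;
    }

    // Assumed rule: only a subscription that has read the message may acknowledge it.
    boolean acknowledgeBy(String subscriptionId) {
        if (!deliveredTo.contains(subscriptionId)) {
            return false;
        }
        acknowledgedBy.add(subscriptionId);
        return true;
    }

    // Assumed rule: the message is done once every subscription that can see it has acknowledged it.
    boolean isFullyAcknowledged() {
        return !visibleTo.isEmpty() && acknowledgedBy.containsAll(visibleTo);
    }

    public static void main(String[] args) {
        MessageVisibilitySketch message = new MessageVisibilitySketch();
        message.makeVisibleTo("sub-a");
        message.makeVisibleTo("sub-b");
        message.deliverTo("sub-a");
        message.acknowledgeBy("sub-a");
        System.out.println(message.isFullyAcknowledged()); // false: sub-b has not acknowledged
        message.deliverTo("sub-b");
        message.acknowledgeBy("sub-b");
        System.out.println(message.isFullyAcknowledged()); // true
    }
}
```

Keeping the three states as separate sets makes each question ("who may still read this?", "who has finished with it?") a simple set lookup.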
Implementation Discussion
The Store and Forward Messaging pattern no longer uses the Command Pattern, but rather makes extensive use of entry processors and asynchronous event processing scheduled by backing map listeners. As a result, message publishing to a given topic or queue is no longer funneled through a single command context running on a given node. Rather, publishing is now distributed across the cluster and will scale as the cluster scales.
Internally, the Messaging Pattern uses three Coherence distributed caches: the Destination cache (for Topic and Queue objects), the Subscription cache and the Message cache. To use Messaging, an application creates a destination, subscribes to that destination, then publishes messages to the destination.
When an application publishes a message, it is put in the Message cache. A Message cache backing map listener receives an insert event and schedules a thread to process the newly inserted message. If the destination is a topic, the background thread will invoke each subscription to that topic, notifying it about the new message. All topic subscriptions will receive the message. For a queue destination, the background thread will notify the queue, which then hands the message off to a single subscription, the next one waiting for a message.
Meanwhile, the subscriber, which runs in the client JVM, has a client listener registered on the Subscription cache. When the message arrives at the subscription, the subscriber listener is called and it puts the subscription object on an internal LinkedBlockingQueue. When the client thread calls the Subscriber.getMessage method, the subscriber waits until the LinkedBlockingQueue has an entry. Once the subscriber takes an entry off the queue, it then gets the message from the Message cache and returns the payload to the application.
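The client-side hand-off described above (listener enqueues, getMessage blocks, payload is fetched from the Message cache) can be sketched in plain Java. This is a simplified illustration under stated assumptions, not the project's implementation: a ConcurrentHashMap stands in for the distributed Message cache, the queue carries a message identifier rather than the subscription object, and all class and method names other than getMessage are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in only; none of these types come from the Messaging Pattern API.
class SubscriberFlowSketch {
    // Stands in for the distributed Message cache (message id -> payload).
    static final Map<Long, Object> messageCache = new ConcurrentHashMap<>();

    static class SketchSubscriber {
        // Entries placed here by the listener tell the client thread a message is ready.
        private final BlockingQueue<Long> notifications = new LinkedBlockingQueue<>();

        // Called by the (simulated) client listener when a message arrives at the subscription.
        void onMessageArrived(long messageId) {
            notifications.add(messageId);
        }

        // Blocks until the listener has enqueued an entry, then reads the payload from the cache.
        Object getMessage() {
            try {
                long messageId = notifications.take();
                return messageCache.get(messageId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a message", e);
            }
        }
    }

    // Simulates publishing: store the payload, then fire the listener as the grid would.
    static void publish(SketchSubscriber subscriber, long messageId, Object payload) {
        messageCache.put(messageId, payload);
        subscriber.onMessageArrived(messageId);
    }

    public static void main(String[] args) throws InterruptedException {
        SketchSubscriber subscriber = new SketchSubscriber();
        Thread publisher = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // let the client thread block first
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            publish(subscriber, 1L, "hello");
        });
        publisher.start();
        System.out.println(subscriber.getMessage()); // prints "hello" once the publisher has run
        publisher.join();
    }
}
```

The payload is written to the cache before the notification is enqueued, so the client thread never wakes up to a missing message.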
Some of the features of Messaging
- Is naturally geared for accepting as many messages as physically possible (until the network capacity is exceeded or you run out of storage).
- Is completely JMX enabled and monitorable. Simply enable Coherence JMX monitoring to see the Messaging tree, which shows all topics, queues and subscriptions.
- Message delivery is guaranteed to occur in-order on a per client session basis (publisher).
- Server-loss (i.e.: cluster member loss) does not affect the availability of the messaging infrastructure (as it's built using Coherence distributed caching).
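The per-publisher ordering guarantee in the list above means that messages from different publishers may interleave freely, while each publisher's own messages keep their relative order. One way to picture this is a sequence number kept per publisher session. The sketch below is illustrative plain Java under that assumption. The types and the sequence-number scheme are hypothetical and are not taken from the implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; not the Messaging Pattern's own classes.
class PerPublisherOrderSketch {
    // A payload stamped with its publisher and that publisher's own sequence number.
    static class Stamped {
        final String publisherId;
        final long sequence;
        final Object payload;

        Stamped(String publisherId, long sequence, Object payload) {
            this.publisherId = publisherId;
            this.sequence = sequence;
            this.payload = payload;
        }
    }

    // Hypothetical publisher session that numbers its own messages starting at 1.
    static class PublisherSession {
        private final String id;
        private long next = 1;

        PublisherSession(String id) {
            this.id = id;
        }

        Stamped stamp(Object payload) {
            return new Stamped(id, next++, payload);
        }
    }

    // True if, for every publisher, its sequence numbers appear in strictly increasing order.
    static boolean isOrderedPerPublisher(List<Stamped> delivered) {
        Map<String, Long> lastSeen = new HashMap<>();
        for (Stamped message : delivered) {
            Long previous = lastSeen.put(message.publisherId, message.sequence);
            if (previous != null && previous >= message.sequence) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        PublisherSession p1 = new PublisherSession("p1");
        PublisherSession p2 = new PublisherSession("p2");
        List<Stamped> delivered = new ArrayList<>();
        delivered.add(p1.stamp("a")); // p1 #1
        delivered.add(p2.stamp("x")); // p2 #1, interleaving across publishers is allowed
        delivered.add(p1.stamp("b")); // p1 #2
        System.out.println(isOrderedPerPublisher(delivered)); // true

        Collections.swap(delivered, 0, 2); // p1 #2 now precedes p1 #1
        System.out.println(isOrderedPerPublisher(delivered)); // false
    }
}
```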
The most significant interface (from an application-level perspective) is the MessagingSession. Implementations of this interface, namely the DefaultMessagingSession, are how you control the messaging infrastructure, including:
- Creating Topic and Queue Destinations
- Subscribing and Unsubscribing to Destinations (creating Subscribers)
- Publishing Payload to Destinations, and
- Subscribing and reading (ie: consuming) Payload from Subscriptions
Using Topics and Queues
The following tabs outline the use of Topics and Queues in an application.
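To illustrate how the two destination types differ in delivery semantics, here is a minimal plain-Java sketch. It deliberately does not use the Messaging Pattern API: every class and method name is hypothetical, everything runs on a single thread, and the queue's choice of "next waiting subscriber" is modelled as simple first-come, first-served.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model only; not thread-safe and not the Messaging Pattern's own classes.
class DestinationSketch {
    // A subscriber that simply records what it receives.
    static class Inbox {
        final String name;
        final List<Object> received = new ArrayList<>();

        Inbox(String name) {
            this.name = name;
        }
    }

    // Topic (one-to-many): every subscriber receives every message.
    static class TopicSketch {
        private final List<Inbox> subscribers = new ArrayList<>();

        void subscribe(Inbox inbox) {
            subscribers.add(inbox);
        }

        void publish(Object payload) {
            for (Inbox inbox : subscribers) {
                inbox.received.add(payload);
            }
        }
    }

    // Queue (one-to-one): each message goes to exactly one subscriber, the next one waiting.
    static class QueueSketch {
        private final Deque<Inbox> waiting = new ArrayDeque<>();
        private final Deque<Object> pending = new ArrayDeque<>();

        // A subscriber asks for one message; it is served at once if a message is pending.
        void requestMessage(Inbox inbox) {
            if (pending.isEmpty()) {
                waiting.addLast(inbox);
            } else {
                inbox.received.add(pending.pollFirst());
            }
        }

        // A message is handed to the longest-waiting subscriber, or stored until one asks.
        void publish(Object payload) {
            if (waiting.isEmpty()) {
                pending.addLast(payload);
            } else {
                waiting.pollFirst().received.add(payload);
            }
        }
    }

    public static void main(String[] args) {
        Inbox a = new Inbox("a");
        Inbox b = new Inbox("b");
        TopicSketch topic = new TopicSketch();
        topic.subscribe(a);
        topic.subscribe(b);
        topic.publish("t1");
        System.out.println(a.received + " " + b.received); // [t1] [t1]

        Inbox c = new Inbox("c");
        Inbox d = new Inbox("d");
        QueueSketch queue = new QueueSketch();
        queue.requestMessage(c);
        queue.requestMessage(d);
        queue.publish("q1");
        queue.publish("q2");
        System.out.println(c.received + " " + d.received); // [q1] [q2]
    }
}
```

In the sketch a queue message published before any subscriber is waiting stays pending until one asks for it, which mirrors the store-and-forward idea.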
Frequently Asked Questions
Why don't you support the Java Message Service (JMS) or feature X of JMS?
While it is theoretically possible for this implementation to act as a provider implementation for the Java Message Service (JMS), it has been explicitly designed to support the development of the Push Replication Pattern and other WAN-based architectures, which do not necessarily require JMS.
How can I monitor the infrastructure?
Two ways. First, by enabling JMX, you'll find that all Destinations (ie: Topics) are automatically registered into the clustered JMX tree (under Messaging). Second, because all of the infrastructure state is represented using Coherence distributed caches, you may examine, listen to and mutate the appropriately named caches: "coherence.messagingpattern.destinations", "coherence.messagingpattern.messages" and "coherence.messagingpattern.subscriptions".


