Scaling Out Your Data Grid Aggregations Linearly

Added by Rob Misek, last edited by Jon Purdy on Oct 18, 2006


All of the Data Grid capabilities described below are features of Coherence Application Edition and higher.


Coherence provides a data grid by dynamically, transparently and automatically partitioning the data set across all storage-enabled nodes in a cluster. We have been doing some scale-out testing on our new Dual 2.3GHz PowerPC G5 Xserve cluster, and here is one of the tests that we have performed using the Data Grid Aggregation feature.

The new InvocableMap tightly binds the concept of a data grid (i.e. a partitioned cache) to the processing of the data stored in that grid. When you combine the InvocableMap with the linear scalability of Coherence itself, you get an extremely powerful solution: Coherence gives you (the developer, the engineer, the architect, etc.) the ability to build an application once and then scale it out to meet any SLA or any increase in throughput requirements simply by adding hardware. For example, the following tests demonstrate Coherence's ability to linearly increase the number of trades aggregated per second as you add hardware. I.e. if one machine can aggregate X trades per second, adding a second machine to the data grid lets you aggregate 2X trades per second, a third machine 3X trades per second, and so on...

The data

First, we need some data to aggregate. In this case we used a Trade object with three properties: Id, Price and Symbol.
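The exact Trade class used in the test is not shown here; a minimal sketch, assuming plain Java serialization, could look like this:

import java.io.Serializable;

// Minimal sketch of a Trade value object with Id, Price and Symbol properties.
// The class actually used in the test may differ (e.g. it could implement
// ExternalizableLite instead of Serializable).
public class Trade implements Serializable {
    private int    id;
    private double price;
    private String symbol;

    public Trade(int id, double price, String symbol) {
        this.id     = id;
        this.price  = price;
        this.symbol = symbol;
    }

    public int    getId()     { return id; }
    public double getPrice()  { return price; }
    public String getSymbol() { return symbol; }
}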

Configure a partitioned cache

The cache configuration is easy through the XML Cache Configuration Elements. Here I define one wildcard cache-mapping that maps to one caching-scheme which has unlimited capacity.
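The configuration file itself is not reproduced here; a minimal sketch in the standard Coherence cache configuration format (the scheme name is illustrative) would look something like this:

<cache-config>
  <caching-scheme-mapping>
    <!-- one wildcard mapping: every cache name maps to the same scheme -->
    <cache-mapping>
      <cache-name>*</cache-name>
      <scheme-name>example-distributed</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>

  <caching-schemes>
    <!-- one partitioned (distributed) scheme; no size limit is configured on
         the local backing map, i.e. unlimited capacity -->
    <distributed-scheme>
      <scheme-name>example-distributed</scheme-name>
      <backing-map-scheme>
        <local-scheme/>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
  </caching-schemes>
</cache-config>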

Add an index to the Price property

Adding an index to this property increases performance by allowing Coherence to access the indexed values directly rather than having to deserialize each item to perform the calculation. In our tests the aggregation speed increased by more than 2x after the index was applied.
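The index is added with a single call on the cache. A minimal sketch, assuming the cache is named "trades" and using CacheFactory/NamedCache from com.tangosol.net and ReflectionExtractor from com.tangosol.util.extractor:

// Obtain the cache and index its Price property; the cache name "trades"
// is illustrative. No ordering or custom comparator is needed for this test.
NamedCache trades = CacheFactory.getCache("trades");
trades.addIndex(new ReflectionExtractor("getPrice"), false, null);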

The code to perform a parallel aggregation across all JVMs in the data grid

The aggregation is initiated, and the results are received, by a single client. I.e. a single "low-power" client is able to utilize the full processing power of the cluster/data grid to perform this aggregation in parallel with just one line of code.
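Assuming, for illustration, that the aggregation sums the Price of every trade in the grid (the actual aggregator used by TradeTest is not shown here), that one line could look like this, using AlwaysFilter and DoubleSum from the com.tangosol.util.filter and com.tangosol.util.aggregator packages:

// One line: each storage-enabled node aggregates its own partitions in
// parallel and only the partial results travel back to this client, where
// they are combined. DoubleSum over Price is an illustrative choice.
Object total = trades.aggregate(new AlwaysFilter(), new DoubleSum("getPrice"));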

The testing environment/process

A "test run" does a number of things:

  1. Loads 200,000 trade objects into the data grid.
  2. Adds an index to the Price property.
  3. Performs a parallel aggregation of all trade objects stored in the data grid. This aggregation step is done 20 times to obtain an "average run time" to ensure that the test takes into account garbage collection.
  4. Loads 400,000 trade objects into the data grid.
  5. Repeats steps 2 and 3.
  6. Loads 600,000 trade objects into the data grid.
  7. Repeats steps 2 and 3.
  8. Loads 800,000 trade objects into the data grid.
  9. Repeats steps 2 and 3.
  10. Loads 1,000,000 trade objects into the data grid.
  11. Repeats steps 2 and 3.
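The TradeTest source is not reproduced here, but a rough, self-contained sketch of one such run (the cache name, key type, batch size and aggregator are illustrative assumptions) might look like this:

import java.util.HashMap;
import java.util.Map;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.aggregator.DoubleSum;
import com.tangosol.util.extractor.ReflectionExtractor;
import com.tangosol.util.filter.AlwaysFilter;

// Illustrative sketch only; not the actual TradeTest used for the results below.
public class TradeTestSketch {
    public static void main(String[] args) {
        NamedCache trades = CacheFactory.getCache("trades");

        // 1. Load the trade objects in batches; putAll keeps network round trips low
        Map batch = new HashMap();
        for (int i = 1; i <= 200000; i++) {
            batch.put(new Integer(i), new Trade(i, Math.random() * 100.0, "ORCL"));
            if (batch.size() == 1000) {
                trades.putAll(batch);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            trades.putAll(batch);
        }

        // 2. Add the index to the Price property
        trades.addIndex(new ReflectionExtractor("getPrice"), false, null);

        // 3. Aggregate 20 times and report the average run time
        long cMillis = 0L;
        for (int i = 0; i < 20; i++) {
            long ldtStart = System.currentTimeMillis();
            trades.aggregate(new AlwaysFilter(), new DoubleSum("getPrice"));
            cMillis += System.currentTimeMillis() - ldtStart;
        }
        System.out.println("Average aggregation time: " + (cMillis / 20) + " ms");
    }
}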
The test client itself is run on an Intel Core Duo iMac and is configured with local storage disabled, so it stores none of the data itself.

The command line used to start the client was:

java ... -Dtangosol.coherence.distributed.localstorage=false -Xmx128m -Xms128m com.tangosol.examples.coherence.invocable.TradeTest

This "test suite" (and subsequent results) includes data from four "test runs":

  1. Start 4 JVMs on one Xserve - Perform a "test run"
  2. Start 4 JVMs on each of two Xserves - Perform a "test run"
  3. Start 4 JVMs on each of three Xserves - Perform a "test run"
  4. Start 4 JVMs on each of four Xserves - Perform a "test run"
In this case a "JVM" refers to a cache server instance (i.e. a data grid node) that is a standalone JVM responsible for managing/storing the data. I used the DefaultCacheServer helper class to accomplish this.

The command line used to start the server was:

java ... -Xmx384m -Xms384m -server

JDK Version

The JDK used on both the client and the servers was Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_05-84)

The results

The lowest and highest times were excluded from the calculations below, leaving a data set of eighteen results from which each average was computed.

As you can see in the following graph, the average aggregation time decreases linearly as more cache servers/machines are added to the data grid!

Similarly, the following graph illustrates how the number of aggregations per second scales linearly as you add more machines! When moving from 1 machine to 2 machines the number of trades aggregated per second doubles, and when moving from 2 machines to 4 machines it doubles again.

The above aggregations will complete successfully and correctly even if one of the cache servers or an entire machine fails during the aggregation!


Combining the Coherence data grid (i.e. partitioned cache) with the InvocableMap features enables:

  • Applications to scale out data grid calculations linearly;
  • Groups to meet increasingly aggressive SLAs by dynamically/transparently adding more resources to the data grid. I.e. if you need to achieve 1,837,932 trade aggregations per second, all that is required is to start 16 more cache servers across four more machines;