Introduction
The Processing Pattern is a framework that uses the Coherence grid to manage and execute "work". Work processing capacity scales with the number of nodes in the grid, and because the pattern relies on the grid's resiliency features, execution is fault tolerant.
Components of a Processing Pattern Implementation
Processing Pattern Client
A Processing Pattern client is an application that submits work into the grid. On submission, the client receives a SubmissionOutcome, which acts as the handle to the Submission. The SubmissionOutcome behaves like a Future: the client can use it to wait for the result and retrieve it once it is available.
The client also needs to define, or at least know about, the work to be done. Work objects implementing the Callable, Runnable, or ResumableTask interfaces are supported out of the box.
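As a rough local analogy (not the Processing Pattern API), the sketch below uses a java.util.concurrent ExecutorService in place of the grid and a plain Future in place of a SubmissionOutcome. It contrasts a Runnable, which runs only for its side effects and completes with a null result, with a Callable that returns a value. The class names WorkKindsDemo and SumWork are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class WorkKindsDemo {
    // Counts how often the Runnable work has run (its only observable effect).
    static final AtomicInteger RUN_COUNT = new AtomicInteger();

    // Hypothetical Callable work: produces a result that the client reads back.
    static class SumWork implements Callable<Long> {
        private final int n;

        SumWork(int n) {
            this.n = n;
        }

        @Override
        public Long call() {
            long sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        }
    }

    // Submits one Runnable and one Callable, then returns the Callable's result.
    static long submitBoth() throws InterruptedException, ExecutionException {
        ExecutorService grid = Executors.newFixedThreadPool(2); // local stand-in for the grid
        try {
            Runnable sideEffect = RUN_COUNT::incrementAndGet;
            Future<?> runnableOutcome = grid.submit(sideEffect);          // completes with null
            Future<Long> callableOutcome = grid.submit(new SumWork(100));
            runnableOutcome.get();          // block until the Runnable has finished
            return callableOutcome.get();   // block until the Callable's value is ready
        } finally {
            grid.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Callable result: " + submitBoth());
        System.out.println("Runnable executions: " + RUN_COUNT.get());
    }
}
```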
Processing Pattern Submissions - the work
When the client submits work into the grid, the members of the work object are serialized and stored on a node in the grid. That node deserializes the work object for further processing, so the work must be Serializable. Optionally, the work class can also implement the Coherence-specific serialization interfaces ExternalizableLite and PortableObject.
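The round trip that work goes through can be imitated with standard Java serialization. This minimal sketch, using a hypothetical SquareWork class, serializes a Callable to bytes as the client side would and deserializes a copy as a grid node would before calling it. It does not exercise Coherence's ExternalizableLite or PortableObject mechanisms.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

class WorkSerializationDemo {
    // Hypothetical work class: its fields travel with it, so it must be Serializable.
    static class SquareWork implements Callable<Integer>, Serializable {
        private static final long serialVersionUID = 1L;
        private final int value;

        SquareWork(int value) {
            this.value = value;
        }

        @Override
        public Integer call() {
            return value * value;
        }
    }

    // Serializes the work to bytes (client side), then deserializes a copy (node side).
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T work) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(work);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SquareWork copy = roundTrip(new SquareWork(7));
        System.out.println("Result computed by the deserialized copy: " + copy.call());
    }
}
```

Deserialization only succeeds if the class is available on the receiving side, which is why the work classes must be on both the client and the server classpath.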
When work is finished, the result is stored in the grid and the SubmissionOutcome is signaled with the result.
Both the client and the server need the work classes on their classpath for the pattern to work. If you want to separate the client and server code, package the work classes in an independent jar.
Processing Pattern Configurator
The Processing Pattern Configurator is a custom application that defines how processing is performed by the Processing Pattern. It registers Dispatchers and Executors, which are replicated among the nodes in the grid. Consequently, you only need one instance of a Processing Pattern Configurator to seed the Processing Pattern grid nodes with the required Dispatchers and Executors.
This application must join the grid as a full-fledged grid member. If it is storage enabled, it also performs the role of a Cache Server.
Add the Coherence Common jar, the Processing Pattern jar, and the jar containing your work classes to the application's classpath.
You also need to set the cache configuration with the following Java Virtual Machine property:
-Dtangosol.coherence.cacheconfig=coherence-processingpattern-cache-config.xml
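If passing -D on the command line is inconvenient, the same system property can, as far as we know, also be set from code before any Coherence class is initialized; treat that as an assumption to verify against your Coherence version. The hypothetical CacheConfigLauncher below only sets the property when none was supplied, so an explicit -D setting still takes precedence.

```java
class CacheConfigLauncher {
    // Property name and file name as given in the text above.
    static final String CONFIG_PROPERTY = "tangosol.coherence.cacheconfig";
    static final String CONFIG_FILE = "coherence-processingpattern-cache-config.xml";

    // Sets the property unless it was already supplied; returns the effective value.
    static String ensureCacheConfig() {
        String current = System.getProperty(CONFIG_PROPERTY);
        if (current == null || current.isEmpty()) {
            System.setProperty(CONFIG_PROPERTY, CONFIG_FILE);
        }
        return System.getProperty(CONFIG_PROPERTY);
    }

    public static void main(String[] args) {
        System.out.println("Using cache configuration: " + ensureCacheConfig());
        // Assumption: start the Configurator or cache server only after this point,
        // so that Coherence reads the property during its initialization.
    }
}
```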
Cache Servers
A Coherence grid consists of a number of participating grid nodes; the nodes that are storage enabled (i.e. that store data) are generally called Cache Servers. The Processing Pattern hooks into Cache Servers to manage and process submissions.
The easiest way to start a Cache Server is with the script $COHERENCE_HOME/bin/cache-server.sh (cache-server.cmd on Windows).
Modify the script to pass the Java Virtual Machine property that sets the cache configuration:
-Dtangosol.coherence.cacheconfig=coherence-processingpattern-cache-config.xml
You also have to add the Coherence Common jar, the Processing Pattern jar, and the jar containing your work classes to the classpath.
To add capacity to the grid, you simply start up additional cache-servers.
Queue Poll Executor
A QueuePollExecutor is a special kind of Executor that runs in its own grid node. It waits for work on a particular queue (a QueueBasedExecutor) set up by the Processing Pattern Configurator, polls work from it, executes the work, and returns the result to the grid.
See com.oracle.coherence.patterns.processing.examples.server.QueuePollServer.java for an example. This node does not have to be storage enabled; it can be seen as a client of the grid rather than a participant in it.
To set up a Queue Poll Executor, start a custom application that creates a QueuePollExecutor. Its classpath requires the Processing Pattern jar, the Coherence Common jar, and the jar containing the work classes. You also need to specify the cache configuration as a VM property (-Dtangosol.coherence.cacheconfig=coherence-processingpattern-cache-config.xml).
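The poll, execute and publish loop of a Queue Poll Executor can be sketched locally with a java.util.concurrent.BlockingQueue. Everything here is a hypothetical stand-in: Job replaces queued work, a ConcurrentHashMap replaces returning results to the grid, and the loop stops once the queue has been empty for 100 ms, whereas a real poller would typically keep waiting for new work.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.IntUnaryOperator;

class QueuePollSketch {
    // Hypothetical unit of queued work: an id plus the input to process.
    record Job(String id, int input) {}

    // Polls jobs until the queue stays empty for 100 ms, executes each one,
    // and records the result under the job's id.
    static Map<String, Integer> pollAndExecute(BlockingQueue<Job> queue, IntUnaryOperator work)
            throws InterruptedException {
        Map<String, Integer> results = new ConcurrentHashMap<>();
        Job job;
        while ((job = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
            results.put(job.id(), work.applyAsInt(job.input()));
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
        queue.add(new Job("a", 2));
        queue.add(new Job("b", 5));
        System.out.println(pollAndExecute(queue, x -> x * 10));
    }
}
```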
Creating your first processing pattern application
Downloading the code
The following code is also available in the coherence-examples package. See com.oracle.coherence.examples.incubator.processing.task for the Java source files for this sample.
Use case - calculating Pi
This use case demonstrates a simple task that calculates Pi using N iterations of the Leibniz formula: Pi / 4 = 1 - 1/3 + 1/5 - 1/7 + 1/9 - 1/11 + ... As this simple sample shows, you need a fairly large number of iterations to get the error down, because the error only decreases roughly in proportion to 1/N.
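The series itself is easy to check locally. This plain Java sketch (independent of the Processing Pattern; LeibnizPi is a hypothetical name) sums the first n terms, multiplies by 4, and prints the error so you can see it shrink roughly in proportion to 1/n.

```java
class LeibnizPi {
    // Sums the first n terms of 1 - 1/3 + 1/5 - 1/7 + ..., which converges to Pi / 4,
    // and multiplies by 4 to estimate Pi.
    static double estimatePi(int n) {
        double quarterPi = 0.0;
        for (int k = 0; k < n; k++) {
            double term = 1.0 / (2 * k + 1);
            quarterPi += (k % 2 == 0) ? term : -term;
        }
        return 4 * quarterPi;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1000, 10000, 20000}) {
            double estimate = estimatePi(n);
            System.out.printf("n=%d estimate=%.8f error=%.2e%n", n, estimate, Math.abs(estimate - Math.PI));
        }
    }
}
```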
To demonstrate the power of the Processing Pattern, the task stores away the intermediate result after each iteration and resumes processing after yielding.
To implement the task we use the ResumableTask interface of the Processing Pattern. A ResumableTask interacts with the TaskExecutionEnvironment to checkpoint intermediate results, report progress, and find out whether the task is resuming.
The key method to implement is:
public Object run(TaskExecutionEnvironment oEnvironment);
In the run method, you can find out whether the task is resuming by calling:
boolean isResuming = oEnvironment.isResuming();
To voluntarily yield (suspend) the task, return the special Yield object, passing in the checkpoint state and the delay in milliseconds until rescheduling. The task waits at least that long before it resumes executing:
return new Yield("state", 1000);
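To make the control flow concrete, the following sketch imitates how a runtime might drive a resumable task: run it, and whenever it returns a yield, wait for the requested delay and run it again with the saved checkpoint. YieldSketch, EnvSketch and drive() are hypothetical local stand-ins for Yield, TaskExecutionEnvironment and the grid; they are not the library classes, and the real pattern may resume the task on a different node.

```java
import java.util.function.Function;

class ResumableSketch {
    // Hypothetical stand-in for Yield: checkpoint state plus a minimum delay.
    record YieldSketch(Object checkpoint, long delayMillis) {}

    // Hypothetical stand-in for TaskExecutionEnvironment.
    static final class EnvSketch {
        private final boolean resuming;
        private final Object checkpoint;

        EnvSketch(boolean resuming, Object checkpoint) {
            this.resuming = resuming;
            this.checkpoint = checkpoint;
        }

        boolean isResuming() {
            return resuming;
        }

        Object loadCheckpoint() {
            return checkpoint;
        }
    }

    // Runs the task until it returns something other than a YieldSketch,
    // waiting the requested delay and handing the checkpoint back each time.
    static Object drive(Function<EnvSketch, Object> task) throws InterruptedException {
        EnvSketch env = new EnvSketch(false, null);
        while (true) {
            Object outcome = task.apply(env);
            if (!(outcome instanceof YieldSketch)) {
                return outcome; // final result
            }
            YieldSketch yielded = (YieldSketch) outcome;
            Thread.sleep(yielded.delayMillis());
            env = new EnvSketch(true, yielded.checkpoint());
        }
    }

    // Example task: counts up to limit, checkpointing the count at every step.
    static Object countTo(int limit) throws InterruptedException {
        return drive(env -> {
            int count = env.isResuming() ? (Integer) env.loadCheckpoint() : 0;
            count++;
            return count < limit ? new YieldSketch(count, 0) : "done after " + count + " runs";
        });
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countTo(5)); // prints "done after 5 runs"
    }
}
```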
Code for the PI calculation task
Creating a task like this is simple: AbstractResumableTask can be used as a base class.
package com.oracle.coherence.examples.incubator.processing.task;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import com.oracle.coherence.patterns.processing.task.AbstractResumableTask;
import com.oracle.coherence.patterns.processing.task.TaskExecutionEnvironment;
import com.oracle.coherence.patterns.processing.task.Yield;
import com.oracle.coherence.common.tuples.Triple;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.util.ExternalizableHelper;

/**
 * <p>
 * The CalculatePITask calculates PI using the Leibniz formula:
 * PI / 4 = 1 - 1/3 + 1/5 - 1/7 + 1/9 - 1/11...
 * </p>
 * <p>
 * This particular implementation leverages the ability to
 * store a checkpoint and {@link Yield} the processing for each iteration.
 * </p>
 *
 * @author Christer Fahlgren 2009.10.12
 */
public class CalculatePITask
        extends AbstractResumableTask
{
    /**
     * Default Constructor (required for deserialization).
     */
    public CalculatePITask()
    {
        // "gridtask" is the task type used to select the Executor
        super("gridtask");
    }

    /**
     * Constructor taking the number of iterations as a parameter.
     *
     * @param nIterations the number of iterations in the algorithm
     */
    public CalculatePITask(int nIterations)
    {
        super("gridtask");
        m_nIterations = nIterations;
    }

    /**
     * Performs one iteration of the series, then either yields with the new
     * state as the checkpoint or returns the final estimate of PI.
     */
    @Override
    @SuppressWarnings("unchecked")
    public Object run(TaskExecutionEnvironment oEnvironment)
    {
        System.out.println("CalculatePI");

        // Initial state: the first term of the series (1/1) is already included
        double denominator = 1.0;
        double pi = 1.0;
        int currIteration = 1;

        // When resuming, continue from the state saved by the previous Yield
        if (oEnvironment.isResuming())
        {
            Triple<Integer, Double, Double> triple =
                (Triple<Integer, Double, Double>) oEnvironment.loadCheckpoint();
            if (triple != null)
            {
                currIteration = triple.getX();
                denominator = triple.getY();
                pi = triple.getZ();
            }
        }

        System.out.println("Iteration:" + currIteration + " denominator:" + denominator + " current pi:" + pi * 4);

        // Add the next term, alternating between subtraction and addition
        denominator += 2.0;
        if (currIteration % 2 == 1)
        {
            pi -= 1 / denominator;
        }
        else
        {
            pi += 1 / denominator;
        }
        currIteration++;

        if (currIteration >= m_nIterations)
        {
            // pi holds the sum of the series, i.e. PI / 4
            return pi * 4;
        }
        else
        {
            // Checkpoint the state and yield with no minimum delay before rescheduling
            Triple<Integer, Double, Double> newTriple =
                new Triple<Integer, Double, Double>(currIteration, denominator, pi);
            return new Yield(newTriple, 0);
        }
    }

    @Override
    public void readExternal(DataInput in) throws IOException
    {
        super.readExternal(in);
        m_nIterations = ExternalizableHelper.readInt(in);
    }

    @Override
    public void writeExternal(DataOutput out) throws IOException
    {
        super.writeExternal(out);
        ExternalizableHelper.writeInt(out, m_nIterations);
    }

    @Override
    public void readExternal(PofReader reader) throws IOException
    {
        super.readExternal(reader);
        m_nIterations = reader.readInt(10);
    }

    @Override
    public void writeExternal(PofWriter writer) throws IOException
    {
        super.writeExternal(writer);
        writer.writeInt(10, m_nIterations);
    }

    /**
     * The number of iterations in the algorithm.
     */
    private int m_nIterations;
}
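CalculatePITask yields after every single iteration, which is useful for demonstrating checkpointing but means one checkpoint per term. The sketch below, which does not use the Processing Pattern classes, reproduces the same arithmetic as a pure step function over a checkpoint record and lets each invocation advance a configurable batch of terms. BatchedLeibniz, State and the batchSize parameter are hypothetical; because the terms are added in the same order, every batch size produces exactly the same result as a batch size of 1, while larger batches need fewer yields.

```java
class BatchedLeibniz {
    // Hypothetical checkpoint mirroring the Triple in CalculatePITask:
    // iteration counter, last denominator, and running sum of the series (PI / 4).
    record State(int iteration, double denominator, double quarterPi) {}

    // Same starting point as CalculatePITask: the first term (1/1) is already included.
    static final State START = new State(1, 1.0, 1.0);

    // One invocation between yields: advances up to batchSize terms from the checkpoint.
    static State step(State s, int batchSize, int totalIterations) {
        int iteration = s.iteration();
        double denominator = s.denominator();
        double quarterPi = s.quarterPi();
        for (int i = 0; i < batchSize && iteration < totalIterations; i++) {
            denominator += 2.0;
            if (iteration % 2 == 1) {
                quarterPi -= 1 / denominator;
            } else {
                quarterPi += 1 / denominator;
            }
            iteration++;
        }
        return new State(iteration, denominator, quarterPi);
    }

    // Feeds each returned state back in, as resuming from a checkpoint would.
    static double run(int totalIterations, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        State s = START;
        while (s.iteration() < totalIterations) {
            s = step(s, batchSize, totalIterations);
        }
        return 4 * s.quarterPi();
    }

    public static void main(String[] args) {
        for (int batchSize : new int[] {1, 100, 10000}) {
            System.out.println("batch " + batchSize + ": " + run(20000, batchSize));
        }
    }
}
```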
Submitting the task and getting a result back
The steps needed to submit a task are:
- Get hold of the ProcessingSessionFactory.
- Create a ProcessingSession.
- Use that session to submit a task and store the returned SubmissionOutcome.
- Get the result of the task through the SubmissionOutcome.
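These four steps have a close counterpart in java.util.concurrent, shown here as a local analogy only: an ExecutorService plays the role of the ProcessingSession and each Future plays the role of a SubmissionOutcome. As in the sample, every task is submitted before the client blocks on any result, so the tasks can run concurrently. SubmitStepsSketch and submitAllThenWait are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitStepsSketch {
    // Submits every task before waiting on any of them, then collects results in order.
    static List<Double> submitAllThenWait(List<Callable<Double>> tasks) throws Exception {
        ExecutorService session = Executors.newFixedThreadPool(3); // steps 1-2: obtain a "session"
        try {
            List<Future<Double>> outcomes = new ArrayList<>();
            for (Callable<Double> task : tasks) {
                outcomes.add(session.submit(task));                 // step 3: submit, keep the handle
            }
            List<Double> results = new ArrayList<>();
            for (Future<Double> outcome : outcomes) {
                results.add(outcome.get());                         // step 4: block for each result
            }
            return results;
        } finally {
            session.shutdown();                                     // comparable to factory.shutdown()
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Double>> tasks = List.of(() -> 1.0, () -> 2.0, () -> 3.0);
        System.out.println(submitAllThenWait(tasks)); // prints [1.0, 2.0, 3.0]
    }
}
```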
The code below creates three CalculatePITask instances and waits for their completion.
package com.oracle.coherence.examples.incubator.processing.task;

import java.util.concurrent.ExecutionException;

import com.oracle.coherence.common.identifiers.StringBasedIdentifier;
import com.oracle.coherence.patterns.processing.ProcessingSession;
import com.oracle.coherence.patterns.processing.ProcessingSessionFactory;
import com.oracle.coherence.patterns.processing.SubmissionOutcome;
import com.oracle.coherence.patterns.processing.internal.DefaultSubmissionConfiguration;
import com.oracle.coherence.patterns.processing.internal.task.LocalExecutorList.ExecutorAlreadyExistsException;
import com.tangosol.net.CacheFactory;

/**
 * The PiCalculationSample illustrates how to submit a Task that yields and
 * stores state.
 *
 * @author Christer Fahlgren 2009.10.13
 */
public class PiCalculationSample
{
    /**
     * Main method of sample.
     *
     * @param args input arguments to application
     */
    public static void main(String[] args)
    {
        PiCalculationSample sample = new PiCalculationSample();
        try
        {
            sample.executeSample();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Method that executes the sample.
     *
     * @throws InterruptedException if execution is interrupted
     * @throws ExecutionException if execution fails
     * @throws ExecutorAlreadyExistsException if executor is already registered
     */
    public void executeSample()
            throws InterruptedException,
                   ExecutionException, ExecutorAlreadyExistsException
    {
        System.out.println("Testing PI calculation");

        // Get hold of the ProcessingSessionFactory and create a ProcessingSession
        ProcessingSessionFactory factory = ProcessingSessionFactory.getInstance();
        ProcessingSession session = factory.getSession(StringBasedIdentifier.newInstance("PICalculationSample"));

        // Submit three tasks; each SubmissionOutcome is the handle to one submission
        SubmissionOutcome pioutcome = session.submit(new CalculatePITask(20000), new DefaultSubmissionConfiguration());
        SubmissionOutcome pioutcome2 = session.submit(new CalculatePITask(10000), new DefaultSubmissionConfiguration());
        SubmissionOutcome pioutcome3 = session.submit(new CalculatePITask(1000), new DefaultSubmissionConfiguration());

        // get() blocks until the corresponding result is available
        System.out.println("Pi calculated in 20000 iterations is: " + pioutcome.get().toString());
        System.out.println("Pi calculated in 10000 iterations is: " + pioutcome2.get().toString());
        System.out.println("Pi calculated in 1000 iterations is: " + pioutcome3.get().toString());

        // Shut down the session factory and leave the cluster
        factory.shutdown();
        CacheFactory.shutdown();
    }
}
Creating a processing pattern configurator
The Processing Pattern Configurator sets up the Processing Pattern grid nodes with the dispatchers and executors that will process work in the grid.
The following configurator registers all available dispatchers and executors in the Processing Pattern in one go.
The steps involved in configuring the Processing Pattern grid are:
- Getting a handle to the ProcessingFrameworkFactory
- Getting the DispatcherManager
- Creating and registering a LoggingDispatcher, which simply logs each Submission
- Creating and registering a LocalExecutorDispatcher, which handles Runnables and Callables
- Creating and registering a DefaultTaskDispatcher, which handles ResumableTasks
- Creating Executors and registering them:
- Creating a GridExecutor and registering it; in this sample the GridExecutor handles Tasks of the type gridtask
- Creating a QueueBasedExecutor and registering it; in this sample the QueueBasedExecutor handles Tasks of the type nodetask
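Routing ResumableTasks by task type can be pictured as a lookup from the type string (for example the "gridtask" value that CalculatePITask passes to its base-class constructor) to the executor registered for it. This is a hypothetical sketch of the idea, not the implementation of TaskTypeSelectionPolicy; the executors are simple functions standing in for GridExecutor and QueueBasedExecutor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class TaskTypeRoutingSketch {
    // Maps a task type (e.g. "gridtask", "nodetask") to the executor that handles it.
    private final Map<String, Function<String, String>> executorsByType = new HashMap<>();

    void registerExecutor(String taskType, Function<String, String> executor) {
        executorsByType.put(taskType, executor);
    }

    // Selects the executor by task type and runs the payload on it.
    String dispatch(String taskType, String payload) {
        Function<String, String> executor = executorsByType.get(taskType);
        if (executor == null) {
            throw new IllegalArgumentException("No executor registered for task type " + taskType);
        }
        return executor.apply(payload);
    }

    public static void main(String[] args) {
        TaskTypeRoutingSketch router = new TaskTypeRoutingSketch();
        router.registerExecutor("gridtask", p -> "GridExecutor ran " + p);
        router.registerExecutor("nodetask", p -> "QueueBasedExecutor queued " + p);
        System.out.println(router.dispatch("gridtask", "CalculatePITask")); // prints "GridExecutor ran CalculatePITask"
    }
}
```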
package com.oracle.coherence.examples.incubator.processing.server;

import com.oracle.coherence.common.identifiers.StringBasedIdentifier;
import com.oracle.coherence.patterns.processing.dispatchers.Dispatcher;
import com.oracle.coherence.patterns.processing.dispatchers.local.LocalExecutorDispatcher;
import com.oracle.coherence.patterns.processing.dispatchers.logging.LoggingDispatcher;
import com.oracle.coherence.patterns.processing.dispatchers.task.DefaultTaskDispatcher;
import com.oracle.coherence.patterns.processing.dispatchers.task.TaskTypeSelectionPolicy;
import com.oracle.coherence.patterns.processing.executors.GridExecutor;
import com.oracle.coherence.patterns.processing.executors.QueueBasedExecutor;
import com.oracle.coherence.patterns.processing.friends.DispatcherManager;
import com.oracle.coherence.patterns.processing.friends.ProcessingFrameworkFactory;
import com.tangosol.net.DefaultCacheServer;

/**
 * The Processing Pattern Configurator sets up the Dispatchers and
 * Executors for the Processing Pattern grid.
 *
 * Note that since Dispatchers and Executors are replicated, other nodes just need to start
 * up regular cache servers (with the Processing Pattern and the application
 * classes in the classpath).
 *
 * @author Christer Fahlgren 2009.09.30
 */
public class ProcessingPatternConfigurator
{
    /**
     * Starts this node and configures the Processing Pattern.
     *
     * @throws InterruptedException if start fails
     */
    public void start() throws InterruptedException
    {
        // Join the grid as a cache server
        DefaultCacheServer.start();
        try
        {
            ProcessingFrameworkFactory oProcessingFactory = ProcessingFrameworkFactory.getInstance();
            DispatcherManager manager = oProcessingFactory.getDispatcherManager();

            // Logs each Submission
            Dispatcher loggingDispatcher = new LoggingDispatcher("MyLogger");
            manager.registerDispatcher(loggingDispatcher);

            // Handles Runnables and Callables
            Dispatcher localExecutorDispatcher = new LocalExecutorDispatcher(
                "MyLocalExecutorDispatcher",
                oProcessingFactory.getSubmissionResultManager(),
                oProcessingFactory.getSubmissionManager());
            manager.registerDispatcher(localExecutorDispatcher);

            // Handles ResumableTasks, selecting an Executor by task type
            Dispatcher taskDispatcher = new DefaultTaskDispatcher("DefaultTaskDispatcher", new TaskTypeSelectionPolicy());
            manager.registerDispatcher(taskDispatcher);

            // Executes Tasks of the type "gridtask" inside the grid
            GridExecutor oGridExecutor = new GridExecutor(StringBasedIdentifier.newInstance("GridExecutor"), "Test Executor",
                "gridtask", oProcessingFactory.getSubmissionManager(),
                oProcessingFactory.getSubmissionResultManager(), 5);

            // Queues Tasks of the type "nodetask" for QueuePollExecutors to pick up
            QueueBasedExecutor nodeexecutor = new QueueBasedExecutor(
                StringBasedIdentifier.newInstance("QueueExecutor"),
                "Test Executor", "nodetask");

            oProcessingFactory.getExecutorManager().registerExecutor(oGridExecutor);
            oProcessingFactory.getExecutorManager().registerExecutor(nodeexecutor);

            System.out.println("Processing pattern is now configured.");
            System.out.println("This node will continue to serve as a Processing Pattern grid node");

            // Keep the process alive so that this node stays in the grid
            while (true)
            {
                Thread.sleep(1000);
            }
        }
        catch (Exception oException)
        {
            oException.printStackTrace();
        }
    }

    /**
     * Main method of configurator.
     *
     * @param args input arguments
     */
    public static void main(String[] args)
    {
        ProcessingPatternConfigurator instance = new ProcessingPatternConfigurator();
        try
        {
            instance.start();
        }
        catch (InterruptedException oException)
        {
            oException.printStackTrace();
        }
    }
}
Building the Processing Pattern Example
Before running the Processing Pattern example you need to build it. First make sure that you have set the JAVA_HOME and COHERENCE_HOME variables in the set-env.(sh|bat) script. Then build the example with the build-incubator script:
- Run or source the bin/set-env.(sh|bat) script.
- Build the Processing Pattern examples by running bin/build-incubator.(sh|bat) processingpattern
Running the Processing Pattern Example
To run the Processing Pattern example you need two command-line windows. Before starting, make sure that you have set the JAVA_HOME and COHERENCE_HOME variables in the set-env.(sh|bat) script. Then:
- In the first window, run or source the bin/set-env.(sh|bat) script.
- In the first window, run the Processing Pattern Configurator by running: bin/run-incubator.(sh|bat) resource/processingpattern/ProcessingPatternConfigurator.properties
- In the second window, run or source the bin/set-env.(sh|bat) script.
- In the second window, run the example by running: bin/run-incubator.(sh|bat) resource/processingpattern/PiCalculationSample.properties
The example writes the progress of the calculations to the ProcessingPatternConfigurator window, and the final results appear in the PiCalculationSample window.