This section describes different patterns you can use to pre-load the cache. The patterns include bulk loading and distributed loading.
Performing Bulk Loading and Processing
The attached example, PagedQuery.java, demonstrates techniques for efficiently bulk loading and processing items in a Coherence cache.
Bulk Writing to a Cache
A common scenario when using Coherence is to pre-populate a cache before the application makes use of it. A simple way to do this is illustrated by the following Java code:
public static void bulkLoad(NamedCache cache, Connection conn)
{
Statement s;
ResultSet rs;
try
{
s = conn.createStatement();
rs = s.executeQuery("select key, value from table");
while (rs.next())
{
Integer key = new Integer(rs.getInt(1));
String value = rs.getString(2);
cache.put(key, value);
}
...
}
catch (SQLException e)
{...}
}
This technique works, but each call to put may result in network traffic, especially for partitioned and replicated caches. Additionally, each call to put returns the object it just replaced in the cache (per the java.util.Map interface), which adds unnecessary overhead. Loading the cache can be made much more efficient by buffering entries and using the ConcurrentMap.putAll method instead:
public static void bulkLoad(NamedCache cache, Connection conn)
{
Statement s;
ResultSet rs;
Map buffer = new HashMap();
try
{
int count = 0;
s = conn.createStatement();
rs = s.executeQuery("select key, value from table");
while (rs.next())
{
Integer key = new Integer(rs.getInt(1));
String value = rs.getString(2);
buffer.put(key, value);
// flush the buffer to the cache once every 1000 entries
if ((++count % 1000) == 0)
{
cache.putAll(buffer);
buffer.clear();
}
}
if (!buffer.isEmpty())
{
cache.putAll(buffer);
}
...
}
catch (SQLException e)
{...}
}
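The buffering in the second bulkLoad method can be factored into a small reusable helper. The following is a minimal sketch, not part of the Coherence API: BatchingWriter is a hypothetical class, it assumes Java 8 or later, and any java.util.Map stands in for the NamedCache so that the sketch runs without a cluster.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical write buffer that forwards entries to a target map in batches
 * via putAll. In a Coherence application the target would be a NamedCache;
 * here any java.util.Map stands in for it (an assumption of this sketch).
 */
final class BatchingWriter<K, V> implements AutoCloseable {
    private final Map<K, V> target;
    private final int batchSize;
    private final Map<K, V> buffer = new HashMap<>();
    private int flushCount;   // number of putAll calls issued, for illustration

    BatchingWriter(Map<K, V> target, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.target = target;
        this.batchSize = batchSize;
    }

    /** Buffers one entry and flushes once the buffer holds batchSize entries. */
    void put(K key, V value) {
        buffer.put(key, value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends any buffered entries to the target in a single putAll call. */
    void flush() {
        if (!buffer.isEmpty()) {
            target.putAll(buffer);
            buffer.clear();
            flushCount++;
        }
    }

    int getFlushCount() {
        return flushCount;
    }

    /** Flushes the final partial batch, like the last putAll in bulkLoad. */
    @Override
    public void close() {
        flush();
    }
}
```

Inside bulkLoad, the loop body would reduce to a call along the lines of writer.put(key, value), with the writer created in a try-with-resources statement so that close() flushes the remainder. Comparing buffer.size() against the batch size, rather than a separate counter, keeps each putAll call at no more than batchSize entries.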
Efficient Processing of Filter Results
Coherence provides the ability to query caches based on criteria via the filter API. Here is an example (given entries with integers as keys and strings as values):
NamedCache c = CacheFactory.getCache("test");
Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true);
Set results = c.entrySet(query);
for (Iterator i = results.iterator(); i.hasNext();)
{
Map.Entry e = (Map.Entry) i.next();
out("key: "+e.getKey() + ", value: "+e.getValue());
}
This example works for small data sets, but entrySet(query) returns every matching entry at once, so a large result set can exhaust the heap. The following pattern processes query results in batches to avoid this problem:
public static void performQuery()
{
NamedCache c = CacheFactory.getCache("test");
Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true);
Set keys = c.keySet(query);
final int BUFFER_SIZE = 100;
Set buffer = new HashSet(BUFFER_SIZE);
for (Iterator i = keys.iterator(); i.hasNext();)
{
buffer.add(i.next());
if (buffer.size() >= BUFFER_SIZE)
{
Map entries = c.getAll(buffer);
process(entries);
buffer.clear();
}
}
if (!buffer.isEmpty())
{
process(c.getAll(buffer));
}
}
public static void process(Map map)
{
for (Iterator ie = map.entrySet().iterator(); ie.hasNext();)
{
Map.Entry e = (Map.Entry) ie.next();
out("key: "+e.getKey() + ", value: "+e.getValue());
}
}
In this example, all keys for entries that match the filter are returned, but only BUFFER_SIZE (in this case, 100) entries are retrieved from the cache at a time.
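The same batching can be written as a generic helper that walks a key set once and hands the keys to a callback in fixed-size groups. This is a sketch assuming Java 8 or later; KeyBatches and forEachBatch are hypothetical names, and the callback is where cache.getAll(batch) would be called.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

final class KeyBatches {
    private KeyBatches() {}

    /**
     * Iterates over the keys once and passes them to the action in sets of
     * at most batchSize keys. In the pattern above, the action would call
     * cache.getAll(batch) and process the returned map.
     */
    static <K> void forEachBatch(Iterable<K> keys, int batchSize, Consumer<Set<K>> action) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Set<K> batch = new HashSet<>();
        for (K key : keys) {
            batch.add(key);
            if (batch.size() >= batchSize) {
                action.accept(batch);
                batch = new HashSet<>();   // fresh set, so the action may keep a reference
            }
        }
        if (!batch.isEmpty()) {
            action.accept(batch);          // final partial batch
        }
    }
}
```

With the cache from performQuery, a call along the lines of KeyBatches.forEachBatch(keys, 100, batch -> process(c.getAll(batch))) would replace the explicit loop. The helper allocates a new set for each batch instead of calling clear(), so a callback that retains a batch does not see it change.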
Note that LimitFilter can be used to process results in parts, similar to the example above. However, LimitFilter is meant for scenarios where the results are paged, such as in a user interface. It is not an efficient means to process all data in a query result.
A Bulk Loading and Processing Example
Here is an example program that demonstrates the concepts described above.
Sample Bulk Loading Program and Output
To run the example, follow these steps:
- Save the attached Java file, PagedQuery.java, as com/tangosol/examples/PagedQuery.java
- Point the classpath to the Coherence libraries and the current directory
- Compile and run the example
Performing Distributed Bulk Loading
When pre-populating a Coherence partitioned cache with a large data set, it may be more efficient to distribute the work to Coherence cluster members. Distributed loading allows higher data throughput to the cache by leveraging the aggregate network bandwidth and CPU power of the cluster. When performing a distributed load, the application must decide the following:
- which cluster members will perform the load
- how to divide the data set among the members
The application should take into account the load that will be placed on the underlying data source (such as a database or file system) when selecting members and dividing work. For example, a single database can easily be overwhelmed if too many members execute queries concurrently.
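The file-based example later in this section divides work by file. When the source is a single database table with numeric keys, one alternative is to give each loading member a contiguous key range; choosing fewer ranges than there are members also caps the number of concurrent queries against the database. The following sketch assumes contiguous long keys and Java 8 or later; KeyRanges, split, and the query in the comment are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyRanges {
    private KeyRanges() {}

    /** A half-open key range [start, end). */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Splits [low, high) into at most 'parts' contiguous ranges whose sizes
     * differ by at most one. Each range could drive one member's query,
     * for example: select key, value from table where key >= ? and key < ?
     */
    static List<Range> split(long low, long high, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        long total = Math.max(0, high - low);
        long base = total / parts;
        long remainder = total % parts;
        long start = low;
        for (int i = 0; i < parts && start < high; i++) {
            long size = base + (i < remainder ? 1 : 0);  // first 'remainder' ranges get one extra key
            ranges.add(new Range(start, start + size));
            start += size;
        }
        return ranges;
    }
}
```

For example, split(0, 10, 3) yields the ranges [0, 4), [4, 7), and [7, 10). If the database can only tolerate two concurrent loaders, passing 2 as parts would leave any remaining storage-enabled members without load work.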
A Distributed Bulk Loading Example
This section outlines the general steps to perform a simple distributed load. The example assumes that the data is stored in files and will be distributed to all storage-enabled members of a cluster.
- Retrieve the set of storage-enabled members.
For example, the following method uses the getStorageEnabledMembers method to retrieve the storage-enabled members of a distributed cache.
protected Set getStorageMembers(NamedCache cache)
{
return ((DistributedCacheService) cache.getCacheService())
.getStorageEnabledMembers();
}
- Divide the work among the storage-enabled cluster members.
For example, the following routine returns a map, keyed by member, containing a list of files assigned to that member.
protected Map<Member, List<String>> divideWork(Set members, List<String> fileNames)
{
Iterator i = members.iterator();
Map<Member, List<String>> mapWork = new HashMap<Member, List<String>>(members.size());
if (members.isEmpty())
{
// no storage-enabled members to assign work to
return mapWork;
}
for (String sFileName : fileNames)
{
// assign files round-robin, restarting the iterator when it is exhausted
Member member = (Member) i.next();
List<String> memberFileNames = mapWork.get(member);
if (memberFileNames == null)
{
memberFileNames = new ArrayList<String>();
mapWork.put(member, memberFileNames);
}
memberFileNames.add(sFileName);
if (!i.hasNext())
{
i = members.iterator();
}
}
return mapWork;
}
- Launch a task that will perform the load on each member.
For example, use Coherence's InvocationService to launch the task. In this case, the implementation of LoaderInvocable must iterate through memberFileNames and process each file, loading its contents into the cache. Cache operations that a client would normally perform must instead be executed by the LoaderInvocable on each member.
public void load()
{
NamedCache cache = getCache();
Set members = getStorageMembers(cache);
List<String> fileNames = getFileNames();
Map<Member, List<String>> mapWork = divideWork(members, fileNames);
InvocationService service = (InvocationService)
CacheFactory.getService("InvocationService");
for (Map.Entry<Member, List<String>> entry : mapWork.entrySet())
{
Member member = entry.getKey();
List<String> memberFileNames = entry.getValue();
LoaderInvocable task = new LoaderInvocable(memberFileNames, cache.getCacheName());
// 'this' is an InvocationObserver that is notified as each member's task finishes
service.execute(task, Collections.singleton(member), this);
}
}
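Because the steps above depend on a running cluster, the following hedged sketch reproduces the same divide-and-dispatch flow in a single JVM so the logic can be tried in isolation. It assumes Java 8 or later; an ExecutorService stands in for InvocationService, plain strings stand in for Member objects, a thread-safe Map stands in for the cache, and loadFile is a hypothetical placeholder for the per-file work that LoaderInvocable would do.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LocalDistributedLoad {
    private LocalDistributedLoad() {}

    /** Round-robin assignment of files to members, in the spirit of divideWork. */
    static <M> Map<M, List<String>> divideWork(List<M> members, List<String> fileNames) {
        Map<M, List<String>> work = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return work;                                 // nobody to assign work to
        }
        for (int i = 0; i < fileNames.size(); i++) {
            M member = members.get(i % members.size());  // wrap around the member list
            work.computeIfAbsent(member, m -> new ArrayList<>()).add(fileNames.get(i));
        }
        return work;
    }

    /** Runs one task per member on a thread pool and waits for all of them. */
    static void load(Map<String, List<String>> work, Map<String, String> cache) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, work.size()));
        for (Map.Entry<String, List<String>> entry : work.entrySet()) {
            String member = entry.getKey();
            List<String> files = entry.getValue();
            pool.execute(() -> {
                for (String file : files) {
                    loadFile(file, cache);
                }
                System.out.println("Finished loading on member: " + member);
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("load did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for loaders", e);
        }
    }

    /** Hypothetical placeholder for per-file work: writes three entries per file. */
    static void loadFile(String file, Map<String, String> cache) {
        for (int row = 0; row < 3; row++) {
            cache.put(file + "#" + row, "row " + row);
        }
    }
}
```

With four members listed in order and the nine file names from the sample output below, divideWork assigns AAPL, HPQ, and YHOO to the first member and two files to each of the others, the same split shown in that output. Unlike InvocationService with an InvocationObserver, this sketch does not report a member whose task throws an exception.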
Running a Distributed Bulk Loading Example
The examples in the previous section are taken from DistributedLoader.java, which is included in the attached zip file, coherence-example-distributedload.zip. This sample application uses the InvocationService to distribute the task of loading data into a cache. Each storage-enabled member of the cluster is responsible for loading a portion of the data. The data in this example comes from several CSV files, and the unit of work is one file. All storage-enabled members must have access to all of the data files.
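The text does not show the body of LoaderInvocable. The following sketch only suggests what the per-file unit of work might look like: it reads one CSV source and writes its rows to the cache in batches with putAll. The CSV layout (a header line, then rows whose first column is unique within the file), the key format symbol:firstColumn, and the names CsvFileLoader and load are all assumptions; a Map again stands in for the NamedCache, and Java 8 or later is assumed.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

final class CsvFileLoader {
    private CsvFileLoader() {}

    /**
     * Hypothetical per-file unit of work: reads one CSV source and writes its
     * rows to the cache in batches of at most batchSize entries.
     * Assumes a header line followed by rows whose first column is unique
     * within the file. Returns the number of data rows read.
     */
    static int load(String symbol, Reader source, Map<String, String> cache, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Map<String, String> buffer = new HashMap<>();
        int rows = 0;
        try (BufferedReader in = new BufferedReader(source)) {
            in.readLine();                                      // skip the assumed header line
            String line;
            while ((line = in.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;                                   // ignore blank lines
                }
                int comma = line.indexOf(',');
                String firstColumn = comma < 0 ? line : line.substring(0, comma);
                buffer.put(symbol + ":" + firstColumn, line);   // assumed key format
                rows++;
                if (buffer.size() >= batchSize) {
                    cache.putAll(buffer);                       // one bulk call per batch
                    buffer.clear();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!buffer.isEmpty()) {
            cache.putAll(buffer);                               // final partial batch
        }
        return rows;
    }
}
```

In a real loader task, each member would call something like this once per assigned file, opening the file with a FileReader; because every storage-enabled member reads the files directly, they must all be able to reach the data directory.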
To build and run the example, you must have the following software installed:
- J2SE SDK 1.4 or later
- Apache Ant
- Oracle Coherence
Building the Sample Application
- Extract the contents of coherence-example-distributedload.zip into your file system.
- Update the bin\set-env.cmd file to reflect your system environment.
- Open a command prompt and execute the following command in the bin directory to build the samples:
C:\distributedLoad\bin\ant.cmd build
After running the samples, you can completely remove all build artifacts from your file system by running the clean command:
C:\distributedLoad\bin\ant.cmd clean
Running the Sample Application
- Start multiple cache servers (from the bin directory):
C:\distributedLoad\bin\server.cmd
- Run the client loader (from the bin directory):
C:\distributedLoad\bin\load.cmd
After you enter load.cmd on the command line, you will see messages indicating that the various members are joining the services, followed by messages indicating that the data is being distributed among the members. In this example, four cache servers were started.
...
Member(Id=1, Timestamp=2008-09-15 16:49:04.359, Address=ip_address:8088, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:21952, Role=CoherenceServer)
Member(Id=2, Timestamp=2008-09-15 16:49:50.25, Address=ip_address:8089, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:16604, Role=CoherenceServer)
Member(Id=3, Timestamp=2008-09-15 16:49:54.937, Address=ip_address:8090, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:7344, Role=CoherenceServer)
Member(Id=4, Timestamp=2008-09-15 16:49:58.734, Address=ip_address:8091, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:19052, Role=CoherenceServer)
)
2008-09-15 16:51:00.593/4.890 Oracle Coherence GE 3.4/405 (thread=main, member=5): Loading stock file names from '..\data'
2008-09-15 16:51:00.593/4.890 Oracle Coherence GE 3.4/405 (thread=main, member=5): Files to load: [..\data\AAPL.CSV, ..\data\BT.CSV, ..\data\DELL.CSV, ..\data\GOOG.CSV, ..\data\HPQ.CSV, ..\data\JAVA.CSV, ..\data\MSFT.CSV, ..\data\ORCL.CSV, ..\data\YHOO.CSV]
2008-09-15 16:51:00.593/4.890 Oracle Coherence GE 3.4/405 (thread=main, member=5): Invoking load on member(Id=2) for files [..\data\BT.CSV, ..\data\JAVA.CSV]
2008-09-15 16:51:00.640/4.937 Oracle Coherence GE 3.4/405 (thread=main, member=5): Invoking load on member(Id=3) for files [..\data\DELL.CSV, ..\data\MSFT.CSV]
2008-09-15 16:51:00.750/5.047 Oracle Coherence GE 3.4/405 (thread=main, member=5): Invoking load on member(Id=4) for files [..\data\GOOG.CSV, ..\data\ORCL.CSV]
2008-09-15 16:51:00.781/5.078 Oracle Coherence GE 3.4/405 (thread=main, member=5): Invoking load on member(Id=1) for files [..\data\AAPL.CSV, ..\data\HPQ.CSV, ..\data\YHOO.CSV]
2008-09-15 16:51:27.500/31.797 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=4, Timestamp=2008-09-15 16:49:58.734, Address=ip_address:8091, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:19052, Role=CoherenceServer)
2008-09-15 16:51:31.640/35.937 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=2, Timestamp=2008-09-15 16:49:50.25, Address=ip_address:8089, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:16604, Role=CoherenceServer)
2008-09-15 16:51:32.812/37.109 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=3, Timestamp=2008-09-15 16:49:54.937, Address=ip_address:8090, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:7344, Role=CoherenceServer)
2008-09-15 16:51:37.750/42.047 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=1, Timestamp=2008-09-15 16:49:04.359, Address=ip_address:8088, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:21952, Role=CoherenceServer)
2008-09-15 16:51:37.796/42.093 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Load finished in 37.20 secs
2008-09-15 16:51:37.812/42.109 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Final cache size: 47131
C:\distributedload\bin>