  Coherence 3.4 User Guide
  Pre-Loading the Cache
Added by Patrick Peralta, last edited by Tom Pfaeffle on Sep 17, 2008

This section describes different patterns you can use to pre-load the cache. The patterns include bulk loading and distributed loading.

Performing Bulk Loading and Processing

The attached example, PagedQuery.java, demonstrates techniques for efficiently bulk loading and processing items in a Coherence cache.

Bulk Writing to a Cache

A common scenario when using Coherence is to pre-populate a cache before the application makes use of it. A simple way to do this is illustrated by the following Java code:

public static void bulkLoad(NamedCache cache, Connection conn)
    {
    Statement s;
    ResultSet rs;
    
    try
        {
        s = conn.createStatement();
        rs = s.executeQuery("select key, value from table");
        while (rs.next())
            {
            Integer key   = new Integer(rs.getInt(1));
            String  value = rs.getString(2);
            cache.put(key, value);
            }
        ...
        }
    catch (SQLException e)
        {...}
    }

This technique works, but each call to put may result in network traffic, especially for partitioned and replicated caches. Additionally, each call to put returns the object it just replaced in the cache (per the java.util.Map interface), which adds unnecessary overhead. Loading the cache can be made much more efficient by using the putAll method, which NamedCache inherits from java.util.Map, so that many entries are sent in a single call:

public static void bulkLoad(NamedCache cache, Connection conn)
    {
    Statement s;
    ResultSet rs;
    Map       buffer = new HashMap();

    try
        {
        int count = 0;
        s = conn.createStatement();
        rs = s.executeQuery("select key, value from table");
        while (rs.next())
            {
            Integer key   = new Integer(rs.getInt(1));
            String  value = rs.getString(2);
            buffer.put(key, value);

            // this loads 1000 items at a time into the cache
            // (pre-increment, so the first flush happens after 1000 rows, not after the first row)
            if ((++count % 1000) == 0)
                {
                cache.putAll(buffer);
                buffer.clear();
                }
            }
        if (!buffer.isEmpty())
            {
            cache.putAll(buffer);
            }
        ...
        }
    catch (SQLException e)
        {...}
    }
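
The batching logic can be tried without a database or a cluster. The following is a minimal sketch, not part of the attached example: it assumes that a plain java.util.Map stands in for the NamedCache (NamedCache implements Map) and that the source keys are unique. The class name BatchLoadSketch, the method loadInBatches, and the counts used in main are illustrative names chosen here.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class BatchLoadSketch
    {
    /**
     * Copy entries from source into target in batches of at most batchSize
     * entries. Returns the number of putAll calls that were made.
     * (Hypothetical helper; target would be the NamedCache in a real load.)
     */
    static <K, V> int loadInBatches(Iterator<Map.Entry<K, V>> source,
                                    Map<K, V> target, int batchSize)
        {
        Map<K, V> buffer = new HashMap<K, V>();
        int calls = 0;
        while (source.hasNext())
            {
            Map.Entry<K, V> entry = source.next();
            buffer.put(entry.getKey(), entry.getValue());

            // flush a full batch with a single bulk call
            if (buffer.size() >= batchSize)
                {
                target.putAll(buffer);
                buffer.clear();
                calls++;
                }
            }

        // flush the last partial batch (if any)
        if (!buffer.isEmpty())
            {
            target.putAll(buffer);
            calls++;
            }
        return calls;
        }

    public static void main(String[] args)
        {
        // 2500 in-memory rows stand in for the JDBC result set
        Map<Integer, String> rows = new LinkedHashMap<Integer, String>();
        for (int i = 0; i < 2500; i++)
            {
            rows.put(Integer.valueOf(i), "value-" + i);
            }

        Map<Integer, String> cache = new HashMap<Integer, String>();
        int calls = loadInBatches(rows.entrySet().iterator(), cache, 1000);
        System.out.println(calls + " putAll calls, " + cache.size() + " entries");
        }
    }
```

With 2500 rows and a batch size of 1000, the sketch makes three putAll calls (1000, 1000, and 500 entries) instead of 2500 individual put calls.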

Efficient Processing of Filter Results

Coherence provides the ability to query caches based on criteria via the filter API. Here is an example (given entries with integers as keys and strings as values):

NamedCache c = CacheFactory.getCache("test");

// Search for entries that start with 'c'
Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true);

// Perform query, return all entries that match
Set results = c.entrySet(query);
for (Iterator i = results.iterator(); i.hasNext();)
    {
    Map.Entry e = (Map.Entry) i.next();
    out("key: "+e.getKey() + ", value: "+e.getValue());
    }

This example works for small data sets, but it may encounter problems if the data set is too large, such as running out of heap space. Here is a pattern to process query results in batches to avoid this problem:

public static void performQuery()
    {
    NamedCache c = CacheFactory.getCache("test");

    // Search for entries that start with 'c'
    Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true);

    // Perform query, return keys of entries that match
    Set keys = c.keySet(query);

    // The number of objects to process at a time
    final int BUFFER_SIZE = 100;

    // Object buffer
    Set buffer = new HashSet(BUFFER_SIZE);

    for (Iterator i = keys.iterator(); i.hasNext();)
        {
        buffer.add(i.next());

        if (buffer.size() >= BUFFER_SIZE)
            {
            // Bulk load BUFFER_SIZE number of objects from cache
            Map entries = c.getAll(buffer);

            // Process each entry
            process(entries);

            // Done processing these keys, clear buffer
            buffer.clear();
            }
        }
    // Handle the last partial chunk (if any)
    if (!buffer.isEmpty())
        {
        process(c.getAll(buffer));
        }
    }

public static void process(Map map)
    {
    for (Iterator ie = map.entrySet().iterator(); ie.hasNext();)
        {
        Map.Entry e = (Map.Entry) ie.next();
        out("key: "+e.getKey() + ", value: "+e.getValue());
        }
    }

In this example, all keys for entries that match the filter are returned, but only BUFFER_SIZE (in this case, 100) entries are retrieved from the cache at a time.

Note that LimitFilter can be used to process results in parts, similar to the example above. However, LimitFilter is meant for scenarios where the results will be paged, such as in a user interface. It is not an efficient means of processing all the data in a query result.

A Bulk Loading and Processing Example

Here is an example program that demonstrates the concepts described above.

Sample Bulk Loading Program and Output

To run the example, follow these steps:

  1. Save the following Java file as com/tangosol/examples/PagedQuery.java
  2. Point the classpath to the Coherence libraries and the current directory
  3. Compile and run the example

Performing Distributed Bulk Loading

When pre-populating a Coherence partitioned cache with a large data set, it may be more efficient to distribute the work to Coherence cluster members. Distributed loading will allow for higher data throughput rates to the cache by leveraging the aggregate network bandwidth and CPU power of the cluster. When performing a distributed load, the application will need to decide on the following:

  • which cluster members will perform the load
  • how to divide the data set among the members

The application should take into account the load that will be placed on the underlying data source (such as a database or file system) when selecting members and dividing work. For example, a single database can easily be overwhelmed if too many members execute queries concurrently.

A Distributed Bulk Loading Example

This section outlines the general steps to perform a simple distributed load. The example assumes that the data is stored in files and will be distributed to all storage-enabled members of a cluster.

  1. Retrieve the set of storage-enabled members.
     
    For example, the following method uses the getStorageEnabledMembers method to retrieve the storage-enabled members of a distributed cache.
     
    protected Set getStorageMembers(NamedCache cache)
            {
            return ((DistributedCacheService) cache.getCacheService())
                    .getStorageEnabledMembers();
            }

     

  2. Divide the work among the storage-enabled cluster members.
     
    For example, the following routine returns a map, keyed by member, containing a list of files assigned to that member.
     
    protected Map<Member, List<String>> divideWork(Set members, List<String> fileNames)
            {
            Iterator i = members.iterator();
            Map<Member, List<String>> mapWork = new HashMap<Member, List<String>>(members.size());
            for (String sFileName : fileNames)
                {
                Member member = (Member) i.next();
                List<String> memberFileNames = mapWork.get(member);
                if (memberFileNames == null)
                    {
                    memberFileNames = new ArrayList<String>();
                    mapWork.put(member, memberFileNames);
                    }
                memberFileNames.add(sFileName);
    
                // recycle through the members
                if (!i.hasNext())
                    {
                    i = members.iterator();
                    }
                }
            return mapWork;
            }

     

  3. Launch a task that will perform the load on each member.
     
    For example, use Coherence's InvocationService to launch the task. In this case, the implementation of LoaderInvocable will need to iterate through memberFileNames and process each file, loading its contents into the cache. The cache operations normally performed on the client will need to be executed through the LoaderInvocable. Because execute is asynchronous, the loader passes itself (this) as the InvocationObserver that is notified as each member finishes.
     
    public void load()
            {
            NamedCache cache = getCache();
    
            Set members = getStorageMembers(cache);
    
            List<String> fileNames = getFileNames();
    
            Map<Member, List<String>> mapWork = divideWork(members, fileNames);
    
            InvocationService service = (InvocationService)
                    CacheFactory.getService("InvocationService");        
    
            for (Map.Entry<Member, List<String>> entry : mapWork.entrySet())
                {
                Member member = entry.getKey();
                List<String> memberFileNames = entry.getValue();
    
                LoaderInvocable task = new LoaderInvocable(memberFileNames, cache.getCacheName());
                service.execute(task, Collections.singleton(member), this);
                }
            }
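
The round-robin division in step 2 can be tried without a running cluster. The following is a minimal sketch under stated assumptions, not code from DistributedLoader.java: plain strings stand in for Coherence Member objects, the class name DivideWorkSketch and the member names are made up for illustration, and an empty member list returns an empty map rather than failing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DivideWorkSketch
    {
    /**
     * Assign file names to members in round-robin order.
     * M is a stand-in for the Member type (assumption for this sketch).
     */
    static <M> Map<M, List<String>> divideWork(List<M> members, List<String> fileNames)
        {
        // LinkedHashMap keeps the members in assignment order for printing
        Map<M, List<String>> mapWork = new LinkedHashMap<M, List<String>>();
        if (members.isEmpty())
            {
            return mapWork; // no members to assign work to
            }

        Iterator<M> i = members.iterator();
        for (String sFileName : fileNames)
            {
            // recycle through the members
            if (!i.hasNext())
                {
                i = members.iterator();
                }
            M member = i.next();

            List<String> memberFileNames = mapWork.get(member);
            if (memberFileNames == null)
                {
                memberFileNames = new ArrayList<String>();
                mapWork.put(member, memberFileNames);
                }
            memberFileNames.add(sFileName);
            }
        return mapWork;
        }

    public static void main(String[] args)
        {
        List<String> members = Arrays.asList("member-1", "member-2", "member-3", "member-4");
        List<String> files = Arrays.asList("AAPL.CSV", "BT.CSV", "DELL.CSV", "GOOG.CSV",
                "HPQ.CSV", "JAVA.CSV", "MSFT.CSV", "ORCL.CSV", "YHOO.CSV");

        for (Map.Entry<String, List<String>> entry : divideWork(members, files).entrySet())
            {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
        }
    }
```

With four members and nine files, the first member receives three files (AAPL.CSV, HPQ.CSV, YHOO.CSV) and each of the others receives two, which is the same split shown for member Id=1 in the sample run output in this section.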

Running a Distributed Bulk Loading Example

The examples in the previous section are taken from DistributedLoader.java, which is included in the attached zip file, coherence-example-distributedload.zip. This sample application uses the InvocationService to distribute the task of loading data into a cache. Each storage-enabled member of the cluster will be responsible for loading a portion of the data. The data in this example will come from several CSV files and the unit of work is one file. All storage-enabled nodes must have access to all of the data files.

To build and run the example, you must have the following software installed:

  • J2SE SDK 1.4 or later
  • Apache Ant
  • Oracle Coherence

Building the Sample Application

  1. Extract the contents of coherence-example-distributedload.zip into your file system.
     
  2. Update the bin\set-env.cmd file to reflect your system environment.
     
  3. Open a command prompt and execute the following command in the bin directory to build the samples:
    
    C:\distributedLoad\bin\ant.cmd build
     
    After running the samples, you can completely remove all build artifacts from your file system by running the clean command:
     
    C:\distributedLoad\bin\ant.cmd clean

Running the Sample Application

  1. Start multiple cache servers (from the bin directory):
    
    C:\distributedLoad\bin\server.cmd
    
    
  2. Run the client loader (from the bin directory):
    
    C:\distributedLoad\bin\load.cmd
    
    

After entering load.cmd on the command line, you will see messages indicating that the various members are joining the services. Then you will see messages indicating that the data is being distributed among the members. In this example, four cache servers were started.


...
  Member(Id=1, Timestamp=2008-09-15 16:49:04.359, Address=ip_address:8088, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:21952, Role=CoherenceServer)
  Member(Id=2, Timestamp=2008-09-15 16:49:50.25, Address=ip_address:8089, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:16604, Role=CoherenceServer)
  Member(Id=3, Timestamp=2008-09-15 16:49:54.937, Address=ip_address:8090, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:7344, Role=CoherenceServer)
  Member(Id=4, Timestamp=2008-09-15 16:49:58.734, Address=ip_address:8091, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:19052, Role=CoherenceServer)
  )
2008-09-15 16:51:00.593/4.890 Oracle Coherence GE 3.4/405 (thread=main, member=5): Loading stock file names from '..\data'
2008-09-15 16:51:00.593/4.890 Oracle Coherence GE 3.4/405 (thread=main, member=5): Files to load: [..\data\AAPL.CSV, ..\data\BT.CSV, ..\data\DELL.CSV, ..\data\GOOG.CSV, ..\data\HPQ.CSV, ..\data\JAVA.CSV, ..\data\MSFT.CSV, ..\data\ORCL.CSV, ..\data\YHOO.CSV]
2008-09-15 16:51:00.593/4.890 Oracle Coherence GE 3.4/405 (thread=main, member=5): Invoking load on member(Id=2) for files [..\data\BT.CSV, ..\data\JAVA.CSV]
2008-09-15 16:51:00.640/4.937 Oracle Coherence GE 3.4/405 (thread=main, member=5): Invoking load on member(Id=3) for files [..\data\DELL.CSV, ..\data\MSFT.CSV]
2008-09-15 16:51:00.750/5.047 Oracle Coherence GE 3.4/405 (thread=main, member=5): Invoking load on member(Id=4) for files [..\data\GOOG.CSV, ..\data\ORCL.CSV]
2008-09-15 16:51:00.781/5.078 Oracle Coherence GE 3.4/405 (thread=main, member=5): Invoking load on member(Id=1) for files [..\data\AAPL.CSV, ..\data\HPQ.CSV, ..\data\YHOO.CSV]
2008-09-15 16:51:27.500/31.797 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=4, Timestamp=2008-09-15 16:49:58.734, Address=ip_address:8091, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:19052, Role=CoherenceServer)
2008-09-15 16:51:31.640/35.937 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=2, Timestamp=2008-09-15 16:49:50.25, Address=ip_address:8089, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:16604, Role=CoherenceServer)
2008-09-15 16:51:32.812/37.109 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=3, Timestamp=2008-09-15 16:49:54.937, Address=ip_address:8090, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:7344, Role=CoherenceServer)
2008-09-15 16:51:37.750/42.047 Oracle Coherence GE 3.4/405 <D5> (thread=Invocation:InvocationService, member=5): Finished loading on member: Member(Id=1, Timestamp=2008-09-15 16:49:04.359, Address=ip_address:8088, MachineId=49690, Location=site:us.oracle.com,machine:machine_name,process:21952, Role=CoherenceServer)
2008-09-15 16:51:37.796/42.093 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Load finished in 37.20 secs
2008-09-15 16:51:37.812/42.109 Oracle Coherence GE 3.4/405 <D5> (thread=main, member=5): Final cache size: 47131

C:\distributedload\bin>