View Javadoc

1   /**
2    *  Copyright 2003-2006 Greg Luck
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.Cache;
20  import net.sf.ehcache.CacheException;
21  import net.sf.ehcache.Element;
22  import net.sf.ehcache.Status;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  
26  import java.io.Serializable;
27  import java.lang.ref.SoftReference;
28  import java.rmi.UnmarshalException;
29  import java.util.ArrayList;
30  import java.util.LinkedList;
31  import java.util.List;
32  
33  /**
34   * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
35   * {@link CachePeer} peers of the Cache asynchronously.
36   * <p/>
37   * Updates are guaranteed to be replicated in the order in which they are received.
38   * <p/>
39   * While much faster in operation than {@link RMISynchronousCacheReplicator}, it does suffer from a number
40   * of problems. Elements, which may be being spooled to DiskStore may stay around in memory because references
41   * are being held to them from {@link EventMessage}s which are queued up. The replication thread runs once
42   * per second, limiting the build up. However a lot of elements can be put into a cache in that time. We do not want
43   * to get an {@link OutOfMemoryError} using distribution in circumstances when it would not happen if we were
44   * just using the DiskStore.
45   * <p/>
46   * Accordingly, {@link EventMessage}s are held by {@link SoftReference} in the queue,
47   * so that they can be discarded if required by the GC to avoid an {@link OutOfMemoryError}. A log message
48   * will be issued on each flush of the queue if there were any forced discards. One problem with GC collection
49   * of SoftReferences is that the VM (JDK1.5 anyway) will do that rather than grow the heap size to the maximum.
50   * The workaround is to either set minimum heap size to the maximum heap size to force heap allocation at start
51   * up, or put up with a few lost messages while the heap grows.
52   *
53   * @author Greg Luck
54   * @version $Id: RMIAsynchronousCacheReplicator.java 141 2006-06-30 01:56:32Z gregluck $
55   */
56  public final class RMIAsynchronousCacheReplicator extends RMISynchronousCacheReplicator {
57  
58      /**
59       * The amount of time the replication thread sleeps after it detects the replicationQueue is empty
60       * before checking again.
61       */
62      protected static final int REPLICATION_THREAD_INTERVAL = 1000;
63  
64      private static final Log LOG = LogFactory.getLog(RMIAsynchronousCacheReplicator.class.getName());
65  
66      /**
67       * A thread which handles replication, so that replication can take place asynchronously and not hold up the cache
68       */
69      protected final Thread replicationThread = new ReplicationThread();
70  
71      /**
72       * A queue of updates.
73       */
74      protected final List replicationQueue = new LinkedList();
75  
76      /**
77       * Constructor for internal and subclass use
78       *
79       * @param replicatePuts
80       * @param replicateUpdates
81       * @param replicateUpdatesViaCopy
82       * @param replicateRemovals
83       */
84      protected RMIAsynchronousCacheReplicator(
85              boolean replicatePuts,
86              boolean replicateUpdates,
87              boolean replicateUpdatesViaCopy,
88              boolean replicateRemovals) {
89          super(replicatePuts,
90                  replicateUpdates,
91                  replicateUpdatesViaCopy,
92                  replicateRemovals);
93          status = Status.STATUS_ALIVE;
94          replicationThread.start();
95      }
96  
97      /**
98       * Main method for the replicationQueue thread.
99       * <p/>
100      * Note that the replicationQueue thread locks the cache for the entire time it is writing elements to the disk.
101      */
102     private void replicationThreadMain() {
103         while (true) {
104             // Wait for elements in the replicationQueue
105             while (alive() && replicationQueue != null && replicationQueue.size() == 0) {
106                 try {
107                     Thread.sleep(REPLICATION_THREAD_INTERVAL);
108                 } catch (InterruptedException e) {
109                     LOG.debug("Spool Thread interrupted.");
110                     return;
111                 }
112             }
113             if (notAlive()) {
114                 return;
115             }
116             try {
117                 if (replicationQueue.size() != 0) {
118                     flushReplicationQueue();
119                 }
120             } catch (Throwable e) {
121                 LOG.warn("Exception on flushing of replication queue: " + e.getMessage()
122                         + ". Continuing...", e);
123             }
124         }
125     }
126 
127 
128     /**
129      * {@inheritDoc}
130      * <p/>
131      * This implementation queues the put notification for in-order replication to peers.
132      *
133      * @param cache   the cache emitting the notification
134      * @param element the element which was just put into the cache.
135      */
136     public final void notifyElementPut(final Cache cache, final Element element) throws CacheException {
137         if (notAlive()) {
138             return;
139         }
140 
141         if (!replicatePuts) {
142             return;
143         }
144 
145         if (!element.isSerializable()) {
146             if (LOG.isWarnEnabled()) {
147                 LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated");
148             }
149             return;
150         }
151 
152         addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
153     }
154 
155     /**
156      * Called immediately after an element has been put into the cache and the element already
157      * existed in the cache. This is thus an update.
158      * <p/>
159      * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
160      * will block until this method returns.
161      * <p/>
162      * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
163      * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
164      *
165      * @param cache   the cache emitting the notification
166      * @param element the element which was just put into the cache.
167      */
168     public final void notifyElementUpdated(final Cache cache, final Element element) throws CacheException {
169         if (notAlive()) {
170             return;
171         }
172         if (!replicateUpdates) {
173             return;
174         }
175 
176         if (replicateUpdatesViaCopy) {
177             if (!element.isSerializable()) {
178                 if (LOG.isWarnEnabled()) {
179                     LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be updated via copy");
180                 }
181                 return;
182             }
183             addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
184         } else {
185             if (!element.isKeySerializable()) {
186                 if (LOG.isWarnEnabled()) {
187                     LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
188                 }
189                 return;
190             }
191             addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
192         }
193     }
194 
195     /**
196      * Called immediately after an element has been removed. The remove method will block until
197      * this method returns.
198      * <p/>
199      * This implementation queues the removal notification for in order replication to peers.
200      *
201      * @param cache   the cache emitting the notification
202      * @param element just deleted
203      */
204     public final void notifyElementRemoved(final Cache cache, final Element element) throws CacheException {
205         if (!replicateRemovals) {
206             return;
207         }
208 
209         if (!element.isKeySerializable()) {
210             if (LOG.isWarnEnabled()) {
211                 LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
212             }
213             return;
214         }
215         addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
216     }
217 
218     /**
219      * Adds a message to the queue.
220      * <p/>
221      * This method checks the state of the replication thread and warns
222      * if it has stopped and then discards the message.
223      *
224      * @param cacheEventMessage
225      */
226     protected void addToReplicationQueue(CacheEventMessage cacheEventMessage) {
227         if (!replicationThread.isAlive()) {
228             LOG.error("CacheEventMessages cannot be added to the replication queue"
229                     + " because the replication thread has died.");
230         } else {
231             synchronized (replicationQueue) {
232                 replicationQueue.add(cacheEventMessage);
233             }
234         }
235     }
236 
237 
238     /**
239      * Gets called once per {@link #REPLICATION_THREAD_INTERVAL}.
240      * <p/>
241      * Sends accumulated messages in bulk to each peer. i.e. if ther are 100 messages and 1 peer,
242      * 1 RMI invocation results, not 100. Also, if a peer is unavailable this is discovered in only 1 try.
243      * <p/>
244      * Makes a copy of the queue so as not to hold up the enqueue operations.
245      * <p/>
246      * Any exceptions are caught so that the replication thread does not die, and because errors are expected,
247      * due to peers becoming unavailable.
248      * <p/>
249      * This method issues warnings for problems that can be fixed with configuration changes.
250      */
251     private void flushReplicationQueue() {
252         List replicationQueueCopy;
253         synchronized (replicationQueue) {
254             if (replicationQueue.size() == 0) {
255                 return;
256             }
257 
258             replicationQueueCopy = new ArrayList(replicationQueue.size());
259             for (int i = 0; i < replicationQueue.size(); i++) {
260                 CacheEventMessage cacheEventMessage = (CacheEventMessage) replicationQueue.get(i);
261                 replicationQueueCopy.add(cacheEventMessage);
262             }
263             replicationQueue.clear();
264         }
265 
266 
267         Cache cache = ((CacheEventMessage) replicationQueueCopy.get(0)).cache;
268         List cachePeers = listRemoteCachePeers(cache);
269 
270         List resolvedEventMessages = extractAndResolveEventMessages(replicationQueueCopy);
271 
272 
273         for (int j = 0; j < cachePeers.size(); j++) {
274             CachePeer cachePeer = (CachePeer) cachePeers.get(j);
275             try {
276                 cachePeer.send(resolvedEventMessages);
277             } catch (UnmarshalException e) {
278                 String message = e.getMessage();
279                 if (message.indexOf("Read time out") != 0) {
280                     LOG.warn("Unable to send message to remote peer due to socket read timeout. Consider increasing" +
281                             " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. " +
282                             "Message was: " + e.getMessage());
283                 } else {
284                     LOG.debug("Unable to send message to remote peer.  Message was: " + e.getMessage());
285                 }
286             } catch (Throwable t) {
287                 LOG.debug("Unable to send message to remote peer.  Message was: " + t.getMessage());
288             }
289         }
290         if (LOG.isWarnEnabled()) {
291             int eventMessagesNotResolved = replicationQueueCopy.size() - resolvedEventMessages.size();
292             if (eventMessagesNotResolved > 0) {
293                 LOG.warn(eventMessagesNotResolved + " messages were discarded on replicate due to reclamation of " +
294                         "SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the " +
295                         "starting heap size to a higher value.");
296             }
297 
298         }
299     }
300 
301     /**
302      * Extracts CacheEventMessages and attempts to get a hard reference to the underlying EventMessage
303      *
304      * @param replicationQueueCopy
305      * @return a list of EventMessages which were able to be resolved
306      */
307     private static List extractAndResolveEventMessages(List replicationQueueCopy) {
308         List list = new ArrayList();
309         for (int i = 0; i < replicationQueueCopy.size(); i++) {
310             EventMessage eventMessage = ((CacheEventMessage) replicationQueueCopy.get(i)).getEventMessage();
311             if (eventMessage != null) {
312                 list.add(eventMessage);
313             }
314         }
315         return list;
316     }
317 
318     /**
319      * A background daemon thread that writes objects to the file.
320      */
321     private final class ReplicationThread extends Thread {
322         public ReplicationThread() {
323             super("Replication Thread");
324             setDaemon(true);
325             setPriority(2);
326         }
327 
328         /**
329          * Main thread method.
330          */
331         public final void run() {
332             replicationThreadMain();
333         }
334     }
335 
336     /**
337      * A wrapper around an EventMessage, which enables the element to enqueued along with
338      * what is to be done with it.
339      * <p/>
340      * The wrapper holds a {@link SoftReference} to the {@link EventMessage}, so that the queue is never
341      * the cause of an {@link OutOfMemoryError}
342      */
343     private static final class CacheEventMessage {
344 
345         private final Cache cache;
346         private final SoftReference softEventMessage;
347 
348         public CacheEventMessage(int event, Cache cache, Element element, Serializable key) {
349             EventMessage eventMessage = new EventMessage(event, key, element);
350             softEventMessage = new SoftReference(eventMessage);
351             this.cache = cache;
352         }
353 
354         /**
355          * Gets the component EventMessage
356          */
357         public final EventMessage getEventMessage() {
358             return (EventMessage) softEventMessage.get();
359         }
360 
361     }
362 
363     /**
364      * Give the replicator a chance to cleanup and free resources when no longer needed
365      */
366     public final void dispose() {
367         status = Status.STATUS_SHUTDOWN;
368         synchronized (replicationQueue) {
369             replicationQueue.clear();
370         }
371 
372     }
373 
374 
375 }