1 /**
2 * Copyright 2003-2006 Greg Luck
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.Cache;
20 import net.sf.ehcache.CacheException;
21 import net.sf.ehcache.Element;
22 import net.sf.ehcache.Status;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25
26 import java.io.Serializable;
27 import java.lang.ref.SoftReference;
28 import java.rmi.UnmarshalException;
29 import java.util.ArrayList;
30 import java.util.LinkedList;
31 import java.util.List;
32
33 /**
34 * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
35 * {@link CachePeer} peers of the Cache asynchronously.
36 * <p/>
37 * Updates are guaranteed to be replicated in the order in which they are received.
38 * <p/>
 * While much faster in operation than {@link RMISynchronousCacheReplicator}, it does suffer from a number
 * of problems. Elements which are being spooled to the DiskStore may stay around in memory because references
 * to them are held by the queued {@link EventMessage}s. The replication thread runs once
 * per second, limiting the build up. However, a lot of elements can be put into a cache in that time. We do not want
 * to get an {@link OutOfMemoryError} using distribution in circumstances when it would not happen if we were
 * just using the DiskStore.
45 * <p/>
46 * Accordingly, {@link EventMessage}s are held by {@link SoftReference} in the queue,
47 * so that they can be discarded if required by the GC to avoid an {@link OutOfMemoryError}. A log message
48 * will be issued on each flush of the queue if there were any forced discards. One problem with GC collection
49 * of SoftReferences is that the VM (JDK1.5 anyway) will do that rather than grow the heap size to the maximum.
50 * The workaround is to either set minimum heap size to the maximum heap size to force heap allocation at start
51 * up, or put up with a few lost messages while the heap grows.
52 *
53 * @author Greg Luck
54 * @version $Id: RMIAsynchronousCacheReplicator.java 141 2006-06-30 01:56:32Z gregluck $
55 */
56 public final class RMIAsynchronousCacheReplicator extends RMISynchronousCacheReplicator {
57
58 /**
59 * The amount of time the replication thread sleeps after it detects the replicationQueue is empty
60 * before checking again.
61 */
62 protected static final int REPLICATION_THREAD_INTERVAL = 1000;
63
64 private static final Log LOG = LogFactory.getLog(RMIAsynchronousCacheReplicator.class.getName());
65
66 /**
67 * A thread which handles replication, so that replication can take place asynchronously and not hold up the cache
68 */
69 protected final Thread replicationThread = new ReplicationThread();
70
71 /**
72 * A queue of updates.
73 */
74 protected final List replicationQueue = new LinkedList();
75
76 /**
77 * Constructor for internal and subclass use
78 *
79 * @param replicatePuts
80 * @param replicateUpdates
81 * @param replicateUpdatesViaCopy
82 * @param replicateRemovals
83 */
84 protected RMIAsynchronousCacheReplicator(
85 boolean replicatePuts,
86 boolean replicateUpdates,
87 boolean replicateUpdatesViaCopy,
88 boolean replicateRemovals) {
89 super(replicatePuts,
90 replicateUpdates,
91 replicateUpdatesViaCopy,
92 replicateRemovals);
93 status = Status.STATUS_ALIVE;
94 replicationThread.start();
95 }
96
97 /**
98 * Main method for the replicationQueue thread.
99 * <p/>
100 * Note that the replicationQueue thread locks the cache for the entire time it is writing elements to the disk.
101 */
102 private void replicationThreadMain() {
103 while (true) {
104
105 while (alive() && replicationQueue != null && replicationQueue.size() == 0) {
106 try {
107 Thread.sleep(REPLICATION_THREAD_INTERVAL);
108 } catch (InterruptedException e) {
109 LOG.debug("Spool Thread interrupted.");
110 return;
111 }
112 }
113 if (notAlive()) {
114 return;
115 }
116 try {
117 if (replicationQueue.size() != 0) {
118 flushReplicationQueue();
119 }
120 } catch (Throwable e) {
121 LOG.warn("Exception on flushing of replication queue: " + e.getMessage()
122 + ". Continuing...", e);
123 }
124 }
125 }
126
127
128 /**
129 * {@inheritDoc}
130 * <p/>
131 * This implementation queues the put notification for in-order replication to peers.
132 *
133 * @param cache the cache emitting the notification
134 * @param element the element which was just put into the cache.
135 */
136 public final void notifyElementPut(final Cache cache, final Element element) throws CacheException {
137 if (notAlive()) {
138 return;
139 }
140
141 if (!replicatePuts) {
142 return;
143 }
144
145 if (!element.isSerializable()) {
146 if (LOG.isWarnEnabled()) {
147 LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated");
148 }
149 return;
150 }
151
152 addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
153 }
154
155 /**
156 * Called immediately after an element has been put into the cache and the element already
157 * existed in the cache. This is thus an update.
158 * <p/>
159 * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
160 * will block until this method returns.
161 * <p/>
162 * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
163 * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
164 *
165 * @param cache the cache emitting the notification
166 * @param element the element which was just put into the cache.
167 */
168 public final void notifyElementUpdated(final Cache cache, final Element element) throws CacheException {
169 if (notAlive()) {
170 return;
171 }
172 if (!replicateUpdates) {
173 return;
174 }
175
176 if (replicateUpdatesViaCopy) {
177 if (!element.isSerializable()) {
178 if (LOG.isWarnEnabled()) {
179 LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be updated via copy");
180 }
181 return;
182 }
183 addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
184 } else {
185 if (!element.isKeySerializable()) {
186 if (LOG.isWarnEnabled()) {
187 LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
188 }
189 return;
190 }
191 addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
192 }
193 }
194
195 /**
196 * Called immediately after an element has been removed. The remove method will block until
197 * this method returns.
198 * <p/>
199 * This implementation queues the removal notification for in order replication to peers.
200 *
201 * @param cache the cache emitting the notification
202 * @param element just deleted
203 */
204 public final void notifyElementRemoved(final Cache cache, final Element element) throws CacheException {
205 if (!replicateRemovals) {
206 return;
207 }
208
209 if (!element.isKeySerializable()) {
210 if (LOG.isWarnEnabled()) {
211 LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
212 }
213 return;
214 }
215 addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
216 }
217
218 /**
219 * Adds a message to the queue.
220 * <p/>
221 * This method checks the state of the replication thread and warns
222 * if it has stopped and then discards the message.
223 *
224 * @param cacheEventMessage
225 */
226 protected void addToReplicationQueue(CacheEventMessage cacheEventMessage) {
227 if (!replicationThread.isAlive()) {
228 LOG.error("CacheEventMessages cannot be added to the replication queue"
229 + " because the replication thread has died.");
230 } else {
231 synchronized (replicationQueue) {
232 replicationQueue.add(cacheEventMessage);
233 }
234 }
235 }
236
237
238 /**
239 * Gets called once per {@link #REPLICATION_THREAD_INTERVAL}.
240 * <p/>
241 * Sends accumulated messages in bulk to each peer. i.e. if ther are 100 messages and 1 peer,
242 * 1 RMI invocation results, not 100. Also, if a peer is unavailable this is discovered in only 1 try.
243 * <p/>
244 * Makes a copy of the queue so as not to hold up the enqueue operations.
245 * <p/>
246 * Any exceptions are caught so that the replication thread does not die, and because errors are expected,
247 * due to peers becoming unavailable.
248 * <p/>
249 * This method issues warnings for problems that can be fixed with configuration changes.
250 */
251 private void flushReplicationQueue() {
252 List replicationQueueCopy;
253 synchronized (replicationQueue) {
254 if (replicationQueue.size() == 0) {
255 return;
256 }
257
258 replicationQueueCopy = new ArrayList(replicationQueue.size());
259 for (int i = 0; i < replicationQueue.size(); i++) {
260 CacheEventMessage cacheEventMessage = (CacheEventMessage) replicationQueue.get(i);
261 replicationQueueCopy.add(cacheEventMessage);
262 }
263 replicationQueue.clear();
264 }
265
266
267 Cache cache = ((CacheEventMessage) replicationQueueCopy.get(0)).cache;
268 List cachePeers = listRemoteCachePeers(cache);
269
270 List resolvedEventMessages = extractAndResolveEventMessages(replicationQueueCopy);
271
272
273 for (int j = 0; j < cachePeers.size(); j++) {
274 CachePeer cachePeer = (CachePeer) cachePeers.get(j);
275 try {
276 cachePeer.send(resolvedEventMessages);
277 } catch (UnmarshalException e) {
278 String message = e.getMessage();
279 if (message.indexOf("Read time out") != 0) {
280 LOG.warn("Unable to send message to remote peer due to socket read timeout. Consider increasing" +
281 " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. " +
282 "Message was: " + e.getMessage());
283 } else {
284 LOG.debug("Unable to send message to remote peer. Message was: " + e.getMessage());
285 }
286 } catch (Throwable t) {
287 LOG.debug("Unable to send message to remote peer. Message was: " + t.getMessage());
288 }
289 }
290 if (LOG.isWarnEnabled()) {
291 int eventMessagesNotResolved = replicationQueueCopy.size() - resolvedEventMessages.size();
292 if (eventMessagesNotResolved > 0) {
293 LOG.warn(eventMessagesNotResolved + " messages were discarded on replicate due to reclamation of " +
294 "SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the " +
295 "starting heap size to a higher value.");
296 }
297
298 }
299 }
300
301 /**
302 * Extracts CacheEventMessages and attempts to get a hard reference to the underlying EventMessage
303 *
304 * @param replicationQueueCopy
305 * @return a list of EventMessages which were able to be resolved
306 */
307 private static List extractAndResolveEventMessages(List replicationQueueCopy) {
308 List list = new ArrayList();
309 for (int i = 0; i < replicationQueueCopy.size(); i++) {
310 EventMessage eventMessage = ((CacheEventMessage) replicationQueueCopy.get(i)).getEventMessage();
311 if (eventMessage != null) {
312 list.add(eventMessage);
313 }
314 }
315 return list;
316 }
317
318 /**
319 * A background daemon thread that writes objects to the file.
320 */
321 private final class ReplicationThread extends Thread {
322 public ReplicationThread() {
323 super("Replication Thread");
324 setDaemon(true);
325 setPriority(2);
326 }
327
328 /**
329 * Main thread method.
330 */
331 public final void run() {
332 replicationThreadMain();
333 }
334 }
335
336 /**
337 * A wrapper around an EventMessage, which enables the element to enqueued along with
338 * what is to be done with it.
339 * <p/>
340 * The wrapper holds a {@link SoftReference} to the {@link EventMessage}, so that the queue is never
341 * the cause of an {@link OutOfMemoryError}
342 */
343 private static final class CacheEventMessage {
344
345 private final Cache cache;
346 private final SoftReference softEventMessage;
347
348 public CacheEventMessage(int event, Cache cache, Element element, Serializable key) {
349 EventMessage eventMessage = new EventMessage(event, key, element);
350 softEventMessage = new SoftReference(eventMessage);
351 this.cache = cache;
352 }
353
354 /**
355 * Gets the component EventMessage
356 */
357 public final EventMessage getEventMessage() {
358 return (EventMessage) softEventMessage.get();
359 }
360
361 }
362
363 /**
364 * Give the replicator a chance to cleanup and free resources when no longer needed
365 */
366 public final void dispose() {
367 status = Status.STATUS_SHUTDOWN;
368 synchronized (replicationQueue) {
369 replicationQueue.clear();
370 }
371
372 }
373
374
375 }