1 /**
2 * Copyright 2003-2006 Greg Luck
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import junit.framework.TestCase;
20 import net.sf.ehcache.Cache;
21 import net.sf.ehcache.CacheException;
22 import net.sf.ehcache.CacheManager;
23 import net.sf.ehcache.Element;
24 import net.sf.ehcache.AbstractCacheTest;
25 import net.sf.ehcache.StopWatch;
26 import net.sf.ehcache.ThreadKiller;
27 import net.sf.ehcache.event.CountingCacheEventListener;
28
29 import java.io.Serializable;
30 import java.io.IOException;
31 import java.util.Arrays;
32 import java.util.Date;
33 import java.util.List;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37
38 /**
39 * Tests replication of Cache events
40 * <p/>
41 * Note these tests need a live network interface running in multicast mode to work
42 * <p/>
43 * If running involving RMIAsynchronousCacheReplicator individually the test will fail because
44 * the VM will gobble up the SoftReferences rather than allocating more memory. Uncomment the
45 * forceVMGrowth() method usage in setup.
46 *
47 * @author Greg Luck
48 * @version $Id: RMICacheReplicatorTest.java 141 2006-06-30 01:56:32Z gregluck $
49 */
50 public class RMICacheReplicatorTest extends TestCase {
51
52 private static final Log LOG = LogFactory.getLog(RMICacheReplicatorTest.class.getName());
53
54 private static final boolean ASYNCHRONOUS = true;
55 private static final boolean SYNCHRONOUS = false;
56
57 /**
58 * CacheManager 1 in the cluster
59 */
60 protected CacheManager manager1;
61 /**
62 * CacheManager 2 in the cluster
63 */
64 protected CacheManager manager2;
65 /**
66 * CacheManager 3 in the cluster
67 */
68 protected CacheManager manager3;
69 /**
70 * CacheManager 4 in the cluster
71 */
72 protected CacheManager manager4;
73 /**
74 * CacheManager 5 in the cluster
75 */
76 protected CacheManager manager5;
77 /**
78 * CacheManager 6 in the cluster
79 */
80 protected CacheManager manager6;
81
82 /**
83 * The name of the cache under test
84 */
85 protected String cacheName = "sampleCache1";
86 /**
87 * CacheManager 1 of 2s cache being replicated
88 */
89 protected Cache cache1;
90
91 /**
92 * CacheManager 2 of 2s cache being replicated
93 */
94 protected Cache cache2;
95
96
97 /**
98 * {@inheritDoc}
99 * Sets up two caches: cache1 is local. cache2 is to be receive updates
100 *
101 * @throws Exception
102 */
103 protected void setUp() throws Exception {
104 if (JVMUtil.isSingleRMIRegistryPerVM()) {
105 return;
106 }
107
108
109
110
111
112
113 CountingCacheEventListener.resetCounters();
114 manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed1.xml");
115 manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed2.xml");
116 manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
117 manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed4.xml");
118 manager5 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed5.xml");
119
120 manager1.getCache(cacheName).removeAll();
121
122 cache1 = manager1.getCache(cacheName);
123 cache1.removeAll();
124
125 cache2 = manager2.getCache(cacheName);
126 cache2.removeAll();
127
128
129 Thread.sleep(6000);
130
131 }
132
133 private void forceVMGrowth() {
134 byte[] forceVMGrowth = new byte[50000000];
135 }
136
137
138 /**
139 * {@inheritDoc}
140 *
141 * @throws Exception
142 */
143 protected void tearDown() throws Exception {
144
145 if (JVMUtil.isSingleRMIRegistryPerVM()) {
146 return;
147 }
148
149 if (manager1 != null) {
150 manager1.shutdown();
151 }
152 if (manager2 != null) {
153 manager2.shutdown();
154 }
155 if (manager3 != null) {
156 manager3.shutdown();
157 }
158 if (manager4 != null) {
159 manager4.shutdown();
160 }
161
162 if (manager5 != null) {
163 manager5.shutdown();
164 }
165
166 if (manager6 != null) {
167 manager6.shutdown();
168 }
169
170 }
171
172 /**
173 * 5 cache managers should means that each cache has four remote peers
174 */
175 public void testRemoteCachePeersEqualsNumberOfCacheManagersInCluster() {
176
177 if (JVMUtil.isSingleRMIRegistryPerVM()) {
178 return;
179 }
180
181
182 CacheManagerPeerProvider provider = manager1.getCachePeerProvider();
183 List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
184 assertEquals(4, remotePeersOfCache1.size());
185 }
186
187 /**
188 * Does a new cache manager in the cluster get detected?
189 */
190 public void testRemoteCachePeersDetectsNewCacheManager() throws InterruptedException {
191
192 if (JVMUtil.isSingleRMIRegistryPerVM()) {
193 return;
194 }
195
196 CacheManagerPeerProvider provider = manager1.getCachePeerProvider();
197 List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
198 assertEquals(4, remotePeersOfCache1.size());
199
200
201 manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed6.xml");
202
203
204 Thread.sleep(1010);
205
206 remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
207 assertEquals(5, remotePeersOfCache1.size());
208 }
209
210 /**
211 * Does a down cache manager in the cluster get removed?
212 */
213 public void testRemoteCachePeersDetectsDownCacheManager() throws InterruptedException {
214
215 if (JVMUtil.isSingleRMIRegistryPerVM()) {
216 return;
217 }
218
219
220 CacheManagerPeerProvider provider = manager1.getCachePeerProvider();
221 List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
222 assertEquals(4, remotePeersOfCache1.size());
223
224
225 manager5.shutdown();
226
227
228 Thread.sleep(11010);
229 remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
230
231
232 assertEquals(3, remotePeersOfCache1.size());
233 }
234
235 /**
236 * Does a down cache manager in the cluster get removed?
237 */
238 public void testRemoteCachePeersDetectsDownCacheManagerSlow() throws InterruptedException {
239
240 if (JVMUtil.isSingleRMIRegistryPerVM()) {
241 return;
242 }
243
244 CacheManagerPeerProvider provider = manager1.getCachePeerProvider();
245 List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
246 assertEquals(4, remotePeersOfCache1.size());
247
248
249 manager5.shutdown();
250
251
252 remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
253 assertEquals(4, remotePeersOfCache1.size());
254 }
255
256 /**
257 * Tests put and remove initiated from cache1 in a cluster
258 * <p/>
259 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
260 */
261 public void testPutProgagatesFromAndToEveryCacheManagerAndCache() throws CacheException, InterruptedException {
262
263 if (JVMUtil.isSingleRMIRegistryPerVM()) {
264 return;
265 }
266
267
268 String[] cacheNames = manager1.getCacheNames();
269 Arrays.sort(cacheNames);
270 for (int i = 0; i < cacheNames.length; i++) {
271 String name = cacheNames[i];
272 manager1.getCache(name).put(new Element("" + i, new Integer(i)));
273
274 manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
275 }
276
277 waitForProgagate();
278
279 int count2 = 0;
280 int count3 = 0;
281 int count4 = 0;
282 int count5 = 0;
283 for (int i = 0; i < cacheNames.length; i++) {
284 String name = cacheNames[i];
285 Element element2 = manager2.getCache(name).get("" + i);
286 if (element2 != null) {
287 count2++;
288 }
289 Element element3 = manager3.getCache(name).get("" + i);
290 if (element3 != null) {
291 count3++;
292 }
293 Element element4 = manager4.getCache(name).get("" + i);
294 if (element4 != null) {
295 count4++;
296 }
297 Element element5 = manager5.getCache(name).get("" + i);
298 if (element5 != null) {
299 count5++;
300 }
301
302 }
303 assertEquals(55, count2);
304 assertEquals(55, count3);
305 assertEquals(55, count4);
306 assertEquals(55, count5);
307
308 }
309
310 /**
311 * Performance and capacity tests.
312 * <p/>
313 * The numbers given are for the remote peer tester (java -jar ehcache-test.jar ehcache-distributed1.xml)
314 * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
315 * it has fully received.
316 * <p/>
317 * r37 and earlier - initial implementation
318 * 38 seconds to get all notifications with 6 peers, 2000 Elements and 400 byte payload
319 * 18 seconds to get all notifications with 2 peers, 2000 Elements and 400 byte payload
320 * 40 seconds to get all notifications with 2 peers, 2000 Elements and 10k payload
321 * 22 seconds to get all notifications with 2 peers, 2000 Elements and 1k payload
322 * 26 seconds to get all notifications with 2 peers, 200 Elements and 100k payload
323 * <p/>
324 * r38 - RMI stub lookup on registration rather than at each lookup. Saves quite a few lookups. Also change to 5 second heartbeat
325 * 38 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (1 second heartbeat)
326 * 16 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
327 * 13 seconds to get 2000 notifications with 2 peers, Elements with 400 byte payload
328 * <p/>
329 * r39 - Batching asyn replicator. Send all queued messages in one RMI call once per second.
330 * 2 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
331 */
332 public void testBigPutsProgagatesAsynchronous() throws CacheException, InterruptedException {
333
334 if (JVMUtil.isSingleRMIRegistryPerVM()) {
335 return;
336 }
337
338
339 StopWatch stopWatch = new StopWatch();
340 Integer index = null;
341 for (int i = 0; i < 2; i++) {
342 for (int j = 0; j < 1000; j++) {
343 index = new Integer(((1000 * i) + j));
344 cache1.put(new Element(index,
345 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
346 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
347 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
348 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
349 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
350 }
351
352 }
353 long elapsed = stopWatch.getElapsedTime();
354 long putTime = ((elapsed / 1000));
355 LOG.info("Put Elapsed time: " + putTime);
356
357
358 assertEquals(2000, cache1.getSize());
359
360 Thread.sleep(2000);
361 assertEquals(2000, manager2.getCache("sampleCache1").getSize());
362 assertEquals(2000, manager3.getCache("sampleCache1").getSize());
363 assertEquals(2000, manager4.getCache("sampleCache1").getSize());
364 assertEquals(2000, manager5.getCache("sampleCache1").getSize());
365
366 }
367
368
369 /**
370 * Drive everything to point of breakage within a 64MB VM.
371 */
372 public void xTestHugePutsBreaksAsynchronous() throws CacheException, InterruptedException {
373
374 if (JVMUtil.isSingleRMIRegistryPerVM()) {
375 return;
376 }
377
378
379 StopWatch stopWatch = new StopWatch();
380 Integer index = null;
381 for (int i = 0; i < 500; i++) {
382 for (int j = 0; j < 1000; j++) {
383 index = new Integer(((1000 * i) + j));
384 cache1.put(new Element(index,
385 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
386 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
387 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
388 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
389 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
390 }
391
392 }
393 long elapsed = stopWatch.getElapsedTime();
394 long putTime = ((elapsed / 1000));
395 LOG.info("Put Elapsed time: " + putTime);
396
397
398 assertEquals(100000, cache1.getSize());
399
400 Thread.sleep(100000);
401 assertEquals(20000, manager2.getCache("sampleCache1").getSize());
402 assertEquals(20000, manager3.getCache("sampleCache1").getSize());
403 assertEquals(20000, manager4.getCache("sampleCache1").getSize());
404 assertEquals(20000, manager5.getCache("sampleCache1").getSize());
405
406 }
407
408
409 /**
410 * Performance and capacity tests.
411 * <p/>
412 * The numbers given are for the remote peer tester (java -jar ehcache-test.jar ehcache-distributed1.xml)
413 * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
414 * it has fully received.
415 * <p/>
416 * 4 seconds to get all remove notifications with 6 peers, 5000 Elements and 400 byte payload
417 */
418 public void testBigRemovesProgagatesAsynchronous() throws CacheException, InterruptedException {
419
420 if (JVMUtil.isSingleRMIRegistryPerVM()) {
421 return;
422 }
423
424
425 Integer index = null;
426 for (int i = 0; i < 5; i++) {
427 for (int j = 0; j < 1000; j++) {
428 index = new Integer(((1000 * i) + j));
429 cache1.put(new Element(index,
430 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
431 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
432 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
433 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
434 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
435 }
436
437 }
438 Thread.sleep(5000);
439 assertEquals(5000, cache1.getSize());
440 assertEquals(5000, manager2.getCache("sampleCache1").getSize());
441 assertEquals(5000, manager3.getCache("sampleCache1").getSize());
442 assertEquals(5000, manager4.getCache("sampleCache1").getSize());
443 assertEquals(5000, manager5.getCache("sampleCache1").getSize());
444
445
446 Thread.sleep(2000);
447
448 StopWatch stopWatch = new StopWatch();
449
450 for (int i = 0; i < 5; i++) {
451 for (int j = 0; j < 1000; j++) {
452 index = new Integer(((1000 * i) + j));
453 cache1.remove(index);
454 }
455 }
456
457
458 int timeForPropagate = 10000;
459
460 Thread.sleep(timeForPropagate);
461 assertEquals(0, cache1.getSize());
462 assertEquals(0, manager2.getCache("sampleCache1").getSize());
463 assertEquals(0, manager3.getCache("sampleCache1").getSize());
464 assertEquals(0, manager4.getCache("sampleCache1").getSize());
465 assertEquals(0, manager5.getCache("sampleCache1").getSize());
466
467 LOG.info("Remove Elapsed time: " + timeForPropagate);
468
469
470 }
471
472
473 /**
474 * Performance and capacity tests.
475 * <p/>
476 * 5 seconds to send all notifications synchronously with 5 peers, 2000 Elements and 400 byte payload
477 * The numbers given below are for the remote peer tester (java -jar ehcache-test.jar ehcache-distributed1.xml)
478 * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
479 * it has fully received.
480 */
481 public void testBigPutsProgagatesSynchronous() throws CacheException, InterruptedException {
482
483 if (JVMUtil.isSingleRMIRegistryPerVM()) {
484 return;
485 }
486
487
488 StopWatch stopWatch = new StopWatch();
489 Integer index;
490 for (int i = 0; i < 2; i++) {
491 for (int j = 0; j < 1000; j++) {
492 index = new Integer(((1000 * i) + j));
493 manager1.getCache("sampleCache3").put(new Element(index,
494 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
495 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
496 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
497 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
498 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
499 }
500
501 }
502 long elapsed = stopWatch.getElapsedTime();
503 long putTime = ((elapsed / 1000));
504 LOG.info("Put and Propagate Synchronously Elapsed time: " + putTime + " seconds");
505
506 assertEquals(2000, manager1.getCache("sampleCache3").getSize());
507 assertEquals(2000, manager2.getCache("sampleCache3").getSize());
508 assertEquals(2000, manager3.getCache("sampleCache3").getSize());
509 assertEquals(2000, manager4.getCache("sampleCache3").getSize());
510 assertEquals(2000, manager5.getCache("sampleCache3").getSize());
511
512 }
513
514
515 /**
516 * Test various cache configurations for cache1 - explicit setting of:
517 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
518 */
519 public void testPutWithExplicitReplicationConfig() throws InterruptedException {
520 if (JVMUtil.isSingleRMIRegistryPerVM()) {
521 return;
522 }
523 putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
524 }
525
526
527 /**
528 * Test various cache configurations for cache1 - explicit setting of:
529 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
530 */
531 public void testPutWithThreadKiller() throws InterruptedException {
532 if (JVMUtil.isSingleRMIRegistryPerVM()) {
533 return;
534 }
535 putTestWithThreadKiller(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
536 }
537
538 /**
539 * CacheEventListeners that are not CacheReplicators should receive cache events originated from receipt
540 * of a remote event by a CachePeer.
541 */
542 public void testRemotelyReceivedPutNotifiesCountingListener() throws InterruptedException {
543 if (JVMUtil.isSingleRMIRegistryPerVM()) {
544 return;
545 }
546 putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
547 assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager1.getCache("sampleCache1")).size());
548 assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager2.getCache("sampleCache1")).size());
549
550 }
551
552 /**
553 * Test various cache configurations for cache1 - explicit setting of:
554 * properties="replicateAsynchronously=false, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
555 */
556 public void testPutWithExplicitReplicationSynchronousConfig() throws InterruptedException {
557 if (JVMUtil.isSingleRMIRegistryPerVM()) {
558 return;
559 }
560 putTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
561 }
562
563
564 /**
565 * Test put replicated for cache4 - no properties.
566 * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
567 */
568 public void testPutWithEmptyReplicationPropertiesConfig() throws InterruptedException {
569 if (JVMUtil.isSingleRMIRegistryPerVM()) {
570 return;
571 }
572 putTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
573 }
574
575 /**
576 * Test put replicated for cache4 - missing replicatePuts property.
577 * replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
578 * should equal replicateAsynchronously=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
579 */
580 public void testPutWithOneMissingReplicationPropertyConfig() throws InterruptedException {
581 if (JVMUtil.isSingleRMIRegistryPerVM()) {
582 return;
583 }
584 putTest(manager1.getCache("sampleCache5"), manager2.getCache("sampleCache5"), ASYNCHRONOUS);
585 }
586
587
588 /**
589 * Tests put and remove initiated from cache1 in a cluster
590 * <p/>
591 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
592 */
593 public void putTest(Cache fromCache, Cache toCache, boolean asynchronous) throws CacheException, InterruptedException {
594
595 Serializable key = new Date();
596 Serializable value = new Date();
597 Element sourceElement = new Element(key, value);
598
599
600 fromCache.put(sourceElement);
601 int i = 0;
602
603 if (asynchronous) {
604 waitForProgagate();
605 }
606
607 int j = 0;
608
609 Thread.sleep(5000);
610
611 LOG.info("" + manager1.getCache("sampleCache1").getSize());
612 LOG.info("" + manager2.getCache("sampleCache1").getSize());
613 LOG.info("" + manager3.getCache("sampleCache1").getSize());
614 LOG.info("" + manager4.getCache("sampleCache1").getSize());
615 LOG.info("" + manager5.getCache("sampleCache1").getSize());
616
617 }
618
619 /**
620 * Tests put and remove initiated from cache1 in a cluster
621 * <p/>
622 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
623 */
624 public void putTestWithThreadKiller(Cache fromCache, Cache toCache, boolean asynchronous)
625 throws CacheException, InterruptedException {
626
627 fromCache.put(new Element("thread killer", new ThreadKiller()));
628 if (asynchronous) {
629 waitForProgagate();
630 }
631
632 Serializable key = new Date();
633 Serializable value = new Date();
634 Element sourceElement = new Element(key, value);
635
636
637 fromCache.put(sourceElement);
638
639 if (asynchronous) {
640 waitForProgagate();
641 }
642
643
644 Element deliveredElement = toCache.get(key);
645 assertEquals(sourceElement, deliveredElement);
646
647 }
648
649
650 /**
651 * Checks that a put received from a remote cache notifies any registered listeners.
652 * <p/>
653 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
654 */
655 public void testRemotePutNotificationGetsToOtherListeners() throws CacheException, InterruptedException {
656
657 if (JVMUtil.isSingleRMIRegistryPerVM()) {
658 return;
659 }
660
661 Serializable key = new Date();
662 Serializable value = new Date();
663 Element element1 = new Element(key, value);
664
665
666 cache1.put(new Element("1", new Date()));
667 cache1.put(new Element("2", new Date()));
668 cache1.put(new Element("3", new Date()));
669
670
671 Object nonSerializableObject = new Object();
672 cache1.put(new Element(nonSerializableObject, new Object()));
673
674
675 waitForProgagate();
676
677
678 assertEquals(4, CountingCacheEventListener.getCacheElementsPut(cache1).size());
679
680 assertEquals(3, CountingCacheEventListener.getCacheElementsPut(cache2).size());
681
682
683 cache1.put(new Element("1", new Date()));
684 cache1.put(new Element("2", new Date()));
685 cache1.put(new Element("3", new Date()));
686
687
688 cache1.put(new Element(nonSerializableObject, new Object()));
689
690 waitForProgagate();
691
692
693 assertEquals(4, CountingCacheEventListener.getCacheElementsUpdated(cache1).size());
694
695 assertEquals(3, CountingCacheEventListener.getCacheElementsUpdated(cache2).size());
696
697
698 cache1.remove("1");
699 cache1.remove("2");
700 cache1.remove("3");
701 cache1.remove(nonSerializableObject);
702
703 waitForProgagate();
704
705
706 assertEquals(4, CountingCacheEventListener.getCacheElementsRemoved(cache1).size());
707
708 assertEquals(3, CountingCacheEventListener.getCacheElementsRemoved(cache2).size());
709
710 }
711
712
713 /**
714 * Test various cache configurations for cache1 - explicit setting of:
715 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
716 */
717 public void testRemoveWithExplicitReplicationConfig() throws InterruptedException {
718 if (JVMUtil.isSingleRMIRegistryPerVM()) {
719 return;
720 }
721 removeTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
722 }
723
724 /**
725 * Test various cache configurations for cache1 - explicit setting of:
726 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
727 */
728 public void testRemoveWithExplicitReplicationSynchronousConfig() throws InterruptedException {
729 if (JVMUtil.isSingleRMIRegistryPerVM()) {
730 return;
731 }
732 removeTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
733 }
734
735
736 /**
737 * Test put replicated for cache4 - no properties.
738 * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
739 */
740 public void testRemoveWithEmptyReplicationPropertiesConfig() throws InterruptedException {
741 if (JVMUtil.isSingleRMIRegistryPerVM()) {
742 return;
743 }
744 removeTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
745 }
746
747 /**
748 * Tests put and remove initiated from a cache to another cache in a cluster
749 * <p/>
750 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
751 */
752 public void removeTest(Cache fromCache, Cache toCache, boolean asynchronous) throws CacheException, InterruptedException {
753
754 Serializable key = new Date();
755 Serializable value = new Date();
756 Element element1 = new Element(key, value);
757
758
759 fromCache.put(element1);
760
761 if (asynchronous) {
762 waitForProgagate();
763 }
764
765
766 Element element2 = toCache.get(key);
767 assertEquals(element1, element2);
768
769
770 fromCache.remove(key);
771 if (asynchronous) {
772 waitForProgagate();
773 }
774
775
776 element2 = toCache.get(key);
777 assertNull(element2);
778
779 }
780
781
782 /**
783 * Test various cache configurations for cache1 - explicit setting of:
784 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
785 */
786 public void testUpdateWithExplicitReplicationConfig() throws Exception {
787 if (JVMUtil.isSingleRMIRegistryPerVM()) {
788 return;
789 }
790 updateViaCopyTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
791 }
792
793 /**
794 * Test various cache configurations for cache1 - explicit setting of:
795 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
796 */
797 public void testUpdateWithExplicitReplicationSynchronousConfig() throws Exception {
798 if (JVMUtil.isSingleRMIRegistryPerVM()) {
799 return;
800 }
801 updateViaCopyTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
802 }
803
804
805 /**
806 * Test put replicated for cache4 - no properties.
807 * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
808 */
809 public void testUpdateWithEmptyReplicationPropertiesConfig() throws Exception {
810 if (JVMUtil.isSingleRMIRegistryPerVM()) {
811 return;
812 }
813 updateViaCopyTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
814 }
815
816 /**
817 * Tests put and update through copy initiated from cache1 in a cluster
818 * <p/>
819 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
820 */
821 public void updateViaCopyTest(Cache fromCache, Cache toCache, boolean asynchronous) throws Exception {
822
823 fromCache.removeAll();
824 toCache.removeAll();
825
826 Serializable key = new Date();
827 Serializable value = new Date();
828 Element element1 = new Element(key, value);
829
830
831 fromCache.put(element1);
832 if (asynchronous) {
833 waitForProgagate();
834 }
835
836
837 Element element2 = toCache.get(key);
838 assertEquals(element1, element2);
839
840
841 Element updatedElement1 = new Element(key, new Date());
842
843 fromCache.put(updatedElement1);
844 if (asynchronous) {
845 waitForProgagate();
846 }
847
848
849 Element receivedUpdatedElement2 = toCache.get(key);
850 assertEquals(updatedElement1, receivedUpdatedElement2);
851
852 }
853
854
855 /**
856 * Tests put and update through invalidation initiated from cache1 in a cluster
857 * <p/>
858 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
859 */
860 public void testUpdateViaInvalidate() throws CacheException, InterruptedException, IOException {
861
862 if (JVMUtil.isSingleRMIRegistryPerVM()) {
863 return;
864 }
865
866 cache1 = manager1.getCache("sampleCache2");
867 cache1.removeAll();
868
869 cache2 = manager2.getCache("sampleCache2");
870 cache2.removeAll();
871
872 Serializable key = "1";
873 Serializable value = new Date();
874 Element element1 = new Element(key, value);
875
876
877 cache1.put(element1);
878 waitForProgagate();
879
880
881 Element element2 = cache2.get(key);
882 assertEquals(element1, element2);
883
884
885 cache1.put(element1);
886 waitForProgagate();
887
888
889 element2 = cache2.get(key);
890 assertNull(element2);
891
892 }
893
894 /**
895 * What happens when two cache instances replicate to each other and a change is initiated
896 */
897 public void testInfiniteNotificationsLoop() throws InterruptedException {
898
899 if (JVMUtil.isSingleRMIRegistryPerVM()) {
900 return;
901 }
902
903 Serializable key = "1";
904 Serializable value = new Date();
905 Element element = new Element(key, value);
906
907
908 cache1.put(element);
909 waitForProgagate();
910
911
912 Element element2 = cache2.get(key);
913 assertEquals(element, element2);
914
915
916 cache1.remove(key);
917 assertNull(cache1.get(key));
918
919
920 waitForProgagate();
921 element2 = cache2.get(key);
922 assertNull(element2);
923
924
925 Element element3 = new Element("3", "ddsfds");
926 cache2.put(element3);
927 waitForProgagate();
928 Element element4 = cache2.get("3");
929 assertEquals(element3, element4);
930
931 }
932
933
934 /**
935 * Need to wait for async
936 *
937 * @throws InterruptedException
938 */
939 protected void waitForProgagate() throws InterruptedException {
940 Thread.sleep(2000);
941 }
942
943 /**
944 * Need to wait for async
945 *
946 * @throws InterruptedException
947 */
948 protected void waitForSlowProgagate() throws InterruptedException {
949 Thread.sleep(6000);
950 }
951
952 }