View Javadoc

1   /**
2    *  Copyright 2003-2006 Greg Luck
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.CacheManager;
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  
23  import java.io.IOException;
24  import java.net.DatagramPacket;
25  import java.net.InetAddress;
26  import java.net.MulticastSocket;
27  import java.rmi.RemoteException;
28  import java.util.StringTokenizer;
29  
30  /**
31   * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there.
32   * <p/>
33   * Our own multicast heartbeats are ignored.
34   *
35   * @author Greg Luck
36   * @version $Id: MulticastKeepaliveHeartbeatReceiver.java 52 2006-04-24 14:50:03Z gregluck $
37   */
38  public final class MulticastKeepaliveHeartbeatReceiver {
39  
40      private static final Log LOG = LogFactory.getLog(MulticastKeepaliveHeartbeatReceiver.class.getName());
41  
42      private final InetAddress groupMulticastAddress;
43      private final Integer groupMulticastPort;
44      private MulticastReceiverThread receiverThread;
45      private MulticastSocket socket;
46      private boolean stopped;
47      private final MulticastRMICacheManagerPeerProvider peerProvider;
48  
49      /**
50       * Constructor.
51       *
52       * @param peerProvider
53       * @param multicastAddress
54       * @param multicastPort
55       */
56      public MulticastKeepaliveHeartbeatReceiver(
57              MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort) {
58          this.peerProvider = peerProvider;
59          this.groupMulticastAddress = multicastAddress;
60          this.groupMulticastPort = multicastPort;
61      }
62  
63      /**
64       * Start.
65       * @throws IOException
66       */
67      final void init() throws IOException {
68  
69          socket = new MulticastSocket(groupMulticastPort.intValue());
70          socket.joinGroup(groupMulticastAddress);
71          receiverThread = new MulticastReceiverThread();
72          receiverThread.start();
73      }
74  
75      /**
76       * Shutdown the heartbeat.
77       */
78      public final void dispose() {
79          stopped = true;
80          receiverThread.interrupt();
81      }
82  
83  
84      /**
85       * A multicast receiver which continously receives heartbeats.
86       */
87      private final class MulticastReceiverThread extends Thread {
88  
89  
90  
91          public final void run() {
92              byte[] buf = new byte[PayloadUtil.MTU];
93              while (!stopped) {
94                  DatagramPacket packet = new DatagramPacket(buf, buf.length);
95                  try {
96                      socket.receive(packet);
97                      byte[] payload = packet.getData();
98                      processPayload(payload);
99  
100 
101                 } catch (IOException e) {
102                     if (!stopped) {
103                         LOG.error("Error receiving heartbeat. " + e.getMessage() + ". Initial cause was " + e.getMessage(), e);
104                     }
105                 }
106             }
107         }
108 
109         private void processPayload(byte[] compressedPayload) {
110             byte[] payload = PayloadUtil.ungzip(compressedPayload);
111             String rmiUrls = new String(payload);
112             if (self(rmiUrls)) {
113                 return;
114             }
115             rmiUrls = rmiUrls.trim();
116             if (LOG.isTraceEnabled()) {
117                 LOG.trace("rmiUrls received " + rmiUrls);
118             }
119             for (StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls,
120                     PayloadUtil.URL_DELIMITER); stringTokenizer.hasMoreTokens();) {
121                 String rmiUrl = stringTokenizer.nextToken();
122                 registerNotification(rmiUrl);
123             }
124         }
125 
126 
127         /**
128          * @param rmiUrls
129          * @return true if our own hostname and listener port are found in the list. This then means we have
130          * caught our onw multicast, and should be ignored.
131          */
132         private boolean self(String rmiUrls) {
133             CacheManager cacheManager = peerProvider.getCacheManager();
134             CachePeer peer = (CachePeer) cacheManager.getCachePeerListener().getBoundCachePeers().get(0);
135             String cacheManagerUrlBase = null;
136             try {
137                 cacheManagerUrlBase = peer.getUrlBase();
138             } catch (RemoteException e) {
139                 LOG.error("Error geting url base");
140             }
141             int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase);
142             return baseUrlMatch != -1;
143         }
144 
145         private void registerNotification(String rmiUrl) {
146             peerProvider.registerPeer(rmiUrl);
147         }
148 
149 
150         /**
151          * {@inheritDoc}
152          */
153         public final void interrupt() {
154             try {
155                 socket.leaveGroup(groupMulticastAddress);
156             } catch (IOException e) {
157                 LOG.error("Error leaving group");
158             }
159             socket.close();
160             super.interrupt();
161         }
162     }
163 
164 
165 }