001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    package org.apache.geronimo.connector.outbound;
018    
019    import java.util.ArrayList;
020    import java.util.Timer;
021    import java.util.TimerTask;
022    import java.util.List;
023    import java.util.concurrent.Semaphore;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.locks.ReadWriteLock;
026    import java.util.concurrent.locks.ReentrantReadWriteLock;
027    
028    import javax.resource.ResourceException;
029    import javax.resource.spi.ConnectionRequestInfo;
030    import javax.resource.spi.ManagedConnectionFactory;
031    import javax.resource.spi.ManagedConnection;
032    import javax.security.auth.Subject;
033    
034    import org.apache.commons.logging.Log;
035    import org.apache.commons.logging.LogFactory;
036    
037    /**
038     * @version $Rev: 620213 $ $Date: 2008-02-10 00:13:09 +0100 (Sun, 10 Feb 2008) $
039     */
040    public abstract class AbstractSinglePoolConnectionInterceptor implements ConnectionInterceptor, PoolingAttributes {
        /** Logger shared by this class and its nested timer tasks. */
        protected static Log log = LogFactory.getLog(AbstractSinglePoolConnectionInterceptor.class.getName());
        /** Next interceptor in the chain; connections are obtained from, disposed through and destroyed via it. */
        protected final ConnectionInterceptor next;
        /**
         * Read lock is held by get/return/idle-release/fill operations; the write lock is
         * held by {@link #setPartitionMaxSize(int)} while the pool is being resized.
         */
        private final ReadWriteLock resizeLock = new ReentrantReadWriteLock();
        /** Fair semaphore created with {@code maxSize} permits; one permit is acquired per {@code getConnection}. */
        protected Semaphore permits;
        /** Maximum time {@code getConnection} waits for a permit. */
        protected int blockingTimeoutMilliseconds;
        /** Incremented in {@code addToPool}, decremented in {@code internalReturn} when a connection is disposed. */
        protected int connectionCount = 0;
        /** Idle timeout in milliseconds, derived from the minutes passed to {@code setIdleTimeoutMinutes}. */
        private long idleTimeoutMilliseconds;
        /** Currently scheduled idle-release task, or {@code null} if none has been scheduled. */
        private IdleReleaser idleReleaser;
        /** Timer on which {@link IdleReleaser} is scheduled. */
        protected Timer timer = PoolIdleReleaserTimer.getTimer();
        /** Current maximum size of this (single) partition. */
        protected int maxSize = 0;
        /** Current minimum size; {@code FillTask} creates connections while {@code connectionCount} is below it. */
        protected int minSize = 0;
        /** Number of returning connections that {@code internalReturn} must still destroy instead of pooling. */
        protected int shrinkLater = 0;
        /** Set once by {@link #destroy()}; checked when connections are returned. */
        protected volatile boolean destroyed = false;
054    
055        public AbstractSinglePoolConnectionInterceptor(final ConnectionInterceptor next,
056                                                       int maxSize,
057                                                       int minSize,
058                                                       int blockingTimeoutMilliseconds,
059                                                       int idleTimeoutMinutes) {
060            this.next = next;
061            this.maxSize = maxSize;
062            this.minSize = minSize;
063            this.blockingTimeoutMilliseconds = blockingTimeoutMilliseconds;
064            setIdleTimeoutMinutes(idleTimeoutMinutes);
065            permits = new Semaphore(maxSize, true);
066        }
067    
068        public void getConnection(ConnectionInfo connectionInfo) throws ResourceException {
069            if (connectionInfo.getManagedConnectionInfo().getManagedConnection() != null) {
070                if (log.isTraceEnabled()) {
071                    log.trace("using already assigned connection " + connectionInfo.getConnectionHandle() + " for managed connection " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " to pool " + this);
072                }
073                return;
074            }
075            try {
076                resizeLock.readLock().lock();
077                try {
078                    if (permits.tryAcquire(blockingTimeoutMilliseconds, TimeUnit.MILLISECONDS)) {
079                        try {
080                            internalGetConnection(connectionInfo);
081                        } catch (ResourceException e) {
082                            permits.release();
083                            throw e;
084                        }
085                    } else {
086                        throw new ResourceException("No ManagedConnections available "
087                                + "within configured blocking timeout ( "
088                                + blockingTimeoutMilliseconds
089                                + " [ms] ) for pool " + this);
090    
091                    }
092                } finally {
093                    resizeLock.readLock().unlock();
094                }
095    
096            } catch (InterruptedException ie) {
097                throw new ResourceException("Interrupted while requesting permit.", ie);
098            } // end of try-catch
099        }
100    
        /**
         * Pool-specific part of {@link #getConnection(ConnectionInfo)}. Invoked after a permit
         * has been acquired and while the resize read lock is held; if it throws a
         * {@code ResourceException} the permit is released again by the caller.
         *
         * @param connectionInfo request to satisfy
         * @throws ResourceException if no connection can be supplied
         */
        protected abstract void internalGetConnection(ConnectionInfo connectionInfo) throws ResourceException;
102    
103        public void returnConnection(ConnectionInfo connectionInfo,
104                                     ConnectionReturnAction connectionReturnAction) {
105            if (log.isTraceEnabled()) {
106                log.trace("returning connection " + connectionInfo.getConnectionHandle() + " for MCI " + connectionInfo.getManagedConnectionInfo() + " and MC " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " to pool " + this);
107            }
108    
109            // not strictly synchronized with destroy(), but pooled operations in internalReturn() are...
110            if (destroyed) {
111                try {
112                    connectionInfo.getManagedConnectionInfo().getManagedConnection().destroy();
113                } catch (ResourceException re) {
114                    // empty
115                }
116                return;
117            }
118    
119            resizeLock.readLock().lock();
120            try {
121                ManagedConnectionInfo mci = connectionInfo.getManagedConnectionInfo();
122                if (connectionReturnAction == ConnectionReturnAction.RETURN_HANDLE && mci.hasConnectionHandles()) {
123                    if (log.isTraceEnabled()) {
124                        log.trace("Return request at pool with connection handles! " + connectionInfo.getConnectionHandle() + " for MCI " + connectionInfo.getManagedConnectionInfo() + " and MC " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " to pool " + this, new Exception("Stack trace"));
125                    }
126                    return;
127                }
128    
129                boolean releasePermit = internalReturn(connectionInfo, connectionReturnAction);
130    
131                if (releasePermit) {
132                    permits.release();
133                }
134            } finally {
135                resizeLock.readLock().unlock();
136            }
137        }
138    
139        protected boolean internalReturn(ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) {
140            ManagedConnectionInfo mci = connectionInfo.getManagedConnectionInfo();
141            ManagedConnection mc = mci.getManagedConnection();
142            try {
143                mc.cleanup();
144            } catch (ResourceException e) {
145                connectionReturnAction = ConnectionReturnAction.DESTROY;
146            }
147    
148            boolean releasePermit;
149            synchronized (getPool()) {
150                // a bit redundant, but this closes a small timing hole...
151                if (destroyed) {
152                    try {
153                        mc.destroy();
154                    }
155                    catch (ResourceException re) {
156                        //ignore
157                    }
158                    return doRemove(mci);
159                }
160                if (shrinkLater > 0) {
161                    //nothing can get in the pool while shrinkLater > 0, so releasePermit is false here.
162                    connectionReturnAction = ConnectionReturnAction.DESTROY;
163                    shrinkLater--;
164                    releasePermit = false;
165                } else if (connectionReturnAction == ConnectionReturnAction.RETURN_HANDLE) {
166                    mci.setLastUsed(System.currentTimeMillis());
167                    doAdd(mci);
168                    return true;
169                } else {
170                    releasePermit = doRemove(mci);
171                }
172            }
173            //we must destroy connection.
174            next.returnConnection(connectionInfo, connectionReturnAction);
175            connectionCount--;
176            return releasePermit;
177        }
178    
        /**
         * Pool-specific cleanup. Called by {@link #destroy()} after the {@code destroyed} flag
         * has been set and the idle releaser cancelled, and before the next interceptor is destroyed.
         */
        protected abstract void internalDestroy();
180    
181        // Cancel the IdleReleaser TimerTask (fixes memory leak) and clean up the pool
182        public void destroy() {
183            destroyed = true;
184            if (idleReleaser != null)
185                idleReleaser.cancel();
186            internalDestroy();
187            next.destroy();
188        }
189    
        /**
         * @return always 1: this interceptor manages a single partition
         */
        public int getPartitionCount() {
            return 1;
        }
193    
        /**
         * @return the current maximum size of the pool
         */
        public int getPartitionMaxSize() {
            return maxSize;
        }
197    
198        public void setPartitionMaxSize(int newMaxSize) throws InterruptedException {
199            if (newMaxSize <= 0) {
200                throw new IllegalArgumentException("Max size must be positive, not " + newMaxSize);
201            }
202            if (newMaxSize != getPartitionMaxSize()) {
203                resizeLock.writeLock().lock();
204                try {
205                    ResizeInfo resizeInfo = new ResizeInfo(this.minSize, permits.availablePermits(), connectionCount, newMaxSize);
206                    permits = new Semaphore(newMaxSize, true);
207                    //pre-acquire permits for the existing checked out connections that will not be closed when they are returned.
208                    for (int i = 0; i < resizeInfo.getTransferCheckedOut(); i++) {
209                        permits.acquire();
210                    }
211                    //make sure shrinkLater is 0 while discarding excess connections
212                    this.shrinkLater = 0;
213                    //transfer connections we are going to keep
214                    transferConnections(newMaxSize, resizeInfo.getShrinkNow());
215                    this.shrinkLater = resizeInfo.getShrinkLater();
216                    this.minSize = resizeInfo.getNewMinSize();
217                    this.maxSize = newMaxSize;
218                } finally {
219                    resizeLock.writeLock().unlock();
220                }
221            }
222        }
223    
        /**
         * Removes {@code mci} from the pool's bookkeeping. Called from {@code internalReturn}
         * while the monitor of {@link #getPool()} is held.
         *
         * @param mci managed connection info being removed
         * @return value that {@code internalReturn} passes on as "release a permit"
         */
        protected abstract boolean doRemove(ManagedConnectionInfo mci);
225    
        /**
         * Adds {@code mci} to the pool. Called from {@code internalReturn} and
         * {@code addToPool} while the monitor of {@link #getPool()} is held.
         *
         * @param mci managed connection info to add
         */
        protected abstract void doAdd(ManagedConnectionInfo mci);
227    
        /**
         * @return the object this class synchronizes on when it changes pool contents
         */
        protected abstract Object getPool();
229    
230    
231        static final class ResizeInfo {
232    
233            private final int newMinSize;
234            private final int shrinkNow;
235            private final int shrinkLater;
236            private final int transferCheckedOut;
237    
238            ResizeInfo(final int oldMinSize, final int oldPermitsAvailable, final int oldConnectionCount, final int newMaxSize) {
239                final int checkedOut = oldConnectionCount - oldPermitsAvailable;
240                int shrinkLater = checkedOut - newMaxSize;
241                if (shrinkLater < 0) {
242                    shrinkLater = 0;
243                }
244                this.shrinkLater = shrinkLater;
245                int shrinkNow = oldConnectionCount - newMaxSize - shrinkLater;
246                if (shrinkNow < 0) {
247                    shrinkNow = 0;
248                }
249                this.shrinkNow = shrinkNow;
250                if (newMaxSize >= oldMinSize) {
251                    newMinSize = oldMinSize;
252                } else {
253                    newMinSize = newMaxSize;
254                }
255                this.transferCheckedOut = checkedOut - shrinkLater;
256            }
257    
258            public int getNewMinSize() {
259                return newMinSize;
260            }
261    
262            public int getShrinkNow() {
263                return shrinkNow;
264            }
265    
266            public int getShrinkLater() {
267                return shrinkLater;
268            }
269    
270            public int getTransferCheckedOut() {
271                return transferCheckedOut;
272            }
273    
274    
275        }
276    
        /**
         * Pool-specific part of a resize. Called by {@link #setPartitionMaxSize(int)} while the
         * resize write lock is held and {@code shrinkLater} is 0.
         *
         * @param maxSize   the new maximum pool size
         * @param shrinkNow value of {@code ResizeInfo.getShrinkNow()}, i.e. how many connections
         *                  are to be discarded immediately
         */
        protected abstract void transferConnections(int maxSize, int shrinkNow);
278    
        /**
         * @return the number of idle connections; {@code addToPool} only adds a connection
         *         while this value is below the partition maximum size
         */
        public abstract int getIdleConnectionCount();
280    
        /**
         * @return the current value of the connection counter maintained by this class
         */
        public int getConnectionCount() {
            return connectionCount;
        }
284    
        /**
         * @return the current minimum size of the pool
         */
        public int getPartitionMinSize() {
            return minSize;
        }
288    
        /**
         * Sets the minimum pool size. The value is stored as given: it is not validated
         * against zero or against the current maximum size, and no resize lock is taken.
         *
         * @param minSize new minimum size
         */
        public void setPartitionMinSize(int minSize) {
            this.minSize = minSize;
        }
292    
        /**
         * @return the maximum time, in milliseconds, that {@code getConnection} waits for a permit
         */
        public int getBlockingTimeoutMilliseconds() {
            return blockingTimeoutMilliseconds;
        }
296    
297        public void setBlockingTimeoutMilliseconds(int blockingTimeoutMilliseconds) {
298            if (blockingTimeoutMilliseconds < 0) {
299                throw new IllegalArgumentException("blockingTimeoutMilliseconds must be positive or 0, not " + blockingTimeoutMilliseconds);
300            }
301            if (blockingTimeoutMilliseconds == 0) {
302                this.blockingTimeoutMilliseconds = Integer.MAX_VALUE;
303            } else {
304                this.blockingTimeoutMilliseconds = blockingTimeoutMilliseconds;
305            }
306        }
307    
308        public int getIdleTimeoutMinutes() {
309            return (int) idleTimeoutMilliseconds / (1000 * 60);
310        }
311    
312        public void setIdleTimeoutMinutes(int idleTimeoutMinutes) {
313            if (idleTimeoutMinutes < 0) {
314                throw new IllegalArgumentException("idleTimeoutMinutes must be positive or 0, not " + idleTimeoutMinutes);
315            }
316            if (idleReleaser != null) {
317                idleReleaser.cancel();
318            }
319            if (idleTimeoutMinutes > 0) {
320                this.idleTimeoutMilliseconds = idleTimeoutMinutes * 60 * 1000;
321                idleReleaser = new IdleReleaser(this);
322                timer.schedule(idleReleaser, this.idleTimeoutMilliseconds, this.idleTimeoutMilliseconds);
323            }
324        }
325    
        /**
         * Collects the connections the idle releaser should dispose of. Called from
         * {@link IdleReleaser#run()} while the resize read lock is held.
         *
         * @param threshold current time minus the idle timeout, in milliseconds
         *                  (presumably compared with each connection's last-used time; verify in subclasses)
         * @param killList  output list; every entry added here is subsequently passed to
         *                  {@code internalReturn} with {@code ConnectionReturnAction.DESTROY}
         */
        protected abstract void getExpiredManagedConnectionInfos(long threshold, List<ManagedConnectionInfo> killList);
327    
328        protected boolean addToPool(ManagedConnectionInfo mci) {
329            boolean added;
330            synchronized (getPool()) {
331                connectionCount++;
332                added = getPartitionMaxSize() > getIdleConnectionCount();
333                if (added) {
334                    doAdd(mci);
335                }
336            }
337            return added;
338        }
339    
340        // static class to permit chain of strong references from preventing ClassLoaders
341        // from being GC'ed.
342        private static class IdleReleaser extends TimerTask {
343            private AbstractSinglePoolConnectionInterceptor parent;
344    
345            private IdleReleaser(AbstractSinglePoolConnectionInterceptor parent) {
346                this.parent = parent;
347            }
348    
349            public boolean cancel() {
350                this.parent = null;
351                return super.cancel();
352            }
353    
354            public void run() {
355                // protect against interceptor being set to null mid-execution
356                AbstractSinglePoolConnectionInterceptor interceptor = parent;
357                if (interceptor == null)
358                    return;
359    
360                interceptor.resizeLock.readLock().lock();
361                try {
362                    long threshold = System.currentTimeMillis() - interceptor.idleTimeoutMilliseconds;
363                    List<ManagedConnectionInfo> killList = new ArrayList<ManagedConnectionInfo>(interceptor.getPartitionMaxSize());
364                    interceptor.getExpiredManagedConnectionInfos(threshold, killList);
365                    for (ManagedConnectionInfo managedConnectionInfo : killList) {
366                        ConnectionInfo killInfo = new ConnectionInfo(managedConnectionInfo);
367                        interceptor.internalReturn(killInfo, ConnectionReturnAction.DESTROY);
368                    }
369                    interceptor.permits.release(killList.size());
370                } catch (Throwable t) {
371                    log.error("Error occurred during execution of ExpirationMonitor TimerTask", t);
372                } finally {
373                    interceptor.resizeLock.readLock().unlock();
374                }
375            }
376    
377        }
378    
379        // Currently only a short-lived (10 millisecond) task.
380        // So, FillTask, unlike IdleReleaser, shouldn't cause GC problems.
381        protected class FillTask extends TimerTask {
382            private final ManagedConnectionFactory managedConnectionFactory;
383            private final Subject subject;
384            private final ConnectionRequestInfo cri;
385    
386            public FillTask(ConnectionInfo connectionInfo) {
387                managedConnectionFactory = connectionInfo.getManagedConnectionInfo().getManagedConnectionFactory();
388                subject = connectionInfo.getManagedConnectionInfo().getSubject();
389                cri = connectionInfo.getManagedConnectionInfo().getConnectionRequestInfo();
390            }
391    
392            public void run() {
393                resizeLock.readLock().lock();
394                try {
395                    while (connectionCount < minSize) {
396                        ManagedConnectionInfo mci = new ManagedConnectionInfo(managedConnectionFactory, cri);
397                        mci.setSubject(subject);
398                        ConnectionInfo ci = new ConnectionInfo(mci);
399                        try {
400                            next.getConnection(ci);
401                        } catch (ResourceException e) {
402                            return;
403                        }
404                        boolean added = addToPool(mci);
405                        if (!added) {
406                            internalReturn(ci, ConnectionReturnAction.DESTROY);
407                            return;
408                        }
409                    }
410                } catch (Throwable t) {
411                    log.error("FillTask encountered error in run method", t);
412                } finally {
413                    resizeLock.readLock().unlock();
414                }
415            }
416    
417        }
418    }