001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.transaction.manager;
019    
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    
026    import javax.transaction.*;
027    import javax.transaction.xa.XAException;
028    import javax.transaction.xa.Xid;
029    
030    import java.util.concurrent.ConcurrentHashMap;
031    import java.util.concurrent.CopyOnWriteArrayList;
032    import java.util.concurrent.atomic.AtomicLong;
033    import org.apache.commons.logging.Log;
034    import org.apache.commons.logging.LogFactory;
035    import org.apache.geronimo.transaction.log.UnrecoverableLog;
036    
037    /**
038     * Simple implementation of a transaction manager.
039     *
040     * @version $Rev: 584554 $ $Date: 2007-10-14 17:19:58 +0200 (Sun, 14 Oct 2007) $
041     */
042    public class TransactionManagerImpl implements TransactionManager, UserTransaction, TransactionSynchronizationRegistry, XidImporter, MonitorableTransactionManager, RecoverableTransactionManager {
043        private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
044        protected static final int DEFAULT_TIMEOUT = 600;
045        protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68};
046    
047        final TransactionLog transactionLog;
048        final XidFactory xidFactory;
049        private final int defaultTransactionTimeoutMilliseconds;
050        private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal();
051        private final ThreadLocal threadTx = new ThreadLocal();
052        private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap();
053        private static final Log recoveryLog = LogFactory.getLog("RecoveryController");
054        final Recovery recovery;
055        private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList();
056        private List recoveryErrors = new ArrayList();
057        // statistics
058        private AtomicLong totalCommits = new AtomicLong(0);
059        private AtomicLong totalRollBacks = new AtomicLong(0);
060        private AtomicLong activeCount = new AtomicLong(0);
061    
062        public TransactionManagerImpl() throws XAException {
063            this(DEFAULT_TIMEOUT,
064                    null,
065                    null
066            );
067        }
068    
069        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException {
070            this(defaultTransactionTimeoutSeconds,
071                    null,
072                    null
073            );
074        }
075    
076        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
077            this(defaultTransactionTimeoutSeconds,
078                    null,
079                    transactionLog
080            );
081        }
082    
083        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog) throws XAException {
084            if (defaultTransactionTimeoutSeconds <= 0) {
085                throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds);
086            }
087            this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000;
088    
089            if (transactionLog == null) {
090                this.transactionLog = new UnrecoverableLog();
091            } else {
092                this.transactionLog = transactionLog;
093            }
094    
095            if (xidFactory != null) {
096                this.xidFactory = xidFactory;
097            } else {
098                this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID);
099            }
100    
101            recovery = new RecoveryImpl(this.transactionLog, this.xidFactory);
102            recovery.recoverLog();
103        }
104    
105        public Transaction getTransaction() {
106            return (Transaction) threadTx.get();
107        }
108    
109        private void associate(TransactionImpl tx) throws InvalidTransactionException {
110            if (tx == null) throw new NullPointerException("tx is null");
111    
112            Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread());
113            if (existingAssociation != null) {
114                throw new InvalidTransactionException("Specified transaction is already associated with another thread");
115            }
116            threadTx.set(tx);
117            fireThreadAssociated(tx);
118            activeCount.getAndIncrement();
119        }
120    
121        private void unassociate() {
122            Transaction tx = getTransaction();
123            if (tx != null) {
124                associatedTransactions.remove(tx);
125                threadTx.set(null);
126                fireThreadUnassociated(tx);
127                activeCount.getAndDecrement();
128            }
129        }
130    
131        public void setTransactionTimeout(int seconds) throws SystemException {
132            if (seconds < 0) {
133                throw new SystemException("transaction timeout must be positive or 0 to reset to default");
134            }
135            if (seconds == 0) {
136                transactionTimeoutMilliseconds.set(null);
137            } else {
138                transactionTimeoutMilliseconds.set(new Long(seconds * 1000));
139            }
140        }
141    
142        public int getStatus() throws SystemException {
143            Transaction tx = getTransaction();
144            return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION;
145        }
146    
147        public void begin() throws NotSupportedException, SystemException {
148            begin(getTransactionTimeoutMilliseconds(0L));
149        }
150    
151        public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException {
152            if (getStatus() != Status.STATUS_NO_TRANSACTION) {
153                throw new NotSupportedException("Nested Transactions are not supported");
154            }
155            TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
156    //        timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
157            try {
158                associate(tx);
159            } catch (InvalidTransactionException e) {
160                // should not be possible since we just created that transaction and no one has a reference yet
161                throw (SystemException)new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction").initCause(e);
162            }
163            // Todo: Verify if this is correct thing to do. Use default timeout for next transaction.
164            this.transactionTimeoutMilliseconds.set(null);
165            return tx;
166        }
167    
168        public Transaction suspend() throws SystemException {
169            Transaction tx = getTransaction();
170            if (tx != null) {
171                unassociate();
172            }
173            return tx;
174        }
175    
176        public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException {
177            if (getTransaction() != null) {
178                throw new IllegalStateException("Thread already associated with another transaction");
179            }
180            if (!(tx instanceof TransactionImpl)) {
181                throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx);
182            }
183            associate((TransactionImpl) tx);
184        }
185    
186        public Object getResource(Object key) {
187            TransactionImpl tx = getActiveTransactionImpl();
188            return tx.getResource(key);
189        }
190    
191        private TransactionImpl getActiveTransactionImpl() {
192            TransactionImpl tx = (TransactionImpl)threadTx.get();
193            if (tx == null) {
194                throw new IllegalStateException("No tx on thread");
195            }
196            if (tx.getStatus() != Status.STATUS_ACTIVE && tx.getStatus() != Status.STATUS_MARKED_ROLLBACK) {
197                throw new IllegalStateException("Transaction " + tx + " is not active");
198            }
199            return tx;
200        }
201    
202        public boolean getRollbackOnly() {
203            TransactionImpl tx = getActiveTransactionImpl();
204            return tx.getRollbackOnly();
205        }
206    
207        public Object getTransactionKey() {
208            TransactionImpl tx = getActiveTransactionImpl();
209            return tx.getTransactionKey();
210        }
211    
212        public int getTransactionStatus() {
213            TransactionImpl tx = (TransactionImpl) getTransaction();
214            return tx == null? Status.STATUS_NO_TRANSACTION: tx.getTransactionStatus();
215        }
216    
217        public void putResource(Object key, Object value) {
218            TransactionImpl tx = getActiveTransactionImpl();
219            tx.putResource(key, value);
220        }
221    
222        /**
223         * jta 1.1 method so the jpa implementations can be told to flush their caches.
224         * @param synchronization
225         */
226        public void registerInterposedSynchronization(Synchronization synchronization) {
227            TransactionImpl tx = getActiveTransactionImpl();
228            tx.registerInterposedSynchronization(synchronization);
229        }
230    
231        public void setRollbackOnly() throws IllegalStateException {
232            TransactionImpl tx = (TransactionImpl) threadTx.get();
233            if (tx == null) {
234                throw new IllegalStateException("No transaction associated with current thread");
235            }
236            tx.setRollbackOnly();
237        }
238    
239        public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
240            Transaction tx = getTransaction();
241            if (tx == null) {
242                throw new IllegalStateException("No transaction associated with current thread");
243            }
244            try {
245                tx.commit();
246            } finally {
247                unassociate();
248            }
249            totalCommits.getAndIncrement();
250        }
251    
252        public void rollback() throws IllegalStateException, SecurityException, SystemException {
253            Transaction tx = getTransaction();
254            if (tx == null) {
255                throw new IllegalStateException("No transaction associated with current thread");
256            }
257            try {
258                tx.rollback();
259            } finally {
260                unassociate();
261            }
262            totalRollBacks.getAndIncrement();
263        }
264    
265        //XidImporter implementation
266        public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException {
267            if (transactionTimeoutMilliseconds < 0) {
268                throw new SystemException("transaction timeout must be positive or 0 to reset to default");
269            }
270            TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
271            return tx;
272        }
273    
274        public void commit(Transaction tx, boolean onePhase) throws XAException {
275            if (onePhase) {
276                try {
277                    tx.commit();
278                } catch (HeuristicMixedException e) {
279                    throw (XAException) new XAException().initCause(e);
280                } catch (HeuristicRollbackException e) {
281                    throw (XAException) new XAException().initCause(e);
282                } catch (RollbackException e) {
283                    throw (XAException) new XAException().initCause(e);
284                } catch (SecurityException e) {
285                    throw (XAException) new XAException().initCause(e);
286                } catch (SystemException e) {
287                    throw (XAException) new XAException().initCause(e);
288                }
289            } else {
290                try {
291                    ((TransactionImpl) tx).preparedCommit();
292                } catch (SystemException e) {
293                    throw (XAException) new XAException().initCause(e);
294                }
295            }
296            totalCommits.getAndIncrement();
297        }
298    
299        public void forget(Transaction tx) throws XAException {
300            //TODO implement this!
301        }
302    
303        public int prepare(Transaction tx) throws XAException {
304            try {
305                return ((TransactionImpl) tx).prepare();
306            } catch (SystemException e) {
307                throw (XAException) new XAException().initCause(e);
308            } catch (RollbackException e) {
309                throw (XAException) new XAException().initCause(e);
310            }
311        }
312    
313        public void rollback(Transaction tx) throws XAException {
314            try {
315                tx.rollback();
316            } catch (IllegalStateException e) {
317                throw (XAException) new XAException().initCause(e);
318            } catch (SystemException e) {
319                throw (XAException) new XAException().initCause(e);
320            }
321            totalRollBacks.getAndIncrement();
322        }
323    
324        long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) {
325            if (transactionTimeoutMilliseconds != 0) {
326                return transactionTimeoutMilliseconds;
327            }
328            Long timeout = (Long) this.transactionTimeoutMilliseconds.get();
329            if (timeout != null) {
330                return timeout.longValue();
331            }
332            return defaultTransactionTimeoutMilliseconds;
333        }
334    
335        //Recovery
336        public void recoveryError(Exception e) {
337            recoveryLog.error(e);
338            recoveryErrors.add(e);
339        }
340    
341        public void recoverResourceManager(NamedXAResource xaResource) {
342            try {
343                recovery.recoverResourceManager(xaResource);
344            } catch (XAException e) {
345                recoveryError(e);
346            }
347        }
348    
349        public Map getExternalXids() {
350            return new HashMap(recovery.getExternalXids());
351        }
352    
353        public void addTransactionAssociationListener(TransactionManagerMonitor listener) {
354            transactionAssociationListeners.addIfAbsent(listener);
355        }
356    
357        public void removeTransactionAssociationListener(TransactionManagerMonitor listener) {
358            transactionAssociationListeners.remove(listener);
359        }
360    
361        protected void fireThreadAssociated(Transaction tx) {
362            for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
363                TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
364                try {
365                    listener.threadAssociated(tx);
366                } catch (Exception e) {
367                    log.warn("Error calling transaction association listener", e);
368                }
369            }
370        }
371    
372        protected void fireThreadUnassociated(Transaction tx) {
373            for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
374                TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
375                try {
376                    listener.threadUnassociated(tx);
377                } catch (Exception e) {
378                    log.warn("Error calling transaction association listener", e);
379                }
380            }
381        }
382    
383        /**
384         * Returns the number of active transactions.
385         */
386        public long getActiveCount() {
387            return activeCount.longValue();
388        }
389    
390        /**
391         * Return the number of total commits
392         */
393        public long getTotalCommits() {
394            return totalCommits.longValue();
395        }
396    
397        /**
398         * Returns the number of total rollbacks
399         */
400        public long getTotalRollbacks() {
401            return totalRollBacks.longValue();
402        }
403    
404        /**
405         * Reset statistics
406         */
407        public void resetStatistics() {
408            totalCommits.getAndSet(0);
409            totalRollBacks.getAndSet(0);
410        }
411    }