001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.transaction.manager;
019    
020    import java.io.PrintWriter;
021    import java.io.StringWriter;
022    import java.io.Writer;
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.IdentityHashMap;
026    import java.util.Iterator;
027    import java.util.LinkedList;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import javax.transaction.HeuristicMixedException;
033    import javax.transaction.HeuristicRollbackException;
034    import javax.transaction.RollbackException;
035    import javax.transaction.Status;
036    import javax.transaction.Synchronization;
037    import javax.transaction.SystemException;
038    import javax.transaction.Transaction;
039    import javax.transaction.xa.XAException;
040    import javax.transaction.xa.XAResource;
041    import javax.transaction.xa.Xid;
042    
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    
046    /**
047     * Basic local transaction with support for multiple resources.
048     *
049     * @version $Rev: 577161 $ $Date: 2007-09-19 07:34:43 +0200 (Wed, 19 Sep 2007) $
050     */
051    public class TransactionImpl implements Transaction {
052        private static final Log log = LogFactory.getLog("Transaction");
053    
054        private final XidFactory xidFactory;
055        private final Xid xid;
056        private final TransactionLog txnLog;
057        private final long timeout;
058        private final List syncList = new ArrayList(5);
059        private final List interposedSyncList = new ArrayList(3);
060        private final LinkedList resourceManagers = new LinkedList();
061        private final IdentityHashMap activeXaResources = new IdentityHashMap(3);
062        private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3);
063        private int status = Status.STATUS_NO_TRANSACTION;
064        private Object logMark;
065    
066        private final Map resources = new HashMap();
067    
068        TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
069            this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds);
070        }
071    
072        TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
073            this.xidFactory = xidFactory;
074            this.txnLog = txnLog;
075            this.xid = xid;
076            this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime();
077            try {
078                txnLog.begin(xid);
079            } catch (LogException e) {
080                status = Status.STATUS_MARKED_ROLLBACK;
081                SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)");
082                ex.initCause(e);
083                throw ex;
084            }
085            status = Status.STATUS_ACTIVE;
086        }
087    
088        //reconstruct a tx for an external tx found in recovery
089        public TransactionImpl(Xid xid, TransactionLog txLog) {
090            this.xidFactory = null;
091            this.txnLog = txLog;
092            this.xid = xid;
093            status = Status.STATUS_PREPARED;
094            //TODO is this a good idea?
095            this.timeout = Long.MAX_VALUE;
096        }
097    
098        public synchronized int getStatus() {
099            return status;
100        }
101    
102        public Object getResource(Object key) {
103            return resources.get(key);
104        }
105    
106        public boolean getRollbackOnly() {
107            return status == Status.STATUS_MARKED_ROLLBACK;
108        }
109    
110        public Object getTransactionKey() {
111            return xid;
112        }
113    
114        public int getTransactionStatus() {
115            return status;
116        }
117    
118        public void putResource(Object key, Object value) {
119            if (key == null) {
120                throw new NullPointerException("You must supply a non-null key for putResource");
121            }
122            resources.put(key, value);
123        }
124    
125        public void registerInterposedSynchronization(Synchronization synchronization) {
126            interposedSyncList.add(synchronization);
127        }
128    
129        public synchronized void setRollbackOnly() throws IllegalStateException {
130            switch (status) {
131                case Status.STATUS_ACTIVE:
132                case Status.STATUS_PREPARING:
133                    status = Status.STATUS_MARKED_ROLLBACK;
134                    break;
135                case Status.STATUS_MARKED_ROLLBACK:
136                case Status.STATUS_ROLLING_BACK:
137                    // nothing to do
138                    break;
139                default:
140                    throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status));
141            }
142        }
143    
144        public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
145            if (synch == null) {
146                throw new IllegalArgumentException("Synchronization is null");
147            }
148            switch (status) {
149                case Status.STATUS_ACTIVE:
150                case Status.STATUS_PREPARING:
151                    break;
152                case Status.STATUS_MARKED_ROLLBACK:
153                    throw new RollbackException("Transaction is marked for rollback");
154                default:
155                    throw new IllegalStateException("Status is " + getStateString(status));
156            }
157            syncList.add(synch);
158        }
159    
160        public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
161            if (xaRes == null) {
162                throw new IllegalArgumentException("XAResource is null");
163            }
164            switch (status) {
165                case Status.STATUS_ACTIVE:
166                    break;
167                case Status.STATUS_MARKED_ROLLBACK:
168                    break;
169                default:
170                    throw new IllegalStateException("Status is " + getStateString(status));
171            }
172    
173            if (activeXaResources.containsKey(xaRes)) {
174                throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!");
175            }
176    
177            try {
178                TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
179                if (manager != null) {
180                    //we know about this one, it was suspended
181                    xaRes.start(manager.getBranchId(), XAResource.TMRESUME);
182                    activeXaResources.put(xaRes, manager);
183                    return true;
184                }
185                //it is not suspended.
186                for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
187                    manager = (TransactionBranch) i.next();
188                    boolean sameRM;
189                    //if the xares is already known, we must be resuming after a suspend.
190                    if (xaRes == manager.getCommitter()) {
191                        throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended");
192                    }
193                    //Otherwise, see if this is a new xares for the same resource manager
194                    try {
195                        sameRM = xaRes.isSameRM(manager.getCommitter());
196                    } catch (XAException e) {
197                        log.warn("Unexpected error checking for same RM", e);
198                        continue;
199                    }
200                    if (sameRM) {
201                        xaRes.start(manager.getBranchId(), XAResource.TMJOIN);
202                        activeXaResources.put(xaRes, manager);
203                        return true;
204                    }
205                }
206                //we know nothing about this XAResource or resource manager
207                Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1);
208                xaRes.start(branchId, XAResource.TMNOFLAGS);
209                activeXaResources.put(xaRes, addBranchXid(xaRes, branchId));
210                return true;
211            } catch (XAException e) {
212                log.warn("Unable to enlist XAResource " + xaRes + ", errorCode: " + e.errorCode, e);
213                return false;
214            }
215        }
216    
217        public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
218            if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) {
219                throw new IllegalStateException("invalid flag for delistResource: " + flag);
220            }
221            if (xaRes == null) {
222                throw new IllegalArgumentException("XAResource is null");
223            }
224            switch (status) {
225                case Status.STATUS_ACTIVE:
226                case Status.STATUS_MARKED_ROLLBACK:
227                    break;
228                default:
229                    throw new IllegalStateException("Status is " + getStateString(status));
230            }
231            TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes);
232            if (manager == null) {
233                if (flag == XAResource.TMSUSPEND) {
234                    throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes);
235                }
236                //not active, and we are not trying to suspend.  We must be ending tx.
237                manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
238                if (manager == null) {
239                    throw new IllegalStateException("Resource not known to transaction: " + xaRes);
240                }
241            }
242    
243            try {
244                xaRes.end(manager.getBranchId(), flag);
245                if (flag == XAResource.TMSUSPEND) {
246                    suspendedXaResources.put(xaRes, manager);
247                }
248                return true;
249            } catch (XAException e) {
250                log.warn("Unable to delist XAResource " + xaRes + ", error code: " + e.errorCode, e);
251                return false;
252            }
253        }
254    
255        //Transaction method, does 2pc
256        public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
257            beforePrepare();
258    
259            try {
260                boolean timedout = false;
261                if (TransactionTimer.getCurrentTime() > timeout) {
262                    status = Status.STATUS_MARKED_ROLLBACK;
263                    timedout = true;
264                }
265    
266                if (status == Status.STATUS_MARKED_ROLLBACK) {
267                    rollbackResources(resourceManagers);
268                    if (timedout) {
269                        throw new RollbackException("Transaction timeout");
270                    } else {
271                        throw new RollbackException("Unable to commit: transaction marked for rollback");
272                    }
273                }
274                synchronized (this) {
275                    if (status == Status.STATUS_ACTIVE) {
276                        if (this.resourceManagers.size() == 0) {
277                            // nothing to commit
278                            status = Status.STATUS_COMMITTED;
279                        } else if (this.resourceManagers.size() == 1) {
280                            // one-phase commit decision
281                            status = Status.STATUS_COMMITTING;
282                        } else {
283                            // start prepare part of two-phase
284                            status = Status.STATUS_PREPARING;
285                        }
286                    }
287                    // resourceManagers is now immutable
288                }
289    
290                // no-phase
291                if (resourceManagers.size() == 0) {
292                    synchronized (this) {
293                        status = Status.STATUS_COMMITTED;
294                    }
295                    return;
296                }
297    
298                // one-phase
299                if (resourceManagers.size() == 1) {
300                    TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst();
301                    try {
302                        manager.getCommitter().commit(manager.getBranchId(), true);
303                        synchronized (this) {
304                            status = Status.STATUS_COMMITTED;
305                        }
306                        return;
307                    } catch (XAException e) {
308                        synchronized (this) {
309                            status = Status.STATUS_ROLLEDBACK;
310                        }
311                        throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e);
312                    }
313                }
314    
315                // two-phase
316                boolean willCommit = internalPrepare();
317    
318                // notify the RMs
319                if (willCommit) {
320                    commitResources(resourceManagers);
321                } else {
322                    rollbackResources(resourceManagers);
323                    throw new RollbackException("Unable to commit");
324                }
325            } finally {
326                afterCompletion();
327                synchronized (this) {
328                    status = Status.STATUS_NO_TRANSACTION;
329                }
330            }
331        }
332    
333        //Used from XATerminator for first phase in a remotely controlled tx.
334        int prepare() throws SystemException, RollbackException {
335            beforePrepare();
336            int result = XAResource.XA_RDONLY;
337            try {
338                LinkedList rms;
339                synchronized (this) {
340                    if (status == Status.STATUS_ACTIVE) {
341                        if (resourceManagers.size() == 0) {
342                            // nothing to commit
343                            status = Status.STATUS_COMMITTED;
344                            return result;
345                        } else {
346                            // start prepare part of two-phase
347                            status = Status.STATUS_PREPARING;
348                        }
349                    }
350                    // resourceManagers is now immutable
351                    rms = resourceManagers;
352                }
353    
354                boolean willCommit = internalPrepare();
355    
356                // notify the RMs
357                if (willCommit) {
358                    if (!rms.isEmpty()) {
359                        result = XAResource.XA_OK;
360                    }
361                } else {
362                    rollbackResources(rms);
363                    throw new RollbackException("Unable to commit");
364                }
365            } finally {
366                if (result == XAResource.XA_RDONLY) {
367                    afterCompletion();
368                    synchronized (this) {
369                        status = Status.STATUS_NO_TRANSACTION;
370                    }
371                }
372            }
373            return result;
374        }
375    
376        //used from XATerminator for commit phase of non-readonly remotely controlled tx.
377        void preparedCommit() throws SystemException {
378            try {
379                commitResources(resourceManagers);
380            } finally {
381                afterCompletion();
382                synchronized (this) {
383                    status = Status.STATUS_NO_TRANSACTION;
384                }
385            }
386        }
387    
388        //helper method used by Transaction.commit and XATerminator prepare.
389        private void beforePrepare() {
390            synchronized (this) {
391                switch (status) {
392                    case Status.STATUS_ACTIVE:
393                    case Status.STATUS_MARKED_ROLLBACK:
394                        break;
395                    default:
396                        throw new IllegalStateException("Status is " + getStateString(status));
397                }
398            }
399    
400            beforeCompletion();
401            endResources();
402        }
403    
404    
405        //helper method used by Transaction.commit and XATerminator prepare.
406        private boolean internalPrepare() throws SystemException {
407    
408            for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) {
409                synchronized (this) {
410                    if (status != Status.STATUS_PREPARING) {
411                        // we were marked for rollback
412                        break;
413                    }
414                }
415                TransactionBranch manager = (TransactionBranch) rms.next();
416                try {
417                    int vote = manager.getCommitter().prepare(manager.getBranchId());
418                    if (vote == XAResource.XA_RDONLY) {
419                        // we don't need to consider this RM any more
420                        rms.remove();
421                    }
422                } catch (XAException e) {
423                    synchronized (this) {
424                        status = Status.STATUS_MARKED_ROLLBACK;
425                        //TODO document why this is true from the spec.
426                        //XAException during prepare means we can assume resource is rolled back.
427                        rms.remove();
428                        break;
429                    }
430                }
431            }
432    
433            // decision time...
434            boolean willCommit;
435            synchronized (this) {
436                willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
437                if (willCommit) {
438                    status = Status.STATUS_PREPARED;
439                }
440            }
441            // log our decision
442            if (willCommit && !resourceManagers.isEmpty()) {
443                try {
444                    logMark = txnLog.prepare(xid, resourceManagers);
445                } catch (LogException e) {
446                    try {
447                        rollbackResources(resourceManagers);
448                    } catch (Exception se) {
449                        log.error("Unable to rollback after failure to log prepare", se.getCause());
450                    }
451                    throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e);
452                }
453            }
454            return willCommit;
455        }
456    
457        public void rollback() throws IllegalStateException, SystemException {
458            List rms;
459            synchronized (this) {
460                switch (status) {
461                    case Status.STATUS_ACTIVE:
462                        status = Status.STATUS_MARKED_ROLLBACK;
463                        break;
464                    case Status.STATUS_MARKED_ROLLBACK:
465                        break;
466                    default:
467                        throw new IllegalStateException("Status is " + getStateString(status));
468                }
469                rms = resourceManagers;
470            }
471    
472            beforeCompletion();
473            endResources();
474            try {
475                rollbackResources(rms);
476                //only write rollback record if we have already written prepare record.
477                if (logMark != null) {
478                    try {
479                        txnLog.rollback(xid, logMark);
480                    } catch (LogException e) {
481                        try {
482                            rollbackResources(rms);
483                        } catch (Exception se) {
484                            log.error("Unable to rollback after failure to log decision", se.getCause());
485                        }
486                        throw (SystemException) new SystemException("Error logging rollback").initCause(e);
487                    }
488                }
489            } finally {
490                afterCompletion();
491                synchronized (this) {
492                    status = Status.STATUS_NO_TRANSACTION;
493                }
494            }
495        }
496    
497        private void beforeCompletion() {
498            beforeCompletion(syncList);
499            beforeCompletion(interposedSyncList);
500        }
501    
502        private void beforeCompletion(List syncs) {
503            int i = 0;
504            while (true) {
505                Synchronization synch;
506                synchronized (this) {
507                    if (i == syncs.size()) {
508                        return;
509                    } else {
510                        synch = (Synchronization) syncs.get(i++);
511                    }
512                }
513                try {
514                    synch.beforeCompletion();
515                } catch (Exception e) {
516                    log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e);
517                    synchronized (this) {
518                        status = Status.STATUS_MARKED_ROLLBACK;
519                    }
520                }
521            }
522        }
523    
524        private void afterCompletion() {
525            // this does not synchronize because nothing can modify our state at this time
526            afterCompletion(interposedSyncList);
527            afterCompletion(syncList);
528        }
529    
530        private void afterCompletion(List syncs) {
531            for (Iterator i = syncs.iterator(); i.hasNext();) {
532                Synchronization synch = (Synchronization) i.next();
533                try {
534                    synch.afterCompletion(status);
535                } catch (Exception e) {
536                    log.warn("Unexpected exception from afterCompletion; continuing", e);
537                }
538            }
539        }
540    
541        private void endResources() {
542            endResources(activeXaResources);
543            endResources(suspendedXaResources);
544        }
545    
546        private void endResources(IdentityHashMap resourceMap) {
547            while (true) {
548                XAResource xaRes;
549                TransactionBranch manager;
550                int flags;
551                synchronized (this) {
552                    Set entrySet = resourceMap.entrySet();
553                    if (entrySet.isEmpty()) {
554                        return;
555                    }
556                    Map.Entry entry = (Map.Entry) entrySet.iterator().next();
557                    xaRes = (XAResource) entry.getKey();
558                    manager = (TransactionBranch) entry.getValue();
559                    flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS;
560                    resourceMap.remove(xaRes);
561                }
562                try {
563                    xaRes.end(manager.getBranchId(), flags);
564                } catch (XAException e) {
565                    log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back. XA error code: " + e.errorCode, e);
566                    synchronized (this) {
567                        status = Status.STATUS_MARKED_ROLLBACK;
568                    }
569                }
570            }
571        }
572    
573        private void rollbackResources(List rms) throws SystemException {
574            SystemException cause = null;
575            synchronized (this) {
576                status = Status.STATUS_ROLLING_BACK;
577            }
578            for (Iterator i = rms.iterator(); i.hasNext();) {
579                TransactionBranch manager = (TransactionBranch) i.next();
580                try {
581                    manager.getCommitter().rollback(manager.getBranchId());
582                } catch (XAException e) {
583                    log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e);
584                    if (cause == null) {
585                        cause = new SystemException(e.errorCode);
586                    }
587                }
588            }
589            synchronized (this) {
590                status = Status.STATUS_ROLLEDBACK;
591            }
592            if (cause != null) {
593                throw cause;
594            }
595        }
596    
597        private void commitResources(List rms) throws SystemException {
598            SystemException cause = null;
599            synchronized (this) {
600                status = Status.STATUS_COMMITTING;
601            }
602            for (Iterator i = rms.iterator(); i.hasNext();) {
603                TransactionBranch manager = (TransactionBranch) i.next();
604                try {
605                    manager.getCommitter().commit(manager.getBranchId(), false);
606                } catch (XAException e) {
607                    log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e);
608                    if (cause == null) {
609                        cause = new SystemException(e.errorCode);
610                    }
611                }
612            }
613            //if all resources were read only, we didn't write a prepare record.
614            if (!rms.isEmpty()) {
615                try {
616                    txnLog.commit(xid, logMark);
617                } catch (LogException e) {
618                    log.error("Unexpected exception logging commit completion for xid " + xid, e);
619                    throw (SystemException) new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e);
620                }
621            }
622            synchronized (this) {
623                status = Status.STATUS_COMMITTED;
624            }
625            if (cause != null) {
626                throw cause;
627            }
628        }
629    
630        private static String getStateString(int status) {
631            switch (status) {
632                case Status.STATUS_ACTIVE:
633                    return "STATUS_ACTIVE";
634                case Status.STATUS_PREPARING:
635                    return "STATUS_PREPARING";
636                case Status.STATUS_PREPARED:
637                    return "STATUS_PREPARED";
638                case Status.STATUS_MARKED_ROLLBACK:
639                    return "STATUS_MARKED_ROLLBACK";
640                case Status.STATUS_ROLLING_BACK:
641                    return "STATUS_ROLLING_BACK";
642                case Status.STATUS_COMMITTING:
643                    return "STATUS_COMMITTING";
644                case Status.STATUS_COMMITTED:
645                    return "STATUS_COMMITTED";
646                case Status.STATUS_ROLLEDBACK:
647                    return "STATUS_ROLLEDBACK";
648                case Status.STATUS_NO_TRANSACTION:
649                    return "STATUS_NO_TRANSACTION";
650                case Status.STATUS_UNKNOWN:
651                    return "STATUS_UNKNOWN";
652                default:
653                    throw new AssertionError();
654            }
655        }
656    
657        public boolean equals(Object obj) {
658            if (obj instanceof TransactionImpl) {
659                TransactionImpl other = (TransactionImpl) obj;
660                return xid.equals(other.xid);
661            } else {
662                return false;
663            }
664        }
665    
666        //when used from recovery, do not add manager to active or suspended resource maps.
667        // The xaresources have already been ended with TMSUCCESS.
668        public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) {
669            TransactionBranch manager = new TransactionBranch(xaRes, branchId);
670            resourceManagers.add(manager);
671            return manager;
672        }
673    
674        private static class TransactionBranch implements TransactionBranchInfo {
675            private final XAResource committer;
676            private final Xid branchId;
677    
678            public TransactionBranch(XAResource xaRes, Xid branchId) {
679                committer = xaRes;
680                this.branchId = branchId;
681            }
682    
683            public XAResource getCommitter() {
684                return committer;
685            }
686    
687            public Xid getBranchId() {
688                return branchId;
689            }
690    
691            public String getResourceName() {
692                if (committer instanceof NamedXAResource) {
693                    return ((NamedXAResource) committer).getName();
694                } else {
695                    // if it isn't a named resource should we really stop all processing here!
696                    // Maybe this would be better to handle else where and do we really want to prevent all processing of transactions?
697                    log.error("Please correct the integration and supply a NamedXAResource", new IllegalStateException("Cannot log transactions as " + committer + " is not a NamedXAResource."));
698                    return committer.toString();
699                }
700            }
701    
702            public Xid getBranchXid() {
703                return branchId;
704            }
705        }
706    
707    
708    }