001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.geronimo.transaction.manager; 019 020 import java.util.ArrayList; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.Map; 025 026 import javax.transaction.*; 027 import javax.transaction.xa.XAException; 028 import javax.transaction.xa.Xid; 029 030 import java.util.concurrent.ConcurrentHashMap; 031 import java.util.concurrent.CopyOnWriteArrayList; 032 import java.util.concurrent.atomic.AtomicLong; 033 import org.apache.commons.logging.Log; 034 import org.apache.commons.logging.LogFactory; 035 import org.apache.geronimo.transaction.log.UnrecoverableLog; 036 037 /** 038 * Simple implementation of a transaction manager. 039 * 040 * @version $Rev: 584554 $ $Date: 2007-10-14 17:19:58 +0200 (Sun, 14 Oct 2007) $ 041 */ 042 public class TransactionManagerImpl implements TransactionManager, UserTransaction, TransactionSynchronizationRegistry, XidImporter, MonitorableTransactionManager, RecoverableTransactionManager { 043 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class); 044 protected static final int DEFAULT_TIMEOUT = 600; 045 protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68}; 046 047 final TransactionLog transactionLog; 048 final XidFactory xidFactory; 049 private final int defaultTransactionTimeoutMilliseconds; 050 private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal(); 051 private final ThreadLocal threadTx = new ThreadLocal(); 052 private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap(); 053 private static final Log recoveryLog = LogFactory.getLog("RecoveryController"); 054 final Recovery recovery; 055 private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList(); 056 private List recoveryErrors = new ArrayList(); 057 // statistics 058 private AtomicLong totalCommits = new AtomicLong(0); 059 private AtomicLong totalRollBacks = new AtomicLong(0); 060 private AtomicLong activeCount = new AtomicLong(0); 061 062 public TransactionManagerImpl() throws XAException { 063 this(DEFAULT_TIMEOUT, 064 null, 065 null 066 ); 067 } 068 069 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException { 070 this(defaultTransactionTimeoutSeconds, 071 null, 072 null 073 ); 074 } 075 076 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException { 077 this(defaultTransactionTimeoutSeconds, 078 null, 079 transactionLog 080 ); 081 } 082 083 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog) throws XAException { 084 if (defaultTransactionTimeoutSeconds <= 0) { 085 throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds); 086 } 087 this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000; 088 089 if (transactionLog == null) { 090 this.transactionLog = new UnrecoverableLog(); 091 } else { 092 this.transactionLog = transactionLog; 093 } 094 095 if (xidFactory != null) { 096 this.xidFactory = xidFactory; 097 } else { 098 this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID); 099 } 100 101 recovery = new RecoveryImpl(this.transactionLog, this.xidFactory); 102 recovery.recoverLog(); 103 } 104 105 public Transaction getTransaction() { 106 return (Transaction) threadTx.get(); 107 } 108 109 private void associate(TransactionImpl tx) throws InvalidTransactionException { 110 if (tx == null) throw new NullPointerException("tx is null"); 111 112 Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread()); 113 if (existingAssociation != null) { 114 throw new InvalidTransactionException("Specified transaction is already associated with another thread"); 115 } 116 threadTx.set(tx); 117 fireThreadAssociated(tx); 118 activeCount.getAndIncrement(); 119 } 120 121 private void unassociate() { 122 Transaction tx = getTransaction(); 123 if (tx != null) { 124 associatedTransactions.remove(tx); 125 threadTx.set(null); 126 fireThreadUnassociated(tx); 127 activeCount.getAndDecrement(); 128 } 129 } 130 131 public void setTransactionTimeout(int seconds) throws SystemException { 132 if (seconds < 0) { 133 throw new SystemException("transaction timeout must be positive or 0 to reset to default"); 134 } 135 if (seconds == 0) { 136 transactionTimeoutMilliseconds.set(null); 137 } else { 138 transactionTimeoutMilliseconds.set(new Long(seconds * 1000)); 139 } 140 } 141 142 public int getStatus() throws SystemException { 143 Transaction tx = getTransaction(); 144 return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION; 145 } 146 147 public void begin() throws NotSupportedException, SystemException { 148 begin(getTransactionTimeoutMilliseconds(0L)); 149 } 150 151 public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException { 152 if (getStatus() != Status.STATUS_NO_TRANSACTION) { 153 throw new NotSupportedException("Nested Transactions are not supported"); 154 } 155 TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds)); 156 // timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds)); 157 try { 158 associate(tx); 159 } catch (InvalidTransactionException e) { 160 // should not be possible since we just created that transaction and no one has a reference yet 161 throw (SystemException)new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction").initCause(e); 162 } 163 // Todo: Verify if this is correct thing to do. Use default timeout for next transaction. 164 this.transactionTimeoutMilliseconds.set(null); 165 return tx; 166 } 167 168 public Transaction suspend() throws SystemException { 169 Transaction tx = getTransaction(); 170 if (tx != null) { 171 unassociate(); 172 } 173 return tx; 174 } 175 176 public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException { 177 if (getTransaction() != null) { 178 throw new IllegalStateException("Thread already associated with another transaction"); 179 } 180 if (!(tx instanceof TransactionImpl)) { 181 throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx); 182 } 183 associate((TransactionImpl) tx); 184 } 185 186 public Object getResource(Object key) { 187 TransactionImpl tx = getActiveTransactionImpl(); 188 return tx.getResource(key); 189 } 190 191 private TransactionImpl getActiveTransactionImpl() { 192 TransactionImpl tx = (TransactionImpl)threadTx.get(); 193 if (tx == null) { 194 throw new IllegalStateException("No tx on thread"); 195 } 196 if (tx.getStatus() != Status.STATUS_ACTIVE && tx.getStatus() != Status.STATUS_MARKED_ROLLBACK) { 197 throw new IllegalStateException("Transaction " + tx + " is not active"); 198 } 199 return tx; 200 } 201 202 public boolean getRollbackOnly() { 203 TransactionImpl tx = getActiveTransactionImpl(); 204 return tx.getRollbackOnly(); 205 } 206 207 public Object getTransactionKey() { 208 TransactionImpl tx = getActiveTransactionImpl(); 209 return tx.getTransactionKey(); 210 } 211 212 public int getTransactionStatus() { 213 TransactionImpl tx = (TransactionImpl) getTransaction(); 214 return tx == null? Status.STATUS_NO_TRANSACTION: tx.getTransactionStatus(); 215 } 216 217 public void putResource(Object key, Object value) { 218 TransactionImpl tx = getActiveTransactionImpl(); 219 tx.putResource(key, value); 220 } 221 222 /** 223 * jta 1.1 method so the jpa implementations can be told to flush their caches. 224 * @param synchronization 225 */ 226 public void registerInterposedSynchronization(Synchronization synchronization) { 227 TransactionImpl tx = getActiveTransactionImpl(); 228 tx.registerInterposedSynchronization(synchronization); 229 } 230 231 public void setRollbackOnly() throws IllegalStateException { 232 TransactionImpl tx = (TransactionImpl) threadTx.get(); 233 if (tx == null) { 234 throw new IllegalStateException("No transaction associated with current thread"); 235 } 236 tx.setRollbackOnly(); 237 } 238 239 public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException { 240 Transaction tx = getTransaction(); 241 if (tx == null) { 242 throw new IllegalStateException("No transaction associated with current thread"); 243 } 244 try { 245 tx.commit(); 246 } finally { 247 unassociate(); 248 } 249 totalCommits.getAndIncrement(); 250 } 251 252 public void rollback() throws IllegalStateException, SecurityException, SystemException { 253 Transaction tx = getTransaction(); 254 if (tx == null) { 255 throw new IllegalStateException("No transaction associated with current thread"); 256 } 257 try { 258 tx.rollback(); 259 } finally { 260 unassociate(); 261 } 262 totalRollBacks.getAndIncrement(); 263 } 264 265 //XidImporter implementation 266 public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException { 267 if (transactionTimeoutMilliseconds < 0) { 268 throw new SystemException("transaction timeout must be positive or 0 to reset to default"); 269 } 270 TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds)); 271 return tx; 272 } 273 274 public void commit(Transaction tx, boolean onePhase) throws XAException { 275 if (onePhase) { 276 try { 277 tx.commit(); 278 } catch (HeuristicMixedException e) { 279 throw (XAException) new XAException().initCause(e); 280 } catch (HeuristicRollbackException e) { 281 throw (XAException) new XAException().initCause(e); 282 } catch (RollbackException e) { 283 throw (XAException) new XAException().initCause(e); 284 } catch (SecurityException e) { 285 throw (XAException) new XAException().initCause(e); 286 } catch (SystemException e) { 287 throw (XAException) new XAException().initCause(e); 288 } 289 } else { 290 try { 291 ((TransactionImpl) tx).preparedCommit(); 292 } catch (SystemException e) { 293 throw (XAException) new XAException().initCause(e); 294 } 295 } 296 totalCommits.getAndIncrement(); 297 } 298 299 public void forget(Transaction tx) throws XAException { 300 //TODO implement this! 301 } 302 303 public int prepare(Transaction tx) throws XAException { 304 try { 305 return ((TransactionImpl) tx).prepare(); 306 } catch (SystemException e) { 307 throw (XAException) new XAException().initCause(e); 308 } catch (RollbackException e) { 309 throw (XAException) new XAException().initCause(e); 310 } 311 } 312 313 public void rollback(Transaction tx) throws XAException { 314 try { 315 tx.rollback(); 316 } catch (IllegalStateException e) { 317 throw (XAException) new XAException().initCause(e); 318 } catch (SystemException e) { 319 throw (XAException) new XAException().initCause(e); 320 } 321 totalRollBacks.getAndIncrement(); 322 } 323 324 long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) { 325 if (transactionTimeoutMilliseconds != 0) { 326 return transactionTimeoutMilliseconds; 327 } 328 Long timeout = (Long) this.transactionTimeoutMilliseconds.get(); 329 if (timeout != null) { 330 return timeout.longValue(); 331 } 332 return defaultTransactionTimeoutMilliseconds; 333 } 334 335 //Recovery 336 public void recoveryError(Exception e) { 337 recoveryLog.error(e); 338 recoveryErrors.add(e); 339 } 340 341 public void recoverResourceManager(NamedXAResource xaResource) { 342 try { 343 recovery.recoverResourceManager(xaResource); 344 } catch (XAException e) { 345 recoveryError(e); 346 } 347 } 348 349 public Map getExternalXids() { 350 return new HashMap(recovery.getExternalXids()); 351 } 352 353 public void addTransactionAssociationListener(TransactionManagerMonitor listener) { 354 transactionAssociationListeners.addIfAbsent(listener); 355 } 356 357 public void removeTransactionAssociationListener(TransactionManagerMonitor listener) { 358 transactionAssociationListeners.remove(listener); 359 } 360 361 protected void fireThreadAssociated(Transaction tx) { 362 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) { 363 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next(); 364 try { 365 listener.threadAssociated(tx); 366 } catch (Exception e) { 367 log.warn("Error calling transaction association listener", e); 368 } 369 } 370 } 371 372 protected void fireThreadUnassociated(Transaction tx) { 373 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) { 374 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next(); 375 try { 376 listener.threadUnassociated(tx); 377 } catch (Exception e) { 378 log.warn("Error calling transaction association listener", e); 379 } 380 } 381 } 382 383 /** 384 * Returns the number of active transactions. 385 */ 386 public long getActiveCount() { 387 return activeCount.longValue(); 388 } 389 390 /** 391 * Return the number of total commits 392 */ 393 public long getTotalCommits() { 394 return totalCommits.longValue(); 395 } 396 397 /** 398 * Returns the number of total rollbacks 399 */ 400 public long getTotalRollbacks() { 401 return totalRollBacks.longValue(); 402 } 403 404 /** 405 * Reset statistics 406 */ 407 public void resetStatistics() { 408 totalCommits.getAndSet(0); 409 totalRollBacks.getAndSet(0); 410 } 411 }