001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.geronimo.connector.work; 019 020 import java.util.concurrent.CountDownLatch; 021 022 import javax.resource.spi.work.ExecutionContext; 023 import javax.resource.spi.work.Work; 024 import javax.resource.spi.work.WorkAdapter; 025 import javax.resource.spi.work.WorkCompletedException; 026 import javax.resource.spi.work.WorkEvent; 027 import javax.resource.spi.work.WorkException; 028 import javax.resource.spi.work.WorkListener; 029 import javax.resource.spi.work.WorkManager; 030 import javax.resource.spi.work.WorkRejectedException; 031 import javax.transaction.InvalidTransactionException; 032 import javax.transaction.SystemException; 033 import javax.transaction.xa.XAException; 034 035 import org.apache.commons.logging.Log; 036 import org.apache.commons.logging.LogFactory; 037 import org.apache.geronimo.transaction.manager.ImportedTransactionActiveException; 038 import org.apache.geronimo.transaction.manager.XAWork; 039 040 /** 041 * Work wrapper providing an execution context to a Work instance. 042 * 043 * @version $Rev: 550546 $ $Date: 2007-06-25 18:52:11 +0200 (Mon, 25 Jun 2007) $ 044 */ 045 public class WorkerContext implements Work { 046 047 private static final Log log = LogFactory.getLog(WorkerContext.class); 048 049 /** 050 * Null WorkListener used as the default WorkListener. 051 */ 052 private static final WorkListener NULL_WORK_LISTENER = new WorkAdapter() { 053 public void workRejected(WorkEvent event) { 054 if (event.getException() != null) { 055 if (event.getException() instanceof WorkCompletedException && event.getException().getCause() != null) { 056 log.error(event.getWork().toString(), event.getException().getCause()); 057 } else { 058 log.error(event.getWork().toString(), event.getException()); 059 } 060 } 061 } 062 }; 063 064 /** 065 * Priority of the thread, which will execute this work. 066 */ 067 private int threadPriority; 068 069 /** 070 * Actual work to be executed. 071 */ 072 private Work adaptee; 073 074 /** 075 * Indicates if this work has been accepted. 076 */ 077 private boolean isAccepted; 078 079 /** 080 * System.currentTimeMillis() when the wrapped Work has been accepted. 081 */ 082 private long acceptedTime; 083 084 /** 085 * Number of times that the execution of this work has been tried. 086 */ 087 private int nbRetry; 088 089 /** 090 * Time duration (in milliseconds) within which the execution of the Work 091 * instance must start. 092 */ 093 private long startTimeOut; 094 095 /** 096 * Execution context of the actual work to be executed. 097 */ 098 private final ExecutionContext executionContext; 099 100 private final XAWork xaWork; 101 102 /** 103 * Listener to be notified during the life-cycle of the work treatment. 104 */ 105 private WorkListener workListener = NULL_WORK_LISTENER; 106 107 /** 108 * Work exception, if any. 109 */ 110 private WorkException workException; 111 112 /** 113 * A latch, which is released when the work is started. 114 */ 115 private CountDownLatch startLatch = new CountDownLatch(1); 116 117 /** 118 * A latch, which is released when the work is completed. 119 */ 120 private CountDownLatch endLatch = new CountDownLatch(1); 121 122 /** 123 * Create a WorkWrapper. 124 * 125 * @param work Work to be wrapped. 126 * @param xaWork 127 */ 128 public WorkerContext(Work work, XAWork xaWork) { 129 adaptee = work; 130 executionContext = null; 131 this.xaWork = xaWork; 132 } 133 134 /** 135 * Create a WorkWrapper with the specified execution context. 136 * 137 * @param aWork Work to be wrapped. 138 * @param aStartTimeout a time duration (in milliseconds) within which the 139 * execution of the Work instance must start. 140 * @param execContext an object containing the execution context with which 141 * the submitted Work instance must be executed. 142 * @param workListener an object which would be notified when the various 143 * Work processing events (work accepted, work rejected, work started, 144 */ 145 public WorkerContext(Work aWork, 146 long aStartTimeout, 147 ExecutionContext execContext, 148 XAWork xaWork, 149 WorkListener workListener) { 150 adaptee = aWork; 151 startTimeOut = aStartTimeout; 152 executionContext = execContext; 153 this.xaWork = xaWork; 154 if (null != workListener) { 155 this.workListener = workListener; 156 } 157 } 158 159 /* (non-Javadoc) 160 * @see javax.resource.spi.work.Work#release() 161 */ 162 public void release() { 163 adaptee.release(); 164 } 165 166 /** 167 * Defines the thread priority level of the thread, which will be dispatched 168 * to process this work. This priority level must be the same one for a 169 * given resource adapter. 170 * 171 * @param aPriority Priority of the thread to be used to process the wrapped 172 * Work instance. 173 */ 174 public void setThreadPriority(int aPriority) { 175 threadPriority = aPriority; 176 } 177 178 /** 179 * Gets the priority level of the thread, which will be dispatched 180 * to process this work. This priority level must be the same one for a 181 * given resource adapter. 182 * 183 * @return The priority level of the thread to be dispatched to 184 * process the wrapped Work instance. 185 */ 186 public int getThreadPriority() { 187 return threadPriority; 188 } 189 190 /** 191 * Call-back method used by a Work executor in order to notify this 192 * instance that the wrapped Work instance has been accepted. 193 * 194 * @param anObject Object on which the event initially occurred. It should 195 * be the work executor. 196 */ 197 public synchronized void workAccepted(Object anObject) { 198 isAccepted = true; 199 acceptedTime = System.currentTimeMillis(); 200 workListener.workAccepted(new WorkEvent(anObject, 201 WorkEvent.WORK_ACCEPTED, adaptee, null)); 202 } 203 204 /** 205 * System.currentTimeMillis() when the Work has been accepted. This method 206 * can be used to compute the duration of a work. 207 * 208 * @return When the work has been accepted. 209 */ 210 public synchronized long getAcceptedTime() { 211 return acceptedTime; 212 } 213 214 /** 215 * Gets the time duration (in milliseconds) within which the execution of 216 * the Work instance must start. 217 * 218 * @return Time out duration. 219 */ 220 public long getStartTimeout() { 221 return startTimeOut; 222 } 223 224 /** 225 * Used by a Work executor in order to know if this work, which should be 226 * accepted but not started has timed out. This method MUST be called prior 227 * to retry the execution of a Work. 228 * 229 * @return true if the Work has timed out and false otherwise. 230 */ 231 public synchronized boolean isTimedOut() { 232 assert isAccepted: "The work is not accepted."; 233 // A value of 0 means that the work never times out. 234 //??? really? 235 if (0 == startTimeOut || startTimeOut == WorkManager.INDEFINITE) { 236 return false; 237 } 238 boolean isTimeout = acceptedTime + startTimeOut > 0 && 239 System.currentTimeMillis() > acceptedTime + startTimeOut; 240 if (log.isDebugEnabled()) { 241 log.debug(this 242 + " accepted at " 243 + acceptedTime 244 + (isTimeout ? " has timed out." : " has not timed out. ") 245 + nbRetry 246 + " retries have been performed."); 247 } 248 if (isTimeout) { 249 workException = new WorkRejectedException(this + " has timed out.", 250 WorkException.START_TIMED_OUT); 251 workListener.workRejected(new WorkEvent(this, 252 WorkEvent.WORK_REJECTED, 253 adaptee, 254 workException)); 255 return true; 256 } 257 nbRetry++; 258 return isTimeout; 259 } 260 261 /** 262 * Gets the WorkException, if any, thrown during the execution. 263 * 264 * @return WorkException, if any. 265 */ 266 public synchronized WorkException getWorkException() { 267 return workException; 268 } 269 270 /* (non-Javadoc) 271 * @see java.lang.Runnable#run() 272 */ 273 public void run() { 274 if (isTimedOut()) { 275 // In case of a time out, one releases the start and end latches 276 // to prevent a dead-lock. 277 startLatch.countDown(); 278 endLatch.countDown(); 279 return; 280 } 281 // Implementation note: the work listener is notified prior to release 282 // the start lock. This behavior is intentional and seems to be the 283 // more conservative. 284 workListener.workStarted(new WorkEvent(this, WorkEvent.WORK_STARTED, adaptee, null)); 285 startLatch.countDown(); 286 //Implementation note: we assume this is being called without an interesting TransactionContext, 287 //and ignore/replace whatever is associated with the current thread. 288 try { 289 if (executionContext == null || executionContext.getXid() == null) { 290 adaptee.run(); 291 } else { 292 try { 293 long transactionTimeout = executionContext.getTransactionTimeout(); 294 //translate -1 value to 0 to indicate default transaction timeout. 295 xaWork.begin(executionContext.getXid(), transactionTimeout < 0 ? 0 : transactionTimeout); 296 } catch (XAException e) { 297 throw new WorkCompletedException("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e); 298 } catch (InvalidTransactionException e) { 299 throw new WorkCompletedException("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e); 300 } catch (SystemException e) { 301 throw new WorkCompletedException("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e); 302 } catch (ImportedTransactionActiveException e) { 303 throw new WorkCompletedException("Transaction already active for xid " + executionContext.getXid(), WorkCompletedException.TX_CONCURRENT_WORK_DISALLOWED).initCause(e); 304 } 305 try { 306 adaptee.run(); 307 } finally { 308 xaWork.end(executionContext.getXid()); 309 } 310 311 } 312 workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_COMPLETED, adaptee, null)); 313 } catch (Throwable e) { 314 workException = (WorkException) (e instanceof WorkCompletedException ? e : new WorkCompletedException("Unknown error", WorkCompletedException.UNDEFINED).initCause(e)); 315 workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_REJECTED, adaptee, 316 workException)); 317 } finally { 318 endLatch.countDown(); 319 } 320 } 321 322 /** 323 * Provides a latch, which can be used to wait the start of a work 324 * execution. 325 * 326 * @return Latch that a caller can acquire to wait for the start of a 327 * work execution. 328 */ 329 public synchronized CountDownLatch provideStartLatch() { 330 return startLatch; 331 } 332 333 /** 334 * Provides a latch, which can be used to wait the end of a work 335 * execution. 336 * 337 * @return Latch that a caller can acquire to wait for the end of a 338 * work execution. 339 */ 340 public synchronized CountDownLatch provideEndLatch() { 341 return endLatch; 342 } 343 344 public String toString() { 345 return "Work :" + adaptee; 346 } 347 348 }