001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.connector.work;
019    
020    import java.util.concurrent.CountDownLatch;
021    
022    import javax.resource.spi.work.ExecutionContext;
023    import javax.resource.spi.work.Work;
024    import javax.resource.spi.work.WorkAdapter;
025    import javax.resource.spi.work.WorkCompletedException;
026    import javax.resource.spi.work.WorkEvent;
027    import javax.resource.spi.work.WorkException;
028    import javax.resource.spi.work.WorkListener;
029    import javax.resource.spi.work.WorkManager;
030    import javax.resource.spi.work.WorkRejectedException;
031    import javax.transaction.InvalidTransactionException;
032    import javax.transaction.SystemException;
033    import javax.transaction.xa.XAException;
034    
035    import org.apache.commons.logging.Log;
036    import org.apache.commons.logging.LogFactory;
037    import org.apache.geronimo.transaction.manager.ImportedTransactionActiveException;
038    import org.apache.geronimo.transaction.manager.XAWork;
039    
040    /**
041     * Work wrapper providing an execution context to a Work instance.
042     *
043     * @version $Rev: 550546 $ $Date: 2007-06-25 18:52:11 +0200 (Mon, 25 Jun 2007) $
044     */
045    public class WorkerContext implements Work {
046    
047        private static final Log log = LogFactory.getLog(WorkerContext.class);
048    
049        /**
050         * Null WorkListener used as the default WorkListener.
051         */
052        private static final WorkListener NULL_WORK_LISTENER = new WorkAdapter() {
053            public void workRejected(WorkEvent event) {
054                if (event.getException() != null) {
055                    if (event.getException() instanceof WorkCompletedException && event.getException().getCause() != null) {
056                        log.error(event.getWork().toString(), event.getException().getCause());
057                    } else {
058                        log.error(event.getWork().toString(), event.getException());
059                    }
060                }
061            }
062        };
063    
064        /**
065         * Priority of the thread, which will execute this work.
066         */
067        private int threadPriority;
068    
069        /**
070         * Actual work to be executed.
071         */
072        private Work adaptee;
073    
074        /**
075         * Indicates if this work has been accepted.
076         */
077        private boolean isAccepted;
078    
079        /**
080         * System.currentTimeMillis() when the wrapped Work has been accepted.
081         */
082        private long acceptedTime;
083    
084        /**
085         * Number of times that the execution of this work has been tried.
086         */
087        private int nbRetry;
088    
089        /**
090         * Time duration (in milliseconds) within which the execution of the Work
091         * instance must start.
092         */
093        private long startTimeOut;
094    
095        /**
096         * Execution context of the actual work to be executed.
097         */
098        private final ExecutionContext executionContext;
099    
100        private final XAWork xaWork;
101    
102        /**
103         * Listener to be notified during the life-cycle of the work treatment.
104         */
105        private WorkListener workListener = NULL_WORK_LISTENER;
106    
107        /**
108         * Work exception, if any.
109         */
110        private WorkException workException;
111    
112        /**
113         * A latch, which is released when the work is started.
114         */
115        private CountDownLatch startLatch = new CountDownLatch(1);
116    
117        /**
118         * A latch, which is released when the work is completed.
119         */
120        private CountDownLatch endLatch = new CountDownLatch(1);
121    
122        /**
123         * Create a WorkWrapper.
124         *
125         * @param work                      Work to be wrapped.
126         * @param xaWork
127         */
128        public WorkerContext(Work work, XAWork xaWork) {
129            adaptee = work;
130            executionContext = null;
131            this.xaWork = xaWork;
132        }
133    
134        /**
135         * Create a WorkWrapper with the specified execution context.
136         *
137         * @param aWork         Work to be wrapped.
138         * @param aStartTimeout a time duration (in milliseconds) within which the
139         *                      execution of the Work instance must start.
140         * @param execContext   an object containing the execution context with which
141         *                      the submitted Work instance must be executed.
142         * @param workListener  an object which would be notified when the various
143         *                      Work processing events (work accepted, work rejected, work started,
144         */
145        public WorkerContext(Work aWork,
146                             long aStartTimeout,
147                             ExecutionContext execContext,
148                             XAWork xaWork,
149                             WorkListener workListener) {
150            adaptee = aWork;
151            startTimeOut = aStartTimeout;
152            executionContext = execContext;
153            this.xaWork = xaWork;
154            if (null != workListener) {
155                this.workListener = workListener;
156            }
157        }
158    
159        /* (non-Javadoc)
160         * @see javax.resource.spi.work.Work#release()
161         */
162        public void release() {
163            adaptee.release();
164        }
165    
166        /**
167         * Defines the thread priority level of the thread, which will be dispatched
168         * to process this work. This priority level must be the same one for a
169         * given resource adapter.
170         *
171         * @param aPriority Priority of the thread to be used to process the wrapped
172         *                  Work instance.
173         */
174        public void setThreadPriority(int aPriority) {
175            threadPriority = aPriority;
176        }
177    
178        /**
179         * Gets the priority level of the thread, which will be dispatched
180         * to process this work. This priority level must be the same one for a
181         * given resource adapter.
182         *
183         * @return The priority level of the thread to be dispatched to
184         *         process the wrapped Work instance.
185         */
186        public int getThreadPriority() {
187            return threadPriority;
188        }
189    
190        /**
191         * Call-back method used by a Work executor in order to notify this
192         * instance that the wrapped Work instance has been accepted.
193         *
194         * @param anObject Object on which the event initially occurred. It should
195         *                 be the work executor.
196         */
197        public synchronized void workAccepted(Object anObject) {
198            isAccepted = true;
199            acceptedTime = System.currentTimeMillis();
200            workListener.workAccepted(new WorkEvent(anObject,
201                    WorkEvent.WORK_ACCEPTED, adaptee, null));
202        }
203    
204        /**
205         * System.currentTimeMillis() when the Work has been accepted. This method
206         * can be used to compute the duration of a work.
207         *
208         * @return When the work has been accepted.
209         */
210        public synchronized long getAcceptedTime() {
211            return acceptedTime;
212        }
213    
214        /**
215         * Gets the time duration (in milliseconds) within which the execution of
216         * the Work instance must start.
217         *
218         * @return Time out duration.
219         */
220        public long getStartTimeout() {
221            return startTimeOut;
222        }
223    
224        /**
225         * Used by a Work executor in order to know if this work, which should be
226         * accepted but not started has timed out. This method MUST be called prior
227         * to retry the execution of a Work.
228         *
229         * @return true if the Work has timed out and false otherwise.
230         */
231        public synchronized boolean isTimedOut() {
232            assert isAccepted: "The work is not accepted.";
233            // A value of 0 means that the work never times out.
234            //??? really?
235            if (0 == startTimeOut || startTimeOut == WorkManager.INDEFINITE) {
236                return false;
237            }
238            boolean isTimeout = acceptedTime + startTimeOut > 0 &&
239                    System.currentTimeMillis() > acceptedTime + startTimeOut;
240            if (log.isDebugEnabled()) {
241                log.debug(this
242                        + " accepted at "
243                        + acceptedTime
244                        + (isTimeout ? " has timed out." : " has not timed out. ")
245                        + nbRetry
246                        + " retries have been performed.");
247            }
248            if (isTimeout) {
249                workException = new WorkRejectedException(this + " has timed out.",
250                        WorkException.START_TIMED_OUT);
251                workListener.workRejected(new WorkEvent(this,
252                        WorkEvent.WORK_REJECTED,
253                        adaptee,
254                        workException));
255                return true;
256            }
257            nbRetry++;
258            return isTimeout;
259        }
260    
261        /**
262         * Gets the WorkException, if any, thrown during the execution.
263         *
264         * @return WorkException, if any.
265         */
266        public synchronized WorkException getWorkException() {
267            return workException;
268        }
269    
270        /* (non-Javadoc)
271         * @see java.lang.Runnable#run()
272         */
273        public void run() {
274            if (isTimedOut()) {
275                // In case of a time out, one releases the start and end latches
276                // to prevent a dead-lock.
277                startLatch.countDown();
278                endLatch.countDown();
279                return;
280            }
281            // Implementation note: the work listener is notified prior to release
282            // the start lock. This behavior is intentional and seems to be the
283            // more conservative.
284            workListener.workStarted(new WorkEvent(this, WorkEvent.WORK_STARTED, adaptee, null));
285            startLatch.countDown();
286            //Implementation note: we assume this is being called without an interesting TransactionContext,
287            //and ignore/replace whatever is associated with the current thread.
288            try {
289                if (executionContext == null || executionContext.getXid() == null) {
290                    adaptee.run();
291                } else {
292                    try {
293                        long transactionTimeout = executionContext.getTransactionTimeout();
294                        //translate -1 value to 0 to indicate default transaction timeout.
295                        xaWork.begin(executionContext.getXid(), transactionTimeout < 0 ? 0 : transactionTimeout);
296                    } catch (XAException e) {
297                        throw new WorkCompletedException("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e);
298                    } catch (InvalidTransactionException e) {
299                        throw new WorkCompletedException("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e);
300                    } catch (SystemException e) {
301                        throw new WorkCompletedException("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e);
302                    } catch (ImportedTransactionActiveException e) {
303                        throw new WorkCompletedException("Transaction already active for xid " + executionContext.getXid(), WorkCompletedException.TX_CONCURRENT_WORK_DISALLOWED).initCause(e);
304                    }
305                    try {
306                        adaptee.run();
307                    } finally {
308                        xaWork.end(executionContext.getXid());
309                    }
310    
311                }
312                workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_COMPLETED, adaptee, null));
313            } catch (Throwable e) {
314                workException = (WorkException) (e instanceof WorkCompletedException ? e : new WorkCompletedException("Unknown error", WorkCompletedException.UNDEFINED).initCause(e));
315                workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_REJECTED, adaptee,
316                        workException));
317            } finally {
318                endLatch.countDown();
319            }
320        }
321    
322        /**
323         * Provides a latch, which can be used to wait the start of a work
324         * execution.
325         *
326         * @return Latch that a caller can acquire to wait for the start of a
327         *         work execution.
328         */
329        public synchronized CountDownLatch provideStartLatch() {
330            return startLatch;
331        }
332    
333        /**
334         * Provides a latch, which can be used to wait the end of a work
335         * execution.
336         *
337         * @return Latch that a caller can acquire to wait for the end of a
338         *         work execution.
339         */
340        public synchronized CountDownLatch provideEndLatch() {
341            return endLatch;
342        }
343    
344        public String toString() {
345            return "Work :" + adaptee;
346        }
347    
348    }