001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.geronimo.connector.work; 019 020 import java.util.concurrent.Executor; 021 022 import javax.resource.spi.work.ExecutionContext; 023 import javax.resource.spi.work.Work; 024 import javax.resource.spi.work.WorkCompletedException; 025 import javax.resource.spi.work.WorkException; 026 import javax.resource.spi.work.WorkListener; 027 import javax.resource.spi.work.WorkManager; 028 029 import org.apache.geronimo.connector.work.pool.ScheduleWorkExecutor; 030 import org.apache.geronimo.connector.work.pool.StartWorkExecutor; 031 import org.apache.geronimo.connector.work.pool.SyncWorkExecutor; 032 import org.apache.geronimo.connector.work.pool.WorkExecutor; 033 import org.apache.geronimo.transaction.manager.XAWork; 034 035 /** 036 * WorkManager implementation which uses under the cover three WorkExecutorPool 037 * - one for each synchronization policy - in order to dispatch the submitted 038 * Work instances. 039 * <P> 040 * A WorkManager is a component of the JCA specifications, which allows a 041 * Resource Adapter to submit tasks to an Application Server for execution. 042 * 043 * @version $Rev: 550546 $ $Date: 2007-06-25 18:52:11 +0200 (Mon, 25 Jun 2007) $ 044 */ 045 public class GeronimoWorkManager implements WorkManager { 046 047 // private final static int DEFAULT_POOL_SIZE = 10; 048 049 /** 050 * Pool of threads used by this WorkManager in order to process 051 * the Work instances submitted via the doWork methods. 052 */ 053 private Executor syncWorkExecutorPool; 054 055 /** 056 * Pool of threads used by this WorkManager in order to process 057 * the Work instances submitted via the startWork methods. 058 */ 059 private Executor startWorkExecutorPool; 060 061 /** 062 * Pool of threads used by this WorkManager in order to process 063 * the Work instances submitted via the scheduleWork methods. 064 */ 065 private Executor scheduledWorkExecutorPool; 066 067 private final XAWork transactionManager; 068 069 private final WorkExecutor scheduleWorkExecutor = new ScheduleWorkExecutor(); 070 private final WorkExecutor startWorkExecutor = new StartWorkExecutor(); 071 private final WorkExecutor syncWorkExecutor = new SyncWorkExecutor(); 072 073 /** 074 * Create a WorkManager. 075 */ 076 public GeronimoWorkManager() { 077 this(null, null, null, null); 078 } 079 080 public GeronimoWorkManager(Executor sync, Executor start, Executor sched, XAWork xaWork) { 081 syncWorkExecutorPool = sync; 082 startWorkExecutorPool = start; 083 scheduledWorkExecutorPool = sched; 084 this.transactionManager = xaWork; 085 } 086 087 public void doStart() throws Exception { 088 } 089 090 public void doStop() throws Exception { 091 } 092 093 public void doFail() { 094 try { 095 doStop(); 096 } catch (Exception e) { 097 //TODO what to do? 098 } 099 } 100 101 public Executor getSyncWorkExecutorPool() { 102 return syncWorkExecutorPool; 103 } 104 105 public Executor getStartWorkExecutorPool() { 106 return startWorkExecutorPool; 107 } 108 109 public Executor getScheduledWorkExecutorPool() { 110 return scheduledWorkExecutorPool; 111 } 112 113 /* (non-Javadoc) 114 * @see javax.resource.spi.work.WorkManager#doWork(javax.resource.spi.work.Work) 115 */ 116 public void doWork(Work work) throws WorkException { 117 executeWork(new WorkerContext(work, transactionManager), syncWorkExecutor, syncWorkExecutorPool); 118 } 119 120 /* (non-Javadoc) 121 * @see javax.resource.spi.work.WorkManager#doWork(javax.resource.spi.work.Work, long, javax.resource.spi.work.ExecutionContext, javax.resource.spi.work.WorkListener) 122 */ 123 public void doWork( 124 Work work, 125 long startTimeout, 126 ExecutionContext execContext, 127 WorkListener workListener) 128 throws WorkException { 129 WorkerContext workWrapper = 130 new WorkerContext(work, startTimeout, execContext, transactionManager, workListener); 131 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 132 executeWork(workWrapper, syncWorkExecutor, syncWorkExecutorPool); 133 } 134 135 /* (non-Javadoc) 136 * @see javax.resource.spi.work.WorkManager#startWork(javax.resource.spi.work.Work) 137 */ 138 public long startWork(Work work) throws WorkException { 139 WorkerContext workWrapper = new WorkerContext(work, transactionManager); 140 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 141 executeWork(workWrapper, startWorkExecutor, startWorkExecutorPool); 142 return System.currentTimeMillis() - workWrapper.getAcceptedTime(); 143 } 144 145 /* (non-Javadoc) 146 * @see javax.resource.spi.work.WorkManager#startWork(javax.resource.spi.work.Work, long, javax.resource.spi.work.ExecutionContext, javax.resource.spi.work.WorkListener) 147 */ 148 public long startWork( 149 Work work, 150 long startTimeout, 151 ExecutionContext execContext, 152 WorkListener workListener) 153 throws WorkException { 154 WorkerContext workWrapper = 155 new WorkerContext(work, startTimeout, execContext, transactionManager, workListener); 156 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 157 executeWork(workWrapper, startWorkExecutor, startWorkExecutorPool); 158 return System.currentTimeMillis() - workWrapper.getAcceptedTime(); 159 } 160 161 /* (non-Javadoc) 162 * @see javax.resource.spi.work.WorkManager#scheduleWork(javax.resource.spi.work.Work) 163 */ 164 public void scheduleWork(Work work) throws WorkException { 165 WorkerContext workWrapper = new WorkerContext(work, transactionManager); 166 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 167 executeWork(workWrapper, scheduleWorkExecutor, scheduledWorkExecutorPool); 168 } 169 170 /* (non-Javadoc) 171 * @see javax.resource.spi.work.WorkManager#scheduleWork(javax.resource.spi.work.Work, long, javax.resource.spi.work.ExecutionContext, javax.resource.spi.work.WorkListener) 172 */ 173 public void scheduleWork( 174 Work work, 175 long startTimeout, 176 ExecutionContext execContext, 177 WorkListener workListener) 178 throws WorkException { 179 WorkerContext workWrapper = 180 new WorkerContext(work, startTimeout, execContext, transactionManager, workListener); 181 workWrapper.setThreadPriority(Thread.currentThread().getPriority()); 182 executeWork(workWrapper, scheduleWorkExecutor, scheduledWorkExecutorPool); 183 } 184 185 /** 186 * Execute the specified Work. 187 * 188 * @param work Work to be executed. 189 * 190 * @exception WorkException Indicates that the Work execution has been 191 * unsuccessful. 192 */ 193 private void executeWork(WorkerContext work, WorkExecutor workExecutor, Executor pooledExecutor) throws WorkException { 194 work.workAccepted(this); 195 try { 196 workExecutor.doExecute(work, pooledExecutor); 197 WorkException exception = work.getWorkException(); 198 if (null != exception) { 199 throw exception; 200 } 201 } catch (InterruptedException e) { 202 WorkCompletedException wcj = new WorkCompletedException( 203 "The execution has been interrupted.", e); 204 wcj.setErrorCode(WorkException.INTERNAL); 205 throw wcj; 206 } 207 } 208 209 }