有这样一个场景:前端请求创建了一个批处理任务,请求会返回任务的进度,然后前端会根据进度定时重新获取最新进度,直到进度为1。现在出现了一个问题:在进度没有达到1的情况下,前端一直在发请求获取最新进度,但是后端返回的进度信息却一直不更新了。
一、伪代码
public class TaskTest {
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
@RequestMapping("/detection")
public Object detection(List<Integer> serviceIds) {
return doDetection(serviceIds);
}
@RequestMapping("getRate")
public Object getTaskRate(String taskId){
// get rate from cache by taskId
return rate;
}
String doDetection(List<Integer> serviceIds){
executor.submit(new DetectionTask(serviceIds));
String taskId = ""; // generate taskId cached
return taskId;
}
static class DetectionTask implements Runnable{
List<Integer> serviceIds;
public DetectionTask(List<Integer> serviceIds) {
this.serviceIds = serviceIds;
}
@Override
public void run() {
// todo
List<ServiceInfo> serviceInfos = xxService.findServiceInfoByIds(serviceIds);
for (ServiceInfo serviceInfo : serviceInfos) {
if (serviceInfo.type.equals(ServiceInfo.ServiceType.WMS)){
// todo
}
}
}
}
static class ServiceInfo{
String name;
ServiceType type;
// ...
enum ServiceType {
MAP,
REST,
WMS,
OTHERS
}
}
}
就是这样一份代码,在测试时出现了问题,有一定几率后端返回的进度会卡在一个小于1的值,导致前端一直发请求。
二、调试
在代码的调试过程中,发现第35行代码小概率会卡住,也就是if (serviceInfo.type.equals(ServiceInfo.ServiceType.WMS))这个条件判断,第一反应会不会是抛了异常?经过与数据库数据的比对之后,应该就是这里的问题。同时又来了新的问题,既然这里抛了空指针,为何日志中一点信息没有?想了下,这是在线程池中执行的任务,所以猜测会不会是异常被吞了?带着这个疑问去看了下源代码。
三、源代码
在代码中使用的是线程池的submit方法,所以跟进去看一下。
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
可以看到,在线程池的submit方法中,将要执行的任务包装成了一个FutureTask�,而在FutureTask中对异常做了处理:
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/**
* Causes this future to report an {@link ExecutionException}
* with the given throwable as its cause, unless this future has
* already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon failure of the computation.
*
* @param t the cause of failure
*/
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
第31行,c.call(),会调用我们自己的业务代码,而我们的业务代码中抛了异常,在FutureTask中可以看到是被捕获了,异常经由37行setException方法被保存在了outcome中,而在调用get()方法获取返回值时,会抛出这个异常:
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
所以使用submit方法时业务代码抛出的异常才没有任务日志显示,因为被捕获处理了。
四、execute()
看完了submit()方法,再来看一下另外一个经常使用的方法execute(),直接跟进源代码:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
execute方法最直观的区别就是没有返回值!
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
没啥好看的,就调用了一下线程的start方法,没有catch。
所以使用execute方法在业务代码发生异常时是会直接抛出的。