04、如何优雅的拿到线程池中的异常?

一、背景

当下有很多隔离技术,包括服务隔离、数据库隔离、线程池隔离。我们之前有个业务场景:异步写Oracle、写ES、调用三方服务,这其中牵扯到三个彼此隔离的线程池,线程池的异常需要分别记录到日志搜集系统中。由此引发了一个问题:线程池中的异常该如何获取?

二、解决方案

1)直接在run()方法中try-catch(最low的)
import java.util.concurrent.*;

/**
 * @author Saint
 */
public class ThreadPoolExecutorTest {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor myThreadPool = new ThreadPoolExecutor(1, 1, 0,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                r -> {

                    Thread t = new Thread(r);
                    return t;
                }, new ThreadPoolExecutor.AbortPolicy());
        myThreadPool.execute(() -> {

            try {

                int i = 1 / 0;
            } catch (Exception e) {

                System.out.println("出错了");
            }
        });
        myThreadPool.shutdown();
        // 等了一分钟,线程池中的任务还没跑完,主线程便结束。如果线程池中的任务很快就结束了,那么此处不会等1分钟
        myThreadPool.awaitTermination(1, TimeUnit.MINUTES);
    }
}
2)使用future获取返回值(感觉很像try-catch)
import java.util.concurrent.*;

/**
 * @author Saint
 */
public class ThreadPoolExecutorTest2 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ThreadPoolExecutor myThreadPool = new ThreadPoolExecutor(1, 1, 0,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                r -> {

                    Thread t = new Thread(r);
                    return t;
                }, new ThreadPoolExecutor.AbortPolicy());
        Future<String> future = myThreadPool.submit(new Callable<String>() {

            @Override
            public String call() {

                try {

                    int i = 1 / 0;
                } catch (Exception e) {

                    // 返回异常信息
                    return e.getMessage();
                }
                return "success";
            }
        });
        String res = future.get();
        System.out.println(res);
        myThreadPool.shutdown();
    }
}
3)自定义线程池,并自定义线程工厂,在自定义的线程工厂中给线程设置uncaughtExceptionHandler

Thread的uncaughtExceptionHandler()源码如下:

/**
     * Set the handler invoked when this thread abruptly terminates
     * due to an uncaught exception.
     * <p>A thread can take full control of how it responds to uncaught
     * exceptions by having its uncaught exception handler explicitly set.
     * If no such handler is set then the thread's <tt>ThreadGroup</tt>
     * object acts as its handler.
     * @param eh the object to use as this thread's uncaught exception
     * handler. If <tt>null</tt> then this thread has no explicit handler.
     * @throws  SecurityException  if the current thread is not allowed to
     *          modify this thread.
     * @see #setDefaultUncaughtExceptionHandler
     * @see ThreadGroup#uncaughtException
     * @since 1.5
     */
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {

        checkAccess();
        uncaughtExceptionHandler = eh;
    }

demo如下:

/**
 * @author Saint
 */
public class ThreadPoolExecutorTest3 {

    public static void main(String[] args) throws InterruptedException {

        MyThreadPool myThreadPool = new MyThreadPool(1, 1, 0,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                r -> {

                    Thread t = new Thread(r);
                    // 获取线程池中的异常
                    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

                        @Override
                        public void uncaughtException(Thread t, Throwable e) {

                            // 对异常进行处理
                            System.out.println("注意,出错了!");
                        }
                    });
                    return t;
                }, new ThreadPoolExecutor.AbortPolicy());
        myThreadPool.execute(() -> System.out.println("1"));
        myThreadPool.execute(() -> System.out.println("1"));
        myThreadPool.execute(() -> System.out.println("1"));
        myThreadPool.shutdown();
        // 等了一分钟,线程池中的任务还没跑完,主线程便结束。如果线程池中的任务很快就结束了,那么此处不会等1分钟
        myThreadPool.awaitTermination(1, TimeUnit.MINUTES);
    }
}

/**
 * threadPoolExecutor实现类,这里是使用ThreadPoolExecutor的runWorker()方法中的一个模板方法
 */
class MyThreadPool extends ThreadPoolExecutor {

    public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * 让afterExecute钩子函数出现异常
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        super.afterExecute(r, t);
        int i = 1 / 0;
    }
}