问题场景
在使用Hystrix的情况下,使用RequestInterceptor
获取Oauth2 令牌并传递到下游服务时,发现从RequestContextHolder
中获取到的HttpServletRequest
为NULL。
@Component
public class AAOauth2TokenRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
// 1. 获取 HttpServletRequest
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
}
}
原因分析
RequestContextHolder
RequestContextHolder
请求上下文持有者,可以看到在其ThreadLocal
中,保存了请求对象,我们可以在当前线程的任意位置,通过这个类来获取到当前请求的RequestAttributes
。
Hystrix隔离策略
Hystrix提供了两个隔离策略:THREAD
、SEMAPHORE
。其默认的策略为THREAD
(线程池)。
在执行请求的实际,可以看到其代理对象是通过HystrixInvocationHandler
实现的,首先会创建一个HystrixCommand
对象。
在其构造方法中,可以看到,会根据客户端名称,创建一个线程池,其最大线程数10。
由以上可知,RequestContextHolder
中的RequestAttributes
是保存在ThreadLocal
中,而ThreadLocal
,在子线程中(包括new Thread和new线程池)是无法获取到的,由此造成了获取到的HttpServletRequest
为null。
解决方案
方案1 修改隔离策略为SEMAPHORE
线程池和信号量最大的区别在于:
- 线程池通过每次都开启一个单独线程运行,每个服务单独用线程池,可以是异步,当线程池到达maxSize后,再请求会触发fallback接口进行熔断。调用线程和hystrixCommand线程不是同一个线程。
- 通过信号量的计数器,当信号量达到maxConcurrentRequests后。再请求会触发fallback。调用线程和hystrixCommand线程是同一个线程。
所以在SEMAPHORE
策略下,因为是同一个线程,所以可以获取到ThreadLocal
中值。
通过以下配置修改隔离策略为SEMAPHORE(信号量):
hystrix:
command:
default:
execution:
isolation:
strategy: SEMAPHORE
重启项目,发现成功获取到了HttpServletRequest
。
但是信号量存在一个很大的问题,信号量的调用是同步的,每次调用都得阻塞调用方的线程,直到结果返回。这样就导致了无法对访问做超时(只能依靠调用协议超时,无法主动释放)。官网也不推荐使用这种模式!
方案2 自定义隔离策略
参考文档
实现HystrixConcurrencyStrategy类,重写其wrapCallable
方法,该方可以在执行前包装/修饰{@code Callable<T>}
。
@Component
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private static final Log log = LogFactory
.getLog(RequestAttributeHystrixConcurrencyStrategy.class);
private HystrixConcurrencyStrategy delegate;
public RequestAttributeHystrixConcurrencyStrategy() {
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
// Welcome to singleton hell...
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
}
catch (Exception e) {
log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
}
}
private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (log.isDebugEnabled()) {
log.debug("Current Hystrix plugins configuration is ["
+ "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
+ eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
}
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return new WrappedCallable<>(callable, requestAttributes);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}
static class WrappedCallable<T> implements Callable<T> {
private final Callable<T> target;
private final RequestAttributes requestAttributes;
public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
this.target = target;
this.requestAttributes = requestAttributes;
}
@Override
public T call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
return target.call();
}
finally {
RequestContextHolder.resetRequestAttributes();
}
}
}
}
测试,发现能正常获取到HttpServletRequest
。