核心思想:Schedulers虽然提供了很多种线程调度的策略,但如果服务部署在tomcat中,那么直接使用tomcat的线程池即可,没必要再另外创建线程,故采用Schedulers.immediate()
策略。
通过分析源码我们知道除了在过滤器WebFilter
中调用subscribeOn
、publishOn
去切换线程池以外,spring webflux在启动时,还有自动装配,我们直接覆盖自动装配即可。
首先原生的自动装配类为:
@AutoConfiguration(
after = {ReactiveWebServerFactoryAutoConfiguration.class, CodecsAutoConfiguration.class, ReactiveMultipartAutoConfiguration.class, ValidationAutoConfiguration.class, WebSessionIdResolverAutoConfiguration.class}
)
@ConditionalOnWebApplication(
type = Type.REACTIVE
)
@ConditionalOnClass({WebFluxConfigurer.class})
@ConditionalOnMissingBean({WebFluxConfigurationSupport.class})
@AutoConfigureOrder(-2147483638)
public class WebFluxAutoConfiguration {
public WebFluxAutoConfiguration() {
}
@Bean
@ConditionalOnMissingBean(
name = {"webSessionManager"}
)
public WebSessionManager webSessionManager(ObjectProvider<WebSessionIdResolver> webSessionIdResolver) {
DefaultWebSessionManager webSessionManager = new DefaultWebSessionManager();
Duration timeout = this.serverProperties.getReactive().getSession().getTimeout();
webSessionManager.setSessionStore(new WebFluxAutoConfiguration.MaxIdleTimeInMemoryWebSessionStore(timeout));
webSessionIdResolver.ifAvailable(webSessionManager::setSessionIdResolver);
return webSessionManager;
}
}
那么这里我则可以自己写一个装配类:
@Configuration
public class CustomWebFluxAutoConfiguration {
private final ServerProperties serverProperties;
public CustomWebFluxAutoConfiguration(ServerProperties serverProperties) {
this.serverProperties = serverProperties;
}
@Bean
@ConditionalOnMissingBean(name = WebHttpHandlerBuilder.WEB_SESSION_MANAGER_BEAN_NAME)
public WebSessionManager webSessionManager(ObjectProvider<WebSessionIdResolver> webSessionIdResolver) {
DefaultWebSessionManager webSessionManager = new DefaultWebSessionManager();
Duration timeout = this.serverProperties.getReactive().getSession().getTimeout();
webSessionManager.setSessionStore(new MaxIdleTimeInMemoryWebSessionStore(timeout));
webSessionIdResolver.ifAvailable(webSessionManager::setSessionIdResolver);
return webSessionManager;
}
static final class MaxIdleTimeInMemoryWebSessionStore extends InMemoryWebSessionStore {
private final Duration timeout;
private MaxIdleTimeInMemoryWebSessionStore(Duration timeout) {
this.timeout = timeout;
}
@NotNull
@Override
public Mono<WebSession> createWebSession() {
return new WebSessionConfig().createWebSession().doOnSuccess(this::setMaxIdleTime);
}
private void setMaxIdleTime(WebSession session) {
session.setMaxIdleTime(this.timeout);
}
}
}
WebSessionConfig类,我们重写其createWebSession()
方法,在里面指定线程策略,重写其他方法就照抄框架已有InMemoryWebSessionStore()
中的实现的即可。
public class WebSessionConfig implements WebSessionStore {
@NotNull
@Override
public Mono<WebSession> createWebSession() {
Instant now = this.clock.instant();
this.expiredSessionChecker.checkIfNecessary(now);
return Mono.<WebSession>fromSupplier(() -> new InMemoryWebSession(now))
.subscribeOn(Schedulers.immediate())
.publishOn(Schedulers.immediate());
}
}