网站首页 编程语言 正文
1、自动配置类
/**
 * Excerpt of the Ribbon auto-configuration class showing the two beans that
 * expose Ribbon's client-side load balancing. Members replaced by "..." are
 * elided in this excerpt.
 */
public class RibbonAutoConfiguration {
...
/**
 * Creates the SpringClientFactory and hands it this class's
 * {@code configurations}. Guarded by {@code @ConditionalOnMissingBean}.
 */
@Bean
@ConditionalOnMissingBean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
/**
 * Exposes a RibbonLoadBalancerClient as the LoadBalancerClient bean, built on
 * the factory returned by {@link #springClientFactory()}. Guarded by
 * {@code @ConditionalOnMissingBean(LoadBalancerClient.class)}.
 */
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
...
}
可以看到,这里配置了两个bean:SpringClientFactory,以及 LoadBalancerClient 的实现 RibbonLoadBalancerClient,由后者提供Ribbon的负载均衡能力。
2、RibbonLoadBalancerClient
/**
 * LoadBalancerClient implementation backed by Ribbon. Every operation looks up
 * its per-service collaborators (load balancer, client config, load balancer
 * context, server introspector) from the SpringClientFactory by service id.
 */
public class RibbonLoadBalancerClient implements LoadBalancerClient {
// Source of all per-service Ribbon components used by this client.
private SpringClientFactory clientFactory;
/**
 * @param clientFactory factory used to look up per-service Ribbon components
 */
public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
/**
 * Rebuilds {@code original} so that it targets the server behind
 * {@code instance}.
 *
 * @param instance the chosen service instance; must not be null
 * @param original the URI to rewrite
 * @return the result of {@code context.reconstructURIWithServer(server, uri)}
 */
@Override
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String serviceId = instance.getServiceId();
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
URI uri;
Server server;
if (instance instanceof RibbonServer) {
// Instance produced by this client: reuse the Server it wraps.
RibbonServer ribbonServer = (RibbonServer) instance;
server = ribbonServer.getServer();
uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
}
else {
// Any other ServiceInstance: build a Server from its scheme, host and port.
server = new Server(instance.getScheme(), instance.getHost(),
instance.getPort());
IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
uri = updateToSecureConnectionIfNeeded(original, clientConfig,
serverIntrospector, server);
}
return context.reconstructURIWithServer(server, uri);
}
/**
 * Chooses an instance of the given service without a hint.
 *
 * @param serviceId id of the service to choose an instance of
 * @return the chosen instance, or null when no server was selected
 */
@Override
public ServiceInstance choose(String serviceId) {
return choose(serviceId, null);
}
/**
 * Chooses an instance of the given service.
 *
 * @param serviceId id of the service to choose an instance of
 * @param hint key passed to the load balancer; may be null
 * @return the chosen server wrapped as a RibbonServer, or null when no server
 * was selected
 */
public ServiceInstance choose(String serviceId, Object hint) {
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
/**
 * Executes {@code request} against a chosen instance of the service, without
 * a hint.
 *
 * @throws IOException propagated from the overload this delegates to
 */
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
throws IOException {
return execute(serviceId, request, null);
}
/**
 * Selects a server for the service and executes {@code request} against it.
 *
 * @param hint key passed to the load balancer; may be null
 * @throws IllegalStateException when no server could be selected
 * @throws IOException propagated from the overload this delegates to
 */
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
// Obtain the load balancer for this service
// (a ZoneAwareLoadBalancer according to the accompanying article).
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
// Select a server.
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
// Wrap the server as a ServiceInstance.
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
// Execute the request against the wrapped server.
return execute(serviceId, ribbonServer, request);
}
/**
 * Applies {@code request} to {@code serviceInstance} and records the outcome
 * with a RibbonStatsRecorder.
 *
 * @throws IllegalStateException when {@code serviceInstance} is not a
 * RibbonServer or wraps a null server
 * @throws IOException rethrown from {@code request.apply} after being recorded
 */
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
// Extract the server; only RibbonServer instances carry one.
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
// Perform the request.
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
// NOTE(review): presumably only present to satisfy the compiler, since the
// helper above is expected to always throw - confirm.
return null;
}
/**
 * Looks up the ServerIntrospector for the service, falling back to a new
 * DefaultServerIntrospector when the factory returns null.
 */
private ServerIntrospector serverIntrospector(String serviceId) {
ServerIntrospector serverIntrospector = this.clientFactory.getInstance(serviceId,
ServerIntrospector.class);
if (serverIntrospector == null) {
serverIntrospector = new DefaultServerIntrospector();
}
return serverIntrospector;
}
/**
 * Delegates to RibbonUtils.isSecure with the service's client config and
 * server introspector.
 */
private boolean isSecure(Server server, String serviceId) {
IClientConfig config = this.clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
return RibbonUtils.isSecure(config, serverIntrospector, server);
}
/** Selects a server for the service using its load balancer and no hint. */
protected Server getServer(String serviceId) {
return getServer(getLoadBalancer(serviceId), null);
}
/** Selects a server from the given load balancer using no hint. */
protected Server getServer(ILoadBalancer loadBalancer) {
return getServer(loadBalancer, null);
}
/**
 * Selects a server from the given load balancer.
 *
 * @param loadBalancer load balancer to query; may be null
 * @param hint key passed to chooseServer; "default" is used when null
 * @return the chosen server, or null when {@code loadBalancer} is null
 */
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
/** Returns the load balancer that the client factory holds for the service. */
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
/**
 * ServiceInstance view of a Ribbon Server together with its service id,
 * secure flag and metadata. Members replaced by "..." are elided in this
 * excerpt.
 */
public static class RibbonServer implements ServiceInstance {
private final String serviceId;
private final Server server;
private final boolean secure;
private Map<String, String> metadata;
...
/** Returns the URI computed by DefaultServiceInstance.getUri for this instance. */
@Override
public URI getUri() {
return DefaultServiceInstance.getUri(this);
}
...
}
}
//org.springframework.cloud.client.DefaultServiceInstance#getUri
/**
 * Builds the base URI of a service instance in the form
 * {@code scheme://host:port}.
 *
 * @param instance the instance whose secure flag, host and port are read
 * @return the URI, using "https" when the instance is secure and "http" otherwise
 */
public static URI getUri(ServiceInstance instance) {
    final String scheme;
    if (instance.isSecure()) {
        scheme = "https";
    }
    else {
        scheme = "http";
    }
    final String address = String.format("%s://%s:%s", scheme, instance.getHost(),
            instance.getPort());
    return URI.create(address);
}
在上篇负载均衡中,负载均衡只是提供了 LoadBalancerClient 接口,并没有具体的实现;RibbonLoadBalancerClient 实现了 LoadBalancerClient 接口,提供了负载均衡的具体实现。
3、获取loadBalancer
//org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#getLoadBalancer
/**
 * Returns the load balancer that the client factory holds for the given service.
 *
 * @param serviceId id of the service
 * @return whatever {@code clientFactory.getLoadBalancer(serviceId)} returns
 */
protected ILoadBalancer getLoadBalancer(String serviceId) {
    final ILoadBalancer balancer = this.clientFactory.getLoadBalancer(serviceId);
    return balancer;
}
//org.springframework.cloud.netflix.ribbon.SpringClientFactory#getLoadBalancer
/**
 * Looks up the ILoadBalancer instance for the named client.
 *
 * @param name client (service) name
 * @return the result of {@code getInstance(name, ILoadBalancer.class)}
 */
public ILoadBalancer getLoadBalancer(String name) {
    final ILoadBalancer balancer = getInstance(name, ILoadBalancer.class);
    return balancer;
}
//org.springframework.cloud.netflix.ribbon.SpringClientFactory#getInstance
/**
 * Returns the instance of {@code type} for the named client.
 * The superclass lookup is tried first; when it yields null, an instance is
 * built with {@code instantiateWithConfig} using the client's IClientConfig.
 *
 * @param name client (service) name
 * @param type requested type (for example ILoadBalancer)
 * @return the looked-up instance, or the newly instantiated one
 */
public <C> C getInstance(String name, Class<C> type) {
    C existing = super.getInstance(name, type);
    if (existing == null) {
        // Nothing registered for this type: instantiate it from the client config.
        IClientConfig config = getInstance(name, IClientConfig.class);
        return instantiateWithConfig(getContext(name), type, config);
    }
    return existing;
}
//org.springframework.cloud.context.named.NamedContextFactory#getInstance
//根据类型获取bean
/**
 * Fetches a bean of the given type from the named context.
 *
 * @param name context name
 * @param type bean type to look up
 * @return the bean, or null when the context has no bean definition of that type
 */
public <T> T getInstance(String name, Class<T> type) {
    AnnotationConfigApplicationContext context = getContext(name);
    T bean = null;
    try {
        bean = context.getBean(type);
    }
    catch (NoSuchBeanDefinitionException ignored) {
        // A missing bean definition is reported to the caller as null.
    }
    return bean;
}
注册 RibbonClientConfiguration
//org.springframework.cloud.netflix.ribbon.SpringClientFactory#getContext
/**
 * Returns the application context for the named client by delegating to the
 * superclass implementation.
 *
 * @param name client (service) name
 * @return the context returned by {@code super.getContext(name)}
 */
protected AnnotationConfigApplicationContext getContext(String name) {
    final AnnotationConfigApplicationContext context = super.getContext(name);
    return context;
}
//org.springframework.cloud.context.named.NamedContextFactory#getContext
/**
 * Returns the context stored under {@code name}, creating it with
 * {@code createContext(name)} and caching it in {@code contexts} when absent.
 *
 * @param name context name
 * @return the cached or newly created context
 */
protected AnnotationConfigApplicationContext getContext(String name) {
// First check without locking: skip synchronization when already cached.
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
// Re-check under the lock so a context stored by another thread in the
// meantime is not created a second time.
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
//org.springframework.cloud.context.named.NamedContextFactory#createContext
/**
 * Creates the application context for the named client. Only the registration
 * step is shown; the rest of the body is elided ("...") in this excerpt.
 */
protected AnnotationConfigApplicationContext createContext(String name) {
...
// Registers PropertyPlaceholderAutoConfiguration and the factory's default
// config type on the context. Per the accompanying article, for Ribbon the
// default config type is RibbonClientConfiguration.
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
...
}
//org.springframework.cloud.context.named.NamedContextFactory#defaultConfigType
private Class<?> defaultConfigType;
//When RibbonAutoConfiguration registers the SpringClientFactory, the parent-class constructor is called and sets
//defaultConfigType = RibbonClientConfiguration
/**
 * Stores the three settings this factory is constructed with.
 *
 * @param defaultConfigType configuration class kept as the default config type
 * @param propertySourceName value stored in {@code propertySourceName}
 * @param propertyName value stored in {@code propertyName}
 */
public NamedContextFactory(Class<?> defaultConfigType, String propertySourceName,
        String propertyName) {
    this.propertyName = propertyName;
    this.propertySourceName = propertySourceName;
    this.defaultConfigType = defaultConfigType;
}
RibbonClientConfiguration 注册的时候会注册 ZoneAwareLoadBalancer
ZoneAwareLoadBalancer
//org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration#ribbonLoadBalancer
/**
 * Provides the ILoadBalancer bean for this client.
 * When the properties factory reports that an ILoadBalancer is set for
 * {@code name}, that one is returned; otherwise a ZoneAwareLoadBalancer is
 * built from the injected collaborators.
 */
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    boolean configuredByProperty = this.propertiesFactory.isSet(ILoadBalancer.class, name);
    if (!configuredByProperty) {
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }
    return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
4、获取服务
//org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#getServer
/**
 * Asks the load balancer to choose a server.
 *
 * @param loadBalancer load balancer to query; may be null
 * @param hint key handed to {@code chooseServer}; "default" is used when null
 * @return the chosen server, or null when {@code loadBalancer} is null
 */
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer != null) {
        Object key = (hint == null) ? "default" : hint;
        return loadBalancer.chooseServer(key);
    }
    return null;
}
该方法中没有使用 RibbonLoadBalancerClient的choose 方法,而是调用了负载均衡器的chooseServer,此处是 ZoneAwareLoadBalancer 的 chooseServer
继承关系
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T>
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware
public abstract class AbstractLoadBalancer implements ILoadBalancer
//com.netflix.loadbalancer.ZoneAwareLoadBalancer#chooseServer
/**
 * Chooses a server, preferring zone-aware selection when it applies.
 * Falls back to {@code super.chooseServer(key)} when zone-aware logic is
 * disabled, when at most one zone is available, when the zone-aware path
 * yields no server, or when that path throws (the exception is logged).
 *
 * @param key key passed through to the underlying chooseServer calls
 * @return the chosen server, or whatever the superclass fallback returns
 */
public Server chooseServer(Object key) {
// Zone-aware logic is disabled, or there is at most one available zone.
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
// More than one zone is available.
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
// Build a snapshot of the zones from the load balancer stats.
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
// Both thresholds are looked up once, on first use; defaults are 0.2 and 0.99999.
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
// Derive the set of usable zones from the snapshot and the two thresholds.
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
// Only pick a zone explicitly when some zone has been excluded.
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
// Randomly choose one of the available zones.
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
// Get the load balancer dedicated to the chosen zone.
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
// Let that zone's load balancer choose the server.
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
BaseLoadBalancer
//com.netflix.loadbalancer.BaseLoadBalancer#chooseServer
/**
 * Chooses a server by delegating to the configured rule.
 * Each call increments the counter, which is created on first use.
 *
 * @param key key passed to {@code rule.choose}
 * @return the server picked by the rule, or null when there is no rule or the
 * rule throws (the failure is logged at warn level)
 */
public Server chooseServer(Object key) {
    // Counter used for monitoring; created on first use.
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    }
    try {
        // Select the server according to the rule.
        return rule.choose(key);
    } catch (Exception e) {
        logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
        return null;
    }
}
//com.netflix.loadbalancer.PredicateBasedRule#choose
/**
 * Picks a server from all servers known to the load balancer by applying this
 * rule's predicate and then round-robin selection.
 *
 * @param key key passed to the predicate
 * @return the selected server, or null when no server passes the filter
 */
public Server choose(Object key) {
    // Per the accompanying article this is the ZoneAwareLoadBalancer.
    ILoadBalancer lb = getLoadBalancer();
    // Filter with the predicate, then pick round-robin among the survivors.
    Optional<Server> candidate = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    return candidate.isPresent() ? candidate.get() : null;
}
5、请求 request.apply
//org.springframework.cloud.client.loadbalancer.LoadBalancerRequestFactory#createRequest
/**
 * Creates the load-balancer request that performs the HTTP call once a
 * service instance has been chosen.
 * The returned request wraps the original request in a ServiceRequestWrapper,
 * passes it through each configured transformer in order, and then continues
 * the execution chain.
 *
 * @param request the original HTTP request
 * @param body the request body
 * @param execution the execution chain to continue with
 * @return a request that takes the chosen service instance as input
 */
public LoadBalancerRequest<ClientHttpResponse> createRequest(
        final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) {
    return chosen -> {
        HttpRequest current = new ServiceRequestWrapper(request, chosen,
                this.loadBalancer);
        if (this.transformers != null) {
            for (LoadBalancerRequestTransformer each : this.transformers) {
                current = each.transformRequest(current, chosen);
            }
        }
        return execution.execute(current, body);
    };
}
最后execution.execute 又回到了InterceptingClientHttpRequest.InterceptingRequestExecution#execute,继续判断interceptor,若还有拦截器没有执行,则继续执行拦截器逻辑,如果没有拦截器了,执行请求
/**
 * Runs the next interceptor if one remains; otherwise performs the request.
 * For the actual request, a delegate is created from the request factory, the
 * headers are copied over, and a non-empty body is written before executing.
 *
 * @param request the request to execute
 * @param body the request body
 * @return the response from the next interceptor or from the delegate request
 * @throws IOException propagated from the interceptor or the delegate request
 */
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) {
        // Interceptors remain: hand over to the next one.
        return this.iterator.next().intercept(request, body, this);
    }

    HttpMethod method = request.getMethod();
    Assert.state(method != null, "No standard HTTP method");
    // Per the accompanying article, typically a SimpleStreamingClientHttpRequest.
    ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
    request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
    if (body.length > 0) {
        if (delegate instanceof StreamingHttpOutputMessage) {
            // Streaming delegates take the body as a callback.
            ((StreamingHttpOutputMessage) delegate)
                    .setBody(out -> StreamUtils.copy(body, out));
        }
        else {
            StreamUtils.copy(body, delegate.getBody());
        }
    }
    return delegate.execute();
}
原文链接:https://blog.csdn.net/xuwenjingrenca/article/details/125011680
相关推荐
- 2022-12-10 C++中使用正则匹配问题_C 语言
- 2023-03-28 python程序中调用其他程序的实现_python
- 2022-11-28 详解如何使用C++写一个线程安全的单例模式_C 语言
- 2022-08-07 Android 文件存储系统原理_Android
- 2022-11-26 Qt实现模糊匹配功能的实例详解_C 语言
- 2022-10-23 Redis 异常 read error on connection 的解决方案_Redis
- 2022-09-24 C#/VB.NET实现PPT或PPTX转换为图像功能_C#教程
- 2022-05-27 一起来学习C语言的程序环境与预处理_C 语言
- 最近更新
-
- window11 系统安装 yarn
- 超详细win安装深度学习环境2025年最新版(
- Linux 中运行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存储小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基础操作-- 运算符,流程控制 Flo
- 1. Int 和Integer 的区别,Jav
- spring @retryable不生效的一种
- Spring Security之认证信息的处理
- Spring Security之认证过滤器
- Spring Security概述快速入门
- Spring Security之配置体系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置权
- redisson分布式锁中waittime的设
- maven:解决release错误:Artif
- restTemplate使用总结
- Spring Security之安全异常处理
- MybatisPlus优雅实现加密?
- Spring ioc容器与Bean的生命周期。
- 【探索SpringCloud】服务发现-Nac
- Spring Security之基于HttpR
- Redis 底层数据结构-简单动态字符串(SD
- arthas操作spring被代理目标对象命令
- Spring中的单例模式应用详解
- 聊聊消息队列,发送消息的4种方式
- bootspring第三方资源配置管理
- GIT同步修改后的远程分支