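In the previous article, load balancing picked a concrete server through the chooseServer method of Ribbon's LoadBalancer, which in turn calls the choose method of a Ribbon Rule.
This article looks at Ribbon's LoadBalancer.
1. The LoadBalancer interface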
//com.netflix.loadbalancer.ILoadBalancer
public interface ILoadBalancer {
//add servers
public void addServers(List<Server> newServers);
//choose a server
public Server chooseServer(Object key);
//mark a server as down
public void markServerDown(Server server);
//deprecated: get the server list, optionally restricted to available servers
@Deprecated
public List<Server> getServerList(boolean availableOnly);
//get the reachable (up) servers
public List<Server> getReachableServers();
//get all servers
public List<Server> getAllServers();
}
2. The abstract implementation of LoadBalancer
Compared with the interface, it adds one enum and three methods.
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
//choose a server without a key
public Server chooseServer() {
return chooseServer(null);
}
//get the server list for a given group
public abstract List<Server> getServerList(ServerGroup serverGroup);
//get the load balancer statistics
public abstract LoadBalancerStats getLoadBalancerStats();
}
AbstractLoadBalancer has two direct subclasses: BaseLoadBalancer and NoOpLoadBalancer.
3. BaseLoadBalancer
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
//the default rule is round robin; this is the first class in the hierarchy that composes an IRule
private final static IRule DEFAULT_RULE = new RoundRobinRule();
//interface method: add servers
public void addServers(List<Server> newServers) {
if (newServers != null && newServers.size() > 0) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.addAll(newServers);
//setServersList uses a read-write lock internally
setServersList(newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
}
}
}
//interface method: choose a server by delegating to the rule's choose method
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
//interface method: mark a server as down
public void markServerDown(Server server) {
if (server == null || !server.isAlive()) {
return;
}
logger.error("LoadBalancer [{}]: markServerDown called on [{}]", name, server.getId());
server.setAlive(false);
// forceQuickPing();
//notify listeners of the server status change
notifyServerStatusChangeListener(singleton(server));
}
//interface method: get the reachable servers (read-only view)
public List<Server> getReachableServers() {
return Collections.unmodifiableList(upServerList);
}
//interface method: get all servers (read-only view)
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
//abstract class method: get the server list for a group; STATUS_NOT_UP is all servers minus the up servers
public List<Server> getServerList(ServerGroup serverGroup) {
switch (serverGroup) {
case ALL:
return allServerList;
case STATUS_UP:
return upServerList;
case STATUS_NOT_UP:
ArrayList<Server> notAvailableServers = new ArrayList<Server>(
allServerList);
ArrayList<Server> upServers = new ArrayList<Server>(upServerList);
notAvailableServers.removeAll(upServers);
return notAvailableServers;
}
return new ArrayList<Server>();
}
//abstract class method: get the load balancer statistics
public LoadBalancerStats getLoadBalancerStats() {
return lbStats;
}
}
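To make the delegation in chooseServer concrete, here is a minimal sketch of what a round-robin style rule does with a list of servers. It is not Ribbon's RoundRobinRule: the class name RoundRobinSketch and the use of plain strings for servers are assumptions made for illustration, and the real rule also checks server liveness and retries.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a rule; not Ribbon's RoundRobinRule.
class RoundRobinSketch {
    // Shared counter so that concurrent callers still rotate through the list.
    private final AtomicInteger next = new AtomicInteger(0);

    // Returns the next server in rotation, or null when there is nothing to choose,
    // similar to how chooseServer returns null when no rule is set.
    String choose(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```

Calling choose repeatedly on a three-element list walks through the elements in order and then wraps around to the first one.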
4. NoOpLoadBalancer does nothing
public class NoOpLoadBalancer extends AbstractLoadBalancer {
@Override
public void addServers(List<Server> newServers) {
logger.info("addServers to NoOpLoadBalancer ignored");
}
@Override
public Server chooseServer(Object key) {
return null;
}
@Override
public LoadBalancerStats getLoadBalancerStats() {
return null;
}
@Override
public List<Server> getServerList(ServerGroup serverGroup) {
return Collections.emptyList();
}
@Override
public void markServerDown(Server server) {
logger.info("markServerDown to NoOpLoadBalancer ignored");
}
@Override
public List<Server> getServerList(boolean availableOnly) {
// TODO Auto-generated method stub
return null;
}
@Override
public List<Server> getReachableServers() {
return null;
}
@Override
public List<Server> getAllServers() {
return null;
}
}
BaseLoadBalancer has a subclass, DynamicServerListLoadBalancer.
5. DynamicServerListLoadBalancer
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
//whether the connection is secure (HTTPS)
boolean isSecure = false;
boolean useTunnel = false;
// whether a server list update is currently in progress
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
volatile ServerList<T> serverListImpl;
volatile ServerListFilter<T> filter;
//this action implements the dynamic refresh of the server list; a scheduled task runs it
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
protected volatile ServerListUpdater serverListUpdater;
...
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
//pass in the updater that schedules the refresh
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
...
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
//start the scheduled refresh
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
//refresh the server list from the ServerList, then apply the filter
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
//replace the full server list
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
...
public void setServersList(List lsrv) {
//call BaseLoadBalancer's setServersList to update the server lists
//(allServerList and upServerList)
super.setServersList(lsrv);
//update the per-zone server lists
List<T> serverList = (List<T>) lsrv;
Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
for (Server server : serverList) {
// make sure ServerStats is created to avoid creating them on hot
// path
getLoadBalancerStats().getSingleServerStat(server);
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<Server>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
setServerListForZones(serversInZones);
}
protected void setServerListForZones(
Map<String, List<Server>> zoneServersMap) {
LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
}
...
}
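The zone bookkeeping in setServersList above comes down to grouping servers by their lower-cased zone name and skipping servers that have no zone. The following is a minimal sketch of just that grouping step. ZoneGroupingSketch and its Node type are hypothetical names, not Ribbon classes, and Locale.ROOT is used here where the original calls toLowerCase() without a locale.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class ZoneGroupingSketch {
    // Hypothetical minimal server type: an id plus an optional zone.
    static final class Node {
        final String id;
        final String zone; // may be null when the server reports no zone

        Node(String id, String zone) {
            this.id = id;
            this.zone = zone;
        }
    }

    // Groups nodes by lower-cased zone name; nodes without a zone are left out,
    // as in the loop inside setServersList.
    static Map<String, List<Node>> groupByZone(List<Node> nodes) {
        Map<String, List<Node>> byZone = new HashMap<>();
        for (Node node : nodes) {
            if (node.zone == null) {
                continue;
            }
            String key = node.zone.toLowerCase(Locale.ROOT);
            byZone.computeIfAbsent(key, z -> new ArrayList<>()).add(node);
        }
        return byZone;
    }
}
```

Because the key is normalized, "US-East" and "us-east" end up in the same group, which is why the per-zone lookups later in the class also lower-case the zone name.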
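DynamicServerListLoadBalancer adds a field of type ServerListUpdater.UpdateAction, implemented as an anonymous inner class. It is handed to the updater in enableAndInitLearnNewServersFeature: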
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
6. ZoneAwareLoadBalancer
The only subclass of DynamicServerListLoadBalancer, and the load balancer used by default for Ribbon load balancing (see RibbonClientConfiguration below).
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
//key:zone, value:LoadBalancer
private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
private static final Logger logger = LoggerFactory.getLogger(ZoneAwareLoadBalancer.class);
private volatile DynamicDoubleProperty triggeringLoad;
private volatile DynamicDoubleProperty triggeringBlackoutPercentage;
private static final DynamicBooleanProperty ENABLED = DynamicPropertyFactory.getInstance().getBooleanProperty("ZoneAwareNIWSDiscoveryLoadBalancer.enabled", true);
...
//set the server list for each zone
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (balancers == null) {
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
getLoadBalancer(zone).setServersList(entry.getValue());
}
// check if there is any zone that no longer has a server
// and set the list to empty so that the zone related metrics does not
// contain stale data
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}
//choose a server; covered in the previous article
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
//get the load balancer for a zone, creating it on first use
@VisibleForTesting
BaseLoadBalancer getLoadBalancer(String zone) {
zone = zone.toLowerCase();
BaseLoadBalancer loadBalancer = balancers.get(zone);
if (loadBalancer == null) {
// We need to create rule object for load balancer for each zone
IRule rule = cloneRule(this.getRule());
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
if (prev != null) {
loadBalancer = prev;
}
}
return loadBalancer;
}
//clone the rule
private IRule cloneRule(IRule toClone) {
IRule rule;
if (toClone == null) {
rule = new AvailabilityFilteringRule();
} else {
String ruleClass = toClone.getClass().getName();
try {
rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClass, this.getClientConfig());
} catch (Exception e) {
throw new RuntimeException("Unexpected exception creating rule for ZoneAwareLoadBalancer", e);
}
}
return rule;
}
//set the rule, and give each zone's balancer its own clone
@Override
public void setRule(IRule rule) {
super.setRule(rule);
if (balancers != null) {
for (String zone: balancers.keySet()) {
balancers.get(zone).setRule(cloneRule(rule));
}
}
}
}
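The getLoadBalancer method above follows a common lazy-creation pattern on a ConcurrentHashMap: look up, create if missing, then use putIfAbsent so that when two threads race, both end up with the same instance. Below is a minimal sketch of that pattern in isolation. PerZoneRegistrySketch, ZoneBalancer and the "client_" name prefix are assumptions made for illustration, not Ribbon types.

```java
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;

class PerZoneRegistrySketch {
    // Hypothetical stand-in for a per-zone load balancer.
    static final class ZoneBalancer {
        final String name;

        ZoneBalancer(String name) {
            this.name = name;
        }
    }

    private final ConcurrentHashMap<String, ZoneBalancer> balancers = new ConcurrentHashMap<>();

    // Returns the balancer for a zone, creating it on first use.
    ZoneBalancer get(String zone) {
        String key = zone.toLowerCase(Locale.ROOT);
        ZoneBalancer existing = balancers.get(key);
        if (existing != null) {
            return existing;
        }
        ZoneBalancer created = new ZoneBalancer("client_" + key);
        // putIfAbsent returns the previous value if another thread won the race;
        // in that case the freshly created instance is discarded.
        ZoneBalancer previous = balancers.putIfAbsent(key, created);
        return previous != null ? previous : created;
    }
}
```

One consequence of this design is that an extra object may be constructed and thrown away under contention, in exchange for never blocking readers.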
When RibbonClientConfiguration registers the ZoneAwareLoadBalancer, it also registers the serverList and serverListUpdater beans.
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, name)) {
return this.propertiesFactory.get(IPing.class, config, name);
}
return new DummyPing();
}
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
return this.propertiesFactory.get(ServerListFilter.class, config, name);
}
ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
filter.initWithNiwsConfig(config);
return filter;
}
As shown, the bean injected as ribbonServerListUpdater is a PollingServerListUpdater.
The enableAndInitLearnNewServersFeature method of DynamicServerListLoadBalancer shown earlier calls start on the ServerListUpdater, which here means PollingServerListUpdater's start method.
//com.netflix.loadbalancer.PollingServerListUpdater#start
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//call doUpdate, which is implemented in DynamicServerListLoadBalancer
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
//by default the task starts after a 1s delay and then runs with a 30s delay between runs
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
The scheduled task refreshes the server list.
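The same start-once, poll-with-fixed-delay pattern can be sketched with only the JDK. This is not PollingServerListUpdater itself: PollingSketch, its stop method and the single-thread executor are assumptions made for illustration, and the delays are passed in by the caller instead of being read from client configuration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollingSketch {
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> future;

    // Starts polling; returns false (and does nothing) if already started.
    synchronized boolean start(Runnable action, long initialDelayMs, long intervalMs) {
        if (!active.compareAndSet(false, true)) {
            return false;
        }
        future = executor.scheduleWithFixedDelay(() -> {
            try {
                action.run();
            } catch (RuntimeException e) {
                // Swallow and log: an exception escaping the task would
                // suppress all subsequent executions.
                System.err.println("Failed one update cycle: " + e);
            }
        }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    // Cancels the task and releases the executor thread.
    synchronized void stop() {
        if (active.compareAndSet(true, false)) {
            if (future != null) {
                future.cancel(true);
            }
            executor.shutdownNow();
        }
    }
}
```

With scheduleWithFixedDelay the interval is measured from the end of one run to the start of the next, so a slow refresh pushes later refreshes back instead of letting them pile up.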
Original article: https://blog.csdn.net/xuwenjingrenca/article/details/125032840