网站首页 编程语言 正文
1、接口 IRule
public interface IRule{
//选择server
public Server choose(Object key);
//设置LoadBalancer
public void setLoadBalancer(ILoadBalancer lb);
//获取LoadBalancer
public ILoadBalancer getLoadBalancer();
}
2、接口的抽象实现类 AbstractLoadBalancerRule
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
private ILoadBalancer lb;
@Override
public void setLoadBalancer(ILoadBalancer lb){
this.lb = lb;
}
@Override
public ILoadBalancer getLoadBalancer(){
return lb;
}
}
上一篇中,IRule是在 BaseLoadBalancer 中做了一个绑定,在这里,在接口中设置了Loadbalancer
3、轮询 RoundRobinRule
public class RoundRobinRule extends AbstractLoadBalancerRule {
//下一个计数
private AtomicInteger nextServerCyclicCounter;
private static final boolean AVAILABLE_ONLY_SERVERS = true;
private static final boolean ALL_SERVERS = false;
private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
public RoundRobinRule() {
nextServerCyclicCounter = new AtomicInteger(0);
}
public RoundRobinRule(ILoadBalancer lb) {
this();
setLoadBalancer(lb);
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
//尝试次数
int count = 0;
while (server == null && count++ < 10) {
//可用的servers
List<Server> reachableServers = lb.getReachableServers();
//所有的servers
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
//获取server,递增计数并取模
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
//尝试10次还没有获取到server,则返回空
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
//递增计数并取模
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
4、权重轮询WeightedResponseTimeRule
将server的应答时间的平均值或者百分比作为权重
public class WeightedResponseTimeRule extends RoundRobinRule {
public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
@Override
public String key() {
return "ServerWeightTaskTimerInterval";
}
@Override
public String toString() {
return key();
}
@Override
public Class<Integer> type() {
return Integer.class;
}
};
//默认时间周期 30秒
public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;
//权重计算定时时间 默认 30秒
private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;
private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class);
//服务的权重值,后面的会叠加前面的
private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
private final Random random = new Random();
protected Timer serverWeightTimer = null;
protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);
String name = "unknown";
public WeightedResponseTimeRule() {
super();
}
public WeightedResponseTimeRule(ILoadBalancer lb) {
super(lb);
}
@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
if (lb instanceof BaseLoadBalancer) {
name = ((BaseLoadBalancer) lb).getName();
}
initialize(lb);
}
void initialize(ILoadBalancer lb) {
if (serverWeightTimer != null) {
serverWeightTimer.cancel();
}
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
// do a initial run
ServerWeight sw = new ServerWeight();
sw.maintainWeights();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
logger
.info("Stopping NFLoadBalancer-serverWeightTimer-"
+ name);
serverWeightTimer.cancel();
}
}));
}
public void shutdown() {
if (serverWeightTimer != null) {
logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
serverWeightTimer.cancel();
}
}
List<Double> getAccumulatedWeights() {
return Collections.unmodifiableList(accumulatedWeights);
}
//选择server
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
@Override
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
//获取权重值
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
//获取所有server
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
//权重值集合的最后一个,代表权重值得和
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// 没有server或者权重值没有初始化,退化到 轮询 rule
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// 创建一个随机值,0~maxTotalWeight
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
//选择随机数小于等于权重值的server
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
server = allList.get(serverIndex);
}
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Next.
server = null;
}
return server;
}
//权重值更新定时器
class DynamicServerWeightTask extends TimerTask {
public void run() {
ServerWeight serverWeight = new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception e) {
logger.error("Error running DynamicServerWeightTask for {}", name, e);
}
}
}
//权重值更新逻辑
class ServerWeight {
public void maintainWeights() {
//获取负载均衡器
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
//负载均衡器的状态
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
//计算所有server的平均相应时间的综合
double totalResponseTime = 0;
for (Server server : nlb.getAllServers()) {
// 获取服务状态
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// 计算每个server的权重
//weightSoFar + totalResponseTime - server的平均响应时间
Double weightSoFar = 0.0;
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
}
void setWeights(List<Double> weights) {
this.accumulatedWeights = weights;
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
super.initWithNiwsConfig(clientConfig);
serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL);
}
}
5、ClientConfigEnabledRoundRobinRule
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
RoundRobinRule roundRobinRule = new RoundRobinRule();
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
roundRobinRule = new RoundRobinRule();
}
@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
roundRobinRule.setLoadBalancer(lb);
}
@Override
public Server choose(Object key) {
if (roundRobinRule != null) {
return roundRobinRule.choose(key);
} else {
throw new IllegalArgumentException(
"This class has not been initialized with the RoundRobinRule class");
}
}
}
这个策略什么也没有做,直接依赖了一个RoundRobinRule,但是它是这些策略的基础类,提供了一个兜底能力。
BestAvailableRule,PredicateBasedRule,ZoneAvoidanceRule,AvailabilityFilteringRule。
6、BestAvailableRule 最小连接数
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
private LoadBalancerStats loadBalancerStats;
@Override
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
List<Server> serverList = getLoadBalancer().getAllServers();
int minimalConcurrentConnections = Integer.MAX_VALUE;
long currentTime = System.currentTimeMillis();
Server chosen = null;
//遍历所有server
for (Server server: serverList) {
//获取server的状态
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
//如果没有熔断
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
//当前server连接数
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
//判断当前可用server的连接数是否小于最小连接数
//将最小连接数赋值当前连接数,循环后得到最小连接的server
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
//如果没有选择出server,则直接调用父类的方法选择server
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}
@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
if (lb instanceof AbstractLoadBalancer) {
loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
}
}
}
7、PredicateBasedRule 断言基础策略
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
//获取断言
public abstract AbstractServerPredicate getPredicate();
//选择server,在过滤后进行轮询
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}
8、AvailabilityFilteringRule 可用性过滤
public class AvailabilityFilteringRule extends PredicateBasedRule {
private AbstractServerPredicate predicate;
public AvailabilityFilteringRule() {
super();
predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
//获得可用服务的数量
@Monitor(name="AvailableServersCount", type = DataSourceType.GAUGE)
public int getAvailableServersCount() {
ILoadBalancer lb = getLoadBalancer();
List<Server> servers = lb.getAllServers();
if (servers == null) {
return 0;
}
return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size();
}
//用轮询策略选择server,再判断是否可用,最大尝试十次
@Override
public Server choose(Object key) {
int count = 0;
Server server = roundRobinRule.choose(key);
while (count++ <= 10) {
//predicate.apply 是否不用跳过
if (predicate.apply(new PredicateKey(server))) {
return server;
}
server = roundRobinRule.choose(key);
}
return super.choose(key);
}
@Override
public AbstractServerPredicate getPredicate() {
return predicate;
}
}
9、ZoneAvoidanceRule
public class ZoneAvoidanceRule extends PredicateBasedRule {
private static final Random random = new Random();
private CompositePredicate compositePredicate;
public ZoneAvoidanceRule() {
super();
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
return CompositePredicate.withPredicates(p1, p2)
.addFallbackPredicate(p2)
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {
Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();
for (String zone : lbStats.getAvailableZones()) {
ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);
map.put(zone, snapshot);
}
return map;
}
static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
Set<String> chooseFrom) {
if (chooseFrom == null || chooseFrom.size() == 0) {
return null;
}
String selectedZone = chooseFrom.iterator().next();
if (chooseFrom.size() == 1) {
return selectedZone;
}
int totalServerCount = 0;
for (String zone : chooseFrom) {
totalServerCount += snapshot.get(zone).getInstanceCount();
}
int index = random.nextInt(totalServerCount) + 1;
int sum = 0;
for (String zone : chooseFrom) {
sum += snapshot.get(zone).getInstanceCount();
if (index <= sum) {
selectedZone = zone;
break;
}
}
return selectedZone;
}
//获取可用区域
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) {
return null;
}
//获取可用区域名称
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones.size() == 1) {
return availableZones;
}
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0;
boolean limitedZoneAvailability = false;
//遍历区域快照
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
//区域名称
String zone = zoneEntry.getKey();
//区域快照
ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
//区域实例
int instanceCount = zoneSnapshot.getInstanceCount();
//该区域没有服务,移除
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer();
//该区域服务的负载小于0,或者服务的故障率大于等于阈值,移除
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
//选出平均负载最差的区域
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
//最大负载小于阈值并且没有被限制的区域
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
// zone override is not needed here
return availableZones;
}
//从最差区域中轮询一个,然后从可用区域中移除
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
//获取可用的区域
public static Set<String> getAvailableZones(LoadBalancerStats lbStats,
double triggeringLoad, double triggeringBlackoutPercentage) {
if (lbStats == null) {
return null;
}
//生成区域快照
Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);
return getAvailableZones(snapshot, triggeringLoad,
triggeringBlackoutPercentage);
}
@Override
public AbstractServerPredicate getPredicate() {
return compositePredicate;
}
}
原文链接:https://blog.csdn.net/xuwenjingrenca/article/details/125032956
相关推荐
- 2022-06-13 C语言结构体超详细讲解_C 语言
- 2023-02-27 python定时任务schedule库用法详细讲解_python
- 2024-03-19 Shell脚本中的条件测试命令简介
- 2022-08-27 Terraform集成简单Gitlab CI方案详解_其它综合
- 2023-02-15 docker容器存储清理删除所需命令和方法_docker
- 2021-12-09 Linux内存管理和寻址详细介绍_Linux
- 2022-09-13 Nginx报错104:Connection reset by peer问题的解决及分析_nginx
- 2023-01-17 pytest用yaml文件编写测试用例流程详解_python
- 最近更新
-
- window11 系统安装 yarn
- 超详细win安装深度学习环境2025年最新版(
- Linux 中运行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存储小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基础操作-- 运算符,流程控制 Flo
- 1. Int 和Integer 的区别,Jav
- spring @retryable不生效的一种
- Spring Security之认证信息的处理
- Spring Security之认证过滤器
- Spring Security概述快速入门
- Spring Security之配置体系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置权
- redisson分布式锁中waittime的设
- maven:解决release错误:Artif
- restTemplate使用总结
- Spring Security之安全异常处理
- MybatisPlus优雅实现加密?
- Spring ioc容器与Bean的生命周期。
- 【探索SpringCloud】服务发现-Nac
- Spring Security之基于HttpR
- Redis 底层数据结构-简单动态字符串(SD
- arthas操作spring被代理目标对象命令
- Spring中的单例模式应用详解
- 聊聊消息队列,发送消息的4种方式
- bootspring第三方资源配置管理
- GIT同步修改后的远程分支