背景
为什么要限流?为什么要做这个需求?
因为生产故障,就是请求从以前的2000/m,突然飙高到6000/m,数据库万级别的连接都打满了,导致连接池满了,dubbo线程池也满了。
最后疯狂告警,
1.网关请求量翻了3倍
2.数据库万级别的连接满了
3.连接池满了
4.dubbo线程池满了
由于连接池满了,获取数据库连接阻塞,导致获取数据库连接耗时从几秒到几分钟,由于处理慢处理不过来,又导致dubbo线程池满了,后面的请求就直接失败了。也就是说,获取连接慢,导致阻塞,从而导致dubbo线程池满,dubbo线程池堆的越多,里面的请求由于获取连接耗时太久,很可能就超时了,导致请求失败。最后在很长的时间内,就产生了恶性循环。
怎么解决?首先请求高峰的问题不是彻底完全的解决,而是降低请求失败数量,降低失败时长,避免整个系统都崩溃了,但是可以允许部分请求失败。以前是大部分请求都失败了,要么由于耗时太久,导致请求超时;要么由于dubbo线程池满,直接把请求拒绝了。现在如果有限流,比如30个请求/秒,一分钟就是2000个请求,也就是说,1s内的30个请求是有效的,但是超过30个的请求就直接被拒绝了,这样就用允许少量的部分请求失败,避免了整个系统陷入之前的恶性循环,不仅会导致大部分请求都失败了,而且在很长的时间内,都一直是如此,那么整个系统长时间不可用,就等于崩溃了。
这就是限流的作用,就是控制单位时间内的总的请求数量,如果超过,就直接拒绝请求。
与并发的区别?
dubbo也可以限制并发数量,那区别是什么呢?区别是,一个是限制同一时间的并发数量(即dubbo线程池里的活跃数量),比如dubbo默认线程池数量是200,那么最高并发数量就是200,超过200,就会直接拒绝请求。
并发是同一时间(所谓同一时间是瞬时,平常所谓的峰值就是这个意思)的请求数量,而限流是单位时间(即一段时间,可以是1秒,也可以是1分钟)内的总的请求数量。
配置
<dubbo:service protocol="dubbo" interface="xxx.xxx.service.IxxxPay" ref="xxxPayService" validation="false">
<dubbo:parameter key="tps" value="30"/> //请求数量
<dubbo:parameter key="tps.interval" value="1000"/> //时间长度(单位是毫秒),比如1s内只允许30个请求,或者1m内只允许2000个请求
</dubbo:service>
复制代码
配置完这个,就可以生效了。
动态修改
如果想要在运行期间,动态修改怎么办?
在dubbo admin配置。
步骤
1.找到对应的服务
2.然后,添加配置参数
dubbo限流有bug
动态修改如果要生效,必须
1.先改tps为-1 //目的是删除之前的旧值
2.再改为其他正数值 //目的是使用当前动态配置的新值
详细实现原理和源码分析见源码分析那一节。
修复dubbo限流bug
这个bug在2.7版本已经修复,在2.6版本还没有修复。于是顺手修复dubbo bug。
github.com/apache/dubb…
主要修改是
1.以前如果旧值存在,直接就忽略新值了,继续使用旧值,导致新值没生效。
2.现在是如果旧值存在,就要比较新值和旧值,如果不一样,就使用新值。
源码分析
dubbo限流实现原理
dubbo限流拦截器
public static final String TPS_LIMIT_RATE_KEY = "tps";
/**
* Limit TPS for either service or service's particular method
*/
@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY//
@Activate注解的作用是,只要配置了tps,tps拦截器就自动生效)
public class TpsLimitFilter implements Filter {
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//校验是否通过限流拦截器
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
"Failed to invoke service " +
invoker.getInterface().getName() +
"." +
invocation.getMethodName() +
" because exceed max service tps.");
}
//调用dubbo接口
return invoker.invoke(invocation);
}
}
复制代码
限流功能实现类
public class DefaultTPSLimiter implements TPSLimiter {
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1); //从url获取tps值
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL); //从url获取时间长度值
String serviceKey = url.getServiceKey();
if (rate > 0) { //如果tps值大0
StatItem statItem = stats.get(serviceKey); //获取限流旧值
if (statItem == null) { //如果限流旧值不存在,就使用新值
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable();
} else { //否则,删除旧值
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
}
复制代码
限流数据和限流算法类,包含了
1.限流数据
2.限流算法
class StatItem {
private String name;
private long lastResetTime;
private long interval; //时间长度
private AtomicInteger token;
private int rate; //tps
//限流算法,校验限流是否通过
public boolean isAllowable() {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
token.set(rate);
lastResetTime = now;
}
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
flag = token.compareAndSet(value, value - 1);
value = token.get();
}
return flag;
}
复制代码
dubbo限流为什么有bug?
主要有两点
1.直接动态修改,tps新值没有生效
为什么没有生效?看源码。
public class DefaultTPSLimiter implements TPSLimiter {
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1); //获取限流新值tps
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL); //获取限流新值时间长度
String serviceKey = url.getServiceKey();
if (rate > 0) {
StatItem statItem = stats.get(serviceKey); //获取限流旧值
if (statItem == null) { //校验限流旧值是否存在,如果不存在,才用限流新值——问题就出在这里,如果旧值存在,那么新值就没有生效,所以这是个bug。
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable(); //校验当前请求是否通过限流
} else {
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
}
复制代码
2.所以,解决方法是先改为负值(比如-1)
1)为什么改为负值就可以生效?
还是来看源码
public class DefaultTPSLimiter implements TPSLimiter {
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
if (rate > 0) { 2.再改为正数,就走这里
StatItem statItem = stats.get(serviceKey); //获取限流旧值
if (statItem == null) { //因为刚才已经删除了旧值,所以现在旧值不存在
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval)); //设置限流新值
statItem = stats.get(serviceKey);
}
return statItem.isAllowable();
} else { //1.先改为负值,如果是负值,就走这里
StatItem statItem = stats.get(serviceKey); //获取限流旧值
if (statItem != null) { //如果旧值存在,就删除旧值
stats.remove(serviceKey);
}
}
return true;
}
}
复制代码
实现思路是
1.先改为负值,目的是删除限流旧值
2.再改为正数,目的是使用限流新值
2)改为0可以吗?
不可以。为什么不可以?因为如果tps值为0,dubbo限流拦截器就失效了,根本没有进入限流拦截器,就无从谈起使用新值,反而连旧值也失效了,因为根本没有走限流拦截器。
为什么tps值为0,会导致限流拦截器失效?因为dubbo拦截器是否开启,会校验@Activate注解的value属性的值,即如果tps值为0,拦截器就会失效,这个失效不光光是限流拦截器,如果其他拦截器的@Activate注解的value属性的值为0,该拦截器也会失效。
下面是源码
原理是
1.请求来了
2.经过dubbo拦截器
3.那到底要经过哪些拦截器?
会校验每个拦截器是启用还是失效,源码实现如上截图。
4.如果限流拦截器失效,就不会走限流拦截器;
如果启用,就走。




近期评论