这篇博文主要是针对Sentinel中间件中常用算法进行源码解析和进行一些简易实现代码
计数器限流算法
假设限流1s 1000个请求。定义起始点,每来一个请求计数器加一,到了1000限流,到了1s计数器清空。
优点:实现简单,基本用redis实现一个lua脚本
缺点:会出现突刺现象,在1s的前半秒1000个请求打完了,后续直接停止访问。统计不精确,假设0~0.5s 200请求
0.5
1s 700请求 11.5s 600请求 1.52s 200请求,这种情况其实是不会限流的,但在0.51.5确有1300请求,这时是有可能吧机器打崩掉的。
Demo
通过redis过期时间控制计数器清空
// 时间
private int limitPeriod = 1000;
// 请求数
private int limitCount = 1000;
// 检查是否限流
public boolean check(){
String luaScript = buildLuaScript();
RedisScript<Number> redisScript = new DefaultRedisScript<>(luaScript, Number.class);
Number count = redisTemplate.execute(redisScript, keys, limitCount, limitPeriod);
if (count != null && count.intValue() <= limitCount) {
return false;
} else {
return true;
}
}
// 构建lua脚本
public String buildLuaScript() {
// 调用不超过最大值,则直接返回,执行计算器自加
// 从第一次调用开始限流,设置对应键值的过期
return "local c" +
"\nc = redis.call('get',KEYS[1])" +
"\nif c and tonumber(c) > tonumber(ARGV[1]) then" +
"\nreturn c;" +
"\nend" +
"\nc = redis.call('incr',KEYS[1])" +
"\nif tonumber(c) == 1 then" +
"\nredis.call('expire',KEYS[1],ARGV[2])" +
"\nend" +
"\nreturn c;";
}
滑动时间窗算法
滑动时间窗也是由计数器限流算法演进而来。它的出现就是为了解决计数器算法中的一些问题。
滑动时间窗算法和技术器算法基本一致,但在起始点这里做了改变,起始点不再是固定的,而是滑动的。我们可以这么想象有一把尺子上面有刻度,然后我们统计的请求都会放到每一个刻度上面,这时候我们有一个仅仅只有1cm(1s)的短尺在这把长的尺子上面不断滑动,短尺覆盖的范围所有刻度的请求加起来就是这1s的请求数。这种统计方式和计数器限流算法一样会有精度问题,但会大大缩小,想要越精确就是使刻度越多,但性能会降低。因为每一个刻度都是存在内存中的,存的越多性能就越低。
Demo
// 服务访问次数,可以放在Redis中,实现分布式系统的访问计数
private static Long counter = 0L;
// 使用LinkedList来记录滑动窗口的10个格子。
private static LinkedList<Long> slots = new LinkedList<>();
// 限流标记
private static Boolean limit = false;
public static void main(String[] args) throws InterruptedException {
// 开启线程,每100ms统计下共有多少个请求,所有请求放到slots格子中的一个
// slot 长度为 10 ,每个格子是100ms(sleep时间)。所以这段代码表示的是10*100ms时间内通过100个请求
new Thread(() -> {
try {
doCheck();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
private static void doCheck() throws InterruptedException {
while (true) {
slots.addLast(counter);
// 丢弃之前的请求,能用的也就10个格子,之前的都可以丢弃掉,不丢弃可能会造成内存溢出
// 这也是我们移格子的过程
if (slots.size() > 10) {
slots.removeFirst();
}
// 比较最后一个和第一个,两者相差100以上就限流
// 10*100ms时间内通过100个请求
limit = (slots.peekLast() - slots.peekFirst()) > 100;
// 表示没个格子长度
Thread.sleep(100);
}
}
Sentinel源码
构造时间窗格
// 时间窗格
public static volatile int SAMPLE_COUNT = 2;
// 时间间隔
public static volatile int INTERVAL = RuleConstant.DEFAULT_WINDOW_INTERVAL_MS;
// StatisticNode -- 构造了一个ArrayMetric 具体构造方法在LeapArray中
// 秒级统计
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
// 分钟级统计
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
// 每个窗格所占用时间长度,时间间隔/窗格数
this.windowLengthInMs = intervalInMs / sampleCount;
// 时间间隔
this.intervalInMs = intervalInMs;
// 时间间隔/秒
this.intervalInSecond = intervalInMs / 1000.0;
// 窗格数
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
上面这段代码构造了一个时间窗口(秒级)

请求累加
StaticNode#addPassRequest
@Override
public void addPassRequest(int count) {
// 秒级别累加
rollingCounterInSecond.addPass(count);
// 分钟级累加
rollingCounterInMinute.addPass(count);
}
@Override
public void addPass(int count) {
// 定位当前窗口(时间窗口核心代码)
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 对当前窗口的请求次数进行累加
wrap.value().addPass(count);
}
LeapArray#currentWindow
获取当前时间窗格(注:这里以秒级时间窗格数组做理解)
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算这次请求应该落到哪个时间窗格
// 具体逻辑:首先当前时间除以上诉构造的每个窗格所占时间
// 对结果进行对窗格数组长度进行取模运算
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
// 由于窗口开始时间是一直在变得,因为数组长度就是2,以秒级来说的话,就2个格子,每个格子就500ms,就是0~500~1000那么运行到1s以后他就会重新定义格子500~1000~1500,所以这里会计算他的开始时间
// 计算逻辑,当前时间-当前时间%每个窗格所占时间
// 假设运行到1200,1200-1200%500=1000,开始时间为1000
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
// 根据应落得索引位置获取时间窗格
WindowWrap<T> old = array.get(idx);
// 时间窗格为空,表示第一次进来
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
// 构造空的窗格
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// CAS 防并发
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
// 将空窗格返回
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 上面计算的当前时间的开始时间如果和我们idx取得格子的开始时间一样,就用老格子进行统计计算
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
// 上面计算的当前时间的开始时间如果大于我们idx取得格子的开始时间,证明了,我们已经跨格子了,这时候我们应该滑动窗口
// 重置时间窗格开始时间
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// 重置格子开始时间
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 这种情况基本不存在,出现的情况可能是时钟回拨问题,这里简单处理为了提高系统稳定性
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
指标计数
ArrayMetric中有很多方法比如addPass,addRT在其中都会调用add(MetricEvent.xxx, n);方法
// 很多类似方法
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
// 通过枚举index,来统计请求,放到counters数组中
// counters[1] === MetricEvent.PASS 等等
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
// 各种指标
public enum MetricEvent {
/**
* Normal pass.
*/
PASS,
/**
* Normal block.
*/
BLOCK,
EXCEPTION,
SUCCESS,
RT,
/**
* Passed in future quota (pre-occupied, since 1.5.0).
*/
OCCUPIED_PASS
}
漏桶算法
漏桶算法顾名思义就是一个桶,桶下面有个洞,水滴从洞里面往下滴。在这里面桶就是我们可容纳请求数。而水便是我们的请求,洞就是我们每次可通过的请求数。从这里我们也可以看出漏桶算法就是保证请求匀速通过。它被应用于Sentinel排队等待流控效果

Demo
/**
* 当前时间
*/
public long timeStamp = System.currentTimeMillis();
/**
* 桶的容量
*/
public long capacity;
/**
* 水漏出的速度(每秒系统能处理的请求数)
*/
public long rate;
/**
* 当前水量(当前累积请求数)
*/
public long water;
public boolean doCheck() {
long now = System.currentTimeMillis();
// ((now - timeStamp) / 1000) * rate 通过时间和速率计算出我们应该放过的请求,也就是漏的水
water = Math.max(0, water - ((now - timeStamp) / 1000) * rate);
// 先执行漏水, 计算剩余水量
timeStamp = now;
if ((water + 1) < capacity) {
// 尝试加水,并且水还未满
water += 1;
return true;
} else {
// 水满,拒绝加水
return false;
}
}
Sentinel源码
排队等待核心逻辑
RateLimiterController#canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
// 计算当前时间
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
// 每2个请求之间间隔时间
// acquireCount 请求数一般为1 count qps阈值 自己设定(每秒多少请求)假设是10
// 1/10*1000=100ms 每100ms放过1个请求,qps=10
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
// latestPassedTime:上一次请求调用时间
// expectedTime:期望请求时间,上次调用时间假请求间隔时间
long expectedTime = costTime + latestPassedTime.get();
// 当前时间大于期望时间,表示大于请求间隔时间,可以调用,重新辅助latestPassedTime(上一次请求调用时间)
// TODO issue#1615
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
// 我们需要等待时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// maxQueueingTimeMs:超时时间(可配置)
// 等待时间超过超时时间,放弃请求直接返回限流
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 请求间隔时间加上上一次请求调用时间(其实就是期望时间)
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 计算等待时间
waitTime = oldTime - TimeUtil.currentTimeMillis();
// 等待时间超过超时时间,放弃请求直接返回限流
if (waitTime > maxQueueingTimeMs) {
// 上面对latestPassedTime进行过加法运算,这里还原数据
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
// 睡眠等待时间
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
令牌桶算法
令牌通算法在Sentginel的Warm up流控效果中使用,简称预热模式。大致意思是在一定时间内达到当前Qps。
令牌桶算法简介:
首先会以一个固定速率(可通过控制速率来控制请求速度)往token bucket里面放令牌,而当我们请求过来的时候会先从bucket里面去取令牌,取到了令牌就可以请求,取不到就拒绝请求。
Demo
/**
* 令牌桶限流算法
*/
public class TokenBucket {
//当前时间
public long timeStamp = System.currentTimeMillis();
//桶的容量
public long capacity;
//令牌放入速度
public long rate;
//当前令牌数
public long tokens;
public boolean grant() {
long now = System.currentTimeMillis();
// 先添加令牌
tokens = Math.min(capacity, tokens + (now - timeStamp) * rate);
timeStamp = now;
if (tokens < 1) {
// 若不到1个令牌,则拒绝
return false;
} else {
// 还有令牌,领取令牌
tokens -= 1;
return true;
}
}
}
Sentinel源码
Sentinel 会动态控制令牌放入速率。使请求通过速度更加平滑
WarmUpController#canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的判断前提条件:
// 当令牌的消耗程度远远低于警戒线的时候
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}