Sentinel常用算法及简易实现


这篇博文主要是针对Sentinel中间件中常用算法进行源码解析和进行一些简易实现代码

计数器限流算法

假设限流1s 1000个请求。定义起始点,每来一个请求计数器加一,到了1000限流,到了1s计数器清空。

  • 优点:实现简单,基本用redis实现一个lua脚本

  • 缺点:会出现突刺现象,在1s的前半秒1000个请求打完了,后续直接停止访问。统计不精确,假设0~0.5s 200请求

    0.51s 700请求 11.5s 600请求 1.52s 200请求,这种情况其实是不会限流的,但在0.51.5确有1300请求,这时是有可能吧机器打崩掉的。

Demo

通过redis过期时间控制计数器清空

// 时间
private int limitPeriod = 1000;
// 请求数
private int limitCount = 1000;

// 检查是否限流
public boolean check(){
    String luaScript = buildLuaScript();
    RedisScript<Number> redisScript = new DefaultRedisScript<>(luaScript, Number.class);
    Number count = redisTemplate.execute(redisScript, keys, limitCount, limitPeriod);
    if (count != null && count.intValue() <= limitCount) {
        return false;
    } else {
        return true;
    }
}

// 构建lua脚本
public String buildLuaScript() {
        // 调用不超过最大值,则直接返回,执行计算器自加
        // 从第一次调用开始限流,设置对应键值的过期
        return "local c" +
                "\nc = redis.call('get',KEYS[1])" +
                "\nif c and tonumber(c) > tonumber(ARGV[1]) then" +
                "\nreturn c;" +
                "\nend" +
                "\nc = redis.call('incr',KEYS[1])" +
                "\nif tonumber(c) == 1 then" +
                "\nredis.call('expire',KEYS[1],ARGV[2])" +
                "\nend" +
                "\nreturn c;";
    }

滑动时间窗算法

滑动时间窗也是由计数器限流算法演进而来。它的出现就是为了解决计数器算法中的一些问题。

滑动时间窗算法和技术器算法基本一致,但在起始点这里做了改变,起始点不再是固定的,而是滑动的。我们可以这么想象有一把尺子上面有刻度,然后我们统计的请求都会放到每一个刻度上面,这时候我们有一个仅仅只有1cm(1s)的短尺在这把长的尺子上面不断滑动,短尺覆盖的范围所有刻度的请求加起来就是这1s的请求数。这种统计方式和计数器限流算法一样会有精度问题,但会大大缩小,想要越精确就是使刻度越多,但性能会降低。因为每一个刻度都是存在内存中的,存的越多性能就越低。

Demo

// 服务访问次数,可以放在Redis中,实现分布式系统的访问计数
private static Long counter = 0L;
// 使用LinkedList来记录滑动窗口的10个格子。
private static LinkedList<Long> slots = new LinkedList<>();
// 限流标记
private static Boolean limit = false;

public static void main(String[] args) throws InterruptedException {
    // 开启线程,每100ms统计下共有多少个请求,所有请求放到slots格子中的一个
    // slot 长度为 10 ,每个格子是100ms(sleep时间)。所以这段代码表示的是10*100ms时间内通过100个请求
    new Thread(() -> {
        try {
            doCheck();
        } catch (InterruptedException e) {
            e.printStackTrace();

        }
    }).start();
}

private static void doCheck() throws InterruptedException {
    while (true) {
        slots.addLast(counter);
        // 丢弃之前的请求,能用的也就10个格子,之前的都可以丢弃掉,不丢弃可能会造成内存溢出
        // 这也是我们移格子的过程
        if (slots.size() > 10) {
            slots.removeFirst();
        } 
        // 比较最后一个和第一个,两者相差100以上就限流
        // 10*100ms时间内通过100个请求
        limit = (slots.peekLast() - slots.peekFirst()) > 100;
        // 表示没个格子长度
        Thread.sleep(100);
    }
}

Sentinel源码

构造时间窗格

// 时间窗格
public static volatile int SAMPLE_COUNT = 2;
// 时间间隔
public static volatile int INTERVAL = RuleConstant.DEFAULT_WINDOW_INTERVAL_MS;
// StatisticNode  --  构造了一个ArrayMetric  具体构造方法在LeapArray中
// 秒级统计
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
// 分钟级统计
public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

    // 每个窗格所占用时间长度,时间间隔/窗格数
    this.windowLengthInMs = intervalInMs / sampleCount;
    // 时间间隔
    this.intervalInMs = intervalInMs;
    // 时间间隔/秒
    this.intervalInSecond = intervalInMs / 1000.0;
    // 窗格数
    this.sampleCount = sampleCount;

    this.array = new AtomicReferenceArray<>(sampleCount);
}

上面这段代码构造了一个时间窗口(秒级)

请求累加

StaticNode#addPassRequest

@Override
public void addPassRequest(int count) {
    // 秒级别累加
    rollingCounterInSecond.addPass(count);
    // 分钟级累加
    rollingCounterInMinute.addPass(count);
}

@Override
public void addPass(int count) {
    // 定位当前窗口(时间窗口核心代码)
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    // 对当前窗口的请求次数进行累加
    wrap.value().addPass(count);
}

LeapArray#currentWindow

获取当前时间窗格(注:这里以秒级时间窗格数组做理解

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    // 计算这次请求应该落到哪个时间窗格
    // 具体逻辑:首先当前时间除以上诉构造的每个窗格所占时间
    // 对结果进行对窗格数组长度进行取模运算
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    // 由于窗口开始时间是一直在变得,因为数组长度就是2,以秒级来说的话,就2个格子,每个格子就500ms,就是0~500~1000那么运行到1s以后他就会重新定义格子500~1000~1500,所以这里会计算他的开始时间
    // 计算逻辑,当前时间-当前时间%每个窗格所占时间
    // 假设运行到1200,1200-1200%500=1000,开始时间为1000
    long windowStart = calculateWindowStart(timeMillis);

    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
     */
    while (true) {
        // 根据应落得索引位置获取时间窗格
        WindowWrap<T> old = array.get(idx);
        // 时间窗格为空,表示第一次进来
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            // 构造空的窗格
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
               // CAS 防并发
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                // 将空窗格返回
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        // 上面计算的当前时间的开始时间如果和我们idx取得格子的开始时间一样,就用老格子进行统计计算
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        // 上面计算的当前时间的开始时间如果大于我们idx取得格子的开始时间,证明了,我们已经跨格子了,这时候我们应该滑动窗口
        // 重置时间窗格开始时间
        } else if (windowStart > old.windowStart()) {
            /*
             *   (old)
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // 重置格子开始时间
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        // 这种情况基本不存在,出现的情况可能是时钟回拨问题,这里简单处理为了提高系统稳定性
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

指标计数

ArrayMetric中有很多方法比如addPass,addRT在其中都会调用add(MetricEvent.xxx, n);方法

// 很多类似方法
public void addPass(int n) {
    add(MetricEvent.PASS, n);
}

// 通过枚举index,来统计请求,放到counters数组中
// counters[1] === MetricEvent.PASS  等等
public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}
    
// 各种指标
public enum MetricEvent {

    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}

漏桶算法

漏桶算法顾名思义就是一个桶,桶下面有个洞,水滴从洞里面往下滴。在这里面桶就是我们可容纳请求数。而水便是我们的请求,洞就是我们每次可通过的请求数。从这里我们也可以看出漏桶算法就是保证请求匀速通过。它被应用于Sentinel排队等待流控效果

Demo

 /**
     * 当前时间
     */
    public long timeStamp = System.currentTimeMillis();

    /**
     * 桶的容量
     */
    public long capacity;

    /**
     * 水漏出的速度(每秒系统能处理的请求数)
     */
    public long rate;

    /**
     * 当前水量(当前累积请求数)
     */
    public long water;

    public boolean doCheck() {

        long now = System.currentTimeMillis();
        // ((now - timeStamp) / 1000) * rate 通过时间和速率计算出我们应该放过的请求,也就是漏的水
        water = Math.max(0, water - ((now - timeStamp) / 1000) * rate);
        // 先执行漏水, 计算剩余水量
        timeStamp = now;
        if ((water + 1) < capacity) {
            // 尝试加水,并且水还未满
            water += 1;
            return true;
        } else {
            // 水满,拒绝加水
            return false;
        }
    }

Sentinel源码

排队等待核心逻辑

RateLimiterController#canPass

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // Pass when acquire count is less or equal than 0.
    if (acquireCount <= 0) {
        return true;
    }
    // Reject when count is less or equal than 0.
    // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
    if (count <= 0) {
        return false;
    }

    // 计算当前时间
    long currentTime = TimeUtil.currentTimeMillis();
    // Calculate the interval between every two requests.
    // 每2个请求之间间隔时间
    // acquireCount 请求数一般为1  count qps阈值 自己设定(每秒多少请求)假设是10
    // 1/10*1000=100ms 每100ms放过1个请求,qps=10
    long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

    // Expected pass time of this request.
    // latestPassedTime:上一次请求调用时间
    // expectedTime:期望请求时间,上次调用时间假请求间隔时间
    long expectedTime = costTime + latestPassedTime.get();

    // 当前时间大于期望时间,表示大于请求间隔时间,可以调用,重新辅助latestPassedTime(上一次请求调用时间)
    // TODO issue#1615
    if (expectedTime <= currentTime) {
        // Contention may exist here, but it's okay.
        latestPassedTime.set(currentTime);
        return true;
    } else {
        // Calculate the time to wait.
        // 我们需要等待时间
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        // maxQueueingTimeMs:超时时间(可配置)
        // 等待时间超过超时时间,放弃请求直接返回限流
        if (waitTime > maxQueueingTimeMs) {
            return false;
        } else {
            // 请求间隔时间加上上一次请求调用时间(其实就是期望时间)
            long oldTime = latestPassedTime.addAndGet(costTime);
            try {
                // 计算等待时间
                waitTime = oldTime - TimeUtil.currentTimeMillis();
                // 等待时间超过超时时间,放弃请求直接返回限流
                if (waitTime > maxQueueingTimeMs) {
                    // 上面对latestPassedTime进行过加法运算,这里还原数据
                    latestPassedTime.addAndGet(-costTime);
                    return false;
                }
                // in race condition waitTime may <= 0
                // 睡眠等待时间
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                }
                return true;
            } catch (InterruptedException e) {
            }
        }
    }
    return false;
}

令牌桶算法

令牌通算法在Sentginel的Warm up流控效果中使用,简称预热模式。大致意思是在一定时间内达到当前Qps。

令牌桶算法简介:

首先会以一个固定速率(可通过控制速率来控制请求速度)往token bucket里面放令牌,而当我们请求过来的时候会先从bucket里面去取令牌,取到了令牌就可以请求,取不到就拒绝请求。

Demo

/**
 * 令牌桶限流算法
 */
public class TokenBucket {
    //当前时间
    public long timeStamp = System.currentTimeMillis();
    //桶的容量
    public long capacity;
    //令牌放入速度
    public long rate;
    //当前令牌数
    public long tokens;

    public boolean grant() {
        long now = System.currentTimeMillis();
        // 先添加令牌
        tokens = Math.min(capacity, tokens + (now - timeStamp) * rate);
        timeStamp = now;
        if (tokens < 1) {
            // 若不到1个令牌,则拒绝 
            return false;

        } else {
            // 还有令牌,领取令牌
            tokens -= 1;
            return true;

        }
    }
}

Sentinel源码

Sentinel 会动态控制令牌放入速率。使请求通过速度更加平滑

WarmUpController#canPass

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    long passQps = (long) node.passQps();

    long previousQps = (long) node.previousPassQps();
    syncToken(previousQps);

    // 开始计算它的斜率
    // 如果进入了警戒线,开始调整他的qps
    long restToken = storedTokens.get();
    if (restToken >= warningToken) {
        long aboveToken = restToken - warningToken;
        // 消耗的速度要比warning快,但是要比慢
        // current interval = restToken*slope+1/count
        double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        if (passQps + acquireCount <= warningQps) {
            return true;
        }
    } else {
        if (passQps + acquireCount <= count) {
            return true;
        }
    }

    return false;
}
protected void syncToken(long passQps) {
    long currentTime = TimeUtil.currentTimeMillis();
    currentTime = currentTime - currentTime % 1000;
    long oldLastFillTime = lastFilledTime.get();
    if (currentTime <= oldLastFillTime) {
        return;
    }

    long oldValue = storedTokens.get();
    long newValue = coolDownTokens(currentTime, passQps);

    if (storedTokens.compareAndSet(oldValue, newValue)) {
        long currentValue = storedTokens.addAndGet(0 - passQps);
        if (currentValue < 0) {
            storedTokens.set(0L);
        }
        lastFilledTime.set(currentTime);
    }
}

private long coolDownTokens(long currentTime, long passQps) {
    long oldValue = storedTokens.get();
    long newValue = oldValue;

    // 添加令牌的判断前提条件:
    // 当令牌的消耗程度远远低于警戒线的时候
    if (oldValue < warningToken) {
        newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
    } else if (oldValue > warningToken) {
        if (passQps < (int)count / coldFactor) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        }
    }
    return Math.min(newValue, maxToken);
}

文章作者: dm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dm !
评论
 上一篇
Sentinel源码解析 Sentinel源码解析
入口在Springcloud中引入sentinel我们发现只是引入了一个jar包就完事了,说明肯定是通过SpringBoot自动装配将sentinel引入进来,既然这样,那么我没找到spring.factories就能找到自动装配的类 sp
2023-01-14
下一篇 
Sentinel基本使用 Sentinel基本使用
前言在一个微服务系统中,经常会出现一个服务依赖于多个服务,而一个调用逻辑会调用多个服务的情况出现。如果出现某个服务提供者出现故障不可用,就会导致服务消费者不可用,又由于是同步调用最后线程全部阻塞在服务消费者身上,最后导致服务消费者也不可用。
2022-12-01
  目录