限流算法
目录
固定窗口
FixedWindowRateLimiter
类表示一个固定窗口限流器,使用 limit
和 interval
参数分别表示限制请求数量和时间间隔(毫秒)。在 allowRequest()
方法中,通过比较当前时间与上一次请求时间来判断是否需要重置请求数和上一次请求时间。如果请求数还没有达到限制数量,允许请求并增加请求数,否则拒绝请求。 缺点:短时间内可能会流量翻倍
public class FixedWindowRateLimiter {
private final int limit; // 限制请求数量
private final AtomicInteger count; // 当前请求数
private final long interval; // 时间间隔(毫秒)
private long lastRequestTime; // 上一次请求时间
public FixedWindowRateLimiter(int limit, long interval) {
this.limit = limit;
this.interval = interval;
this.count = new AtomicInteger(0);
this.lastRequestTime = System.currentTimeMillis();
}
public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();
if (currentTime - lastRequestTime > interval) {
// 如果距离上一次请求时间已经超过了时间间隔,重置请求数和上一次请求时间
count.set(0);
lastRequestTime = currentTime;
}
// 如果请求数还没有达到限制数量,允许请求并增加请求数
if (count.get() < limit) {
count.incrementAndGet();
return true;
}
return false; // 否则拒绝请求
}
}
// 使用示例
FixedWindowRateLimiter limiter = new FixedWindowRateLimiter(10, 1000); // 每秒最多处理10个请求
for (int i = 0; i < 20; i++) { // 尝试发起20个请求
if (limiter.allowRequest()) {
System.out.println("Allow request " + i);
} else {
System.out.println("Reject request " + i);
}
Thread.sleep(200); // 每次请求间隔200毫秒
}
滑动窗口
相比于固定窗口,滑动窗口有以下几个好处:
- 平滑限流:滑动窗口限流算法会以时间为轴将请求限制平均到每个时间段内,从而平滑了请求的涌入。相比于简单粗暴的限制请求的数量或速率等方式,这种平滑的限流方式能够更好地保证服务的可用性和稳定性。
- 精确控制:滑动窗口限流算法可以根据具体的业务需要设置窗口大小和时间间隔,从而实现对请求的精确控制。通过适当调整窗口大小和时间间隔,可以达到更好的限流效果。
关于这个,我觉得sentinel中的滑动窗口就非常的nice,下面是从sentinel中摘出来改一下的示例(同时也运用在我本人的中间件内),总得来说有三部分:
- 窗口存放的实体类(监控指标) HystrixEntity
- 窗口的定义 HystrixWindow
- 滑动窗口具体实现 HystrixWindowArray
// 窗口存放的实体类(监控指标)
public class HystrixEntity {
// 窗口请求数
private AtomicInteger requestCount;
// 窗口异常数
private AtomicInteger errorCount;
public HystrixEntity(){
this.requestCount=new AtomicInteger(0);
this.errorCount=new AtomicInteger(0);
}
public int getRequestCountValue() {
return requestCount.get();
}
public int getErrorCountValue() {
return errorCount.get();
}
public void resetValue() {
this.errorCount.set(0);
this.requestCount.set(0);
}
public void addErrorCount(){
this.errorCount.addAndGet(1);
}
public void addRequestCount(){
this.requestCount.addAndGet(1);
}
}
//窗口的定义 HystrixWindow
public class HystrixWindow {
// 窗口的长度 单位:ms
private final int windowLengthInMs;
// 窗口的开始时间戳 单位:ms
private long windowStartInMs;
// 窗口内存放的实体类
private HystrixEntity hystrixEntity;
public HystrixWindow(int windowLengthInMs, long windowStartInMs, HystrixEntity hystrixEntity) {
this.windowLengthInMs = windowLengthInMs;
this.windowStartInMs = windowStartInMs;
this.hystrixEntity = hystrixEntity;
}
public int getWindowLengthInMs() {
return windowLengthInMs;
}
public long getWindowStartInMs() {
return windowStartInMs;
}
public HystrixEntity getHystrixEntity() {
return hystrixEntity;
}
public void setHystrixEntity(HystrixEntity hystrixEntity) {
this.hystrixEntity = hystrixEntity;
}
/**
* @Description 重置窗口
**/
public HystrixWindow resetTo(long startTime) {
this.windowStartInMs = startTime;
hystrixEntity.resetValue();
return this;
}
/**
* @Description 判断时间是否属于该窗口
**/
public boolean isTimeInWindow(long timeMillis) {
return windowStartInMs <= timeMillis && timeMillis < windowStartInMs + windowLengthInMs;
}
}
//滑动窗口具体实现
public class HystrixWindowArray {
// 单个窗口的长度
private int windowLengthInMs;
// 窗口数量
private int sampleCount;
// 所有窗口的总长度
private int intervalInMs;
// 窗口数组
private final AtomicReferenceArray<HystrixWindow> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* @Param [sampleCount, intervalInMs]
* sampleCount: 窗口数量 intervalInMs:所有窗口的总长度
**/
public HystrixWindowArray(int sampleCount, int intervalInMs) {
Assert.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
Assert.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
Assert.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* 获取当前时间所在的窗口下标索引
*/
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
/**
* 获取当前时间所在的窗口开始时间
*/
private long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
private HystrixEntity newEmptyWindowValue(long timeMillis){
return new HystrixEntity();
}
/**
* 获取当前窗口
*/
public HystrixWindow currentWindow() {
return currentWindow(System.currentTimeMillis());
}
private HystrixWindow currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
while (true) {
HystrixWindow old = array.get(idx);
if (old == null) {
// 如果获取为空,说明窗口还没创建,所以我们创建一个新的窗口(CAS保证线程安全)
HystrixWindow window = new HystrixWindow(windowLengthInMs, windowStart, newEmptyWindowValue(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// 创建成功则返回当前窗口
return window;
} else {
// 创建失败说明发生了竞争,所以暂时先让出CPU
Thread.yield();
}
} else if (windowStart == old.getWindowStartInMs()) {
// 如果窗口已经存在,则对比窗口的开始时间是否相同,相同说明是用同一个窗口,直接返回窗口就可以了
return old;
} else if (windowStart > old.getWindowStartInMs()) {
// 如果窗口已经存在,而且窗口开始时间比之前的窗口开始时间要大
// 说明原来的窗口已经过时了,需要替换一个新的窗口
// 所以加锁防止竞争
if (updateLock.tryLock()) {
try {
// 这里我选择直接重置之前的窗口()
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.getWindowStartInMs()) {
// 窗口的开始时间比之前的窗口开始时间还会小,这种属于异常情况
// 要是真出现了也只能新建一个窗口返回了
return new HystrixWindow(windowLengthInMs, windowStart, newEmptyWindowValue(timeMillis));
}
}
}
/**
* 获取当前窗口内的值
*/
public HystrixEntity getWindowValue() {
return getWindowValue(System.currentTimeMillis());
}
public HystrixEntity getWindowValue(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
HystrixWindow bucket = array.get(idx);
if (bucket == null || !bucket.isTimeInWindow(timeMillis)) {
return null;
}
return bucket.getHystrixEntity();
}
/**
* 重置一个窗口
*/
private HystrixWindow resetWindowTo(HystrixWindow window, long startTime){
return window.resetTo(startTime);
}
public List<HystrixEntity> values() {
return values(System.currentTimeMillis());
}
private List<HystrixEntity> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<HystrixEntity>();
}
int size = array.length();
List<HystrixEntity> result = new ArrayList<HystrixEntity>(size);
for (int i = 0; i < size; i++) {
HystrixWindow window = array.get(i);
if (window == null || isWindowDeprecated(timeMillis, window)) {
continue;
}
result.add(window.getHystrixEntity());
}
return result;
}
/**
* 判断窗口是否有效
*/
public boolean isWindowDeprecated(long time, HystrixWindow window) {
return time - window.getWindowStartInMs() > intervalInMs;
}
}
使用示例:
// 初始化 代表监控1s内的指标 窗口数为2
HystrixWindowArray hystrixWindowArray = new HystrixWindowArray(2, 1000);
// 指标监控 数量+1
windowArray.currentWindow().getHystrixEntity().addErrorCount();
windowArray.currentWindow().getHystrixEntity().addRequestCount();
// 获取所有窗口的指标累计 判断是否超标,也就是1s内的总计
List<HystrixEntity> windowValues = windowArray.values();
Integer errorCount = hystrixEntities.stream().map(HystrixEntity::getErrorCountValue).reduce(Integer::sum).get();
Integer requestCount = hystrixEntities.stream().map(HystrixEntity::getRequestCountValue).reduce(Integer::sum).get();
令牌桶算法
思想: 固定时间内(例如 1 秒)通过一个桶来存储令牌,每当接收到一个请求时就会消耗一个令牌。如果请求过来时没有令牌,则无法继续处理该请求。
优点:
- 相比于漏桶算法,令牌桶算法具有更好的适应性,可以应对短时间内的流量波动。(漏桶算法只能处理恒定速率的流量)
代码如下:
public class TokenBucket {
private long lastTime; // 上次请求时间
private double rate; // 令牌放入速率
private long capacity; // 令牌桶容量
private long tokens; // 当前令牌数量
public TokenBucket(double rate, long capacity) {
this.lastTime = System.currentTimeMillis();
this.rate = rate;
this.capacity = capacity;
this.tokens = capacity;
}
public synchronized boolean getToken() {
long now = System.currentTimeMillis();
long timeElapsed = now - lastTime;
tokens += timeElapsed * rate;
if (tokens > capacity) {
tokens = capacity;
}
lastTime = now;
if (tokens >= 1) {
tokens--;
return true;
} else {
return false;
}
}
}
漏斗桶算法
漏桶算法(Leaky Bucket)是网络世界中流量整形(Traffic Shaping)或速率限制(Rate Limiting)时经常使用的一种算法,它的主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量
优点:
- 控制速率:漏斗桶算法可以限制数据流量的传输速率,确保各个环节的数据处理能力都得到了满足,从而避免了系统因为数据过多而导致的崩溃和瘫痪。
- 平滑输出:漏斗桶算法通过将数据分散到不同的时间段内进行处理,使得数据传输的输出更加平稳,不会出现明显的波动,提高了网络的稳定性。
代码如下:
public class LeakyBucket {
private int capacity; //漏桶容量
private int rate; //漏水速率
private int water; //当前水量
private Instant timestamp; //上次漏水时间
public LeakyBucket(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
this.water = 0;
this.timestamp = Instant.now();
}
public synchronized boolean allow() { //判断是否允许通过
Instant now = Instant.now();
long duration = now.toEpochMilli() - timestamp.toEpochMilli(); //计算距上次漏水过去了多久
int outflow = (int) (duration * rate / 1000); //计算过去的时间内漏出的水量
water = Math.max(0, water - outflow); //更新当前水量,不能小于0
if (water < capacity) { //如果漏桶还没满,放行
water++;
timestamp = now;
return true;
}
return false; //否则拒绝通过
}
}