# 如何限流?在工作中是怎么做的?说一下具体的实现? - Author: [HuiFer](https://github.com/huifer) - Description: 该文简单介绍限流相关技术以及实现 ## 什么是限流 > 限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。 ## 限流方法 ### 计数器 #### 实现方式 - 控制单位时间内的请求数量 ```java import java.util.concurrent.atomic.AtomicInteger; public class Counter { /** * 最大访问数量 */ private final int limit = 10; /** * 访问时间差 */ private final long timeout = 1000; /** * 请求时间 */ private long time; /** * 当前计数器 */ private AtomicInteger reqCount = new AtomicInteger(0); public boolean limit() { long now = System.currentTimeMillis(); if (now < time + timeout) { // 单位时间内 reqCount.addAndGet(1); return reqCount.get() <= limit; } else { // 超出单位时间 time = now; reqCount = new AtomicInteger(0); return true; } } } ``` - 劣势 - 假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 `n-1` (n 为限流请求数量),在下一分钟的 00:01 发送 n 个请求,这样在 2 秒钟内请求到达了 `2n - 1` 个. - 设每分钟请求数量为 60 个,每秒可以处理 1 个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时 2 秒钟有 120 个请求(每秒 60 个请求),远远大于了每秒钟处理数量的阈值 ### 滑动窗口 #### 实现方式 - 滑动窗口是对计数器方式的改进, 增加一个时间粒度的度量单位 - 把一分钟分成若干等分(6 份,每份 10 秒), 在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加 1.当等分数量越大限流统计就越详细 ```java package com.example.demo1.service; import java.util.Iterator; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.IntStream; public class TimeWindow { private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); /** * 间隔秒数 */ private int seconds; /** * 最大限流 */ private int max; public TimeWindow(int max, int seconds) { this.seconds = seconds; this.max = max; /** * 永续线程执行清理queue 任务 */ new Thread(() -> { while (true) { try { // 等待 间隔秒数-1 执行清理操作 Thread.sleep((seconds - 1) * 1000L); } catch (InterruptedException e) { e.printStackTrace(); } clean(); } }).start(); } public static void main(String[] args) throws Exception { final TimeWindow timeWindow = new TimeWindow(10, 1); // 测试3个线程 IntStream.range(0, 3).forEach((i) -> { new Thread(() -> { while (true) { try { Thread.sleep(new Random().nextInt(20) * 100); } catch (InterruptedException e) { e.printStackTrace(); } timeWindow.take(); } }).start(); }); } /** * 获取令牌,并且添加时间 */ public void take() { long start = System.currentTimeMillis(); try { int size = sizeOfValid(); if (size > max) { System.err.println("超限"); } synchronized (queue) { if (sizeOfValid() > max) { System.err.println("超限"); System.err.println("queue中有 " + queue.size() + " 最大数量 " + max); } this.queue.offer(System.currentTimeMillis()); } System.out.println("queue中有 " + queue.size() + " 最大数量 " + max); } } public int sizeOfValid() { Iterator it = queue.iterator(); Long ms = System.currentTimeMillis() - seconds * 1000; int count = 0; while (it.hasNext()) { long t = it.next(); if (t > ms) { // 在当前的统计时间范围内 count++; } } return count; } /** * 清理过期的时间 */ public void clean() { Long c = System.currentTimeMillis() - seconds * 1000; Long tl = null; while ((tl = queue.peek()) != null && tl < c) { System.out.println("清理数据"); queue.poll(); } } } ``` ### Leaky Bucket 漏桶 #### 实现方式 - 规定固定容量的桶, 有水进入, 有水流出. 对于流进的水我们无法估计进来的数量、速度, 对于流出的水我们可以控制速度. ```java public class LeakBucket { /** * 时间 */ private long time; /** * 总量 */ private Double total; /** * 水流出去的速度 */ private Double rate; /** * 当前总量 */ private Double nowSize; public boolean limit() { long now = System.currentTimeMillis(); nowSize = Math.max(0, (nowSize - (now - time) * rate)); time = now; if ((nowSize + 1) < total) { nowSize++; return true; } else { return false; } } } ``` ### Token Bucket 令牌桶 #### 实现方式 - 规定固定容量的桶, token 以固定速度往桶内填充, 当桶满时 token 不会被继续放入, 每过来一个请求把 token 从桶中移除, 如果桶中没有 token 不能请求 ```java public class TokenBucket { /** * 时间 */ private long time; /** * 总量 */ private Double total; /** * token 放入速度 */ private Double rate; /** * 当前总量 */ private Double nowSize; public boolean limit() { long now = System.currentTimeMillis(); nowSize = Math.min(total, nowSize + (now - time) * rate); time = now; if (nowSize < 1) { // 桶里没有token return false; } else { // 存在token nowSize -= 1; return true; } } } ``` ## 工作中的使用 ### spring cloud gateway - spring cloud gateway 默认使用 redis 进行限流, 笔者一般只是修改修改参数属于拿来即用. 并没有去从头实现上述那些算法. ```xml org.springframework.cloud spring-cloud-starter-gateway org.springframework.boot spring-boot-starter-data-redis-reactive ``` ```yaml spring: cloud: gateway: routes: - id: requestratelimiter_route uri: lb://pigx-upms order: 10000 predicates: - Path=/admin/** filters: - name: RequestRateLimiter args: redis-rate-limiter.replenishRate: 1 # 令牌桶的容积 redis-rate-limiter.burstCapacity: 3 # 流速 每秒 key-resolver: "#{@remoteAddrKeyResolver}" #SPEL表达式去的对应的bean - StripPrefix=1 ``` ```java @Bean KeyResolver remoteAddrKeyResolver() { return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); } ``` ### sentinel - 通过配置来控制每个 url 的流量 ```xml com.alibaba.cloud spring-cloud-starter-alibaba-sentinel ``` ```yaml spring: cloud: nacos: discovery: server-addr: localhost:8848 sentinel: transport: dashboard: localhost:8080 port: 8720 datasource: ds: nacos: server-addr: localhost:8848 dataId: spring-cloud-sentinel-nacos groupId: DEFAULT_GROUP rule-type: flow namespace: xxxxxxxx ``` - 配置内容在 nacos 上进行编辑 ```json [ { "resource": "/hello", "limitApp": "default", "grade": 1, "count": 1, "strategy": 0, "controlBehavior": 0, "clusterMode": false } ] ``` - resource:资源名,即限流规则的作用对象。 - limitApp:流控针对的调用来源,若为 default 则不区分调用来源。 - grade:限流阈值类型,QPS 或线程数模式,0 代表根据并发数量来限流,1 代表根据 QPS 来进行流量控制。 - count:限流阈值 - strategy:判断的根据是资源自身,还是根据其它关联资源 (refResource),还是根据链路入口 - controlBehavior:流控效果(直接拒绝 / 排队等待 / 慢启动模式) - clusterMode:是否为集群模式 ### 总结 > sentinel 和 spring cloud gateway 两个框架都是很好的限流框架, 但是在我使用中还没有将[spring-cloud-alibaba](https://github.com/alibaba/spring-cloud-alibaba)接入到项目中进行使用, 所以我会选择**spring cloud gateway**, 当接入完整的或者接入 Nacos 项目使用 setinel 会有更加好的体验.