参考书籍:《redis深度历险:核心原理与应用实践》
如果系统要限定用户的某个行为在指定的时间里只能允许发生N次,这里使用Redis的数据结构来实现这个简单限流。
首先先定义一个接口
#指定用户user_id的某个行为action_key在特定的时间内period只允许发生的最多次数max_countdef is_action_allowed(user_id,action_key,period,max_count): return True#调用接口can_reply=is_action_allowed(user_id,action_key,period,max_count)if can_reply: do_reply()else: raise ActionthresholdOverflow()
在这个接口中,我们实现这样一个解决方南。我们创建一个滑动时间窗口,我们只需要保留这个渐渐窗口,窗口之外的数据都可以砍掉。我们用zset的score值圈出这个时间窗口而zast的value只需要保证唯一性就好,使用uuid会比较浪费空间,就改用毫秒时间戳就好。
如上图所示,我们用一个zset结构记录用户的行为历史,每一个行为都会作为zset中的key保存下来。同一个用户的同一种行为为一个zset记录。为了节省内存,我们只需要保留窗口时间内的行为记录。如果是冷用户,zset在是空的的时候会自动释放内存这个特性可以解决。
通过统计滑动窗口内的行为数量与预支进行比较就可以得出当前的行为是否允许。
下面我们来实现这个接口
import timeimport redisclient=redis.StrictRedis()def is_action_allowed(user_id,action_key,period,max_count): key='hist:%s%s'%(user_id,action_key) now_ts=int(time.time()*1000)#毫秒时间戳 with client.pipeline() as pipe:#pipe现在是client.pipeline得到的 #记录行为 pipe.zadd(key,now_ts,now_ts)#value和score都是用毫秒时间戳 #移除时间窗口之前的行为记录,剩下的都是时间窗口内的 pipe.zremrangebyscore(key,0,now_ts-period*1000) #获取窗口内的行为数量 pipe.zcard(key) #设置zset过期时间,避免冷用户持续占用内存 过期时间应该等于时间窗口的长度 pipe.expire(key,period) #批量执行 _,_,current_count,_=pipe.execute() #比较数量是否超标 return current_count<=max_count#运行这个后你会发现,其实会输出8个True是因为会生成部分相同的毫秒时间戳,zadd会去重 #所以个数就多了,可以将时间再精确一点来改善这种情况 for i in range(20): print(is_action_allowed('hello','reply',60,5))
在这个实现代码里,我们是先将行为添加进去,然后在查询是否超过了操作次序的原因是这样的。如果请求进来,第一步不是查询而是添加,这样即使最后计算结果是超过了操作次数,不继续执行后续代码,看起来用户的操作没有起到作用,但是数据是已经进入到了redis,如果遭到恶意攻击,数据库运行是正常的,但是redis中有着太多垃圾数据,这时候内存可能会不够用,导致redis的节点不工作,从而扩展到整个服务器。