@w5/redistream
v0.0.64
Published
[‼️]: ✏️README.mdt
Downloads
61
Readme
@w5/redistream
redis stream 用法
- 创建流和消费组
stream_name = 'img'
group_name = 'get'
# 0 表示从头开始读取
await redis.xgroup('create', stream_name, group_name, 0, 'MKSTREAM')
- 每个消费者批量认领任务(redis 会确保一个消费者只认领一个任务)
> os > hostname
BLOCK_TIMEOUT = 10 * 60 * 1000 # 10分钟超时
consumer = hostname()
for [_, li] from ( # _ 是 stream name
await redis.xreadgroup(
'GROUP', group_name
consumer
'COUNT', limit,
'BLOCK', block_timeout
'STREAMS',stream_name
'>'
) or []
)
for [id, msg] from li
try
await func(id,msg)
finnaly
await redis.xdel stream_name, id
- 处理长期未 ACK 的 pending(被 xdel 的也会在这个阶段删除)
loop
[
_
li
] = await redis.xautoclaim(stream_name, GROUP_NAME, consumer, 1000, '0-0')
if not li.length
break
for [id, msg] from li
try
await func(id,msg)
finnaly
await redis.xdel stream_name, id
- 每天清理长期未工作的消费者
for [_,name,_,pending,_,idle] from await redis.xinfo('CONSUMERS', stream_name, GROUP_NAME)
if pending == 0 and idle > 86400000
await redis.xgroup('DELCONSUMER',stream_name,GROUP_NAME,name)
#!/usr/bin/env coffee
output :
./out.txt