用redis实现有优先级的"celery"

【需求背景】

对于异步任务处理,相信很多人首选celery,的确,celery处理异步任务非常强悍,使用简单,支持各种并发。但是,大家来看看我所遇到的一个应用场景:每次后台上传一个游戏母包,然后对这个母包处理(添加某种标识,比如id)生成多个游戏子包,其中有一些id号的包是要求尽快的处理的,剩下的可以闲时处理。这里就对要把一个母包分成两个任务来处理,其中一个是优先处理的,另一个是闲时处理。

【方案初探】

对于上面的场景,最先想到的方案是,把每个母包处理任务分成优先和闲时两个celery任务队列分别处理,分别单独配给cpu资源(土豪的话给多一台机器也行)专门处理。大家估计也想到这种做法的弊端了,这样无法有效使用资源,当优先任务队列没有任务时,闲时任务队列却满载,显然这种设计方案不是很好。

那么有没有更好的处理方案呢?试想如果任务可以按优先级别在队列中排队就好了。显然celery并没有提供优先队列这种机制,那么我们只能自己实现一个celery一样的异步事件队列,并且支持优先级的队列。这时候显然想到的是redis

【redis优先队列】

redis中提供了BLPOP,RPUSH(RLPOP,LPSUH)这些队列操作。

来看看BLOOP的介绍:
BLPOP key [key …] timeout
BLPOP 是列表的阻塞式(blocking)弹出原语。
它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP命令阻塞,直到等待超时,或有另一个客户端对给定 key 的任意一个执行 LPUSH 或 RPUSH 命令为止。
当给定多个 key 参数时,按参数 key的先后顺序依次检查各个列表,弹出第一个非空列表的头元素(这是就是实现优先级的关键)。

那么我们可以设置两个key,一个表示优先任务的key,姑且叫priority_task,另一个闲时任务的key,就叫normal_task。在添加任务时,把对应任务所要必备参数添加的对应的key值队列即可。具体如下:

1
2
3
4
5
6
7
8
9
10
11
priority_task = {  # 优先任务
'id_list': [1, 2, 3], # 对应要生成子包id列表
'root_package_id': 10086 # 母包数据表索引id
}
redis.rpush('priority_task', json.dumps(priority_task))

normal_task = { # 普通任务
'id_list': [5, 6, 7, 8], # 对应要生成子包id列表
'root_package_id': 10086 # 母包数据表索引id
}
redis.rpush('normal_task', json.dumps(normal_task))

成功入队后,接下来就是不断从队列中取出任务,然后对应处理,大概代码如下:

1
2
3
4
while 1:
# 监听任务,没有打包任务则阻塞
key, task = redis.blpop(['priority_task', 'normal_task'])
deal_task(key, json.loads(task))

【任务动态切换】

上面实现保证了每次从队列取出的任务都是优先级别最高的,但是存在着问题,比如当前正在处理闲时任务,可是这个闲时任务可能要处理200+个包,这时候队列中又来了一个优先任务,那么这个优先任务必须等待之前的闲时任务处理完成才能开始处理,这显然不是我们想要的,那么我们能挂起当前正在处理的闲时任务,先去处理优先任务吗。显然是可以的,就是一个最简单的协程:函数调用。只需要在闲时任务处理完每个子包后,检查优先任务队列是否有元素,有则调用函数先处理优先任务,等优先任务完成后,再继续处理闲时任务。
处理函数大概如下:

1
2
3
4
5
6
7
8
9
def deal_task(key, task):  # 任务处理函数
id_list = task['id_list'] # 要生成的子包id
for id in id_list:
do something.... # 生成对应的id子包
if key == 'normal_key': # 如果当前是闲时任务
while redis.llen('priority_key') > 0: # 检查是否有优先任务,有则获取并执行
priority_task = redis.lpop('priority_key')
if priority_task:
deal_task('priority_key', json.loads(priority_task)) # 执行优先任务处理

以上就是实现一个单进程处理异步优先任务队列的全过程。

【多进程化】

上面实现都是单进程处理的,为了提升处理效率,我们可以开多个进程提升并发量,这里建议使用supervisor来管理你的这些进程。这里需要注意:

  1. 多进程处理临界资源,如果没有相关临界资源的竞争那最好,如果有,那么你必须考虑怎么处理,一般是用队列顺序化。
  2. supervisor持久化进程数据库链接,会导致数据库虽然已经断开连接,但是进程并不知晓,当进程再次执行数据库查询时就会出错,mysql一般会报一个gone away的错误。
    注:还可以用进程池异步处理。

【最后】

以上是本人的处理方案,如果有更好的建议记得留下宝贵的意见(>▽<)。

文章目录
  1. 1. 【需求背景】
  2. 2. 【方案初探】
  3. 3. 【redis优先队列】
  4. 4. 【任务动态切换】
  5. 5. 【多进程化】
  6. 6. 【最后】
,