异步任务处理框架设计
2024-12-12
我们的产品添加了一个需求:用户通过手机采集了自己的脑电数据,我们产品提供脑电分析的功能,分析完成后,会在手机APP中展示分析后的结果。部分分析结果如下图所示:
这个功能的实现需要多个流程组合完成:
- 手机App采集到脑电数据,存储为edf文件
- App将edf文件传输到核心云平台存储,存储于OSS
- 脑电分析节点获取脑电edf文件,进行分析,输出结果
- 用户获取脑电分析结果
根据这个产品功能,我们要设计一个任务处理后台,来实现脑电分析的功能。
在架构设计中,首选思考这个问题:这个任务耗时多久?
经过测试,如果是较小脑电文件,分析需要0.5秒,如果是比较大的脑电文件(我们业务上支持的最大脑电文件为60分钟约10MB),分析时间需要3秒以上,如果让用户阻塞等待数秒,体验会很不好。在高并发的情况下,这里后台可能处理不过来。尤其是我们产品的用户使用多人冥想的功能时,这里的人数可能是数十人,一旦房间冥想结束时,会瞬间数十个脑电分析请求到后台,此时可能击垮业务后台。
综合分析后,这个脑电分析后台应设计为异步任务调度架构,用户提交任务后,不应该长时间阻塞,而是快速返回,让用户决定是希望原页面等待还是去使用其他功能。
第二个问题,这是IO密集型任务还是计算密集型任务?
我们继续拆解脑电分析的核心流程:
- 首先是脑电文件的下载:网络IO
- edf文件加载到内存:文件IO
- 算法分析:CPU计算
- 结果编码和压缩:CPU计算
- 分析结果存储于数据库和OSS:网络IO
综合来看,这是IO密集和计算密集的混合任务。
以下是我们的脑电分析-异步任务处理框架图。数据库选择MySQL,消息队列rabbitMQ,通过celery去调度:
数据库选择MySQL,消息队列rabbitMQ,通过celery去调度,节点上分为脑电分析计算节点(即celery的worker节点)和应用后台服务(提供数据查询和celery任务投递功能)。
脑电分析流程总共分为7大步骤,这里详细讲解:
- 手机APP收集好脑电数据,存储为edf文件,接着edf文件会通过网络传输到OSS进行存储。
- 确定原始脑电文件上传成功后,APP向应用后台发起脑电分析请求。
- 应用后台向数据库插入新任务,标记该任务的状态为running,返回任务ID为客户端,客户端若还处于原页面,那开始限时2分钟的轮询,轮询间隔是2秒。
- 应用后台会向celery投递任务。
- 脑电分析计算节点从celery拉取到任务,分别进行脑电下载、脑电分析、结果进行msgpack编码、写入文件、压缩。
- 脑电分析结果文件上传OSS。
- 任务状态标记为success,写入数据库。
脑电分析计算节点上,任意一个子步骤发生异常,都会进程“失败处理”逻辑。异常会被捕捉,然后执行celery的retry,重试就是该任务会被重新推进消息队列里,等待消费。retry的总次数我们设置为9次,超过该次数后,任务会标记为失败,更新到数据库里,不再重试。重试策略使用了二进制指数退避策略,即:
第一次失败: 等待1秒后重试
第二次失败:等待2秒后重试
第三次失败:等待8秒后重试
第四次失败:等待16秒后重试
...
第九次失败:等待512秒重试
第十次失败:结束重试,标记任务失败
所以一个任务最多在队列里停留13分钟(假设没有堆积),这策略保证了异常的任务不会永久堆积在消息队列从而导致的资源浪费的情况。
一般而言,应用后台服务用Go实现,从业务场景上看没有性能瓶颈,因为都是些IO操作的请求,且请求量不多,之前做过压力测试,抗10000并发没有问题。
性能瓶颈在于脑电分析节点,celery是Python实现,性能上首先就不占优势,另外计算节点的CPU密集型操作都以Python实现,这里很可能会因为用户的上升导致处理速度越来越慢,所以横向扩展的能力就十分需要了。celery的worker的横向扩展做得不错,比如worker的启动是支持指定进程数的, -c
表示起多少个进程,这样我们可以调整合适的进程数,压榨单机性能:
celery -A celery_worker worker -l info -c 2
如果单机性能不足,也可以在另外的一台机上部署celery worker加强消费能力即可,形成分布式计算,横向扩展十分方便。
一个简单的脑波分析任务各流程耗时打印:
[2024-12-12 13:51:18,285: INFO/MainProcess] Task celery_worker.eeg_seperation[8c76fea9-0632-47bd-ab68-a436e3f647b6] received
[2024-12-12 13:51:18,286: INFO/ForkPoolWorker-1] 收到消息: {"jobuuid": "e640a64b-f8fe-45dd-aa4d-7a9c579064a4", "eeguuid": "f99520a0-b84c-11ef-ab6b-b98a12423cee", "task_type": "seperation", "useraccount": "", "appid": 2, "extra": null, "retry": 0, "uid": 3205}
[2024-12-12 13:51:18,287: INFO/ForkPoolWorker-1] 开始处理脑电任务: {'jobuuid': 'e640a64b-f8fe-45dd-aa4d-7a9c579064a4', 'eeguuid': 'f99520a0-b84c-11ef-ab6b-b98a12423cee', 'task_type': 'seperation', 'useraccount': '', 'appid': 2, 'extra': None, 'retry': 0, 'uid': 3205}
[2024-12-12 13:51:18,337: INFO/ForkPoolWorker-1] Downloaded f99520a0-b84c-11ef-ab6b-b98a12423cee from bucket hnnk-eeg to ./tmp/e06d370fb114495f9c5533f33a1920cb/in_f99520a0-b84c-11ef-ab6b-b98a12423cee
[2024-12-12 13:51:18,337: INFO/ForkPoolWorker-1] 下载操作耗时: 0.021227121353149414 秒
[2024-12-12 13:51:18,383: INFO/ForkPoolWorker-1] 波形分离耗时: 0.04579472541809082 秒
[2024-12-12 13:51:18,478: INFO/ForkPoolWorker-1] 上传操作耗时: 0.0941157341003418 秒, 压缩=True,response=None
[2024-12-12 13:51:18,501: INFO/ForkPoolWorker-1] 写数据库操作耗时: 0.022878646850585938 秒
[2024-12-12 13:51:18,501: INFO/ForkPoolWorker-1] 任务处理函数总耗时: 0.21423101425170898 秒
[2024-12-12 13:51:18,501: INFO/ForkPoolWorker-1] 任务处理完成: {'jobuuid': 'e640a64b-f8fe-45dd-aa4d-7a9c579064a4', 'eeguuid': 'f99520a0-b84c-11ef-ab6b-b98a12423cee', 'task_type': 'seperation', 'useraccount': '', 'appid': 2, 'extra': None, 'retry': 0, 'uid': 3205}
[2024-12-12 13:51:18,501: INFO/ForkPoolWorker-1] Task celery_worker.eeg_seperation[8c76fea9-0632-47bd-ab68-a436e3f647b6] succeeded in 0.2151807639747858s: None