bee-queue, 一个简单快速健壮的node.js job/task队列, 支持Redis

分享于 

6分钟阅读

GitHub

  繁體
A simple, fast, robust job/task queue for Node.js, backed by Redis.
  • 源代码名称:bee-queue
  • 源代码网址:http://www.github.com/bee-queue/bee-queue
  • bee-queue源代码文档
  • bee-queue源代码下载
  • Git URL:
    git://www.github.com/bee-queue/bee-queue.git
    Git Clone代码到本地:
    git clone http://www.github.com/bee-queue/bee-queue
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/bee-queue/bee-queue
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    

    Node.js的简单、快速、健壮的作业/任务队列,由Redis支持。

    • 简单:~1000 LOC和最小依赖
    • 健壮:设计时考虑到并发性,原子性和故障;接近完整的代码覆盖率
    constQueue=require('bee-queue');constqueue=newQueue('example');constjob=queue.createJob({x:2, y:3})job.save();job.on('succeeded', (result) => {
     console.log(`Received result for job ${job.id}: ${result}`);
    });// Process jobs from as many servers or processes as you likequeue.process(function (job, done) {
     console.log(`Processing job ${job.id}`);
     returndone(null, job.data.x+job.data.y);
    });

    安装

    $ npm install bee-queue

    还需要运行Redis 2.8+*。

    *我们已经注意到,由于Redis<3.2的问题,一些作业会被延迟,因此建议使用Redis 3.2+。

    创建队列

    对象是这个库一切的起点,我们只需要给它一个名称,通常表示它要处理的工作类型:

    constQueue=require('bee-queue');constaddQueue=newQueue('addition');

    队列是非常轻量级的,所以如果需要处理不同类型的作业,只需实例化一个队列。

    constsubQueue=newQueue('subtraction', {
     redis: {
     host:'somewhereElse' },
     isWorker:false});

    这里,我们传递一个settings对象来指定备用的Redis主机,并指示此队列将只添加作业(不处理它们),有关更多选项,请参见队列Settings

    创建作业

    作业是使用Queue.createJob(data)创建的,它返回一个存储任意data对象。

    作业有用于配置作业的链式API,.save([cb])方法用于将作业保存到Redis,并排队以便处理:

    constjob=addQueue.createJob({x:2, y:3});
    job
     .timeout(3000)
     .retries(2)
     .save()
     .then((job) => {
     // job enqueued, job.id populated });

    除了调用可选回调外,作业的save方法还返回一个Promise。

    每个作业都可以配置命令.setId(id).retries(n).backoff(strategy, delayFactor)

    处理作业

    要开始处理作业,请调用Queue.process,并提供处理程序函数:

    addQueue.process(function (job, done) {
     console.log(`Processing job ${job.id}`);
     returndone(null, job.data.x+job.data.y);
    });

    处理程序函数可以返回Promise,而不是调用提供的回调,这使直观地使用async /await

    addQueue.process(async (job) => {
     console.log(`Processing job ${job.id}`);
     returnjob.data.x+job.data.y;
    });

    .process还可以使用并发参数,如果你的作业大部分时间都在等待外部资源,你可能希望每个处理器实例一次最多处理10个:

    constbaseUrl='http://www.google.com/search?q=';subQueue.process(10, function (job, done) {
     http.get(`${baseUrl}${job.data.x}-${job.data.y}`, function (res) {
     // parse the difference out of the response...returndone(null, difference);
     });
    });

    进度报告

    处理程序可以发送进度报告,它将作为原始作业实例上的事件接收:

    constjob=addQueue.createJob({x:2, y:3}).save();job.on('progress', (progress) => {
     console.log(`Job ${job.id} reported progress: ${progress}%`);
    });addQueue.process(async (job) => {
     // do some workjob.reportProgress(30);
     // do more workjob.reportProgress(80);
     // do the rest});

    就像.process一样,这些progress事件跨多个进程或服务器工作;无论处理在哪里发生,作业实例都将接收进度事件,注意,这种机制依赖于Pub/Sub,因此会为每个额外的工作者node带来额外的开销。

    作业和队列事件

    Bee-Queue对象发出了三种事件:队列本地事件,队列PubSub事件和作业事件。

    设置

    默认队列设置为:

    constqueue=newQueue('test', {
     prefix:'bq',
     stallInterval:5000,
     nearTermWindow:1200000,
     delayedDebounce:1000,
     redis: {
     host:'127.0.0.1',
     port:6379,
     db:0,
     options: {}
     },
     isWorker:true,
     getEvents:true,
     sendEvents:true,
     storeJobs:true,
     ensureScripts:true,
     activateDelayedJobs:false,
     removeOnSuccess:false,
     removeOnFailure:false,
     redisScanCount:100});

    fast  RED  Redis  job  队列  Robust