rsmq-worker, helper 来简单地实现RSMQ周围的一个工作人员( 一个简单的消息队列)

分享于 

19分钟阅读

GitHub

  繁體 雙語
Helper to simply implement a worker around RSMQ ( Redis Simple Message Queue )
  • 源代码名称:rsmq-worker
  • 源代码网址:http://www.github.com/mpneuried/rsmq-worker
  • rsmq-worker源代码文档
  • rsmq-worker源代码下载
  • Git URL:
    git://www.github.com/mpneuried/rsmq-worker.git
    Git Clone代码到本地:
    git clone http://www.github.com/mpneuried/rsmq-worker
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/mpneuried/rsmq-worker
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    

    RSMQ-Worker

    Build StatusWindows TestsCoveralls Coverage

    Deps Statusnpm versionnpm downloads

    Join the chat at https://gitter.im/mpneuried/rsmq-worker

    helper 只需实现一个工人 RSMQ ( Redis简单消息队列)。

    NPM

    安装

     npm install rsmq-worker

    初始化

    newRSMQWorker( queuename, options );

    示例:

    var RSMQWorker =require( "rsmq-worker" );
     var worker =newRSMQWorker( "myqueue" );
     worker.on( "message", function( msg, next, id ){
     // process your messageconsole.log("Message id : "+ id);
     console.log(msg);
     next()
     });
     // optional error listenersworker.on('error', function( err, msg ){
     console.log( "ERROR", err, msg.id );
     });
     worker.on('exceeded', function( msg ){
     console.log( "EXCEEDED", msg.id );
     });
     worker.on('timeout', function( msg ){
     console.log( "TIMEOUT", msg.id, msg.rc );
     });
     worker.start();

    配置

    • queuename:: ( 需要 String ) 用于提取消息的值
    • 选项 ( Object 可选) 配置对象
      • options.interval: ( Number[] 可选;默认值= [ 0, 1, 5, 10 ] ) 秒增加等待时间的array。 更详细的细节
      • options.maxReceiveCount: ( Number 可选;默认值= 10 ) 接收计数,直到消息被超过
      • options.invisibletime: ( Number 可选;默认值= 30 ) 在接收消息后隐藏消息的时间秒。
      • 向队列发送新消息的默认延迟为 options.defaultDelay: ( Number 可选;默认值= 1 )。
      • options.autostart: ( Boolean 可选;默认值= false ) 启动时启动工作线程
      • options.timeout: ( Number 可选;默认值= 3000 ) 消息进程超时。 因此,你必须在 比如 3000毫秒之后调用 messagenext() 方法。 如果设置为 0,它将一直等待到无穷远。
      • 收费 : ( Function 可选;) 一个自定义函数,带有原始消息( 请参阅消息格式) 作为参数来构建自定义超过检查。 如果你返回一个 true,消息将不会超过。 在返回 false 时,将使用 maxReceiveCount的常规检查。
      • : ( Boolean 可选;默认值= false ),即使连接了错误监听器,也会记录到控制台。
      • options.rsmq: ( RedisSMQ 可选;默认值= null ) 一个已经存在的rsmq实例,而不是创建一个新的客户端
      • 如果没有定义 rsmq 实例,options.redis: ( RedisClient 可选;默认值= null ) 已经存在的redis客户机实例
      • 如果没有定义 rsmq 实例,则为 options.redisPrefix: ( String 可选;默认值= "" ) 前缀/命名空间。 这必须与RSMQ的选项 ns 匹配。
      • 如果没有定义 rsmq 或者 redis 实例,options.host: ( String 可选;默认值= "localhost" ) 主机连接到 redis
      • 如果没有定义 rsmq 或者 redis 实例,options.port: ( Number 可选;默认值= 6379 ) 端口连接到 redis
      • 如果没有定义 rsmq 或者 redis 实例,options.options: ( Object 可选;默认值= {} ) 选项可以连接到 redis

    原始消息格式

    消息( 事件 data 或者 customExceedCheck 收到的比如 ) 包含以下键:

    • msg.message: ( String ) 队列消息内容。 你可以使用文本字符串JSON来使用复杂的内容。
    • msg.id: ( String ) rsmq内部消息 id
    • msg.sent: ( Number ) 消息发送/创建时的时间戳。
    • msg.fr: ( Number ) 消息第一次接收时的时间戳。
    • msg.rc: ( Number ) 这里消息接收的次数。

    方法

    .start()

    如果还没有将配置 autostart 定义为 true,则必须调用 .start() 方法。

    Return

    ( 自我): 用于链接的实例本身。

    .stop()

    只需停止接收间隔。 这不会切断与 rsmq/redis的连接。 如果你想让脚本结束调用 .quit()

    Return

    ( 自我): 用于链接的实例本身。

    .send( msg [, delay ][, cb ] )

    helper 函数只在已经配置的队列中发送消息。

    参数

    • msg: ( 需要 String ): rsmq消息最佳做法是,它是一个带附加数据的字符串字符串。
    • delay: ( Number 可选;默认值= 0 ): 要隐藏下一个 x 秒的消息的消息延迟。
    • cb: ( Function 可选): 为成功发送获得安全响应的可选回调。

    Return

    ( 自我): 用于链接的实例本身。

    .del( id [, cb ] )

    helper 函数在处理消息后简单删除消息。

    参数

    • id: ( 需要 String ): rsmq消息标识。
    • cb: ( Function 可选): 为成功删除获得安全响应的可选回调。

    Return

    ( 自我): 用于链接的实例本身。

    .changeInterval( interval )

    更改操作中的间隔超时。

    参数

    • interval: ( 需要 Number|Array ): 新间隔。

    Return

    ( 自我): 用于链接的实例本身。

    .quit()

    停止工作人员并关闭连接。 之后,再也不能再重用工人实例了。 它只是想杀死所有定时器和连接,这样你的脚本就会结束。

    .info( cb )

    获取当前队列属性。 这只是指向 rsmq.getQueueAttributes的快捷方式。

    参数

    • cb: ( Function ): 带回拨 ( err, attributes ) 有关详细信息,请参阅 rsmq文档

    .size( [hidden=false], cb )

    获取当前队列大小。

    参数

    • hidden: ( Boolean 可选;默认值= false ): 消息的计数,包括当前隐藏/"在飞行中"消息。
    • cb: ( Function 可选): 带回拨 ( err, size )size 是一个 Number,表示队列中的消息数。 如果 hidden=true,你将收到当前隐藏消息的数目。

    Return

    ( 自我): 用于链接的实例本身。

    事件

    message

    捕获和处理消息的主要事件。 如果不为此事件设置处理程序,则不会发生任何事情。

    示例:

    worker.on( "message", function( message, next, msgid ){
     // process message.. . next();
    });

    参数

    • 消息: ( String ) 要处理的队列消息内容。 你可以使用文本字符串JSON来使用复杂的内容。
    • :( Function ) 在处理消息时需要调用的函数。
      参数
      • delete: ( Boolean|Error 可选;默认值= true ) Error: 如果返回错误,它将作为错误事件发出,Boolean: 可以防止工作人员在结束时自动删除消息。 如果你想多次弹出邮件,这是很有用的。 要实现这里功能,请检查配置 options.customExceedCheck
    • msgid: ( String ) 消息标识。 如果你想手动删除消息,这很有用。

    ready

    直到工作线程连接到 rsmq/redis并已经用给定的queuename初始化时触发。

    data

    收到消息时的原始事件。

    参数

    • msg: ( String ) 原始消息。 ( 请参阅部分消息格式)

    deleted

    在删除邮件后激发。

    参数

    • 收费: ( String ) rsmq消息 id

    exceeded

    在超过消息后被激发并立即删除。

    参数

    • msg: ( String ) 原始消息。 ( 请参阅部分消息格式)

    timeout

    如果消息处理超过配置的超时,则激发。

    参数

    • msg: ( String ) 原始消息。 ( 请参阅部分消息格式)

    Error

    如果消息处理引发错误,则激发。

    参数

    • err: ( Error|Any ) 抛出抛出错误
    • msg: ( String ) 原始消息。 ( 请参阅部分消息格式)

    高级示例

    这是一个高级示例,展示了一些功能。

    var fs =require( "fs" );
     var RSMQWorker =require( "rsmq-worker" );
     varfnCheck=function( msg ){
     // check function to not exceed the message if the content is `createmessages`if( msg.message==="createmessages" ){
     returntrue }
     returnfalse }
     var worker =newRSMQWorker( "myqueue", {
     interval: [ .1, 1 ], // wait 100ms between every receive and step up to 1,3 on empty receives invisibletime:2, // hide received message for 5 sec maxReceiveCount:2, // only receive a message 2 times until delete autostart:true, // start worker on init customExceedCheck: fnCheck // set the custom exceed check });
     // Listen to errorsworker.on('error', function( err, msg ){
     console.log( "ERROR", err, msg.id );
     });
     worker.on('timeout', function( msg ){
     console.log( "TIMEOUT", msg.id, msg.rc );
     });
     // handle exceeded messages// grab the internal rsmq instancevar rsmq =worker._getRsmq();
     worker.on('exceeded', function( msg ){
     console.log( "EXCEEDED", msg.id );
     // NOTE: make sure this queue existsrsmq.sendMessage( "YOUR_EXCEEDED_QUEUE", msq, function( err, resp ){
     if( err ){
     console.error( "write-to-exceeded-queue", err )
     }
     });
     });
     // listen to messagesworker.on( "message", function( message, next, id ){
     console.log( "message", message );
     if( message ==="createmessages" ){
     next( false )
     worker.send( JSON.stringify( { type:"writefile", filename:"./test.txt", txt:"Foo Bar" } ) );
     worker.send( JSON.stringify( { type:"deletefile", filename:"./test.txt" } ) );
     return 
     }
     var _data =JSON.parse( message )
     switch( _data.type ){
     case"writefile": 
     fs.writeFile( _data.filename, _data.txt, function( err ){
     if( err ){
     next( err );
     }else{
     next()
     }
     });
     break;
     case"deletefile": 
     fs.unlink( _data.filename, function( err ){
     if( err ){
     next( err );
     }else{
     next()
     }
     });
     break;
     }
     });
     worker.send( "createmessages" );

    详细信息

    选项 interval

    选项 interval 可以:

    ) 为一个数字,以便worker在每秒( 轮询队列)。 interval:. 5 = 每秒两次)

    ) 为数字的一个 array。 在启动 interval[0] 是轮询队列的时间。 每个工作线程收到空响应 ( 队列为空) 下一个 interval 将用于等待下一次轮询,直到到达最后一个定义 interval[ n ] 为止。 在每个接收到的消息上,等待时间将重置为 interval[0]

    例如 interval: [. 2, 1, 3 ]

    • 第一次轮询-> 没有消息-> 等待 .2s = 200ms
    • 第二轮投票-> 无消息-> 等待 1s
    • 第三次轮询-> 没有消息-> 等待 3s
    • 第四次轮询-> 没有消息-> 等待 3s
    • 第五次轮询-> 1消息-> 等待 .2s
    • 第六轮投票-> 无消息-> 等待 1s
    • 第七个轮询-> 1消息-> 等待 .2s
    • 第八个轮询-> 1消息-> 等待 .2s
    • 第九个轮询-> 没有消息-> 等待 .2s
    • 第十个轮询-> 没有消息-> 等待 1s

    todos/想法

    • 更多测试 !
    • 可选的并行执行。并行执行多个接收。
    • 将超过消息的自动写入写入已经配置的队列。

    发布历史

    版本日期说明
    0.5.22016-10-24优化的自述文件和更新的依赖项
    0.5.12016-08-22固定重新连接错误 Issue#20。 感谢 mstduff ;更新的deps ;从 repo 中删除生成的代码文档
    0.5.02016-07-14添加了 .info(cb) ( Issue#17 ) 和 .size( [hidden,] cb )
    0.4.32016-06-20优化的事件侦听器 Issue#15。 感谢 Kevin。
    0.4.22016-05-06添加了 .quit() 函数 Issue#11。 感谢 Sam
    0.4.12016-04-05修正缺少的isNumber函数
    0.4.02016-03-30更新的依赖项( 特别是lodash至 4. x )。 修复了由 extendIssue#7 array 合并引起的配置 Bug。 感谢 Hanneman
    0.3.82015-11-04固定停止行为。Pull#5。 感谢 Exinferis
    0.3.72015-09-02在消息处理过程中添加了检查行为的测试;添加选项 alwaysLogErrors 以防止在连接错误事件处理程序时控制控制台日志。 问题 #3
    0.3.62015-09-02更新的依赖项;优化的自述文件( 感谢 Tobias的拉 #4
    0.3.52015-04-27再次。.send()的固定参数调度
    0.3.42015-04-27.send()的固定参数调度,并为 .del() 添加了可选的cb
    0.3.32015-03-27添加了 changeInterval 以修改操作中的间隔
    0.3.22015-02-23更改默认前缀/命名空间;
    0.3.02015-02-16现在可以作为 next的第一个参数返回错误。 这将导致错误发出+ 优化的自述文件
    0.2.22015-01-27添加了选项 defaultDelaysend 方法的优化参数;固定 travis.yml
    0.2.02015-01-27增加了超时,更好的错误处理和发送回调
    0.1.22015-01-20重新组织代码,添加代码文档和优化的自述文件
    0.1.12015-01-17添加了测试脚本和优化的存储库文件列表
    0.1.02015-01-16第一个工作和文档化版本
    0.0.12015-01-14初始提交

    NPM

    最初用生成器 mpnodemodule 生成。

    其他项目

    名称 描述
    rsmq一个基于Redis的简单消息队列
    rsmqrsmq的终端客户端
    英镑休息 rsmqREST接口用于。
    的redis通知基于redis的通知引擎。 它实现 rsmq-worker来安全地创建通知和定期报告。
    节点高速缓存简单和快速的NodeJS内部缓存。 内存缓存中的node 内部,如 memcached。
    的redis会话用于NodeJS和Redis的高级会话存储
    架构通过预定义架构验证对象的简单模块
    connect-redis-sessions一个连接或者表示中间件,只需使用 redis会话。 使用 redis会话,你可以处理每个的多个会话。
    systemhealthnode 模块运行你的计算机的简单自定义检查或者它的连接。 它将使用 redis心跳将当前状态发送到 redis。
    task-queue-worker一个功能强大的工具,用于通过制定标准的http请求来处理任务的后台处理。
    soyerSoyer是面向服务器端使用 node.js.的服务器端使用的小 lib
    grunt-soy-compile编译goggles关闭模板( 大豆) 模板inclding处理XLIFF语言文件。
    backlunr将 Backbone 集合与浏览器全文搜索引擎 Lunr.js 一起使用的解决方案

    麻省理工学院许可证( MIT )

    版权所有 © 2015 Mathias,http://www.tcs.de

    若要在取得该软件副本时免费授予任何人,如有下列条件的软件,请免费授予该软件的副本,并与相关的文档文件("软件") 进行许可,包括不受限制的权利,包括以下条件:

    上述版权声明和本许可声明须包括在所有的副本或实质性部分的软件。

    软件是"是",没有任何保证,表示或者隐含,包括但不限于销售,适合特定用途和 NONINFRINGEMENT。 作者或者版权持有人在合同。侵权或者它的他与软件或者它的他用户交易的行为。


    IMP  RED  Message  Redis  Helper  队列  
    相关文章