erlbus, 在Erlang中,编写了简单分布式和可以扩展的PubSub消息总线

分享于 

11分钟阅读

GitHub

  繁體 雙語
Erlang Message Event Bus
  • 源代码名称:erlbus
  • 源代码网址:http://www.github.com/cabol/erlbus
  • erlbus源代码文档
  • erlbus源代码下载
  • Git URL:
    git://www.github.com/cabol/erlbus.git
    Git Clone代码到本地:
    git clone http://www.github.com/cabol/erlbus
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/cabol/erlbus
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    

    ErlBus Build Status

    用Erlang编写的消息/事件总线。

    PubSub内核是原始。杰出和经过验证的 PubSub层插件的克隆,但是在Erlang中编写。

    构建基于应用程序的软实时高可以伸缩消息传递的新方法,而不是集中式 !

    可以在这里找到文档

    另请参阅:西部。

    简介

    ErlBus 是一个简单而轻量级的库/工具,用于构建基于消息的应用程序。

    采用 ErlBus框架实现了收费实现,它提供了一个令人惊奇的。可以扩展的和经过验证的PubSub解决方案。 此外,英镑提供了一个可用的和简单的接口,在这个实现之上。

    你可以在这里阅读更多关于PubSub实现的信息。

    构建 ErlBus

    假设你有一个工作的Erlang安装( 18或者更高版本),那么构建的ErlBus 应该尽可能简单:

    
    $ git clone https://github.com/cabol/erlbus.git
    
    
    $ cd erlbus
    
    
    $ make
    
    
    
    

    快速入门示例

    使用 ebus 运行一个Erlang控制台:

     
    $ make shell
    
    
    
     

    一旦进入erlang控制台:

    % subscribe the current shell processebus:sub(self(), "foo").ok% spawn a processPid=spawn_link(fun() -> timer:sleep(infinity) end).<0.57.0>% subscribe spawned PIDebus:sub(Pid, "foo").ok% publish a messageebus:pub("foo", {foo, "hi"}).ok% check received message for Pidebus_proc:messages(Pid). 
    [{foo,"hi"}]% check received message for selfebus_proc:messages(self()). 
    [{foo,"hi"}]% unsubscribe selfebus:unsub(self(), "foo").ok% publish other messageebus:pub("foo", {foo, "hello"}).ok% check received message for Pidebus_proc:messages(Pid). 
    [{foo,"hi"},{foo,"hello"}]% check received message for self (last message didn't arrive)ebus_proc:messages(self()). 
    [{foo,"hi"}]% check subscribers (only Pid should be in the returned list)ebus:subscribers("foo").
    [<0.57.0>]% check topicsebus:topics().
    [<<"foo">>]% subscribe self to other topicebus:sub(self(), "bar").ok% check topicsebus:topics().
    [<<"bar">>,<<"foo">>]% publish other messageebus:pub("bar", {bar, "hi bar"}).ok% check received message for Pid (last message didn't arrive)ebus_proc:messages(Pid). 
    [{foo,"hi"},{foo,"hello"}]% check received message for selfebus_proc:messages(self()). 
    [{foo,"hi"},{bar,"hi bar"}]

    备注:

    • 你可能已经注意到,创建/删除主题并没有必要额外的步骤/调用,这是由 ebus 自动处理的,所以你不用担心它。

    现在,让我们更有趣,启动两个Erlang控制台,第一个:

    
    $ erl -name node1@127.0.0.1 -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config
    
    
    
    

    第二个:

    
    $ erl -name node2@127.0.0.1 -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config
    
    
    
    

    然后我们需要做的是将这些Erlang节点放在群集中,所以从它的中任何一个发送ping到另一个:

    % From node1 ping node2net_adm:ping('node2@127.0.0.1').pong

    很好,我们有集群中的两个节点,感谢发布Erlang的美丽。 那么,让我们重复 上面 练习,但现在在两个节点中。

    node1 中,为某个主题创建处理程序和订阅:

    % create a callback fun to use ebus_proc utilityCB1=fun(Msg) ->
     io:format("CB1: ~p~n", [Msg])end#Fun<erl_eval.6.54118792>% other callback but receiving additional arguments,% which may be used when message arrivesCB2=fun(Msg, Args) ->
     io:format("CB2: Msg: ~p, Args: ~p~n", [Msg, Args])end.
    #Fun<erl_eval.12.54118792>% use ebus_proc utility to spawn a handlerH1=ebus_proc:spawn_handler(CB1).<0.70.0>H2=ebus_proc:spawn_handler(CB2, ["any_ctx"]).<0.72.0>% subscribe handlersebus:sub(H1, "foo").okebus:sub(H2, "foo").ok

    node2 中重复相同的东西 上面。

    在两个节点中订阅了同一频道后,可以从任何 node 发布一些消息:

    % publish messageebus:pub("foo", {foo, "again"}).CB1: {foo,"again"}CB2: Msg: {foo,"again"}, Args: "any_ctx"ok

    在其他 node 中,你将看到这些消息也到达了:

    CB1: {foo,"again"}CB2: Msg: {foo,"again"}, Args: "any_ctx"

    让我们在任何Erlang控制台中检查订阅服务器:

    % returns local and remote subscribersebus:subscribers("foo").
    [<7023.67.0>,<7023.69.0>,<0.70.0>,<0.72.0>]

    你还可以检查测试了解更多关于使用 ebus的信息。

    到目前为止我们继续 !

    Point-To-Point示例

    最重要的是,你不需要特殊的东西来实现point-to-point行为。 它就像这样简单:

    ebus:dispatch("topic1", #{payload=>"M1"}).

    分派函数获取订阅者,然后选择其中一个发送消息。 你可以提供一个调度函数来获取订阅服务器,否则提供了( 随机选取订阅者)的默认函数。

    调度功能有 3种不同的风格:

    • ebus:dispatch/2: 接收主题和消息。
    • ebus:dispatch/3: 接收主题,消息和选项列表。
    • ebus:dispatch/4: 与以前相同,但作为 1st 参数接收服务器的NAME,它默认位于其他函数中。

    调度选项包括:

    • {scope, local | global} :允许你选择是否要选择本地订阅服务器。 默认值:local
    • {dispatch_fun, fun(([term()]) -> term())} :获取订阅服务器的函数。 如果未提供,则提供默认的随机函数。

    要了解这个函数是如何实现的,请在这里执行

    让我们看看一个例子:

    % subscribe local processebus:sub(self(), "foo").ok% spawn a processPid=spawn_link(fun() -> timer:sleep(infinity) end).<0.57.0>% subscribe spawned PIDebus:sub(Pid, "foo").ok% check that we have two subscribersebus:subscribers("foo").
    [<0.57.0>,<0.38.0>]% now dispatch a message (default dispatch fun and scope)ebus:dispatch("foo", #{payload=>foo}).ok% check that only one subscriber received the messageebus_proc:messages(self()).
    [#{payload=>foo}]ebus_proc:messages(Pid).
    []% dispatch with optionsFun=fun([H | _]) -> Hend.
    #Fun<erl_eval.6.54118792>ebus:dispatch("foo", <<"M1">>, [{scope, global}, {dispatch_fun, Fun}]).ok% check againebus_proc:messages(self()). 
    [#{payload=>foo}]ebus_proc:messages(Pid). 
    [<<"M1">>]

    非常简单?

    分布式 ErlBus

    ErlBus distributed,它不需要任何额外/神奇的东西。

    一旦你拥有一个Erlang集群,消息将使用 PG2 ( 即默认的PubSub适配器) 广播。 要记住,它是一个 Phoenix 克隆,所以架构和设计是一样的。

    Phoenix频道支持在PubSub层,这是核心。 看一下这个博客帖子。

    示例

    请参见示例

    运行测试

     
    $ make tests
    
    
    
     

    Edoc

     
    $ make doc
    
    
    
     

    注意:运行前一条命令后,将创建一个新的文件夹 doc,你将拥有非常漂亮的HTML文档。

    ErlBus配置文件

    目前为止,提供的唯一附加概要文件是 debug,因为 default 概要文件足以完成所有构建和测试任务。

    调试配置文件

    ErlBus 提供了在调试配置文件中编译和运行 ebus的机会。 在这种模式下,将获取额外的监视,调试和测试依赖项:

    • 侦察: 在生产中调试Erlang的函数和脚本的集合。
    • eper: 性能相关工具的集合(。redbugdtopntopatop )。

    要在启用调试配置文件的情况下运行 ebus:

    
    $ make REBAR_PROFILE=debug shell
    
    
    
    

    现在你可以像希望的那样使用 reconeper 来监视和调试 ebus

    更改日志

    所有注意到这个项目的变更将在 CHANGELOG.md 文档。

    版权和许可证

    原始工作版权( c ) 2014 Chris McCord

    ErlBus 源代码是在 MIT许可证许可的。

    注意:: pub/亚实现取自 Phoenix框架


    SCA  Scala  Message  分布式  Erlang  总线  
    相关文章