kafka, 在围棋中,Kafka的生产者和消费者

分享于 

5分钟阅读

GitHub

  繁體 雙語
Producer and Consumer for Kafka in Go
  • 源代码名称:kafka
  • 源代码网址:http://www.github.com/jdamick/kafka
  • kafka源代码文档
  • kafka源代码下载
  • Git URL:
    git://www.github.com/jdamick/kafka.git
    Git Clone代码到本地:
    git clone http://www.github.com/jdamick/kafka
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/jdamick/kafka
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    

    Build Status

    :Go中的发布者&消费程序

    Join the chat at https://gitter.im/jdamick/kafka

    Kafka 是一个分布式发布订阅消息系统: ( http://kafka.apache.org )

    转到语言:( http://golang.org/ )

    对于 Kafka 0.8. x,请看一下 https://github.com/Shopify/sarama

    更改

    2015年05月

    • 固定 Bug 在接收响应结束时处理部分消息,当负载为 <4字节时
    • 如果正在读取 Kafka 日志段,则尝试从最早的可用偏移量恢复使用者

    2015年04月

    • 增加了对快速压缩的支持
    • 在每次提取响应结束时固定处理部分消息
    • 在发布服务器中添加了 ProduceFromChannel() 方法,以镜像用户中的ConsumeOnChannel() 方法
    • 将退出通道类型更改为空结构 {},增加了在无需竞争条件的情况下停止使用者的能力
    • 在 BatchPublish ( ) 中重用连接,而不是每次都建立一个全新。
    • 在代码上应用 gofmt/golint ( 将 Id() 重命名为 ID() 以进行合规性)
    • 添加的评论
    • 更好地区分调试和错误日志,以及如何使消费者在最大提取尺寸过小时取消。

    2013年04月

    • 从jira中的apache存储库&未完成补丁合并回来

    启动并运行

    安装( 版本 1 ):
    有关更多信息,请参见:http://weekly.golang.org/doc/install.html#install

    确保正确设置GOROOT的( http://golang.org/doc/install.html#environment )。 还可以适当设置你的GOPATH: http://weekly.golang.org/doc/code.html#tmp_13

    从源生成:

    make kafka
    制作工具( 发布者&使用者)
    make tools
    启动动物园管理员,Kafka 服务器
    有关 Kafka的更多信息,请参见: http://incubator.apache.org/kafka/quickstart.html

    命令行工具

    启动使用者:

    
    
    
    
     $GOPATH/bin/consumer -topic test -consumeforever
    
    
     Consuming Messages :
    
    
     From: localhost:9092, topic: test, partition: 0
    
    
     ---------------------- 
    
    
    
    

    现在,消费者将只是轮询,直到收到消息。

    发布消息:

    
    
    
    
     $GOPATH/bin/publisher -topic test -message"Hello World"
    
    
    
    

    消费者应该输出消息。

    API用法

    发布

    
    
    
    
    
    broker := kafka.NewBrokerPublisher("localhost:9092","mytesttopic", 0)
    
    
    broker.Publish(kafka.NewMessage([]byte("testing 1 2 3")))
    
    
    
    
    

    发布压缩消息

    
    
    
    
    
    broker := kafka.NewBrokerPublisher("localhost:9092","mytesttopic", 0)
    
    
    broker.Publish(kafka.NewCompressedMessage([]byte("testing 1 2 3")))
    
    
    
    
    

    客户

    
    
    
    
    broker := kafka.NewBrokerConsumer("localhost:9092","mytesttopic", 0, 0, 1048576)
    
    
    broker.Consume(func(msg *kafka.Message) { msg.Print() })
    
    
    
    
    

    或者,使用者可以使用基于通道的方法:

    
    
    
    
    broker := kafka.NewBrokerConsumer("localhost:9092","mytesttopic", 0, 0, 1048576)
    
    
    go broker.ConsumeOnChannel(msgChan, 10, quitChan)
    
    
    
    
    

    消耗偏移量

    
    
    
    
    broker := kafka.NewBrokerOffsetConsumer("localhost:9092","mytesttopic", 0)
    
    
    offsets, err := broker.GetOffsets(-1, 1)
    
    
    
    

    联系人

    jeffreydamick ( at ) gmail ( 圆点)

    http://twitter.com/jeffreydamick

    隆重感谢 NeuStar 赞助这里工作。


    相关文章