couchbase-kafka-connector, 旧版Couchbase到 Kafka 连接器,由 Kafka 连接取代

分享于 

5分钟阅读

GitHub

  繁體 雙語
A Couchbase to Kafka connector.
  • 源代码名称:couchbase-kafka-connector
  • 源代码网址:http://www.github.com/couchbase/couchbase-kafka-connector
  • couchbase-kafka-connector源代码文档
  • couchbase-kafka-connector源代码下载
  • Git URL:
    git://www.github.com/couchbase/couchbase-kafka-connector.git
    Git Clone代码到本地:
    git clone http://www.github.com/couchbase/couchbase-kafka-connector
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/couchbase/couchbase-kafka-connector
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    
    这里项目已经过时

    这个项目被 kafka-connect-couchbase 取代,它与 Kafka Connect框架集成。

    Couchbase Kafka 连接器

    欢迎使用官方 Couchbase Kafka 连接器 ! 它提供了将事件流从Couchbase服务器定向到 Kafka的功能。

    你可以阅读快速入门指南 below 或者参考以下文档: http://developer.couchbase.com/documentation/server/4.1/connectors/kafka-2.0/kafka-intro.html

    问题追踪器可以在 https://issues.couchbase.com/browse/KAFKAC 找到。

    快速入门

    示例 build.gradle:

    apply plugin: 'java'repositories {
     mavenCentral()
     maven { url { "http://files.couchbase.com/maven2" } }
     mavenLocal()
    }
    dependencies {
     compile(group: 'com.couchbase.client', name: 'kafka-connector', version: '2.0.1')
    }

    使用这个库很容易。 假设我们想接收来自Couchbase服务器的每个修改,并且只发送到 Kafka 文档主体( 默认情况下,连接器将文档正文和元数据序列化为 JSON )。 要实现这一点,你需要定义一个过滤器类,该过滤器类只允许 MutationMessage的实例通过:

    packageexample;importcom.couchbase.client.core.message.dcp.MutationMessage;importcom.couchbase.kafka.DCPEvent;importcom.couchbase.kafka.filter.Filter;publicclassSampleFilterimplementsFilter {
     @Overridepublicbooleanpass(finalDCPEventdcpEvent) {
     return dcpEvent.message() instanceofMutationMessage;
     }
    }

    你还需要一个编码器类,它将文档值转换为字节 array:

    packageexample;importcom.couchbase.client.core.message.dcp.MutationMessage;importcom.couchbase.client.deps.io.netty.util.CharsetUtil;importcom.couchbase.kafka.DCPEvent;importcom.couchbase.kafka.coder.AbstractEncoder;importkafka.utils.VerifiableProperties;publicclassSampleEncoderextendsAbstractEncoder {
     publicSampleEncoder(finalVerifiablePropertiesproperties) {
     super(properties);
     }
     @Overridepublicbyte[] toBytes(finalDCPEventdcpEvent) {
     MutationMessage message = (MutationMessage)dcpEvent.message();
     return message.content().toString(CharsetUtil.UTF_8).getBytes();
     }
    }

    这基本上足以设置 couchbase kafka桥:

    packageexample;importcom.couchbase.kafka.CouchbaseKafkaConnector;importcom.couchbase.kafka.CouchbaseKafkaEnvironment;importcom.couchbase.kafka.DefaultCouchbaseKafkaEnvironment;publicclassExample {
     publicstaticvoidmain(String[] args) {
     DefaultCouchbaseKafkaEnvironment.Builder builder = (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment. builder()
    . kafkaFilterClass("example.SampleFilter")
    . kafkaValueSerializerClass("example.SampleEncoder")
    . kafkaTopic("default")
    . kafkaZookeeperAddress("kafka1.vagrant")
    . couchbaseNodes("couchbase1.vagrant")
    . couchbaseBucket("default")
    . dcpEnabled(true);
     CouchbaseKafkaConnector connector =CouchbaseKafkaConnector.create(builder.build());
     connector.run();
     }
    }

    也可以从某个已知的状态开始,也可以观察有限的分区集。 示例 below 将只从起始( 请参见 currentState()loadState() 帮助器) 开始流 115.

    ConnectorState startState = connector.startState(115);ConnectorState endState = connector.endState(115);
    connector.run(startState, endState);

    couchbase1.vagrantkafka1.vagrant 地址 上面 分别是Couchbase服务器和 Kafka的位置,可以使用 env/ 目录中的配置脚本轻松设置。 只要在那里导航并运行 vagrant up。 使用 Ansible (。安装指南)的Vagrant脚本。

    许可证

    版权所有 2015 Couchbase公司。

    在Apache许可下许可,版本 2.0.

    请参见Apache许可协议中的


    BASE  SUP  Super  kafka  传统  连接器  
    相关文章