基于java的示例使用Kafka Consumer,Producer和Streaming API - 代码示例

分享于 

5分钟阅读

Microsoft

  繁體
浏览代码下载ZIP

该存储库中的示例演示了如何将Kafka Consumer,Producer和Streaming API与HDInsight集群上的Kafka一起使用。

此存储库中包括两个项目:

Producer-Consumer:这包含使用名为test的Kafka主题的生产者和消费者。

stream:它包含使用Kafka stream API (在Kafka 0.10.0或更高版本中)的应用程序,它从test主题中读取数据,将数据拆分为单词。

注意:这两个项目均假定使用Kafka 0.10.0,该版本随HDInsight群集版本3.6上的Kafka一起提供。

生产者和消费者

要运行使用者和生产者示例,请使用以下步骤:

将存储库分克隆到开发环境。

安装Java JDK 8或更高版本,这是用Oracle Java 8测试的,但是应该在像OpenJDK这样的环境中工作。

安装Maven

assum Java和Maven都在路径中,并且所有内容都为JAVA_HOME配置好了,使用以下命令构建使用者和生产者示例:


 cd Producer-Consumer
 mvn clean package

名为的文件kafka-producer-consumer-1.0-SNAPSHOT.jartarget目录中可用。

使用SCP将文件上载到Kafka集群:


 scp ./target/kafka-producer-consumer-1.0-SNAPSHOT.jar SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar

将SSHUSER替换为集群的SSH用户,并将CLUSTERNAME替换为集群的名称,当提示输入SSH用户的密码时。

使用SSH连接到集群:


 ssh USERNAME@CLUSTERNAME

在SSH会话中使用以下命令获取集群的Zookeeper主机和Kafka代理,使用Kafka时需要此信息,注意,JQ也被安装,因为它使得解析从Ambari返回的JSON更容易,用集群的登录(管理员)密码替换PASSWORD,将KAFKANAME替换为Kafka在HDInsight集群上的名称。


 sudo apt -y install jq
 export KAFKAZKHOSTS=`curl -sS -u admin:$PASSWORD -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`

 export KAFKABROKERS=`curl -sS -u admin:$PASSWORD -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`

使用以下命令验证环境变量是否已正确填充:


 echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
 echo '$KAFKABROKERS='$KAFKABROKERS

以下是$KAFKAZKHOSTS的内容的示例:


 zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

以下是$KAFKABROKERS的内容的示例:


 wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

注意:在集群上执行缩放操作时,这些信息改变,因为这会添加和删除worker节点,在使用Kafka之前,应该始终检索Zookeeper和Broker信息。

此示例使用名为test的主题,使用以下方法创建此主题:


 /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS

使用生产者-消费者示例将记录写入主题:


java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS

计数器显示已写入的记录数。

使用生产者-消费者读取刚刚写入的记录:


java -jar kafka-producer-consumer.jar consumer test $KAFKABROKERS

返回随机句子的列表,以及读取的次数的计数。


JAVA  API  BASE  str  sam  CONS