Kafka Demo Record

这篇博客用来根据师兄做的一个基于Kafka队列的消息产生、传输、存储模型,记录一些关于Kafka基本架构,命令使用,代码开发的东西。另外,还有使用Gobblin工具从kafka队列获取数据保存在本地或者分布式文件系统HDFS和Hive中的过程。

1 Kafka基本架构

首先,Kafka可以运行在单机或者分布式集群中,然后,数据流或者消息队列是按照Topic的概念进行存储的,每定义一个topic代表一个队列,队列中的每条记录包含一个key,value和时间戳。
基本模型是一个生产者和消费者模型,对于开发者而言,API包括四个部分:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
对于Kafka topic使用partition进行分区,官网的说明是:
The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism—more on that in a bit.
详细的再看看论文。

每一个consumer从队列中获取数据都独立维护一个offset,也就是说它可以自己任意选择要获取队列中什么位置的消息。

看过论文后在做补充。

2 Kafka使用的一些命令

Kafka运行在zookeeper下,先启动zookeeper,在启动Kafka。

$ bin/kafka-server-start.sh config/server.properties
新建topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
list it
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
发送消息
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
^C
创建一个consumer
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

3 demo的命令

这个demo包含三个Kafka模块,运行在一个zookeeper下:Kafka,Kafka1,Kafka2.
SourceToLKafka会向两个局部的Kafka队列jiangsu和shandong中生产消息,LKafkaToGKafka从两个局部Kafka拉取消息推到全局topic global中。
对于打包好的jar包,可以使用命令:
$ java -cp kafka-hierarchic-1.0-SNAPSHOT-jar-with-dependencies.jar Demo.SourceToLKafka
$ java -cp kafka-hierarchic-1.0-SNAPSHOT-jar-with-dependencies.jar Demo.LKafkaToGKafka

4 Gobblin拉取数据到本地

gobbin可以将Kafka中的数据拉取到本地文件系统或者分布式文件系统如HDFS中
使用gobblin-Kafka要设置配置文件目录job-conf-dir和work目录,job-conf-dir里包含一个作业配置的文件,用于作业的执行时间、频率,Kafka broker topic等的配置,work目录在运行前创建,主要用于该任务的作业输出,日志文件输出等。本次实验的配置文件在gobblin-dist/job-config目录下。
使用gobblin获取global topic里的数据命令是:
$ bin/gobblin-standalone.sh start

会在work/job-out/global目录下输出结果。





评论