登录  | 加入社区

黑狼游客您好!登录后享受更多精彩

只需一步,快速开始

新浪微博登陆

只需一步, 快速开始

查看: 441|回复: 0

Kafka实战条记

[复制链接]

172

主题

172

帖子

0

现金

黑狼菜鸟

Rank: 1

积分
0
发表于 2019-2-28 19:22:17 | 显示全部楼层 |阅读模式 来自 中国
单机版搭建

kafka的运行必要提前配好Java 情况,笔者的是 java version "1.8.0_201"
第一步  下载步伐

下载源码 此处用的是2.11版本
解压
[root@cluster01:opt] # tar -xzf kafka_2.11-2.1.0.tgz
[root@cluster01:opt] # cd kafka_2.11-2.1.0第二步  启动服务

开启zookeeper, kafka的运行必要zookeeper
[root@cluster01:kafka_2.11-2.1.0] # /opt/Apache/zookeeper-3.4.10/bin/zkServer.sh start
大概可以利用kafka自带的zookeeper
[root@cluster01:kafka_2.11-2.1.0] # bin/zookeeper-server-start.sh config/zookeeper.properties运行kafka步伐
[root@cluster01:kafka_2.11-2.1.0] # bin/kafka-server-start.sh config/server.properties
ajj4uffdF3SSTTLF.jpg

第三步  创建Topic


创建名为 test3 且分区数为1,重复因子为1的 topic3
[root@cluster01:kafka_2.11-2.1.0] # bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test3
Created topic "test3".
rUvXmhW0XZ5H66G0.jpg

检察创建的 topic

[root@cluster01:kafka_2.11-2.1.0] # bin/kafka-topics.sh --list --zookeeper localhost:2181
KAFKA-STORM
__consumer_offsets
connect-test
first
test
test2

第四步  发送消息

只用kafka自带的生产者脚本发送消息
[root@cluster02:kafka_2.11-2.1.0] # bin/kafka-console-producer.sh --broker-list cluster02:9092 --topic test
>hello
>hu
Rw1Q881KjQv6Bj0l.jpg

第五步  消耗消息


利用kafka自带的消耗者脚本消耗消息
[root@cluster02:kafka_2.11-2.1.0] # bin/kafka-console-consumer.sh --bootstrap-server   cluster02:9092 --topic test --from-beginning
hello
hunote : 应当利用 --bootstrap-server  参数,之前的版本大概利用 --zookeeper , --from-beginning 参数指明从最初的消息开始读取(消耗)
n4U3cJyu2kpaujuh.jpg

至此,单机版 kafka 已经尝鲜完毕,总结就是下载步伐安装,开启zookeeper,kafka,创建topic,生产消息,消耗消息


集群版搭建

这里的集群情况是三台CentOS 7,VMware假造机
otWzHB9fH9RdhQzb.jpg

第一步  下载步伐


下载源码 此处用的是2.11版本在三台CentOS 7服务器中分别下载好kafka步伐包,如不想利用自带的zookeeper可以本身下载安装,笔者的三台服务器中之前已经安装好了
解压
[root@cluster01:opt] # tar -xzf kafka_2.11-2.1.0.tgz
[root@cluster01:opt] # cd kafka_2.11-2.1.0第二步  修改主机名【可跳过】

登录主机之后利用 hostname 下令修改主机名,如许好区分三台主机,固然,不修改也无伤风雅
# 第一台
[root@localhost:~] # hostname cluster01
# 第二台
[root@localhost:~] # hostname cluster02
# 第三台
[root@localhost:~] # hostname cluster03设置 hosts
为了不每次指定的服务器的时间都利用 IP 指定,可以在这三台服务器和外部主机的 hosts 文件中指定名称
我的外部主机利用的是 Ubuntu 18.10
起首利用 ifconfig/ ip address 下令检察CentOS7服务器的 IP 地点

然后修改各自的 hosts 文件
[root@sairoPC:sairo] # vim /etc/hosts
......
192.168.67.129  cluster01
192.168.67.130  cluster02
192.168.67.131  cluster03
......别的CentOS7自带的tty终端界面感觉有些貌寝,Ubuntu的用户可以利用 ssh 长途毗连下令,大概利用 Terminus 长途毗连工具,Windows 的用户可以实验 XShell , XFtp
由于我们的假造机的IP是根据DHCP协议动态分配的,以是有的时间IP肯能会主动改变,这个是可以:

  • 修改hosts文件
  • 固定服务器的IP分配计谋,使其静态分配,详细方法可参考笔者的另一篇博客


第三步  修改设置文件

kafka集群搭建必须要修改以下三项
broker.id=1# 必须唯一标识broker
listeners=listeners = listener_name://host_name:port
log.dirs=/tmp/kafka-logs-1# 日记目次我的修改如下
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1# 修改,各个服务器中的数值必须差别以标识broker

########################## enable delete topic ###########################
delete.topic.enable=true # 添加此项
......
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://cluster01:9092# 添加此项,此处的cluster01对应于详细的主机名
......
# A comma separated list of directories under which to store log files
log.dirs=/opt/Apache/kafka_2.11-2.1.0/logs# 修改此项,留意这个logs 目次事先不存在,需自行创建,也可以随意指定
......
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=cluster01:2181,cluster02:2181,cluster03:2181# 指定zookeeper,cluster*对应hosts文件中ip对应# 的名称,读者按需修改
至此,kafka的集群的搭建已经完成,必要提示的是kafka集群的运行必要zookeeper和谐, 至于zookeeper集群的搭建读者可以谷歌搜刮相干资料。大概直接利用kafka自带的zookeeper插件,不消再搭建zookeeper集群。笔者偶然间也会再写一篇zookeeper集群情况搭建的文章。


第四步  启动kafka

进入到kafka目次
[root@cluster02:~] # cd /opt/Apache/kafka_2.11-2.1.0/
[root@cluster02:kafka_2.11-2.1.0] # ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
qhBPJZ4jjCvMumhl.jpg

以下的操纵在三台服务器中是雷同的,节流篇幅只列出一台操纵过程,三台服务器的操纵不分先后

先启动zookeeper, 这里利用的黑白内置的zookeeper,
[root@cluster03:kafka_2.11-2.1.0] # /opt/Apache/zookeeper-3.4.10/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/Apache/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@cluster03:kafka_2.11-2.1.0] # /opt/Apache/zookeeper-3.4.10/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/Apache/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
[root@cluster03:kafka_2.11-2.1.0] # 在下令行实行如下下令以启动kafka
[root@cluster02:kafka_2.11-2.1.0] # bin/kafka-server-start.sh config/server.properties
[2019-02-26 21:58:44,997] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-02-26 21:58:45,390] INFO starting (kafka.server.KafkaServer)
[2019-02-26 21:58:45,391] INFO Connecting to zookeeper on cluster01:2181,cluster02:2181,cluster03:2181 (kafka.server.KafkaServer)
......请留意,启动kafka的脚本是kafka-server-start.sh,参数是设置文件 server.properties(这个文件名不是固定的) ,以是每一个设置文件对应于一个kafka的broker, 以是读者假如想在一台服务器上搭建多个kafka节点,可以创建多个设置文件,修改此中的设置参数,然后用kafka-server-start.sh脚本启动,至于怎样修改请参考第三步和官方教程


第五步  创建一个topic

[root@cluster01:kafka_2.11-2.1.0] # bin/kafka-topics.sh --create --zookeeper cluster01:2181 --replication-factor 3 --partitions 1 --topic test4
Created topic "test4".
[root@cluster01:kafka_2.11-2.1.0] # bin/kafka-topics.sh --list --zookeeper cluster01:2181
KAFKA-STORM
__consumer_offsets
connect-test
first
test
test2
test3
test4
[root@cluster01:kafka_2.11-2.1.0] # 第六步  启动生产者

[root@cluster01:kafka_2.11-2.1.0] # bin/kafka-console-producer.sh --broker-list cluster01:9092 --topic test4
>1
>2
>
>3
>第七步  启动消耗者

在另一台呆板上
[root@cluster02:kafka_2.11-2.1.0] # bin/kafka-console-consumer.sh --bootstrap-server cluster02:9092 --from-beginning --topic test4
1
2

3看,集群的结果出来了
以上的演示是kafka从生产者生产消息然后消耗消息,但现实上消息的泉源是多方面的,可以从数据库获取,当地文件获取。kafka自己也提供一个工具 connect-standalone.sh,有爱好的读者可以看看,其官网有提供示例。


编程案例

接下来我们用代码实现生产者生产消息,消耗者消耗消息


KafkaProducer API

KafkaProducer API的焦点部门是KafkaProducer类。KafkaProducer类提供了一个选项,用于在其构造函数中利用以下方法毗连Kafka broker。

  • KafkaProducer类提供send方法以异步方式topic发送消息。send() 的署名如下
producer.send(new ProducerRecord(topic, partition, key1, value1) , callback);

  • ProducerRecord - 生产者管理等候发送的记载缓冲区。
  • Callback - 用户提供的回调,在服务器确认记载时实行(null表现没有回调)。
  • KafkaProducer类提供了一个flush方法,以确保全部先前发送的消息都已现实完成。flush方法的署名如下:
public void flush()

  • KafkaProducer类提供partitionFor方法,该方法有助于获取给定主题的分区元数据。这可以用于自界说分区。这种方法的署名如下:
public Map metrics()它返回生产者维护的内部metrics的映射。
public void close() - KafkaProducer类提供关闭方法块,直到完成全部先前发送的哀求。


Producer API

Producer API的焦点部门是Producer类。Producer类提供了一个通过以下方法在其构造函数中毗连Kafka broker 的选项。
The Producer Class

生产者类提供send方法,利用以下方法将消息发送到单个或多个主题。
public void send(KeyedMessaget message)
//sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(Listmessages)
//sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);生产者有两种范例 ---“同步”和“异步”
同样的API设置也实用于Sync生产者。它们之间的区别是同步型生产者直接发送消息,但是在背景发送。当您必要更高的吞吐量时,首选异步天生器。在0.8之前的版本中,异步生产者没有send()的回调来注册错误处置惩罚步伐。这仅在当前版本的0.9中可用。
public void close()Producer类提供close方法来关闭与全部Kafka broker 的生产者池毗连。


Configuration Settings

下表列出了Producer API的重要设置设置,以便更好地相识
S.NoConfiguration Settings and Description1client.id                                                                                                                                                                              identifies producer application2producer.type                                                                                                                                                                       either sync or async3acks                                                                                                                                                                                       The acks config controls the criteria under producer requests are con-sidered complete.4retries                                                                                                                                                                                      If producer request fails, then automatically retry with specific value.5bootstrap.servers                                                                                                                                     bootstrapping list of brokers.6linger.ms                                                                                                                                                                                 if you want to reduce the number of requests you can set linger.ms to something greater than some value.7key.serializer                                                                                                                                                                      Key for the serializer interface.8value.serializer                                                                                                                                                                      value for the serializer interface.9batch.size                                                                                                                                                                                     Buffer size.10buffer.memory                                                                                                                                                                      controls the total amount of memory available to the producer for buff-ering.

ProducerRecord API

ProducerRecord是一个键/值对,发送到Kafka cluster。ProducerRecord用于构造包罗分区,键和值对的记载。
public ProducerRecord (string topic, int partition, k key, v value)

  • Topic − 用户界说的主题名称将附加到记载。
  • Partition − 分区计数
  • Key − 将包罗在记载中的key
  • Value − 记载内容
public ProducerRecord (string topic, k key, v value)ProducerRecord类构造函数用于创建具有键/值对且没有分区的记载。

  • Topic − 创建 topic 以分配记载。
  • Key − 记载的 key
  • Value − 记载内容
public ProducerRecord (string topic, v value)

  • Topic − 创建主题
  • Value − 记载内容
ProducerRecord类方法列表
S.NoClass Methods and Description1public string topic()                                                                                                                                                      Topic will append to the record.2public K key()                                                                                                                                                                                       Key that will be included in the record. If no such key, null will be returned here.3public V value()                                                                                                                                                                        Record contents.4partition()                                                                                                                                                                                     Partition count for the record

停止现在,我们已经创建了一个生产者来向Kafka集群发送消息。如今让我们创建一个消耗者来消耗Kafka集群中的消息。KafkaConsumer API用于利用来自Kafka聚集的消息。KafkaConsumer类构造函数界说如下:
public KafkaConsumer(java.util.Map configs)

  • configs  - 返回消耗者设置图。


KafkaConsumer

KafkaConsumer类具有以下紧张方法,如下表所示:
S.NoMethod and Description1public java.util.Set assignment()                                                                                          Get the set of partitions currently assigned by the con-sumer.2public string subscription()                                                                                                                                                 Subscribe to the given list of topics to get dynamically as-signed partitions.3public void subscribe(java.util.List topics, ConsumerRebalanceListener listener)                                                                                                                                                                                      Subscribe to the given list of topics to get dynamically as-signed partitions.4public void unsubscribe()                                                                                                                                                Unsubscribe the topics from the given list of partitions.5public void subscribe(java.util.List topics)                                                                                                                          Subscribe to the given list of topics to get dynamically as-signed partitions. If the given list of topics is empty, it is treated the same as unsubscribe().6public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)                                                  The argument pattern refers to the subscribing pattern in the format of regular expression and the listener argument gets notifications from the subscribing pattern.7public void assign(java.util.List partitions)                                                                               Manually assign a list of partitions to the customer.8poll()                                                                                                                                                                                                 Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. This will return error, if the topics are not subscribed before the polling for data.9public void commitSync()                                                                                                                                                     Commit offsets returned on the last poll() for all the sub-scribed list of topics and partitions. The same operation is applied to commitAsyn().10public void seek(TopicPartition partition, long offset)                                                                                                          Fetch the current offset value that consumer will use on the next poll() method.11public void resume()                                                                                                                                                                    Resume the paused partitions.12public void wakeup()                                                                                                                                                            Wakeup the consumer.ConsumerRecord API

ConsumerRecord API用于吸收来自Kafka聚集的记载。此API包罗主题名称,分区号,从中吸收记载以及指向Kafka分区中记载的偏移量。ConsumerRecord类用于创建具有特定主题名称,分区计数和对的利用者记载。它有以下署名:
public ConsumerRecord(string topic,int partition, long offset,K key, V value)

  • Topic - 从Kafka聚集收到的利用者记载的主题名称。
  • Partition - 主题的分区。
  • Key - 记载的密钥,假如没有key,则返回null。
  • Value - 记载内容。
ConsumerRecords API

ConsumerRecords API充当ConsumerRecord的容器。此API用于保存特定主题的每个分区的ConsumerRecord列表。其构造函数界说如下。
public ConsumerRecords(java.util.Map>> records)

  • TopicPartition  - 返回特定主题的分区映射。
  • Records = ConsumerRecord的返回列表。
ConsumerRecords类界说了以下方法:
S.NoMethods and Description1public int count()                                                                                                                                                                             The number of records for all the topics.2public Set partitions()                                                                                                                                                                      The set of partitions with data in this record set (if no data was returned then the set is empty).3public Iterator iterator()                                                                                                                                                               Iterator enables you to cycle through a collection, obtaining or re-moving elements.4public List records()                                                                                                                                                                                 Get list of records for the given partition.Configuration Settings

S.NoSettings and Description1bootstrap.servers                                                                                                                                                                       Bootstrapping list of brokers.2group.id                                                                                                                                                                                         Assigns an individual consumer to a group.3enable.auto.commit                                                                                                                                                                           Enable auto commit for offsets if the value is true, otherwise not committed.4auto.commit.interval.ms                                                                                                                                                                 Return how often updated consumed offsets are written to ZooKeeper.5session.timeout.ms                                                                                                                                                                 Indicates how many milliseconds Kafka will wait for the ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.



代码

以下代码已经放到 github http://github.com/lymboy/kafka1.git
在开始编码之前,先在假造机中启动 zookeeper 和创建一个 topic
笔者利用的 IDEA, 新建是 maven 项目,

   
       org.apache.kafka
       kafka_2.12
       2.1.0
   
生产者:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
* @author sairo
* @date 19-2-21
*/
public class MyKafkaProducer {

   public static void main(String[] args) {
       Properties props = new Properties();
       //设置服务器地点
       props.put("bootstrap.servers", "cluster01:9092");
       //设置对全部哀求应答
       props.put("acks", "all");
       //假如哀求失败,kafka可以主动重试
       props.put("retries", 0);
       //设置缓冲区巨细
       props.put("batch.size", 16384);
       //淘汰小于0的哀求数
       props.put("linger.ms", 1);
       //buffer.memory控制生产者可用于缓冲的总内存量。
       props.put("buffer.memory", 33554432);
       //设置序列化器
       props.put("key.serializer",
               "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer",
               "org.apache.kafka.common.serialization.StringSerializer");

       Producer producer = new KafkaProducer(props);
<span role="presentation" style="box-sizing: border-box;padding-right: 0px;">        for(int i = 0; i <span class="cm-operator" style="box-sizing: border-box;color: rgb(152, 26, 26);">




上一篇:MongoDB学习篇:MongoDB的摆设和根本操纵
下一篇:更换centos默认源与安装vim编辑器(快速上手Linux第二章2-2) ...
您需要登录后才可以回帖 登录 | 加入社区

本版积分规则

 

QQ|申请友链|小黑屋|手机版|Hlshell Inc. ( 豫ICP备16002110号-5 )

GMT+8, 2024-5-3 02:10 , Processed in 0.078890 second(s), 47 queries .

HLShell有权修改版权声明内容,如有任何爭議,HLShell將保留最終決定權!

Powered by Discuz! X3.4

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表