kafka基本操作
Kafka命令行操作
创建 topic
1 | ./kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --replication-factor 3 --partitions 3 --topic test_2 |
1 | --replication-factor 副本数量 |
手动指定副本的存储位置
1
./kafka-topics.sh --create --topic test_3 --zookeeper master:2181 --replica-assignment 0:1,1:2
删除 topic
1 | ./kafka-topics.sh --delete --topic tpc_1 --zookeeper master:2181 |
1 | 异步线程去删除)删除 topic,需要一个参数处于启用状态: delete.topic.enable = true,否则删不掉 |
查看 topic
列出当前系统中的所有 topic
1
kafka-topics.sh --list --zookeeper master:2181
查看 topic 详细信息
1
2
3./kafka-topics.sh --create --topic tpc_1 --zookeeper master:2181 --replica-assignment 0:1,1:2
./kafka-topics.sh --describe --topic tpc_1 --zookeper master:2181
增加分区数
1 | ./kafka-topics.sh --alter --topic test_3 --partitions 3 --zookeeper master:2181 |
1 | Kafka 只支持增加分区,不支持减少分区 |
动态配置 topic 参数
通过管理命令,可以为已创建的 topic 增加、修改、删除 topic level 参数
添加、修改配置参数(开启压缩发送传输种提高kafka消息吞吐量的有效办法(‘gzip’, ‘snappy’, ‘lz4’, ‘zstd’))
1
./kafka-configs.sh --zookeeper master:2181 --entity-type topics --entity-name tpc_1 --alter --add-config compression.type=gzip
删除配置参数
1
./kafka-configs.sh --zookeeper master:2181 --entity-type topics --entity-name tpc_1 --alter --delete-config compression.type
Kafka命令行生产者与消费者操作
生产者:kafka-console-producer
1
./kafka-console-producer.sh --broker-list master:9092, slave1:9092, slave2:9092 --topic tpc_1
消费者:kafka-console-consumer
消费消息
1
./kafka-console-consumer.sh --bootstrap-server master:9092, slave1:9092, slave2:9092 --topic tpc_1 --from-beginning
指定要消费的分区,和要消费的起始 offset
1
./kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic tcp_1 --offset 2 --partition 0
配置管理 kafka-configs
比如查看 topic 的配置可以按如下方式执行:
1
./kafka-configs.sh zookeeper master: 2181 --describe --entity-type topics --entity-name tpc_2
比如查看 broker 的动态配置可以按如下方式执行:
1
./kafka-configs.sh zookeeper master: 2181 --describe --entity-type brokers --entity-name 0 --zookeeper master:2181
Kafka生产者api示例
生产者api示例
一个正常的生产逻辑需要具备以下几个步骤
1 | (1)配置生产者客户端参数及创建相应的生产者实例 |
- 示例代码(部分截取)
1 | Properties props = new Properties(); |
- 消息对象 ProducerRecord,它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务关的消息体只是其中的一个 value 属性 ,比“ Hello, rgzn!”只是 ProducerRecord 对象的一个属性。
1 | ProducerRecord 类的定义如下: |
必要参数配置
- 在创建真正的生产者实例前需要配置相应的参数,比如需要连接的 Kafka 集群地址。在 Kafka 生产者客户端 KatkaProducer 中有 3 个参数是必填的。
1 | * bootstrap.servers |
- 为了防止参数名字符串书写错误,可以使用如下方式进行设置:
1 | props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName()); |
生产者api参数发送方式
这个客户端经过了生产环境测试并且通常情况它比原来Scals客户端更加快速、功能更加齐全。你可以通过添加以下示例的Maven坐标到客户端依赖中来使用这个新的客户端(你可以修改版本号来使用新的发布版本):
1 | <dependency> |
发后即忘( fire-and-forget)
发后即忘,它只管往 Kafka 发送,并不关心消息是否正确到达。在大多数情况下,这种发送方式没有问题; 不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性最差。
1 | Future<RecordMetadata> send = producer.send(rcd); |
同步发送(sync )
0.8.x 前,有一个参数 producer.type=sycn|asycn 来决定生产者的发送模式;现已失效(新版中,producer 在底层只有异步)
1 | try { |
在调用 send
方法后可以接着调用 get()
方法,send
方法的返回值是一个 Future\对象,RecordMetadata 里面包含了发送消息的主题、分区、偏移量等信息。改写后的代码如下:
1 | for (int i = 0; i < 10; i++) { |
此时得到的输出如下:偏移量和调用次数有关,所有记录都分配到了 0 分区,这是因为在创建 Hello-Kafka
主题时候,使用 --partitions
指定其分区数为 1,即只有一个分区。
1 | topic=Hello-Kafka, partition=0, offset=40 |
异步发送(async )
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
1 | 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试 |
通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此 Kafka 提供了异步发送和回调函数。 代码如下:
1 | for (int i = 0; i < 10; i++) { |
生产者原理解析
1 | 0、新建kafka生产实例,参数也是放在kafkaProducer里面 |
消费者API
一个正常的消费逻辑需要具备以下几个步骤:
1 | (1)配置消费者客户端参数 |
- 消费者API示例代码(部分截取)
1 | Properties props = new Properties(); |
Kafka消费者可选属性
1 | 1. fetch.min.byte |
- 必要参数配置
1 | Properties props = new Properties(); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); |
subscribe 订阅主题
subscribe 有如下重载方法:
1
2
3
4public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)指定集合方式订阅主题
1
2consumer.subscribe(Arrays.asList(topic1));
consumer subscribe(Arrays.asList(topic2));正则方式订阅主题
如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。
正则表达式的方式订阅的示例如下
1
consumer.subscribe(Pattern.compile ("topic.*" ));
assign 订阅主题
消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题,还可直接订阅某些主题的指定分区;
在 KafkaConsumer 中提供了 assign() 方法来实现这些功能,此方法的具体定义如下:
1
public void assign(Collection<TopicPartition> partitions);
1
这个方法只接受参数 partitions,用来指定需要订阅的分区集合。
1
consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;
subscribe 与 assign 的区别
通过 subscribe()方法订阅主题具有消费者自动再均衡功能 ;
1
在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。 当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。
assign() 方法订阅分区时,是不具备消费者自动均衡的功能的;
1
其实这一点从 assign()方法参数可以看出端倪,两种类型 subscribe()都有 ConsumerRebalanceListener 类型参数的方法,而 assign()方法却没有。
取消订阅
可以使用 KafkaConsumer 中的 unsubscribe()方法采取消主题的订阅,这个方法既可以取消通过subscribe( Collection)方式实现的订阅; 也可以取消通过 subscribe(Pattem)方式实现的订阅,还可以取消通过 assign( Collection)方式实现的订阅。示例码如下:
1 | consumer.unsubscribe(); |
如果将 subscribe(Collection )或 assign(Collection)集合参数设置为空集合,作用与 unsubscribe()方法相同,如下示例中三行代码的效果相同:
1 | consumer.unsubscribe(); |
消息的消费模式
Kafka 中的消费是基于拉取模式的。消息的消费一般有两种模式:推送模式和拉取模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
对于 poll () 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空如果订阅的所有分区中都没有可供消费的消息,那么 poll()方法返回为空的消息集; poll () 方法具体定义如下:
public ConsumerRecords
超时时间参数 timeout , 用来控制 poll() 方法的阻塞时间, 在消费者的缓冲区里没有可用数据时会发生阻塞。如果消费者程序只用来单纯拉取并消费数据,则为了提高吞吐率,可以把 timeout 设置为Long.MAX_VALUE;
消费者消费到的每条消息的类型为 ConsumerRecord
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15topic partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。
offsset 表示消息在所属分区的偏移量。
timestamp 表示时间戳,与此对应的 timestampType 表示时间戳的类型。
timestampType 有两种类型 CreateTime 和 LogAppendTime , 分别代表消息创建的时间戳和消息追加到日志的时间戳。
headers 表示消息的头部内容。
key value 分别表示消息的键和消息的值,一般业务应用要读取的就是 value ;
serializedKeySize、serializedValueSize 分别表示 key、value 经过序列化之后的大小,如果 key 为空, 则 serializedKeySize 值为 -1,同样,如果 value 为空,则 serializedValueSize 的值也会为 -1;
checksum 是 CRC32 的校验值。
指定位移消费
有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而KafkaConsumer 中的 seek() 方法正好提供了这个功能,让我们可以追前消费或回溯消费。
seek()方法的具体定义如下:
1
public void seek(TopicPartiton partition,long offset);
再均衡监听器
一个消费组中,一旦有消费者的增减发生,会触发消费者组的 rebalance 再均衡; 如果 A 消费者消费掉的一批消息还没来得及提交 offset, 而它所负责的分区在 rebalance 中转移给了 B 消费者,则有可能发生数据的重复消费处理。此情形下,可以通过再均衡监听器做一定程度的补救;
自动位移提交
Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置, 默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置, 默认值为 5 秒, 此参数生效的前提是 enable.
auto.commit 参数为 true。
在默认的方式下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。
重复消费
假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。
丢失消息
按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看下图中的情形: 拉取线程不断地拉取消息并存入本地缓存, 比如在 BlockingQueue 中, 另一个处理线程从缓存中读取消息并进行相应的逻辑处理。设目前进行到了第 y+l 次拉取,以及第 m 次位移提交的时候,也就是x+6 之前的位移己经确认提交了, 处理线程却还正在处理 x+3 的消息; 此时如果处理线程发生了异常, 待其恢复之后会从第 m 次位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3 至 x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。
手动位移提交(调用 kafka api)
自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象, 但是在编程的世界里异常无可避免; 同时, 自动位移提交也无法做到精确的位移管理。 在 Kafka 中还提供了手动位移提交的方式, 这样可以使得开发人员对消费位移的管理控制更加灵活。
很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费; 手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。 开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 fals ,示例如下:
1 | props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false); |
手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。
Topic管理 API
一般情况下,我们都习惯使用 kafka-topic.sh 本来管理主题,如果希望将管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用 API 方式去实现。这种调用 API 方式实现管理主要利用 KafkaAdminClient 工具类KafkaAdminClient 不仅可以用来管理 broker、配置和 ACL (Access Control List),还可用来管理主题)
列出主题
1 | ListTopicsResult listTopicsResult = adminClient.listTopics(); |
查看主题信息
1 | DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList("tpc_4", "tpc_3")); |
创建主题
- 代码示例(部分截取)
1 | // 参数配置 |
删除主题
1 | DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("tpc_1", "tpc_1")); |
其他管理
除了进行 topic 管理之外,KafkaAdminClient 也可以进行诸如动态参数管理,分区管理等各类管理操作;
powershell