Java 自定义方法 - KAFKA:修订间差异
无编辑摘要 |
|||
(未显示同一用户的11个中间版本) | |||
第4行: | 第4行: | ||
=== Producer === | === Producer === | ||
producer 是线程安全的。 | producer 是线程安全的。 | ||
Producer<String, String> producer | |||
==== 消息发送方式 ==== | |||
# 异步,默认方法 | |||
producer.send(new ProducerRecord<String, String>(topic1, s)); | |||
# 异步,回调 | |||
producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {}); | |||
# 同步 | |||
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get(); | |||
==== 消息分区策略 ==== | |||
消息键保序(key-ordering )策略:Kafka 中每条消息都可以定义 Key,同一个 Key 的所有消息都进入到相同的分区中。否则采用轮询(Round-robin)、随机策略(Randomness)等策略。 | 消息键保序(key-ordering )策略:Kafka 中每条消息都可以定义 Key,同一个 Key 的所有消息都进入到相同的分区中。否则采用轮询(Round-robin)、随机策略(Randomness)等策略。 | ||
producer.send(new ProducerRecord<String, String>(TOPIC, key, val) | producer.send(new ProducerRecord<String, String>(TOPIC, key, val) | ||
-.OR.- | -.OR.- | ||
producer.send(new ProducerRecord<String, String>(TOPIC, val) | producer.send(new ProducerRecord<String, String>(TOPIC, val) | ||
==== 集群同步规则 ==== | |||
ack 配置项用来控制 producer 要求 leader 确认多少消息后返回调用成功 | |||
* 0 不需要等待任何确认消息 | |||
* 1 需要等待leader确认 | |||
* -1或all 需要全部 ISR 集合返回确认,默认值 | |||
ack=-1 # 默认需要全部 ISR 集合返回确认 | |||
==== 缓冲区 ==== | |||
缓冲一批数据发送或超过延时时间(linger.ms=0)发送,默认 16k。 | |||
batch.size=1048576 # 1M | |||
linger.ms=10 # 10ms | |||
==== 消息重发 ==== | |||
消息发送错误时设置的系统重发消息次数(retries=2147483647)、重发间隔(retry.backoff.ms=100)。若保证消息的有序性,设置 max_in_flight_requests_per_connection=1。 | |||
max_in_flight_requests_per_connection=1 | |||
retries=10 | |||
retry.backoff.ms=200 | |||
==== 压缩方式 ==== | |||
发送的所有数据的压缩方式:none(默认), gzip, snappy, lz4, zstd。 | |||
compression.type=lz4 | |||
==== 其他 ==== | |||
enable.idempotence=true # 默认开启幂等性 | |||
bootstrap.servers=node1:9092,node2:9092,... # 并非需要所有的broker地址,生产者可以从给定的 broker 里查找到其他 broker 信息 | |||
=== Consumer === | === Consumer === | ||
第24行: | 第55行: | ||
props 配置变化,需要 new consumer。 | props 配置变化,需要 new consumer。 | ||
当 N | 当 N 较大时,需要设置 fetch.max.bytes(默认值 52428800,单位 byte,下同),该参数与 fetch.min.bytes(默认值 1)参数对应,用来配置 Consumer 在一次请求中从 Kafka 中拉取的多批总数据量大小。若一批次的数据大于该值,仍然可以拉取。 | ||
==== 获取消费记录 ==== | ==== 获取消费记录 ==== | ||
第31行: | 第62行: | ||
consumer.poll(Duration.ofMillis(1000)); | consumer.poll(Duration.ofMillis(1000)); | ||
=== 常见问题 === | |||
* Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because "committed" is null | |||
在下列语句中可能会遇到此类问题,原因是该 topic & group 从未被消费过。 | |||
OffsetAndMetadata committed = consumer.committed(topicAndPartition); | |||
commitOffsetMap.put(topicAndPartition.partition(), committed.offset()); | |||
需要判断 committed==null,committed.offset()替换为零。 | |||
=== Sample === | |||
{| class="wikitable" | |||
|+ | |||
!No | |||
!Method | |||
!Explain | |||
!Example | |||
|- | |||
| | |||
|KAFKA | |||
|定义 kafka,默认从 db.cnf 中取 default | |||
|KAFKA k1 = new KAFKA("kafkap182", "db.cnf") | |||
|- | |||
|1 | |||
|reset | |||
|重设消费位置 | |||
|k1.reset("test", "test", N) # 从N开始消费 | |||
|- | |||
|2 | |||
|get | |||
|获取记录 | |||
|k1.get(topic1, group1) # 默认条数,500 | |||
k1.getStream(topic1, group1, N) # 流 -> ConsumerRecords<String, String> cr | |||
|- | |||
|3 | |||
|put | |||
|生产记录 | |||
|k1.put("test", ldat1) # ArrayList<String> ldat1 | |||
|- | |||
|4 | |||
|getTopics | |||
|获取集群上 topic 列表 | |||
|ArrayList<String> l1 = k1.getTopics() | |||
|- | |||
|5 | |||
|getos | |||
|begin, end, offset | |||
|k1.getos(topic1, group1) # [0, 16, 8] | |||
|} | |||
=== GIT === | |||
https://github.com/ldscfe/devudefj2/blob/main/src/main/com/udf/KAFKA.java | |||
[[分类:Develop]] | [[分类:Develop]] | ||
[[分类:Java]] | [[分类:Java]] |
2024年10月30日 (三) 16:05的最新版本
Java 自定义函数 - KAFKA
Producer
producer 是线程安全的。
Producer<String, String> producer
消息发送方式
# 异步,默认方法 producer.send(new ProducerRecord<String, String>(topic1, s)); # 异步,回调 producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {}); # 同步 RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();
消息分区策略
消息键保序(key-ordering )策略:Kafka 中每条消息都可以定义 Key,同一个 Key 的所有消息都进入到相同的分区中。否则采用轮询(Round-robin)、随机策略(Randomness)等策略。
producer.send(new ProducerRecord<String, String>(TOPIC, key, val) -.OR.- producer.send(new ProducerRecord<String, String>(TOPIC, val)
集群同步规则
ack 配置项用来控制 producer 要求 leader 确认多少消息后返回调用成功
- 0 不需要等待任何确认消息
- 1 需要等待leader确认
- -1或all 需要全部 ISR 集合返回确认,默认值
ack=-1 # 默认需要全部 ISR 集合返回确认
缓冲区
缓冲一批数据发送或超过延时时间(linger.ms=0)发送,默认 16k。
batch.size=1048576 # 1M linger.ms=10 # 10ms
消息重发
消息发送错误时设置的系统重发消息次数(retries=2147483647)、重发间隔(retry.backoff.ms=100)。若保证消息的有序性,设置 max_in_flight_requests_per_connection=1。
max_in_flight_requests_per_connection=1 retries=10 retry.backoff.ms=200
压缩方式
发送的所有数据的压缩方式:none(默认), gzip, snappy, lz4, zstd。
compression.type=lz4
其他
enable.idempotence=true # 默认开启幂等性 bootstrap.servers=node1:9092,node2:9092,... # 并非需要所有的broker地址,生产者可以从给定的 broker 里查找到其他 broker 信息
Consumer
设置每次消费最大记录数
默认值为 500 条。
Properties props = new Properties(); props.put("max.poll.records", N); ... consumer = new KafkaConsumer(props);
props 配置变化,需要 new consumer。
当 N 较大时,需要设置 fetch.max.bytes(默认值 52428800,单位 byte,下同),该参数与 fetch.min.bytes(默认值 1)参数对应,用来配置 Consumer 在一次请求中从 Kafka 中拉取的多批总数据量大小。若一批次的数据大于该值,仍然可以拉取。
获取消费记录
直到取到 max.poll.records 条记录或超过指定时长(如下面语句为 1000 毫秒)。
KafkaConsumer<String, String> consumer; consumer.poll(Duration.ofMillis(1000));
常见问题
- Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because "committed" is null
在下列语句中可能会遇到此类问题,原因是该 topic & group 从未被消费过。 OffsetAndMetadata committed = consumer.committed(topicAndPartition); commitOffsetMap.put(topicAndPartition.partition(), committed.offset()); 需要判断 committed==null,committed.offset()替换为零。
Sample
No | Method | Explain | Example |
---|---|---|---|
KAFKA | 定义 kafka,默认从 db.cnf 中取 default | KAFKA k1 = new KAFKA("kafkap182", "db.cnf") | |
1 | reset | 重设消费位置 | k1.reset("test", "test", N) # 从N开始消费 |
2 | get | 获取记录 | k1.get(topic1, group1) # 默认条数,500
k1.getStream(topic1, group1, N) # 流 -> ConsumerRecords<String, String> cr |
3 | put | 生产记录 | k1.put("test", ldat1) # ArrayList<String> ldat1 |
4 | getTopics | 获取集群上 topic 列表 | ArrayList<String> l1 = k1.getTopics() |
5 | getos | begin, end, offset | k1.getos(topic1, group1) # [0, 16, 8] |
GIT
https://github.com/ldscfe/devudefj2/blob/main/src/main/com/udf/KAFKA.java