Java 自定义方法 - KAFKA:修订间差异
跳到导航
跳到搜索
无编辑摘要 |
(→消费) |
||
第1行: | 第1行: | ||
Java 自定义函数 - KAFKA | Java 自定义函数 - KAFKA | ||
=== Producer === | |||
producer 是线程安全的。 | |||
ack 配置项用来控制producer要求leader确认多少消息后返回调用成功 | |||
* 0 不需要等待任何确认消息 | |||
* 1 需要等待leader确认 | |||
* -1或all 需要全部 ISR 集合返回确认,默认值 | |||
=== Consumer === | |||
==== 设置每次消费最大记录数 ==== | ==== 设置每次消费最大记录数 ==== | ||
默认值为 500 条。 | 默认值为 500 条。 |
2024年3月7日 (四) 10:39的版本
Java 自定义函数 - KAFKA
Producer
producer 是线程安全的。
ack 配置项用来控制producer要求leader确认多少消息后返回调用成功
- 0 不需要等待任何确认消息
- 1 需要等待leader确认
- -1或all 需要全部 ISR 集合返回确认,默认值
Consumer
设置每次消费最大记录数
默认值为 500 条。
Properties props = new Properties(); props.put("max.poll.records", N); ... consumer = new KafkaConsumer(props);
props 配置变化,需要 new consumer。
当 N 较大时,需要相应设置 fetch.max.bytes,该参数与 fetch.min.bytes(默认值 1B) 参数对应,它用来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800(B),也就是50MB。
获取消费记录
直到取到 max.poll.records 条记录或超过指定时长(如下面语句为 1000 毫秒)。
KafkaConsumer<String, String> consumer; consumer.poll(Duration.ofMillis(1000));