Java 自定义方法 - KAFKA:修订间差异
跳到导航
跳到搜索
(创建页面,内容为“Java 自定义函数 - KAFKA === 每次消费最大记录数 === Properties props = new Properties(); props.put("max.poll.records", N); ... consumer = new KafkaConsumer(props); props 配置变化,需要 new consumer。当 N 较大时,需要相应设置 fetch.max.bytes。 KafkaConsumer<String, String> consumer; consumer.poll(Duration.ofMillis(1000)); 单位为毫秒,在指定时间内,最多消费 max.poll.records 记录。 分类:Develop …”) |
无编辑摘要 |
||
第1行: | 第1行: | ||
Java 自定义函数 - KAFKA | Java 自定义函数 - KAFKA | ||
=== | === 消费 === | ||
==== 设置每次消费最大记录数 ==== | |||
默认值为 500 条。 | |||
Properties props = new Properties(); | Properties props = new Properties(); | ||
props.put("max.poll.records", N); | props.put("max.poll.records", N); | ||
... | ... | ||
consumer = new KafkaConsumer(props); | consumer = new KafkaConsumer(props); | ||
props 配置变化,需要 new | props 配置变化,需要 new consumer。 | ||
当 N 较大时,需要相应设置 fetch.max.bytes,该参数与 fetch.min.bytes(默认值 1B) 参数对应,它用来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800(B),也就是50MB。 | |||
==== 获取消费记录 ==== | |||
直到取到 max.poll.records 条记录或超过指定时长(如下面语句为 1000 毫秒)。 | |||
KafkaConsumer<String, String> consumer; | KafkaConsumer<String, String> consumer; | ||
consumer.poll(Duration.ofMillis(1000)); | consumer.poll(Duration.ofMillis(1000)); | ||
[[分类:Develop]] | [[分类:Develop]] | ||
[[分类:Java]] | [[分类:Java]] |
2024年3月7日 (四) 09:10的版本
Java 自定义函数 - KAFKA
消费
设置每次消费最大记录数
默认值为 500 条。
Properties props = new Properties(); props.put("max.poll.records", N); ... consumer = new KafkaConsumer(props);
props 配置变化,需要 new consumer。
当 N 较大时,需要相应设置 fetch.max.bytes,该参数与 fetch.min.bytes(默认值 1B) 参数对应,它用来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800(B),也就是50MB。
获取消费记录
直到取到 max.poll.records 条记录或超过指定时长(如下面语句为 1000 毫秒)。
KafkaConsumer<String, String> consumer; consumer.poll(Duration.ofMillis(1000));