查看“Java 自定义方法 - KAFKA”的源代码
←
Java 自定义方法 - KAFKA
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
Java 自定义函数 - KAFKA === Producer === producer 是线程安全的。 Producer<String, String> producer ==== 消息发送方式 ==== # 异步,默认方法 producer.send(new ProducerRecord<String, String>(topic1, s)); # 异步,回调 producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {}); # 同步 RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get(); ==== 消息分区策略 ==== 消息键保序(key-ordering )策略:Kafka 中每条消息都可以定义 Key,同一个 Key 的所有消息都进入到相同的分区中。否则采用轮询(Round-robin)、随机策略(Randomness)等策略。 producer.send(new ProducerRecord<String, String>(TOPIC, key, val) -.OR.- producer.send(new ProducerRecord<String, String>(TOPIC, val) ==== 集群同步规则 ==== ack 配置项用来控制 producer 要求 leader 确认多少消息后返回调用成功 * 0 不需要等待任何确认消息 * 1 需要等待leader确认 * -1或all 需要全部 ISR 集合返回确认,默认值 ack=-1 # 默认需要全部 ISR 集合返回确认 ==== 缓冲区 ==== 缓冲一批数据发送或超过延时时间(linger.ms=0)发送,默认 16k。 batch.size=1048576 # 1M linger.ms=10 # 10ms ==== 消息重发 ==== 消息发送错误时设置的系统重发消息次数(retries=2147483647)、重发间隔(retry.backoff.ms=100)。若保证消息的有序性,设置 max_in_flight_requests_per_connection=1。 max_in_flight_requests_per_connection=1 retries=10 retry.backoff.ms=200 ==== 压缩方式 ==== 发送的所有数据的压缩方式:none(默认), gzip, snappy, lz4, zstd。 compression.type=lz4 ==== 其他 ==== enable.idempotence=true # 默认开启幂等性 bootstrap.servers=node1:9092,node2:9092,... # 并非需要所有的broker地址,生产者可以从给定的 broker 里查找到其他 broker 信息 === Consumer === ==== 设置每次消费最大记录数 ==== 默认值为 500 条。 Properties props = new Properties(); props.put("max.poll.records", N); ... consumer = new KafkaConsumer(props); props 配置变化,需要 new consumer。 当 N 较大时,需要设置 fetch.max.bytes(默认值 52428800,单位 byte,下同),该参数与 fetch.min.bytes(默认值 1)参数对应,用来配置 Consumer 在一次请求中从 Kafka 中拉取的多批总数据量大小。若一批次的数据大于该值,仍然可以拉取。 ==== 获取消费记录 ==== 直到取到 max.poll.records 条记录或超过指定时长(如下面语句为 1000 毫秒)。 KafkaConsumer<String, String> consumer; consumer.poll(Duration.ofMillis(1000)); === 常见问题 === * Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because "committed" is null 在下列语句中可能会遇到此类问题,原因是该 topic & group 从未被消费过。 OffsetAndMetadata committed = consumer.committed(topicAndPartition); commitOffsetMap.put(topicAndPartition.partition(), committed.offset()); 需要判断 committed==null,committed.offset()替换为零。 === Sample === {| class="wikitable" |+ !No !Method !Explain !Example |- | |KAFKA |定义 kafka,默认从 db.cnf 中取 default |KAFKA k1 = new KAFKA("kafkap182", "db.cnf") |- |1 |reset |重设消费位置 |k1.reset("test", "test", N) # 从N开始消费 |- |2 |get |获取记录 |k1.get(topic1, group1) # 默认条数,500 k1.getStream(topic1, group1, N) # 流 -> ConsumerRecords<String, String> cr |- |3 |put |生产记录 |k1.put("test", ldat1) # ArrayList<String> ldat1 |- |4 |getTopics |获取集群上 topic 列表 |ArrayList<String> l1 = k1.getTopics() |- |5 |getos |begin, end, offset |k1.getos(topic1, group1) # [0, 16, 8] |} === GIT === https://github.com/ldscfe/udefj2/blob/main/src/main/com/udf/KAFKA.java [[分类:Develop]] [[分类:Java]]
返回
Java 自定义方法 - KAFKA
。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
查看
阅读
查看源代码
查看历史
更多
搜索
导航
首页
最近更改
随机页面
目录
文章分类
侧边栏
帮助
工具
链入页面
相关更改
特殊页面
页面信息