Java user-defined functions - KAFKA

== Producer ==
The producer is thread-safe.
 Producer<String, String> producer
=== Sending modes ===
 // asynchronous, the default mode
 producer.send(new ProducerRecord<String, String>(topic1, s));
 // asynchronous with a callback
 producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {});
 // synchronous
 RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();
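A minimal, self-contained sketch of the three send modes; the broker address node1:9092 and the topic name demo are placeholder assumptions, not values from this page.
<syntaxhighlight lang="java">
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendModesDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 1. Asynchronous (fire-and-forget): send() returns immediately.
            producer.send(new ProducerRecord<>("demo", "v1"));

            // 2. Asynchronous with a callback, invoked once the broker acknowledges or the send fails.
            producer.send(new ProducerRecord<>("demo", "v2"), (RecordMetadata metadata, Exception e) -> {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.printf("sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
                }
            });

            // 3. Synchronous: block on the returned Future until the broker responds.
            RecordMetadata metadata = producer.send(new ProducerRecord<>("demo", "v3")).get();
            System.out.println("sync send offset: " + metadata.offset());
        }
    }
}
</syntaxhighlight>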
=== Partitioning strategy ===
Key-ordering: every Kafka message may carry a key, and all messages with the same key are written to the same partition. Records sent without a key fall back to strategies such as round-robin or random assignment.
 producer.send(new ProducerRecord<String, String>(TOPIC, key, val));   // keyed
 producer.send(new ProducerRecord<String, String>(TOPIC, val));        // unkeyed
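Continuing inside the try block of the sketch above, one hypothetical way to confirm key-ordering is to compare the partitions reported in RecordMetadata; the topic orders and the key user-42 are illustrative only.
<syntaxhighlight lang="java">
// Fragment: assumes the producer from the previous sketch is in scope.
RecordMetadata m1 = producer.send(new ProducerRecord<>("orders", "user-42", "created")).get();
RecordMetadata m2 = producer.send(new ProducerRecord<>("orders", "user-42", "paid")).get();
System.out.println(m1.partition() == m2.partition());   // true: same key, same partition
</syntaxhighlight>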
=== Acknowledgements (acks) ===
The acks setting controls how many acknowledgements the producer requires before a send call is reported as successful:
* 0: do not wait for any acknowledgement
* 1: wait only for the leader's acknowledgement
* -1 or all: wait for the whole ISR set to acknowledge (the default in recent client versions)
 acks=-1  # wait for the whole ISR set to acknowledge
=== Buffering ===
The producer buffers records and sends a batch once it is full (batch.size, default 16 KB) or once the linger time has elapsed (linger.ms, default 0).
 batch.size=1048576  # 1 MB
 linger.ms=10        # 10 ms
=== Retries ===
When a send fails, retries (default 2147483647) sets how many times the message is resent and retry.backoff.ms (default 100) sets the interval between attempts. To keep messages in order while retrying, set max.in.flight.requests.per.connection=1.
 max.in.flight.requests.per.connection=1
 retries=10
 retry.backoff.ms=200
=== Compression ===
Compression applied to all data sent by the producer: none (default), gzip, snappy, lz4, zstd.
 compression.type=lz4
=== Other ===
 enable.idempotence=true  # idempotence is enabled by default
 bootstrap.servers=node1:9092,node2:9092,...  # not every broker address is needed; the producer discovers the remaining brokers from those given
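The producer settings above can be collected into a single Properties object. The sketch below does so with the illustrative values from this section; the broker addresses are placeholders.
<syntaxhighlight lang="java">
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TunedProducerConfig {
    public static Producer<String, String> build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092");  // seed brokers; the rest are discovered
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");                                  // wait for the whole ISR set
        props.put("batch.size", 1048576);                          // 1 MB batches
        props.put("linger.ms", 10);                                // wait up to 10 ms to fill a batch
        props.put("retries", 10);
        props.put("retry.backoff.ms", 200);
        props.put("max.in.flight.requests.per.connection", 1);     // preserve ordering across retries
        props.put("compression.type", "lz4");
        props.put("enable.idempotence", "true");
        return new KafkaProducer<>(props);
    }
}
</syntaxhighlight>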
== Consumer ==
=== Maximum records per poll ===
The default is 500 records.
 Properties props = new Properties();
 props.put("max.poll.records", N);
 ...
 consumer = new KafkaConsumer(props);
Whenever the props configuration changes, a new consumer has to be created.
When N is large, also set fetch.max.bytes (default 52428800; all sizes below are in bytes). Together with fetch.min.bytes (default 1), it limits the total amount of data, across multiple batches, that the consumer pulls from Kafka in a single request. A single batch larger than this value can still be fetched.
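A sketch of a consumer prepared for large polls, assuming a placeholder broker node1:9092, group demo-group and topic demo.
<syntaxhighlight lang="java">
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LargePollConsumer {
    public static KafkaConsumer<String, String> build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.records", 5000);       // return up to 5000 records per poll()
        props.put("fetch.max.bytes", 104857600);   // allow up to 100 MB per fetch request
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo"));
        return consumer;
    }
}
</syntaxhighlight>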
=== Fetching records ===
poll() returns once max.poll.records records have been fetched or the given timeout expires (1000 milliseconds in the statement below).
 KafkaConsumer<String, String> consumer;
 consumer.poll(Duration.ofMillis(1000));
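A minimal poll-and-iterate loop; it reuses the hypothetical LargePollConsumer helper sketched above.
<syntaxhighlight lang="java">
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoop {
    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = LargePollConsumer.build()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            System.out.println("Got records: " + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
</syntaxhighlight>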
=== Common problems ===
* Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because "committed" is null
This can occur in the statements below when the topic & group has never been consumed, so no offset has ever been committed:
 OffsetAndMetadata committed = consumer.committed(topicAndPartition);
 commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
Check for committed == null and use zero in place of committed.offset(), as in the sketch below.
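A null-safe version of those two statements, assuming consumer, topicAndPartition and commitOffsetMap are already in scope.
<syntaxhighlight lang="java">
OffsetAndMetadata committed = consumer.committed(topicAndPartition);
long committedOffset = (committed == null) ? 0L : committed.offset();  // never consumed: treat as 0
commitOffsetMap.put(topicAndPartition.partition(), committedOffset);
</syntaxhighlight>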
=== Sample ===
{| class="wikitable"
|+
!No
!Method
!Explain
!Example
|-
|
|KAFKA
|Define a KAFKA instance; by default the default entry is read from db.cnf
|KAFKA k1 = new KAFKA("kafkap182", "db.cnf")
|-
|1
|reset
|Reset the consumption position
|k1.reset("test", "test", N)  // start consuming from offset N
|-
|2
|get
|Fetch records
|k1.get(topic1, group1)  // default record count, 500
k1.getStream(topic1, group1, N)  // stream -> ConsumerRecords<String, String> cr
|-
|3
|put
|Produce records
|k1.put("test", ldat1)  // ArrayList<String> ldat1
|-
|4
|getTopics
|List the topics on the cluster
|ArrayList<String> l1 = k1.getTopics()
|-
|5
|getos
|begin, end, offset
|k1.getos(topic1, group1)  // [0, 16, 8]
|}
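A usage sketch assembled from the method signatures listed in the table above (it is not copied from the class itself); the connection name kafkap182, the config file db.cnf and the topic/group test come from the example column, and the KAFKA class is assumed to be on the classpath.
<syntaxhighlight lang="java">
import java.util.ArrayList;

public class KafkaUdfDemo {
    public static void main(String[] args) {
        KAFKA k1 = new KAFKA("kafkap182", "db.cnf");   // connection entry resolved via db.cnf

        // Produce a couple of records to topic "test".
        ArrayList<String> ldat1 = new ArrayList<>();
        ldat1.add("{\"id\": 1, \"msg\": \"hello\"}");
        ldat1.add("{\"id\": 2, \"msg\": \"world\"}");
        k1.put("test", ldat1);

        // Reset the consumption position of topic "test", group "test" to offset 0, then fetch.
        k1.reset("test", "test", 0);
        k1.get("test", "test");                        // up to the default 500 records

        // Cluster topics and [begin, end, committed] offsets for this topic/group.
        ArrayList<String> topics = k1.getTopics();
        System.out.println(topics);
        System.out.println(k1.getos("test", "test"));
    }
}
</syntaxhighlight>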
=== GIT ===
https://github.com/ldscfe/devudefj2/blob/main/src/main/com/udf/KAFKA.java

[[分类:Develop]]
[[分类:Java]]