package com.udf.base;
/*
------------------------------------------------------------------------------
Name     : Udf.base.KAFKA
Purpose  : Kafka producer & consumer
Author   : Adam
Revisions:
Ver        Date        Author           Description
---------  ----------  ---------------  ------------------------------------
1.0        2024/3/5    Adam             Create.
1.2        2024/3/8    Adam             putsync, putcb

format:
property: consumer
method  : get, put, reset

<!--Kafka-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>
------------------------------------------------------------------------------
*/

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

import java.time.Duration;
import java.util.*;

public class KAFKA extends BASE {
    public static final String VERSION = "v1.0";
    public static Producer<String, String> producer;
    public static KafkaConsumer<String, String> consumer;
    public static ConsumerRecords<String, String> cr;                      // record stream
    public static ArrayList<String> topics = new ArrayList<>();           // topic list
    public static HashMap<Long, ArrayList<String>> val = new HashMap<>(); // result set, [offset, value]
    public static String jval = "";                                       // result set as JSON
    public static int cs = 0;                                             // row count
    public static int PullMS = 1000;                                      // default poll timeout, milliseconds
    public static int MaxRows = 500;

    private static final Properties props = new Properties();
    private static final Logger logger = Logger.getLogger(KAFKA.class);

    public KAFKA(String mqname1, String mq1) {
        // get broker connection info from config
        CNF cnf1 = new CNF(mq1);
        String serv1 = cnf1.get(mqname1 + ".host");   // s1:9092,s2:9092,...

        _init(serv1);   // props init
    }
    public KAFKA(String mqname1) {
        this(mqname1, "db.cnf");
    }

    public static ArrayList getTopics() {
        AdminClient client1 = KafkaAdminClient.create(props);
        // get topic list
        Set topics1 = null;
        try {
            topics1 = client1.listTopics().names().get();
        } catch (Exception e) {
            logger.error(e);
        }
        client1.close();
        return new ArrayList<String>(topics1);
    }

    public static ArrayList getos(String topic1, String group1) {
        _consumer(topic1, group1, 1);

        long osb = 0;   // beginOffsets
        long ose = 0;   // endOffsets
        long osc = 0;   // currentOffsets
        ArrayList los1 = new ArrayList();
        Map<Integer, Long> beginOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> endOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> commitOffsetMap = new HashMap<Integer, Long>();

        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        List<PartitionInfo> partitionsFor;

        partitionsFor = consumer.partitionsFor(topic1);

        for (PartitionInfo partitionInfo : partitionsFor) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            topicPartitions.add(topicPartition);
        }

        // beginOffsetMap
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(topicPartitions);
        for (TopicPartition partitionInfo : beginOffsets.keySet()) {
            beginOffsetMap.put(partitionInfo.partition(), beginOffsets.get(partitionInfo));
        }
        // endOffsetMap
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        for (TopicPartition partitionInfo : endOffsets.keySet()) {
            endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));
        }

        // commitOffsetMap
        for (TopicPartition topicAndPartition : topicPartitions) {
            OffsetAndMetadata committed = consumer.committed(topicAndPartition);
            if (committed == null)
                commitOffsetMap.put(topicAndPartition.partition(), 0L);
            else
                commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
        }

        // sum lag
        long lagSum = 0;
        osb = 9999999999999999L;
        ose = 0;
        if (endOffsetMap.size() == commitOffsetMap.size()) {
            for (Integer partition : endOffsetMap.keySet()) {
                long beginOffSet = beginOffsetMap.get(partition);
                long endOffSet = endOffsetMap.get(partition);
                long commitOffSet = commitOffsetMap.get(partition);
                long diffOffset = endOffSet - commitOffSet;
                lagSum += diffOffset;
                osb = Math.min(osb, beginOffSet);
                ose = Math.max(ose, endOffSet);
                osc = Math.max(osc, commitOffSet);
            }
        } else {
            logger.error("Some partitions of this topic are missing.");
        }
        los1.addAll(Arrays.asList(osb, ose, osc));
        _close();
        return los1;
    }

    // !!!! After all the records (cr) have been traversed, the consumer needs to be closed.
    // .close()
    public static void getStream(String topic1, String group1, int records1) {
        if (records1 < 1)
            return;

        _consumer(topic1, group1, records1);

        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info("Get records: " + cs);

        //_close();
    }

    public static void put(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s));
        }
        producer.close();
        logger.info(String.format("Producer %s records succeeded.", dat1.size()));
    }

    public static void putsync(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            try {
                RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();
                //logger.info("offset = " + metadata.offset());
            } catch (Exception e) {
                logger.error(e);
                return;
            }
        }
        producer.close();
        logger.info(String.format("Synchronous Producer %s records succeeded.", dat1.size()));
    }

    public static void putcb(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {
                if (e != null) {
                    logger.error(e);
                }
            });
        }
        producer.close();
        logger.info(String.format("Callback Producer %s records succeeded.", dat1.size()));
    }

    public static void get(String topic1, String group1, int records1) {
        _consumer(topic1, group1, records1);

        long i;
        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info(String.format("Topic:[%s], Group:[%s], get records: %s", topic1, group1, cs));

        i = 0;
        ArrayList lrs1;
        for (ConsumerRecord<String, String> record : cr) {
            lrs1 = new ArrayList();
            lrs1.add(record.offset());
            lrs1.add(record.value());
            // lrs1.add(record.key());
            // lrs1.add(record.timestamp());
            val.put(i++, lrs1);
        }
        _close();
        jval = json.toJson(val);
    }
    public static void get(String topic1, String group1) {
        get(topic1, group1, MaxRows);
    }

    public static void reset(String topic1, String group1, long offset1) {
        _consumer(topic1, group1, 1);

        Set<TopicPartition> assignment1 = new HashSet<>();
        while (assignment1.size() == 0) {
            consumer.poll(Duration.ofMillis(100));
            assignment1 = consumer.assignment();
        }
        //logger.info("Partition Assignment: " + assignment1);
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment1);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment1);

        for (TopicPartition tp : assignment1) {
            if (offset1 < 0) {
                offset1 = Math.max(beginOffsets.get(tp), endOffsets.get(tp) + offset1 + 1);
            } else {
                offset1 = Math.max(offset1, beginOffsets.get(tp));
                offset1 = Math.min(offset1, endOffsets.get(tp));
            }
            logger.info(String.format("Reset Topic-Partition [%s] offset: [%s,%s]=%s", tp, beginOffsets.get(tp), endOffsets.get(tp), offset1));
            consumer.seek(tp, offset1);
        }
        _close();
    }

    protected static void _init(String serv1) {
        props.put("bootstrap.servers", serv1);
        //props.put("group.id", sGroup);
        // producer
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        // consumer
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        //props.put("max.poll.records", MaxRows);
    }

    protected static void _consumer(String topic1, String group1, int records1) {
        // Create consumer
        _close();
        props.put("group.id", group1);
        props.put("max.poll.records", records1);
        consumer = new KafkaConsumer(props);
        // Subscribe to topic
        consumer.subscribe(Arrays.asList(topic1));
    }

    protected static void _close() {
        try {
            consumer.close();
        } catch (Exception e) {
            logger.error("Consumer closed.");
        }
    }
}
[[分类:Develop]]
[[分类:Java]]
Java User-Defined Functions - KAFKA
Producer
The producer is thread-safe.
Producer<String, String> producer
Message send modes
# Asynchronous, the default
producer.send(new ProducerRecord<String, String>(topic1, s));
# Asynchronous with a callback
producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {});
# Synchronous
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();
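The snippets above assume an existing producer. A minimal self-contained sketch of the three send modes is shown below; the broker address, topic name, and message value are placeholders.
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class SendModes {
    public static void main(String[] args) throws Exception {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        p.put("key.serializer", StringSerializer.class.getName());
        p.put("value.serializer", StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(p);
        ProducerRecord<String, String> rec = new ProducerRecord<>("test", "hello");

        // 1. asynchronous, fire-and-forget (the default mode)
        producer.send(rec);
        // 2. asynchronous with a callback, invoked when the send completes or fails
        producer.send(rec, (RecordMetadata metadata, Exception e) -> {
            if (e != null) e.printStackTrace();
        });
        // 3. synchronous: block on the returned Future until the broker acknowledges
        RecordMetadata syncMeta = producer.send(rec).get();
        System.out.println("partition=" + syncMeta.partition() + ", offset=" + syncMeta.offset());

        producer.close();
    }
}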
Message partitioning strategy
Key-ordering strategy: every Kafka message may carry a key, and all messages with the same key go to the same partition. Messages without a key are distributed by round-robin, random, or similar strategies.
producer.send(new ProducerRecord<String, String>(TOPIC, key, val));
-.OR.-
producer.send(new ProducerRecord<String, String>(TOPIC, val));
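For illustration, a small fragment (reusing the Properties p from the sketch above, inside a method that throws Exception) that sends two messages with the same key and prints the partition each landed on; both should report the same partition.
Producer<String, String> producer = new KafkaProducer<>(p);
RecordMetadata m1 = producer.send(new ProducerRecord<>("test", "order-42", "created")).get();
RecordMetadata m2 = producer.send(new ProducerRecord<>("test", "order-42", "paid")).get();
System.out.println(m1.partition() + " == " + m2.partition());   // same key -> same partition
producer.close();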
Cluster acknowledgment rules
The acks setting controls how many acknowledgments the producer requires from the leader before a send is considered successful:
- 0: no acknowledgment is required
- 1: wait for the leader's acknowledgment
- -1 or all: wait for acknowledgment from the full ISR set (the default)
acks=-1 # default: wait for the full ISR set to acknowledge
Buffering
Records are buffered and sent as a batch, or sent once the linger time expires (linger.ms=0 by default); the default batch size is 16 KB.
batch.size=1048576 # 1M
linger.ms=10 # 10ms
Retries
When a send fails, retries (default retries=2147483647) and the retry interval (retry.backoff.ms=100) control how messages are resent. To preserve message ordering, set max.in.flight.requests.per.connection=1.
max.in.flight.requests.per.connection=1
retries=10
retry.backoff.ms=200
Compression
Compression type applied to all data sent by the producer: none (default), gzip, snappy, lz4, zstd.
compression.type=lz4
Other
enable.idempotence=true # idempotence is enabled by default
bootstrap.servers=node1:9092,node2:9092,... # not every broker address is needed; the producer discovers the other brokers from those given
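As a reference, a sketch collecting the producer settings above into one Properties object; it assumes the kafka-clients imports shown earlier, and the values are examples, not recommendations.
Properties p = new Properties();
p.put("bootstrap.servers", "node1:9092,node2:9092");     // a subset of brokers is enough
p.put("key.serializer", StringSerializer.class.getName());
p.put("value.serializer", StringSerializer.class.getName());
p.put("acks", "all");                                    // wait for the full ISR set
p.put("batch.size", 1048576);                            // 1 MB batches
p.put("linger.ms", 10);                                  // wait up to 10 ms to fill a batch
p.put("retries", 10);
p.put("retry.backoff.ms", 200);
p.put("max.in.flight.requests.per.connection", 1);       // preserve ordering across retries
p.put("compression.type", "lz4");
p.put("enable.idempotence", "true");
Producer<String, String> producer = new KafkaProducer<>(p);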
Consumer
Setting the maximum records per poll
The default is 500 records.
Properties props = new Properties();
props.put("max.poll.records", N);
...
consumer = new KafkaConsumer(props);
If the props configuration changes, a new consumer must be created.
When N is large, also set fetch.max.bytes (default 52428800; values are in bytes here and below), which together with fetch.min.bytes (default 1) controls the total amount of data the consumer pulls from Kafka in a single request across batches. A single batch larger than this value can still be fetched.
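A sketch of the corresponding consumer Properties, with illustrative values; the group id and broker address are placeholders.
Properties p = new Properties();
p.put("bootstrap.servers", "node1:9092");
p.put("group.id", "test");
p.put("key.deserializer", StringDeserializer.class.getName());
p.put("value.deserializer", StringDeserializer.class.getName());
p.put("max.poll.records", 5000);          // N records per poll
p.put("fetch.max.bytes", 104857600);      // raise the per-request cap when N is large
p.put("fetch.min.bytes", 1);              // default: return as soon as any data is available
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);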
Fetching records
poll() returns once max.poll.records records have been fetched or the given timeout has elapsed (1000 milliseconds in the statement below).
KafkaConsumer<String, String> consumer;
consumer.poll(Duration.ofMillis(1000));
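Once poll() returns, the records are iterated as below (a fragment mirroring how KAFKA.get walks the ConsumerRecords; the topic name is a placeholder), and the consumer is closed after the records have been traversed, as noted in the class.
consumer.subscribe(Arrays.asList("test"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.offset() + " : " + record.value());
}
consumer.close();   // close once the records have been traversed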
Common issues
- Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because "committed" is null
This error can occur in the statements below; the cause is that the topic & group has never been consumed.
OffsetAndMetadata committed = consumer.committed(topicAndPartition);
commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
Check for committed == null and use zero in place of committed.offset().
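The same guard used in KAFKA.getos, shown in isolation:
OffsetAndMetadata committed = consumer.committed(topicAndPartition);
long committedOffset = (committed == null) ? 0L : committed.offset();
commitOffsetMap.put(topicAndPartition.partition(), committedOffset);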
Sample
{|
! No !! Method !! Explain !! Example
|-
| || KAFKA || Create a KAFKA instance; by default the "default" entry in db.cnf is used || KAFKA k1 = new KAFKA("kafkap182", "db.cnf")
|-
| 1 || reset || Reset the consumption offset || k1.reset("test", "test", N) # start consuming from offset N
|-
| 2 || get || Fetch records || k1.get(topic1, group1) # default count, 500<br>k1.getStream(topic1, group1, N) # stream -> ConsumerRecords<String, String> cr
|-
| 3 || put || Produce records || k1.put("test", ldat1) # ArrayList<String> ldat1
|-
| 4 || getTopics || List the topics on the cluster || ArrayList<String> l1 = k1.getTopics()
|-
| 5 || getos || begin, end, committed offset || k1.getos(topic1, group1) # [0, 16, 8]
|}
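A hedged end-to-end sketch combining the calls from the table; the connection name "kafkap182" and the topic/group names are the examples used on this page.
KAFKA k1 = new KAFKA("kafkap182", "db.cnf");      // broker list read from db.cnf

ArrayList<String> ldat1 = new ArrayList<>(Arrays.asList("msg-1", "msg-2", "msg-3"));
k1.put("test", ldat1);                            // produce three records

k1.get("test", "test");                           // consume up to MaxRows (500) records
System.out.println(KAFKA.cs);                     // number of records fetched
System.out.println(KAFKA.jval);                   // records as JSON: index -> [offset, value]

ArrayList os = k1.getos("test", "test");          // [beginOffset, endOffset, committedOffset]
System.out.println(os);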
GIT
https://github.com/ldscfe/devudefj2/blob/main/src/main/com/udf/KAFKA.java