=== GIT ===
https://github.com/ldscfe/udefj2/blob/main/src/main/com/udf/KAFKA.java

<syntaxhighlight lang="java">
package com.udf.base;
/*
------------------------------------------------------------------------------
Name     : Udf.base.KAFKA
Purpose  : Kafka producer & consumer
Author   : Adam

Revisions:
Ver        Date        Author           Description
---------  ----------  ---------------  ------------------------------------
1.0        2024/3/5    Adam             Create.
1.2        2024/3/8    Adam             putsync, putcb

format:
property: consumer
method  : get, put, reset

Maven dependencies:
<!--Kafka-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>
------------------------------------------------------------------------------
*/

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

import java.time.Duration;
import java.util.*;

public class KAFKA extends BASE {
    public static final String VERSION = "v1.0";

    public static Producer<String, String> producer;
    public static KafkaConsumer<String, String> consumer;
    public static ConsumerRecords<String, String> cr;                      // record stream from the last poll
    public static ArrayList<String> topics = new ArrayList<>();           // topic list
    public static HashMap<Long, ArrayList<String>> val = new HashMap<>(); // result set, [offset, value]
    public static String jval = "";                                       // result set as JSON
    public static int cs = 0;                                             // row count
    public static int PullMS = 1000;                                      // default poll timeout, milliseconds
    public static int MaxRows = 500;

    private static final Properties props = new Properties();
    private static final Logger logger = Logger.getLogger(KAFKA.class);

    public KAFKA(String mqname1, String mq1) {
        // read the Kafka broker list from the config file
        CNF cnf1 = new CNF(mq1);
        String serv1 = cnf1.get(mqname1 + ".host");   // s1:9092,s2:9092,...

        _init(serv1);                                 // props init
    }

    public KAFKA(String mqname1) {
        this(mqname1, "db.cnf");
    }

    public static ArrayList<String> getTopics() {
        AdminClient client1 = KafkaAdminClient.create(props);
        // get topic list
        Set<String> topics1 = new HashSet<>();   // empty on failure instead of null
        try {
            topics1 = client1.listTopics().names().get();
        } catch (Exception e) {
            logger.error(e);
        }
        client1.close();
        return new ArrayList<String>(topics1);
    }

    // return [beginOffset, endOffset, currentOffset] for a topic/group
    public static ArrayList getos(String topic1, String group1) {
        _consumer(topic1, group1, 1);

        long osb = 0;   // beginOffsets
        long ose = 0;   // endOffsets
        long osc = 0;   // currentOffsets
        ArrayList los1 = new ArrayList();
        Map<Integer, Long> beginOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> endOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> commitOffsetMap = new HashMap<Integer, Long>();

        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        List<PartitionInfo> partitionsFor;

        partitionsFor = consumer.partitionsFor(topic1);

        for (PartitionInfo partitionInfo : partitionsFor) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            topicPartitions.add(topicPartition);
        }

        // beginOffsetMap
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(topicPartitions);
        for (TopicPartition partitionInfo : beginOffsets.keySet()) {
            beginOffsetMap.put(partitionInfo.partition(), beginOffsets.get(partitionInfo));
        }
        // endOffsetMap
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        for (TopicPartition partitionInfo : endOffsets.keySet()) {
            endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));
        }

        // commitOffsetMap
        for (TopicPartition topicAndPartition : topicPartitions) {
            OffsetAndMetadata committed = consumer.committed(topicAndPartition);
            if (committed == null)
                commitOffsetMap.put(topicAndPartition.partition(), 0L);
            else
                commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
        }

        // sum lag
        long lagSum = 0;
        osb = 9999999999999999L;
        ose = 0;
        if (endOffsetMap.size() == commitOffsetMap.size()) {
            for (Integer partition : endOffsetMap.keySet()) {
                long beginOffSet = beginOffsetMap.get(partition);
                long endOffSet = endOffsetMap.get(partition);
                long commitOffSet = commitOffsetMap.get(partition);
                long diffOffset = endOffSet - commitOffSet;
                lagSum += diffOffset;
                osb = Math.min(osb, beginOffSet);
                ose = Math.max(ose, endOffSet);
                osc = Math.max(osc, commitOffSet);
            }
        } else {
            logger.error("Some partitions of this topic are missing offsets.");
        }
        los1.addAll(Arrays.asList(osb, ose, osc));
        _close();
        return los1;
    }

    // NOTE: after all records in cr have been traversed, the consumer must be closed
    // with _close() / consumer.close(); getStream() intentionally leaves it open.
    public static void getStream(String topic1, String group1, int records1) {
        if (records1 < 1)
            return;

        _consumer(topic1, group1, records1);

        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info("Get records: " + cs);

        //_close();
    }

    public static void put(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s));
        }
        producer.close();
        logger.info(String.format("Producer sent %s records successfully.", dat1.size()));
    }

    public static void putsync(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            try {
                RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();
                //logger.info("offset = " + metadata.offset());
            } catch (Exception e) {
                logger.error(e);
                producer.close();   // release the producer before bailing out
                return;
            }
        }
        producer.close();
        logger.info(String.format("Synchronous producer sent %s records successfully.", dat1.size()));
    }

    public static void putcb(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {
                if (e != null) {
                    logger.error(e);
                }
            });
        }
        producer.close();
        logger.info(String.format("Callback producer sent %s records successfully.", dat1.size()));
    }

    public static void get(String topic1, String group1, int records1) {
        _consumer(topic1, group1, records1);

        long i;
        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info(String.format("Topic:[%s], Group:[%s], get records: %s", topic1, group1, cs));

        i = 0;
        val.clear();   // drop results from any previous call
        ArrayList lrs1;
        for (ConsumerRecord<String, String> record : cr) {
            lrs1 = new ArrayList();
            lrs1.add(record.offset());
            lrs1.add(record.value());
            // lrs1.add(record.key());
            // lrs1.add(record.timestamp());
            val.put(i++, lrs1);
        }
        _close();
        jval = json.toJson(val);
    }

    public static void get(String topic1, String group1) {
        get(topic1, group1, MaxRows);
    }

    // move the group's position; offset1 >= 0 is an absolute offset,
    // offset1 < 0 counts back from the end of each partition
    public static void reset(String topic1, String group1, long offset1) {
        _consumer(topic1, group1, 1);

        Set<TopicPartition> assignment1 = new HashSet<>();
        while (assignment1.size() == 0) {
            consumer.poll(Duration.ofMillis(100));
            assignment1 = consumer.assignment();
        }
        //logger.info("Partition Assignment: " + assignment1);
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment1);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment1);

        for (TopicPartition tp : assignment1) {
            long os1;   // per-partition target, so one partition's clamp does not affect the next
            if (offset1 < 0) {
                os1 = Math.max(beginOffsets.get(tp), endOffsets.get(tp) + offset1 + 1);
            } else {
                os1 = Math.max(offset1, beginOffsets.get(tp));
                os1 = Math.min(os1, endOffsets.get(tp));
            }
            logger.info(String.format("Reset Topic-Partition [%s] offset: [%s,%s]=%s", tp, beginOffsets.get(tp), endOffsets.get(tp), os1));
            consumer.seek(tp, os1);
        }
        _close();
    }

    protected static void _init(String serv1) {
        props.put("bootstrap.servers", serv1);
        //props.put("group.id", sGroup);
        // producer
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        // consumer
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        //props.put("max.poll.records", MaxRows);
    }

    protected static void _consumer(String topic1, String group1, int records1) {
        // create consumer
        _close();
        props.put("group.id", group1);
        props.put("max.poll.records", records1);
        consumer = new KafkaConsumer(props);
        // subscribe to topic
        consumer.subscribe(Arrays.asList(topic1));
    }

    protected static void _close() {
        try {
            consumer.close();
        } catch (Exception e) {
            logger.debug("Consumer not open or already closed.");
        }
    }
}
</syntaxhighlight>
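
The listing above only defines the wrapper class. Below is a minimal usage sketch, not taken from the repository: it assumes a db.cnf entry such as <code>kafka1.host=s1:9092,s2:9092</code> readable by the CNF helper, and the names <code>kafka1</code>, <code>demo_topic</code> and <code>demo_group</code> are placeholders to adjust for your environment.

<syntaxhighlight lang="java">
import com.udf.base.KAFKA;

import java.util.ArrayList;

public class KafkaDemo {
    public static void main(String[] args) {
        // load the broker list from db.cnf (the "kafka1" entry name is hypothetical):
        //   kafka1.host=s1:9092,s2:9092
        new KAFKA("kafka1");

        // produce a few messages (fire-and-forget; use putsync/putcb for delivery checks)
        ArrayList<String> rows = new ArrayList<>();
        rows.add("{\"id\":1}");
        rows.add("{\"id\":2}");
        KAFKA.put("demo_topic", rows);

        // consume up to 100 records and print the JSON result
        KAFKA.get("demo_topic", "demo_group", 100);
        System.out.println("records: " + KAFKA.cs);
        System.out.println(KAFKA.jval);

        // rewind the group to the earliest available offset
        KAFKA.reset("demo_topic", "demo_group", 0);
    }
}
</syntaxhighlight>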

[[分类:Develop]]
[[分类:Java]]