Java 自定义方法 - KAFKA:修订间差异

来自牛奶河Wiki
跳到导航 跳到搜索
第80行: 第80行:
|KAFKA
|KAFKA
|定义 kafka,默认从 db.cnf 中取 default
|定义 kafka,默认从 db.cnf 中取 default
|KAFKA k1 = new KAFKA("kafkap182", "test", "test", "db.cnf")
|KAFKA k1 = new KAFKA("kafkap182", "db.cnf")
|-
|-
|1
|1
|consumer
|reset
|改变消费模式
|重设消费位置
|k1.consumer("test", "test", N)  # 从N开始消费
|k1.reset("test", "test", N)  # 从N开始消费
|-
|-
|2
|2
|get
|get
|获取记录
|获取记录
|k1.get(9)     # 指定条数
|k1.get(topic1, group1)       # 默认条数,500
k1.get()       # 默认条数,1000
k1.getStream(topic1, group1, N) # 流 -> ConsumerRecords<String, String> cr
|-
|-
|3
|3
|getos
|begin, end, offset
|k1.getos()    # [0, 16, 8]
|-
|4
|put
|put
|生产记录
|生产记录
|k1.put("test", ldat1)    # ArrayList<String> ldat1
|k1.put("test", ldat1)    # ArrayList<String> ldat1
|-
|4
|getTopics
|获取集群上 topic 列表
|ArrayList<String> l1 = k1.getTopics()
|-
|5
|getos
|begin, end, offset
|k1.getos(topic1, group1)    # [0, 16, 8]
|}
|}
  package com.udf.base;
  package com.udf.base;
  /*
  /*
第113行: 第119行:
   ---------  ----------  ---------------  ------------------------------------
   ---------  ----------  ---------------  ------------------------------------
   1.0        2024/3/5    Adam            Create.
   1.0        2024/3/5    Adam            Create.
  1.2        2024/3/8    Adam            putsync, putcb
   
   
   format:
   format:
     property: consumer
     property: consumer
     method  : get, put
     method  : get, put, reset
   
   
         <!--Kafka-->
         <!--Kafka-->
第147行: 第154行:
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
  import org.apache.kafka.common.PartitionInfo;
  import org.apache.kafka.common.PartitionInfo;
  import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.common.TopicPartition;
第165行: 第173行:
     public static String jval = "";                                  // json set result
     public static String jval = "";                                  // json set result
     public static int cs = 0;                                        // rows count
     public static int cs = 0;                                        // rows count
    public static boolean status = false;                            // MQ connect status
     public static int PullMS = 1000;                                // default pull max 1000 millisecond
     public static int PullMS = 1000;                                // default pull max 1000 millisecond
     public static int MaxRows = 1000;
     public static int MaxRows = 500;
    private static String sServ, sTopic, sGroup;
     private static final Properties props = new Properties();
     private static final Properties props = new Properties();
     private static final Logger logger = Logger.getLogger(KAFKA.class);
     private static final Logger logger = Logger.getLogger(KAFKA.class);
     public KAFKA(String mqname1, String topic1, String group1, String mq1) {
     public KAFKA(String mqname1, String mq1) {
         // get db jdbc info
         // get db jdbc info
         CNF cnf1 = new CNF(mq1);
         CNF cnf1 = new CNF(mq1);
         sServ = cnf1.get(mqname1);      // s1:9092,s2:9092,...
         String serv1 = cnf1.get(mqname1 + ".host");      // s1:9092,s2:9092,...
        sTopic = topic1;
        sGroup = group1;
   
   
         props.put("bootstrap.servers", sServ);
         _init(serv1);    // props init
         AdminClient client = KafkaAdminClient.create(props);
    }
    public KAFKA(String mqname1) {
        this(mqname1, "db.cnf");
    }
    public static ArrayList getTopics() {
         AdminClient client1 = KafkaAdminClient.create(props);
         // get topic list
         // get topic list
         Set topics1 = null;
         Set topics1 = null;
         try {
         try {
             topics1 = client.listTopics().names().get();
             topics1 = client1.listTopics().names().get();
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e);
             logger.error(e);
         }
         }
         topics = new ArrayList<>(topics1);
         client1.close();
         init();
         return new ArrayList<String>(topics1);
    }
    public KAFKA(String mqname1) {
        this(mqname1, "test", "test", "db.cnf");
    }
    public static void init() {
        props.put("bootstrap.servers", sServ);
        props.put("group.id", sGroup);
        // producer
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //consumer
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", MaxRows);
     }
     }
     public static void consumer(String topic1, String group1, long offset1) {
     public static ArrayList getos(String topic1, String group1) {
        sTopic = topic1;
        sGroup = group1;
         _consumer(topic1, group1, 1);
         _consumer(topic1, group1, 1);
        reset(offset1);
    }
    public static void consumer(String topic1, String group1) {
        sTopic = topic1;
        sGroup = group1;
        _consumer(topic1, group1, MaxRows);
    }
    public static void get(int rows) {
        if (rows < 1)
            return;
   
   
        _consumer(sTopic, sGroup, rows);
        _get();
    }
    public static void get() {
        _consumer(sTopic, sGroup, MaxRows);
        _get();
    }
    public static ArrayList getos() {
        //consumer(sTopic, sGroup);
         long osb = 0;                                    // beginOffsets
         long osb = 0;                                    // beginOffsets
         long ose = 0;                                    // endOffsets
         long ose = 0;                                    // endOffsets
第247行: 第212行:
         List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
         List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
         List<PartitionInfo> partitionsFor;
         List<PartitionInfo> partitionsFor;
        try {
            partitionsFor = consumer.partitionsFor(sTopic);
         partitionsFor = consumer.partitionsFor(topic1);
         } catch (Exception e) {
            consumer(sTopic, sGroup);;
            partitionsFor = consumer.partitionsFor(sTopic);
        }
         for (PartitionInfo partitionInfo : partitionsFor) {
         for (PartitionInfo partitionInfo : partitionsFor) {
             TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
             TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
第263行: 第225行:
             beginOffsetMap.put(partitionInfo.partition(), beginOffsets.get(partitionInfo));
             beginOffsetMap.put(partitionInfo.partition(), beginOffsets.get(partitionInfo));
         }
         }
//        for (Integer partitionId : beginOffsetMap.keySet()) {
//            logger.info(String.format("topic:%s, partition:%s, logSize:%s", sTopic, partitionId, beginOffsetMap.get(partitionId)));
//        }
         // endOffsetMap
         // endOffsetMap
         Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
         Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
第271行: 第230行:
             endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));
             endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));
         }
         }
//        for (Integer partitionId : endOffsetMap.keySet()) {
//            logger.info(String.format("topic:%s, partition:%s, logSize:%s", sTopic, partitionId, endOffsetMap.get(partitionId)));
//        }
   
   
         //commitOffsetMap
         //commitOffsetMap
         for (TopicPartition topicAndPartition : topicPartitions) {
         for (TopicPartition topicAndPartition : topicPartitions) {
             OffsetAndMetadata committed = consumer.committed(topicAndPartition);
             OffsetAndMetadata committed = consumer.committed(topicAndPartition);
             commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
             if (committed==null)
                commitOffsetMap.put(topicAndPartition.partition(), 0l);
            else
                commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
         }
         }
   
   
第295行: 第254行:
                 ose = Math.max(ose, endOffSet);
                 ose = Math.max(ose, endOffSet);
                 osc = Math.max(osc, commitOffSet);
                 osc = Math.max(osc, commitOffSet);
//                logger.info("Topic:" + sTopic + ", groupID:" + sGroup + ", partition:" + partition + ", beginOffSet:" + beginOffSet + ", endOffset:" +  endOffSet + ", commitOffset:" + commitOffSet + ", diffOffset:" + diffOffset);
             }
             }
//            logger.info("Topic:" + sTopic + ", groupID:" + sGroup + ", LAG:" + lagSum);
         } else {
         } else {
             logger.error("this topic partitions lost.");
             logger.error("this topic partitions lost.");
         }
         }
         los1.addAll(Arrays.asList(osb, ose, osc));
         los1.addAll(Arrays.asList(osb, ose, osc));
        _close();
         return los1;
         return los1;
     }
     }
     // !!!! traversed all the records, consumer needs to be closed.
     // !!!! traversed all the records(cr), consumer needs to be closed.
     // .consumer.close()
     // .close()
     public static void getStream(int rows) {
     public static void getStream(String topic1, String group1, int records1) {
         if (rows < 1)
         if (records1 < 1)
             return;
             return;
   
   
         _consumer(sTopic, sGroup, rows);
         _consumer(topic1, group1, records1);
   
   
         cr = consumer.poll(Duration.ofMillis(PullMS));
         cr = consumer.poll(Duration.ofMillis(PullMS));
         cs = cr.count();
         cs = cr.count();
         logger.info("Get records: " + cs);
         logger.info("Get records: " + cs);
        //_close();
     }
     }
     public static void put(String topic1, ArrayList<String> dat1) {
     public static void put(String topic1, ArrayList<String> dat1) {
第324行: 第284行:
         logger.info(String.format("Producer %s records successed.", dat1.size()));
         logger.info(String.format("Producer %s records successed.", dat1.size()));
     }
     }
     protected static void _consumer(String topic1, String group1, int records1) {
     public static void putsync(String topic1, ArrayList<String> dat1) {
         // Consumer properties
         producer = new KafkaProducer<>(props);
         sTopic = topic1;
        for (String s : dat1) {
         sGroup = group1;
            try {
                RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();
         // Create consumer
                //logger.info("offset = "+metadata.offset());
         if (status) {
            } catch (Exception e) {
            consumer.close();
                logger.error(e);
             status = false;
                return;
            }
        }
         producer.close();
         logger.info(String.format("Synchronous Producer %s records successed.", dat1.size()));
    }
    public static void putcb(String topic1, ArrayList<String> dat1) {
         producer = new KafkaProducer<>(props);
         for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {
                if (e != null) {
                    logger.error(e);
                }
             });
         }
         }
         props.put("max.poll.records", records1);
         producer.close();
         consumer = new KafkaConsumer(props);
         logger.info(String.format("CallBack Producer %s records successed.", dat1.size()));
        // Subscribe to topic
        consumer.subscribe(Arrays.asList(sTopic));
        status = true;
     }
     }
     protected static void _get() {
     public static void get(String topic1, String group1, int records1) {
        _consumer(topic1, group1, records1);
         long i;
         long i;
         cr = consumer.poll(Duration.ofMillis(PullMS));
         cr = consumer.poll(Duration.ofMillis(PullMS));
         cs = cr.count();
         cs = cr.count();
         logger.info("Get records: " + cs);
         logger.info(String.format("Topic:[%s], Group:[%s], get records: %s", topic1, group1, cs));
   
   
         i = 0;
         i = 0;
第356行: 第328行:
             val.put(i++, lrs1);
             val.put(i++, lrs1);
         }
         }
         consumer.close();
         _close();
        status = false;
         jval = json.toJson(val);
         jval = json.toJson(val);
     }
     }
     private static void reset(long offset1) {
     public static void get(String topic1, String group1) {
        get(topic1, group1, MaxRows);
    }
    public static void reset(String topic1, String group1, long offset1) {
        _consumer(topic1, group1, 1);
         Set<TopicPartition> assignment1 = new HashSet<>();
         Set<TopicPartition> assignment1 = new HashSet<>();
         while (assignment1.size() == 0) {
         while (assignment1.size() == 0) {
第366行: 第342行:
             assignment1 = consumer.assignment();
             assignment1 = consumer.assignment();
         }
         }
         logger.info("Partition Assignment: " + assignment1);
         //logger.info("Partition Assignment: " + assignment1);
         Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment1);
         Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment1);
         Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment1);
         Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment1);
第377行: 第353行:
                 offset1 = Math.min(offset1, endOffsets.get(tp));
                 offset1 = Math.min(offset1, endOffsets.get(tp));
             }
             }
             logger.info(beginOffsets.get(tp));
             logger.info(String.format("Reset Topic-Partition [%s] offset: [%s,%s]=%s", tp, beginOffsets.get(tp), endOffsets.get(tp), offset1));
            logger.info(endOffsets.get(tp));
            logger.info("Reset Partition " + tp + " from " + offset1 + " offset consumer.");
             consumer.seek(tp, offset1);
             consumer.seek(tp, offset1);
         }
         }
         consumer.close();
         _close();
         status = false;
    }
    protected static void _init(String serv1) {
        props.put("bootstrap.servers", serv1);
        //props.put("group.id", sGroup);
        // producer
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        //consumer
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        //props.put("max.poll.records", MaxRows);
    }
    protected static void _consumer(String topic1, String group1, int records1) {
        // Create consumer
        _close();
        props.put("group.id", group1);
        props.put("max.poll.records", records1);
        consumer = new KafkaConsumer(props);
        // Subscribe to topic
        consumer.subscribe(Arrays.asList(topic1));
    }
    protected  static void _close() {
        try {
            consumer.close();
         } catch (Exception e) {
            logger.error("Consumer closed.");
        }
     }
     }
  }
  }


[[分类:Develop]]
[[分类:Develop]]
[[分类:Java]]
[[分类:Java]]

2024年3月8日 (五) 15:14的版本

Java 自定义函数 - KAFKA


Producer

producer 是线程安全的。

Producer<String, String> producer

消息发送方式

# 异步,默认方法
producer.send(new ProducerRecord<String, String>(topic1, s));
# 异步,回调
producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {});
# 同步
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();

消息分区策略

消息键保序(key-ordering )策略:Kafka 中每条消息都可以定义 Key,同一个 Key 的所有消息都进入到相同的分区中。否则采用轮询(Round-robin)、随机策略(Randomness)等策略。

producer.send(new ProducerRecord<String, String>(TOPIC, key, val)
-.OR.-
producer.send(new ProducerRecord<String, String>(TOPIC, val)

集群同步规则

ack 配置项用来控制 producer 要求 leader 确认多少消息后返回调用成功

  • 0 不需要等待任何确认消息
  • 1 需要等待leader确认
  • -1或all 需要全部 ISR 集合返回确认,默认值
ack=-1         # 默认需要全部 ISR 集合返回确认

缓冲区

缓冲一批数据发送或超过延时时间(linger.ms=0)发送,默认 16k。

batch.size=1048576    # 1M
linger.ms=10          # 10ms

消息重发

消息发送错误时设置的系统重发消息次数(retries=2147483647)、重发间隔(retry.backoff.ms=100)。若保证消息的有序性,设置 max_in_flight_requests_per_connection=1。

max_in_flight_requests_per_connection=1
retries=10
retry.backoff.ms=200

压缩方式

发送的所有数据的压缩方式:none(默认), gzip, snappy, lz4, zstd。

compression.type=lz4

其他

enable.idempotence=true                      # 默认开启幂等性
bootstrap.servers=node1:9092,node2:9092,...  # 并非需要所有的broker地址,生产者可以从给定的 broker 里查找到其他 broker 信息

Consumer

设置每次消费最大记录数

默认值为 500 条。

Properties props = new Properties();
props.put("max.poll.records", N);
...
consumer = new KafkaConsumer(props);

props 配置变化,需要 new consumer。

当 N 较大时,需要设置 fetch.max.bytes(默认值 52428800,单位 byte,下同),该参数与 fetch.min.bytes(默认值 1)参数对应,用来配置 Consumer 在一次请求中从 Kafka 中拉取的多批总数据量大小。若一批次的数据大于该值,仍然可以拉取。

获取消费记录

直到取到 max.poll.records 条记录或超过指定时长(如下面语句为 1000 毫秒)。

KafkaConsumer<String, String> consumer;
consumer.poll(Duration.ofMillis(1000));

常见问题

  • Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because "committed" is null
在下列语句中可能会遇到此类问题,原因是该 topic & group 从未被消费过。
OffsetAndMetadata committed = consumer.committed(topicAndPartition);
commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
需要判断 committed==null,committed.offset()替换为零。

Sample

No Method Explain Example
KAFKA 定义 kafka,默认从 db.cnf 中取 default KAFKA k1 = new KAFKA("kafkap182", "db.cnf")
1 reset 重设消费位置 k1.reset("test", "test", N) # 从N开始消费
2 get 获取记录 k1.get(topic1, group1) # 默认条数,500

k1.getStream(topic1, group1, N) # 流 -> ConsumerRecords<String, String> cr

3 put 生产记录 k1.put("test", ldat1) # ArrayList<String> ldat1
4 getTopics 获取集群上 topic 列表 ArrayList<String> l1 = k1.getTopics()
5 getos begin, end, offset k1.getos(topic1, group1) # [0, 16, 8]
package com.udf.base;
/*
------------------------------------------------------------------------------
  Name     : Udf.base.KAFKA
  Purpose  : Kafka product & consumer
  Author   : Adam
  Revisions:
  Ver        Date        Author           Description
  ---------  ----------  ---------------  ------------------------------------
  1.0        2024/3/5    Adam             Create.
  1.2        2024/3/8    Adam             putsync, putcb

 format:
    property: consumer
    method  : get, put, reset

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.0</version>
        </dependency>

------------------------------------------------------------------------------
*/

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

import java.time.Duration;
import java.util.*;

public class KAFKA extends BASE {
    public static final String VERSION = "v1.0";
    public static Producer<String, String> producer;
    public static KafkaConsumer<String, String> consumer;
    public static ConsumerRecords<String, String> cr;                // set stream
    public static ArrayList<String> topics = new ArrayList<>();      // topic list
    public static HashMap<Long, ArrayList<String>> val = new HashMap<>();  // set result, [offset, value]
    public static String jval = "";                                  // json set result
    public static int cs = 0;                                        // rows count
    public static int PullMS = 1000;                                 // default pull max 1000 millisecond
    public static int MaxRows = 500;
    private static final Properties props = new Properties();
    private static final Logger logger = Logger.getLogger(KAFKA.class);
    public KAFKA(String mqname1, String mq1) {
        // get db jdbc info
        CNF cnf1 = new CNF(mq1);
        String serv1 = cnf1.get(mqname1 + ".host");      // s1:9092,s2:9092,...

        _init(serv1);    // props init
    }
    public KAFKA(String mqname1) {
        this(mqname1, "db.cnf");
    }
    public static ArrayList getTopics() {
        AdminClient client1 = KafkaAdminClient.create(props);
        // get topic list
        Set topics1 = null;
        try {
            topics1 = client1.listTopics().names().get();
        } catch (Exception e) {
            logger.error(e);
        }
        client1.close();
        return new ArrayList<String>(topics1);
    }
    public static ArrayList getos(String topic1, String group1) {
        _consumer(topic1, group1, 1);

        long osb = 0;                                     // beginOffsets
        long ose = 0;                                     // endOffsets
        long osc = 0;                                     // currentOffsets
        ArrayList los1 = new ArrayList();
        Map<Integer, Long> beginOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> endOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> commitOffsetMap = new HashMap<Integer, Long>();

        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        List<PartitionInfo> partitionsFor;

        partitionsFor = consumer.partitionsFor(topic1);

        for (PartitionInfo partitionInfo : partitionsFor) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            topicPartitions.add(topicPartition);
        }

        // beginOffsetMap
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(topicPartitions);
        for (TopicPartition partitionInfo : beginOffsets.keySet()) {
            beginOffsetMap.put(partitionInfo.partition(), beginOffsets.get(partitionInfo));
        }
        // endOffsetMap
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        for (TopicPartition partitionInfo : endOffsets.keySet()) {
            endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));
        }

        //commitOffsetMap
        for (TopicPartition topicAndPartition : topicPartitions) {
            OffsetAndMetadata committed = consumer.committed(topicAndPartition);
            if (committed==null)
                commitOffsetMap.put(topicAndPartition.partition(), 0l);
            else
                commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
        }

        //sum lag
        long lagSum = 0;
        osb = 9999999999999999l;
        ose = 0;
        if (endOffsetMap.size() == commitOffsetMap.size()) {
            for (Integer partition : endOffsetMap.keySet()) {
                long beginOffSet = beginOffsetMap.get(partition);
                long endOffSet = endOffsetMap.get(partition);
                long commitOffSet = commitOffsetMap.get(partition);
                long diffOffset = endOffSet - commitOffSet;
                lagSum += diffOffset;
                osb = Math.min(osb, beginOffSet);
                ose = Math.max(ose, endOffSet);
                osc = Math.max(osc, commitOffSet);
            }
        } else {
            logger.error("this topic partitions lost.");
        }
        los1.addAll(Arrays.asList(osb, ose, osc));
        _close();
        return los1;
    }
    // !!!! traversed all the records(cr), consumer needs to be closed.
    // .close()
    public static void getStream(String topic1, String group1, int records1) {
        if (records1 < 1)
            return;

        _consumer(topic1, group1, records1);

        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info("Get records: " + cs);

        //_close();
    }
    public static void put(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s));
        }
        producer.close();
        logger.info(String.format("Producer %s records successed.", dat1.size()));
    }
    public static void putsync(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            try {
                RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();
                //logger.info("offset = "+metadata.offset());
            } catch (Exception e) {
                logger.error(e);
                return;
            }
        }
        producer.close();
        logger.info(String.format("Synchronous Producer %s records successed.", dat1.size()));
    }
    public static void putcb(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {
                if (e != null) {
                    logger.error(e);
                }
            });
        }
        producer.close();
        logger.info(String.format("CallBack Producer %s records successed.", dat1.size()));
    }
    public static void get(String topic1, String group1, int records1) {
        _consumer(topic1, group1, records1);

        long i;
        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info(String.format("Topic:[%s], Group:[%s], get records: %s", topic1, group1, cs));

        i = 0;
        ArrayList lrs1;
        for (ConsumerRecord<String, String> record : cr) {
            lrs1 = new ArrayList();
            lrs1.add(record.offset());
            lrs1.add(record.value());
//            lrs1.add(record.key());
//            lrs1.add(record.timestamp());
            val.put(i++, lrs1);
        }
        _close();
        jval = json.toJson(val);
    }
    public static void get(String topic1, String group1) {
        get(topic1, group1, MaxRows);
    }
    public static void reset(String topic1, String group1, long offset1) {
        _consumer(topic1, group1, 1);

        Set<TopicPartition> assignment1 = new HashSet<>();
        while (assignment1.size() == 0) {
            consumer.poll(Duration.ofMillis(100));
            assignment1 = consumer.assignment();
        }
        //logger.info("Partition Assignment: " + assignment1);
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment1);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment1);

        for (TopicPartition tp : assignment1) {
            if (offset1 < 0) {
                offset1 = Math.max(beginOffsets.get(tp), endOffsets.get(tp) + offset1 + 1);
            } else {
                offset1 = Math.max(offset1, beginOffsets.get(tp));
                offset1 = Math.min(offset1, endOffsets.get(tp));
            }
            logger.info(String.format("Reset Topic-Partition [%s] offset: [%s,%s]=%s", tp, beginOffsets.get(tp), endOffsets.get(tp), offset1));
            consumer.seek(tp, offset1);
        }
        _close();
    }
    protected static void _init(String serv1) {
        props.put("bootstrap.servers", serv1);
        //props.put("group.id", sGroup);
        // producer
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        //consumer
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        //props.put("max.poll.records", MaxRows);
    }
    protected static void _consumer(String topic1, String group1, int records1) {
        // Create consumer
        _close();
        props.put("group.id", group1);
        props.put("max.poll.records", records1);
        consumer = new KafkaConsumer(props);
        // Subscribe to topic
        consumer.subscribe(Arrays.asList(topic1));
    }
    protected  static void _close() {
        try {
            consumer.close();
        } catch (Exception e) {
            logger.error("Consumer closed.");
        }
    }
}