Java Custom Methods - KAFKA
====Producer====
The producer is thread-safe.

The acks setting controls how many acknowledgements the producer requires from the leader before a send is reported as successful:
- 0: no acknowledgement is required
- 1: only the leader has to acknowledge
- -1 or all: the full ISR set has to acknowledge (the value used by this class)

Key-ordering strategy: every Kafka message may carry a key, and all messages with the same key are written to the same partition. Messages without a key are distributed by a round-robin, random, or similar partitioning strategy.

 producer.send(new ProducerRecord<String, String>(TOPIC, key, val))   // keyed send
 producer.send(new ProducerRecord<String, String>(TOPIC, val))        // unkeyed send
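A minimal, self-contained sketch of the two points above; the broker address localhost:9092 and the topic name test are placeholders, not values taken from this page. acks=all asks for acknowledgement from the full ISR set, and records that share a key always land in the same partition.

 // Hedged sketch: a standalone producer, not part of the KAFKA wrapper class below.
 import java.util.Properties;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 public class ProducerDemo {
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");                // placeholder broker list
         props.put("acks", "all");                                        // wait for the full ISR set
         props.put("key.serializer", StringSerializer.class.getName());
         props.put("value.serializer", StringSerializer.class.getName());
         try (Producer<String, String> producer = new KafkaProducer<>(props)) {
             // same key -> same partition -> per-key ordering
             producer.send(new ProducerRecord<>("test", "user-42", "event-1"));
             producer.send(new ProducerRecord<>("test", "user-42", "event-2"));
             // no key -> the partitioner falls back to round-robin or a similar assignment
             producer.send(new ProducerRecord<>("test", "event-without-key"));
         }
     }
 }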
====Consumer====
Set the maximum number of records returned by a single poll (the default is 500):

 Properties props = new Properties();
 props.put("max.poll.records", N);
 ...
 consumer = new KafkaConsumer(props);

Whenever the properties change, a new consumer has to be created.

When N is large, raise fetch.max.bytes accordingly. That parameter is the counterpart of fetch.min.bytes (default 1 B) and limits how much data the consumer pulls from Kafka in one fetch request; its default is 52428800 B, i.e. 50 MB.

Fetching records: poll() returns once max.poll.records records have been received or the given timeout has elapsed (1000 ms in the statement below).

 KafkaConsumer<String, String> consumer;
 consumer.poll(Duration.ofMillis(1000));
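A minimal, self-contained consumer sketch under the same placeholder assumptions (broker localhost:9092, topic test, group test): max.poll.records caps how many records one poll() call may return, and the Duration argument bounds how long the call waits for data.

 // Hedged sketch: a standalone consumer, not part of the KAFKA wrapper class below.
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 public class ConsumerDemo {
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");    // placeholder broker list
         props.put("group.id", "test");                       // placeholder group id
         props.put("auto.offset.reset", "earliest");
         props.put("max.poll.records", 100);                  // N: at most 100 records per poll
         // for a large N, raise fetch.max.bytes as well (default 50 MB)
         props.put("key.deserializer", StringDeserializer.class.getName());
         props.put("value.deserializer", StringDeserializer.class.getName());
         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
             consumer.subscribe(Collections.singletonList("test"));
             // returns at most max.poll.records records; waits up to 1000 ms for data
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
             for (ConsumerRecord<String, String> record : records) {
                 System.out.println(record.offset() + " : " + record.value());
             }
         }
     }
 }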
====Sample====
{| class="wikitable"
|+
!No
!Method
!Description
!Example
|-
|
|KAFKA
|Define a KAFKA instance; by default the default entry of db.cnf is used
|KAFKA k1 = new KAFKA("kafkap182", "test", "test", "db.cnf")
|-
|1
|consumer
|Change the consumption mode (topic, group and, optionally, the starting offset)
|k1.consumer("test", "test", N) # consume starting from offset N
|-
|2
|get
|Fetch records
|k1.get(9) # a given number of records
k1.get() # default number of records, 1000
|-
|3
|getos
|begin, end and committed offsets
|k1.getos() # [0, 16, 8]
|-
|4
|put
|Produce records
|k1.put("test", ldat1) # ArrayList<String> ldat1
|}
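A hedged usage sketch of the calls documented above. It assumes the KAFKA class listed below (plus its CNF/BASE dependencies) is on the classpath and that db.cnf contains a kafkap182 entry pointing at a reachable cluster, as in the table's examples; the payload strings are made up for illustration.

 // Usage sketch for the wrapper documented in the table above.
 import java.util.ArrayList;
 import com.udf.base.KAFKA;
 public class KafkaWrapperDemo {
     public static void main(String[] args) {
         KAFKA k1 = new KAFKA("kafkap182", "test", "test", "db.cnf");
         // produce a couple of records (sample payloads)
         ArrayList<String> ldat1 = new ArrayList<>();
         ldat1.add("{\"id\": 1}");
         ldat1.add("{\"id\": 2}");
         k1.put("test", ldat1);
         // [beginOffset, endOffset, committedOffset], e.g. [0, 16, 8]
         System.out.println(k1.getos());
         // re-consume from offset 0, then fetch 9 records
         k1.consumer("test", "test", 0);
         k1.get(9);
         System.out.println(KAFKA.cs);    // number of records fetched
         System.out.println(KAFKA.jval);  // the fetched records as JSON
     }
 }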
package com.udf.base;

/*
------------------------------------------------------------------------------
Name     : Udf.base.KAFKA
Purpose  : Kafka producer & consumer
Author   : Adam
Revisions:
Ver        Date        Author           Description
---------  ----------  ---------------  ------------------------------------
1.0        2024/3/5    Adam             Create.

format:
  property: consumer
  method  : get, put

<!--Kafka-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>
------------------------------------------------------------------------------
*/
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

import java.time.Duration;
import java.util.*;

public class KAFKA extends BASE {
    public static final String VERSION = "v1.0";

    public static Producer<String, String> producer;
    public static KafkaConsumer<String, String> consumer;
    public static ConsumerRecords<String, String> cr;                     // record set of the last streaming poll
    public static ArrayList<String> topics = new ArrayList<>();           // topic list of the cluster
    public static HashMap<Long, ArrayList<String>> val = new HashMap<>(); // fetched result, [offset, value]
    public static String jval = "";                                       // fetched result as JSON
    public static int cs = 0;                                             // row count of the last fetch
    public static boolean status = false;                                 // true while a consumer is open
    public static int PullMS = 1000;                                      // default poll timeout in milliseconds
    public static int MaxRows = 1000;                                     // default max records per poll

    private static String sServ, sTopic, sGroup;
    private static final Properties props = new Properties();
    private static final Logger logger = Logger.getLogger(KAFKA.class);
    public KAFKA(String mqname1, String topic1, String group1, String mq1) {
        // read the broker list from the configuration file, like the JDBC entries
        CNF cnf1 = new CNF(mq1);
        sServ = cnf1.get(mqname1);   // s1:9092,s2:9092,...
        sTopic = topic1;
        sGroup = group1;
        props.put("bootstrap.servers", sServ);
        AdminClient client = KafkaAdminClient.create(props);
        // fetch the topic list of the cluster
        Set<String> topics1 = null;
        try {
            topics1 = client.listTopics().names().get();
        } catch (Exception e) {
            logger.error(e);
        }
        client.close();
        if (topics1 != null) {
            topics = new ArrayList<>(topics1);
        }
        init();
    }

    public KAFKA(String mqname1) {
        this(mqname1, "test", "test", "db.cnf");
    }
    public static void init() {
        props.put("bootstrap.servers", sServ);
        props.put("group.id", sGroup);
        // producer settings
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // consumer settings
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", MaxRows);
    }

    // switch topic/group and seek the consumer group to the given offset
    public static void consumer(String topic1, String group1, long offset1) {
        sTopic = topic1;
        sGroup = group1;
        _consumer(topic1, group1, 1);
        reset(offset1);
    }

    // switch topic/group and keep the committed position
    public static void consumer(String topic1, String group1) {
        sTopic = topic1;
        sGroup = group1;
        _consumer(topic1, group1, MaxRows);
    }

    // fetch the given number of records into val/jval
    public static void get(int rows) {
        if (rows < 1)
            return;
        _consumer(sTopic, sGroup, rows);
        _get();
    }

    // fetch up to MaxRows records into val/jval
    public static void get() {
        _consumer(sTopic, sGroup, MaxRows);
        _get();
    }
    // return [beginOffset, endOffset, committedOffset] aggregated over all partitions of sTopic
    public static ArrayList<Long> getos() {
        long osb = 0;   // beginOffsets
        long ose = 0;   // endOffsets
        long osc = 0;   // committed offsets
        ArrayList<Long> los1 = new ArrayList<>();
        Map<Integer, Long> beginOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> endOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> commitOffsetMap = new HashMap<Integer, Long>();
        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        List<PartitionInfo> partitionsFor;
        try {
            partitionsFor = consumer.partitionsFor(sTopic);
        } catch (Exception e) {
            // no consumer open yet: create one and retry
            consumer(sTopic, sGroup);
            partitionsFor = consumer.partitionsFor(sTopic);
        }
        for (PartitionInfo partitionInfo : partitionsFor) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            topicPartitions.add(topicPartition);
        }
        // beginOffsetMap
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(topicPartitions);
        for (TopicPartition partitionInfo : beginOffsets.keySet()) {
            beginOffsetMap.put(partitionInfo.partition(), beginOffsets.get(partitionInfo));
        }
        // for (Integer partitionId : beginOffsetMap.keySet()) {
        //     logger.info(String.format("topic:%s, partition:%s, logSize:%s", sTopic, partitionId, beginOffsetMap.get(partitionId)));
        // }
        // endOffsetMap
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        for (TopicPartition partitionInfo : endOffsets.keySet()) {
            endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));
        }
        // for (Integer partitionId : endOffsetMap.keySet()) {
        //     logger.info(String.format("topic:%s, partition:%s, logSize:%s", sTopic, partitionId, endOffsetMap.get(partitionId)));
        // }
        // commitOffsetMap: committed() returns null when the group has never committed
        for (TopicPartition topicAndPartition : topicPartitions) {
            OffsetAndMetadata committed = consumer.committed(topicAndPartition);
            commitOffsetMap.put(topicAndPartition.partition(), committed == null ? 0L : committed.offset());
        }
        // aggregate lag and the offset range over all partitions
        long lagSum = 0;
        osb = Long.MAX_VALUE;
        ose = 0;
        if (endOffsetMap.size() == commitOffsetMap.size()) {
            for (Integer partition : endOffsetMap.keySet()) {
                long beginOffSet = beginOffsetMap.get(partition);
                long endOffSet = endOffsetMap.get(partition);
                long commitOffSet = commitOffsetMap.get(partition);
                long diffOffset = endOffSet - commitOffSet;
                lagSum += diffOffset;
                osb = Math.min(osb, beginOffSet);
                ose = Math.max(ose, endOffSet);
                osc = Math.max(osc, commitOffSet);
                // logger.info("Topic:" + sTopic + ", groupID:" + sGroup + ", partition:" + partition + ", beginOffSet:" + beginOffSet + ", endOffset:" + endOffSet + ", commitOffset:" + commitOffSet + ", diffOffset:" + diffOffset);
            }
            // logger.info("Topic:" + sTopic + ", groupID:" + sGroup + ", LAG:" + lagSum);
        } else {
            logger.error("Offset maps are incomplete for the partitions of this topic.");
        }
        los1.addAll(Arrays.asList(osb, ose, osc));
        return los1;
    }
    // !!!! After a streaming fetch the caller must traverse the records and then close the consumer:
    //      KAFKA.consumer.close()
    public static void getStream(int rows) {
        if (rows < 1)
            return;
        _consumer(sTopic, sGroup, rows);
        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info("Get records: " + cs);
    }

    // produce every element of dat1 to topic1 as a String record
    public static void put(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s));
        }
        producer.close();
        logger.info(String.format("Produced %s records successfully.", dat1.size()));
    }

    protected static void _consumer(String topic1, String group1, int records1) {
        // consumer properties
        sTopic = topic1;
        sGroup = group1;
        // close any consumer that is already open before creating a new one
        if (status) {
            consumer.close();
            status = false;
        }
        props.put("max.poll.records", records1);
        consumer = new KafkaConsumer<>(props);
        // subscribe to the topic
        consumer.subscribe(Arrays.asList(sTopic));
        status = true;
    }
    protected static void _get() {
        long i;
        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info("Get records: " + cs);
        i = 0;
        ArrayList<String> lrs1;
        for (ConsumerRecord<String, String> record : cr) {
            lrs1 = new ArrayList<>();
            lrs1.add(String.valueOf(record.offset()));
            lrs1.add(record.value());
            // lrs1.add(record.key());
            // lrs1.add(String.valueOf(record.timestamp()));
            val.put(i++, lrs1);
        }
        consumer.close();
        status = false;
        jval = json.toJson(val);
    }

    // seek every assigned partition to offset1; a negative offset1 is interpreted relative to the end offset
    private static void reset(long offset1) {
        Set<TopicPartition> assignment1 = new HashSet<>();
        while (assignment1.size() == 0) {
            consumer.poll(Duration.ofMillis(100));
            assignment1 = consumer.assignment();
        }
        logger.info("Partition Assignment: " + assignment1);
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment1);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment1);
        for (TopicPartition tp : assignment1) {
            long target;
            if (offset1 < 0) {
                target = Math.max(beginOffsets.get(tp), endOffsets.get(tp) + offset1 + 1);
            } else {
                target = Math.min(Math.max(offset1, beginOffsets.get(tp)), endOffsets.get(tp));
            }
            logger.info(beginOffsets.get(tp));
            logger.info(endOffsets.get(tp));
            logger.info("Reset partition " + tp + " to offset " + target + ".");
            consumer.seek(tp, target);
        }
        // closing commits the new positions (enable.auto.commit is true), so the next fetch starts from them
        consumer.close();
        status = false;
    }
}
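As the comment above getStream() warns, a streaming fetch leaves the consumer open so that cr can still be iterated; the caller has to close it afterwards. A hedged sketch of that pattern, under the same db.cnf / kafkap182 assumptions as the table examples:

 // Streaming fetch: iterate the raw ConsumerRecords, then close the consumer yourself.
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import com.udf.base.KAFKA;
 public class KafkaStreamDemo {
     public static void main(String[] args) {
         KAFKA k1 = new KAFKA("kafkap182", "test", "test", "db.cnf");
         k1.getStream(500);                       // poll up to 500 records into the shared KAFKA.cr
         for (ConsumerRecord<String, String> record : KAFKA.cr) {
             System.out.println(record.offset() + " : " + record.value());
         }
         KAFKA.consumer.close();                  // getStream() leaves the consumer open
         KAFKA.status = false;                    // keep the wrapper's connection flag in sync
     }
 }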
[[分类:Develop]]
[[分类:Java]]