Java Custom Functions - KAFKA
Producer
The producer is thread-safe.
Producer<String, String> producer
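Because the producer is thread-safe, a single instance can be shared by multiple threads. A minimal sketch, assuming a placeholder broker address and the test topic used elsewhere on this page:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class SharedProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");   // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // one producer instance shared by all threads
        Producer<String, String> producer = new KafkaProducer<>(props);
        Runnable task = () -> {
            for (int i = 0; i < 10; i++)
                producer.send(new ProducerRecord<>("test", Thread.currentThread().getName() + "-" + i));
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        producer.close();
    }
}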
Message send modes
// Asynchronous (default)
producer.send(new ProducerRecord<String, String>(topic1, s));
// Asynchronous with callback
producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {});
// Synchronous
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();
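A self-contained sketch exercising all three modes; the broker address and the test topic are placeholders:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class SendModesDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");   // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 1. asynchronous, fire-and-forget
            producer.send(new ProducerRecord<>("test", "async"));
            // 2. asynchronous with callback: invoked when the broker acknowledges or an error occurs
            producer.send(new ProducerRecord<>("test", "callback"),
                    (RecordMetadata metadata, Exception e) -> {
                        if (e != null) e.printStackTrace();
                        else System.out.println("offset = " + metadata.offset());
                    });
            // 3. synchronous: get() blocks until the broker responds
            RecordMetadata md = producer.send(new ProducerRecord<>("test", "sync")).get();
            System.out.println("sync offset = " + md.offset());
        }
    }
}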
Message partitioning strategy
Key-ordering strategy: every Kafka message may carry a key, and all messages with the same key are routed to the same partition. Messages without a key fall back to strategies such as round-robin or random assignment.
producer.send(new ProducerRecord<String, String>(TOPIC, key, val))
-.OR.-
producer.send(new ProducerRecord<String, String>(TOPIC, val))
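A small sketch, assuming a multi-partition test topic, that verifies two records with the same key land in the same partition; the synchronous .get() is used here only to read back the partition number:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KeyOrderingDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");   // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // same key -> same partition, so per-key ordering is preserved
            RecordMetadata m1 = producer.send(new ProducerRecord<>("test", "user-1", "v1")).get();
            RecordMetadata m2 = producer.send(new ProducerRecord<>("test", "user-1", "v2")).get();
            System.out.println(m1.partition() + " == " + m2.partition());
        }
    }
}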
Cluster acknowledgment rules
The acks setting controls how many acknowledgments the producer requires from the leader before a send call is reported as successful:
- 0: no acknowledgment is required
- 1: wait for the leader's acknowledgment
- -1 or all: wait for acknowledgment from the entire ISR set (default)
acks=-1  # default: the entire ISR set must acknowledge
Buffer
Records are buffered into batches and sent when a batch fills up (default 16 KB) or when the linger time (linger.ms=0 by default) elapses.
batch.size=1048576  # 1 MB
linger.ms=10        # 10 ms
Message retries
When a send fails, retries (default 2147483647) sets how many times the message is resent and retry.backoff.ms (default 100) the interval between retries. To guarantee message ordering, also set max.in.flight.requests.per.connection=1.
max.in.flight.requests.per.connection=1
retries=10
retry.backoff.ms=200
Compression
Compression applied to all outgoing data: none (default), gzip, snappy, lz4, zstd.
compression.type=lz4
Other settings
enable.idempotence=true  # idempotence is enabled by default
bootstrap.servers=node1:9092,node2:9092,...  # not all broker addresses are needed; the producer discovers the remaining brokers from the ones given
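The producer settings above, collected into Java Properties form; a sketch with illustrative values, not a drop-in configuration:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ProducerConfigDemo {
    public static Producer<String, String> build(String servers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);                    // e.g. "node1:9092,node2:9092"
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");                                   // full ISR acknowledgment
        props.put("batch.size", 1048576);                           // 1 MB batches
        props.put("linger.ms", 10);                                 // wait up to 10 ms to fill a batch
        props.put("retries", 10);
        props.put("retry.backoff.ms", 200);
        props.put("max.in.flight.requests.per.connection", 1);      // preserve ordering when retrying
        props.put("compression.type", "lz4");
        props.put("enable.idempotence", true);
        return new KafkaProducer<>(props);
    }
}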
Consumer
Setting the maximum number of records per poll
The default is 500 records.
Properties props = new Properties();
props.put("max.poll.records", N);
...
consumer = new KafkaConsumer(props);
When the props configuration changes, a new consumer must be created.
When N is large, fetch.max.bytes (default 52428800; all sizes below in bytes) may also need to be raised. Together with its counterpart fetch.min.bytes (default 1), it controls the total amount of data, across multiple batches, that the consumer pulls from Kafka in a single request. A single batch larger than this value can still be fetched.
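A sketch of a consumer raising both limits; the broker address, group name and the values 2000 / 104857600 are illustrative:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;

public class ConsumerConfigDemo {
    public static KafkaConsumer<String, String> build(String servers, String group) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);          // e.g. "node1:9092"
        props.put("group.id", group);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.records", 2000);              // raise the per-poll record limit
        props.put("fetch.max.bytes", 104857600);          // raise the per-request byte limit accordingly
        props.put("fetch.min.bytes", 1);
        return new KafkaConsumer<>(props);
    }
}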
Fetching consumer records
poll() returns once max.poll.records records have been fetched or the given timeout has elapsed (1000 milliseconds in the statement below).
KafkaConsumer<String, String> consumer;
consumer.poll(Duration.ofMillis(1000));
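A minimal end-to-end poll sketch, assuming a placeholder broker address and the test topic/group used elsewhere on this page:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PollDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");     // placeholder
        props.put("group.id", "test");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // returns when max.poll.records records are available or after 1000 ms
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.offset() + " : " + record.value());
        }
    }
}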
Common issues
- Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because "committed" is null
This can occur in the statements below when the topic & group has never been consumed:

OffsetAndMetadata committed = consumer.committed(topicAndPartition);
commitOffsetMap.put(topicAndPartition.partition(), committed.offset());

Check whether committed == null and substitute zero for committed.offset().
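The guarded version, as used in getos() in the class below:

OffsetAndMetadata committed = consumer.committed(topicAndPartition);
if (committed == null)
    commitOffsetMap.put(topicAndPartition.partition(), 0L);
else
    commitOffsetMap.put(topicAndPartition.partition(), committed.offset());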
Sample
No | Method | Description | Example
---|--------|-------------|--------
   | KAFKA  | Define a KAFKA instance; by default settings are read from db.cnf | KAFKA k1 = new KAFKA("kafkap182", "db.cnf")
1  | reset  | Reset the consumption position | k1.reset("test", "test", N)  # consume starting from offset N
2  | get    | Fetch records | k1.get(topic1, group1)  # default count, 500
   |        |               | k1.getStream(topic1, group1, N)  # stream -> ConsumerRecords<String, String> cr
3  | put    | Produce records | k1.put("test", ldat1)  # ArrayList<String> ldat1
4  | getTopics | Get the list of topics on the cluster | ArrayList<String> l1 = k1.getTopics()
5  | getos  | begin, end, offset | k1.getos(topic1, group1)  # [0, 16, 8]
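A usage sketch combining the calls from the table; it assumes db.cnf defines a kafkap182.host entry and that a test topic exists:

import com.udf.base.KAFKA;
import java.util.ArrayList;

public class KafkaUdfDemo {
    public static void main(String[] args) {
        KAFKA k1 = new KAFKA("kafkap182", "db.cnf");      // read broker list from db.cnf

        ArrayList<String> ldat1 = new ArrayList<>();
        ldat1.add("{\"id\":1}");
        k1.put("test", ldat1);                            // produce records

        k1.get("test", "test");                           // fetch up to 500 records -> KAFKA.val / KAFKA.jval
        System.out.println(KAFKA.cs + " records: " + KAFKA.jval);

        System.out.println(k1.getTopics());               // topics on the cluster
        System.out.println(k1.getos("test", "test"));     // [begin, end, committed]

        k1.reset("test", "test", 0);                      // rewind the group to offset 0
    }
}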
package com.udf.base;

/*
------------------------------------------------------------------------------
 Name    : Udf.base.KAFKA
 Purpose : Kafka product & consumer
 Author  : Adam
 Revisions:
 Ver        Date        Author           Description
 ---------  ----------  ---------------  ------------------------------------
 1.0        2024/3/5    Adam             Create.
 1.2        2024/3/8    Adam             putsync, putcb

 format:
   property: consumer
   method  : get, put, reset

 <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>2.0.0</version>
 </dependency>
 <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka_2.11</artifactId>
     <version>0.10.0.1</version>
 </dependency>
 <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-streams</artifactId>
     <version>1.0.0</version>
 </dependency>
------------------------------------------------------------------------------
*/

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

import java.time.Duration;
import java.util.*;

public class KAFKA extends BASE {
    public static final String VERSION = "v1.0";
    public static Producer<String, String> producer;
    public static KafkaConsumer<String, String> consumer;
    public static ConsumerRecords<String, String> cr;                      // set stream
    public static ArrayList<String> topics = new ArrayList<>();            // topic list
    public static HashMap<Long, ArrayList<String>> val = new HashMap<>();  // set result, [offset, value]
    public static String jval = "";                                        // json set result
    public static int cs = 0;                                              // rows count
    public static int PullMS = 1000;                                       // default pull max 1000 millisecond
    public static int MaxRows = 500;
    private static final Properties props = new Properties();
    private static final Logger logger = Logger.getLogger(KAFKA.class);

    public KAFKA(String mqname1, String mq1) {
        // get db jdbc info
        CNF cnf1 = new CNF(mq1);
        String serv1 = cnf1.get(mqname1 + ".host");   // s1:9092,s2:9092,...
        _init(serv1);                                 // props init
    }

    public KAFKA(String mqname1) {
        this(mqname1, "db.cnf");
    }

    public static ArrayList getTopics() {
        AdminClient client1 = KafkaAdminClient.create(props);
        // get topic list
        Set topics1 = null;
        try {
            topics1 = client1.listTopics().names().get();
        } catch (Exception e) {
            logger.error(e);
        }
        client1.close();
        return new ArrayList<String>(topics1);
    }

    public static ArrayList getos(String topic1, String group1) {
        _consumer(topic1, group1, 1);
        long osb = 0;   // beginOffsets
        long ose = 0;   // endOffsets
        long osc = 0;   // currentOffsets
        ArrayList los1 = new ArrayList();
        Map<Integer, Long> beginOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> endOffsetMap = new HashMap<Integer, Long>();
        Map<Integer, Long> commitOffsetMap = new HashMap<Integer, Long>();
        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        List<PartitionInfo> partitionsFor;
        partitionsFor = consumer.partitionsFor(topic1);
        for (PartitionInfo partitionInfo : partitionsFor) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            topicPartitions.add(topicPartition);
        }
        // beginOffsetMap
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(topicPartitions);
        for (TopicPartition partitionInfo : beginOffsets.keySet()) {
            beginOffsetMap.put(partitionInfo.partition(), beginOffsets.get(partitionInfo));
        }
        // endOffsetMap
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        for (TopicPartition partitionInfo : endOffsets.keySet()) {
            endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));
        }
        // commitOffsetMap
        for (TopicPartition topicAndPartition : topicPartitions) {
            OffsetAndMetadata committed = consumer.committed(topicAndPartition);
            if (committed == null)
                commitOffsetMap.put(topicAndPartition.partition(), 0L);
            else
                commitOffsetMap.put(topicAndPartition.partition(), committed.offset());
        }
        // sum lag
        long lagSum = 0;
        osb = 9999999999999999L;
        ose = 0;
        if (endOffsetMap.size() == commitOffsetMap.size()) {
            for (Integer partition : endOffsetMap.keySet()) {
                long beginOffSet = beginOffsetMap.get(partition);
                long endOffSet = endOffsetMap.get(partition);
                long commitOffSet = commitOffsetMap.get(partition);
                long diffOffset = endOffSet - commitOffSet;
                lagSum += diffOffset;
                osb = Math.min(osb, beginOffSet);
                ose = Math.max(ose, endOffSet);
                osc = Math.max(osc, commitOffSet);
            }
        } else {
            logger.error("this topic partitions lost.");
        }
        los1.addAll(Arrays.asList(osb, ose, osc));
        _close();
        return los1;
    }

    // !!!! traversed all the records(cr), consumer needs to be closed.
    // .close()
    public static void getStream(String topic1, String group1, int records1) {
        if (records1 < 1)
            return;
        _consumer(topic1, group1, records1);
        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info("Get records: " + cs);
        //_close();
    }

    public static void put(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s));
        }
        producer.close();
        logger.info(String.format("Producer %s records successed.", dat1.size()));
    }

    public static void putsync(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            try {
                RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic1, s)).get();
                //logger.info("offset = " + metadata.offset());
            } catch (Exception e) {
                logger.error(e);
                return;
            }
        }
        producer.close();
        logger.info(String.format("Synchronous Producer %s records successed.", dat1.size()));
    }

    public static void putcb(String topic1, ArrayList<String> dat1) {
        producer = new KafkaProducer<>(props);
        for (String s : dat1) {
            producer.send(new ProducerRecord<String, String>(topic1, s), (RecordMetadata metadata, Exception e) -> {
                if (e != null) {
                    logger.error(e);
                }
            });
        }
        producer.close();
        logger.info(String.format("CallBack Producer %s records successed.", dat1.size()));
    }

    public static void get(String topic1, String group1, int records1) {
        _consumer(topic1, group1, records1);
        long i;
        cr = consumer.poll(Duration.ofMillis(PullMS));
        cs = cr.count();
        logger.info(String.format("Topic:[%s], Group:[%s], get records: %s", topic1, group1, cs));
        i = 0;
        ArrayList lrs1;
        for (ConsumerRecord<String, String> record : cr) {
            lrs1 = new ArrayList();
            lrs1.add(record.offset());
            lrs1.add(record.value());
            // lrs1.add(record.key());
            // lrs1.add(record.timestamp());
            val.put(i++, lrs1);
        }
        _close();
        jval = json.toJson(val);
    }

    public static void get(String topic1, String group1) {
        get(topic1, group1, MaxRows);
    }

    public static void reset(String topic1, String group1, long offset1) {
        _consumer(topic1, group1, 1);
        Set<TopicPartition> assignment1 = new HashSet<>();
        while (assignment1.size() == 0) {
            consumer.poll(Duration.ofMillis(100));
            assignment1 = consumer.assignment();
        }
        //logger.info("Partition Assignment: " + assignment1);
        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment1);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment1);
        for (TopicPartition tp : assignment1) {
            if (offset1 < 0) {
                offset1 = Math.max(beginOffsets.get(tp), endOffsets.get(tp) + offset1 + 1);
            } else {
                offset1 = Math.max(offset1, beginOffsets.get(tp));
                offset1 = Math.min(offset1, endOffsets.get(tp));
            }
            logger.info(String.format("Reset Topic-Partition [%s] offset: [%s,%s]=%s", tp, beginOffsets.get(tp), endOffsets.get(tp), offset1));
            consumer.seek(tp, offset1);
        }
        _close();
    }

    protected static void _init(String serv1) {
        props.put("bootstrap.servers", serv1);
        //props.put("group.id", sGroup);
        // producer
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        // consumer
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        //props.put("max.poll.records", MaxRows);
    }

    protected static void _consumer(String topic1, String group1, int records1) {
        // Create consumer
        _close();
        props.put("group.id", group1);
        props.put("max.poll.records", records1);
        consumer = new KafkaConsumer(props);
        // Subscribe to topic
        consumer.subscribe(Arrays.asList(topic1));
    }

    protected static void _close() {
        try {
            consumer.close();
        } catch (Exception e) {
            logger.error("Consumer closed.");
        }
    }
}