kafka使用
kafka使用
gradle 依赖
dependencies {
api 'org.apache.kafka:kafka-clients:2.2.1'
}
生产者
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaClient {
private final KafkaProducer<String, byte[]> producer;
public kafkaClient(String kafkaServers){
this.producer = buildProducer(kafkaServers);
}
private KafkaProducer<String, byte[]> buildProducer(String kafkaServers) {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "1");// leader ack
producerProps.put(ProducerConfig.RETRIES_CONFIG, 3);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 100);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 100);
producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
return new KafkaProducer<>(producerProps);
}
public void send(String topic, String data) {
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, data.getBytes(StandardCharsets.UTF_8));
this.producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
log.error("send kafka message error! topic:{}, value:{}", topic, new String(producerRecord.value()), exception);
}
});
}
}
消费者
public class KafkaConsumerExample {
private final static String TOPIC = "my-example-topic";
private final static String BOOTSTRAP_SERVERS =
"localhost:9092,localhost:9093,localhost:9094";
private static Consumer<Long, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// Create the consumer using props.
final Consumer<Long, String> consumer =
new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
//起一个线程替代main方法,拉取消息
static void runConsumer() throws InterruptedException {
final Consumer<Long, String> consumer = createConsumer();
final int giveUp = 100; int noRecordsCount = 0;
while (true) {
final ConsumerRecords<Long, String> consumerRecords =
consumer.poll(1000);
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
consumerRecords.forEach(record -> {
System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
record.key(), record.value(),
record.partition(), record.offset());
});
consumer.commitAsync();
}
consumer.close();
System.out.println("DONE");
}
public static void main(String... args) throws Exception {
runConsumer();
}
}
留下评论