提交 905eac76 编写于 作者: S star

kafka

上级 a9a8f0c4
......@@ -972,12 +972,149 @@ Kafka 最佳实践配置:
---
#### 9.Kafka 与
#### 9.Kafka 与 SpringBoot
创建 Kafka Producer 程序:
1. 创建 SpringBoot 应用程序,选择 web、kafka 组件。
2. 进行配置:
~~~properties
server.port=8080
spring.kafka.producer.bootstrap-servers=192.168.253.136:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
~~~
3. 创建生产者组件:
~~~java
@Component
public class KafkaProducer {
public static final String TOPIC = "test-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(KafkaMessage message) {
kafkaTemplate.send(TOPIC, JSONUtil.toJsonStr(message));
}
}
~~~
`kafkaTemplate.send` 方法将会返回一个 `ListenableFuture<SendResult<String, String>>` 对象,我们可以从这个 future 中获取消息发送结果,也可以添加回调函数:
~~~java
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, JSONUtil.toJsonStr(message));
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("消息发送失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
System.out.println("消息发送成功:" + stringStringSendResult.getRecordMetadata());
}
});
~~~
4. 创建 controller 用来生产消息:
~~~java
@Slf4j
@RestController
@RequestMapping("/demo")
public class KafkaController {
@Autowired
private KafkaProducer producer;
@PostMapping("/send")
public void send(@RequestBody KafkaMessage message) {
log.info("received message...");
producer.sendMessage(message);
}
}
~~~
创建 Kafka Consumer 程序:
1. 创建 SpringBoot 应用程序,选择 kafka 组件。
2. 进行配置:
~~~properties
server.port=8081
spring.kafka.consumer.bootstrap-servers=192.168.253.136:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
~~~
3. 创建监听程序:
~~~java
@Component
public class KafkaConsumer {
public static final String TOPIC = "test-topic";
@KafkaListener(topics = TOPIC, groupId = "my-group")
public void acceptMessage(ConsumerRecord<String, String> record) {
System.out.println("topic:" + record.topic() + ",partition:" + record.partition() +
",key:" + record.key() + ",value:" + record.value() + ",timestamp:" + record.timestamp());
}
}
~~~
通过 KafkaListener 注解,当监听到消息时传入到 record 中进行后续处理。
> 如果在 SpringBoot 中用到的主题未创建,SpringBoot 将会为我们自动创建主题。
SpringBoot 设置多线程消费:
~~~java
@KafkaListener(topics = TOPIC, groupId = "my-group", concurrency = "3")
public void acceptMessage(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
System.out.println(consumer.metrics().keySet());
System.out.println("topic:" + record.topic() + ",partition:" + record.partition() +
",key:" + record.key() + ",value:" + record.value() + ",timestamp:" + record.timestamp());
}
~~~
意:concurrency 为消费者数量,应当小于等于主题分区数量。
> :boxing_glove: 更多 Kafka 与 SpringBoot 的使用方法参考:https://docs.spring.io/spring-kafka/docs/current/reference/html/#preface
---
#### 10.Kafka 常见面试题
\ No newline at end of file
#### 10.Kafka 常见面试题
Kafka 常见应用场景?Q
1. 日志收集或流式系统
2. 消息系统
3. 用户活动跟踪或运营指标监控
Kafka 如何保证消息有序性?(Kafka 只支持单 Partion 有序)
1. 对每一组需要顺序的消息指定同一个 key,这样这些消息将会被分发到同一个 Partion 中。
2. 读取消息时的处理方法:
1. 单线程消费消息时,消息是有序的。
2. 采用多线程消费 Partion 中的消息时,可以设计 N 个 queue,相同 key 的消息放入同一个 queue,每个 queue 由一个单独的线程执行。
几大消息中间件比较:https://my.oschina.net/blogByRzc/blog/3012251
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册