Spring Cloud搭建手册(3)——Spring Cloud Bus

2018-02-27 11:49:17来源:oschina作者:watashi人点击

分享

1、网络上或者相关书籍中使用Rabbit MQ作为例子的较多,这里我们选择引入kafka来实现消息总线,在pom.xml加入依赖:



org.springframework.cloud
spring-cloud-starter-bus-kafka

2、application.properties配置文件需要配置kafka和zookeeper的地址:


spring.cloud.stream.kafka.binder.brokers=172.23.25.125:9092
spring.cloud.stream.kafka.binder.zkNodes=172.23.25.128:2181
kafka.bootstrap.servers=172.23.25.125:9092

多个地址用英文逗号分隔。


3、使用config-server发送消息给消息总线,config-client通过消息总线获取消息的结构,新增kafka的配置类KafkaConfiguration,在该配置类中,配置KafkaTemplate和kafkaListenerContainerFactory这两个bean:


@Configuration
@EnableKafka
public class KafkaConfiguration {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public KafkaTemplate kafkaTemplate() {
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
ProducerFactory factory = new DefaultKafkaProducerFactory<>(props);
return new KafkaTemplate(factory);
}
@Bean
public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
Map propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "default");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(propsMap));
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}

※ConsumerConfig.GROUP_ID_CONFIG必须设置,否则默认为空,启动时会找不到kafka对应的group id,如果没有创建新的group id,则使用default。


首先需要在config-server上实现一个KafkaSender:


@Component
public class KafkaSender {
@Autowired
private KafkaTemplate template;
public void send(String topic, String data) throws Exception {
ListenableFuture> future = template.send(topic, data);
future.addCallback(new SuccessCallback>() {
@Override
public void onSuccess(SendResult result) {
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
throw new KafkaException(ex.getMessage(), ex);
}
});
try {
future.get(3000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new Exception("Send kafka message error:" + e.getMessage());
}
}
}

然后在config-clinet上实现一个KafkaReciever:


@Component
@KafkaListener(topics = {"spring_cloud_test"})
public class KafkaReceiver {
@KafkaHandler
public void process(String data) {
System.out.println("Receive : " + data);
}
}

4、可能遇到的问题:


①启动时报错springCloudBus=UNKNOWN_TOPIC_OR_PARTITION


原因:Spring Cloud Bus会使用一个名为springCloudBus的topic,首次启动或更换了Kafka服务器,会要求手动创建这个topic。


解决办法:创建topic后重新启动:


./kafka-topics.sh --create --zookeeper 172.23.25.128:2181 --replication-factor 3 --partitions 3 --topic springCloudBus

②kafka发送消息,key=null,并引发超时



原因:查看详细的发送日志,发现其中一行是bootstrap.servers = [localhost:9092],使用KafkaTemplate默认连接了localhost:9092去发送消息,由于kafka并不是安装在localhost上,所以导致发送超时。


解决办法:增加Kafka的配置类,KafkaConfiguration,配置KafkaTemplate的相关参数。

最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台