📜  在 AXON 框架中使用 KAFKA 的微服务之间的异步通信 (1)

📅  最后修改于: 2023-12-03 15:23:06.798000             🧑  作者: Mango

在 AXON 框架中使用 KAFKA 的微服务之间的异步通信

在微服务架构中,服务之间的通信通常使用 HTTP 协议进行同步通信,但是这种同步通信方式会导致服务之间的耦合性较高,同时也会存在单点故障的问题。为了解决这些问题,异步通信方式变得愈发重要。Kafka 作为互联网公司及大型组织在异步消息处理系统中的首选数据存储,因其稳定性以及高可靠性、高容错性、高吞吐量和可扩展性等特点而备受青睐。Axon 是一个基于 CQRS 和事件驱动架构(EDA)的开源框架,Axon 可以轻松地集成 Kafka,从而实现微服务之间的异步通信。接下来,我们将介绍如何在 Axon 框架中使用 Kafka 实现微服务之间的异步通信。

Axon 框架

Axon 框架是一个基于 CQRS 和事件驱动架构的开源框架。它提供一组 API,使开发人员能够使用 CQRS 和 EDA 架构进行开发,可以轻松实现事件驱动架构的概念。Axon 框架对于开发 CQRS 和事件驱动架构的应用程序非常有用,Axon 本身包含多个组件,例如:

  • 命令模块
  • 事件模块
  • 聚合模块
  • 消息模块
  • 事件存储模块
  • 分布式锁模块等。

Axon 框架为开发人员提供了更简单的开发方式,使得开发 CQRS 和事件驱动架构的应用程序变得更容易。

Kafka

Kafka 是一个高效、可扩展的分布式消息系统,能够处理大量的实时消息。Kafka 在很多大型网站,如 LinkedIn、Uber、Netflix 等中得到广泛应用。它具有以下特点:

  • 高性能:Kafka 能够处理高达每秒几十万条消息的吞吐量。
  • 可扩展性:Kafka 能够轻松地扩展到成千上万个分区。
  • 可靠性:Kafka 可以提供高可靠性,确保消息不会丢失。
  • 持久化:Kafka 能够将消息持久化存储,即使在消息被消费之后仍然可以进行消息查看和复用。
Axon 框架集成 Kafka

Axon 框架通过快速集成 Kafka,可以实现微服务之间的异步通信,为微服务架构提供了一个高效、可扩展、高性能的解决方案。

Axon 框架提供了 axon-kafka 扩展,该扩展使得在 Axon 框架中使用 Kafka 非常方便。

添加 Axon Kafka 扩展

首先,需要在项目的 pom.xml 中添加 Axon Kafka 扩展的依赖:

<dependency>
    <groupId>org.axonframework.kafka</groupId>
    <artifactId>axon-kafka</artifactId>
    <version>4.3.3</version>
</dependency>
配置 Kafka

在集成 Axon 框架和 Kafka 之前,需要对 Kafka 进行配置。

这里提供一个简单的 Kafka 配置示例:

kafka.bootstrapServers=localhost:9092
kafka.commandTopic=my-command-topic
kafka.commandGroup=my-command-group
kafka.eventTopic=my-event-topic
kafka.eventGroup=my-event-group

需要配置以下内容:

  • kafka.bootstrapServers:Kafka 服务器的地址。
  • kafka.commandTopic:命令主题的名称。
  • kafka.commandGroup:命令组的名称。
  • kafka.eventTopic:事件主题的名称。
  • kafka.eventGroup:事件组的名称。
集成 Axon 框架和 Kafka

在集成 Axon 框架和 Kafka 之前,需要定义一些类别名,用于为不同的命令门户、事件总线、命令处理程序和事件处理程序定义唯一的标识符。

axon.kafka.default-commands.type=COMMANDS
axon.kafka.default-commands.destination=my-command-topic
axon.kafka.default-commands.contentType=application/json

axon.kafka.default-events.type=EVENTS
axon.kafka.default-events.destination=my-event-topic
axon.kafka.default-events.contentType=application/json

axon.kafka.command-subscribing.type=SUBSCRIBING
axon.kafka.command-subscribing.destination=my-command-topic
axon.kafka.command-subscribing.contentType=application/json
axon.kafka.command-subscribing.group.id=my-command-group

axon.kafka.event-listening.type=LISTENING
axon.kafka.event-listening.destination=my-event-topic
axon.kafka.event-listening.contentType=application/json
axon.kafka.event-listening.group.id=my-event-group
  • axon.kafka.default-commands:用于定义默认的命令。
  • axon.kafka.default-events:用于定义默认的事件。
  • axon.kafka.command-subscribing:用于定义用于订阅命令的门户。
  • axon.kafka.event-listening:用于定义用于监听事件的总线。
实现命令处理程序

在 Axon 框架中,命令处理程序由 Java 类实现,需要在 Java 类中使用 @CommandHandler 注释来标识该类的方法是一个命令处理程序。

public class MyCommandHandler {

    @CommandHandler
    public void handle(MyCommand command) {
        // your command handler logic here
    }
}

这里,我们用 MyCommandHandler 类来实现一个命令处理程序,并使用 @CommandHandler 注释来标识其方法 handle 是一个命令处理程序。

实现事件处理程序

在 Axon 框架中,事件处理程序由 Java 类实现,需要在 Java 类中使用 @EventSourcingHandler@EventHandler 注解来标识该类的方法是一个事件处理程序。

public class MyEventHandler {

    @EventHandler
    public void handle(MyEvent event) {
        // your event handler logic here
    }
}

这里,我们用 MyEventHandler 类来实现一个事件处理程序,并使用 @EventHandler 注释来标识其方法 handle 是一个事件处理程序。

实现命令模块和事件模块

在 Axon 框架中,需要实现 CommandEvent 类来定义命令和事件,这些类应该用于命令总线和事件总线。

public class MyCommand {

    private final String name;

    public MyCommand(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

这里,我们定义了一个 MyCommand 类来表示一条命令,并将 name 作为命令的参数。

public class MyEvent {

    private final String name;

    public MyEvent(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

同样地,我们定义了一个 MyEvent 类来表示一条事件,并将 name 作为事件的参数。

实现命令总线和事件总线

在 Axon 框架中,需要实现 CommandGatewayEventBus 两个接口来实现命令总线和事件总线。在 Axon 框架中,Kafka 扮演了基础设施的角色,用于命令总线和事件总线的实现。

@Configuration
public class AxonKafkaConfig {

    @Bean
    public CommandGateway commandGateway(KafkaMessageSender<String, String> kafkaMessageSender) {
        return DefaultCommandGateway.builder()
                .commandBus(KafkaCommandBus.builder()
                        .messageSender(kafkaMessageSender)
                        .build())
                .build();
    }

    @Bean
    public EventBus eventBus(KafkaMessageSource<String, String> kafkaMessageSource) {
        return KafkaEventBus.builder()
                .messageSource(kafkaMessageSource)
                .build();
    }

    @Bean
    public KafkaMessageSource<String, String> kafkaMessageSource(KafkaProperties kafkaProperties,
                                                                 ConsumerFactory<String, String> consumerFactory) {
        return KafkaMessageSource.<String, String>builder()
                .topics(Collections.singletonList(kafkaProperties.getEventTopic()))
                .groupId(kafkaProperties.getEventGroup())
                .pollTimeout(Duration.ofMillis(100))
                .buildWithDefaultConfigurer(consumerFactory);
    }

    @Bean
    public KafkaMessageSender<String, String> kafkaMessageSender(KafkaTemplate<String, String> kafkaTemplate) {
        return KafkaMessageSender.<String, String>newBuilder()
                .kafkaTemplate(kafkaTemplate)
                .build();
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(),
                new StringDeserializer()
        );
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(KafkaProperties properties) {
        return new KafkaTemplate<>(properties.buildProducerProperties());
    }
}

这里我们定义了一个 AxonKafkaConfig 类来实现 Kafka 的配置。在 AxonKafkaConfig 中,我们使用 CommandGatewayEventBus 接口来定义命令总线和事件总线。在 AxonKafkaConfig 类中,我们还使用 KafkaMessageSourceKafkaMessageSender 两个类来实现数据传输。在 AxonKafkaConfig 类中,我们还定义了消费者工厂 ConsumerFactoryKafkaTemplate

发布命令和事件

在 Axon 框架中,可以使用 CommandGatewayEventBus 来发布命令和事件。

@RestController
@RequestMapping("/api")
public class MyController {

    private final CommandGateway commandGateway;
    private final EventBus eventBus;

    @Autowired
    public MyController(CommandGateway commandGateway, EventBus eventBus) {
        this.commandGateway = commandGateway;
        this.eventBus = eventBus;
    }

    @PostMapping("/my-command")
    public String publishCommand(@RequestBody MyCommand command) {
        commandGateway.send(command);
        return "Command sent successfully";
    }

    @PostMapping("/my-event")
    public String publishEvent(@RequestBody MyEvent event) {
        eventBus.publish(Collections.singletonList(event));
        return "Event sent successfully";
    }
}

我们使用 CommandGatewayEventBus 来发布命令和事件。在这里,我们定义了一个 MyController 类来实现发布命令和事件的 API,通过 HTTP POST 请求方式发送命令和事件。

至此,在 Axon 框架中使用 Kafka 实现微服务之间的异步通信的过程已经完成,该方案可以帮助我们实现高效、可靠和可扩展的数据传输,从而提高微服务的性能。