📜  Spring – JMS 集成

📅  最后修改于: 2022-05-13 01:54:47.504000             🧑  作者: Mango

Spring – JMS 集成

JMS 是一个标准的Java API,它允许Java应用程序向另一个应用程序发送消息。它具有高度可扩展性,允许我们使用异步消息传递松散地耦合应用程序。使用 JMS,我们可以读取、发送和读取消息。

下面是一些JMS的实现如下:

  • 亚马逊 SQS
  • Apache ActiveMQ
  • JBoss 消息传递
  • 兔MQ

JMS 消息 

 一条 JMS 消息可以分为以下三部分:

  1. 标头:它包含有关消息的元数据。
  2. 属性:它可以进一步细分为三个部分——
    • Application:发送消息的Java应用程序。
    • 提供者:它由 JMS 提供者使用,并且是特定于实现的。
    • 标准属性:这些由 JMS API 定义。
  3. 有效负载:此字段是消息本身。

实现:在这里,我们将构建一个示例问候(基于 Maven)应用程序来演示如何集成和使用 JMS。为简单起见,我们将使用嵌入式服务器而不是创建另一个应用程序。

项目结构如下:

第 1 步:使用 spring initializr 创建一个 spring 项目。如下图所示创建文件夹和文件,如下所示:

项目结构

第 2 步:将以下依赖项添加到 pom.xml 文件中。

XML

           org.apache.activemq
           artemis-server
       
       
           org.apache.activemq
           artemis-jms-server
       
       
           org.springframework.boot
           spring-boot-starter-artemis
       
       
           org.springframework.boot
           spring-boot-starter-web
       
 
       
           org.springframework.boot
           spring-boot-devtools
           runtime
           true
       
       
           org.projectlombok
           lombok
           true
       
       
           org.springframework.boot
           spring-boot-starter-test
           test
       


Java
// Java Program to Illustrate Model Layer
  
package com.anuanu.springjms.model;
  
// Importing required classes
import java.io.Serializable;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
  
// Annotation
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
  
// Class
// Implementing Serializable interface
public class GreetingMessage implements Serializable {
  
    // Class data members
    static final long serialVersionUID
        = -7462433555964441775L;
    private UUID id;
    private String message;
}


Java
// Java Program to Illustrate Embedded Server Configuration
  
package com.anuanu.springjms;
  
// Importing required classes
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
  
@SpringBootApplication
public class SpringJmsApplication {
  
    // Main driver method
    public static void main(String[] args) throws Exception
    {
  
        // Embedded Server Configuration
        ActiveMQServer activeMQServer
            = ActiveMQServers.newActiveMQServer(
                new ConfigurationImpl()
                    .setPersistenceEnabled(false)
                    .setJournalDirectory(
                        "target/data/journal")
                    .setSecurityEnabled(false)
                    .addAcceptorConfiguration("invm",
                                              "vm://0"));
  
        activeMQServer.start();
        SpringApplication.run(SpringJmsApplication.class,
                              args);
    }
}


Java
// Java Program to Illustrate Task Configuration
  
package com.anuanu.springjms.config;
  
// Importing required classes
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
  
// Annotations
@EnableScheduling
@EnableAsync
@Configuration
  
// Class
public class TaskConfig {
  
    // Task Configuration
    @Bean TaskExecutor taskExecutor()
    {
        return new SimpleAsyncTaskExecutor();
    }
  
    // taskExecutor bean is injected into spring context,
    // and spring will use it to execute tasks for us
}


Java
// Java Program to Illustrate Task Configuration
  
package com.anuanu.springjms.config;
  
// Importing required classes
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
  
// Annotations
@Configuration
  
// Class
public class JmsConfig {
  
    // Class data member
    public static final String QUEUE = "greet";
  
    // Annotation
    @Bean
  
    // Class
    public MessageConverter messageConverter()
    {
  
        MappingJackson2MessageConverter converter
            = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
  
        return converter;
    }
  
    // Enabling spring to take jms messages and flip it
    // to a json message. then it can read
    // that jms message as a jms text message and
    // convert it back to java object
}


Java
// Java Program to Illustrate Sending JMS Messages
  
package com.anuanu.springjms.sender;
  
// Importing required classes
import com.anuanu.springjms.config.JmsConfig;
import com.anuanu.springjms.model.GreetingMessage;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
  
// Annotations
@RequiredArgsConstructor
@Component
  
// Class
public class MessageSender {
  
    // Class data member
    private final JmsTemplate jmsTemplate;
    private static Integer ID = 1;
  
    // Annotation
    @Scheduled(fixedRate = 2000)
  
    // Method
    public void sendMessage()
    {
        // Display command
        System.out.println("Greetings user");
  
        GreetingMessage message
            = GreetingMessage.builder()
                  .id(UUID.randomUUID())
                  .message("Greetings user " + ID++)
                  .build();
  
        jmsTemplate.convertAndSend(JmsConfig.QUEUE,
                                   message);
  
        // Display command
        System.out.println("Message sent!!!");
    }
}


Java
// Java Program to Illustrate Receiving JMS Messages
  
package com.anuanu.springjms.listener;
  
// Importing required classes
import com.anuanu.springjms.config.JmsConfig;
import com.anuanu.springjms.model.GreetingMessage;
import javax.jms.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
  
// Annotation
@Component
  
// Class
public class MessageListener {
  
    @JmsListener(destination = JmsConfig.QUEUE)
  
    public void
    listen(@Payload GreetingMessage greetingMessage,
           @Headers MessageHeaders messageHeaders,
           Message message)
    {
  
        // Display command
        System.out.println("Greeting Received!!!");
  
        System.out.println(greetingMessage);
    }
}


模型层

第 3 步:创建一个简单的 POJO(Plain old Java类),它将作为我们发送和接收消息的模型。在这里,我们使用 Lombok 来减少样板代码。使用的注解如下:

  1. @Data:此注解为所有字段生成 getter、setter、toString 方法和 equals & hashCode 方法。
  2. @Builder:此注解使用构建器模式来自动构建复杂的构建器 API。
  3. @NoArgsConstructor:这个注解生成一个没有参数的构造函数。
  4. @AllArgsConstructor:这个注解生成一个带有所有参数的构造函数。

例子:

Java

// Java Program to Illustrate Model Layer
  
package com.anuanu.springjms.model;
  
// Importing required classes
import java.io.Serializable;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
  
// Annotation
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
  
// Class
// Implementing Serializable interface
public class GreetingMessage implements Serializable {
  
    // Class data members
    static final long serialVersionUID
        = -7462433555964441775L;
    private UUID id;
    private String message;
}

所需配置如下:

A. 嵌入式服务器配置

如前所述,我们将创建一个嵌入式服务器来演示 JMS 消息传递。我们将使用 ActiveMQServers 来创建我们的嵌入式服务器。这是 newActiveMQServer 的方法签名:

Modifier and TypeMethods and Description
static ActiveMQServernewActiveMQServer(Configuration config)

当我们的 Spring Boot 应用程序启动时,嵌入式服务器也会启动。 (因为我们在服务器上调用了 start() 方法)。

例子:

Java

// Java Program to Illustrate Embedded Server Configuration
  
package com.anuanu.springjms;
  
// Importing required classes
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
  
@SpringBootApplication
public class SpringJmsApplication {
  
    // Main driver method
    public static void main(String[] args) throws Exception
    {
  
        // Embedded Server Configuration
        ActiveMQServer activeMQServer
            = ActiveMQServers.newActiveMQServer(
                new ConfigurationImpl()
                    .setPersistenceEnabled(false)
                    .setJournalDirectory(
                        "target/data/journal")
                    .setSecurityEnabled(false)
                    .addAcceptorConfiguration("invm",
                                              "vm://0"));
  
        activeMQServer.start();
        SpringApplication.run(SpringJmsApplication.class,
                              args);
    }
}

C. 任务配置

taskConfig 类将帮助我们异步执行任务。这里的任务是以固定的时间间隔发送消息。为了以固定的时间间隔发送消息,我们使用 @EnableScheduling 注解启用了调度程序。使用的注解如下:  

  1. @EnableScheduling:此注解为我们的应用程序启用调度程序。
  2. @EnableAsync:这个注解使spring能够在后台线程池中运行@Async方法。
  3. @Configuration:这个注解表示指定的类有@Bean定义方法。
  4. @Bean:它是一个方法级别的注解,用于显式声明一个bean。

例子:

Java

// Java Program to Illustrate Task Configuration
  
package com.anuanu.springjms.config;
  
// Importing required classes
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
  
// Annotations
@EnableScheduling
@EnableAsync
@Configuration
  
// Class
public class TaskConfig {
  
    // Task Configuration
    @Bean TaskExecutor taskExecutor()
    {
        return new SimpleAsyncTaskExecutor();
    }
  
    // taskExecutor bean is injected into spring context,
    // and spring will use it to execute tasks for us
}

D. 消息转换器配置

jmsConfig 类为生产者和消费者分别提供生产和消费消息的通用流。它还提供了一个用于转换Java对象和 JMS 消息的 bean。使用的注解如下:

  1. @Configuration :这个注解表示指定的类有@Bean 定义方法。
  2. @Bean :它是一个方法级别的注解,用于显式声明一个 bean。

例子:

Java

// Java Program to Illustrate Task Configuration
  
package com.anuanu.springjms.config;
  
// Importing required classes
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
  
// Annotations
@Configuration
  
// Class
public class JmsConfig {
  
    // Class data member
    public static final String QUEUE = "greet";
  
    // Annotation
    @Bean
  
    // Class
    public MessageConverter messageConverter()
    {
  
        MappingJackson2MessageConverter converter
            = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
  
        return converter;
    }
  
    // Enabling spring to take jms messages and flip it
    // to a json message. then it can read
    // that jms message as a jms text message and
    // convert it back to java object
}

发送 JMS 消息

MessageSender 类(如下所述)主要用于创建消息并将消息生成/发送到消费者可以消费消息的公共流。我们使用了 JmsTemplate,它是一个帮助类,使我们更容易通过 JMS 接收和发送消息。我们还用 @Scheduled 注释了这个类,其值为 2000 ms,它告诉调度程序每隔 2 秒发送一次消息。使用的注解如下:  

  1. @RequiredArgsConstructor :此注释生成一个带有必需参数的构造函数,即具有最终字段或具有其他约束的字段的参数。
  2. @Component:这个注解将我们的类标记为允许spring检测任何自定义bean的组件。
  3. @Scheduled:这个注解标记了一个要被调度的方法。它必须具有这些 cron()、fixedDelay() 或 fixedRate() 属性中的任何一个。

例子:

Java

// Java Program to Illustrate Sending JMS Messages
  
package com.anuanu.springjms.sender;
  
// Importing required classes
import com.anuanu.springjms.config.JmsConfig;
import com.anuanu.springjms.model.GreetingMessage;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
  
// Annotations
@RequiredArgsConstructor
@Component
  
// Class
public class MessageSender {
  
    // Class data member
    private final JmsTemplate jmsTemplate;
    private static Integer ID = 1;
  
    // Annotation
    @Scheduled(fixedRate = 2000)
  
    // Method
    public void sendMessage()
    {
        // Display command
        System.out.println("Greetings user");
  
        GreetingMessage message
            = GreetingMessage.builder()
                  .id(UUID.randomUUID())
                  .message("Greetings user " + ID++)
                  .build();
  
        jmsTemplate.convertAndSend(JmsConfig.QUEUE,
                                   message);
  
        // Display command
        System.out.println("Message sent!!!");
    }
}

发送消息

接收 JMS 消息

MessageListener 类(如下所述)充当消费者,即它消费/接收驻留在公共流中且尚未消费的消息。公共流“JmsConfig.QUEUE”的位置被传递给@JmsListener 中的目标方法。使用的注解如下:  

  1. @JmsListener:此注解将方法标记为指定destination() 上JMS 消息侦听器的目标。
  2. @Component:这个注解将我们的类标记为允许spring检测任何自定义bean的组件。
  3. @Payload:该注解标志着要提取的消息的有效载荷为注解参数。
  4. @Headers:此注解提取 Map 中的所有标题。它在这里与实现 Map 并用于消息头的 MessageHeaders 类一起使用。

例子:

Java

// Java Program to Illustrate Receiving JMS Messages
  
package com.anuanu.springjms.listener;
  
// Importing required classes
import com.anuanu.springjms.config.JmsConfig;
import com.anuanu.springjms.model.GreetingMessage;
import javax.jms.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
  
// Annotation
@Component
  
// Class
public class MessageListener {
  
    @JmsListener(destination = JmsConfig.QUEUE)
  
    public void
    listen(@Payload GreetingMessage greetingMessage,
           @Headers MessageHeaders messageHeaders,
           Message message)
    {
  
        // Display command
        System.out.println("Greeting Received!!!");
  
        System.out.println(greetingMessage);
    }
}

接收消息

输出:

消息每 2 秒发送和接收一次。为每条新消息传递一个唯一的用户名和 ID。

输出 - 1

检查 messageListener 类:

下图向我们展示了在 messageListener 类中接收到的 jms 消息。可以在标题字段中自定义各种属性。请注意我们在标题中添加的一个键值对。

'_type' -> 'com.anuanu.springjms.model.GreetingMessage'