猪小花1号

个人签名

282篇博客

微服务端点之间的通信(2)— 同步通信中篇

猪小花1号2018-12-21 16:57
3.使用消息代理的方式实现异步通信
为了讲解消息代理的异步通信实现方式,让我们以一个名为“Crispy Bun”的汉堡店作为例子。这是一个得来速式的汉堡店,顾客可以在一个窗口下好订单之后,在下一个窗口等着订单,中途一直不用下车。这个汉堡店的订单系统可以在第一个窗口处接收订单,然后将订单信息封装成消息或者事件之后,推送到某个队列或者主题。在每个厨师面前有一个组件用于展示所有用户的订单。我们的需求是订单提交到某个中间代理(队列或主题)之后,所有厨师都可以监听这些新来的消息。即便厨师都在忙,订单也不会从列表中消失,如果某个厨师看完订单信息开始准备订单内容的时候,就将订单保存下来,从代办列表中撤离。这个例子还可以展开很多细节,例如同一主题最多的订阅数,再如订单消息可以同时发给厨师和打包团队,在厨师准备食物的时候,打包团队可以准备些盒子用于装汉堡,又或者如果订单中有饮料,还需要同时发给饮料部门,同时准备配制饮料。所以不同组件可以同时监听订单消息,然后各自采取相应的行动。
简单起见,只考虑只有一个接订单组件和一个厨师组件的简单示例。RabbitMQ是目前最为流行的实现了高级消息队列协议(Advanced Message QueuingProtocal,AMQP)协议的消息代理解决方案之一。它是基于Erlang语言的,跟普通消息代理直接将消息发布到队列中不同的是,RabbitMQ中需要先经过一个消息交换机。交换机将消息按照属性、连接和路由队列分发路由给不同队列的组件。一个交换机上可以有多个不同的队列,通过不同的路由键进行区分。例如,在我们的例子中,订单中可能会有饮料,然后相应地我们需要两种绑定键来区分,即绑定Chef_key和绑定All_key。其中绑定All_key会发给厨师和饮料两个队列,而绑定Chef_key则只发给厨师队列。如果订单中没有饮料的话,只需要发给厨师队列,会使用绑定Chef_key,如图3-8所示。

如果对RabbitMQ不熟悉,强烈建议先学习一些与RabbitMQ相关的知识。下面是在Unix型操作系统的机器上安装RabbitMQ的步骤的简单描述。
(1)执行sudo apt-get update命令。
(2)使用下面的命令添加RabbitMQ应用代码库:
echo "deb http://www.rabbitmq.com/debian/ testing main" >>/etc/apt/sources.list
curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc |sudo apt-key add
(3)使用sudo apt-get update命令再次更新。
(4)运行sudo apt-get install rabbitmq-server命令来安装RabbitMQ。
执行完上述这些操作之后,会在机器上安装Erlang和RabbitMQ,并且启动RabbitMQ服务。如果服务没有自动启动,也可以使用service rabbitmq-server start命令来手动启动RabbitMQ。安装完这些之后,可能需要一个拥有管理员权限的控制台来处理RabbitMQ的集群和节点。需要通过以下命令安装RabbitMQ插件:sudo rabbotmq-plugins enable rabbitmq_management执行完上述命令之后,就可以在浏览器中打开http://<你的本机ip或者localhost>:15672来查看RabbitMQ的管理控制台,大概如图3-9所示。打开管理控制台需要用户名和密码,默认都是
guest。

前面已经提过,对这个示例而言需要两个应用,一个是消息的生产者,另一个是消费者。目前RabbitMQ已经启动,还需要给生产者写一点代码,让它能真正产生订单,并且将订单消息提交到默认的交换机中去。默认的交换机没有名字,是一个空字符串"",此时消息会被转发到名为crispyBunOrder的队列中去。
订单生产者的pom.xml文件如下:
<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.practicalMircorservice</groupId>
<artifactId>EventProducer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>EventProducer</name>
<description>com.practicalMircorservice </description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.1.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8
</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>

<version>Camden.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

订单生产者应用中,还需要有下面这两个Java文件:
EventProducerApplication.java:这个文件是应用程序的主应用类,我们会将REST控制器直接放在主应用类中。
CrispyBunOrder.java:这个是用来定义订单对象的类。EventProducerApplication.java的代码如下:
@SpringBootApplication
@EnableBinding
@RestController
public class EventProducerApplication {
private final String Queue = "crispyBunOrder";
@Autowired
private RabbitTemplate rabbitTemplate;
public static void main(String[] args) {
SpringApplication.run(EventProducerApplication.class, args);
}
@RequestMapping(method = RequestMethod.POST, value = "/orders/{orderId}")
public
void placeOrder(@PathVariable("ord
erId") UUID orderId, @RequestParam
("itemId") Integer
itemId,@RequestParam("userName") String userName) {
CrispyBunOrder orderObject = createOrder(orderId,itemId,userName);
rabbitTemplate.convertAndSend(Queue,orderObject);
}

private CrispyBunOrder createOrder(UUID orderId,Integer itemId,
String userName){
CrispyBunOrder order = new CrispyBunOrder();
order.setItemId(itemId);
order.setOrderId(orderId);
order.setUserName(userName);
order.setOrderPlacedTime(new Date());
return order;
}
}


这个类的主要业务方法中,会接受一组orderId、itemId和userName参数,然后创建一个订单,接着将该订单提交到名为crispyBunOrder的队列中。因为我们已经在前面的项目对象模型(Project Object Model,POM)文件中添加了与RabbitMQ相关的依赖,Spring Boot会自动创建一个RabbitMQ模板。有了RabbitMQ模板,就可以将任何对象发送到给定的队列名中去。这里其实是发给默认交换机的,默认交换机没有名字,只是一个空字符串""。因此,任何发往默认交换机的消息都会直接定向到相应名字的队列。
CrispyBunOrder类有4个属性,其内容如下:
package com.practicalMircorservices.eventProducer;
import java.io.Serializable;
import java.util.Date;
import java.util.UUID;
public class CrispyBunOrder implements Serializable{
/**
*
*/
private static final long serialVersionUID = 6572547218488352566L;
private UUID orderId;
private Integer itemId;
private Date orderPlacedTime;
private String userName;
public UUID getOrderId() {
return orderId;
}
public void setOrderId(UUID orderId) {
this.orderId = orderId;
}
public Integer getItemId() {

return itemId;
}
public void setItemId(Integer itemId) {
this.itemId = itemId;
}
public Date getOrderPlacedTime() {
return orderPlacedTime;
}
public void setOrderPlacedTime(Date orderPlacedTime) {
this.orderPlacedTime = orderPlacedTime;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}

  
该应用的application.properties配置文件中有如下两个属性:spring.rabbitmq.host=localhostspring.rabbitmq.port=56725672是RabbitMQ的默认端口。现在轮到了消费者一方了。消费者是另外一个名为EventConsumerApplication全新的应用。这个应用可以直接从start.sprig.io上生成之后下载。下载之前要确保勾选了Stream Rabbit复选框。在这个应用中,需要有一个与生产者一模一样的CrispyBunOrder类,因为这里需要对这个类的数据进行反序列化。除此之外,还应该有一个监听器,用于监听RabbitMQ中名为crispyBunOrder的队列。这个类的代码如下:
package com.practicalMircorservices.eventProducer;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Payload;
@SpringBootApplication

@RabbitListener(queues = "crispyBunOrder")
@EnableAutoConfiguration
public class EventConsumerApplication {
@Bean
public Queue crispyBunOrderQueue() {
return new Queue("crispyBunOrder");
}
@RabbitHandler
public void process(@Payload CrispyBunOrder order) {
StringBuffer SB = new StringBuffer();
SB.append("New Order Received : \n");
SB.append("OrderId : " + order.getOrderId());
SB.append("\nItemId : " + order.getItemId());
SB.append("\nUserName : " + order.getUserName());
SB.append("\nDate : " + order.getOrderPlacedTime());
System.out.println(SB.toString());
}
public static void main(String[] args) throws Exception {
SpringApplication.run(EventConsumerApplication.class, args);
}
}

这里我们通过@RabbotListener(queues = "crispyBunOrder")注解定义了应该监听的队列名称。除此之外,我们还可以在这里定义多个其他参数,如交换机名字、路由键等。例如,下面这样的注解:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch"),
key = "orderRoutingKey")
)


在不同控制台中使用mvn spring-boot:run分别启动这两个应用/组件。确保应用的application.properties文件中的server.port属性使用的是两个不同的端口,否则会有端口冲突。现在可以在命令行中使用curl命令访问产生订单应用的URL来测试返回值。例如,下面这样一个命令:
curl -H "Content-Type: application/x-www-form-urlencoded" --data "itemId=
1&userName=john"
http://localhost:8080/orders/02b425c0-da2b-445d-8726-3cf4dcf4326d;
在执行完这个命令后,就能在消费者的控制台中看到订单信息了。通过这个例子可以对基于消息的异步通信方式有个简单的理解。在实际生活中有很多复杂的例子,例如,从网站获得用户操作、股票价格的变动等。大部分时候使用消息代理都比较方便,例如,对于一个输入的数据流,需要基于路由的键来将数据分发给不同队列等类似场景。Kafka是另外一个不错的解决这种问题的工具。Spring提供了对Kafka、RabbitMQ和一些其他消息代理的内置支持。有了Spring的内置支持,开发人员可以快速搭建和开发一个基于消息代理的应用。


原文网址:https://www.epubit.com/book/detail/27566
内容来源:异步社区;版权属【人民邮电出版社 异步社区】所有,转载已获得授权;未经授权,不得以任何方式复制和传播本书内容,如需转载请联系异步社区。