當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
Spring Cloud Stream的使用(上)
生活随笔
收集整理的這篇文章主要介紹了
Spring Cloud Stream的使用(上)
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
操作消息隊列的另一種方法,SpringCloud是Spring的組件之一,官方定義Spring Cloud Stream,給微服務應用構(gòu)建消息驅(qū)動能力的框架,下面我簡稱Stream,應用程序通過Input,或者Output,來與Stream交互,而Stream中的Binder與中間件交互,Binder是Stream的一個抽象概念,是應用與消息中間件之間,聯(lián)合記,使用Stream最大的方便之處,對于消息中間件的進一步封裝,可以做到代碼方法的無感知,并且動態(tài)的切換成連接,切換Topic,當然他也有局限,Stream支持的只有兩種,一種是RabbitMQ,另外一種是Kafka,現(xiàn)在我們就來實際使用一下,找一下感覺
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>order</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>cn.learn</groupId><artifactId>microcloud02</artifactId><version>0.0.1</version></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-eureka</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies><!-- 這個插件,可以將應用打包成一個可執(zhí)行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
server.port=8010eureka.client.serviceUrl.defaultZone=http://admin:1234@10.40.8.152:8761/eurekaspring.application.name=order
eureka.instance.prefer-ip-address=true
eureka.instance.instance-id=${spring.application.name}:${spring.cloud.client.ipAddress}:${spring.application.instance_id:${server.port}}spring.rabbitmq.host=59.110.138.145
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672spring.cloud.stream.bindings.input.group=order
package com.learn.message;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface StreamClient {// 報錯:Invalid bean definition with name 'myMessageOrdersssss' // defined in com.imooc.order.message.StreamClient: bean definition with this name already exists//解決方法:@Input和@Output不可一樣,同一服務里面的信道名字不能一樣,在不同的服務里可以相同名字的信道String INPUT = "input";String OUTPUT = "output";@Input(StreamClient.INPUT)SubscribableChannel input();@Output(StreamClient.OUTPUT)MessageChannel output();// String INPUT2 = "input2";
// String OUTPUT2 = "output2";
// @Input(StreamClient.INPUT2)
// SubscribableChannel input2();
// @Output(StreamClient.OUTPUT2)
// MessageChannel output2();
}
package com.learn.message;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {private final Logger log = LoggerFactory.getLogger(StreamReceiver.class);@StreamListener(StreamClient.INPUT)public void process(Object message){System.out.println(message);log.info("StreamReceiver:{}",message);}/*** 接收orderDTO對象消息* */
// @StreamListener(value = StreamClient.INPUT)
// @SendTo(StreamClient.INPUT2) //消息發(fā)送給誰
// public String process(OrderDTO message){
// log.info("StreamReceiver:{}",message);
// return"收到了";
// }/*** 接收到消息之后返回的消息* */
// @StreamListener(value = StreamClient.INPUT2)
// public void process2(OrderDTO message){
// log.info("StreamReceiver2:{}",message);
// }
}
package com.learn.controller;import java.util.Date;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import com.learn.message.StreamClient;@RestController
public class SendMessageController {@Autowiredprivate StreamClient streamClient;@GetMapping("/sendMessage")public void process(){String message = "now "+new Date();streamClient.input().send(MessageBuilder.withPayload(message).build());
// streamClient.output().send(MessageBuilder.withPayload(message).build());}// @GetMapping("/sendMessage")
// public void process(){
// OrderDTO orderDTO = new OrderDTO();
// orderDTO.setOrderId("123456");
// streamClient.input().send(MessageBuilder.withPayload(orderDTO).build());
// }
}
spring-cloud-starter-stream-rabbit<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>2.2.1.RELEASE</version>
</dependency>KAFKA你就改成KAFKA,第二步,這里用的是RabbitMQ,所以你要在配置文件里面配置一下,但是我們上節(jié)課已經(jīng)配置過了,不需要再配置,接下來我們就來看如何來使用Stream,來發(fā)送和接收消息,一般我們會這么來做,StreamClientlocalhost:8010/sendMessage我們已經(jīng)完成發(fā)送和接收,功能上沒有多大的問題spring.cloud.stream.bindings.myMessage.group=orderspring.cloud.stream.bindings.input.group=order
package com.learn.dto;import java.math.BigDecimal;
import java.util.List;import com.learn.dataobject.OrderDetail;public class OrderDTO {/** 訂單id. */private String orderId;/** 買家名字. */private String buyerName;/** 買家手機號. */private String buyerPhone;/** 買家地址. */private String buyerAddress;/** 買家微信Openid. */private String buyerOpenid;/** 訂單總金額. */private BigDecimal orderAmount;/** 訂單狀態(tài), 默認為0新下單. */private Integer orderStatus;/** 支付狀態(tài), 默認為0未支付. */private Integer payStatus;private List<OrderDetail> orderDetailList;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getBuyerName() {return buyerName;}public void setBuyerName(String buyerName) {this.buyerName = buyerName;}public String getBuyerPhone() {return buyerPhone;}public void setBuyerPhone(String buyerPhone) {this.buyerPhone = buyerPhone;}public String getBuyerAddress() {return buyerAddress;}public void setBuyerAddress(String buyerAddress) {this.buyerAddress = buyerAddress;}public String getBuyerOpenid() {return buyerOpenid;}public void setBuyerOpenid(String buyerOpenid) {this.buyerOpenid = buyerOpenid;}public BigDecimal getOrderAmount() {return orderAmount;}public void setOrderAmount(BigDecimal orderAmount) {this.orderAmount = orderAmount;}public Integer getOrderStatus() {return orderStatus;}public void setOrderStatus(Integer orderStatus) {this.orderStatus = orderStatus;}public Integer getPayStatus() {return payStatus;}public void setPayStatus(Integer payStatus) {this.payStatus = payStatus;}public List<OrderDetail> getOrderDetailList() {return orderDetailList;}public void setOrderDetailList(List<OrderDetail> orderDetailList) {this.orderDetailList = orderDetailList;}@Overridepublic String toString() {return "OrderDTO [orderId=" + orderId + ", buyerName=" + buyerName + ", buyerPhone=" + buyerPhone+ ", buyerAddress=" + buyerAddress + ", buyerOpenid=" + buyerOpenid + ", orderAmount=" + orderAmount+ ", orderStatus=" + orderStatus + ", payStatus=" + payStatus + ", orderDetailList=" + orderDetailList+ "]";}}
package com.learn.dataobject;import java.math.BigDecimal;public class OrderDetail {private String detailId;/** 訂單id. */private String orderId;/** 商品id. */private String productId;/** 商品名稱. */private String productName;/** 商品單價. */private BigDecimal productPrice;/** 商品數(shù)量. */private Integer productQuantity;/** 商品小圖. */private String productIcon;public String getDetailId() {return detailId;}public void setDetailId(String detailId) {this.detailId = detailId;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getProductId() {return productId;}public void setProductId(String productId) {this.productId = productId;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public BigDecimal getProductPrice() {return productPrice;}public void setProductPrice(BigDecimal productPrice) {this.productPrice = productPrice;}public Integer getProductQuantity() {return productQuantity;}public void setProductQuantity(Integer productQuantity) {this.productQuantity = productQuantity;}public String getProductIcon() {return productIcon;}public void setProductIcon(String productIcon) {this.productIcon = productIcon;}@Overridepublic String toString() {return "OrderDetail [detailId=" + detailId + ", orderId=" + orderId + ", productId=" + productId+ ", productName=" + productName + ", productPrice=" + productPrice + ", productQuantity="+ productQuantity + ", productIcon=" + productIcon + "]";}}
?
總結(jié)
以上是生活随笔為你收集整理的Spring Cloud Stream的使用(上)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。