欢迎光临
我一直在奋斗

spring-cloud-stream学习以及常见的坑

stream这个项目能够让我们不必通过繁琐的自定义ampq来建立exchange,通道名称,以及队列名称和路由方式。只需要简单几步我们就轻松使用stream完成推送到rabbitmq和kafafa,并完成监听工作。

首先,stream提供了默认的输入和输出通过。如果我们不需要多个通道,可以通过@Enbalebing(Sink.Class)来绑定输入通道。对应的application里面的

  # rabbitmq默认地址配置
  rabbitmq:
    host: asdf.me
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input:
         destination: push-exchange
        output:
          destination: push-exchange

这样会自动建立一个exchange为push-exchange名字的输出通道。同理@Enbalebing(Input.Class)是绑定输入通道的。下面创建一个生产者和消费者:


@EnableBinding(Source.class)
public class Producer {
    @Autowired
    @Output(Source.OUTPUT)
    private MessageChannel channel;

    public void send() {
        channel.send(MessageBuilder.withPayload("22222222222" + UUID.randomUUID().toString()).build());
    }

@EnableBinding(Sink.class)
public class Consumer {
    @StreamListener(Sink.INPUT)
    public void receive(Message<String> message) {
        System.out.println("接收到MQ消息:" + JSONObject.toJSONString(message));
    }
}

stream默认提供的消费者和生产者接口:

public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

可以看出,会去找到我们在application.yaml里面定义的input,output下面的destination。分别作为输入和输出通道。我们也可以自己定义接口来实现:

   String WS_INPUT = "ws-consumer";
    String EMAIL_INPUT = "email-consumer";
    String SMS_INPUT = "sms-consumer";
    @Input(MqMessageInputConfig.EMAIL_INPUT)
    SubscribableChannel emailChannel();

    @Input(MqMessageInputConfig.WS_INPUT)
    SubscribableChannel wsChannel();

    @Input(MqMessageInputConfig.SMS_INPUT)
    SubscribableChannel smChannel();

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MqMessageOutputConfig {
    String MESSAGE_OUTPUT = "message-producter";

    @Output(MqMessageOutputConfig.MESSAGE_OUTPUT)
    MessageChannel outPutChannel();

}

坑1. 需要注意的是,最好不要自定义输入输出在同一个类里面。这样,如果我们只调用生产者发送消息。会导致提示Dispatcher has no subscribers for channel。并且会让我们发送消息的次数莫名减少几次。详细情况可以查看gihub官方issue,也给出的这种解决方法官方解决方式

建立一个testjunit类,然后使用生产者发送消息。消费者监听队列获取消息。

接收到MQ消息:{"headers":{"amqp_receivedDeliveryMode":"PERSISTENT","amqp_receivedRoutingKey":"my-test-channel","amqp_receivedExchange":"my-test-channel","amqp_deliveryTag":1,"amqp_consumerQueue":"my-test-channel.anonymous.vYA2O6ZSQE-S9MOnE0ZoJQ","amqp_redelivered":false,"id":"805e7fc3-a046-e07a-edf5-def58d9c8eab","amqp_consumerTag":"amq.ctag-QwsmRKg5f0DGSp-7wbpYxQ","contentType":"text/plain","timestamp":1523930106483},"payload":"22222222222a7d24456-5b11-4c25-9270-876e7bbc556a"}

坑2.stream生成的exchang默认是topic模式。就是按照前缀匹配,发送消息给对应的队列。

*(星号):可以(只能)匹配一个单词

#(井号):可以匹配多个单词(或者零个)

fanout:广播模式,发送到所有的队列

direct:直传。完全匹配routingKey的队列可以收到消息。

坑3.默认消息异常之后,都会往死消息队列里面写,然而异常是放到一个header里面去的。默认消息队列支持的最大frame_max 是128kb,超过这个大小,服务器就主动给你关闭连接,然后把你的消息会不断的重试。

坑4.看到国内好多博客,使用@Input和@output都是用MessageChannel,这是不对的。@Output对MessageChannel,@input对应SubscribableChannel 。切记


坑5.我使用的stream版本是1.2.1,springboot版本时1.5.6。没有办法使用routingkey属性,即在spring.cloud.stream.rabbit这个属性无法显示。应该是我的stream版本偏低吧。遇到这种情况,大家果断换新版本,或者使用自带的ampq来实现吧。

坑6.stream的destination对应生成rabbitmq的exchange。加上了group后,例如destination:wx-consumer,group:queue。那么经过stream后会变成wx-consumer.group。如果使用group对应的是持久化队列,不会被rabbitmq删除。

未经允许不得转载:奋斗者的足迹 » spring-cloud-stream学习以及常见的坑
分享到: 更多 (0)

评论 1

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
  1. #1

    不错,加油。

    过滤沙缸7个月前 (10-23)回复

奋斗者的足迹

联系我们加入我们