一 背景
微服务重构
从spring boot 1.5.10 升级到 spring boot 2.2.1
对应的spring cloud版本也升级到了2.2.x
服务从spring boot web 换成了 spring webflux
其他底层业务逻辑没做大的变更
但是在订阅 RabbitMQ 消息的时候却莫名报错,异常信息如下:
Endpoint [com.xx.yy.coupon.service.mq.MallCouponMqConsumer]
Method [public void com.xx.yy.coupon.service.mq.MallCouponMqConsumer.memberCouponsDraw(com.xx.yy.member.level.common.domain.GiftMessageVo) throws java.io.UnsupportedEncodingException,com.xx.yy.base.service.exception.BaseException] with argument values:
[0] [type=java.lang.String] [value={"accountId":"E90A4FA5-1571-4B95-A421-174520F426D8","memberLevel":"0102","platform":"APP","source":"http://app-hybrid.tst.xx.com/member/gift"}] , failedMessage=GenericMessage [payload=byte[146], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=member_level_send_gift_events_local.yy_coupon_member_gift_local, amqp_receivedExchange=, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=member_level_send_gift_events_local.yy_coupon_member_gift_local, amqp_redelivered=false, id=2b2d1431-0dfb-4a64-7684-8974d599d5f6, amqp_consumerTag=amq.ctag-WapvmXeYPSMiGFdscbaykA, sourceData=(Body:'{"accountId":"E90A4FA5-1571-4B95-A421-174520F426D8","memberLevel":"0102","platform":"APP","source":"http://app-hybrid.tst.xx.com/member/gift"}' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=member_level_send_gift_events_local.yy_coupon_member_gift_local, deliveryTag=1, consumerTag=amq.ctag-WapvmXeYPSMiGFdscbaykA, consumerQueue=member_level_send_gift_events_local.yy_coupon_member_gift_local]), contentType=text/plain, timestamp=1575890727715}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:61)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:239)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:236)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1569)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1488)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1476)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1467)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1411)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:958)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:908)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1279)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1185)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: argument type mismatch
Endpoint [com.xx.yy.coupon.service.mq.MallCouponMqConsumer]
Method [public void com.xx.yy.coupon.service.mq.MallCouponMqConsumer.memberCouponsDraw(com.xx.yy.member.level.common.domain.GiftMessageVo) throws java.io.UnsupportedEncodingException,com.xx.yy.base.service.exception.BaseException] with argument values:
[0] [type=java.lang.String] [value={"accountId":"E90A4FA5-1571-4B95-A421-174520F426D8","memberLevel":"0102","platform":"APP","source":"http://app-hybrid.tst.xx.com/member/gift"}]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:176)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 29 more
Caused by: java.lang.IllegalArgumentException: argument type mismatch
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
... 31 more
订阅的 Java 代码如下,其中使用了 @Payload 注解将消息体自动转换成对象:
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18/**
* 会员等级升级消息
*
* @param memberLevelMsg
* @throws UnsupportedEncodingException
* @throws BaseException
*/
(value = MallCouponChannel.YY_COUPON_INPUT_MEMBER_GIFT)
public void memberCouponsDraw(@Payload GiftMessageVo memberLevelMsg) throws UnsupportedEncodingException, BaseException {
log.info("接收到会员礼包mq消息:channel={}, message={}", MallCouponChannel.YY_COUPON_INPUT_MEMBER_GIFT, JSON.toJSONString(memberLevelMsg));
MemberGiftParam memberGiftParam = new MemberGiftParam();
memberGiftParam.setAccountId(memberLevelMsg.getAccountId());
memberGiftParam.setPlatform(memberLevelMsg.getPlatform());
memberGiftParam.setMemberLevel(memberLevelMsg.getMemberLevel());
memberGiftParam.setSource(memberLevelMsg.getSource());
couponService.couponDrawForMember(memberGiftParam);
}
二 问题分析
1 RabbitMQ 生产者发送的消息 contentType 为 text/plain
2 升级后(Spring Boot 2.2.1 对应版本的)Spring Cloud Stream 对 RabbitMQ 的消息做了强制限制:消息的 contentType 必须是 application/json,才能通过 @Payload 注解转换为对象
3 而在 Spring Cloud 1.x 中,没有对 @Payload 注解转换对象时的 contentType 类型做控制
三 问题解决方法
方法一 去掉 @Payload 注解,方法参数改用 String 接收,再手动反序列化成对象
/**
 * Handles the member level upgrade message received on
 * {@code MallCouponChannel.YY_COUPON_INPUT_MEMBER_GIFT}.
 *
 * <p>Workaround version: the payload is accepted as a raw {@code String} and parsed into a
 * {@code GiftMessageVo} with {@code JSON.parseObject}, so no framework-side conversion to
 * the target type is needed. The parsed fields are then copied into a
 * {@code MemberGiftParam} and passed to {@code couponService.couponDrawForMember}.
 *
 * @param message the raw message body; expected to be the JSON form of a
 *        {@code GiftMessageVo}
 * @throws UnsupportedEncodingException declared by the original signature; the throwing
 *         call is not visible in this snippet
 * @throws BaseException declared by the original signature; presumably raised by the
 *         coupon service -- confirm against its implementation
 */
@StreamListener(value = MallCouponChannel.YY_COUPON_INPUT_MEMBER_GIFT)
public void memberCouponsDraw(String message) throws UnsupportedEncodingException, BaseException {
    log.info("接收到会员礼包mq消息:channel={}, message={}", MallCouponChannel.YY_COUPON_INPUT_MEMBER_GIFT, message);
    // Deserialize manually instead of relying on @Payload conversion.
    GiftMessageVo memberLevelMsg = JSON.parseObject(message, GiftMessageVo.class);
    MemberGiftParam memberGiftParam = new MemberGiftParam();
    memberGiftParam.setAccountId(memberLevelMsg.getAccountId());
    memberGiftParam.setPlatform(memberLevelMsg.getPlatform());
    memberGiftParam.setMemberLevel(memberLevelMsg.getMemberLevel());
    memberGiftParam.setSource(memberLevelMsg.getSource());
    couponService.couponDrawForMember(memberGiftParam);
}
方法2 在SpringCloudStreamBindings的配置中增加contentType配置强制指定消息类型
spring:
  cloud:
    stream:
      bindings:
        yy_coupon_input_member:
          destination: member_level_send_gift_events
          group: yy_coupon_member_gift
          # Content type declared for this binding.
          contentType: application/json