欢迎光临
我一直在奋斗

一次发送邮件引发的线程问题

1.程序大致逻辑流程

出问题的代码主要是通过开放一个接口,然后当其他的服务调用这个的时候。能够推送相应的信息,如短信,邮件,系统消息。而我的处理方式是通过加上异步注解@Async来进行的,这也为了后面的坑。

2.问题出现

在本地操作的时候,可能由于处理的数量不是很大,程序并没有阻塞。在压力测试的时候也没有问题(这个地方有点迷)。到了线上,正常运行了差不多10天左右,用户反馈说收不到验证码了。我马上查看程序,并没有看见异常。查看线上的rabbitmq,发现有1条unack的消息。由于当时线上的就只开了一个消费者,默认的percount也是1.所以队列就卡在那里了。当时只有紧急重启消费者,重新消费,这一次一起正常。当时也是比较郁闷的,首先发现服务器日志上有channel shutdown的warn警告。查询了一下,大概的原因是由于spring-rabbit构建了一个CachingConnectionFactory。默认值是25个连接,多余这个就会逻辑关闭和物理关闭。Channel这个概念应该熟悉,可以认为是一个连接通道,CachingConnectionFactory中默认最多可以缓存25个Channel,也可以通过方法setChannelCacheSize()设置。如果进行物理关闭的话,在Channel的消息就无法正常确认。会提示:

PRECONDITION_FAILED - unknown   delivery tag

查看代码,发现是由于在代码里面没有加上我们自定义的connectionFactory类,导致了我们的手动确认和导入的包的自动确认。解决了这个问题后,再次将程序上线,这次加上了50个消费者。就算一个消费者堵住,其他的任然可以消费。

2.纳尼!问题再次出现

过了差不多一周,线上的服务器rabbitm队列又出现unack了。并且程序ack的速度很慢,从10分钟到一个小时不等。我判断我的代码逻辑是没有问题的,应该是线程阻塞了。直接进入线上服务器,切入docker中。使用jps 查看阻塞的进程pid。执行操作 jstack id号 >> jstack.txt。导入到本地进行查看。


3.发现端倪

这样看不好看出具体问题的,这里我们使用IMB的ThreadDump软件查看。推荐给大家一个工具IBM Thread and Monitor Dump Analyzer for Java(点击下载)

我们发现其中有大量的Waiting on condition等待,它们都是调用的sun.misc.Unsafe.park(Native Method)方法。在进入详情页面查看,大多数都是

结合自己的代码判断应该是

@Async
    public void consumeQueueMessage(QueueMessage queueMessage, PushType pushType) {

        if (!QueueType.SEND_QUEUE.equals(queueMessage.getQueueType())) {
            LOG.debug("消息所在的队列类型是[{}],不是当前队列类型,不作处理", queueMessage.getQueueType()
                    .name());
            return;
        }
        if (!MessageType.NORMAL.equals(queueMessage.getMessageType())) {
            LOG.debug("消息类型为:[{}],不是当前队列中的消息类型,不作处理", queueMessage.getMessageType()
                    .name());
            return;
        }
        Assert.notNull(queueMessage, "待推送消息为空");
        Assert.notNull(queueMessage.getPushData(), "待推送的数据为空");
        MultisendResp resp = new MultisendResp();
        List<PushResult> results = new ArrayList<>();
        for (PushData pushData : queueMessage.getPushData()) {
            LOG.debug("调用推送服务,推送每个目标对象,将推送结果放入结果列表");
            PushResult pushResult = pushService.send(pushData);
            results.add(pushResult);
        }
        LOG.debug("将推送结果组合成统一响应结构");
        resp.setCode(PusherCode.SUCCESS.getCode());
        resp.setMessage(PusherCode.SUCCESS.getMessage());
        resp.setPushResults(results);
        resp.setMessageTag(queueMessage.getMessageTag());
        LOG.debug("将推送结果通过Topic交换器发布出去,发送到对应的结果队列中去");
        switch (pushType) {
        case SMS_PUSH:
            queueTemplate.getResultTemplate()
                    .convertAndSend(RouteKey.SMS_RESULT_QUEUE_KEY.getKey(), resp);

            break;
        case EMAIL_PUSH:
            queueTemplate.getResultTemplate()
                    .convertAndSend(RouteKey.EMAIL_RESULT_QUEUE_KEY.getKey(), resp);
            break;
        case WS_PUSH:
            queueTemplate.getResultTemplate()
                    .convertAndSend(RouteKey.WS_RESULT_QUEUE_KEY.getKey(), resp);
            break;

        default:
            LOG.error("出现未知队列类型错误。SendResultQueueServiceImpl.consumeQueueMessage");
            break;
        }

    }

}

4.陷入深思

去掉了@Async这个注解后,没过几天就又出现了队列unackd的情况。由于队列当时还在堵着,我直接使用导出ThreadDump日志进行分析。

"SimpleAsyncTaskExecutor-5" #49 prio=5 os_prio=0 tid=0x000000ffe0d5f000 nid=0xe7 runnable [0x00007f1298588000]
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        - locked <0x00000000fae3fcf8> (a java.net.SocksSocketImpl)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at com.sun.mail.util.SocketFetcher.createSocket(SocketFetcher.java:317)
        at com.sun.mail.util.SocketFetcher.getSocket(SocketFetcher.java:233)
        at com.sun.mail.smtp.SMTPTransport.openServer(SMTPTransport.java:1938)
        at com.sun.mail.smtp.SMTPTransport.protocolConnect(SMTPTransport.java:642)
        - locked <0x00000000fae3e1e8> (a com.sun.mail.smtp.SMTPTransport)
        at javax.mail.Service.connect(Service.java:317)
        - locked <0x00000000fae3e1e8> (a com.sun.mail.smtp.SMTPTransport)
        at javax.mail.Service.connect(Service.java:176)
        at javax.mail.Service.connect(Service.java:125)
        at jodd.mail.SendMailSession.open(SendMailSession.java:76)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:191)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender.send(EmailDirectSender.java:208)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender$$FastClassBySpringCGLIB$$ef5f1eb5.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673)
        at cn.signit.wesign.mss.pusher.util.msg.email.EmailDirectSender$$EnhancerBySpringCGLIB$$29ba9a66.send(<generated>)
        at cn.signit.wesign.mss.pusher.service.impl.PushServiceImpl.emailPush(PushServiceImpl.java:339)
        at cn.signit.wesign.mss.pusher.service.impl.PushServiceImpl.send(PushServiceImpl.java:223)
        at cn.signit.wesign.mss.pusher.queue.service.impl.SendQueueServiceImpl.consumeSendQueueMessage(SendQueueServiceImpl.java:104)
        at sun.reflect.GeneratedMethodAccessor338.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112)
        at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:126)
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
        at org.springframework.amqp.rabbit.listen

可以看到程序多次执行了邮件发送的代码,结合当时写的逻辑看。出现了发送失败的情况,邮件会自动切换账户进行发送,这导致其可能会一直调用发送邮件的函数,结果是程序最终会发送邮件成功,但消耗的时间是很长的.我怀疑使用的邮件发送商通过smtp协议发送邮件,可能存在限制的情况,导致了我的发送失败,并一直调用send函数导致了程序进入自己调用自己的循环,虽然最后会发送成功并退出,但是耗时太长.


总结:JAVA的学习之路还很漫长,本次排查历时较久,主要原因是这种情况的出现十分少见,排查CPU占用率也未见异常。通过这个问题,我学到了docker的基本操作,如何通过jstack来查看线程的情况。以及常见线程的8中情况。最后,路漫漫其修远兮,吾将上下而求索共勉。

未经允许不得转载:奋斗者的足迹 » 一次发送邮件引发的线程问题
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

奋斗者的足迹

联系我们加入我们