如何实现一个简单易用且可靠的消息队列框架?(2)
由于默认的异常处理中,捕捉异常,在专用的错误回复日志中记录错误,并且继续处理下一个消息.考虑到可能上线失败,或者上游消息格式出错,导致所有消息处理都出错,堆满错误恢复日志的情况,我们需要诉诸于报警和监控系统来解决. 3. 优雅关机由于消费者本身是一个事件驱动的服务器,类似Tomcat,Tomcat接收HTTP请求返回HTTP响应,Consumer则接收Kafka消息,然后处理业务后返回,也可以将处理结果发送到下一个消息队列.所以消费者本身是非常复杂的,除了线程模型,异常处理,性能,稳定性,可用性等都是需要思考点.既然消费者是一个后台的服务器,我们需要考虑如何优雅的关机,也就是在消费者服务器在处理消息的时候,如果关机才能不导致处理的消息中断而丢失. 优雅关机的重点在于解决如下3个问题:
第一个问题: 如果一个后台程序运行在控制台的前台,通过Ctrl + C可以发送退出信号给JVM,也可以通过kill -2 PS_IS 或者 kill -15 PS_IS发送退出信号,但是不能发送kill -9 PS_IS,否则进程会无条件强制退出.JVM收到退出信号后,会调用注册的钩子,我们通过的注册的JVM退出钩子进行优雅关机. 第二个问题: 线程分为Daemon线程和非Daemon线程,一个线程默认继承父线程的Daemon属性,如果当前线程池是由Daemon线程创建的,则Worker线程是Daemon线程.如果Worker线程是Daemon线程,我们需要在JVM退出钩子中等待Worker线程完成当前手头处理的消息,再退出JVM.如果不是Daemon线程,即使JVM收到退出信号,也得等待Worker线程退出后再退出,不会丢掉正在处理的消息. 第三个问题: 在Worker线程从Kafka服务器消费消息的时候,Worker线程可能处于阻塞,这时需要中断线程以退出,没有消息被丢掉.在Worker线程处理业务时有可能有阻塞,例如:IO,网络IO,在指定退出时间内没有完成,我们也需要中断线程退出,这时会产生一个InterruptedException,在异常处理的默认处理器中被捕捉,并写入错误日志,Worker线程随后退出. 使用指南kclient提供了三种使用方法,对于每一种方法,按照下面的步骤可快速构建Kafka生产者和消费者程序. 前置步骤1) 下载源代码后在项目根目录执行如下命令安装打包文件到你的Maven本地库.
2) 在你的项目pom.xml文件中添加对kclient的依赖. 3) 根据Kafka官方文档搭建Kafka环境,并创建两个Topic,test1和test2. 4) 然后,从Kafka安装目录的config目录下拷贝kafka-consumer.properties和kafka-producer.properties到你的项目类路径下,通常是src/main/resources目录. Java APIJava API提供了最直接,最简单的使用kclient的方法. 构建Producer示例: 构建Consumer示例: Spring环境集成kclient可以与Spring环境无缝集成,你可以像使用Spring Bean一样来使用KafkaProducer和KafkaConsumer. 构建Producer示例: 构建Consumer示例: 服务源码注解kclient提供了类似Spring声明式的编程方法,使用注解声明Kafka处理器方法,所有的线程模型、异常处理、服务启动和关闭等都由后台服务自动完成,极大程度的简化了API的使用方法,提高了开发者的工作效率. 注解声明Kafka消息处理器: 注解启动程序: 注解Spring环境配置: API简介Producer APIKafkaProducer类提供了丰富的API来发送不同类型的消息,它支持发送字符串消息,发送一个普通的Bean,以及发送JSON对象等.在这些API中可以指定发送到某个Topic,也可以不指定而使用默认的Topic.对于发送的数据,支持带Key值的消息和不带Key值的消息. 发送字符串消息: 发送Bean消息: 发送JSON对象消息: Consumer APIKafkaConsumer类提供了丰富的构造函数用来指定Kafka消费者服务器的各项参数,包括线程池策略,线程池类型,流数量等等. 使用PROPERTIES文件初始化: 使用PROPERTIES对象初始化以及消息处理器注解、消息处理机模板项目可以查看以下链接继续阅读: http://www.jianshu.com/p/304f2fd8388b 性能压测Benchmark应该覆盖推送QPS、接收处理QPS以及单线程、多线程生产者的用例. 用例1: 轻量级服务同步线程模型和异步线程模型的性能对比. 用例2: 重量级服务同步线程模型和异步线程模型的性能对比. 用例3: 重量级服务异步线程模型中所有消费者流共享线程池和每个流独享线程池的性能对比. 用例4: 重量级服务异步线程模型中每个流独享线程池的对比的确定数量线程的线程池和线程数量可伸缩的线程池的性能对比. 由于笔者在发文的时候还没有时间完成前面四种场景的压测,暂时留给读者亲自动手,作为一个练习. 更多思考尽管本文设计和实现的kclient项目提供了许多高级功能,并且使用起来方便,而且笔者在几家公司里在线上进行了应用,已经发挥了不少的作用,还有一些细节需要提高. 用例1:kclient处理器项目中管理Restful服务暂时只提供了获得状态的API,需要进行进一步的丰富,增加对线程池的监控,以及消息处理性能的监控. 用例2:当前注解@ErrorHandler里面的exception参数为必选,完全可以使用方法第一参数进行推导,简化开发人员配置的工作. 用例3:模板项目还不完善,需要增加启动和关闭脚本,这样读者可以直接拷贝使用. 用例4:尽管线上应用已经证明了kclient没有性能问题,但是开发一款中间件系统是需要闭环的,需要尽快把性能压测这块昨晚并且形成压测报表. 文章来自微信公众号:聊聊架构 (编辑:ASP站长网) |