一篇文了解分布式队列编程:从模型、实战到优化(9)
状态机队列实施(QueueCoordinator)接口定义状态机队列集中管理所有请求的排重状态机,所以其操作和单个状态机一样,即enQueue和deQueuqe接口.这两个接口的实现需要识别特定请求的状态机,所以它们的入参应该是请求.为了兼容不同类型的请求消息,我们采用了Java泛型编程.接口定义如下: enQueue操作enQueue操作过程如下: 首先,根据传入的请求key值,获取状态机,如果不存在则创建一个新的状态机,并保存在ConcurrentHashMap中. 接下来,获取线程id作为该消费者的唯一标识,并对对应状态机进行enQueue操作. 如果状态机返回值为ACCEPT或者DECLINE,返回业务层处理代码,ACCEPT意味着业务层需要处理该消息,DECLINE表示业务层可以抛弃当前消息.如果状态机返回值为Block,则该线程保持等待状态. 在某些情况下,头部请求线程可能由于异常,未能对状态机进行deQueue操作(作为组件提供方,不能假定所有的规范被使用方实施).为了避免处于阻塞状态的消费者无期限地等待,建议对状态机设置安全超时时限.超过了一定时间后,状态机强制清空头部请求,返回到业务层,业务层开始处理该请求. 代码如下: deQueue操作deQueue操作首先从ConcurrentHashMap获取改请求所对应的状态机,接着获取该线程的线程id,对状态机进行deQueue操作. enQueue代码如下: 源代码完整源代码可以在QueueCoordinator获取.链接: https://github.com/dinglau2008/QueueCoordinator/tree/master/src 文章出处:美团点评技术团队 (编辑:ASP站长网) |