一篇文了解分布式队列编程:从模型、实战到优化(8)
当同一请求最小重复长度大于消费者队列长度,如下图.假定有3个消费者,Consumer1将会处理r1,Consumer2将会处理r2,Consumer3将会处理r3,如果每个请求处理的时间严格相等,Consumer1在处理完r1之后,接着处理r4,Consumer2将会处理r2之后会处理r1.虽然r1被再次处理,但是任何时刻,只有这一个消费者在处理r1,不会出现多个消费者同时处理同一请求的场景. 高重复消费模型如下图,仍然假定有3个消费者,队列中前面4个请求都是r1,它会同时被3个消费者线程处理: 显然,对于无重复和稀疏重复的分布式队列,排重优化并不会带来额外的好处.排重优化所针对的对象是高重复消费模型,特别是对于并行处理消费者比较多的情况,重复处理同一请求,资源消耗极大. 排重状态机排重优化的主要对象是高重复的队列,多个消费者线程或进程同时处理同一个幂等请求只会浪费计算资源并延迟其他待请求处理.所以,排重状态机的一个目标是处理唯一性,即:同一时刻,同一个请求只有一个消费者处理. 如果消费者获取一条请求消息,但发现其他消费者正在处理该消息,则当前消费者应该处于等待状态.如果对同一请求,有一个消费者在处理,一个消费者在等待,而同一请求再次被消费者读取,再次等待则没有意义. 所以,状态机的第二个目标是等待唯一性,同一个请求最多只有一个消费者处于等待状态.总上述,状态机的目标是:处理唯一性和等待唯一性.我们把正在处理的请求称为头部请求,正在等待的请求称为尾部请求. 由于状态机的处理单元是请求,所以需要针对每一个请求建立一个排重状态机.基于以上要求,我们设计的排重状态机包含4个状态Init,Process,Block,Decline.各个状态之间转化过程如下图:
构思状态机描述的是针对单个请求操作所引起状态变化,排重优化需要解决队列中所有请求的排重问题,需要对所有请求的状态机进行管理.这里只考虑单虚拟机内部对所有请求状态机的管理,对于跨虚拟机的管理可以采用类似的方法.对于多状态机管理主要包含三个方面:一致性问题、完整性问题和请求缓存驱逐问题. 一致性问题一致性在这里要求同一请求的不同消费者只会操作一个状态机.由于每个请求都产生一个状态机,系统将会包含大量的状态机.为了兼顾性能和一致性,我们采用ConcurrentHashMap保存所有的状态机.用ConcurrentHashMap而不是对整个状态机队列进行加锁,可以提高并行处理能力,使得系统可以同时操作不同状态机. 为了避免处理同一请求的多消费者线程同时对ConcurrentHashMap进行插入所导致状态机不一致问题,我们利用了ConcurrentHashMap的putIfAbsent()方法.代码方案如下,key2Status用于存储所有的状态机. 消费者在处理请求之前,从状态机队列中读取排重状态机TrafficAutomate.如果没有找到,则创建一个新的状态机,并通过putIfAbsent()方法插入到状态机队列中. 完整性问题完整性要求保障状态机Init,Decline四种状态正确、状态之间的转换也正确.由于状态机的操作非常轻量级,兼顾完整性和降低代码复杂度,我们对状态机的所有方法进行加锁. 请求缓存驱逐问题(Cache Eviction)如果不同请求的数量太多,内存永久保存所有请求的状态机的内存开销太大.所以,某些状态机需要在恰当的时候被驱逐出内存.这里有两个思路:
标识问题每个请求对应于一个状态机,不同的状态机采用不同的请求进行识别. 对于同一状态机的不同消费者,在单虚拟机方案中,我们采用线程id进行标识. 实施排重优化的主要功能都是通过排重状态机(TrafficAutomate)和状态机队列(QueueCoordinator)来实施的.排重状态机描述的是针对单个请求的排重问题,状态机队列解决所有请求状态机的排重问题. 状态机实施(TrafficAutomate)根据状态机模型,其主要操作为enQueue和deQueue,其状态由头部请求和尾部请求的状态共同决定,所以需要定义两个变量为head和tail,用于表示头部请求和尾部请求.为了确保多线程操作下状态机的完整性(Integraty),所有的操作都将加上锁. enQueue操作当一个消费者执行enQueue操作时:如果此时尾部请求不为空,根据等待唯一性要求,返回DECLINE,当前消费者应该抛弃该请求;如果头部请求为空,返回ACCPET,当前消费者应该立刻处理该消息;否则,返回BLOCK,该消费者应该等待,并不停的查看状态机的状态,一直到头部请求处理完成.enQueue代码如下: deQueue操作对于deQueue操作,首先将尾部请求赋值给头部请求,并将尾部请求置为无效.deQueue代码如下: (编辑:ASP站长网) |