如何基于日志,同步实现数据的一致性和实时抽取?(3)
消息中schema部分,定义了namespace 是由 类型+数据源名+schema名+表名+版本号+分库号+分表号?能够描述整个公司的所有表,通过一个namespace就能唯一定位.
payload是指具体的数据,一个json包里面可以包含1条至多条数据,提高数据的有效载荷. UMS中支持的数据类型,参考了Hive类型并进行简化,基本上包含了所有数据类型. 全量和增量的一致性 在整个数据传输中,为了尽量的保证日志消息的顺序性,kafka我们使用的是1个partition的方式.在一般情况下,基本上是顺序的和唯一的. 但是我们知道写kafka会失败,有可能重写,Storm也用重做机制,因此,我们并不严格保证exactly once和完全的顺序性,但保证的是at least once. 因此_ums_id_变得尤为重要. 对于全量抽取,_ums_id_是唯一的,从zk中每个并发度分别取不同的id片区,保证了唯一性和性能,填写负数,不会与增量数据冲突,也保证他们是早于增量消息的. 对于增量抽取,我们使用的是MySQL的日志文件号 + 日志偏移量作为唯一id.Id作为64位的long整数,高7位用于日志文件号,低12位作为日志偏移量. 例如:000103000012345678. 103 是日志文件号,12345678 是日志偏移量. 这样,从日志层面保证了物理唯一性(即便重做也这个id号也不变),同时也保证了顺序性(还能定位日志).通过比较_ums_id_ 消费日志就能通过比较_ums_id_知道哪条消息更新. 其实_ums_ts_与_ums_id_意图是类似的,只不过有时候_ums_ts_可能会重复,即在1毫秒中发生了多个操作,这样就得靠比较_ums_id_了. 心跳监控和预警 整个系统涉及到数据库的主备同步,Canal Server,多个并发度Storm进程等各个环节. 因此对流程的监控和预警就尤为重要. 通过心跳模块,例如每分钟(可配置)对每个被抽取的表插入一条心态数据并保存发送时间,这个心跳表也被抽取,跟随着整个流程下来,与被同步表在实际上走相同的逻辑(因为多个并发的的Storm可能有不同的分支),当收到心跳包的时候,即便没有任何增删改的数据,也能证明整条链路是通的. Storm程序和心跳程序将数据发送公共的统计topic,再由统计程序保存到influxdb中,使用grafana进行展示,就可以看到如下效果: 图中是某业务系统的实时监控信息.上面是实时流量情况,下面是实时延时情况.可以看到,实时性还是很不错的,基本上1~2秒数据就已经到末端kafka中. Granfana提供的是一种实时监控能力. 如果出现延时,则是通过dbus的心跳模块发送邮件报警或短信报警. 实时脱敏 考虑到数据安全性,对于有脱敏需求的场景,Dbus的全量storm和增量storm程序也完成了实时脱敏的功能.脱敏方式有3种: 总结一下:简单的说,Dbus就是将各种源的数据,实时的导出,并以UMS的方式提供订阅,支持实时脱敏,实际监控和报警. 四、Wormhole解决方案说完Dbus,该说一下Wormhole,为什么两个项目不是一个,而要通过kafka来对接呢? 其中很大一个原因就是解耦,kafka具有天然的解耦能力,程序直接可以通过kafka做异步的消息传递.Dbus和Wornhole内部也使用了kafka做消息传递和解耦. 另外一个原因就是,UMS是自描述的,通过订阅kafka,任何有能力的使用方来直接消费UMS来使用. 虽然UMS的结果可以直接订阅,但还需要开发的工作.Wormhole解决的是:提供一键式的配置,将kafka中的数据落地到各种系统中,让没有开发能力的数据使用方通过wormhole来实现使用数据. 如图所示,Wormhole 可以将kafka中的UMS 落地到各种系统,目前用的最多的HDFS,JDBC的数据库和HBase. 在技术栈上,wormhole选择使用spark streaming来进行. 在Wormhole中,一条flow是指从一个namaspace从源端到目标端.一个spark streaming服务于多条flow. (编辑:ASP站长网) |