如果是第一次运行,或者上一次任务失败时还没有触发checkpoint,那么offset就不存在,根据offset和通道可以确定具体的查询sql:
offset存在时
第一个通道:
select * from data_test
where id mod 2=0
and id > ${offset_0};
第二个通道:
select * from data_test
where id mod 2=1
and id > ${offset_1};
offset不存在时
第一个通道:
select * from data_test
where id mod 2=0;
第二个通道:
select * from data_test
where id mod 2=1;
数据分片构造好之后,每个通道就根据自己的数据分片去读数据了。
2)写数据
写数据前会先做几个操作:
a、检测 /data_test 目录是否存在,如果目录不存在,则创建这个目录,如果目录存在,进行2操作;
b、判断是不是以覆盖模式写数据,如果是,则删除 /data_test目录,然后再创建目录,如果不是,则进行3操作;
c、检测 /data_test/.data 目录是否存在,如果存在就先删除,再创建,确保没有其它任务因异常失败遗留的脏数据文件;
数据写入hdfs是单条写入的,不支持批量写入。数据会先写入/data_test/.data/目录下,数据文件的命名格式为:
channelIndex.jobId.fileIndex
包含通道索引,jobId,文件索引三个部分。
3)checkpoint触发时
在FlinkX中“状态”表示的是标识字段id的值,我们假设checkpoint触发时两个通道的读取和写入情况如图中所示:
checkpoint触发后,两个reader先生成Snapshot记录读取状态,通道0的状态为 id=12,通道1的状态为 id=11。Snapshot生成之后向数据流里面插入barrier,barrier随数据流向Writer。以Writer_0为例,Writer_0接收Reader_0和Reader_1发来的数据,假设先收到了Reader_0的barrier,这个时候Writer_0停止写出数据到HDFS,将接收到的数据先放到 InputBuffer里面,一直等待Reader_1的barrier到达之后再将Buffer里的数据全部写出,然后生成Writer的Snapshot,整个checkpoint结束后,记录的任务状态为:
Reader_0:id=12
Reader_1:id=11
Writer_0:id=无法确定
Writer_1:id=无法确定
任务状态会记录到配置的HDFS目录/flinkx/checkpoint/abc123下。因为每个Writer会接收两个Reader的数据,以及各个通道的数据读写速率可能不一样,所以导致writer接收到的数据顺序是不确定的,但是这不影响数据的准确性,因为读取数据时只需要Reader记录的状态就可以构造查询sql,我们只要确保这些数据真的写到HDFS就行了。在Writer生成Snapshot之前,会做一系列操作保证接收到的数据全部写入HDFS:
a、close写入HDFS文件的数据流,这时候会在/data_test/.data目录下生成两个两个文件:
/data_test/.data/0.abc123.0
/data_test/.data/1.abc123.0
b、将生成的两个数据文件移动到/data_test目录下;
c、更新文件名称模板更新为:channelIndex.abc123.1;
快照生成后任务继续读写数据,如果生成快照的过程中有任何异常,任务会直接失败,这样这次快照就不会生成,任务恢复时会从上一个成功的快照恢复。
4)任务正常结束
任务正常结束时也会做和生成快照时同样的操作,close文件流,移动临时数据文件等。
5)任务异常终止
任务如果异常结束,假设任务结束时最后一个checkpoint记录的状态为:
Reader_0:id=12Reader_1:id=11
那么任务恢复的时候就会把各个通道记录的状态赋值给offset,再次读取数据时构造的sql为:
第一个通道:
select * from data_test
where id mod 2=0
and id > 12;
第二个通道:
select * from data_test
where id mod 2=1
and id > 11;
这样就可以从上一次失败的位置继续读取数据了。
3、支持断点续传的插件
理论上只要支持过滤数据的数据源,和支持事务的数据源都可以支持断点续传的功能,目前FlinkX支持的插件如下:
四、实时采集
目前FlinkX支持实时采集的插件有KafKa、binlog插件,binlog插件是专门针对mysql数据库做实时采集的,如果要支持其它的数据源,只需要把数据打到Kafka,然后再用FlinkX的Kafka插件消费数据即可,比如oracle,只需要使用oracle的ogg将数据打到Kafka。这里我们专门讲解一下mysql的实时采集插件binlog。
1、binlog
binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中。
binlog的作用主要有:
1)复制:MySQL Replication在Master端开启binlog,Master把它的二进制日志传递给slaves并回放来达到master-slave数据一致的目的;
2)数据恢复:通过mysqlbinlog工具恢复数据;
3)增量备份。
2、MySQL 主备复制
有了记录数据变化的binlog日志还不够,我们还需要借助MySQL的主备复制功能:主备复制是指 一台服务器充当主数据库服务器,另一台或多台服务器充当从数据库服务器,主服务器中的数据自动复制到从服务器之中。
主备复制的过程:
1)MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看);
2)MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log);
3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据。
3、写入Hive
binlog插件可以监听多张表的数据变更情况,解析出的数据中包含表名称信息,读取到的数据可以全部写入目标数据库的一张表,也可以根据数据中包含的表名信息写入不同的表,目前只有Hive插件支持这个功能。Hive插件目前只有写入插件,功能基于HDFS的写入插件实现,也就是说从binlog读取,写入hive也支持失败恢复的功能。
写入Hive的过程:
1)从数据中解析出MySQL的表名,然后根据表名映射规则转换成对应的Hive表名;
2)检查Hive表是否存在,如果不存在就创建Hive表;
3)查询Hive表的相关信息,构造HdfsOutputFormat;
4)调用HdfsOutputFormat将数据写入HDFS。