-

spoon
2023年4月13日发(作者:pptv破解版)

kettlespoon判断增量更新_使⽤Kettle实现数据实时增量同步时

间戳增量回滚同步

0.前⾔

本⽂介绍了使⽤Kettle对⼀张业务表数据(500万条数据以上)进⾏实时(10秒)同步,采⽤了时间戳增量回滚同步的⽅法。关于ETL和Kettle的⼊门知识⼤家可以阅读相关的blog和⽂档学习。

1.时间戳增量回滚同步

假定在源数据表中有⼀个字段会记录数据的新增或修改时间,可以通过它对数据在时间维度上进⾏排序。通过中间表记录每次更新的时间戳,在下⼀个同步周期时,通过这个时间戳同步该时间戳以后的增量数据。这是时间戳增量同步。

但是时间戳增量同步不能对源数据库中历史数据的删除操作进⾏同步,我们可以通过在每次同步时,把时间戳往前回滚⼀段时间,从⽽同步

⼀定时间段内的删除操作。这就是时间戳增量回滚同步,这个名字是我⾃⼰给取得,意会即可,就是在时间戳增量同步的同时回滚⼀定的时

间段。说明:

源数据表需要被同步的数据表

⽬标数据表同步⾄的数据表中间表存储时间戳的表

2.前期准备

在两个数据库中分别创建数据表,并通过脚本在源数据表中插⼊500万条数据,完成后再以每秒⼀条的速度插⼊新数据,模拟⽣产环境。源数据表结构如下:

CREATETABLE`im_message`(

`id`int(11)NOTNULLAUTO_INCREMENT,

`sender`varchar(45)COLLATEutf8_binNOTNULLCOMMENT'消息发送者:SYSTEM',`send_time`datetime(6)NOTNULL,

`receiver`varchar(45)COLLATEutf8_binNOTNULLCOMMENT'消息接受者',

`content`varchar(255)COLLATEutf8_binNOTNULLCOMMENT'消息内容',

`is_read`tinyint(4)NOTNULLCOMMENT'消息是否被读取:0-未读;⾮0-已读',

`read_time`datetimeDEFAULTNULL,PRIMARYKEY(`id`),

UNIQUEKEY`id_UNIQUE`(`id`)

)ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8COLLATE=utf8_binCOMMENT='消息表'

3.作业流程

1.开始组件

2.建时间戳中间表

3.获取中间表的时间戳,并设置为全局变量

4.删除⽬标表中时间戳及时间戳以后的数据

5.抽取两个数据表的时间戳及时间戳以后的数据进⾏⽐对,并根据⽐对结果进⾏删除、新增或修改操作

6.更新时间戳

4.创建作业

作业的最终截图如下:

作业截图

4.1创建作业和DB连接

打开Spoon⼯具,新建作业,然后在左侧主对象树DB连接中新建DB连接。创建连接并测试通过后可以在左侧DB连接下右键共享出来。因

为在单个作业或者转换中新建的DB连接都是局域数据源,在其他转换和作业中是不能使⽤的,即使属于同⼀个作业下的不同转换,所以需要把他们共享,这样DB连接就会成为全局数据源,不⽤多次编辑。

4.2建时间戳中间表

这⼀步是为了在⽬标数据库建中间表etl_temp,并插⼊初始的时间戳字段。因为该作业在⽣产环境是循环调⽤的,该步骤在每⼀个同步周期中

都会调⽤,所以在建表时需要判断该表是否已经存在,如果不存在才建表。SQL代码和组件配置截图如下:

CREATETABLEIFNOTEXISTSetl_temp(idintprimarykey,time_stamptimestamp);INSERTIGNOREINTOetl_temp(id,time_stamp)VALUES(1,'2018-05-2200:00:00');

中间配置截图

我把该作业时间戳的ID设为1,在接下来的步骤中也是通过这个ID查询我们想要的时间戳

4.2获取时间戳并设为变量

新建⼀个转换,在转换中使⽤表输⼊和设置变量两个组件

表输⼊

SQL代码和组件配置截图如下

在Kettle中设置的变量都是字符串类型,为了便于⽐较。我在SQL语句把查出的时间戳进⾏了格式转换

selectdate_format(time_stamp,'%Y-%m-%d%H:%i:%s')time_stampfrometl_tempwhereid='1'

这⾥写图⽚描述

设置变量

变量活动类型可以为该变量设置四种有效活动范围,分别是JVM、该Job、⽗Job和祖⽗Job

这⾥写图⽚描述

4.3删除⽬标表中时间戳及时间戳以后的数据

这样做有两个好处:

1.避免在同步中重复或者遗漏数据。例如当时间戳在源数据表中不是唯⼀的,上⼀次同步周期最后⼀条数据的时间戳是2018-05-25

18:12:12,那么上⼀次同步周期结束后中间表中的时间戳就会更新为2018-05-2518:12:12。如果在下⼀个同步周期时源数据表中仍然有

时间戳为2018-05-2518:12:12的新数据,那么同步就会出现数据不⼀致。采⽤⼤于时间戳的⽅式同步就会遗漏数据,采⽤等于时间戳的⽅式同步就会重复同步数据。

2.增加健壮性当作业异常结束后,不⽤做任何多余的操作就可以重启。因为会删除⽬标表中时间戳及时间戳以后的数据,所以不⽤担⼼

数据⼀致性问题

2018-09-29:对增加健壮性进⾏补充:在⼀次同步周期中脚本异常中断,这时候中间表的时间戳没有更新,但是⽬标表已经同步了部分数据,当再次启动脚本就会出现数据重复的情况,⽽且在很多时候因为主键的存在,脚本启动会报错

在组件中使⽤了上⼀步骤设置的变量,所以必须勾选使⽤变量替换

deletefromtest__messagewheresend_time>='${TIME_STAMP}'

这⾥写图⽚描述

4.4抽取、⽐对和更新数据

这⼀步才是真正的数据同步步骤,完成了数据的抽取、⽐对,并根据不同的⽐对结果删除、更新、插⼊或不做任何操作。

正如前⽂所说,为了同步删除操作,在原始表输⼊和⽬标表输⼊步骤中回滚了⼀定时间段。其中回滚的时间段设置为了全局的参数。左右空⽩处右键即可设置参数,该作业下的所有作业和转换都能使⽤,设置如下图

这⾥写图⽚描述转换截图如下

这⾥写图⽚描述

原始表输⼊

SELECT

id

,sender

,send_time

,receiver

,content

,is_read,read_time

_message

wheresend_time>=date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d%H:%i:%s'),interval${ROLL_BACK_DAYS}day);

这⾥写图⽚描述

⽬标表输⼊

SELECT

id

,sender

,send_time

,receiver

,content

,is_read,read_time

FROMtest__message

wheresend_time>=date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d%H:%i:%s'),interval${ROLL_BACK_DAYS}day);

这⾥写图⽚描述

注意两个组件的数据库链接是不同的,当然它们也就这个和名字不同

⽐对记录

对两个表输⼊查出的数据进⾏⽐对,并把⽐对的结果写进输⼊流,传递给后⾯的组件。⽐对的结果有三种:

new

changeddeleted

标注字段表⽰⽐对结果的字段名,后⾯有⽤。关键字段表⽰⽐对的字段,在这个作业中我们⽐较两个的主键ID。

这⾥写图⽚描述

Switch

该步骤对上⼀步骤产⽣的标注字段进⾏路由,不同的结果路由到不同的步骤。其中⽬标步骤表⽰下⼀步骤的名字。

这⾥写图⽚描述

插⼊

Kettle有⼀个插⼊/更新组件,但是据⽹友介绍这个组件性能低下,每秒最多只能同步⼏百条数据,所有我对插⼊和更新分别作了不同的处理。插⼊使⽤表输出组件;更新使⽤更新组件。

为了进⼀步提升同步效率,我在表输出组件使⽤了多线程(右键>改变开始复制的数量),使同步速度达到每秒12000条。Switch组件和表输出组件中间的虚拟组件(空操作)也是为了使⽤多线程添加的。

这⾥写图⽚描述

这⾥写图⽚描述

勾选批量插⼊,可以极⼤提⾼同步速度

更新和删除

这⾥写图⽚描述

这⾥写图⽚描述

4.5更新时间戳

set@new_etl_start_time_stamp=(SELECTSEND_TIMEFROMtest__messageORDERBYSEND_TIMEDESCLIMIT1);updateetl_tempsettime_stamp=@new_etl_start_time_stampwhereid='1';

这⾥写图⽚描述

4.6发送邮箱

关于发送邮件组件⽹上有很多资料,就不多做介绍。特别强调⼀点,邮箱密码是单独的授权码,⽽不是邮箱登录密码。

运⾏

在开发环境点击Spoon界⾯左上⾓三⾓符号运⾏作业即可。

在第⼀次运⾏时,为了提⾼同步效率,可以先不创建⽬标表的索引。在第⼀此同步完成后,再创建索引。然后在START组件中编辑调度逻

辑,再次启动。如下图所⽰

这⾥写图⽚描述运⾏⽇志如下图

这⾥写图⽚描述

这样,⼀个使⽤时间戳增量回滚同步数据的作业就完成了

-

spoon

更多推荐

spoon