文章目录

  • 01 引言
  • 02 DataX引入
  • 03 DataX
    • 3.1 DataX概念
    • 3.2 DataX原理
    • 3.3 DataX架构
      • 3.3.1 Job作业
      • 3.3.2 Task子任务
      • 3.3.3 TaskGroup
    • 3.4 DataX代码执行流程
  • 04 文末

01 引言

因为最近使用到了DataX,所以接下来需要来个系统的学习,并以博客的形式记录。

DataX的源码地址:https://github/alibaba/DataX
DataX官方介绍:https://github/alibaba/DataX/blob/master/introduction.md

02 DataX引入

很多时候,我们都需要把不同数据库的数据做迁移,典型的就如Oracle数据库的数据迁移到MySQL或者迁移到SQLServer,那么问题来了,我们把数据源迁移到另外一个新的数据库,都需要写一个程序,这是十分麻烦的(如下图):

那么有没有一个框架,能实现同步数据库之间的数据同步呢?其实是有的,就是本文要讲的DataX

03 DataX

3.1 DataX概念

首先我们要知道的是DataX为何物?官方是这样描述的:

  • DataX是阿里云DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台
  • DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

可以知道,DataX就是一个离线异构数据源同步工具,它的设计理念图如下,这也是网上举例最多的图了:

3.2 DataX原理

首先看看DataX的原理图:

从上图可以看到,DataX主要由3部分组成:

  • ReaderReader为数据采集模块,负责采集数据源的数据,将数据发送给Framework
  • WriterWriter为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端;
  • FrameworkFramework用于连接readerwriter,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

DataX已经把主流的RDBMS数据库、NOSQL、大数据计算系统插件都已经接入了,如下:

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL读 、写
Oracle读 、写
OceanBase读 、写
SQLServer读 、写
PostgreSQL读 、写
DRDS读 、写
通用RDBMS(支持所有关系型数据库)读 、写
阿里云数仓数据存储ODPS读 、写
ADS
OSS读 、写
OCS
NoSQL数据存储OTS读 、写
Hbase0.94读 、写
Hbase1.1读 、写
Phoenix4.x读 、写
Phoenix5.x读 、写
MongoDB读 、写
Hive读 、写
Cassandra读 、写
无结构化数据存储TxtFile读 、写
FTP读 、写
HDFS读 、写
Elasticsearch
时间序列数据库OpenTSDB
TSDB读 、写

3.3 DataX架构

看看DataX的架构图:

主要由Job模块、Task模块、TaskGroup模块组成,当DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。下面细讲每个模块:

3.3.1 Job作业

  • DataX完成单个数据同步的作业,我们称之为Job
  • DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。
  • DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)TaskGroup管理等功能。

3.3.2 Task子任务

  • DataX Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。
  • Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

3.3.3 TaskGroup

  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。
  • 每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

3.4 DataX代码执行流程

这里参考了:https://zhuanlan.zhihu/p/81817787,后面的文章也会继续编写(这里可以跳过)

流程:

  1. 解析配置,包括job.json、core.json、plugin.json三个配置
  2. 设置jobId到configuration当中
  3. 启动Engine,通过Engine.start()进入启动程序
  4. 设置RUNTIME_MODE到configuration当中
  5. 通过JobContainer的start()方法启动
  6. 依次执行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。
  7. init()方法涉及到根据configuration来初始化reader和writer插件,这里涉及到jar包热加载以及调用插件init()操作方法,同时设置reader和writer的configuration信息
  8. prepare()方法涉及到初始化reader和writer插件的初始化,通过调用插件的prepare()方法实现,每个插件都有自己的jarLoader,通过集成URLClassloader实现而来
  9. split()方法通过adjustChannelNumber()方法调整channel个数,同时执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型
  10. channel的计数主要是根据byte和record的限速来实现的(如果自己没有设置了channel的个数),在split()的函数中第一步就是计算channel的大小
  11. split()方法reader插件会根据channel的值进行拆分,但是有些reader插件可能不会参考channel的值,writer插件会完全根据reader的插件1:1进行返回
  12. split()方法内部的mergeReaderAndWriterTaskConfigs()负责合并reader、writer、以及transformer三者关系,生成task的配置,并且重写job.content的配置
  13. schedule()方法根据split()拆分生成的task配置分配生成taskGroup对象,根据task的数量和单个taskGroup支持的task数量进行配置,两者相除就可以得出taskGroup的数量14、schdule()内部通过AbstractScheduler的schedule()执行,继续执行startAllTaskGroup()方法创建所有的TaskGroupContainer组织相关的task,TaskGroupContainerRunner负责运行TaskGroupContainer执行分配的task。
  14. taskGroupContainerExecutorService启动固定的线程池用以执行TaskGroupContainerRunner对象,TaskGroupContainerRunner的run()方法调用taskGroupContainer.start()方法,针对每个channel创建一个TaskExecutor,通过taskExecutor.doStart()启动任务。

04 文末

DataX的源码解读,可以参考其它博主的博客:https://waterwang.blog.csdn/article/details/114630690

更多推荐

DataX教程(01)- 入门