DataX简介、部署、原理和使用介绍

1.DataX简介

1-1.项目地址

项目地址:https://github.com/alibaba/DataX

官方文档:https://github.com/alibaba/DataX/blob/master/introduction.md

1-2.DataX概述

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通

1-3.DataX支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL
 Oracle
 OceanBase
 SQLServer
 PostgreSQL
 DRDS
 Kingbase
 通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储ODPS
 ADB 
 ADS 
 OSS
 OCS 
 Hologres 
 AnalyticDB For PostgreSQL 
阿里云中间件datahub读 、写
 SLS读 、写
阿里云图数据库GDB
NoSQL数据存储OTS
 Hbase0.94
 Hbase1.1
 Phoenix4.x
 Phoenix5.x
 MongoDB
 Cassandra
数仓数据存储StarRocks读 、
 ApacheDoris 
 ClickHouse 
 Databend 
 Hive
 kudu 
无结构化数据存储TxtFile
 FTP
 HDFS
 Elasticsearch 
时间序列数据库OpenTSDB 
 TSDB
 TDengine

1-4.DataX特点

 

2.DataX原理

2-1.DataX设计理念

  1. 异构数据源同步问题,就是不同框架之间同步数据时,相同的数据在不同框架中具有不同的数据结构。
  2. DataX的设计理念: DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接数据各种数据源。 当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

 

2-2.DataX框架设计

DataX本身作为离线数据同步框架,采用Framework+plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中

Framework的几大功能

ReaderWriter 可能会有读写速度不一致的情况,所以中间需要一个组件作为缓冲

控制数据传输的速度,DataX 可以随意根据需求调整数据传输速度

并发的同步或写入数据

既然是异构,那么说明读 Reader 的数据源与 写 Writer 的数据源 数据结构可能不同,数据结构不同的话,需要做数据转换操作,转换也在 Framework 中完成

 

2-3.DataX运行流程

DataX支持单机多线程模式完成同步作业,下面用一个DataX作业生命周期的时序图,用以说明DataX的运行流程、核心概念以及每个概念的关系

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

 

2-4.DataX调度策略

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

 

3.DataX安装部署

 

4.DataX使用介绍

4-1.同步MySQl全量数据到HDFS案例

将MySQL的全量数据,利用DataX工具同步至HDFS 1.查看MySQL被迁移的数据情况 2.根据需求确定reader为mysqlreader,writer为hdfswriter

查看reader和writer模板的方式(-r 读模板; -w 写模板):

3.编写同步json脚本 4.确定HDFS上目标路径是否存在 5.通过datax.py指定json任务运行同步数据 6.数据验证,查看HDFS上是否已经有MySQL对应表中的所有数据

这里先跑通一个实验案例,再根据操作来总结

mysql2hdfs.json内容:

截至到这里,可以看到最终数据文件的内容和原MySQL数据匹配的上

总结:

MysqlReader插件介绍:实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的SQL语句将数据从mysql库中select出来。

MysqlReader插件原理:MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

HdfsWriter插件介绍:提供项HDFS文件系统指定路径中写入TextTile和OrcFile类型的文件,文件内容可与Hive表相关联。

HdfsWriter:插件实现过程:首先根据用户指定的path,创建一个hdfs文件系统上的不存在的临时目录,创建规则是:path_随机;然后将读取的文件写入到这个临时目录中;待到全部写入后,再将这个临时目录下的文件移动到用户所指定的目录下,(在创建文件时保证文件名不重复);最后删除临时目录。如果在中间过程中发生网络中断等情况,造成无法与hdfs建立连接,需要用户手动删除已经写入的文件和临时目录

 

4-2.同步MySQl需求数据到HDFS案例(where)

相对上个案例的变化:

1.增加了where关键词,过滤同步的数据范围

2.去除了压缩格式:"compress": "gzip"

3.更换了分隔符,由原\t变成不可见分隔字符\u0001

mysql2hdfs_2.json脚本内容:

执行任务:

验证:

可以看到只有满足id>=2的2条数据被写入到了HDFS,直接看似乎没有分隔符,字符相连了,把文件下载到本地再次验证

注意,cat文件是看不到特殊分隔符的

4-3.同步MySQl需求数据到HDFS案例(传条件参数)

在生产环境中,离线数据同步任务需要在任务调度平台每日定时重复执行去拉取某个时间窗口的数据,例如每日同步T-1的数据到HDFS,但脚本中如果写了固定日期,每日任务都需要修改日期条件,显然不合理。因此为实现这个业务需求,需要使用DataX的传参功能。

创建测试表:

当前时间为20230216,

拟定2个变量:

START_FLAG=date -d"1 day ago" +%Y%m%d END_FLAG=date +%Y%m%d

编写同步脚本任务

mysql2hdfs_3.json脚本任务内容:

"where": "updated>=${START_FLAG} AND updated<${END_FLAG}"

相当于updated大于等于2023-02-15 00:00:00,小于2023-02-16 00:00:00的数据

当前日期为2月16日,则意为着数据是前一天日内的全量数据

执行任务:

验证数据:

可以看到数据只收取到了T-1日的2条数据

4-4.同步HDFS数据到MySQL案例

准备HDFS文件目录

创建MySQL被导入的测试表

hdfs2mysql.json任务内容:

执行任务:

验证查看MySQL

4-5.同步CSV文件数据倒MySQL案例

这里只提供部分样例数据用于调试

读出记录总数 : 173559

说明本次任务同步到MySQL涉及到173559行

和datax记录总数可以对上,说明CSV文件全部都同步到MySQL

5.DataX常见的参数设置

5-1.加速相关配置

参数说明注意事项
job.setting.speed.channel设置并发数 
job.setting.speed.record总record限速配置此参数,则必须配置单个channel的record限速参数
job.setting.speed.byte总byte限速配置此参数,则必须配置单个channel的byte限速参数
core.transport.channel.speed.record单个channel的record限速,默认10000条/s 

【注意】:如果配置了总record限速和总byte限速,channel并发数就会失效。因为配置了这两个参数后,实际的channel并发数是通过计算得到的

5-2.运行内存调整

当提升DataX Job内的Channel并发数时,内存的占用会明显增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。

例如:channel中会有一个Buffer,作为临时的数据交换缓冲区,而在Reader和Write中,也会有一些buffer,为了防止OOM等错误,需要适当调大JVM堆内存

修改datax.py

启动时使用参数:python bin/datax.py --jvm = "-Xms8G -Xmx8G” job.json