Importing Data from MySQL into HDFS with Sqoop 1.4.4 Incremental Import Mode
Questions this post addresses:
1. What is incremental import?
2. What do the --check-column, --incremental, and --last-value parameters do?
3. What should you watch out for when using incremental import?
1. Overview of incremental import modes
Sqoop can import only new records while skipping records that have already been imported; this is incremental import. It is controlled by the following parameters:
--check-column (col): the column to examine when deciding which rows are new and need to be imported. The column must not be a character type (CHAR/NCHAR/VARCHAR/VARNCHAR/LONGVARCHAR/LONGNVARCHAR).
--incremental (mode): the incremental mode; mode is either append or lastmodified.
    append: use this when new rows are added to the table with a continually increasing id; --check-column id checks the id column.
    lastmodified: use this when existing rows in the table may also be updated.
--last-value (value): import only rows whose check column is greater than this value, so that only new or updated records reach the new file system.
Typically, --check-column is the table's key column, used to decide which rows are new. --incremental takes one of two modes, append or lastmodified. The value given to --last-value is normally the maximum value of the check column after the previous import, so that Sqoop imports only rows added beyond it.
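Putting the three parameters together, an append-mode incremental import has this general shape (a sketch only; the angle-bracket placeholders are not literal values, and -P prompts for the password instead of exposing it on the command line):

sqoop import \
  --connect jdbc:mysql://<host>:3306/<database> \
  --username <user> -P \
  --table <table> \
  --check-column id \
  --incremental append \
  --last-value <max-id-from-previous-import> \
  --target-dir <hdfs-dir>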
2. First, import the existing MySQL table data into HDFS
This step is not covered in detail here; see my earlier post: Using Sqoop 1.4.4 to Import Data from a MySQL Table into HDFS.
[hadoopUser@secondmgt ~]$ sqoop import --connect jdbc:mysql://secondmgt:3306/spice --username hive --password hive --table users --target-dir /output/incrimport/
View the results:
[hadoopUser@secondmgt ~]$ hadoop fs -cat /output/incrimport/*
56,hua,hanyun,男,开通,2013-12-02,0,1
58,feng,123456,男,开通,2013-11-22,0,0
59,test,123456,男,开通,2014-03-05,58,0
60,user1,123456,男,开通,2014-06-26,66,0
61,user2,123,男,开通,2013-12-13,56,0
62,user3,123456,男,开通,2013-12-14,0,0
64,kai.zhou,123456,?,??,2014-03-05,65,0
65,test1,111,男,未开通,null,0,0
66,test2,111,男,未开通,null,0,0
67,test3,113,男,未开通,null,0,0
3. Insert three new rows into the MySQL table
mysql> select * from users;
+----+-------------+----------+-----+-----------+------------+-------+------+
| id | username    | password | sex | content   | datetime   | vm_id | isad |
+----+-------------+----------+-----+-----------+------------+-------+------+
| 56 | hua         | hanyun   | 男  | 开通      | 2013-12-02 |     0 |    1 |
| 58 | feng        | 123456   | 男  | 开通      | 2013-11-22 |     0 |    0 |
| 59 | test        | 123456   | 男  | 开通      | 2014-03-05 |    58 |    0 |
| 60 | user1       | 123456   | 男  | 开通      | 2014-06-26 |    66 |    0 |
| 61 | user2       | 123      | 男  | 开通      | 2013-12-13 |    56 |    0 |
| 62 | user3       | 123456   | 男  | 开通      | 2013-12-14 |     0 |    0 |
| 64 | kai.zhou    | 123456   | ?   | ??        | 2014-03-05 |    65 |    0 |
| 65 | test1       | 111      | 男  | 未开通    | NULL       |     0 |    0 |
| 66 | test2       | 111      | 男  | 未开通    | NULL       |     0 |    0 |
| 67 | test3       | 113      | 男  | 未开通    | NULL       |     0 |    0 |
| 68 | sqoopincr01 | 113      | 男  | 未开通    | NULL       |     0 |    0 |
| 69 | sqoopincr02 | 113      | 男  | 未开通    | NULL       |     0 |    0 |
| 70 | sqoopincr03 | 113      | 男  | 未开通    | NULL       |     0 |    0 |
+----+-------------+----------+-----+-----------+------------+-------+------+
13 rows in set (0.00 sec)
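The INSERT statements themselves are not shown above; based on the table contents, the three new rows (id 68 to 70) could have been created with something like this sketch:

mysql> INSERT INTO users (id, username, password, sex, content, datetime, vm_id, isad)
    -> VALUES (68, 'sqoopincr01', '113', '男', '未开通', NULL, 0, 0),
    ->        (69, 'sqoopincr02', '113', '男', '未开通', NULL, 0, 0),
    ->        (70, 'sqoopincr03', '113', '男', '未开通', NULL, 0, 0);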
4. Incremental import in append mode
[hadoopUser@secondmgt ~]$ sqoop import --connect jdbc:mysql://secondmgt:3306/spice --username hive --password hive --table users --check-column id --incremental append --target-dir /output/incrimport/
Here we pass the key column id to --check-column as the check column, and select the incremental mode with --incremental append.
View the results:
[hadoopUser@secondmgt ~]$ hadoop fs -cat /output/incrimport/*
56,hua,hanyun,男,开通,2013-12-02,0,1
58,feng,123456,男,开通,2013-11-22,0,0
59,test,123456,男,开通,2014-03-05,58,0
60,user1,123456,男,开通,2014-06-26,66,0
61,user2,123,男,开通,2013-12-13,56,0
62,user3,123456,男,开通,2013-12-14,0,0
64,kai.zhou,123456,?,??,2014-03-05,65,0
65,test1,111,男,未开通,null,0,0
66,test2,111,男,未开通,null,0,0
67,test3,113,男,未开通,null,0,0
56,hua,hanyun,男,开通,2013-12-02,0,1
58,feng,123456,男,开通,2013-11-22,0,0
59,test,123456,男,开通,2014-03-05,58,0
60,user1,123456,男,开通,2014-06-26,66,0
61,user2,123,男,开通,2013-12-13,56,0
62,user3,123456,男,开通,2013-12-14,0,0
64,kai.zhou,123456,?,??,2014-03-05,65,0
65,test1,111,男,未开通,null,0,0
66,test2,111,男,未开通,null,0,0
67,test3,113,男,未开通,null,0,0
68,sqoopincr01,113,男,未开通,null,0,0
69,sqoopincr02,113,男,未开通,null,0,0
70,sqoopincr03,113,男,未开通,null,0,0
As the output shows, the previously imported rows appear twice: Sqoop ran another full import and simply appended it to what was already there. That is clearly not what we want, but why did it happen? Because we never told Sqoop where the previous import left off, i.e. we did not set --last-value. The corrected command:
[hadoopUser@secondmgt ~]$ sqoop import --connect jdbc:mysql://secondmgt:3306/spice --username hive --password hive --table users --check-column id --incremental append --last-value 67 --target-dir /output/incrimport/
Warning: /usr/lib/hcatalog does not exist! HCatalog jobs will fail.
Please set $HCAT_HOME to the root of your HCatalog installation.
15/01/18 12:40:39 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
15/01/18 12:40:40 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
15/01/18 12:40:40 INFO tool.CodeGenTool: Beginning code generation
15/01/18 12:40:40 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `users` AS t LIMIT 1
15/01/18 12:40:40 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `users` AS t LIMIT 1
15/01/18 12:40:40 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0
Note: /tmp/sqoop-hadoopUser/compile/315cec389ff097cf4f6de538734929c1/users.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
15/01/18 12:40:41 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hadoopUser/compile/315cec389ff097cf4f6de538734929c1/users.jar
15/01/18 12:40:41 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM users
15/01/18 12:40:41 INFO tool.ImportTool: Incremental import based on column `id`
15/01/18 12:40:41 INFO tool.ImportTool: Upper bound value: 70
15/01/18 12:40:41 WARN manager.MySQLManager: It looks like you are importing from mysql.
15/01/18 12:40:41 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
15/01/18 12:40:41 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
15/01/18 12:40:41 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
15/01/18 12:40:41 INFO mapreduce.ImportJobBase: Beginning import of users
15/01/18 12:40:41 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoopUser/cloud/hbase/hbase-0.96.2-hadoop2/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/01/18 12:40:42 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
15/01/18 12:40:42 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
15/01/18 12:40:42 INFO client.RMProxy: Connecting to ResourceManager at secondmgt/192.168.2.133:8032
15/01/18 12:40:43 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `users` WHERE ( `id` <= 70 )
15/01/18 12:40:43 INFO mapreduce.JobSubmitter: number of splits:4
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.job.classpath.files is deprecated. Instead, use mapreduce.job.classpath.files
15/01/18 12:40:43 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.cache.files.filesizes is deprecated. Instead, use mapreduce.job.cache.files.filesizes
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.cache.files is deprecated. Instead, use mapreduce.job.cache.files
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
15/01/18 12:40:44 INFO Configuration.deprecation: mapreduce.inputformat.class is deprecated. Instead, use mapreduce.job.inputformat.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/01/18 12:40:44 INFO Configuration.deprecation: mapreduce.outputformat.class is deprecated. Instead, use mapreduce.job.outputformat.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.cache.files.timestamps is deprecated. Instead, use mapreduce.job.cache.files.timestamps
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
15/01/18 12:40:44 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1421373857783_0013
15/01/18 12:40:44 INFO impl.YarnClientImpl: Submitted application application_1421373857783_0013 to ResourceManager at secondmgt/192.168.2.133:8032
15/01/18 12:40:44 INFO mapreduce.Job: The url to track the job: http://secondmgt:8088/proxy/application_1421373857783_0013/
15/01/18 12:40:44 INFO mapreduce.Job: Running job: job_1421373857783_0013
15/01/18 12:40:57 INFO mapreduce.Job: Job job_1421373857783_0013 running in uber mode : false
15/01/18 12:40:57 INFO mapreduce.Job:  map 0% reduce 0%
15/01/18 12:41:07 INFO mapreduce.Job:  map 25% reduce 0%
15/01/18 12:41:12 INFO mapreduce.Job:  map 75% reduce 0%
15/01/18 12:41:16 INFO mapreduce.Job:  map 100% reduce 0%
15/01/18 12:41:16 INFO mapreduce.Job: Job job_1421373857783_0013 completed successfully
15/01/18 12:41:16 INFO mapreduce.Job: Counters: 27
        File System Counters
                FILE: Number of bytes read=0
                FILE: Number of bytes written=368748
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=401
                HDFS: Number of bytes written=522
                HDFS: Number of read operations=16
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=8
        Job Counters
                Launched map tasks=4
                Other local map tasks=4
                Total time spent by all maps in occupied slots (ms)=172816
                Total time spent by all reduces in occupied slots (ms)=0
        Map-Reduce Framework
                Map input records=13
                Map output records=13
                Input split bytes=401
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=221
                CPU time spent (ms)=11650
                Physical memory (bytes) snapshot=620658688
                Virtual memory (bytes) snapshot=3546619904
                Total committed heap usage (bytes)=335544320
        File Input Format Counters
                Bytes Read=0
        File Output Format Counters
                Bytes Written=522
15/01/18 12:41:16 INFO mapreduce.ImportJobBase: Transferred 522 bytes in 34.1265 seconds (15.296 bytes/sec)
15/01/18 12:41:16 INFO mapreduce.ImportJobBase: Retrieved 13 records.
15/01/18 12:41:16 INFO util.AppendUtils: Appending to directory incrimport
15/01/18 12:41:16 INFO util.AppendUtils: Using found partition 4
15/01/18 12:41:16 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
15/01/18 12:41:16 INFO tool.ImportTool:  --incremental append
15/01/18 12:41:16 INFO tool.ImportTool:   --check-column id
15/01/18 12:41:16 INFO tool.ImportTool:   --last-value 70
15/01/18 12:41:16 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
View the results again:
[hadoopUser@secondmgt ~]$ hadoop fs -cat /output/incrimport/*
56,hua,hanyun,男,开通,2013-12-02,0,1
58,feng,123456,男,开通,2013-11-22,0,0
59,test,123456,男,开通,2014-03-05,58,0
60,user1,123456,男,开通,2014-06-26,66,0
61,user2,123,男,开通,2013-12-13,56,0
62,user3,123456,男,开通,2013-12-14,0,0
64,kai.zhou,123456,?,??,2014-03-05,65,0
65,test1,111,男,未开通,null,0,0
66,test2,111,男,未开通,null,0,0
67,test3,113,男,未开通,null,0,0
68,sqoopincr01,113,男,未开通,null,0,0
69,sqoopincr02,113,男,未开通,null,0,0
70,sqoopincr03,113,男,未开通,null,0,0
This time the results are correct!
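The log above even suggests saving this import with 'sqoop job --create'. A saved job stores the import definition in Sqoop's metastore and, for incremental imports, updates the stored --last-value after each run, so you no longer have to track the maximum id by hand. A minimal sketch, assuming a job name of incr_users (made up here) and the same connection details as above; -P prompts for the password rather than saving it in plain text:

sqoop job --create incr_users -- import \
  --connect jdbc:mysql://secondmgt:3306/spice \
  --username hive -P \
  --table users \
  --check-column id --incremental append --last-value 70 \
  --target-dir /output/incrimport/

sqoop job --exec incr_users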
5. Incremental import in lastmodified mode
With lastmodified incremental import, the column given to --check-column must be a timestamp column, and --last-value is set to the most recent time from the previous import; this is how updates to existing rows get imported. We will not cover it in detail here, but a sketch follows.
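A sketch only, since the users table in this post has no timestamp column: it assumes a hypothetical last_update TIMESTAMP column that is refreshed on every UPDATE. --append writes the changed rows as new files, so an updated row will coexist with its older copy until you reconcile them downstream (for example with the sqoop-merge tool):

sqoop import \
  --connect jdbc:mysql://secondmgt:3306/spice \
  --username hive -P \
  --table users \
  --check-column last_update \
  --incremental lastmodified \
  --last-value "2015-01-18 00:00:00" \
  --append \
  --target-dir /output/incrimport/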
Note: whichever mode you use, --check-column, --incremental, and --last-value are meant to be given together in the command; as section 4 showed, omitting --last-value makes Sqoop re-import every row instead of only the new ones.