Sqoop 1.4.4: Importing MySQL Data into HDFS with Incremental Import Mode

Questions to consider:

       1. What is incremental import?

       2. What do the --check-column, --incremental, and --last-value parameters do?

       3. What should you pay attention to when using incremental import?

1. Overview of incremental import mode

       Sqoop can import only newly added data while skipping records that have already been imported; this is known as incremental import. It is controlled by the following parameters:

--check-column (col)    The column Sqoop examines to decide which records are new and need to be imported. It cannot be a character type (CHAR/NCHAR/VARCHAR/VARNCHAR/LONGVARCHAR/LONGNVARCHAR).
--incremental (mode)    The incremental mode; mode is either append or lastmodified.
      append:       Use append when new rows are continually added to the table with an ever-increasing id; --check-column id then checks the id column.
      lastmodified: Use lastmodified when existing rows in the table may also be updated.
--last-value (value)    Records whose check column is greater than this value are imported, so that only new or updated records are written to the target file system.

        In general, --check-column is a key column of the table and is used to decide which rows are new. --incremental has two modes, append and lastmodified. --last-value is usually the maximum value of the check column from the previous import, so that Sqoop imports only the rows added after that value.

2. First, import the existing MySQL table data into HDFS

        This step is not covered in detail here; see my other article: Using Sqoop 1.4.4 to import MySQL table data into HDFS.

[hadoopUser@secondmgt ~]$ sqoop import --connect jdbc:mysql://secondmgt:3306/spice --username hive --password hive --table users --target-dir /output/incrimport/
       Check the result:

[hadoopUser@secondmgt ~]$ hadoop fs -cat /output/incrimport/*
56,hua,hanyun,男,开通,2013-12-02,0,1
58,feng,123456,男,开通,2013-11-22,0,0
59,test,123456,男,开通,2014-03-05,58,0
60,user1,123456,男,开通,2014-06-26,66,0
61,user2,123,男,开通,2013-12-13,56,0
62,user3,123456,男,开通,2013-12-14,0,0
64,kai.zhou,123456,?,??,2014-03-05,65,0
65,test1,111,男,未开通,null,0,0
66,test2,111,男,未开通,null,0,0
67,test3,113,男,未开通,null,0,0
3. Insert three new records into the MySQL table
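
       The original does not show the INSERT statements that added rows 68 through 70; statements along the following lines (purely illustrative, with values copied from the SELECT result below) would produce them:

mysql> INSERT INTO users (id, username, password, sex, content, datetime, vm_id, isad)
    -> VALUES (68, 'sqoopincr01', '113', '男', '未开通', NULL, 0, 0),
    ->        (69, 'sqoopincr02', '113', '男', '未开通', NULL, 0, 0),
    ->        (70, 'sqoopincr03', '113', '男', '未开通', NULL, 0, 0);

       Querying the table confirms the three new rows: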

mysql> select * from users;
+----+-------------+----------+-----+-----------+------------+-------+------+
| id | username    | password | sex | content   | datetime   | vm_id | isad |
+----+-------------+----------+-----+-----------+------------+-------+------+
| 56 | hua         | hanyun   | 男  | 开通      | 2013-12-02 |     0 |    1 |
| 58 | feng        | 123456   | 男  | 开通      | 2013-11-22 |     0 |    0 |
| 59 | test        | 123456   | 男  | 开通      | 2014-03-05 |    58 |    0 |
| 60 | user1       | 123456   | 男  | 开通      | 2014-06-26 |    66 |    0 |
| 61 | user2       | 123      | 男  | 开通      | 2013-12-13 |    56 |    0 |
| 62 | user3       | 123456   | 男  | 开通      | 2013-12-14 |     0 |    0 |
| 64 | kai.zhou    | 123456   | ?   | ??        | 2014-03-05 |    65 |    0 |
| 65 | test1       | 111      | 男  | 未开通    | NULL       |     0 |    0 |
| 66 | test2       | 111      | 男  | 未开通    | NULL       |     0 |    0 |
| 67 | test3       | 113      | 男  | 未开通    | NULL       |     0 |    0 |
| 68 | sqoopincr01 | 113      | 男  | 未开通    | NULL       |     0 |    0 |
| 69 | sqoopincr02 | 113      | 男  | 未开通    | NULL       |     0 |    0 |
| 70 | sqoopincr03 | 113      | 男  | 未开通    | NULL       |     0 |    0 |
+----+-------------+----------+-----+-----------+------------+-------+------+
13 rows in set (0.00 sec)

4. Incremental import with append mode

[hadoopUser@secondmgt ~]$ sqoop import --connect jdbc:mysql://secondmgt:3306/spice --username hive --password hive --table users --check-column id --incremental append  --target-dir /output/incrimport/
       Here --check-column is followed by the key column id, which serves as the check column, and the incremental mode is --incremental append.

       Check the result:

[hadoopUser@secondmgt ~]$ hadoop fs -cat /output/incrimport/*
56,hua,hanyun,男,开通,2013-12-02,0,1
58,feng,123456,男,开通,2013-11-22,0,0
59,test,123456,男,开通,2014-03-05,58,0
60,user1,123456,男,开通,2014-06-26,66,0
61,user2,123,男,开通,2013-12-13,56,0
62,user3,123456,男,开通,2013-12-14,0,0
64,kai.zhou,123456,?,??,2014-03-05,65,0
65,test1,111,男,未开通,null,0,0
66,test2,111,男,未开通,null,0,0
67,test3,113,男,未开通,null,0,0
56,hua,hanyun,男,开通,2013-12-02,0,1
58,feng,123456,男,开通,2013-11-22,0,0
59,test,123456,男,开通,2014-03-05,58,0
60,user1,123456,男,开通,2014-06-26,66,0
61,user2,123,男,开通,2013-12-13,56,0
62,user3,123456,男,开通,2013-12-14,0,0
64,kai.zhou,123456,?,??,2014-03-05,65,0
65,test1,111,男,未开通,null,0,0
66,test2,111,男,未开通,null,0,0
67,test3,113,男,未开通,null,0,0
68,sqoopincr01,113,男,未开通,null,0,0
69,sqoopincr02,113,男,未开通,null,0,0
70,sqoopincr03,113,男,未开通,null,0,0
       As the output above shows, the previously imported rows now appear twice; in other words, Sqoop performed another full import and simply appended it to the existing data. That is clearly not what we want, but why did it happen? Because we never told Sqoop the last value from the previous import, i.e. we did not set --last-value. The corrected command is:

[hadoopUser@secondmgt ~]$ sqoop import --connect jdbc:mysql://secondmgt:3306/spice --username hive --password hive --table users --check-column id --incremental append --last-value 67 --target-dir /output/incrimport/
Warning: /usr/lib/hcatalog does not exist! HCatalog jobs will fail.
Please set $HCAT_HOME to the root of your HCatalog installation.
15/01/18 12:40:39 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
15/01/18 12:40:40 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
15/01/18 12:40:40 INFO tool.CodeGenTool: Beginning code generation
15/01/18 12:40:40 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `users` AS t LIMIT 1
15/01/18 12:40:40 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `users` AS t LIMIT 1
15/01/18 12:40:40 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0
Note: /tmp/sqoop-hadoopUser/compile/315cec389ff097cf4f6de538734929c1/users.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
15/01/18 12:40:41 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hadoopUser/compile/315cec389ff097cf4f6de538734929c1/users.jar
15/01/18 12:40:41 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM users
15/01/18 12:40:41 INFO tool.ImportTool: Incremental import based on column `id`
15/01/18 12:40:41 INFO tool.ImportTool: Upper bound value: 70
15/01/18 12:40:41 WARN manager.MySQLManager: It looks like you are importing from mysql.
15/01/18 12:40:41 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
15/01/18 12:40:41 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
15/01/18 12:40:41 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
15/01/18 12:40:41 INFO mapreduce.ImportJobBase: Beginning import of users
15/01/18 12:40:41 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoopUser/cloud/hbase/hbase-0.96.2-hadoop2/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/01/18 12:40:42 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
15/01/18 12:40:42 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
15/01/18 12:40:42 INFO client.RMProxy: Connecting to ResourceManager at secondmgt/192.168.2.133:8032
15/01/18 12:40:43 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `users` WHERE ( `id` <= 70 )
15/01/18 12:40:43 INFO mapreduce.JobSubmitter: number of splits:4
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.job.classpath.files is deprecated. Instead, use mapreduce.job.classpath.files
15/01/18 12:40:43 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.cache.files.filesizes is deprecated. Instead, use mapreduce.job.cache.files.filesizes
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.cache.files is deprecated. Instead, use mapreduce.job.cache.files
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/01/18 12:40:43 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
15/01/18 12:40:44 INFO Configuration.deprecation: mapreduce.inputformat.class is deprecated. Instead, use mapreduce.job.inputformat.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/01/18 12:40:44 INFO Configuration.deprecation: mapreduce.outputformat.class is deprecated. Instead, use mapreduce.job.outputformat.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.cache.files.timestamps is deprecated. Instead, use mapreduce.job.cache.files.timestamps
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
15/01/18 12:40:44 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
15/01/18 12:40:44 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1421373857783_0013
15/01/18 12:40:44 INFO impl.YarnClientImpl: Submitted application application_1421373857783_0013 to ResourceManager at secondmgt/192.168.2.133:8032
15/01/18 12:40:44 INFO mapreduce.Job: The url to track the job: http://secondmgt:8088/proxy/application_1421373857783_0013/
15/01/18 12:40:44 INFO mapreduce.Job: Running job: job_1421373857783_0013
15/01/18 12:40:57 INFO mapreduce.Job: Job job_1421373857783_0013 running in uber mode : false
15/01/18 12:40:57 INFO mapreduce.Job:  map 0% reduce 0%
15/01/18 12:41:07 INFO mapreduce.Job:  map 25% reduce 0%
15/01/18 12:41:12 INFO mapreduce.Job:  map 75% reduce 0%
15/01/18 12:41:16 INFO mapreduce.Job:  map 100% reduce 0%
15/01/18 12:41:16 INFO mapreduce.Job: Job job_1421373857783_0013 completed successfully
15/01/18 12:41:16 INFO mapreduce.Job: Counters: 27
        File System Counters
                FILE: Number of bytes read=0
                FILE: Number of bytes written=368748
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=401
                HDFS: Number of bytes written=522
                HDFS: Number of read operations=16
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=8
        Job Counters
                Launched map tasks=4
                Other local map tasks=4
                Total time spent by all maps in occupied slots (ms)=172816
                Total time spent by all reduces in occupied slots (ms)=0
        Map-Reduce Framework
                Map input records=13
                Map output records=13
                Input split bytes=401
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=221
                CPU time spent (ms)=11650
                Physical memory (bytes) snapshot=620658688
                Virtual memory (bytes) snapshot=3546619904
                Total committed heap usage (bytes)=335544320
        File Input Format Counters
                Bytes Read=0
        File Output Format Counters
                Bytes Written=522
15/01/18 12:41:16 INFO mapreduce.ImportJobBase: Transferred 522 bytes in 34.1265 seconds (15.296 bytes/sec)
15/01/18 12:41:16 INFO mapreduce.ImportJobBase: Retrieved 13 records.
15/01/18 12:41:16 INFO util.AppendUtils: Appending to directory incrimport
15/01/18 12:41:16 INFO util.AppendUtils: Using found partition 4
15/01/18 12:41:16 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
15/01/18 12:41:16 INFO tool.ImportTool:  --incremental append
15/01/18 12:41:16 INFO tool.ImportTool:   --check-column id
15/01/18 12:41:16 INFO tool.ImportTool:   --last-value 70
15/01/18 12:41:16 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
         Check the result again:

[hadoopUser@secondmgt ~]$ hadoop fs -cat /output/incrimport/*
56,hua,hanyun,男,开通,2013-12-02,0,1
58,feng,123456,男,开通,2013-11-22,0,0
59,test,123456,男,开通,2014-03-05,58,0
60,user1,123456,男,开通,2014-06-26,66,0
61,user2,123,男,开通,2013-12-13,56,0
62,user3,123456,男,开通,2013-12-14,0,0
64,kai.zhou,123456,?,??,2014-03-05,65,0
65,test1,111,男,未开通,null,0,0
66,test2,111,男,未开通,null,0,0
67,test3,113,男,未开通,null,0,0
68,sqoopincr01,113,男,未开通,null,0,0
69,sqoopincr02,113,男,未开通,null,0,0
70,sqoopincr03,113,男,未开通,null,0,0
    This time the result is correct!
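
       Note also that the log above ends by suggesting that these arguments be saved with 'sqoop job --create'. A saved job lets Sqoop's metastore remember and update --last-value after every run, so you do not have to track it by hand. A minimal sketch (the job name incr_users is made up here; depending on metastore settings, Sqoop may prompt for the password again at --exec time):

[hadoopUser@secondmgt ~]$ sqoop job --create incr_users -- import --connect jdbc:mysql://secondmgt:3306/spice --username hive --password hive --table users --check-column id --incremental append --last-value 70 --target-dir /output/incrimport/
[hadoopUser@secondmgt ~]$ sqoop job --exec incr_users    # run again whenever new rows have been added
[hadoopUser@secondmgt ~]$ sqoop job --show incr_users    # inspect the stored arguments, including the updated last-value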

5. Incremental import with lastmodified mode

       With a lastmodified incremental import, --check-column must be followed by a timestamp column, and --last-value is followed by a recent time; rows modified after that time are then imported, so updates to existing rows are picked up as well. This is not covered in detail here, but a rough sketch is shown below.
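
       As a sketch only (not run against this cluster): assuming the users table's datetime column held last-modification timestamps, a command along these lines would import the rows changed after the given time. The --last-value shown is purely illustrative, and --append is added so the output can land alongside the earlier files; rows that were updated rather than newly inserted would still need a separate merge step (for example with sqoop merge).

[hadoopUser@secondmgt ~]$ sqoop import --connect jdbc:mysql://secondmgt:3306/spice --username hive --password hive --table users --check-column datetime --incremental lastmodified --last-value '2014-06-26 00:00:00' --append --target-dir /output/incrimport/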

       Note: whichever mode you use, the --check-column, --incremental, and --last-value parameters should be supplied together in the command; as shown above, omitting --last-value makes the job degenerate into a full import that is simply appended.


Recommended reading:

        Previous post: Using Sqoop 1.4.4 to import MySQL table data into HDFS with a SQL query

        Next post: Exporting a file dataset from HDFS to a MySQL table with Sqoop 1.4.4