sparksql

Spark SQL
一、sparkSQL的特点
1.支持多种数据源:hive RDD Partquet JSON JDBC
2.多种性能优化技术:in-memory columnar storage  byte-code generation  cost model 动态评估
3.组件扩展性:对于SQL的语法解析器、分析器、以及优化器,用户都可以自己重新开发,并且动态扩展

Spark sql 的性能优化技术简介

1.内存列存储(in-memory columnar storage)
内存列存储意味着spark sql的数据,不是使用java对象的方式进行存储,而是使用面向列存储的方式进行存储,也就是说,每一列,作为一个数据存储的单位,从而大大优化内存使用的效率。使用列存储之后,减少对内存的消耗,也就避免了对GC(垃圾回收)大量数据的性能开销

2.字节码生成技术(byte-code generation )
Spark sql在其catalyst模块的Expressions中增加一codegen模块,对于sql语句中的计算表达式,比如select num + num from t 这种sql,就可以使用动态字节码生成技术来优化其性能。

3.Scala代码编写的优化
对于Scala代码编写中,可能会造成大量的性能的开销,自己重写,使用更加复杂的方式,来获取更好的性能。比如option样例类、for循环、map/filter/foreach等高阶函数,以及不可变对象,都改成用null,while循环来实现,并且重用可变的对象


二、dataframe的使用
1.spark sql 和 dataframe引言
Spark sql 是spark中的一个模块,主要是进行结构化数据的处理。他提供的最核心的编程抽象,就是dataframe。同时spark sql 还可以作为分布式的sql查询引擎。Spark sql最重要的功能之一就是从hive中查询数据

Dataframe,可以理解为时,以列的形式组织的,分布式的数据集合,他其实和关系型数据库中的表非常类似,但是底层做了很对的优化。Dataframe可以通过很多来源,包括:结构化数据文件,hive表,外部关系型数据库以及RDD


2.SQLContext
要使用spark sql ,首先就得创建一个SQLContext对象,或者是他的子类的对象(HiveContext),比如HiveContext对象;

Java版本:
JavaSparkContext sc = ....;
SQLContext SQLContext = new SQLContext(sc);

Scala版本:
Val sc = SparkContext..
Val SQLContext = new SQLContext(sc)
Import SQLContext.implicits._

3.HiveContext 
除了基本的SQLContext以外,还可以使用它的子类---HiveContext。HiveContext的功能除了包含SQLContext提供的所有的功能外,还包括额外的专门针对hive的一些功能。这些额外的功能包括:使用hive语法编写和执行sql,使用hive的UDF函数,从hive表中读取数据

要使用HiveContext,就必须预先安装好hive,SQLContext支持的数据源,HiveContext也同样支持,而不只是支持hive,对spark1.3.x以上的版本,都推荐使用HiveContext,因为其功能更加丰富和完善

Spark sql 还支持使用spark.sql.dialect参数设置sql方言,使用SQLContext的serConf()即可进行设置。对与SQLContext,他只支持”sql”一种方言。对于HiveContext,它默认的方言是“hiveql”




4.创建Dataframe
使用SQLContext,可以从RDD、hive表中或者其他额数据源,来创建dataframe。

Java版本:
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * 使用json文件创建dataframe
 * @author Administrator
 */
public class DataFrameCreate {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DataFrameCreate");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext SQLContext = new SQLContext(sc);
        DataFrame df = SQLContext.read().json("hdfs://hadoop01:8020/spark_input/students.json");
        df.show();

    }
}


提交至集群:运行打包,上传。编写脚本进行提交
【这里是一台机器测试】
bin/spark-submit 
--class com.spark.spark_sql.DataFrameCreate 
--files /opt/modules/apache-hive-0.13.1-bin/conf/hive-site.xml 
--driver-class-path /opt/softwares/mysql-connector-java-5.1.27-bin.jar 
/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/sql/sparksql_01.jar

 运行的结果
+---+---+--------+
|age| id|    name|
+---+---+--------+
| 10|  1|     leo|
| 25|  2|    kity|
| 30|  4|    lucy|
| 20|  3|     tom|
| 18|  7|    jack|
| 23| 10|  edison|
| 36|  5|    owen|
| 20|  8|    jiny|
| 40|  6|    lisi|
| 45|  9|zhangsan|
+---+---+--------+












Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataFrameCreate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DataFrameCreate")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("hdfs://hadoop01:8020/spark_input/student.json")
    df.show
  }
}


Dataframe常用的操作
java
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * Dataframe的常用的操作
 * @author Administrator
 *
 */
public class DataFrameOperation {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DataFrameOperation");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext SQLContext = new SQLContext(sc);
        //创建出来的dataframe完全可以可以理解为一张表
        DataFrame df = SQLContext.read().json("hdfs://hadoop01:8020/input/students.txt");
        //打印dataframe中所有的数据
        df.show();
        //打印dataframe中元数据的信息
        df.printSchema();
        //查询某列所有的数据
        df.select("name").show();
        //查询某几列所有的数据,并对列进行计算
        df.select(df.col("name"), df.col("age").plus(1)).show();//将查询出来的age进行加一
        //根据某一列的值进行过滤
        df.filter(df.col("age").gt(30)).show();                //年龄大于30的进行过滤
        //根据某一列进行分组聚合
        df.groupBy(df.col("age")).count().show();
    }
}

Scala
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataframeOperation {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DataframeOperation")
    val sc = new SparkContext(conf)
    val SQLContext = new SQLContext(sc)
    val df = SQLContext.read.json("hdfs://hadoop01:8020/spark_input/student.json" 
    df.show
    df.printSchema()
    df.select("name").show
    df.select(df("name"),df("age")+1).show
    df.filter(df("age") > 30).show
    df.groupBy("age").count().show
 }
}


5.RDD与Dataframe之间转换
为什么要将RDD转换为Dataframe?因为这样的话,我们就可以直接针对hdfs上的任何可以构建为RDD的数据,使用spark sql 进行sql查询,这个功能无比强大。想象一下,针对hdfs中的数据,直接就可以使用sql进行查询

Spark sql 支持两种方式来将RDD转换为Dataframe

第一种方式,是使用反射来推断包含了特定数据类型的RDD的元数据,这种基于反射的方式,代码比较简单,当你已经知道你的RDD的元数据时,是一种非常不错的方式。

第二种方式,是通过编程接口来创建dataframe,你可以在程序运行的时候动态构建一份元数据,然后将其应用到已经存在的RDD上,这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知元数据,那么只能通过这种动态构建元数据的方式。


1.使用反射的方式推断元数据
Java版本:spark sql是支持将包含javaBean的RDD转换为dataframe的,Javabean的信息,就定义了元数据。Spark sql 现在是不支持包含嵌套javabean或者list等复杂元数据的Javabean。

Scala版本:而scala由于具有隐式转换的特性,所以spark sql的scala接口,是支持自动将包含case class 的RDD转换为Dataframe的。Case class 就定义了元数据。Spark sql 会通过反射读取传递给 case class 的参数的名称,然后将其作为列名。与java不同的是,Spark sql是支持将包含了嵌套的数据结构的case class作为元数据的,比如包含了Array等。
Java版本:
package com.spark.spark_sql;
import java.io.Serializable;
public class Student implements Serializable {
    private static final long serialVersionUID = 1L;
    private int id;
    private String name;
    private int age;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public Student() {
    }
    public Student(int id, String name, int age) {
        super();
        this.id = id;
        this.name = name;
        this.age = age;
    }
    @Override
    public String toString() {
        return "Student [;
    }

}


package com.spark.spark_sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * 使用反射的方式将RDD转换为dataframe
 * @author Administrator
 *
 */
public class RDD2DataframeReflection {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataframeReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext SQLContext = new SQLContext(sc);
        JavaRDD<String> lines = sc.textFile("E://student.txt");
        JavaRDD<Student> students = lines.map(new Function<String, Student>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Student call(String line) throws Exception {
                String[] split = line.split(",");
                Student stu = new Student();
                stu.setId(Integer.parseInt(split[0]));
                stu.setName(split[1]);
                stu.setAge(Integer.parseInt(split[2]));
                return stu;
            }
        });
        //使用反射的方式将RDD转换为dataframe
        
        //将student.class传入进去其实就是通过反射的方式来创建dataframe
        //因为student.class 本身就是反射的一个应用
        //然后底层还得通过student class 进行反射。来获取其中的fields
        //这里要求Javabean要实现Serializable接口,可以序列化
DataFrame studentDF = sqlContext.createDataFrame(students, Student.class);
        
        //拿到一个dataframe之后,就可以将其注册为一张临时表,然后针对其中的数据进行sql语句
        studentDF.registerTempTable("student");
        
        //针对student临时表执行sql语句,查询年龄大于20岁的学生
        DataFrame df =sqlContext.sql("select * from student where age > 20");
        
        //将查询出来的dataframe再次转换为RDD
        JavaRDD<Row> teenagerRDD = df.javaRDD();
        
        //将RDD中的数据,进行映射,给每个人的年龄,然后映射为student
        JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, Student>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Student call(Row row) throws Exception {
                //row中的顺序是按照字典顺序进行排列的
                Student student = new Student();
                student.setAge(row.getInt(0));
                student.setId(row.getInt(1));
                student.setName(row.getString(2));
                return student;
            }
        });
        
        //将数据collect 回来,打印出来
        List<Student> studentList = teenagerStudentRDD.collect();
        for (Student student : studentList) {
            System.out.println(student);
        }
        
    }

}

Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
/**
 * 如果要使用scala开发spark程序
 * 然后在其中还要实现基于反射的RDD到dataframe的转换,就必须得用object  extends App的方式
 * 不能使用def main()方法的方式来运行程序,否则就会报错no typetag for ... class
 */
object RDD2DaframeReflection extends App{
  
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DaframeReflection")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines = sc.textFile("e://student.txt", 1)
    
    //在scala中使用反射的方式,进行RDD到Dataframe的转换,需要手动的导入一个隐士转换
    import SQLContext.implicits._
    case class Student(id : Int ,name : String ,age : Int)
    
    //这里其实就是一个普通的,元素是case class 的rdd
    val students = lines.map(line =>line.split(",")).map(arr => Student(arr(0).trim().toInt,arr(1),arr(2).trim().toInt))
    
    //直接使用RDD的toDF,即可将其转换为dataframe
    val studentDF=students.toDF()
    
    //注册为一个临时表
    studentDF.registerTempTable("student")
    
    //
    val teenagerDF = sqlContext.sql("select * from student where age > 20")
    
    val teenagerRDD = teenagerDF.rdd
    
    //在scala中,row中的数据的顺序,反而是按照我们期望的来排列的,这个是跟java是不一样的
    teenagerRDD.map{row => Student(row(0).toString().toInt,row(1).toString(),row(2).toString().toInt )
    
    }.collect().foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age))
    
    
    //在scala中,对row的使用比java中的row更加的丰富
    //在scala中,可以用row的getAs()方法,获取指定列名的列
    teenagerRDD.map(row =>Student(row.getAs[Int]("id"),row.getAs("name"),row.getAs("age"))).collect()
      .foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age))
    
      //还可以通过row的getValuesMap,方法,获取指定几列的值,返回的是一个map
    teenagerRDD.map(row => {
      val map = row.getValuesMap(Array("id","name","age"))
      Student(map("id").toString().toInt,map("name").toString(),map("age").toString().toInt)
    }).collect().foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age))
}




2.通过编程接口来创建dataframe
Java版本:
package com.spark.spark_sql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * 以编程的方式动态的执行元数据,将RDD转化为dataframe
 * @author Administrator
 *
 */
public class RDD2DataProgrammatically {
public static void main(String[] args) {
    
    //创建sparkconf
    SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataProgrammatically");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    
    //第一步,创建一个普通的RDD,但是必须将其转换为RDD<Row>的格式
        JavaRDD<String> lines = sc.textFile("e://student.txt");
        
        JavaRDD<Row> rows = lines.map(new Function<String, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(String line) throws Exception {
                String[] split = line.split(",");
                
                return RowFactory.create(Integer.parseInt(split[0]),split[1],Integer.parseInt(split[2]));
            }
        });
        //第二步,动态元数构造据
        //比如说,id name age 等fields的名称和类型都是在程序运行过程中,
        //动态的从MySQL等DB中或者是配置文件中,加载出来的,是不固定的
        //所以特别适合这种编程的方式来构造元数据
        
        List<StructField> fields = new ArrayList<StructField>();
        
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        
        StructType structType = DataTypes.createStructType(fields);
        
        //第三部,使用动态构造元数据,将RDD转换为dataframe
        DataFrame studentDF = sqlContext.createDataFrame(rows, structType);
        
        //后面就可以直接使用这个df了
        //注册临时表
        studentDF.registerTempTable("student");
        DataFrame stus = sqlContext.sql("select * from student where age > 20");
        
        List<Row> list = stus.javaRDD().collect();
        
        for (Row row : list) {
            System.out.println(row);
        }    
}
}
Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType

object RDD2DataframeProgramatically extends App{
 val conf = new SparkConf().setMaster("local").setAppName("RDD2DataframeProgramatically");
  
  val sc = new SparkContext(conf)
  
  val sqlContext = new SQLContext(sc)
  
  //第一步:构造出元素为Row的普通的RDD
  val studentRDD = sc.textFile("e://student.txt", 1)
      .map(line => Row(
            line.split(",")(0).toInt,
            line.split(",")(1),
            line.split(",")(2).toInt))
  
   //第二步:以编程的方式构造元数据
   val structType =StructType(Array(
       StructField("id",IntegerType,true),
       StructField("name",StringType,true),
       StructField("age",IntegerType,true)))
  
  //第三步:进行RDD到dataframe的转换
       
   val studentDF = SQLContext.createDataFrame(studentRDD,structType )  
   
   studentDF.registerTempTable("student");
  
  val teenagerDF = sqlContext.sql("select * from student where age > 20")
  
   teenagerDF.rdd.collect.foreach(row => println(row))
  

  
}


6.通用的load和save操作
1.dataframe的load和save

对于的spark sql的dataframe来说,无论是从什么数据源创建出来的dataframe,都有一些共同的load和save操作,load操作主要是用于加载数据,创建出来的dataframe:save操作,主要用于将dataframe中的数据集保存到文件中(保存到的是一个目录中)

Java版本:
Dataframe df = sqlContext.read().load(“users.parquet”);
df.select(“name”,”facourite_color”).write().save(“nameAndFav_color_dir”);


Scala版本:
Val df = sqlContext.read.load(“users.parquet”)
Df.select(“name”,”facourite_color”).write().save(“nameAndFav_color_dir”)


Java版本:
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
 * 通用的load和save操作
 * @author Administrator
 */
public class GenericLoadSave {
    public static void main(String[] args) {
        
    SparkConf conf = new SparkConf().setMaster("local").setAppName("GenericLoadSave");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame usersDF = SQLContext.read().load("C://Users//Administrator//Desktop//users.parquet");
    
    usersDF.printSchema();
    usersDF.show();
    usersDF.select("name","favourite_color").write().save("e://users2");
    }
}

Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import scala.tools.scalap.Main
import org.apache.spark.sql.DataFrame

object GenericLoadSave {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("GenericLoadSave")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val  usersDF = sqlContext.read.load("hdfs://hadoop01:8020/spark_input/users.parquet")
    usersDF.select("name","favourite_color").write.save("hdfs://hadoop01:8020/spark_output")

  }
}

2.手动指定数据源:

也可以手动指定用来操作的数据源,数据源通常是使用其权限定名来指定,比如parquet是org.apache.spark.sql.parquet。但是spark sql内置了一些数据源类型,比如json,parquet.jdbc等等,实际上,通过这个功能,就可以在不同类型的数据源之间进行转换了。比如将json文件中的数据存储到parquet文件中。默认情况下,如果不指定数据源,默认就是parquet
Java版本:
DataFrame df =SQLContext.read().format(“json”).load(“people,json”)
Df.select(“name”,”age”).write().format(“parquet”).save(“out”)

Scala版本:
Val df = SQLContext.read.format(“json”).load(“people,json”)
Df.select(“name”,”age”).write.format(“parquet”).save(“out”) 


Java版本
     
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * 手动指定数据源
 * @author Administrator
 */
public class ManuallySpecifyOptions {
    public static void main(String[] args) {
        
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ManuallySpecifyOptions");
        
        SparkContext sc = new SparkContext(conf);
        
        SQLContext SQLContext = new SQLContext(sc);
        
        DataFrame df = SQLContext.read().format("json").load("e://users.parquet");
        
        df.write().format("json").save("e://out");
        
    }
}


3.Save mode 

Spark sql 对于save操作,提供了不同的save mode.主要用来处理,当目标位置已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的

Save mode    意义
SaveMode.ErrorIfExists(默认)    如果目标位置存在数据,那么就抛出异常
SaveMode.Append    如果目标位置存在数据,那么就将数据追加进去
SaveMode.Overwrite    如果目标位置存在数据,那么就将已经存在的数据删除,用新的数据进行覆盖
SaveMode.Ignore    如果目标位置存在数据,那么就忽略,不做任何的操作
Java版本:
package com.spark.spark_sql;
import org.apache.derby.impl.tools.sysinfo.Main;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
/**
 * savemode 示例
 * @author Administrator
 */
public class SaveModeDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SaveModeDemo");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.read().format("json").load("e://users.parquet");
        df.save("e://out", SaveMode.Append);

    }
}

三、Parquet数据源
1.使用编程方式加载数据
Parquet是面向分析型业务的列式存储格式,由Twitter和cloudera合作开发。2015年成为Apache的*项目

列式存储和行式存储相比较有哪些优点;
1.可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量
2.压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding 和 Delta Encoding)进一步节约磁盘空间。
3.只读取需要的列,支持向量运算,能够获取更好的扫描性能
Java版本:

package com.spark.spark_sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * parquet数据源之使用编程方式加载数据
 */
public class ParquetLoadData {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ParquetLoadData");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        //读取parquet文件中的数据,创建一个dataframe
        DataFrame userDF = sqlContext.read().parquet("e://users.parquet");
        
        //将其注册为临时表,使用sql查询所需要的数据
        userDF.registerTempTable("users");
        
        DataFrame userNamesDF = sqlContext.sql("select name from users");
        
        //对查询出来的dataframe进行transformation操作,处理数据,然后打印出来
        List<String> userNames = userNamesDF.javaRDD().map(new Function<Row, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(Row row) throws Exception {
                
                return "Name: "+row.getString(0);
            }
        }).collect();
        
        for (String name : userNames) {
            System.out.println(name);
        }
    }
}



Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object PatquetLoadData {
  
  val conf = new SparkConf().setAppName("PatquetLoadData")
  val sc = new SparkContext(conf)
  val sqlContext =new SQLContext(sc)
  val usersDF = sqlContext.read.parquet("hdfs://hadoop01:8020/spark_input/users.parquet");
  usersDF.registerTempTable("user");
  val userNameDF = sqlContext.sql("select * from user")
  userNameDF.rdd.map(row => "Name: "+row(0)).collect.foreach(userName =>println(userName))
}
2.自动分区推断
表分区是一张常见的优化的方式,比如hive中就提供分区表的特性。在一个分区表中,不同分区的数据通常存储在不同的目录中,分区列的值通常就包含在了分区的目录名中。Spark sql中的parquet数据源,支持自动根据目录名推断出分区信息。例如,如果将入口数据存储在分区表中,并且使用性别和国家作为分区列。那么目录结构可能是如下所示:

|----tableName
|----gender=male
  |----country=US
          |.....
  |----country=ZH
|----gender=female
  |--country=...
           |...
如果将tableName传入SQLContext.read.parquet()或者SQLContext.read.load()方法,那么sparksql就会自动根据目录的结构,推断出分区的信息,是gender和country。即使数据文件中包含两列的值name和age,但是sparksql返回的dataframe,调用printSchema()方法时,会打印出四个列的值:name age country gender .这就是自动分区推断的功能

此外,分区的列的数据类型,也是自动被推断出来的。目前,spark sql仅支持自动推断出数字类型和字符串类型。有时,用户也许不希望spark sql自动推断分区列的数据类型。此时只要设置一个配置即可,spark.sql.sources.partitionColumnTypeInference.enabled,默认是true,即自动推断分区列的类型,设置为false,即不会自动推断类型。进行自定推断分区列的类型时,所有的分区列的类型,就统一默认的是String

案列:
1.hdfs上创建相对应的目录结构:
bin/hdfs dfs -mkdir -p /spark_input/gender=male/country=US
2.将文件上传到目录下
bin/hdfs dfs -put users.parquet /spark_input/gender=male/country=US/

3.查询出schema
package com.spark.spark_sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
 *parquet数据源之 自动推断分区
 */
public class ParquetPartitionDiscovery {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ParquetPartitionDiscovery").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame userDF = sqlContext.read().parquet("hdfs://hadoop01:8020/spark_input/gender=male/country=US/users.parquet");
        userDF.printSchema();
         userDF.show();
    }
}


4.结果:
root
 |-- name: binary (nullable = true)
 |-- favourite_color: binary (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)

+----------------+-------------------+------+-------+
|            name|    favourite_color|gender|country|
+----------------+-------------------+------+-------+
|      [6C 65 6F]|         [72 65 64]|  male|     US|
|   [6A 61 63 6B]|[79 65 6C 6C 6F 77]|  male|     US|
|[6B 69 74 74 79]|   [77 68 69 74 65]|  male|     US|
|      [74 6F 6D]|   [67 72 65 65 6E]|  male|     US|
|      [61 6C 6C]|      [70 69 6E 6B]|  male|     US|
+----------------+-------------------+------+-------+
【注意】前面是因为parquet数据的问题,自己从hive中导出来的。可以利用json格式的数据load,然后save的时候以parquet的方式生成一个parquet文件,用于测试

3.合并元数据
如同ProtocolBuffer,Avro,Thrift一样,Parquet也是支持元数据合并的。用户可以一开始就定义一个简单的元数据,然后随着业务需要。逐渐往元数据中添加更多的列,在这种情况下,用户可能会创建多个parquet文件,有着多个不同的却互相兼容的元数据。Parquet数据源支持自动推断这种情况,并且进行多个parquet文件的元数据的合并

因为元数据合并是一种相对耗时的操作,而且在大多数的情况下不是一种必要的特性,从spark1.5.0版本开始,默认是关闭parquet文件的自动合并元数据的。可以通过以下的两种方式开启parquet数据源的自动合并元数据的特性

1.读取parquet文件时,将数据源的选项,mergeSchema,设置为true

2.根据sqlContext.setConf()方法,将spark.paquet.mergeSchema参数设置为true

案例:
合并学生的基本信息和成绩信息的元数据
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode

object ParquetMergeSchema {
  def main(args: Array[String]): Unit = {
    
 
  val conf = new SparkConf().setAppName("ParquetMergeSchema")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  
  import SQLContext.implicits._
  //创建一个dataframe,作为学生的基本信息,并写入一个parquet文件中
  val studentsWithNameAge =Array(("leo",23),("jack",25))
  
  val studentsWithNameAgeDF = sc.parallelize(studentsWithNameAge, 2).toDF("name","age")
  studentsWithNameAgeDF.save("hdfs://hadoop01:8020/spark_out/students","parquet", SaveMode.Append)
  
  //创建一个dataframe,作为学生的成绩信息,并写入一个parquet文件中
   val studentsWithNameGrade =Array(("marry","A"),("jack","B"))
  
  val studentsWithNameGradeDF = sc.parallelize(studentsWithNameGrade, 2).toDF("name","age")
  studentsWithNameGradeDF.save("hdfs://hadoop01:8020/spark_out/students","parquet", SaveMode.Append)
  
  
  //首先,第一个dataframe和第二个dataframe的元数据肯定是不一样的
  //一个包含了name和age两个列,一个是包含name和grade两个列
  //所以,这期望的是,读取出来的表数据,自动合并两个文件的元数据,出现三列,name age grade 
  
  
  //用mergeSchema的方式,读取students表中的数据,进行元数据的合并
  val studentsDF = sqlContext.read.option("mergeSchema", "true").parquet("hdfs://hadoop01:8020/spark_out/students")
  
  studentsDF.printSchema();
  studentsDF.show();
  
  } 
}
四、JSON数据源
Spark sql 可以自动推断JSON文件的元数据,并且加载其数据,创建一个dataframe。可以使用SQLContext.read.json()方法,针对一个元素为String的RDD,或者是一个JSON文件

但是要注意的是,这里使用的JSON文件与传统意义上的JSON文件是不一样的。每行都必须也只能包含一个,单独的,自包含的,有效的JSON对象。不能让json对象分散在多行。否则会报错

综合性复杂案例:查询成绩为80分以上的学生的基本信息与成绩信息
Java 版本
package com.spark.spark_sql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;
/**
 * json数据源
 */
public class JSONDataSource {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JSONDataSource").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 针对json文件,创建出dataframe
        DataFrame studentDF = sqlContext.read().json("e://student.json");    //hdfs://hadoop01:8020/spark_input/student.json
        // 针对学生成绩信息的dataframe,注册临时表,查询分数大于80分的学生的姓名和分数
        studentDF.registerTempTable("student");

        DataFrame stuNameDF = sqlContext.sql("select name,score from student where score > 80");

        List<String> goodName = stuNameDF.javaRDD().map(new Function<Row, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(Row row) throws Exception {

                return row.getString(0);
            }
        }).collect();

        // 针对JavaRDD<String> 创建dataframe
        List<String> studentInfoJSONs = new ArrayList<String>();
        studentInfoJSONs.add("{"name":"leo","age":18}");
        studentInfoJSONs.add("{"name":"marry","age":15}");
        studentInfoJSONs.add("{"name":"jack","age":30}");
        JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);

        DataFrame studentInfoDF = sqlContext.read().json(studentInfoJSONsRDD);

        // 针对学生的基本信息的dataframe,注册临时表,然后查询分数大于80分的学生的基本信息

        studentInfoDF.registerTempTable("info");
        String sql = "select name , age  from  info where name in (";

        for (int i = 0; i < goodName.size(); i++) {
            sql += "'" + goodName.get(i) + "'";
            if (i < goodName.size() - 1) {
                sql += ",";
            }
        }
        sql += ")";
        DataFrame goodStuInfosDF = sqlContext.sql(sql);

        // 将两份数据的dataframe转换为javapairRDD,执行join transformation

        JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = goodStuInfosDF.javaRDD()
                .mapToPair(new PairFunction<Row, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {

                        return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf((int) row.getLong(1)));
                    }
                }).join(stuNameDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf((int) row.getLong(1)));
                    }
                }));
        
        //将封装在RDD中好学生的全部信息,转换为一个javaRDD<Row>的格式
        JavaRDD<Row> rows = pairs.map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> t) throws Exception {
                
                return RowFactory.create(t._1,t._2._2,t._2._1);
            }
        });
        
        //创建一份元数据,将JavaRDD<Row>转换为dataframe
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame goodStudentsDF =sqlContext.createDataFrame(rows, structType);
                
        //将好学生的全部信息保存到json文件中去
        goodStudentsDF.write().format("json").save("e://good");    //hdfs://hadoop01:8020/spark_out/goodStudent
        
    }
}

 
Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType

object JSONDatasource {
  val conf = new SparkConf().setAppName("JSONDatasource")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val studentScoreDF =sqlContext.read.json("hdfs://hadoop01:8020/spark_input/student.json")
  studentScoreDF.registerTempTable("stu_score")
  val goodStuScoreDF = sqlContext.sql("select name , score from stu_score where score > 80")
  
  val goodStuNames = goodStuScoreDF.map(row => row(0)).collect()
  
  
  val studentInfoJSON =Array("{"name":"Leo","age":18}","{"name":"marry","age":15}","{"name":"jack","age":30}") 
  
  
  val studentInfoRDD =sc.parallelize(studentInfoJSON, 3)
  
  val studentInfoDF = sqlContext.read.json(studentInfoRDD)//json格式特有的,可以接受RDD
  
  studentInfoDF.registerTempTable("stu_info")
  
  var sql = "select name , age from stu_info where name in ("
  
  for(i <- 0 until goodStuNames.length){
    
    sql += "'"+goodStuNames(i)+"'"
    if(i < goodStuNames.length-1){
      sql += ","
    }
    
  }
  sql += ")"
  
  val goodStuInfoDF = sqlContext.sql(sql)
  
  val goodStuRDD = goodStuInfoDF.rdd.map{row => (row.getAs[String]("name"),row.getAs[Long]("age"))}.join (goodStuScoreDF.rdd.map{row => (row.getAs[String]("name"),row.getAs[Long]("score"))})
  
  val goodStuRows = goodStuRDD.map(info => Row(info._1,info._2._2.toInt,info._2._1.toInt))
  
  val structType = StructType(Array(StructField("name",StringType,true),StructField("score",IntegerType,true),StructField("age",IntegerType,true)))
  
  val goodStudentDF = sqlContext.createDataFrame(goodStuRows, structType)
  
  goodStudentDF.write.format("json").save("hdfs://hadoop01:8020/spark_out/good_scala")
  
  
}


五、Hive数据源
Spark sql 支持对hive中存储的数据进行读写,操作hive中的数据时,就必须创建HiveContext,而不是SQLContext。HiveContext继承SQLContext,但是增加了在hive元数据库中查找表,以及用hiveql语法编写SQL的功能。处理sql()方法外,HiveContext还提供hql()方法,从而用hive语法来编译sql。

使用HiveContext,可以执行Hive的大部分功能,包括创建表、往表里导入数据以及用SQL语句查询表中的数据,查询出来的数据是一个row数组

将hive-site.xml拷贝到spark/conf目录下,将mysql connector拷贝到spark/lib目录下

HiveContext sqlContext = new HiveContext(sc);
sqlContext.sql(“create table if not exists student (name string ,age int)”)
sqlContext.sql(“load data local inpath ‘/usr/local/student.txt’into table students”);
Row[] teenagers = sqlContext.sql(“select name ,age from students where age <= 18”).collect();


将数据保存到表中
Spark sql还允许将数据保存到hive表中。调用Dataframe的saveAsTable,即可将dataframe中的数据保存到hive表中。与registerTempTable不同,saveAsTable是会将dataframe汇总的数据物化到hive表中,而且还会在hive元数据库中创建表的元数据

默认情况下,saveAsTable会创建一张hive managed table,也就是说,数据的位置都是有元数据库中的信息控制的。当managed table 被删除时,表中的数据也会一并内物理删除

RegisterTempTable只是注册一个临时的表,只要spark application重启或者停止了,那么表就没有了。而SaveAsTable创建的是物化的表,无论是spark application重启还是停止,表都会一直存在

调用HiveContext.table()方法,还可以直接针对hive中的表,创建一个dataframe

案例:查询分数大于80分的学生的信息
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
/**
 * hive数据源
 * @author Administrator
 */
public class HiveDataSource {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("HiveDataSource");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //创建HiveContext,注意,这里接收的是sparkContext作为参数,不是Javasparkcontext
        HiveContext hiveContext = new HiveContext(sc.sc());
        
        //第一个功能,使用HiveContext的sql()/hql()方法,可以执行hive中可以执行的hiveQL语句
        //判断是否存在student_info表,如果存在则删除
        hiveContext.sql("drop  table if exists student_info");
        //判断student_info表是否不存在,不存在就创建该表
        hiveContext.sql("create  table if not exists student_info(name string,age int) row format delimited fields terminated by '	'");
        
        //将学生基本信息导入到Student_info表
        hiveContext.sql("load data local inpath '/home/hadoop/student_info.txt' into table student_info");
        //用同样的方式向student_scores导入数据
        hiveContext.sql("drop  table if exists student_score");
        
        hiveContext.sql("create  table if not exists student_score(name string,score int)row format delimited fields terminated by '	'");
        
        hiveContext.sql("load data local inpath '/home/hadoop/student_score.txt' into table student_score");
        
        //第二个功能个,执行sql还可以返回dataframe,用于查询
        
        //执行sql查询,关联两种表,查询成绩大于80 分的学生信息
        DataFrame goodstudentDF  = hiveContext.sql("select i.name,i.age,s.score from student_info i join student_score s on i.name=s.name where s.score >=80");
        
        //第三个功能,可以将dataframe中的数据,理论上来说,dataframe对应的RDD的元素是ROW就可以
        //将Dataframe中的数据保存到hive表中
        
        //将dataframe中的数据保存到good_student_info表中
        hiveContext.sql("drop  table if exists good_student_info");
        goodstudentDF.saveAsTable("good_student_info");
        
        //第四个功能,可以使用table()方法,针对hive表直接创建dataframe
        
        //针对good_student_info表,直接创建dataframe
        Row[] rows = hiveContext.table("good_student_info").collect();
        for (Row row : rows) {
            System.out.println(row);
        }
        
        sc.close();
    }
}



Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkContext

object HiveDataSource {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HiveDataSource");
        val sc = new SparkContext(conf)
        //创建HiveContext,注意,这里接收的是sparkContext作为参数,不是Javasparkcontext
        val hiveContext = new HiveContext(sc);
        
        //第一个功能,使用HiveContext的sql()/hql()方法,可以执行hive中可以执行的hiveQL语句
        //判断是否存在student_info表,如果存在则删除
        hiveContext.sql("drop  table if exists student_info");
        //判断student_info表是否不存在,不存在就创建该表
        hiveContext.sql("create  table if not exists student_info(name string,age int) row format delimited fields terminated by '	'");
        
        //将学生基本信息导入到Student_info表
        hiveContext.sql("load data local inpath '/home/hadoop/student_info.txt' into table student_info");
        
        //用同样的方式向student_scores导入数据
        hiveContext.sql("drop  table if exists student_score");
        
        hiveContext.sql("create  table if not exists student_score(name string,score int)row format delimited fields terminated by '	'");
        
        hiveContext.sql("load data local inpath '/home/hadoop/student_score.txt' into table student_score");
        
        //第二个功能个,执行sql还可以返回dataframe,用于查询
        
        //执行sql查询,关联两种表,查询成绩大于80 分的学生信息
        val goodstudentDF  = hiveContext.sql("select i.name,i.age,s.score from student_info i join student_score s on i.name=s.name where s.score >=80");
        
        //第三个功能,可以将dataframe中的数据,理论上来说,dataframe对应的RDD的元素是ROW就可以
        //将Dataframe中的数据保存到hive表中
        
        //将dataframe中的数据保存到good_student_info表中
        hiveContext.sql("drop  table if exists good_student_info");
        goodstudentDF.saveAsTable("good_student_info");
        
        //第四个功能,可以使用table()方法,针对hive表直接创建dataframe
        
        //针对good_student_info表,直接创建dataframe
        val rows = hiveContext.table("good_student_info").collect();
        for ( row <- rows) {
            println(row);
        }
  }
}

六、内置函数
在spark 1.5.x版本,增加了一系列内置函数到dataframe API中,并且实现了code-generation的优化。与普通的函数不同,dataframe的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在dataframe的操作之中,比如select, filter,groupBy等操作。函数的输入值,也可以是Column。

种类    函数
聚合函数    approxCountDistinct,avg,count,
countDistinct,first,last,max,mean,min,sum,sumDistinct
集合函数    Array_contains,explode,size,sort_array
日期/时间函数    日期时间转换
Unix_timestamp,from_unixtime,to_date
Quarter,day,dayofyear,weekofyear,
From_utc_timestamp,to_utc_timestamp
从日期中提取字段
Year,month,dayofmonth,hour,minute,
second
日期/时间函数    日期时间计算
Dateiff,date_add,date_sub,add_months,
last_day,next_day,months_between
获取当前时间
Current_date,current_timestamp,trunc,
date_format
数学函数    Abs,scros,asin,atan,atan2,bin,cbrt,
ceil,conv,cos,sosh,exp,expm1,factorial,floor,hex,hypot,log,log10,log1p,log2,
Pmod,pow,rint,round,shiftLeft,
shiftRight,shiftRightUnsigned,sifnum,
Sin,sinh,sqrt,tan,tanh,toDegrees,
toRadians,unhex
混合函数    Array,bitwiseNOT,callUDF,coalesce,
crc32,greatest,if,inputFileName,isNaN,
isnotnull,isnull,least,lit,md5,
monotonicalliIncreasingId,nanvl,negate,not,rand,randn,randn
sha,sha1,sparkPartitionId,struct,when
字符串函数    Ascii,base64,concat,concat_ws,decode,encode,format_number,format_string,get_json_object,initcap,instr,length,levenshtein,locate,lower,lpad,ltrim,printf,redexp_extract,regexp_replace,repeat,reverse,rpad,rtrim,soundex,space,split,substring,substring_index,translate,trim,unbase64,upper
窗口函数     cumeDist,denseRank,lag,lead,ntile,percentRank,rank,rowNumber

案例:
根据每天的用户访问和购买日志,统计每日的UV和销售额

统计每日的UV
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._

object DailyUV {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DailyUV")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //构造用户 访问日志数据,并创建dataframe
    //要使用spark内置函数,就必须在这里代入sparkcontext下的隐式转换
    import sqlContext.implicits._
    //模拟用户访问日志,日志用逗号隔开,第一列是日期,第二列是用户id
    val userAccessLog = Array("2015-10-01,1122","2015-10-01,1122", "2015-10-01,1123", "2015-10-01,1125","2015-10-01,1125", "2015-10-02,1122", "2015-10-02,1124","2015-10-02,1122",  "2015-10-02,1123")
    val userAccessLogRDD= sc.parallelize(userAccessLog, 1);
    //将模拟出来的用户日志RDD,转换为dataframe
    //首先将普通的RDD转换为row的RDD
    val userAccessLogRowRDD = userAccessLogRDD.map(log =>Row(log.split(",")(0),log.split(",")(1).toInt))
  
    
    //构造dataframe元数据
    val structType = StructType(Array(StructField("date",StringType,true),StructField("userid",IntegerType,true)))
    val userAccessLogRowDF=sqlContext.createDataFrame(userAccessLogRowRDD, structType)
    
    //这里正式使用spark1.5.x版本的提供的最新特性,内置函数,countDistinct
    //每天都有很多用户来访问,但是每个用户可能每天都会访问很多次
    //UV:指的是对用户进行去重以后的访问总数
    //聚合函数的用法;
    //首先对dataframe调用groupBy()方法,对某一列进行分组
    //然后调用agg()方法,第一个参数,必须必须传入之前在groupBy方法中出现的字段
    //第二个参数,传入countDistinct、sum,first等,spark提供的内置函数
    //内置函数中,传入的参数也是用单引号作为前缀的,其他的字段
    
    
    userAccessLogRowDF.groupBy("date").agg('date, countDistinct('userid)).map(row => Row(row(1),row(2))).collect.foreach(println)
        
  }
}


统计每日的销售额
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.functions._

object DailySale {
  
  val conf = new SparkConf().setMaster("local").setAppName("DailySale")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  
  //模拟元数据
  //实际上,有些时候,会出现日志的上报错误,比如日志里丢了用户的信息,那么这种,就一律不统计了
  val userSaleLog = Array("2015-10-01,100,1122","2015-10-01,100,1133","2015-10-01,100,","2015-10-02,300,1122","2015-10-02,200,1122","2015-10-02,100,","2015-10-02,100,1122")

  val userSaleLogRDD = sc.parallelize(userSaleLog, 1);
  
  //进行过滤
  
  val filteredUserSaleRowRDD = userSaleLogRDD.filter(log => if(log.split(",").length ==3) true else false)
  
  
  val userSaleLogRowRDD = filteredUserSaleRowRDD.map(log => Row(log.split(",")(0),log.split(",")(1).toDouble))
  
  val structType = StructType(Array(StructField("date",StringType,true),StructField("sale_amount",DoubleType,true)))
  
  val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)
  //开始统计每日销售额的统计
  userSaleLogDF.groupBy("date").agg('date, sum('sale_amount)).map(row => Row(row(1),row(2))).collect().foreach(println)
  
  
}
七、开窗函数
Spark1.4.x版本以后,为spark sql和dataframe引入了开窗函数,比如最经典最常用的row_number(),可以让我们实现分组取topN的逻辑

案例:统计每个种类的销售额排名前3 的产品

package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
 * 开窗函数row_number
 * @author Administrator
 */
public class RowNumberWindowFunction {
public static void main(String[] args) {
    
    SparkConf conf = new SparkConf().setAppName("NewNumberWindowFunction");
    JavaSparkContext sc = new JavaSparkContext(conf);
    HiveContext hiveContext = new HiveContext(sc.sc());
    
    //创建销售表sales表
    hiveContext.sql("drop table if exists sales");
    hiveContext.sql("create table if not exists sales (product string,category string ,revenue bigint) row format delimited fields terminated by '	'");
    hiveContext.sql("load data local inpath '/home/hadoop/sales.txt' into table sales");
    //开始编写我们的统计逻辑,使用row_number()开窗函数
    //先说明一下,row_number()开窗函数的作用
    //其实就是给每一个分组的数据,按照其排序顺序,打上一个分组内的行号
    //比如说,有一个分组date=20151001,里面有三条数据,1122 1121 1124 
    //那么对这个分组的每一行使用row_number()开窗函数以后,三行依次会获得一个组内的行号
    //行号从1开始递增,比如 1122 11121 21124 3
    
    DataFrame top3SalesDF = hiveContext.sql("select product,category,revenue from "
            + "(select product,category,revenue ,row_number() over (partition by category order by revenue desc ) rank from sales) tmp_sales  where rank <=3");
    
    //row_number()开窗函数的语法说明
    //首先,可以在select查询时,使用row_number()函数
    //其次,row_number()函数后面先跟上over关键字
    //然后括号中是partition by 也就是根据那个字段进行分组
    //其次是可以使用order by进行组内排序
    //然后row_number()就可以给每一个组内的行,一个组一个行号
    
    //将每组排名前三的数据,保存到一个表中
    hiveContext.sql("drop table if exists top3_sales");
    top3SalesDF.saveAsTable("top3_sales");
    sc.close();
}
}

函数名(列) OVER(选项) 
八、UDF自定义函数
UDF:User Defined Function.用户自定义函数
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType

object UDF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("UDF")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    
    //构造模拟数据
    val names =Array("Leo","Marry","Jack","Tom")
    val namesRDD = sc.parallelize(names, 1)
    val namesRowRDD = namesRDD.map(name => Row(name))
    val structType =StructType(Array(StructField("name",StringType)))
    val namesRowDF =sqlContext.createDataFrame(namesRowRDD, structType)
    
    //注册一张name表
    namesRowDF.registerTempTable("names")
    
    //定义和注册自定义函数
    //定义函数
    //注册函数
    sqlContext.udf.register("strLen",(str:String) => str.length)
    
    //使用自己的函数
    sqlContext.sql("select name,strLen(name) from names").collect.foreach(println)

  }
}
九、UDAF自定义聚合函数
UDAF:User Defined Aggregate Function。用户自定义聚合函数,是spark1.5.x引入的最新特性。
UDF针对的是单行输入,返回一个输出,
UDAF,则可以针对多行输入,进行聚合计算,返回一个输出,功能更加的强大

Scala代码:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._


class StringCount extends UserDefinedAggregateFunction {
  //inputschema   指的是,输入数据的数据
  override def inputSchema: StructType = {
    StructType(Array(StructField("str",StringType,true)))
  }
  //bufferSchema,指的是,中间进行聚合时,所处理的数据类型
  override def bufferSchema: StructType = {
    StructType(Array(StructField("count",IntegerType,true)))
  }

  //dataType,指的是,函数的返回值的类型
  override def dataType: DataType = {
   IntegerType
  }
  override def deterministic: Boolean = {
    true
  }
  //为每一个分组的数据执行初始化操作
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0)=0
  }
  //指的是,每个分组,有新的值进来的时候,如何进行分组,对应的聚合值的计算
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0)=buffer.getAs[Int](0)+1

  }
  //由于spark是分布式的,所以一个分组的数据,可能会在不同的节点上进行局部的聚合,就是update
  //但是,最后一个分组,在各个节点上的聚合值,要执行merge,也就是合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0)=buffer1.getAs[Int](0)+buffer2.getAs[Int](0)
  }
  //最后,指的是,一个分组的聚合,如何通过中间的缓存聚合值,随后返回一个最终的聚合值
  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0)
  }
}


十、工作原理以及性能调优

1.工作原理
Sqlparse、Analyser、Optimizer、SparkPlan



2.性能调优
1.设置shuffle过程中的并行度spark.sql.shuffle.partitions(sqlContext.setConf())
2.在hive数据仓库建设过程中,合理设置数据类型,比如能设置为int的就不要设置为bigint.减少数据类型导致的不必要的内存开销
3.编写sql时,尽量给出明确的列名,比如select name from students。不要写select * 的方式
4.并行处理查询结果:对于spark sql查询的结果,如果数据量比较大的话,比如超多1000条,那么就不要一次collect到driver在处理。使用foreach()算子,并行处理查询结果
5.缓存表:对于一条sql语句中可能多次使用到表,可以对其进行缓存没使用sqlContext.cache Table(tableName),或者DataFrame.cache()即可spark sql会用内存列存储的格式进行表的缓存。然后将spark sql就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存使用和GC开销。sqlcontext.uncacheTable(tableName)可以将表从缓存中移除。用sqlcontext.setConf(),设置spark.sql.inMemoryColumnarStorage.batchSize参数(默认10000),可以配置列存储单位

6.广播join表:spark.sql.autoBroadcastJoinThreshold.默认是10485760(10MB),也就是在10M以内的表将其广播出去,在内存足够用的情况下,增加其大小,让更多的表广播出去,就可以将join中的较小的表广播出去,而不是进行网络数据传输了。
7.钨丝计划:spark.sql.tungsten.enable,默认是true 自动管理内存

最有效的就是第四点,缓存表和广播join表也是非常不错的



十一、Hive on spark 
背景:
Hive是目前大数据领域,事实上的sql标准。其底层默认是基于MapReduce实现的。但是由于mapreduce速度是在比较慢。因此这两年,陆续出来了新的sql查询引擎。包括spark sql,hive on Taz ,hive on spark 

Spark sql 与hive on spark 是不一样的,spark sql 自己研发出来的针对各种数据源,包括hive、json、parquet、JDBC、RDD等都可以执行查询的,一套基于spark计算引擎的查询引擎。因此它是spark的一个项目,只不过提供了针对hive执行查询的功能而已。适合在一些使用spark技术栈的大数据应用类系统中使用。

而hive on spark是hive 的一个项目,他是指,不通过mapreudce作为唯一的引擎,而是将spark作为底层的查询引擎,hive on spark ,只适用于hive,在可预见的未来,很有可能hive默认的底层引擎从mapreduce切换为spark,适合于将原有的hive数据仓库以及数据分析替换为spark引擎,作为公司通用大数据统计计算分析引擎

Hive基本的工作原理:
hiveQL语句==>
语法分==>AST==>
生成逻辑执行计划==>operator Tree==>
优化逻辑执行计划==>optimized operator Tree==>
生成物理执行计划==>Task Tree==>
优化物理执行计划==>Optimized Task Tree==>
执行优化后的Optimized  Task Tree



Hive on spark 的计算原理有如下的几个要点:
1.将hive表作为spark RDD来进行操作

2.使用hive 原语
对于一些针对RDD的操作,比如,groupByKey(),sortByKey()等等。不使用spark的transformation操作和原语。如果那样做的话,那么就需要重新实现一套hive的原语,而且hive增加了新功能,那么又要实现新的spark 原语。因此,选择将hive的原语包装为针对RDD的操作即可

3.新的物理执行计划生成机制
使用sparkCompiler将逻辑执行计划,集=即Operator Tree,转换为Task tree。提交spark task给spark进行执行。Sparktask包装了DAG,DAG包装为sparkwork,Sparktask根据sparkwork表示的DAG计算

4.sparkContext生命周期
Hive on spark 会为每一个用户的会话,比如执行一次sql语句,创建一个sparkcontext,但是spark不允许在一个JVM内创建多个sparkcontext,因此,需要在单独额JVM中启动每一个会话的sparkcontext,然后通过RPC与远程JVM中的sparkContext进行通信。

5.本地和远程运行模式
Hive on spark 提供两种运行模式,本地和远程。如果将spark master设置为local






十二、Spark sql与spark core整合
案例:每日top3热点搜索词统计案例
日志格式:
日期、用户、搜索词、城市、平台、版本

需求:
1.筛选出符合条件(城市、平台、版本))的数据
2.统计出每天搜索的UV排名前三的搜索词
3.按照每天的top3搜索词的UV搜总次数,倒序排列
4.将数据保存到hive表中

实现思路分析
1.针对原始的数据(HDFS文件),获取输入RDD
2.使用filter算子,去针对输入RDD中的数据,进行数据过滤,过滤出符合条件的数据
2.1普通的做法:直接在filter算子函数中,使用外部的查询条件(map),但是这样的话,是不是查询条件map,会发送到每一个task上一份副本(性能并不好)
2.2优化后的做法:将查询条件,封装为Broadcast广播变量,在filter算子中使用Broadcast广播变量

3.将数据转换为“日期_搜索词,用户”的格式,然后对他进行分组,再次进行映射。对每天每个搜索词的搜索用户进行去重操作,并统计去重后的数量,即为每天每个搜索词的UV,最后获得(日期_搜索词,UV)

4.将得到的每天的搜索词的UV,RDD映射为元素类型的Row的RDD,将RDD转换为dataframe
5.将dataframe注册为临时表,使用spark sql的开窗函数,来统计每天的UV排名前三的搜索词,以及他的搜索UV,最后获知,是一个dateframe
6.将dateframe转换为RDD。继续操作,按照每天日期进行分组。并进行映射。计算出每天的top3所搜词的搜索uv的总数,然后将UV总数作为key,将每天的top3搜索词以及搜索次数,拼接为一个字符串
7.按照每天的top3搜索总UV,进行排序,倒序排序
8.将排好序的数据,再次映射回来,变成“日期_搜索词_UV”的格式
9.再次映射为dataframe,并将数据保存到hive表中

package com.spark.spark_sql;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

/**
 * 每日top3热点搜索词统计
 * @author Administrator
 *
 */
public class DailyTop3Keyword {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DailyTop3Keyword");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(jsc.sc());
        //针对hdfs上的文件中的日志,获取输入的RDD
        JavaRDD<String> rawRDD = jsc.textFile("hdfs://hadoop01:8020/keyword.txt");
        //伪造一份数据,查询条件
        //备注:实际上,在实际的企业的项目开发中,很可能,这个查询条件是J2EE平台发送到MySQL表中的
        //然后,这里实际上通常是会用Spring框架和ORM框架的,去提取MySQL表中的查询条件
        Map<String,List<String>> queryParamMap =new HashMap<String, List<String>>();
        queryParamMap.put("city", Arrays.asList("beijing"));
        queryParamMap.put("platform", Arrays.asList("android"));
        queryParamMap.put("version", Arrays.asList("1.0","1.2","1.5","2.0"));
        
        //将map封装为广播变量,每个work节点只拷贝一份数据即可,这样可以进行优化
        final Broadcast<Map<String, List<String>>> queryParamMapBroadcast = jsc.broadcast(queryParamMap);
        
        //使用广播变量进行筛选
        JavaRDD<String> filterRDD = rawRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String log) throws Exception {
                String[] logSplited = log.split("	");
                String city =logSplited[3];
                String platform =logSplited[4];
                String version = logSplited[5];
                //与查询条件进行比较,任何一个
                Map<String, List<String>> queryParamMap = queryParamMapBroadcast.value();
                
                List<String> cities = queryParamMap.get("city");
                if(cities.size()>0 && !cities.contains(city)){
                    return false;
                }
                
                List<String> platforms = queryParamMap.get("city");
                if(platforms.size()>0 && !platforms.contains(platform)){
                    return false;
                }
                List<String> versions = queryParamMap.get("city");
                if(versions.size()>0 && !versions.contains(version)){
                    return false;
                }
                
                return true;
            } 
        });
        
        //将过滤出来的原始的日志进行映射为(日期_搜索词,用户)的格式
        JavaPairRDD<String, String> dateKeywordUserRDD = filterRDD.mapToPair(new PairFunction<String, String, String>() {

            
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(String t) throws Exception {
                 String[] logSplited = t.split("	");
                 String date =logSplited[0];
                 String user =logSplited[1];
                 String keyword =logSplited[2];
    
                return new Tuple2<String, String>(date+"_"+keyword, user);
            }
        });
        
        //进行分组,获取每天的搜索词,有哪些用户进行搜索了(没有去重)
        JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeywordUserRDD.groupByKey();
        
        //对每天每个搜索词的搜索用户,执行去重操作,获得其UV值
        JavaPairRDD<String, Long> dateKeywordUVRDD = dateKeywordUsersRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, String, Long>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Long> call(Tuple2<String, Iterable<String>> t) throws Exception {
                String  dateKeyword = t._1;
                Iterator<String> users = t._2.iterator();
                //对用户进行去重,并且统计数量
                List<String> distinctUsers = new ArrayList<String>();
                
                while(users.hasNext()){
                    String user = users.next();
                    if(!distinctUsers.contains(user)){
                        distinctUsers.add(user);
                    }
                }
                //获取uv
                long uv = distinctUsers.size();
                return new Tuple2<String, Long>(dateKeyword, uv);
            }
        });
        
        //将每天每个搜索词的UV数据转换成dataframe
        JavaRDD<Row> dateKeywordUVRowRDD = dateKeywordUVRDD.map(new Function<Tuple2<String,Long>, Row>() {

            @Override
            public Row call(Tuple2<String, Long> v1) throws Exception {
                String date = v1._1.split("_")[0];
                String keyword = v1._1.split("_")[1];
                long uv = v1._2;
                return RowFactory.create(date,keyword,uv);
            }
        });
        
        List<StructField> structFields = Arrays.asList(
                DataTypes.createStructField("date", DataTypes.StringType, true),
                DataTypes.createStructField("keyword", DataTypes.StringType,true),
                DataTypes.createStructField("uv", DataTypes.LongType,true)
                );
        
        StructType structType =DataTypes.createStructType(structFields);
        DataFrame dateKeywordUVDF = hiveContext.createDataFrame(dateKeywordUVRowRDD, structType);
        
        //使用spark sql的开窗函数,统计每天的搜索排名前三的热点搜索词
        dateKeywordUVDF.registerTempTable("daily_keyword_uv");
        
        
        DataFrame dailyTop3KeywordDF = hiveContext.sql("select date,keyword,uv from "
                + "(select date,keyword,uv,row_number over (partition by date order by uv desc ) rank from daily_keyword_uv )tmp "
                + "where rank <=3");
        
        //将dateframe 转换为RDD,然后映射,计算出每天的top3搜索词的搜索uv的总数
        JavaPairRDD<String, String> Top3DateKeywordUvRDD = dailyTop3KeywordDF.javaRDD().mapToPair(new PairFunction<Row, String, String>() {

            @Override
            public Tuple2<String, String> call(Row row) throws Exception {
                String date = String.valueOf(row.get(0));
                String keyword =String.valueOf(row.get(1));
                Long uv = Long.valueOf(String.valueOf(row.get(2)));
                
                return new Tuple2<String, String>(date, keyword+"_"+uv);
            }
        });
        
        JavaPairRDD<String, Iterable<String>> Top3DateKeywordRDD = Top3DateKeywordUvRDD.groupByKey();
        JavaPairRDD<Long, String> uvDateKeywordsRDD = Top3DateKeywordRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, Long, String>() {

            @Override
            public Tuple2<Long, String> call(Tuple2<String, Iterable<String>> t) throws Exception {
                String date = t._1;
                
                Iterator<String> keywordUvIterator = t._2.iterator();
                Long totalUv = 0L;
                String dateKeywordUv = date;
                while(keywordUvIterator.hasNext()){
                    String keywordUv= keywordUvIterator.next();
                    Long uv = Long.valueOf(keywordUv.split("_")[1]);
                    totalUv+=uv;
                    dateKeywordUv+=","+keywordUv;
                }
                return new Tuple2<Long, String>(totalUv, dateKeywordUv);
            }
        });
        //按照每天的总搜索进行倒序排序
        
        JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
        
        //在此进行映射,将排序后的数据,映射回原始的格式Iterable<ROW>
        JavaRDD<Row> sortedRowRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction<Tuple2<Long,String>, Row>() {

            @Override
            public Iterable<Row> call(Tuple2<Long, String> t) throws Exception {
                String dateKeywords=t._2;
                String[] dateKeywordsSplited = dateKeywords.split(",");
                String date = dateKeywordsSplited[0];
                List<Row> rows= new ArrayList<Row>();
                rows.add(RowFactory.create(date,
                        dateKeywordsSplited[1].split("_")[0],
                        Long.valueOf(dateKeywordsSplited[1].split("_")[1])
                        ));
                rows.add(RowFactory.create(date,
                        dateKeywordsSplited[1].split("_")[0],
                        Long.valueOf(dateKeywordsSplited[2].split("_")[1])
                        ));
                rows.add(RowFactory.create(date,
                        dateKeywordsSplited[1].split("_")[0],
                        Long.valueOf(dateKeywordsSplited[3].split("_")[1])
                        ));    
                return rows;
            }
        });
        
        //将最终的数据转换为dateframe
        DataFrame finalDF = hiveContext.createDataFrame(sortedRowRDD, structType);
        
        finalDF.saveAsTable("daily_top3_keyword_uv");
        jsc.close();
    }
}