Spark SQL
一、sparkSQL的特点
1.支持多种数据源:hive RDD Partquet JSON JDBC
2.多种性能优化技术:in-memory columnar storage byte-code generation cost model 动态评估
3.组件扩展性:对于SQL的语法解析器、分析器、以及优化器,用户都可以自己重新开发,并且动态扩展
Spark sql 的性能优化技术简介
1.内存列存储(in-memory columnar storage)
内存列存储意味着spark sql的数据,不是使用java对象的方式进行存储,而是使用面向列存储的方式进行存储,也就是说,每一列,作为一个数据存储的单位,从而大大优化内存使用的效率。使用列存储之后,减少对内存的消耗,也就避免了对GC(垃圾回收)大量数据的性能开销
2.字节码生成技术(byte-code generation )
Spark sql在其catalyst模块的Expressions中增加一codegen模块,对于sql语句中的计算表达式,比如select num + num from t 这种sql,就可以使用动态字节码生成技术来优化其性能。
3.Scala代码编写的优化
对于Scala代码编写中,可能会造成大量的性能的开销,自己重写,使用更加复杂的方式,来获取更好的性能。比如option样例类、for循环、map/filter/foreach等高阶函数,以及不可变对象,都改成用null,while循环来实现,并且重用可变的对象
二、dataframe的使用
1.spark sql 和 dataframe引言
Spark sql 是spark中的一个模块,主要是进行结构化数据的处理。他提供的最核心的编程抽象,就是dataframe。同时spark sql 还可以作为分布式的sql查询引擎。Spark sql最重要的功能之一就是从hive中查询数据
Dataframe,可以理解为时,以列的形式组织的,分布式的数据集合,他其实和关系型数据库中的表非常类似,但是底层做了很对的优化。Dataframe可以通过很多来源,包括:结构化数据文件,hive表,外部关系型数据库以及RDD
2.SQLContext
要使用spark sql ,首先就得创建一个SQLContext对象,或者是他的子类的对象(HiveContext),比如HiveContext对象;
Java版本:
JavaSparkContext sc = ....;
SQLContext SQLContext = new SQLContext(sc);
Scala版本:
Val sc = SparkContext..
Val SQLContext = new SQLContext(sc)
Import SQLContext.implicits._
3.HiveContext
除了基本的SQLContext以外,还可以使用它的子类---HiveContext。HiveContext的功能除了包含SQLContext提供的所有的功能外,还包括额外的专门针对hive的一些功能。这些额外的功能包括:使用hive语法编写和执行sql,使用hive的UDF函数,从hive表中读取数据
要使用HiveContext,就必须预先安装好hive,SQLContext支持的数据源,HiveContext也同样支持,而不只是支持hive,对spark1.3.x以上的版本,都推荐使用HiveContext,因为其功能更加丰富和完善
Spark sql 还支持使用spark.sql.dialect参数设置sql方言,使用SQLContext的serConf()即可进行设置。对与SQLContext,他只支持”sql”一种方言。对于HiveContext,它默认的方言是“hiveql”
4.创建Dataframe
使用SQLContext,可以从RDD、hive表中或者其他额数据源,来创建dataframe。
Java版本:
package com.spark.spark_sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
* 使用json文件创建dataframe
* @author Administrator
*/
public class DataFrameCreate {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("DataFrameCreate");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext SQLContext = new SQLContext(sc);
DataFrame df = SQLContext.read().json("hdfs://hadoop01:8020/spark_input/students.json");
df.show();
}
}
提交至集群:运行打包,上传。编写脚本进行提交
【这里是一台机器测试】
bin/spark-submit
--class com.spark.spark_sql.DataFrameCreate
--files /opt/modules/apache-hive-0.13.1-bin/conf/hive-site.xml
--driver-class-path /opt/softwares/mysql-connector-java-5.1.27-bin.jar
/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/sql/sparksql_01.jar
运行的结果
+---+---+--------+
|age| id| name|
+---+---+--------+
| 10| 1| leo|
| 25| 2| kity|
| 30| 4| lucy|
| 20| 3| tom|
| 18| 7| jack|
| 23| 10| edison|
| 36| 5| owen|
| 20| 8| jiny|
| 40| 6| lisi|
| 45| 9|zhangsan|
+---+---+--------+
Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object DataFrameCreate {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrameCreate")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("hdfs://hadoop01:8020/spark_input/student.json")
df.show
}
}
Dataframe常用的操作
java
package com.spark.spark_sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
* Dataframe的常用的操作
* @author Administrator
*
*/
public class DataFrameOperation {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("DataFrameOperation");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext SQLContext = new SQLContext(sc);
//创建出来的dataframe完全可以可以理解为一张表
DataFrame df = SQLContext.read().json("hdfs://hadoop01:8020/input/students.txt");
//打印dataframe中所有的数据
df.show();
//打印dataframe中元数据的信息
df.printSchema();
//查询某列所有的数据
df.select("name").show();
//查询某几列所有的数据,并对列进行计算
df.select(df.col("name"), df.col("age").plus(1)).show();//将查询出来的age进行加一
//根据某一列的值进行过滤
df.filter(df.col("age").gt(30)).show(); //年龄大于30的进行过滤
//根据某一列进行分组聚合
df.groupBy(df.col("age")).count().show();
}
}
Scala
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object DataframeOperation {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataframeOperation")
val sc = new SparkContext(conf)
val SQLContext = new SQLContext(sc)
val df = SQLContext.read.json("hdfs://hadoop01:8020/spark_input/student.json"
df.show
df.printSchema()
df.select("name").show
df.select(df("name"),df("age")+1).show
df.filter(df("age") > 30).show
df.groupBy("age").count().show
}
}
5.RDD与Dataframe之间转换
为什么要将RDD转换为Dataframe?因为这样的话,我们就可以直接针对hdfs上的任何可以构建为RDD的数据,使用spark sql 进行sql查询,这个功能无比强大。想象一下,针对hdfs中的数据,直接就可以使用sql进行查询
Spark sql 支持两种方式来将RDD转换为Dataframe
第一种方式,是使用反射来推断包含了特定数据类型的RDD的元数据,这种基于反射的方式,代码比较简单,当你已经知道你的RDD的元数据时,是一种非常不错的方式。
第二种方式,是通过编程接口来创建dataframe,你可以在程序运行的时候动态构建一份元数据,然后将其应用到已经存在的RDD上,这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知元数据,那么只能通过这种动态构建元数据的方式。
1.使用反射的方式推断元数据
Java版本:spark sql是支持将包含javaBean的RDD转换为dataframe的,Javabean的信息,就定义了元数据。Spark sql 现在是不支持包含嵌套javabean或者list等复杂元数据的Javabean。
Scala版本:而scala由于具有隐式转换的特性,所以spark sql的scala接口,是支持自动将包含case class 的RDD转换为Dataframe的。Case class 就定义了元数据。Spark sql 会通过反射读取传递给 case class 的参数的名称,然后将其作为列名。与java不同的是,Spark sql是支持将包含了嵌套的数据结构的case class作为元数据的,比如包含了Array等。
Java版本:
package com.spark.spark_sql;
import java.io.Serializable;
public class Student implements Serializable {
private static final long serialVersionUID = 1L;
private int id;
private String name;
private int age;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Student() {
}
public Student(int id, String name, int age) {
super();
this.id = id;
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "Student [;
}
}
package com.spark.spark_sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
* 使用反射的方式将RDD转换为dataframe
* @author Administrator
*
*/
public class RDD2DataframeReflection {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataframeReflection");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext SQLContext = new SQLContext(sc);
JavaRDD<String> lines = sc.textFile("E://student.txt");
JavaRDD<Student> students = lines.map(new Function<String, Student>() {
private static final long serialVersionUID = 1L;
@Override
public Student call(String line) throws Exception {
String[] split = line.split(",");
Student stu = new Student();
stu.setId(Integer.parseInt(split[0]));
stu.setName(split[1]);
stu.setAge(Integer.parseInt(split[2]));
return stu;
}
});
//使用反射的方式将RDD转换为dataframe
//将student.class传入进去其实就是通过反射的方式来创建dataframe
//因为student.class 本身就是反射的一个应用
//然后底层还得通过student class 进行反射。来获取其中的fields
//这里要求Javabean要实现Serializable接口,可以序列化
DataFrame studentDF = sqlContext.createDataFrame(students, Student.class);
//拿到一个dataframe之后,就可以将其注册为一张临时表,然后针对其中的数据进行sql语句
studentDF.registerTempTable("student");
//针对student临时表执行sql语句,查询年龄大于20岁的学生
DataFrame df =sqlContext.sql("select * from student where age > 20");
//将查询出来的dataframe再次转换为RDD
JavaRDD<Row> teenagerRDD = df.javaRDD();
//将RDD中的数据,进行映射,给每个人的年龄,然后映射为student
JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, Student>() {
private static final long serialVersionUID = 1L;
@Override
public Student call(Row row) throws Exception {
//row中的顺序是按照字典顺序进行排列的
Student student = new Student();
student.setAge(row.getInt(0));
student.setId(row.getInt(1));
student.setName(row.getString(2));
return student;
}
});
//将数据collect 回来,打印出来
List<Student> studentList = teenagerStudentRDD.collect();
for (Student student : studentList) {
System.out.println(student);
}
}
}
Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
/**
* 如果要使用scala开发spark程序
* 然后在其中还要实现基于反射的RDD到dataframe的转换,就必须得用object extends App的方式
* 不能使用def main()方法的方式来运行程序,否则就会报错no typetag for ... class
*/
object RDD2DaframeReflection extends App{
val conf = new SparkConf().setMaster("local").setAppName("RDD2DaframeReflection")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lines = sc.textFile("e://student.txt", 1)
//在scala中使用反射的方式,进行RDD到Dataframe的转换,需要手动的导入一个隐士转换
import SQLContext.implicits._
case class Student(id : Int ,name : String ,age : Int)
//这里其实就是一个普通的,元素是case class 的rdd
val students = lines.map(line =>line.split(",")).map(arr => Student(arr(0).trim().toInt,arr(1),arr(2).trim().toInt))
//直接使用RDD的toDF,即可将其转换为dataframe
val studentDF=students.toDF()
//注册为一个临时表
studentDF.registerTempTable("student")
//
val teenagerDF = sqlContext.sql("select * from student where age > 20")
val teenagerRDD = teenagerDF.rdd
//在scala中,row中的数据的顺序,反而是按照我们期望的来排列的,这个是跟java是不一样的
teenagerRDD.map{row => Student(row(0).toString().toInt,row(1).toString(),row(2).toString().toInt )
}.collect().foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age))
//在scala中,对row的使用比java中的row更加的丰富
//在scala中,可以用row的getAs()方法,获取指定列名的列
teenagerRDD.map(row =>Student(row.getAs[Int]("id"),row.getAs("name"),row.getAs("age"))).collect()
.foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age))
//还可以通过row的getValuesMap,方法,获取指定几列的值,返回的是一个map
teenagerRDD.map(row => {
val map = row.getValuesMap(Array("id","name","age"))
Student(map("id").toString().toInt,map("name").toString(),map("age").toString().toInt)
}).collect().foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age))
}
2.通过编程接口来创建dataframe
Java版本:
package com.spark.spark_sql;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* 以编程的方式动态的执行元数据,将RDD转化为dataframe
* @author Administrator
*
*/
public class RDD2DataProgrammatically {
public static void main(String[] args) {
//创建sparkconf
SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataProgrammatically");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
//第一步,创建一个普通的RDD,但是必须将其转换为RDD<Row>的格式
JavaRDD<String> lines = sc.textFile("e://student.txt");
JavaRDD<Row> rows = lines.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String line) throws Exception {
String[] split = line.split(",");
return RowFactory.create(Integer.parseInt(split[0]),split[1],Integer.parseInt(split[2]));
}
});
//第二步,动态元数构造据
//比如说,id name age 等fields的名称和类型都是在程序运行过程中,
//动态的从MySQL等DB中或者是配置文件中,加载出来的,是不固定的
//所以特别适合这种编程的方式来构造元数据
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
StructType structType = DataTypes.createStructType(fields);
//第三部,使用动态构造元数据,将RDD转换为dataframe
DataFrame studentDF = sqlContext.createDataFrame(rows, structType);
//后面就可以直接使用这个df了
//注册临时表
studentDF.registerTempTable("student");
DataFrame stus = sqlContext.sql("select * from student where age > 20");
List<Row> list = stus.javaRDD().collect();
for (Row row : list) {
System.out.println(row);
}
}
}
Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
object RDD2DataframeProgramatically extends App{
val conf = new SparkConf().setMaster("local").setAppName("RDD2DataframeProgramatically");
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//第一步:构造出元素为Row的普通的RDD
val studentRDD = sc.textFile("e://student.txt", 1)
.map(line => Row(
line.split(",")(0).toInt,
line.split(",")(1),
line.split(",")(2).toInt))
//第二步:以编程的方式构造元数据
val structType =StructType(Array(
StructField("id",IntegerType,true),
StructField("name",StringType,true),
StructField("age",IntegerType,true)))
//第三步:进行RDD到dataframe的转换
val studentDF = SQLContext.createDataFrame(studentRDD,structType )
studentDF.registerTempTable("student");
val teenagerDF = sqlContext.sql("select * from student where age > 20")
teenagerDF.rdd.collect.foreach(row => println(row))
}
6.通用的load和save操作
1.dataframe的load和save
对于的spark sql的dataframe来说,无论是从什么数据源创建出来的dataframe,都有一些共同的load和save操作,load操作主要是用于加载数据,创建出来的dataframe:save操作,主要用于将dataframe中的数据集保存到文件中(保存到的是一个目录中)
Java版本:
Dataframe df = sqlContext.read().load(“users.parquet”);
df.select(“name”,”facourite_color”).write().save(“nameAndFav_color_dir”);
Scala版本:
Val df = sqlContext.read.load(“users.parquet”)
Df.select(“name”,”facourite_color”).write().save(“nameAndFav_color_dir”)
Java版本:
package com.spark.spark_sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
* 通用的load和save操作
* @author Administrator
*/
public class GenericLoadSave {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("GenericLoadSave");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame usersDF = SQLContext.read().load("C://Users//Administrator//Desktop//users.parquet");
usersDF.printSchema();
usersDF.show();
usersDF.select("name","favourite_color").write().save("e://users2");
}
}
Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import scala.tools.scalap.Main
import org.apache.spark.sql.DataFrame
object GenericLoadSave {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GenericLoadSave")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val usersDF = sqlContext.read.load("hdfs://hadoop01:8020/spark_input/users.parquet")
usersDF.select("name","favourite_color").write.save("hdfs://hadoop01:8020/spark_output")
}
}
2.手动指定数据源:
也可以手动指定用来操作的数据源,数据源通常是使用其权限定名来指定,比如parquet是org.apache.spark.sql.parquet。但是spark sql内置了一些数据源类型,比如json,parquet.jdbc等等,实际上,通过这个功能,就可以在不同类型的数据源之间进行转换了。比如将json文件中的数据存储到parquet文件中。默认情况下,如果不指定数据源,默认就是parquet
Java版本:
DataFrame df =SQLContext.read().format(“json”).load(“people,json”)
Df.select(“name”,”age”).write().format(“parquet”).save(“out”)
Scala版本:
Val df = SQLContext.read.format(“json”).load(“people,json”)
Df.select(“name”,”age”).write.format(“parquet”).save(“out”)
Java版本
package com.spark.spark_sql;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
* 手动指定数据源
* @author Administrator
*/
public class ManuallySpecifyOptions {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("ManuallySpecifyOptions");
SparkContext sc = new SparkContext(conf);
SQLContext SQLContext = new SQLContext(sc);
DataFrame df = SQLContext.read().format("json").load("e://users.parquet");
df.write().format("json").save("e://out");
}
}
3.Save mode
Spark sql 对于save操作,提供了不同的save mode.主要用来处理,当目标位置已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的
Save mode 意义
SaveMode.ErrorIfExists(默认) 如果目标位置存在数据,那么就抛出异常
SaveMode.Append 如果目标位置存在数据,那么就将数据追加进去
SaveMode.Overwrite 如果目标位置存在数据,那么就将已经存在的数据删除,用新的数据进行覆盖
SaveMode.Ignore 如果目标位置存在数据,那么就忽略,不做任何的操作
Java版本:
package com.spark.spark_sql;
import org.apache.derby.impl.tools.sysinfo.Main;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
/**
* savemode 示例
* @author Administrator
*/
public class SaveModeDemo {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SaveModeDemo");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().format("json").load("e://users.parquet");
df.save("e://out", SaveMode.Append);
}
}
三、Parquet数据源
1.使用编程方式加载数据
Parquet是面向分析型业务的列式存储格式,由Twitter和cloudera合作开发。2015年成为Apache的顶级项目
列式存储和行式存储相比较有哪些优点;
1.可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量
2.压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding 和 Delta Encoding)进一步节约磁盘空间。
3.只读取需要的列,支持向量运算,能够获取更好的扫描性能
Java版本:
package com.spark.spark_sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
* parquet数据源之使用编程方式加载数据
*/
public class ParquetLoadData {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("ParquetLoadData");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
//读取parquet文件中的数据,创建一个dataframe
DataFrame userDF = sqlContext.read().parquet("e://users.parquet");
//将其注册为临时表,使用sql查询所需要的数据
userDF.registerTempTable("users");
DataFrame userNamesDF = sqlContext.sql("select name from users");
//对查询出来的dataframe进行transformation操作,处理数据,然后打印出来
List<String> userNames = userNamesDF.javaRDD().map(new Function<Row, String>() {
private static final long serialVersionUID = 1L;
@Override
public String call(Row row) throws Exception {
return "Name: "+row.getString(0);
}
}).collect();
for (String name : userNames) {
System.out.println(name);
}
}
}
Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object PatquetLoadData {
val conf = new SparkConf().setAppName("PatquetLoadData")
val sc = new SparkContext(conf)
val sqlContext =new SQLContext(sc)
val usersDF = sqlContext.read.parquet("hdfs://hadoop01:8020/spark_input/users.parquet");
usersDF.registerTempTable("user");
val userNameDF = sqlContext.sql("select * from user")
userNameDF.rdd.map(row => "Name: "+row(0)).collect.foreach(userName =>println(userName))
}
2.自动分区推断
表分区是一张常见的优化的方式,比如hive中就提供分区表的特性。在一个分区表中,不同分区的数据通常存储在不同的目录中,分区列的值通常就包含在了分区的目录名中。Spark sql中的parquet数据源,支持自动根据目录名推断出分区信息。例如,如果将入口数据存储在分区表中,并且使用性别和国家作为分区列。那么目录结构可能是如下所示:
|----tableName
|----gender=male
|----country=US
|.....
|----country=ZH
|----gender=female
|--country=...
|...
如果将tableName传入SQLContext.read.parquet()或者SQLContext.read.load()方法,那么sparksql就会自动根据目录的结构,推断出分区的信息,是gender和country。即使数据文件中包含两列的值name和age,但是sparksql返回的dataframe,调用printSchema()方法时,会打印出四个列的值:name age country gender .这就是自动分区推断的功能
此外,分区的列的数据类型,也是自动被推断出来的。目前,spark sql仅支持自动推断出数字类型和字符串类型。有时,用户也许不希望spark sql自动推断分区列的数据类型。此时只要设置一个配置即可,spark.sql.sources.partitionColumnTypeInference.enabled,默认是true,即自动推断分区列的类型,设置为false,即不会自动推断类型。进行自定推断分区列的类型时,所有的分区列的类型,就统一默认的是String
案列:
1.hdfs上创建相对应的目录结构:
bin/hdfs dfs -mkdir -p /spark_input/gender=male/country=US
2.将文件上传到目录下
bin/hdfs dfs -put users.parquet /spark_input/gender=male/country=US/
3.查询出schema
package com.spark.spark_sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
*parquet数据源之 自动推断分区
*/
public class ParquetPartitionDiscovery {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("ParquetPartitionDiscovery").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame userDF = sqlContext.read().parquet("hdfs://hadoop01:8020/spark_input/gender=male/country=US/users.parquet");
userDF.printSchema();
userDF.show();
}
}
4.结果:
root
|-- name: binary (nullable = true)
|-- favourite_color: binary (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
+----------------+-------------------+------+-------+
| name| favourite_color|gender|country|
+----------------+-------------------+------+-------+
| [6C 65 6F]| [72 65 64]| male| US|
| [6A 61 63 6B]|[79 65 6C 6C 6F 77]| male| US|
|[6B 69 74 74 79]| [77 68 69 74 65]| male| US|
| [74 6F 6D]| [67 72 65 65 6E]| male| US|
| [61 6C 6C]| [70 69 6E 6B]| male| US|
+----------------+-------------------+------+-------+
【注意】前面是因为parquet数据的问题,自己从hive中导出来的。可以利用json格式的数据load,然后save的时候以parquet的方式生成一个parquet文件,用于测试
3.合并元数据
如同ProtocolBuffer,Avro,Thrift一样,Parquet也是支持元数据合并的。用户可以一开始就定义一个简单的元数据,然后随着业务需要。逐渐往元数据中添加更多的列,在这种情况下,用户可能会创建多个parquet文件,有着多个不同的却互相兼容的元数据。Parquet数据源支持自动推断这种情况,并且进行多个parquet文件的元数据的合并
因为元数据合并是一种相对耗时的操作,而且在大多数的情况下不是一种必要的特性,从spark1.5.0版本开始,默认是关闭parquet文件的自动合并元数据的。可以通过以下的两种方式开启parquet数据源的自动合并元数据的特性
1.读取parquet文件时,将数据源的选项,mergeSchema,设置为true
2.根据sqlContext.setConf()方法,将spark.paquet.mergeSchema参数设置为true
案例:
合并学生的基本信息和成绩信息的元数据
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
object ParquetMergeSchema {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ParquetMergeSchema")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import SQLContext.implicits._
//创建一个dataframe,作为学生的基本信息,并写入一个parquet文件中
val studentsWithNameAge =Array(("leo",23),("jack",25))
val studentsWithNameAgeDF = sc.parallelize(studentsWithNameAge, 2).toDF("name","age")
studentsWithNameAgeDF.save("hdfs://hadoop01:8020/spark_out/students","parquet", SaveMode.Append)
//创建一个dataframe,作为学生的成绩信息,并写入一个parquet文件中
val studentsWithNameGrade =Array(("marry","A"),("jack","B"))
val studentsWithNameGradeDF = sc.parallelize(studentsWithNameGrade, 2).toDF("name","age")
studentsWithNameGradeDF.save("hdfs://hadoop01:8020/spark_out/students","parquet", SaveMode.Append)
//首先,第一个dataframe和第二个dataframe的元数据肯定是不一样的
//一个包含了name和age两个列,一个是包含name和grade两个列
//所以,这期望的是,读取出来的表数据,自动合并两个文件的元数据,出现三列,name age grade
//用mergeSchema的方式,读取students表中的数据,进行元数据的合并
val studentsDF = sqlContext.read.option("mergeSchema", "true").parquet("hdfs://hadoop01:8020/spark_out/students")
studentsDF.printSchema();
studentsDF.show();
}
}
四、JSON数据源
Spark sql 可以自动推断JSON文件的元数据,并且加载其数据,创建一个dataframe。可以使用SQLContext.read.json()方法,针对一个元素为String的RDD,或者是一个JSON文件
但是要注意的是,这里使用的JSON文件与传统意义上的JSON文件是不一样的。每行都必须也只能包含一个,单独的,自包含的,有效的JSON对象。不能让json对象分散在多行。否则会报错
综合性复杂案例:查询成绩为80分以上的学生的基本信息与成绩信息
Java 版本
package com.spark.spark_sql;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
/**
* json数据源
*/
public class JSONDataSource {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JSONDataSource").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// 针对json文件,创建出dataframe
DataFrame studentDF = sqlContext.read().json("e://student.json"); //hdfs://hadoop01:8020/spark_input/student.json
// 针对学生成绩信息的dataframe,注册临时表,查询分数大于80分的学生的姓名和分数
studentDF.registerTempTable("student");
DataFrame stuNameDF = sqlContext.sql("select name,score from student where score > 80");
List<String> goodName = stuNameDF.javaRDD().map(new Function<Row, String>() {
private static final long serialVersionUID = 1L;
@Override
public String call(Row row) throws Exception {
return row.getString(0);
}
}).collect();
// 针对JavaRDD<String> 创建dataframe
List<String> studentInfoJSONs = new ArrayList<String>();
studentInfoJSONs.add("{"name":"leo","age":18}");
studentInfoJSONs.add("{"name":"marry","age":15}");
studentInfoJSONs.add("{"name":"jack","age":30}");
JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);
DataFrame studentInfoDF = sqlContext.read().json(studentInfoJSONsRDD);
// 针对学生的基本信息的dataframe,注册临时表,然后查询分数大于80分的学生的基本信息
studentInfoDF.registerTempTable("info");
String sql = "select name , age from info where name in (";
for (int i = 0; i < goodName.size(); i++) {
sql += "'" + goodName.get(i) + "'";
if (i < goodName.size() - 1) {
sql += ",";
}
}
sql += ")";
DataFrame goodStuInfosDF = sqlContext.sql(sql);
// 将两份数据的dataframe转换为javapairRDD,执行join transformation
JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = goodStuInfosDF.javaRDD()
.mapToPair(new PairFunction<Row, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf((int) row.getLong(1)));
}
}).join(stuNameDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf((int) row.getLong(1)));
}
}));
//将封装在RDD中好学生的全部信息,转换为一个javaRDD<Row>的格式
JavaRDD<Row> rows = pairs.map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(Tuple2<String, Tuple2<Integer, Integer>> t) throws Exception {
return RowFactory.create(t._1,t._2._2,t._2._1);
}
});
//创建一份元数据,将JavaRDD<Row>转换为dataframe
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
StructType structType = DataTypes.createStructType(fields);
DataFrame goodStudentsDF =sqlContext.createDataFrame(rows, structType);
//将好学生的全部信息保存到json文件中去
goodStudentsDF.write().format("json").save("e://good"); //hdfs://hadoop01:8020/spark_out/goodStudent
}
}
Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
object JSONDatasource {
val conf = new SparkConf().setAppName("JSONDatasource")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val studentScoreDF =sqlContext.read.json("hdfs://hadoop01:8020/spark_input/student.json")
studentScoreDF.registerTempTable("stu_score")
val goodStuScoreDF = sqlContext.sql("select name , score from stu_score where score > 80")
val goodStuNames = goodStuScoreDF.map(row => row(0)).collect()
val studentInfoJSON =Array("{"name":"Leo","age":18}","{"name":"marry","age":15}","{"name":"jack","age":30}")
val studentInfoRDD =sc.parallelize(studentInfoJSON, 3)
val studentInfoDF = sqlContext.read.json(studentInfoRDD)//json格式特有的,可以接受RDD
studentInfoDF.registerTempTable("stu_info")
var sql = "select name , age from stu_info where name in ("
for(i <- 0 until goodStuNames.length){
sql += "'"+goodStuNames(i)+"'"
if(i < goodStuNames.length-1){
sql += ","
}
}
sql += ")"
val goodStuInfoDF = sqlContext.sql(sql)
val goodStuRDD = goodStuInfoDF.rdd.map{row => (row.getAs[String]("name"),row.getAs[Long]("age"))}.join (goodStuScoreDF.rdd.map{row => (row.getAs[String]("name"),row.getAs[Long]("score"))})
val goodStuRows = goodStuRDD.map(info => Row(info._1,info._2._2.toInt,info._2._1.toInt))
val structType = StructType(Array(StructField("name",StringType,true),StructField("score",IntegerType,true),StructField("age",IntegerType,true)))
val goodStudentDF = sqlContext.createDataFrame(goodStuRows, structType)
goodStudentDF.write.format("json").save("hdfs://hadoop01:8020/spark_out/good_scala")
}
五、Hive数据源
Spark sql 支持对hive中存储的数据进行读写,操作hive中的数据时,就必须创建HiveContext,而不是SQLContext。HiveContext继承SQLContext,但是增加了在hive元数据库中查找表,以及用hiveql语法编写SQL的功能。处理sql()方法外,HiveContext还提供hql()方法,从而用hive语法来编译sql。
使用HiveContext,可以执行Hive的大部分功能,包括创建表、往表里导入数据以及用SQL语句查询表中的数据,查询出来的数据是一个row数组
将hive-site.xml拷贝到spark/conf目录下,将mysql connector拷贝到spark/lib目录下
HiveContext sqlContext = new HiveContext(sc);
sqlContext.sql(“create table if not exists student (name string ,age int)”)
sqlContext.sql(“load data local inpath ‘/usr/local/student.txt’into table students”);
Row[] teenagers = sqlContext.sql(“select name ,age from students where age <= 18”).collect();
将数据保存到表中
Spark sql还允许将数据保存到hive表中。调用Dataframe的saveAsTable,即可将dataframe中的数据保存到hive表中。与registerTempTable不同,saveAsTable是会将dataframe汇总的数据物化到hive表中,而且还会在hive元数据库中创建表的元数据
默认情况下,saveAsTable会创建一张hive managed table,也就是说,数据的位置都是有元数据库中的信息控制的。当managed table 被删除时,表中的数据也会一并内物理删除
RegisterTempTable只是注册一个临时的表,只要spark application重启或者停止了,那么表就没有了。而SaveAsTable创建的是物化的表,无论是spark application重启还是停止,表都会一直存在
调用HiveContext.table()方法,还可以直接针对hive中的表,创建一个dataframe
案例:查询分数大于80分的学生的信息
package com.spark.spark_sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
/**
* hive数据源
* @author Administrator
*/
public class HiveDataSource {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("HiveDataSource");
JavaSparkContext sc = new JavaSparkContext(conf);
//创建HiveContext,注意,这里接收的是sparkContext作为参数,不是Javasparkcontext
HiveContext hiveContext = new HiveContext(sc.sc());
//第一个功能,使用HiveContext的sql()/hql()方法,可以执行hive中可以执行的hiveQL语句
//判断是否存在student_info表,如果存在则删除
hiveContext.sql("drop table if exists student_info");
//判断student_info表是否不存在,不存在就创建该表
hiveContext.sql("create table if not exists student_info(name string,age int) row format delimited fields terminated by ' '");
//将学生基本信息导入到Student_info表
hiveContext.sql("load data local inpath '/home/hadoop/student_info.txt' into table student_info");
//用同样的方式向student_scores导入数据
hiveContext.sql("drop table if exists student_score");
hiveContext.sql("create table if not exists student_score(name string,score int)row format delimited fields terminated by ' '");
hiveContext.sql("load data local inpath '/home/hadoop/student_score.txt' into table student_score");
//第二个功能个,执行sql还可以返回dataframe,用于查询
//执行sql查询,关联两种表,查询成绩大于80 分的学生信息
DataFrame goodstudentDF = hiveContext.sql("select i.name,i.age,s.score from student_info i join student_score s on i.name=s.name where s.score >=80");
//第三个功能,可以将dataframe中的数据,理论上来说,dataframe对应的RDD的元素是ROW就可以
//将Dataframe中的数据保存到hive表中
//将dataframe中的数据保存到good_student_info表中
hiveContext.sql("drop table if exists good_student_info");
goodstudentDF.saveAsTable("good_student_info");
//第四个功能,可以使用table()方法,针对hive表直接创建dataframe
//针对good_student_info表,直接创建dataframe
Row[] rows = hiveContext.table("good_student_info").collect();
for (Row row : rows) {
System.out.println(row);
}
sc.close();
}
}
Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkContext
object HiveDataSource {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HiveDataSource");
val sc = new SparkContext(conf)
//创建HiveContext,注意,这里接收的是sparkContext作为参数,不是Javasparkcontext
val hiveContext = new HiveContext(sc);
//第一个功能,使用HiveContext的sql()/hql()方法,可以执行hive中可以执行的hiveQL语句
//判断是否存在student_info表,如果存在则删除
hiveContext.sql("drop table if exists student_info");
//判断student_info表是否不存在,不存在就创建该表
hiveContext.sql("create table if not exists student_info(name string,age int) row format delimited fields terminated by ' '");
//将学生基本信息导入到Student_info表
hiveContext.sql("load data local inpath '/home/hadoop/student_info.txt' into table student_info");
//用同样的方式向student_scores导入数据
hiveContext.sql("drop table if exists student_score");
hiveContext.sql("create table if not exists student_score(name string,score int)row format delimited fields terminated by ' '");
hiveContext.sql("load data local inpath '/home/hadoop/student_score.txt' into table student_score");
//第二个功能个,执行sql还可以返回dataframe,用于查询
//执行sql查询,关联两种表,查询成绩大于80 分的学生信息
val goodstudentDF = hiveContext.sql("select i.name,i.age,s.score from student_info i join student_score s on i.name=s.name where s.score >=80");
//第三个功能,可以将dataframe中的数据,理论上来说,dataframe对应的RDD的元素是ROW就可以
//将Dataframe中的数据保存到hive表中
//将dataframe中的数据保存到good_student_info表中
hiveContext.sql("drop table if exists good_student_info");
goodstudentDF.saveAsTable("good_student_info");
//第四个功能,可以使用table()方法,针对hive表直接创建dataframe
//针对good_student_info表,直接创建dataframe
val rows = hiveContext.table("good_student_info").collect();
for ( row <- rows) {
println(row);
}
}
}
六、内置函数
在spark 1.5.x版本,增加了一系列内置函数到dataframe API中,并且实现了code-generation的优化。与普通的函数不同,dataframe的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在dataframe的操作之中,比如select, filter,groupBy等操作。函数的输入值,也可以是Column。
种类 函数
聚合函数 approxCountDistinct,avg,count,
countDistinct,first,last,max,mean,min,sum,sumDistinct
集合函数 Array_contains,explode,size,sort_array
日期/时间函数 日期时间转换
Unix_timestamp,from_unixtime,to_date
Quarter,day,dayofyear,weekofyear,
From_utc_timestamp,to_utc_timestamp
从日期中提取字段
Year,month,dayofmonth,hour,minute,
second
日期/时间函数 日期时间计算
Dateiff,date_add,date_sub,add_months,
last_day,next_day,months_between
获取当前时间
Current_date,current_timestamp,trunc,
date_format
数学函数 Abs,scros,asin,atan,atan2,bin,cbrt,
ceil,conv,cos,sosh,exp,expm1,factorial,floor,hex,hypot,log,log10,log1p,log2,
Pmod,pow,rint,round,shiftLeft,
shiftRight,shiftRightUnsigned,sifnum,
Sin,sinh,sqrt,tan,tanh,toDegrees,
toRadians,unhex
混合函数 Array,bitwiseNOT,callUDF,coalesce,
crc32,greatest,if,inputFileName,isNaN,
isnotnull,isnull,least,lit,md5,
monotonicalliIncreasingId,nanvl,negate,not,rand,randn,randn
sha,sha1,sparkPartitionId,struct,when
字符串函数 Ascii,base64,concat,concat_ws,decode,encode,format_number,format_string,get_json_object,initcap,instr,length,levenshtein,locate,lower,lpad,ltrim,printf,redexp_extract,regexp_replace,repeat,reverse,rpad,rtrim,soundex,space,split,substring,substring_index,translate,trim,unbase64,upper
窗口函数 cumeDist,denseRank,lag,lead,ntile,percentRank,rank,rowNumber
案例:
根据每天的用户访问和购买日志,统计每日的UV和销售额
统计每日的UV
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._
object DailyUV {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DailyUV")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//构造用户 访问日志数据,并创建dataframe
//要使用spark内置函数,就必须在这里代入sparkcontext下的隐式转换
import sqlContext.implicits._
//模拟用户访问日志,日志用逗号隔开,第一列是日期,第二列是用户id
val userAccessLog = Array("2015-10-01,1122","2015-10-01,1122", "2015-10-01,1123", "2015-10-01,1125","2015-10-01,1125", "2015-10-02,1122", "2015-10-02,1124","2015-10-02,1122", "2015-10-02,1123")
val userAccessLogRDD= sc.parallelize(userAccessLog, 1);
//将模拟出来的用户日志RDD,转换为dataframe
//首先将普通的RDD转换为row的RDD
val userAccessLogRowRDD = userAccessLogRDD.map(log =>Row(log.split(",")(0),log.split(",")(1).toInt))
//构造dataframe元数据
val structType = StructType(Array(StructField("date",StringType,true),StructField("userid",IntegerType,true)))
val userAccessLogRowDF=sqlContext.createDataFrame(userAccessLogRowRDD, structType)
//这里正式使用spark1.5.x版本的提供的最新特性,内置函数,countDistinct
//每天都有很多用户来访问,但是每个用户可能每天都会访问很多次
//UV:指的是对用户进行去重以后的访问总数
//聚合函数的用法;
//首先对dataframe调用groupBy()方法,对某一列进行分组
//然后调用agg()方法,第一个参数,必须必须传入之前在groupBy方法中出现的字段
//第二个参数,传入countDistinct、sum,first等,spark提供的内置函数
//内置函数中,传入的参数也是用单引号作为前缀的,其他的字段
userAccessLogRowDF.groupBy("date").agg('date, countDistinct('userid)).map(row => Row(row(1),row(2))).collect.foreach(println)
}
}
统计每日的销售额
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.functions._
object DailySale {
val conf = new SparkConf().setMaster("local").setAppName("DailySale")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
//模拟元数据
//实际上,有些时候,会出现日志的上报错误,比如日志里丢了用户的信息,那么这种,就一律不统计了
val userSaleLog = Array("2015-10-01,100,1122","2015-10-01,100,1133","2015-10-01,100,","2015-10-02,300,1122","2015-10-02,200,1122","2015-10-02,100,","2015-10-02,100,1122")
val userSaleLogRDD = sc.parallelize(userSaleLog, 1);
//进行过滤
val filteredUserSaleRowRDD = userSaleLogRDD.filter(log => if(log.split(",").length ==3) true else false)
val userSaleLogRowRDD = filteredUserSaleRowRDD.map(log => Row(log.split(",")(0),log.split(",")(1).toDouble))
val structType = StructType(Array(StructField("date",StringType,true),StructField("sale_amount",DoubleType,true)))
val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)
//开始统计每日销售额的统计
userSaleLogDF.groupBy("date").agg('date, sum('sale_amount)).map(row => Row(row(1),row(2))).collect().foreach(println)
}
七、开窗函数
Spark1.4.x版本以后,为spark sql和dataframe引入了开窗函数,比如最经典最常用的row_number(),可以让我们实现分组取topN的逻辑
案例:统计每个种类的销售额排名前3 的产品
package com.spark.spark_sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
/**
* 开窗函数row_number
* @author Administrator
*/
public class RowNumberWindowFunction {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("NewNumberWindowFunction");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc.sc());
//创建销售表sales表
hiveContext.sql("drop table if exists sales");
hiveContext.sql("create table if not exists sales (product string,category string ,revenue bigint) row format delimited fields terminated by ' '");
hiveContext.sql("load data local inpath '/home/hadoop/sales.txt' into table sales");
//开始编写我们的统计逻辑,使用row_number()开窗函数
//先说明一下,row_number()开窗函数的作用
//其实就是给每一个分组的数据,按照其排序顺序,打上一个分组内的行号
//比如说,有一个分组date=20151001,里面有三条数据,1122 1121 1124
//那么对这个分组的每一行使用row_number()开窗函数以后,三行依次会获得一个组内的行号
//行号从1开始递增,比如 1122 1, 1121 2, 1124 3
DataFrame top3SalesDF = hiveContext.sql("select product,category,revenue from "
+ "(select product,category,revenue ,row_number() over (partition by category order by revenue desc ) rank from sales) tmp_sales where rank <=3");
//row_number()开窗函数的语法说明
//首先,可以在select查询时,使用row_number()函数
//其次,row_number()函数后面先跟上over关键字
//然后括号中是partition by 也就是根据那个字段进行分组
//其次是可以使用order by进行组内排序
//然后row_number()就可以给每一个组内的行,一个组一个行号
//将每组排名前三的数据,保存到一个表中
hiveContext.sql("drop table if exists top3_sales");
top3SalesDF.saveAsTable("top3_sales");
sc.close();
}
}
函数名(列) OVER(选项)
八、UDF自定义函数
UDF:User Defined Function.用户自定义函数
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
object UDF {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("UDF")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//构造模拟数据
val names =Array("Leo","Marry","Jack","Tom")
val namesRDD = sc.parallelize(names, 1)
val namesRowRDD = namesRDD.map(name => Row(name))
val structType =StructType(Array(StructField("name",StringType)))
val namesRowDF =sqlContext.createDataFrame(namesRowRDD, structType)
//注册一张name表
namesRowDF.registerTempTable("names")
//定义和注册自定义函数
//定义函数
//注册函数
sqlContext.udf.register("strLen",(str:String) => str.length)
//使用自己的函数
sqlContext.sql("select name,strLen(name) from names").collect.foreach(println)
}
}
九、UDAF自定义聚合函数
UDAF:User Defined Aggregate Function。用户自定义聚合函数,是spark1.5.x引入的最新特性。
UDF针对的是单行输入,返回一个输出,
UDAF,则可以针对多行输入,进行聚合计算,返回一个输出,功能更加的强大
Scala代码:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
class StringCount extends UserDefinedAggregateFunction {
//inputschema 指的是,输入数据的数据
override def inputSchema: StructType = {
StructType(Array(StructField("str",StringType,true)))
}
//bufferSchema,指的是,中间进行聚合时,所处理的数据类型
override def bufferSchema: StructType = {
StructType(Array(StructField("count",IntegerType,true)))
}
//dataType,指的是,函数的返回值的类型
override def dataType: DataType = {
IntegerType
}
override def deterministic: Boolean = {
true
}
//为每一个分组的数据执行初始化操作
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0)=0
}
//指的是,每个分组,有新的值进来的时候,如何进行分组,对应的聚合值的计算
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0)=buffer.getAs[Int](0)+1
}
//由于spark是分布式的,所以一个分组的数据,可能会在不同的节点上进行局部的聚合,就是update
//但是,最后一个分组,在各个节点上的聚合值,要执行merge,也就是合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0)=buffer1.getAs[Int](0)+buffer2.getAs[Int](0)
}
//最后,指的是,一个分组的聚合,如何通过中间的缓存聚合值,随后返回一个最终的聚合值
override def evaluate(buffer: Row): Any = {
buffer.getAs[Int](0)
}
}
十、工作原理以及性能调优
1.工作原理
Sqlparse、Analyser、Optimizer、SparkPlan
2.性能调优
1.设置shuffle过程中的并行度spark.sql.shuffle.partitions(sqlContext.setConf())
2.在hive数据仓库建设过程中,合理设置数据类型,比如能设置为int的就不要设置为bigint.减少数据类型导致的不必要的内存开销
3.编写sql时,尽量给出明确的列名,比如select name from students。不要写select * 的方式
4.并行处理查询结果:对于spark sql查询的结果,如果数据量比较大的话,比如超多1000条,那么就不要一次collect到driver在处理。使用foreach()算子,并行处理查询结果
5.缓存表:对于一条sql语句中可能多次使用到表,可以对其进行缓存没使用sqlContext.cache Table(tableName),或者DataFrame.cache()即可spark sql会用内存列存储的格式进行表的缓存。然后将spark sql就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存使用和GC开销。sqlcontext.uncacheTable(tableName)可以将表从缓存中移除。用sqlcontext.setConf(),设置spark.sql.inMemoryColumnarStorage.batchSize参数(默认10000),可以配置列存储单位
6.广播join表:spark.sql.autoBroadcastJoinThreshold.默认是10485760(10MB),也就是在10M以内的表将其广播出去,在内存足够用的情况下,增加其大小,让更多的表广播出去,就可以将join中的较小的表广播出去,而不是进行网络数据传输了。
7.钨丝计划:spark.sql.tungsten.enable,默认是true 自动管理内存
最有效的就是第四点,缓存表和广播join表也是非常不错的
十一、Hive on spark
背景:
Hive是目前大数据领域,事实上的sql标准。其底层默认是基于MapReduce实现的。但是由于mapreduce速度是在比较慢。因此这两年,陆续出来了新的sql查询引擎。包括spark sql,hive on Taz ,hive on spark
Spark sql 与hive on spark 是不一样的,spark sql 自己研发出来的针对各种数据源,包括hive、json、parquet、JDBC、RDD等都可以执行查询的,一套基于spark计算引擎的查询引擎。因此它是spark的一个项目,只不过提供了针对hive执行查询的功能而已。适合在一些使用spark技术栈的大数据应用类系统中使用。
而hive on spark是hive 的一个项目,他是指,不通过mapreudce作为唯一的引擎,而是将spark作为底层的查询引擎,hive on spark ,只适用于hive,在可预见的未来,很有可能hive默认的底层引擎从mapreduce切换为spark,适合于将原有的hive数据仓库以及数据分析替换为spark引擎,作为公司通用大数据统计计算分析引擎
Hive基本的工作原理:
hiveQL语句==>
语法分==>AST==>
生成逻辑执行计划==>operator Tree==>
优化逻辑执行计划==>optimized operator Tree==>
生成物理执行计划==>Task Tree==>
优化物理执行计划==>Optimized Task Tree==>
执行优化后的Optimized Task Tree
Hive on spark 的计算原理有如下的几个要点:
1.将hive表作为spark RDD来进行操作
2.使用hive 原语
对于一些针对RDD的操作,比如,groupByKey(),sortByKey()等等。不使用spark的transformation操作和原语。如果那样做的话,那么就需要重新实现一套hive的原语,而且hive增加了新功能,那么又要实现新的spark 原语。因此,选择将hive的原语包装为针对RDD的操作即可
3.新的物理执行计划生成机制
使用sparkCompiler将逻辑执行计划,集=即Operator Tree,转换为Task tree。提交spark task给spark进行执行。Sparktask包装了DAG,DAG包装为sparkwork,Sparktask根据sparkwork表示的DAG计算
4.sparkContext生命周期
Hive on spark 会为每一个用户的会话,比如执行一次sql语句,创建一个sparkcontext,但是spark不允许在一个JVM内创建多个sparkcontext,因此,需要在单独额JVM中启动每一个会话的sparkcontext,然后通过RPC与远程JVM中的sparkContext进行通信。
5.本地和远程运行模式
Hive on spark 提供两种运行模式,本地和远程。如果将spark master设置为local
十二、Spark sql与spark core整合
案例:每日top3热点搜索词统计案例
日志格式:
日期、用户、搜索词、城市、平台、版本
需求:
1.筛选出符合条件(城市、平台、版本))的数据
2.统计出每天搜索的UV排名前三的搜索词
3.按照每天的top3搜索词的UV搜总次数,倒序排列
4.将数据保存到hive表中
实现思路分析
1.针对原始的数据(HDFS文件),获取输入RDD
2.使用filter算子,去针对输入RDD中的数据,进行数据过滤,过滤出符合条件的数据
2.1普通的做法:直接在filter算子函数中,使用外部的查询条件(map),但是这样的话,是不是查询条件map,会发送到每一个task上一份副本(性能并不好)
2.2优化后的做法:将查询条件,封装为Broadcast广播变量,在filter算子中使用Broadcast广播变量
3.将数据转换为“日期_搜索词,用户”的格式,然后对他进行分组,再次进行映射。对每天每个搜索词的搜索用户进行去重操作,并统计去重后的数量,即为每天每个搜索词的UV,最后获得(日期_搜索词,UV)
4.将得到的每天的搜索词的UV,RDD映射为元素类型的Row的RDD,将RDD转换为dataframe
5.将dataframe注册为临时表,使用spark sql的开窗函数,来统计每天的UV排名前三的搜索词,以及他的搜索UV,最后获知,是一个dateframe
6.将dateframe转换为RDD。继续操作,按照每天日期进行分组。并进行映射。计算出每天的top3所搜词的搜索uv的总数,然后将UV总数作为key,将每天的top3搜索词以及搜索次数,拼接为一个字符串
7.按照每天的top3搜索总UV,进行排序,倒序排序
8.将排好序的数据,再次映射回来,变成“日期_搜索词_UV”的格式
9.再次映射为dataframe,并将数据保存到hive表中
package com.spark.spark_sql;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
/**
* 每日top3热点搜索词统计
* @author Administrator
*
*/
public class DailyTop3Keyword {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("DailyTop3Keyword");
JavaSparkContext jsc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(jsc.sc());
//针对hdfs上的文件中的日志,获取输入的RDD
JavaRDD<String> rawRDD = jsc.textFile("hdfs://hadoop01:8020/keyword.txt");
//伪造一份数据,查询条件
//备注:实际上,在实际的企业的项目开发中,很可能,这个查询条件是J2EE平台发送到MySQL表中的
//然后,这里实际上通常是会用Spring框架和ORM框架的,去提取MySQL表中的查询条件
Map<String,List<String>> queryParamMap =new HashMap<String, List<String>>();
queryParamMap.put("city", Arrays.asList("beijing"));
queryParamMap.put("platform", Arrays.asList("android"));
queryParamMap.put("version", Arrays.asList("1.0","1.2","1.5","2.0"));
//将map封装为广播变量,每个work节点只拷贝一份数据即可,这样可以进行优化
final Broadcast<Map<String, List<String>>> queryParamMapBroadcast = jsc.broadcast(queryParamMap);
//使用广播变量进行筛选
JavaRDD<String> filterRDD = rawRDD.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(String log) throws Exception {
String[] logSplited = log.split(" ");
String city =logSplited[3];
String platform =logSplited[4];
String version = logSplited[5];
//与查询条件进行比较,任何一个
Map<String, List<String>> queryParamMap = queryParamMapBroadcast.value();
List<String> cities = queryParamMap.get("city");
if(cities.size()>0 && !cities.contains(city)){
return false;
}
List<String> platforms = queryParamMap.get("city");
if(platforms.size()>0 && !platforms.contains(platform)){
return false;
}
List<String> versions = queryParamMap.get("city");
if(versions.size()>0 && !versions.contains(version)){
return false;
}
return true;
}
});
//将过滤出来的原始的日志进行映射为(日期_搜索词,用户)的格式
JavaPairRDD<String, String> dateKeywordUserRDD = filterRDD.mapToPair(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(String t) throws Exception {
String[] logSplited = t.split(" ");
String date =logSplited[0];
String user =logSplited[1];
String keyword =logSplited[2];
return new Tuple2<String, String>(date+"_"+keyword, user);
}
});
//进行分组,获取每天的搜索词,有哪些用户进行搜索了(没有去重)
JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeywordUserRDD.groupByKey();
//对每天每个搜索词的搜索用户,执行去重操作,获得其UV值
JavaPairRDD<String, Long> dateKeywordUVRDD = dateKeywordUsersRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(Tuple2<String, Iterable<String>> t) throws Exception {
String dateKeyword = t._1;
Iterator<String> users = t._2.iterator();
//对用户进行去重,并且统计数量
List<String> distinctUsers = new ArrayList<String>();
while(users.hasNext()){
String user = users.next();
if(!distinctUsers.contains(user)){
distinctUsers.add(user);
}
}
//获取uv
long uv = distinctUsers.size();
return new Tuple2<String, Long>(dateKeyword, uv);
}
});
//将每天每个搜索词的UV数据转换成dataframe
JavaRDD<Row> dateKeywordUVRowRDD = dateKeywordUVRDD.map(new Function<Tuple2<String,Long>, Row>() {
@Override
public Row call(Tuple2<String, Long> v1) throws Exception {
String date = v1._1.split("_")[0];
String keyword = v1._1.split("_")[1];
long uv = v1._2;
return RowFactory.create(date,keyword,uv);
}
});
List<StructField> structFields = Arrays.asList(
DataTypes.createStructField("date", DataTypes.StringType, true),
DataTypes.createStructField("keyword", DataTypes.StringType,true),
DataTypes.createStructField("uv", DataTypes.LongType,true)
);
StructType structType =DataTypes.createStructType(structFields);
DataFrame dateKeywordUVDF = hiveContext.createDataFrame(dateKeywordUVRowRDD, structType);
//使用spark sql的开窗函数,统计每天的搜索排名前三的热点搜索词
dateKeywordUVDF.registerTempTable("daily_keyword_uv");
DataFrame dailyTop3KeywordDF = hiveContext.sql("select date,keyword,uv from "
+ "(select date,keyword,uv,row_number over (partition by date order by uv desc ) rank from daily_keyword_uv )tmp "
+ "where rank <=3");
//将dateframe 转换为RDD,然后映射,计算出每天的top3搜索词的搜索uv的总数
JavaPairRDD<String, String> Top3DateKeywordUvRDD = dailyTop3KeywordDF.javaRDD().mapToPair(new PairFunction<Row, String, String>() {
@Override
public Tuple2<String, String> call(Row row) throws Exception {
String date = String.valueOf(row.get(0));
String keyword =String.valueOf(row.get(1));
Long uv = Long.valueOf(String.valueOf(row.get(2)));
return new Tuple2<String, String>(date, keyword+"_"+uv);
}
});
JavaPairRDD<String, Iterable<String>> Top3DateKeywordRDD = Top3DateKeywordUvRDD.groupByKey();
JavaPairRDD<Long, String> uvDateKeywordsRDD = Top3DateKeywordRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, Long, String>() {
@Override
public Tuple2<Long, String> call(Tuple2<String, Iterable<String>> t) throws Exception {
String date = t._1;
Iterator<String> keywordUvIterator = t._2.iterator();
Long totalUv = 0L;
String dateKeywordUv = date;
while(keywordUvIterator.hasNext()){
String keywordUv= keywordUvIterator.next();
Long uv = Long.valueOf(keywordUv.split("_")[1]);
totalUv+=uv;
dateKeywordUv+=","+keywordUv;
}
return new Tuple2<Long, String>(totalUv, dateKeywordUv);
}
});
//按照每天的总搜索进行倒序排序
JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
//在此进行映射,将排序后的数据,映射回原始的格式Iterable<ROW>
JavaRDD<Row> sortedRowRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction<Tuple2<Long,String>, Row>() {
@Override
public Iterable<Row> call(Tuple2<Long, String> t) throws Exception {
String dateKeywords=t._2;
String[] dateKeywordsSplited = dateKeywords.split(",");
String date = dateKeywordsSplited[0];
List<Row> rows= new ArrayList<Row>();
rows.add(RowFactory.create(date,
dateKeywordsSplited[1].split("_")[0],
Long.valueOf(dateKeywordsSplited[1].split("_")[1])
));
rows.add(RowFactory.create(date,
dateKeywordsSplited[1].split("_")[0],
Long.valueOf(dateKeywordsSplited[2].split("_")[1])
));
rows.add(RowFactory.create(date,
dateKeywordsSplited[1].split("_")[0],
Long.valueOf(dateKeywordsSplited[3].split("_")[1])
));
return rows;
}
});
//将最终的数据转换为dateframe
DataFrame finalDF = hiveContext.createDataFrame(sortedRowRDD, structType);
finalDF.saveAsTable("daily_top3_keyword_uv");
jsc.close();
}
}