package com.lin.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
 * Created by Yaooo on 2019/6/8.
 *
 * Small driver program that exercises several Spark SQL features against a
 * local SparkSession: untyped DataFrame operations, typed Dataset creation,
 * schema inference from a case class, and programmatic schema construction.
 */
object SparkSQLExample {

  /** Record type used for typed Datasets and for reflection-based schema inference. */
  final case class Person(name: String, age: Long)

  // Input files read by the examples; paths are relative to the working directory.
  private val PeopleTextPath = "src/main/resources/people.txt"
  private val PersonJsonPath = "src/main/resources/person.json"

  /**
   * Builds a SparkSession with a `local[2]` master, runs every example in
   * sequence and stops the session afterwards, even if an example throws.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL")
      // NOTE(review): the key looks like a typo of the "spark.some.config.option"
      // placeholder used in the Spark documentation - confirm before renaming.
      .config("spark.come.config.option", "some-value")
      .master("local[2]")
      .getOrCreate()

    try {
      runBasicDataFrameExample(spark)
      runDatasetCreationExample(spark)
      runInferSchemaExample(spark)
      runProgrammaticSchemaExample(spark)
    } finally {
      // Release the session's resources; the original never stopped it.
      spark.stop()
    }
  }

  /**
   * Builds a DataFrame from a text file using a schema assembled at runtime
   * (every column typed as a nullable string), registers it as the temp view
   * "people" and prints the first column of each row.
   *
   * @param spark the active session
   */
  private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
    import spark.implicits._

    val personRDD = spark.sparkContext.textFile(PeopleTextPath)

    // The schema is described as a space-separated list of column names.
    val schemaString = "name age"
    val fields = schemaString
      .split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Each line is split on ',' and its first two fields become one Row.
    val rowRDD = personRDD
      .map(_.split(","))
      .map(att => Row(att(0), att(1).trim))

    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.createOrReplaceTempView("people")

    val results = spark.sql("select * from people")
    results.map(att => s"Name : ${att(0)}").show()
  }

  /**
   * Parses a text file into [[Person]] records, lets Spark derive the schema
   * from the case class, registers the temp view "people" and queries the rows
   * whose age is between 13 and 19, accessing columns by index, by name and as
   * a map of values.
   *
   * @param spark the active session
   */
  private def runInferSchemaExample(spark: SparkSession): Unit = {
    import spark.implicits._

    val personDF = spark.sparkContext
      .textFile(PeopleTextPath)
      .map(_.split(","))
      // Parse directly to Long so the value matches the type of Person.age.
      .map(attributes => Person(attributes(0), attributes(1).trim.toLong))
      .toDF()
    personDF.createOrReplaceTempView("people")

    val teenagersDF = spark.sql("select * from people where age between 13 and 19")
    teenagersDF.show()

    // Column access by position, then by field name.
    teenagersDF.map(teenager => s"name: ${teenager(0)}").show()
    teenagersDF.map(teenager => s"Name: ${teenager.getAs[String]("name")}").show()

    // An encoder for Map[String, Any] is declared explicitly here; the type
    // annotation keeps the implicit well-defined on newer Scala versions.
    implicit val mapEncoder: org.apache.spark.sql.Encoder[Map[String, Any]] =
      org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

    teenagersDF
      .map(teenager => teenager.getValuesMap[Any](List("name", "age")))
      .collect()
      .foreach(println)
  }

  /**
   * Creates Datasets three ways: from a sequence of case-class instances, from
   * a sequence of primitives, and by reading a JSON file and mapping it onto
   * [[Person]].
   *
   * @param spark the active session
   */
  private def runDatasetCreationExample(spark: SparkSession): Unit = {
    import spark.implicits._

    // toDS() keeps the element type (Dataset[Person]); the original used toDF().
    val caseClassDS = Seq(Person("Andy", 18)).toDS()
    caseClassDS.show()

    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect().foreach(println)

    val personDS = spark.read.json(PersonJsonPath).as[Person]
    personDS.show()
  }

  /**
   * Reads a JSON file into a DataFrame and runs basic untyped operations on it
   * (show, printSchema, select, filter, groupBy), then registers it as the
   * global temp view "people" and queries it through `global_temp`.
   *
   * @param spark the active session
   */
  private def runBasicDataFrameExample(spark: SparkSession): Unit = {
    import spark.implicits._

    val df = spark.read.json(PersonJsonPath)
    df.show()
    df.printSchema()
    df.select("name").show()
    df.select($"name", $"age" + 1).show()
    df.filter($"age" > 21).show()
    df.groupBy($"age").count().show()

    /*df.createOrReplaceTempView("people")
    val sqlDF = spark.sql("select * from people")
    sqlDF.show()*/

    // Global temp views are addressed through the `global_temp` database.
    df.createOrReplaceGlobalTempView("people")
    spark.sql("select * from global_temp.people").show()
  }
}