Spark之spark.sql spark-shell之spark.sql



数据结构

订单表: badou.orders

字段(string)

  • order_id 订单id
  • user_id 用户id
  • eval_set 值:prior(历史行为),train(训练,test中user已经购买的商品),test(预测的数据集)
  • order_number 订单编号
  • order_dow 周几下的订单
  • order_hour_of_day 一天中的哪个小时(24小时制)
  • days_since_prior_order order_number与上一个订单的间隔天数

行为表: badou.priors

字段(string)

  • order_id 订单id,与orders订单表关联
  • product_id 订单中的产品id
  • add_to_cart_order 加购物车的位置
  • reordered 有没有被再次购买,以用户为单位,首次为0,再次为1

登录spak-shell

  • 在yarn上运行spark-shell
[root@master ~]# /usr/local/src/spark-2.0.2-bin-hadoop2.6/bin/spark-shell --master yarn
  • 引包,生成表df
scala> import spark.sql
scala> val orders=sql("select * from badou.orders")
scala> val priors=sql("select * from badou.priors")

练习

product 统计/特征

统计product被购买的数据量

scala> priors.groupBy("product_id").agg(count("product_id") as "cnt_product").show(3)
+----------+-----------+
|product_id|cnt_product|
+----------+-----------+
|     48370|       3934|
|     10096|        792|
|     16974|       2773|
+----------+-----------+

统计product被reordered的数量(再次购买),以及再次购买的比率

scala> priors.selectExpr("product_id","cast(reordered as int) as reordered").groupBy("product_id").agg(sum("reordered") as "sum_r",count("product_id") as "cnt_product",avg("reordered") as "avg_reordered").show(3)
+----------+-----+-----------+------------------+
|product_id|sum_r|cnt_product|     avg_reordered|
+----------+-----+-----------+------------------+
|     48370| 2751|       3934| 0.699288256227758|
|     10096|  503|        792|  0.63510101010101|
|     16974| 1845|       2773|0.6653443923548503|
+----------+-----+-----------+------------------+

user 统计/特征

每个用户平均购买订单的间隔周期

scala> val ord=orders.selectExpr("user_id","if(days_since_prior_order='',0,days_since_prior_order) as dspo")
scala> ord.selectExpr("user_id","cast(dspo as int) as dspo").groupBy("user_id").agg(sum("dspo") as "sum_dspo",count("user_id") as "cnt_user_id",avg("dspo") as "avg_dspo").show(5)
+-------+--------+-----------+------------------+
|user_id|sum_dspo|cnt_user_id|          avg_dspo|
+-------+--------+-----------+------------------+
| 104454|     147|          6|              24.5|
| 104603|     132|          6|              22.0|
| 104665|     273|         18|15.166666666666666|
| 104870|      90|          4|              22.5|
| 105344|      60|          5|              12.0|
+-------+--------+-----------+------------------+

每个用户的总订单数量

scala> orders.groupBy("user_id").count().show(5)
+-------+-----+
|user_id|count|
+-------+-----+
| 104454|    6|
| 104603|    6|
| 104665|   18|
| 104870|    4|
| 105344|    5|
+-------+-----+

每个用户购买的product商品去重后的集合数据

scala> import spark.implicits._
scala> val op=orders.join(priors,"order_id").selectExpr("user_id","product_id").distinct
scala> val req=op.rdd.map(x=>(x(0).toString,x(1).toString)).groupByKey().mapValues(_.mkString(","))
scala> req.toDF("user_id","product_id").filter(col("user_id")===124168).show()
+-------+-----------------------------+
|user_id|product_id                   |
+-------+-----------------------------+
|124168 |14303,20082,11323,46522,22108|
+-------+-----------------------------+
// 下面的是集合数据带 count
scala> val req=op.rdd.map(x=>(x(0).toString,x(1).toString)).groupByKey().mapValues{rs=>(rs.size,rs.mkString(","))}
scala> req.toDF("user_id","value").selectExpr("user_id","value._1 as product_cnt","value._2 as product_id_list").show(1,false)
+-------+-----------+-----------------------------+
|user_id|product_cnt|product_id_list                 |
+-------+-----------+-----------------------------+
|124168 |5          |14303,46522,20082,11323,22108|
+-------+-----------+-----------------------------+

import spark.implicits._ 是 RDD 转 DF 的一个隐式转换引包

每个用户购买的平均每个订单的商品数量

  1. 每个订单的商品数量
  2. 每个用户的总商品数量
  3. 每个用户有多少个订单
scala> val product_cnt = priors.selectExpr("order_id","product_id").groupBy("order_id").count()
scala> orders.join(product_cnt,"order_id").selectExpr("user_id","count as product_cnt").groupBy("user_id").agg(sum("product_cnt"),count("user_id"),avg("product_cnt")).withColumnRenamed("sum(product_cnt)","sum_product_cnt").show(3)
+-------+---------------+--------------+------------------+
|user_id|sum_product_cnt|count(user_id)|  avg(product_cnt)|
+-------+---------------+--------------+------------------+
| 115053|            414|            39|10.615384615384615|
|  65897|           1811|            83|21.819277108433734|
|  48147|            475|            95|               5.0|
+-------+---------------+--------------+------------------+