Lookup in Spark dataframes

Problem description:

I am using Spark 1.6 and I would like to know how to implement a lookup in dataframes.

I have two dataframes, employee & department.

  • Employee Dataframe

-------------------
Emp Id | Emp Name
-------------------
1      | john
2      | David

  • Department Dataframe

    ----------------------------
    Dept Id | Dept Name | Emp Id
    ----------------------------
    1       | Admin     | 1
    2       | HR        | 2
    

  • I would like to look up the emp id from the employee table in the department table and get the dept name. So, the result set would be:

    Emp Id | Dept Name
    -------------------
    1      | Admin
    2      | HR
    

    How do I implement this lookup UDF feature in Spark? I don't want to use a JOIN on the two dataframes.

    As already mentioned in the comments, joining the dataframes is the way to go.
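For reference, a minimal join-based sketch (assuming a spark-shell session so `sqlContext.implicits._` is in scope, and a hypothetical `empDF` mirroring the employee table above):

```scala
import sqlContext.implicits._

// Hypothetical dataframes mirroring the example tables above
val empDF = Seq((1, "John"), (2, "David")).toDF("EmpID", "EmpName")
val depDF = Seq((1, "Admin", 1), (2, "HR", 2)).toDF("DepID", "DeptName", "EmpID")

// Inner join on EmpID, keeping only the requested columns
val result = depDF.join(empDF, Seq("EmpID")).select("EmpID", "DeptName")
result.show()
```

The join is executed distributed across the cluster, which is why it is usually preferable to a collected lookup map.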

    You can use a lookup, but I think there is no "distributed" solution, i.e. you have to collect the lookup table into driver memory. Also note that this approach assumes that EmpID is unique:

    import org.apache.spark.sql.functions._
    import sqlContext.implicits._
    import scala.collection.Map

    val emp = Seq((1, "John"), (2, "David"))
    val deps = Seq((1, "Admin", 1), (2, "HR", 2))

    val empRdd = sc.parallelize(emp)
    val depsDF = sc.parallelize(deps).toDF("DepID", "Name", "EmpID")

    // Collect the (EmpID -> Name) pairs into a map on the driver
    val lookupMap = empRdd.collectAsMap()

    // Wrap the map in a UDF; the Option result becomes null for missing keys
    def lookup(lookupMap: Map[Int, String]) = udf((empID: Int) => lookupMap.get(empID))

    val combinedDF = depsDF
      .withColumn("empNames", lookup(lookupMap)($"EmpID"))
    

    My initial thought was to pass the empRdd to the UDF and use the lookup method defined on PairRDD, but this of course does not work, because you cannot run Spark actions (i.e. lookup) inside transformations (i.e. the UDF).
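Since the map has to live on the driver anyway, one common refinement (a sketch, not part of the original answer) is to broadcast it, so each executor receives a single read-only copy instead of the map being re-serialized into every task closure:

```scala
import org.apache.spark.sql.functions.udf

// Assumes sc and depsDF as defined in the answer above;
// the lookup map is repeated here so the snippet stands alone.
val lookupMap: Map[Int, String] = Map(1 -> "John", 2 -> "David")

// One read-only copy per executor instead of one per task
val lookupBc = sc.broadcast(lookupMap)
val lookupUdf = udf((empID: Int) => lookupBc.value.get(empID))

depsDF.withColumn("empNames", lookupUdf($"EmpID")).show()
```

This does not make the lookup distributed either, but it reduces serialization overhead when the map is reused across many tasks.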

    If your empDf has multiple columns (e.g. Name, Age), you can use this:

    // Key the rows by EmpID, keeping (Name, Age) as the value
    val empRdd = empDf.rdd.map { row =>
      (row.getInt(0), (row.getString(1), row.getInt(2)))
    }

    // Collect (EmpID -> (Name, Age)) to the driver
    val lookupMap = empRdd.collectAsMap()
    def lookup(lookupMap: Map[Int, (String, Int)]) =
      udf((empID: Int) => lookupMap.lift(empID))

    depsDF
      .withColumn("lookup", lookup(lookupMap)($"EmpID"))
      .withColumn("empName", $"lookup._1")
      .withColumn("empAge", $"lookup._2")
      .drop("lookup") // drop(Column) is not available in Spark 1.6; use the column name
      .show()