ML function as a pyspark UDF

Problem description:

I am fairly new to pyspark and Python. I am trying to run an ML function as a pyspark UDF.

Here is an example:

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

df = spark.createDataFrame(['Bob has a dog. He loves him'], StringType())

def parse(text):
    import spacy
    import neuralcoref
    nlp = spacy.load('en_core_web_sm')
    # Let's try before using the conversion dictionary:
    neuralcoref.add_to_pipe(nlp)
    doc = nlp(text)
    return doc._.coref_resolved

pd_udf = pandas_udf(parse, returnType=StringType())

df.select(pd_udf(col("value"))).show()

It fails with this error:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 101, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in verify_result_length
    result = f(*a)
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 7, in parse
  File "/home/user/anaconda3/lib/python3.7/site-packages/spacy/language.py", line 377, in __call__
    doc = self.make_doc(text)
  File "/home/user/anaconda3/lib/python3.7/site-packages/spacy/language.py", line 401, in make_doc
    return self.tokenizer(text)
TypeError: Argument 'string' has incorrect type (expected str, got Series)

Is it possible to run this code on PySpark?
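
The traceback shows what goes wrong: `pandas_udf` passes `parse` a whole pandas Series (one batch of rows), while spaCy's `nlp(...)` accepts a single `str`. A minimal sketch of one possible fix, assuming the same `en_core_web_sm` model and neuralcoref setup as in the question, is to apply the pipeline element-wise inside the UDF. The names `load_resolver`, `parse_series`, `resolve_column` and the `make_resolver` parameter are hypothetical, introduced here for illustration only. The `str.upper` stand-in at the end exists only so the Series handling can be checked without Spark or spaCy.

```python
import pandas as pd


def load_resolver():
    """Build a str -> str coreference resolver (hypothetical helper).

    spaCy and neuralcoref are imported here, as in the question's parse(),
    so the model is loaded on the worker that runs the UDF.
    """
    import spacy
    import neuralcoref
    nlp = spacy.load('en_core_web_sm')
    neuralcoref.add_to_pipe(nlp)
    return lambda text: nlp(text)._.coref_resolved


def parse_series(texts, make_resolver=load_resolver):
    """Body of a scalar pandas UDF: pandas Series in, same-length Series out."""
    resolve = make_resolver()     # one model load per batch, not per row
    return texts.apply(resolve)   # spaCy now receives one str at a time


def resolve_column(df, column='value'):
    """Wrap parse_series as a pandas UDF and apply it to a Spark DataFrame column."""
    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import StringType
    pd_udf = pandas_udf(parse_series, returnType=StringType())
    return df.select(pd_udf(col(column)))


# Local check with a stand-in resolver (assumption: str.upper replaces the model)
demo = parse_series(pd.Series(['bob', 'dog']), make_resolver=lambda: str.upper)
```

With the `df` from the question, `resolve_column(df).show()` would then run the UDF on the cluster. Null values in the column are not handled in this sketch.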

Another way would be to use a Pandas UDF, if you are more familiar with Pandas in general:

from functools import partial

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField

import spacy
import neuralcoref

# Create the session first, so that sc exists before it is used
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Broadcast the loaded model to make it available across the worker nodes
nlp_bc = sc.broadcast(spacy.load('en_core_web_sm'))


def udf_parse(pdf, input_col='value'):
    # pdf is a pandas DataFrame holding one group of rows
    nlp = nlp_bc.value  # unwrap the broadcast to get the spaCy pipeline
    # Add neuralcoref only once per worker-side copy of the pipeline
    if not nlp.has_pipe('neuralcoref'):
        neuralcoref.add_to_pipe(nlp)
    # spaCy expects a single str, so process the column one row at a time
    pdf['parsed_value'] = pdf[input_col].apply(lambda t: nlp(t)._.coref_resolved)
    return pdf


sparkDF = spark.createDataFrame(['Bob has a dog. He loves him'], StringType())

# Output schema; the column names must match what udf_parse returns
schema = StructType([
    StructField('value', StringType(), True),
    StructField('parsed_value', StringType(), True)
])

partial_func = partial(udf_parse, input_col='value')

# applyInPandas requires Spark 3.0 or later
sparkDF_agg = sparkDF.groupby().applyInPandas(partial_func, schema)