Spark Tutorial (7) - SparkSession and SparkContext


DataFrame

A DataFrame is similar to a DataFrame in pandas, i.e. tabular data.

It is a data format added in Spark 2.x and is read directly by SparkSession: no matter what kind of file you read — txt, csv, whatever — the output format is a DataFrame.

SparkContext, on the other hand, outputs an RDD no matter what file it reads.

>>> spark.read.text('README.md')
DataFrame[value: string]
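
For contrast, a quick type check in the PySpark shell (a minimal sketch; sc and spark are the shell's built-in SparkContext and SparkSession, described further below, and the exact class paths may vary slightly across versions):

>>> type(spark.read.text('README.md'))
<class 'pyspark.sql.dataframe.DataFrame'>
>>> type(sc.textFile('README.md'))
<class 'pyspark.rdd.RDD'>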

# DataFrame attributes
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattr__', '__getattribute__', '__getitem__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__',
'__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_collectAsArrow', '_jcols', '_jdf', '_jmap', '_jseq', '_lazy_rdd', '_repr_html_', '_sc', '_schema',
'_sort_cols', '_support_repr_html', 'agg', 'alias', 'approxQuantile', 'cache', 'checkpoint', 'coalesce', 'colRegex', 'collect', 'columns', 'corr', 'count', 'cov', 'createGlobalTempView',
'createOrReplaceGlobalTempView', 'createOrReplaceTempView', 'createTempView', 'crossJoin', 'crosstab', 'cube', 'describe', 'distinct', 'drop', 'dropDuplicates', 'drop_duplicates', 'dropna',
'dtypes', 'exceptAll', 'explain', 'fillna', 'filter', 'first', 'foreach', 'foreachPartition', 'freqItems', 'groupBy', 'groupby', 'head', 'hint', 'intersect', 'intersectAll', 'isLocal', 'isStreaming',
'is_cached', 'join', 'limit', 'localCheckpoint', 'na', 'orderBy', 'persist', 'printSchema', 'randomSplit', 'rdd', 'registerTempTable', 'repartition', 'repartitionByRange', 'replace', 'rollup', 'sample',
'sampleBy', 'schema', 'select', 'selectExpr', 'show', 'sort', 'sortWithinPartitions', 'sql_ctx', 'stat', 'storageLevel', 'subtract', 'summary', 'take', 'toDF', 'toJSON', 'toLocalIterator', 'toPandas', 'union',
'unionAll', 'unionByName', 'unpersist', 'where', 'withColumn', 'withColumnRenamed', 'withWatermark', 'write', 'writeStream']

DataFrame operations are different from RDD operations and are similar to pandas.
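
As a rough sketch of that pandas-like style (the tiny DataFrame and the column names 'name' and 'age' below are made up for illustration):

# a minimal sketch: a small DataFrame with hypothetical columns 'name' and 'age'
df = spark.createDataFrame([('tom', 20), ('jerry', 18)], ['name', 'age'])
df.select('name').show()            # column selection, like df[['name']] in pandas
df.filter(df.age > 18).show()       # row filtering, like boolean indexing in pandas
df.groupBy('name').count().show()   # aggregation, like df.groupby(...).size()
pdf = df.toPandas()                 # convert to an actual pandas DataFrame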


DataFrames will be covered in more detail later.

SparkSession and SparkContext

SparkSession is a new concept introduced in Spark 2.x. It gives users a unified entry point — literally, it creates a session, i.e. a connection to Spark.

In Spark 1.x, SparkContext was the main entry point to Spark. Since RDD was the primary API, we created and manipulated RDDs through SparkContext.

The problems with this were:

1. Different applications needed different contexts: a StreamingContext for Streaming, a sqlContext for SQL, a hiveContext for Hive — which was cumbersome.

2. As the Dataset and DataFrame APIs gradually became the standard APIs, an entry point had to be created for them, namely SparkSession.

SparkSession actually wraps SparkContext, and also wraps SparkConf and sqlContext; more may be wrapped as versions evolve.

So we should use SparkSession whenever possible; if some API turns out not to be on SparkSession, we can still get the SparkContext and the other contexts from the SparkSession.

In the shell, a SparkSession is created automatically, so there is no need to create one yourself; creating another has no effect.

In the shell, the SparkContext is called sc and the SparkSession is called spark.

Getting sc from spark:

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__',
'__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_convert_from_pandas', '_createFromLocal', '_createFromRDD', '_create_from_pandas_with_arrow',
'_create_shell_session', '_get_numpy_record_dtype', '_inferSchema', '_inferSchemaFromList', '_instantiatedSession', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_repr_html_', '_sc', '_wrapped',
'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

>>> spark.sparkContext   # i.e. sc

Simple operations

>>> sc.textFile('README.md').count()
>>> spark.read.text('README.md').count()
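
The two formats are also easy to bridge: a DataFrame exposes its underlying RDD through its rdd attribute, and an RDD of Row objects can be turned back into a DataFrame with toDF() (a sketch, using the same README.md as above; the 'value' column is the one shown in the earlier output):

>>> df = spark.read.text('README.md')             # DataFrame with a single 'value' column
>>> rdd = df.rdd                                  # the underlying RDD of Row objects
>>> rdd.map(lambda row: len(row.value)).take(3)   # e.g. lengths of the first 3 lines
>>> rdd.toDF().count()                            # back to a DataFrame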

Creating a SparkContext

class SparkContext(__builtin__.object):
    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
    '''Create a new SparkContext. At least the master and app name should be set,
     |      either through the named parameters here or through C{conf}.
     |
     |      :param master: Cluster URL to connect to
     |             (e.g. mesos://host:port, spark://host:port, local[4]).   local means run locally, 4 means use 4 CPU cores
     |      :param appName: A name for your job, to display on the cluster web UI.
     |      :param sparkHome: Location where Spark is installed on cluster nodes.
     |      :param pyFiles: Collection of .zip or .py files to send to the cluster
     |             and add to PYTHONPATH.  These can be paths on the local file
     |             system or HDFS, HTTP, HTTPS, or FTP URLs.
     |      :param environment: A dictionary of environment variables to set on
     |             worker nodes.
     |      :param batchSize: The number of Python objects represented as a single
     |             Java object. Set 1 to disable batching, 0 to automatically choose
     |             the batch size based on object sizes, or -1 to use an unlimited
     |             batch size
     |      :param serializer: The serializer for RDDs.
     |      :param conf: A L{SparkConf} object setting Spark properties.
     |      :param gateway: Use an existing gateway and JVM, otherwise a new JVM
     |             will be instantiated.
     |      :param jsc: The JavaSparkContext instance (optional).
     |      :param profiler_cls: A class of custom Profiler used to do profiling
     |             (default is pyspark.profiler.BasicProfiler).
     |
     |
     |      >>> from pyspark.context import SparkContext
     |      >>> sc = SparkContext('local', 'test')
     |
     |      >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
     |      Traceback (most recent call last):
     |          ...
     |      ValueError:...'''

Example:

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('myapp1').setMaster('local[4]')   # set the appName and master
sc = SparkContext(conf=conf)

## or simply like this
sc = SparkContext("spark://hadoop10:7077")
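
Note that only one SparkContext can be active per process — that is what the ValueError in the docstring above is about. If a context already exists (e.g. sc in the shell), either stop it first or reuse it via SparkContext.getOrCreate (a sketch):

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('myapp1').setMaster('local[4]')
sc = SparkContext.getOrCreate(conf)   # reuse the existing context if one is running

## or stop the old one and create a fresh context
sc.stop()
sc = SparkContext(conf=conf)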

Creating a SparkSession

class SparkSession(__builtin__.object):
     def __init__(self, sparkContext, jsparkSession=None):
     ''' Creates a new SparkSession.
     |
     |      >>> from datetime import datetime
     |      >>> spark = SparkSession(sc)
     |      >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1,
     |      ...     b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
     |      ...     time=datetime(2014, 8, 1, 14, 1, 5))])
     |      >>> df = allTypes.toDF()
     |      >>> df.createOrReplaceTempView("allTypes")
     |      >>> spark.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
     |      ...            'from allTypes where b and i > 0').collect()
     |      [Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)]
     |      >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
     |      [(1, u'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]'''

Example:

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf().setAppName('myapp1').setMaster('local[4]')   # set the appName and master
sess = SparkSession.builder.config(conf=conf).getOrCreate()     # builder.config() returns a Builder, so call getOrCreate() to get the session

## or like this
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('myapp1').setMaster('local[4]')   # set the appName and master
sc = SparkContext(conf=conf)
sess = SparkSession(sc)
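
In Spark 2.x the more idiomatic way is the builder pattern, which also reuses an existing session if one is already running (a sketch):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('myapp1') \
    .master('local[4]') \
    .getOrCreate()

sc = spark.sparkContext   # the underlying SparkContext, i.e. sc in the shell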
