Spark Tutorial (7) - SparkSession and SparkContext
DataFrame
A DataFrame is similar to a pandas DataFrame, i.e. tabular data.
It is a data format introduced in Spark 2.x and is what SparkSession reads directly: no matter what kind of file you read, txt or csv, the output is a DataFrame.
SparkContext, by contrast, outputs an RDD no matter what file it reads.
>>> spark.read.text('README.md')
DataFrame[value: string]

>>> dir(spark.read.text('README.md'))  # DataFrame attributes
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattr__', '__getattribute__', '__getitem__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_collectAsArrow', '_jcols', '_jdf', '_jmap', '_jseq', '_lazy_rdd', '_repr_html_', '_sc', '_schema',
'_sort_cols', '_support_repr_html', 'agg', 'alias', 'approxQuantile', 'cache', 'checkpoint', 'coalesce', 'colRegex', 'collect', 'columns', 'corr', 'count', 'cov', 'createGlobalTempView',
'createOrReplaceGlobalTempView', 'createOrReplaceTempView', 'createTempView', 'crossJoin', 'crosstab', 'cube', 'describe', 'distinct', 'drop', 'dropDuplicates', 'drop_duplicates', 'dropna',
'dtypes', 'exceptAll', 'explain', 'fillna', 'filter', 'first', 'foreach', 'foreachPartition', 'freqItems', 'groupBy', 'groupby', 'head', 'hint', 'intersect', 'intersectAll', 'isLocal', 'isStreaming',
'is_cached', 'join', 'limit', 'localCheckpoint', 'na', 'orderBy', 'persist', 'printSchema', 'randomSplit', 'rdd', 'registerTempTable', 'repartition', 'repartitionByRange', 'replace', 'rollup', 'sample',
'sampleBy', 'schema', 'select', 'selectExpr', 'show', 'sort', 'sortWithinPartitions', 'sql_ctx', 'stat', 'storageLevel', 'subtract', 'summary', 'take', 'toDF', 'toJSON', 'toLocalIterator', 'toPandas', 'union',
'unionAll', 'unionByName', 'unpersist', 'where', 'withColumn', 'withColumnRenamed', 'withWatermark', 'write', 'writeStream']
DataFrame operations differ from RDD operations; they are closer to pandas.
DataFrames will be covered in more detail later.
SparkSession and SparkContext
SparkSession is a new concept introduced in Spark 2.x. It gives users a single, unified entry point; read literally, it creates a session, that is, a connection to Spark.
In Spark 1.x, SparkContext was the main entry point: since the RDD was the primary API, we created and manipulated RDDs through SparkContext.
The problems with this were:
1. Different applications needed different contexts: StreamingContext for Streaming, SQLContext for SQL, HiveContext for Hive, which was cumbersome.
2. As the Dataset and DataFrame APIs gradually became the standard, they needed an entry point of their own, namely SparkSession.
SparkSession actually wraps SparkContext, and also SparkConf and SQLContext (possibly more as versions evolve).
So prefer SparkSession; if you find some API is not available on SparkSession, you can still get SparkContext and the other contexts from it.
In the shell, a SparkSession is created automatically, so there is no need to create one yourself; creating another has no effect.
In the shell, the SparkContext is named sc and the SparkSession is named spark.
Getting sc from spark:
>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_convert_from_pandas', '_createFromLocal', '_createFromRDD', '_create_from_pandas_with_arrow',
'_create_shell_session', '_get_numpy_record_dtype', '_inferSchema', '_inferSchemaFromList', '_instantiatedSession', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_repr_html_', '_sc', '_wrapped',
'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

>>> spark.sparkContext  # this is sc
Simple operations
>>> sc.textFile('README.md').count()
>>> spark.read.text('README.md').count()
Creating a SparkContext
class SparkContext(__builtin__.object):
    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=0, serializer=PickleSerializer(),
                 conf=None, gateway=None, jsc=None,
                 profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
        '''Create a new SparkContext. At least the master and app name should be set,
        either through the named parameters here or through C{conf}.

        :param master: Cluster URL to connect to
            (e.g. mesos://host:port, spark://host:port, local[4]).
            local means run locally, and 4 means use 4 CPU cores
        :param appName: A name for your job, to display on the cluster web UI.
        :param sparkHome: Location where Spark is installed on cluster nodes.
        :param pyFiles: Collection of .zip or .py files to send to the cluster
            and add to PYTHONPATH. These can be paths on the local file
            system or HDFS, HTTP, HTTPS, or FTP URLs.
        :param environment: A dictionary of environment variables to set on
            worker nodes.
        :param batchSize: The number of Python objects represented as a single
            Java object. Set 1 to disable batching, 0 to automatically choose
            the batch size based on object sizes, or -1 to use an unlimited
            batch size
        :param serializer: The serializer for RDDs.
        :param conf: A L{SparkConf} object setting Spark properties.
        :param gateway: Use an existing gateway and JVM, otherwise a new JVM
            will be instantiated.
        :param jsc: The JavaSparkContext instance (optional).
        :param profiler_cls: A class of custom Profiler used to do profiling
            (default is pyspark.profiler.BasicProfiler).

        >>> from pyspark.context import SparkContext
        >>> sc = SparkContext('local', 'test')

        >>> sc2 = SparkContext('local', 'test2')  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
            ...
        ValueError:...'''
Example:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('myapp1').setMaster('local[4]')  # set appName and master
sc = SparkContext(conf=conf)

# or directly
sc = SparkContext("spark://hadoop10:7077")
Creating a SparkSession
class SparkSession(__builtin__.object):
    def __init__(self, sparkContext, jsparkSession=None):
        '''Creates a new SparkSession.

        >>> from datetime import datetime
        >>> spark = SparkSession(sc)
        >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1,
        ...     b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
        ...     time=datetime(2014, 8, 1, 14, 1, 5))])
        >>> df = allTypes.toDF()
        >>> df.createOrReplaceTempView("allTypes")
        >>> spark.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
        ...     'from allTypes where b and i > 0').collect()
        [Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)]
        >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
        [(1, u'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]'''
Example (note that SparkSession.builder.config(...) only returns a builder; getOrCreate() is what actually produces the session):
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('myapp1').setMaster('local[4]')  # set appName and master
sess = SparkSession.builder.config(conf=conf).getOrCreate()

# or wrap an existing SparkContext
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('myapp1').setMaster('local[4]')  # set appName and master
sc = SparkContext(conf=conf)
sess = SparkSession(sc)