Spark 编程指南 (二) [Spark Programming Guide] | logging.DEBUG 

JerryXia 发表于 , 阅读 (0)

Python Programming Guide - Spark


弹性分布式数据集 (RDDs)

Spark的核心概念是弹性分布式数据集—Resilient Distributed Datasets,这是一个具有容错能力并且可以进行并行计算的元素集合

对于RDD的基本概念,在 Spark 编程指南 (一) [Spark Programming Guide] 中有详细介绍

RDD的创建

用户可以通过两种方式创建RDD:

  • 并行化(Parallelizing)一个已经存在与驱动程序(Driver Program)中的集合(Collection),如set、list

  • 引用外部存储系统上的一个数据集,比如HDFS、HBase,或者任何提供了Hadoop InputFormat的数据源

并行集合(Parallelized Collections)

并行集合是在驱动程序中,由SparkContext’s parallelize方法从一个已经存在的迭代器或者集合中创建,集合中的元素会被复制到一个可以进行并行操作的分布式数据集中

例如:如下代码演示如何创建一个元素为1到5的并行数据集

1
2
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

这个数据集一旦创建,就可以被并行的操作,例如用下代码就可以对上面列表中元素进行叠加

1
distData.reduce(lambda a, b: a + b)

在并行集合中有一个重要的参数—分片数,表示数据集的切分片数;Spark会在集群中为每个分片启动一个任务(task),通常情况下你希望集群中的每个CPU都有2—4个分片,但Spark会根据集群情况自动分配分片数;然而,你也可以通过第二个参数手动设置分片数

1
sc.parallelize(data, 10)

外部数据集(External Datasets)

PySpark可以从Hadoop所支持的任何存储数据源中构建出分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3,Spark支持text files、SequenceFiles和任何Hadoop InputFormat

Text file RDDs可以通过SparkContext’s textFile方法创建,这个方法接收一个URI文件地址作为参数(或者是一个本地路径、hdfs://,s3n://等),并读取文件作为行的集合,下面是一个调用实例:

1
distFile = sc.textFile("data.txt")

一旦创建完成,distFile就可以执行数据集的相关操作。例如:要对文件中的所有行进行求和,就可以用map和reduce操作

1
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

Spark读取文件时的一些注意事项:

  • 如果用本地文件系统,该文件必须在其工作节点上的相同目录下也可以访问。也可以将文件拷贝到所有的workers节点上,或者使用network-mounted共享文件系统

  • Spark中所有基于文件的输入方法,包括textFile,都支持在目录上运行,压缩文件和通配符。如:可以使用 textFile(“/my/directory”), textFile(“/my/directory/.txt”), 和 textFile(“/my/directory/.gz”)

  • textFile方法也带有第二个可选参数,其作用是控制文件的分片数。默认情况下,Spark会为文件的每一个block(在HDFS中,block的默认大小为64MB)创建一个分片,或者你也可以通过传入更大的值,来设置更高的分片数,但要注意,你设置的分片数不能比文件的块数小

除了text files,Spark的Python API还支持其他的数据格式:

  • SparkContext.wholeTextFiles 可以让你读取包含多个小text files的目录,并且对每一个文件返回这样的元祖对(filename, content),而对于对应的textFile,文件的每一行对应着一条上述所说的返回元祖对

  • RDD.saveAsPickleFile 和 SparkContext.pickleFile支持将RDD保存成由pickled Python对象组成的简单格式,使用批处理的方式对pickle的对象进行序列化,默认的处理批次是10

  • SequenceFile 和 Hadoop Input/Output 的格式

注意:这个功能目前属于实验性质的,为高级用户而提供。在将来的版本中,可能会因为支持Spark SQL的读写而被取代,且Spark SQL的读写是首选方法

Writable支持

PySpark的SequenceFile支持加载Java中的键值对RDD(key-value),将Writable转换为基本的Java类型,并且通过Pyrolite在结果Java对象上执行pickles序列化操作。当将一个键值对的RDD保存为SequenceFIle时,PySpark会对其进行反操作。它会unpickles Python的对象为Java对象,然后再将它们转换为Writables。

下表中的Writables会被自动地转换: