时间:2022-10-04 10:52:08 | 栏目:Python代码 | 点击:次
spark访问本地文件并执行运算时,可能会遇到权限问题或是dll错误。这是因为spark需要使用到Hadoop的winutils和hadoop.dll,首先我们必须配置好Hadoop相关的环境。可以到github下载:https://github.com/4ttty/winutils
gitcode提供了镜像加速:https://gitcode.net/mirrors/4ttty/winutils
我选择了使用这个仓库提供的最高的Hadoop版本3.0.0将其解压到D:\deploy\hadoop-3.0.0目录下,然后配置环境变量:
我们还需要将对应的hadoop.dll复制到系统中,用命令表达就是:
copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32
不过这步需要拥有管理员权限才可以操作。
为了能够在任何地方使用winutils命令工具,将%HADOOP_HOME%\bin
目录加入环境变量中:
首先,我们安装spark当前(2022-2-17)的最新版本:
pip install pyspark==3.2.1
需要注意pyspark的版本决定了jdk的最高版本,例如假如安装2.4.5版本的pyspark就只能安装1.8版本的jdk,否则会报出java.lang.IllegalArgumentException: Unsupported class file major version 55
的错误。
这是因为pyspark内置了Scala,而Scala是基于jvm的编程语言,Scala与jdk的版本存在兼容性问题,JDK与scala的版本兼容性表:
JDK version | Minimum Scala versions | Recommended Scala versions |
---|---|---|
17 | 2.13.6, 2.12.15 (forthcoming) | 2.13.6, 2.12.15 (forthcoming) |
16 | 2.13.5, 2.12.14 | 2.13.6, 2.12.14 |
13, 14, 15 | 2.13.2, 2.12.11 | 2.13.6, 2.12.14 |
12 | 2.13.1, 2.12.9 | 2.13.6, 2.12.14 |
11 | 2.13.0, 2.12.4, 2.11.12 | 2.13.6, 2.12.14, 2.11.12 |
8 | 2.13.0, 2.12.0, 2.11.0, 2.10.2 | 2.13.6, 2.12.14, 2.11.12, 2.10.7 |
6, 7 | 2.11.0, 2.10.0 | 2.11.12, 2.10.7 |
当前3.2.1版本的pyspark内置的Scala版本为2.12.15,意味着jdk17与其以下的所有版本都支持。
这里我依然选择安装jdk8的版本:
测试一下:
>java -version java version "1.8.0_201" Java(TM) SE Runtime Environment (build 1.8.0_201-b09) Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)
jdk11的详细安装教程(jdk1.8在官网只有安装包,无zip绿化压缩包):
绿化版Java11的环境配置与Python调用Java
https://xxmdmst.blog.csdn.net/article/details/118366166
pip安装当前最新的graphframes:
pip install graphframes==0.6
然后在官网下载graphframes的jar包。
下载地址:https://spark-packages.org/package/graphframes/graphframes
由于安装的pyspark版本是3.2,所以这里我选择了这个jar包:
然后将该jar包放入pyspark安装目录的jars目录下:
pyspark安装位置可以通过pip查看:
C:\Users\ASUS>pip show pyspark Name: pyspark Version: 3.2.1 Summary: Apache Spark Python API Home-page: https://github.com/apache/spark/tree/master/python Author: Spark Developers Author-email: dev@spark.apache.org License: http://www.apache.org/licenses/LICENSE-2.0 Location: d:\miniconda3\lib\site-packages Requires: py4j Required-by:
学习pyspark的最佳路径是官网:https://spark.apache.org/docs/latest/quick-start.html
在下面的网页,官方提供了在线jupyter:
https://spark.apache.org/docs/latest/api/python/getting_started/index.html
本地模式启动spark:
from pyspark.sql import SparkSession, Row spark = SparkSession \ .builder \ .appName("Python Spark") \ .master("local[*]") \ .getOrCreate() sc = spark.sparkContext spark
SparkSession输出的内容中包含了spark的web页面,新标签页打开页面后大致效果如上。
点击Environment选项卡可以查看当前环境中的变量:
找到pyspark的安装位置,例如我的电脑在D:\Miniconda3\Lib\site-packages\pyspark
手动创建conf目录并将hive-site.xml配置文件复制到其中。如果hive使用了MySQL作为原数据库,则还需要将MySQL对应的驱动jar包放入spark的jars目录下。
创建spark会话对象时可通过enableHiveSupport()
开启hive支持:
from pyspark.sql import SparkSession from pyspark.sql import Row spark = SparkSession \ .builder \ .appName("Python Spark SQL Hive integration example") \ .enableHiveSupport() \ .getOrCreate() sc = spark.sparkContext spark
spark访问hive自己创建的表有可能会出现如下的权限报错:
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------
是因为当前用户不具备对\tmp\hive的操作权限:
>winutils ls \tmp\hive drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
把\tmp\hive目录的权限改为777即可顺利访问:
>winutils chmod 777 \tmp\hive >winutils ls \tmp\hive drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
从spark2.x开始将RDD和DataFrame的API统一抽象成dataset,DataFrame就是Dataset[Row],RDD则是Dataset.rdd。可以将DataFrame理解为包含结构化信息的RDD。
将含row的RDD转换为DataFrame只需要调用toDF方法或SparkSession的createDataFrame方法即可,也可以传入schema覆盖类型或名称设置。
DataFrame默认支持DSL风格语法,例如:
//查看DataFrame中的内容 df.show() //查看DataFrame部分列中的内容 df.select(df['name'], df['age'] + 1).show() df.select("name").show() //打印DataFrame的Schema信息 df.printSchema() //过滤age大于等于 21 的 df.filter(df['age'] > 21).show() //按年龄进行分组并统计相同年龄的人数 personDF.groupBy("age").count().show()
将DataFrame注册成表或视图之后即可进行纯SQL操作:
df.createOrReplaceTempView("people") //df.createTempView("t_person") //查询年龄最大的前两名 spark.sql("select * from t_person order by age desc limit 2").show() //显示表的Schema信息 spark.sql("desc t_person").show()
Pyspark可以直接很方便的注册udf并直接使用:
strlen = spark.udf.register("len", lambda x: len(x)) print(spark.sql("SELECT len('test') length").collect()) print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())
执行结果:
[Row(length='4')]
[Row(length='3')]
DataFrame的本质是对RDD的包装,可以理解为DataFrame=RDD[Row]+schema。
RDD(A Resilient Distributed Dataset)叫做弹性可伸缩分布式数据集,是Spark中最基本的数据抽象。它代表一个不可变、自动容错、可伸缩性、可分区、里面的元素可并行计算的集合。
在每一个RDD内部具有五大属性:
一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
**一个计算每个分区的函数。**Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
**RDD之间的依赖关系。**RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
**一个Partitioner,即RDD的分片函数。**当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
**一个列表,存储存取每个Partition的优先位置(preferred location)。**对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
RDD包含Transformation API和 Action API,Transformation API都是延迟加载的只是记住这些应用到基础数据集上的转换动作,只有当执行Action API时这些转换才会真正运行。
Transformation API产生的两类RDD最重要,分别是MapPartitionsRDD和ShuffledRDD。
产生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是map和flatMap,但任何产生MapPartitionsRDD的算子都可以直接使用mapPartitions或mapPartitionsWithIndex实现。
产生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。
combineByKey到groupByKey 底层均是调用combineByKeyWithClassTag方法:
@Experimental def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners ,defaultPartitioner(self)) } def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) } def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }
三个重要参数的含义:
groupByKey的partitioner未指定时会传入默认的defaultPartitioner。例如:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length) a.groupByKey.collect res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))
aggregateByKey:每个分区使用zeroValue作为初始值,迭代每一个元素用seqOp进行合并,对所有分区的结果用combOp进行合并。例如:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2) pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
reduceByKey :每个分区迭代每一个元素用func进行合并,对所有分区的结果用func再进行合并,例如:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect
Action API有:
动作 | 含义 |
---|---|
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement*,*num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 排序并取前N个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | 将RDD中的元素用NullWritable作为key,实际元素作为value保存为sequencefile格式 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
spark模拟实现mapreduce版wordcount:
object MapreduceWordcount { def main(args: Array[String]): Unit = { import org.apache.spark._ val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]")) sc.setLogLevel("WARN") import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.spark.rdd.HadoopRDD import scala.collection.mutable.ArrayBuffer def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = { for (word <- v.toString.split("\\s+")) collect += ((word, 1)) } def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = { collect += ((key, value.sum)) } val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2) .asInstanceOf[HadoopRDD[LongWritable, Text]] .mapPartitionsWithInputSplit((split, it) =>{ val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]() it.foreach(kv => map(kv._1, kv._2, collect)) collect.toIterator }) .repartitionAndSortWithinPartitions(new HashPartitioner(2)) .mapPartitions(it => { val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]() var lastKey: String = "" var values: ArrayBuffer[Int] = ArrayBuffer[Int]() for ((currKey, value) <- it) { if (!currKey.equals(lastKey)) { if (values.length != 0) reduce(lastKey, values.toIterator, collect) values.clear() } values += value lastKey = currKey } if (values.length != 0) reduce(lastKey, values.toIterator, collect) collect.toIterator }) rdd.foreach(println) } }
当我们需要给Datafream添加自增列时,可以使用zipWithUniqueId方法:
from pyspark.sql.types import StructType, LongType schema = data.schema.add(StructField("id", LongType())) rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1])) data = rowRDD.toDF(schema) data.show()
API用法详情可参考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
rdd.persist()
checkpoint的源码注释可以看到:
从中我们得知,在执行checkpoint方法时,最好同时,将该RDD缓存起来,否则,checkpoint也会产生一个计算任务。
sc.setCheckpointDir("checkpoint") rdd.cache() rdd.checkpoint()
GraphFrame是将Spark中的Graph算法统一到DataFrame接口的Graph操作接口,为Scala、Java和Python提供了统一的图处理API。
Graphframes是开源项目,源码工程如下:https://github.com/graphframes/graphframes
可以参考:
在GraphFrames中图的顶点(Vertex)和边(edge)都是以DataFrame形式存储的:
创建图的示例:
from graphframes import GraphFrame vertices = spark.createDataFrame([ ("a", "Alice", 34), ("b", "Bob", 36), ("c", "Charlie", 30), ("d", "David", 29), ("e", "Esther", 32), ("f", "Fanny", 36), ("g", "Gabby", 60)], ["id", "name", "age"]) edges = spark.createDataFrame([ ("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"), ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"), ("d", "a", "friend"), ("a", "e", "friend") ], ["src", "dst", "relationship"]) # 生成图 g = GraphFrame(vertices, edges)
GraphFrame提供三种视图:
print("顶点表视图:") graph.vertices.show() # graph.vertices 就是原始的vertices print("边表视图:") graph.edges.show() # graph.edges 就是原始的 edges print("三元组视图:") graph.triplets.show()
获取顶点的度、入度和出度:
# 顶点的度 graph.degrees.show() # 顶点的入度 graph.inDegrees.show() # 顶点的出度 graph.outDegrees.show()
示例:
# 多个路径条件 motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)") # 在搜索的结果上进行过滤 motif.filter("b.age > 30") # 不需要返回路径中的元素时,可以使用匿名顶点和边 motif = graph.find("(start)-[]->()") # 设置路径不存在的条件 motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")
假设我们要想给用户推荐关注的人,可以找出这样的关系:A关注B,B关注C,但是A未关注C。找出这样的关系就可以把C推荐给A:
# Motif: A->B->C but not A->C results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)") # 排除自己 results = results.filter("A.id != C.id") # 选择需要的列 results = results.select(results.A.id.alias("A"), results.C.id.alias("C")) results.show()
结果:
+---+---+
| A| C|
+---+---+
| e| c|
| e| a|
| d| b|
| a| d|
| f| b|
| d| e|
| a| f|
| a| c|
+---+---+
Motif在查找路径过程的过程中,还可以沿着路径携带状态。例如我们想要找出关系链有4个顶点,而且其中3条边全部都是"friend"关系:
from pyspark.sql.functions import col, lit, when from functools import reduce chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)") def sumFriends(cnt, relationship): "定义下一个顶点更新状态的条件:如果关系为friend则cnt+1" return when(relationship == "friend", cnt+1).otherwise(cnt) # 将更新方法应用到整个链的,链上每有一个关系是 friend 就加一,链上共三个关系。 condition = reduce(lambda cnt, e: sumFriends( cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0)) chainWith2Friends2 = chain4.where(condition >= 3) chainWith2Friends2.show()
结果:
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| a| ab| b| bc| c| cd| d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}| {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
可以直接过滤其顶点或边,dropIsolatedVertices()
方法用于删除孤立没有连接的点:
graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()
还可以基于模式发现获取到的边创建Subgraphs :
paths = graph.find("(a)-[e]->(b)")\ .filter("e.relationship = 'follow'")\ .filter("a.age < b.age") # 抽取边信息e2 = paths.select("e.src", "e.dst", "e.relationship") e2 = paths.select("e.*") # 创建Subgraphs g2 = GraphFrame(graph.vertices, e2)
pageRank算法:
results = graph.pageRank(resetProbability=0.15, maxIter=10) results.vertices.sort("pagerank", ascending=False).show()
结果:
+---+-------+---+-------------------+
| id| name|age| pagerank|
+---+-------+---+-------------------+
| b| Bob| 36| 2.7025217677349773|
| c|Charlie| 30| 2.6667877057849627|
| a| Alice| 34| 0.4485115093698443|
| e| Esther| 32| 0.3613490987992571|
| f| Fanny| 36|0.32504910549694244|
| d| David| 29|0.32504910549694244|
| g| Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+
可以设置起始顶点:
graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a") graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)
广度优先搜索BFS:
搜索从姓名叫Esther到年龄小于32的最小路径:
paths = graph.bfs("name = 'Esther'", "age < 32") paths.show()
+--------------+--------------+---------------+ | from| e0| to| +--------------+--------------+---------------+ |{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}| +--------------+--------------+---------------+
可以指定只能在指定的边搜索:
graph.bfs("name = 'Esther'", "age < 32", edgeFilter="relationship != 'friend'", maxPathLength=4 ).show()
+---------------+--------------+--------------+--------------+----------------+ | from| e0| v1| e1| to| +---------------+--------------+--------------+--------------+----------------+ |{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}| +---------------+--------------+--------------+--------------+----------------+
Connected components 连通组件:
必须先设置检查点:
sc.setCheckpointDir("checkpoint") graph.connectedComponents().show()
结果:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
| g| Gabby| 60|146028888064|
+---+-------+---+------------+
可以看到仅g点在一个连通区域内,可以调用dropIsolatedVertices()
方法,删除这种孤立的没有连接的点:
graph.dropIsolatedVertices().connectedComponents().show()
结果:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
+---+-------+---+------------+
Strongly connected components 强连通组件:
graph.stronglyConnectedComponents(maxIter=10).show()
Shortest paths 最短路径:
每个顶点到a或d的最短路径:
graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+ | id| name|age| distances| +---+-------+---+----------------+ | g| Gabby| 60| {}| | f| Fanny| 36| {}| | e| Esther| 32|{a -> 2, d -> 1}| | d| David| 29|{a -> 1, d -> 0}| | c|Charlie| 30| {}| | b| Bob| 36| {}| | a| Alice| 34|{a -> 0, d -> 2}| +---+-------+---+----------------+
Triangle count 三角形计数:
graph.triangleCount().show()
+-----+---+-------+---+ |count| id| name|age| +-----+---+-------+---+ | 1| a| Alice| 34| | 0| b| Bob| 36| | 0| c|Charlie| 30| | 1| d| David| 29| | 1| e| Esther| 32| | 0| g| Gabby| 60| | 0| f| Fanny| 36| +-----+---+-------+---+
说明顶点a/e/d构成三角形。
标签传播算法(LPA):
graph.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+ | id| name|age| label| +---+-------+---+-------------+ | g| Gabby| 60| 146028888064| | f| Fanny| 36|1047972020224| | b| Bob| 36|1047972020224| | a| Alice| 34|1382979469312| | c|Charlie| 30|1382979469312| | e| Esther| 32|1460288880640| | d| David| 29|1460288880640| +---+-------+---+-------------+
Pyspark从3.0版本开始出现了pandas_udf装饰器、applyInPandas和mapInPandas,基于这些方法,我们就可以使用熟悉的pandas的语法处理spark对象的数据。
首先创建几条测试数据,并启动 Apache Arrow:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) df.show()
pyspark暂不支持自定义UDTF。
使用pandas_udf装饰器我们可以创建出基于pandas的udf自定义函数,在DSL的语法中可以被直接使用:
from pyspark.sql.functions import pandas_udf import pandas as pd @pandas_udf("double") def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series: return a * b df.select(multiply_func("id", "v").alias("product")).show()
注册函数和视图后,可以直接在SQL中使用:
df.createOrReplaceTempView("t") spark.udf.register("multiply", multiply_func) spark.sql('select multiply(id, v) product from t').show()
结果均为:
+-------+
|product|
+-------+
| 1.0|
| 2.0|
| 6.0|
| 10.0|
| 20.0|
+-------+
还支持聚合函数和窗口函数:
from pyspark.sql import Window @pandas_udf("double") def mean_udf(v: pd.Series) -> float: return v.mean() # 对字段'v'进行求均值 df.select(mean_udf('v').alias("mean_v")).show() # 按照‘id'分组,求'v'的均值 df.groupby("id").agg(mean_udf('v').alias("mean_v")).show() # 按照‘id'分组,求'v'的均值,并赋值给新的一列 df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()
注册到udf之后同样可以直接使用SQL实现:
spark.udf.register("mean2", mean_udf) spark.sql('select mean2(v) mean_v from t').show() spark.sql('select id,mean2(v) mean_v from t group by id').show() spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()
结果均为:
+--------+
| mean_v |
+--------+
| 4.2|
+--------++---+--------+
| id| mean_v |
+---+--------+
| 1| 1.5|
| 2| 6.0|
+---+--------++---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
+---+----+------+
applyInPandas需要在datafream调用groupby之后才能使用:
def subtract_mean(pdf): v = pdf.v pdf['v1'] = v - v.mean() pdf['v2'] = v + v.mean() return pdf t = df.groupby("id") t.applyInPandas( subtract_mean, schema="id long, v double, v1 double, v2 double").show()
结果:
+---+----+----+----+
| id| v| v1| v2|
+---+----+----+----+
| 1| 1.0|-0.5| 2.5|
| 1| 2.0| 0.5| 3.5|
| 2| 3.0|-3.0| 9.0|
| 2| 5.0|-1.0|11.0|
| 2|10.0| 4.0|16.0|
+---+----+----+----+
subtract_mean函数接收的是对应id的dataframe数据,schema指定了返回值的名称和类型列表。
通过以下代码我们可以知道,applyInPandas可以借助cogroup进行表连接:
val a = sc.parallelize(List(1, 2, 1, 3)) val b = a.map((_, "b")) val c = a.map((_, "c")) val d = a.map((_, "d")) val e = a.map((_, "e")) scala> b.cogroup(c).foreach(println) (3,(CompactBuffer(b),CompactBuffer(c))) (1,(CompactBuffer(b, b),CompactBuffer(c, c))) (2,(CompactBuffer(b),CompactBuffer(c)))
示例:
df1 = spark.createDataFrame( [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)], ("time", "id", "v1")) df2 = spark.createDataFrame( [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")) def asof_join(l, r): # l、r is a pandas.DataFrame # 这里是按照id分组 # 那么,l和r分别是对应id的df1和df2数据 return pd.merge_asof(l, r, on="time", by="id") df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas( asof_join, schema="time int, id int, v1 double, v2 string").show() # +--------+---+---+---+ # | time| id| v1| v2| # +--------+---+---+---+ # |20000101| 1|1.0| x| # |20000102| 1|3.0| x| # |20000101| 2|2.0| y| # |20000102| 2|4.0| y| # +--------+---+---+---+
执行以下代码:
def filter_func(iterator): for i, pdf in enumerate(iterator): print(i, pdf.values.tolist()) yield pdf df.mapInPandas(filter_func, schema=df.schema).show()
后台看到执行结果为:
0 [[2.0, 5.0]]
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]
前台结果几乎保持原样。可以知道iterator是一个分区迭代器,迭代出当前分区的每一行数据都被封装成一个pandas对象。
将spark的Datafream对象转换为原生的pandas对象只需调用toPandas()方法即可:
sdf.toPandas()
将原生的pandas对象转换为spark对象可以使用spark的顶级方法:
spark.createDataFrame(pdf)
习惯使用pandas的童鞋,还可以直接使用pandas-on-Spark,在spark3.2.0版本时已经匹配到pandas 1.3版本的API。通过pandas-on-Spark,我们可以完全用pandas的api操作数据,而底层执行却是spark的并行化。
使用pandas-on-Spark最好设置一下环境变量:
import os os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
将spark对象转换为pandas-on-Spark对象:
df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) pdf = df.to_pandas_on_spark() print(type(pdf)) pdf
pandas-on-Spark对象也可以还原成spark对象:
pdf.to_spark()
另外spark提供直接将文件读取成pandas-on-Spark对象的api,例如:
import pyspark.pandas as ps pdf = ps.read_csv("example_csv.csv")
ps对象与原生pandas对象的API几乎完全一致。
ps对象相对于原生pandas对象的API几乎一致,同时还支持一些强悍的功能,例如直接以SQL形式访问:
ps.sql("SELECT count(*) as num FROM {pdf}")
{pdf}访问了变量名为pdf的pandas-on-Spark对象。