Apache Spark 的 diff()
函数用于计算两个 DataFrame 或 Dataset 之间的差异
- 使用
join()
函数将两个 DataFrame 或 Dataset 进行连接,然后使用withColumn()
函数创建一个新列,该列表示原始 DataFrame 或 Dataset 中的行与另一个 DataFrame 或 Dataset 中的行的差异。
from pyspark.sql import SparkSession from pyspark.sql.functions import col, when # 创建 Spark 会话 spark = SparkSession.builder \ .appName("Spark Diff Example") \ .getOrCreate() # 创建第一个 DataFrame data1 = [("A", 1), ("B", 2), ("C", 3)] columns1 = ["ID", "Value"] df1 = spark.createDataFrame(data1, columns1) # 创建第二个 DataFrame data2 = [("A", 1), ("B", 2), ("D", 4)] columns2 = ["ID", "Value"] df2 = spark.createDataFrame(data2, columns2) # 使用 join() 函数将两个 DataFrame 连接在一起,然后使用 withColumn() 函数创建一个新列,表示差异 df_diff = df1.join(df2, on=["ID"], how="outer").na.fill({"Value": float("inf")}).withColumn("Diff", when(col("Value1") != col("Value2"), 1).otherwise(0)) # 显示结果 df_diff.show()
在这个例子中,我们首先创建了两个 DataFrame df1
和 df2
。然后,我们使用 join()
函数将它们连接在一起,并通过 na.fill()
函数处理可能出现的空值。接下来,我们使用 withColumn()
函数创建一个新列 “Diff”,该列表示原始 DataFrame 中的行与另一个 DataFrame 中的行的差异。最后,我们使用 show()
函数显示结果。
- 使用
except()
或filter()
函数计算两个 DataFrame 或 Dataset 之间的差异。
from pyspark.sql import SparkSession from pyspark.sql.functions import col # 创建 Spark 会话 spark = SparkSession.builder \ .appName("Spark Diff Example") \ .getOrCreate() # 创建第一个 DataFrame data1 = [("A", 1), ("B", 2), ("C", 3)] columns1 = ["ID", "Value"] df1 = spark.createDataFrame(data1, columns1) # 创建第二个 DataFrame data2 = [("A", 1), ("B", 2), ("D", 4)] columns2 = ["ID", "Value"] df2 = spark.createDataFrame(data2, columns2) # 使用 except() 函数计算两个 DataFrame 之间的差异 df_diff1 = df1.except(df2) # 或者使用 filter() 函数计算两个 DataFrame 之间的差异 df_diff2 = df1.filter(~col("ID").isin(df2["ID"])) # 显示结果 df_diff1.show() df_diff2.show()
在这个例子中,我们首先创建了两个 DataFrame df1
和 df2
。然后,我们使用 except()
函数计算两个 DataFrame 之间的差异,得到一个新的 DataFrame df_diff1
。另外,我们还展示了如何使用 filter()
函数计算两个 DataFrame 之间的差异,得到一个新的 DataFrame df_diff2
。最后,我们使用 show()
函数显示结果。
这两种方法都可以用于处理多种数据,具体取决于你的需求和数据结构。