📜  将 PySpark RDD 转换为 DataFrame(1)

📅  最后修改于: 2023-12-03 15:09:32.521000             🧑  作者: Mango

将 PySpark RDD 转换为 DataFrame

在 PySpark 中,RDD(Resilient Distributed Datasets)是最基本的数据处理单位。但是,由于 RDD 是强类型的数据集合,它不适合处理结构化数据,而 DataFrame 很适合处理结构化数据。因此,我们需要将 RDD 转换为 DataFrame。本文将介绍如何将 PySpark RDD 转换为 DataFrame。

1. PySpark RDD 转换为 PySpark DataFrame

在 PySpark 中,我们可以将 RDD 转换为 DataFrame。我们可以使用 toDF() 方法将 RDD 转换为 DataFrame。

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("convertRDDtoDF").getOrCreate()

# 创建一个 RDD
rdd = spark.sparkContext.parallelize([(1, "John Doe", 22), (2, "Mike Smith", 33), (3, "Mary Johnson", 44)])

# 将 RDD 转换为 DataFrame
df = rdd.toDF(["id", "name", "age"])

# 显示 DataFrame
df.show()

输出:

+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|      John Doe| 22|
|  2|    Mike Smith| 33|
|  3| Mary Johnson| 44|
+---+-------------+---+
2. PySpark RDD 转换为 PySpark DataFrame Schema

如果你想更明确地指定 DataFrame 的 Schema,可以使用 createDataFrame() 方法。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("convertRDDtoDFwithSchema").getOrCreate()

# 创建一个 RDD
rdd = spark.sparkContext.parallelize([(1, "John Doe", 22), (2, "Mike Smith", 33), (3, "Mary Johnson", 44)])

# 定义 DataFrame 成员的类型
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# 将 RDD 转换为 DataFrame
df = spark.createDataFrame(rdd, schema)

# 显示 DataFrame
df.show()

输出:

+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|      John Doe| 22|
|  2|    Mike Smith| 33|
|  3| Mary Johnson| 44|
+---+-------------+---+
3. PySpark RDD 转换为 PySpark DataFrame 时处理缺失值

在将 RDD 转换为 DataFrame 时,如果 RDD 中存在缺失值,则我们需要使用 map() 方法将缺失值转换为 None。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("convertRDDtoDFwithMissingValues").getOrCreate()

# 创建一个 RDD,它包含缺失值
rdd_with_missing_values = spark.sparkContext.parallelize([(1, "John Doe", None), (2, None, 33), (3, "Mary Johnson", 44)])

# 将缺失值转换为 None
rdd_with_missing_values = rdd_with_missing_values.map(lambda x: (x[0], x[1] if x[1] else None, x[2] if x[2] else None))

# 定义 DataFrame 成员的类型
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# 将 RDD 转换为 DataFrame
df = spark.createDataFrame(rdd_with_missing_values, schema)

# 显示 DataFrame
df.show()

输出:

+---+-------------+----+
| id|         name| age|
+---+-------------+----+
|  1|      John Doe|null|
|  2|         null|  33|
|  3| Mary Johnson|  44|
+---+-------------+----+
结语

通过本文,您学会了如何将 PySpark RDD 转换为 PySpark DataFrame。您还学会了如何处理缺失值。现在,您可以更轻松地使用 PySpark 处理结构化数据了。