📜  将 PySpark DataFrame 列转换为Python列表(1)

📅  最后修改于: 2023-12-03 15:25:14.662000             🧑  作者: Mango

将 PySpark DataFrame 列转换为 Python 列表

在 PySpark 中,DataFrame 是一个分布式的数据集合。它类似于关系型数据库中的表,但是分布式存储在不同的节点上,可以并行处理数据。有时候,我们需要将 DataFrame 中的某一列转换为 Python 列表进行后续的数据处理。以下是 PySpark DataFrame 列转换为 Python 列表的几种方式。

使用 collect() 和 map() 函数

如果 DataFrame 中的数据不是太大,可以使用 collect() 函数获取所有数据,再使用 map() 函数对某一列进行转换。

from pyspark.sql.functions import col

# 创建 DataFrame
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ['Name', 'ID'])

# 将 ID 列转换为 Python 列表
id_list = df.select(col('ID')).rdd.map(lambda x: x[0]).collect()

print(id_list)
# 输出: [1, 2, 3]

这里使用 select() 函数选择需要转换的列,然后使用 rdd 将 DataFrame 转换为 RDD,再使用 map() 函数将每个行数据中的第一个元素(即 ID 列)提取出来,最后使用 collect() 函数将数据收集到本地。

需要注意的是,当 DataFrame 中的数据较大时,这种方式会产生数据倾斜或者内存不足等问题,因此需要慎重使用。

使用 toPandas() 函数

如果 DataFrame 中的数据不是特别大,但是使用上述方法存在性能问题,可以考虑使用 toPandas() 函数将 DataFrame 转换为 Pandas DataFrame,再使用 Pandas 的方法进行转换。

import pandas as pd

# 创建 DataFrame
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ["Name", "ID"])

# 将 ID 列转换为 Python 列表
id_list = pd.DataFrame(df.select(col("ID")).collect(), columns=["ID"])["ID"].tolist()

print(id_list)
# 输出: [1, 2, 3]

这里使用 select() 函数选择需要转换的列,然后使用 collect() 函数将数据收集到本地,并将其转换为 Pandas DataFrame。最后,使用 Pandas 的方法将数据转换为 Python 列表。

需要注意的是,这种方式会将数据收集到 Driver 端,如果数据量过大会导致内存不足问题。

使用 toLocalIterator() 函数

如果 DataFrame 中的数据非常大,并且无法使用上述两种方式进行转换,可以考虑使用 toLocalIterator() 函数逐行读取数据。

# 创建 DataFrame
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ["Name", "ID"])

# 将 ID 列转换为 Python 列表
id_list = []
for row in df.select(col("ID")).toLocalIterator():
  id_list.append(row[0])

print(id_list)
# 输出: [1, 2, 3]

这里使用 select() 函数选择需要转换的列,然后使用 toLocalIterator() 函数逐行读取数据,并将每行数据中的第一个元素(即 ID 列)添加到列表中。

需要注意的是,这种方式会逐行读取数据,如果数据量过大会导致性能问题,因此需要慎重使用。

以上是几种将 PySpark DataFrame 列转换为 Python 列表的方式。需要根据具体的情况选择合适的方法。