Append data to an empty DataFrame in PySpark


In this article, we will see how to append data to an empty DataFrame in PySpark using the Python programming language.

Method 1: Create an empty DataFrame and union it with a non-empty DataFrame that has the same schema

The union() function is the key to this operation. It combines two DataFrames that have the same column schema.

Example:

In this example, we create a DataFrame with a particular schema and data, create an EMPTY DataFrame with the same schema, and then combine the two DataFrames using the union() function.

Python
# Importing PySpark and the SparkSession
# DataType functionality
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
 
# Creating a spark session
spark_session = SparkSession.builder.appName(
    'Spark_Session').getOrCreate()
 
# Creating an empty RDD to make a
# DataFrame with no data
emp_RDD = spark_session.sparkContext.emptyRDD()
 
# Defining the schema of the DataFrame
columns1 = StructType([StructField('Name', StringType(), False),
                       StructField('Salary', IntegerType(), False)])
 
# Creating an empty DataFrame
first_df = spark_session.createDataFrame(data=emp_RDD,
                                         schema=columns1)
 
# Printing the DataFrame with no data
first_df.show()
 
# Hardcoded data for the second DataFrame
rows = [['Ajay', 56000], ['Srikanth', 89078],
        ['Reddy', 76890], ['Gursaidutt', 98023]]
columns = ['Name', 'Salary']
 
# Creating the DataFrame
second_df = spark_session.createDataFrame(rows, columns)
 
# Printing the non-empty DataFrame
second_df.show()
 
# Storing the union of first_df and
# second_df in first_df
first_df = first_df.union(second_df)
 
# Our first DataFrame that was empty,
# now has data
first_df.show()


Output:

+----+------+
|Name|Salary|
+----+------+
+----+------+

+----------+------+
|      Name|Salary|
+----------+------+
|      Ajay| 56000|
|  Srikanth| 89078|
|     Reddy| 76890|
|Gursaidutt| 98023|
+----------+------+

+----------+------+
|      Name|Salary|
+----------+------+
|      Ajay| 56000|
|  Srikanth| 89078|
|     Reddy| 76890|
|Gursaidutt| 98023|
+----------+------+
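
One caveat worth noting: union() resolves columns by position, not by name, so both DataFrames must list their columns in the same order. unionByName() matches on column names instead. Below is a minimal sketch of the difference; the DataFrame names and stadium rows are purely illustrative, and it reuses the spark_session created above.

Python
# union() matches columns by position; unionByName()
# matches them by name, so differently ordered
# schemas still line up correctly
df_a = spark_session.createDataFrame(
    [['Eden Gardens', 68000]], ['Stadium', 'Capacity'])
df_b = spark_session.createDataFrame(
    [[132000, 'Motera Stadium']], ['Capacity', 'Stadium'])

# A positional union() would misalign these columns;
# unionByName() reorders df_b to match df_a
combined = df_a.unionByName(df_b)
combined.show()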

Method 2: Add a single row to an empty DataFrame by converting the row into a DataFrame

We can convert a single row, written as a Python list, into a DataFrame using createDataFrame(). Its syntax is:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

例子:

In this example, we create an EMPTY DataFrame with a particular schema using createDataFrame(), build a single-row DataFrame with the same schema, combine the two with the union() function, store the result in the previously empty DataFrame, and view the change with show().

Python

# Importing PySpark and the SparkSession,
# DataType functionality
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
 
# Creating a spark session
spark_session = SparkSession.builder.appName(
    'Spark_Session').getOrCreate()
 
# Creating an empty RDD to make a DataFrame
# with no data
emp_RDD = spark_session.sparkContext.emptyRDD()
 
# Defining the schema of the DataFrame
columns = StructType([StructField('Stadium', StringType(), False),
                      StructField('Capacity', IntegerType(), False)])
 
# Creating an empty DataFrame
df = spark_session.createDataFrame(data=emp_RDD,
                                   schema=columns)
 
# Printing the DataFrame with no data
df.show()
 
# Hardcoded row for the second DataFrame
added_row = [['Motera Stadium', 132000]]
 
# Creating the DataFrame
added_df = spark_session.createDataFrame(added_row, columns)
 
# Storing the union of df and added_df
# back in df
df = df.union(added_df)
 
# Our first DataFrame that was empty,
# now has data
df.show()

Output:

+-------+--------+
|Stadium|Capacity|
+-------+--------+
+-------+--------+

+--------------+--------+
|       Stadium|Capacity|
+--------------+--------+
|Motera Stadium|  132000|
+--------------+--------+
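
The single record can also be written as a pyspark.sql.Row instead of a plain list. Here is a minimal sketch continuing from the example above; new_row and row_df are illustrative names, the stadium values are made up, and unionByName() is used so the inferred column order does not matter.

Python
from pyspark.sql import Row

# Expressing the record to add as a Row object;
# createDataFrame() infers the schema from its fields
new_row = Row(Stadium='Wankhede Stadium', Capacity=33000)
row_df = spark_session.createDataFrame([new_row])

# unionByName() matches columns by name, so the
# inferred column order cannot misalign the data
df = df.unionByName(row_df)
df.show()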

Method 3: Convert the empty DataFrame to a Pandas DataFrame and use the append() function

We will use toPandas() to convert the PySpark DataFrame into a Pandas DataFrame. Its syntax is:

DataFrame.toPandas()

Then we will use the Pandas append() function. Its syntax is:

DataFrame.append(other, ignore_index=False, verify_integrity=False, sort=False)

Note that append() was deprecated in pandas 1.4 and removed in pandas 2.0; a pd.concat() alternative is sketched after the output below.

Example:

Here we create an empty DataFrame to which the data is to be added. We convert the data into a Spark DataFrame using createDataFrame(), convert both DataFrames to Pandas DataFrames with toPandas(), and add the non-empty one to the empty one with the append() function, passing ignore_index=True since we are building a new DataFrame. Finally, we convert the resulting Pandas DataFrame back to a Spark DataFrame using createDataFrame().

Python

# Importing PySpark and the SparkSession,
# DataType functionality
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
 
# Creating a spark session
spark_session = SparkSession.builder.appName(
    'Spark_Session').getOrCreate()
 
# Creating an empty RDD to make a DataFrame
# with no data
emp_RDD = spark_session.sparkContext.emptyRDD()
 
# Defining the schema of the DataFrame
columns = StructType([StructField('Stadium', StringType(), False),
                      StructField('Capacity', IntegerType(), False)])
 
# Creating an empty DataFrame
df = spark_session.createDataFrame(data=emp_RDD,
                                   schema=columns)
 
# Printing the DataFrame with no data
df.show()
 
# Hardcoded row for the second DataFrame
added_row = [['Motera Stadium', 132000]]
 
# Creating the DataFrame whose data
# needs to be added
added_df = spark_session.createDataFrame(added_row,
                                         columns)
 
# converting our PySpark DataFrames to
# Pandas DataFrames
pandas_added = added_df.toPandas()
df = df.toPandas()
 
# using append() to add the data (pandas < 2.0 only;
# removed in pandas 2.0 in favour of pd.concat)
df = df.append(pandas_added, ignore_index=True)
 
# reconverting our DataFrame back
# to a PySpark DataFrame
df = spark_session.createDataFrame(df)
 
# Printing resultant DataFrame
df.show()

Output:

+-------+--------+
|Stadium|Capacity|
+-------+--------+
+-------+--------+

+--------------+--------+
|       Stadium|Capacity|
+--------------+--------+
|Motera Stadium|  132000|
+--------------+--------+