Creating a PySpark DataFrame



In this article, we will learn how to create a PySpark DataFrame. A PySpark application starts by initializing a SparkSession, the entry point of PySpark, as shown below.
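A minimal initialization sketch (getOrCreate() returns the already-active session if one exists):

Python3
# SparkSession is the entry point of PySpark
from pyspark.sql import SparkSession

# create a new session, or return the already-active one
spark = SparkSession.builder.getOrCreate()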

Note: The PySpark shell, launched via the pyspark executable, automatically creates the session in the variable spark for the user, so you can also run the examples below in the shell.

Creating a PySpark DataFrame

A PySpark DataFrame is typically created via pyspark.sql.SparkSession.createDataFrame, and all the methods covered below go through it. pyspark.sql.SparkSession.createDataFrame takes a schema argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the schema by taking a sample from the data.

Syntax

pyspark.sql.SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

The different ways to create a PySpark DataFrame are given below: from a list of Rows, with an explicit schema, from a pandas DataFrame, from an RDD, from a CSV file, from a text file, and from a JSON file.

Creating a PySpark DataFrame from a list of Rows

In this implementation, we create the PySpark DataFrame from a list of Rows. For this, we provide a value for each variable (feature) in every Row and add the Rows to the DataFrame object. After that, we display the DataFrame and its schema.

Python3
# datetime and date provide the date and timestamp column values
from datetime import datetime, date

# Row represents a single record of the DataFrame
from pyspark.sql import Row

# SparkSession is the entry point of PySpark
from pyspark.sql import SparkSession

# creating the session
spark = SparkSession.builder.getOrCreate()

# create the DataFrame from a list of Rows;
# the schema is inferred from the Row fields
df = spark.createDataFrame([
    Row(a=1, b=4., c='GFG1', d=date(2000, 8, 1),
        e=datetime(2000, 8, 1, 12, 0)),
    Row(a=2, b=8., c='GFG2', d=date(2000, 6, 2),
        e=datetime(2000, 6, 2, 12, 0)),
    Row(a=4, b=5., c='GFG3', d=date(2000, 5, 3),
        e=datetime(2000, 5, 3, 12, 0))
])

# show table
df.show()

# show schema
df.printSchema()



Output:

Creating a PySpark DataFrame with an explicit schema

In this implementation, we create the PySpark DataFrame with an explicit schema. For this, we provide the feature values of each row as a tuple and add the rows to the DataFrame object together with the schema of the variables (features). After that, we display the DataFrame and its schema.

Python3

# datetime and date provide the date and timestamp column values
from datetime import datetime, date

# SparkSession is the entry point of PySpark
from pyspark.sql import SparkSession

# creating the session
spark = SparkSession.builder.getOrCreate()

# PySpark DataFrame with an explicit schema;
# each tuple is one row, typed by the schema string
df = spark.createDataFrame([
    (1, 4., 'GFG1', date(2000, 8, 1),
     datetime(2000, 8, 1, 12, 0)),
    (2, 8., 'GFG2', date(2000, 6, 2),
     datetime(2000, 6, 2, 12, 0)),
    (3, 5., 'GFG3', date(2000, 5, 3),
     datetime(2000, 5, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')

# show table
df.show()

# show schema
df.printSchema()

Output:
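Besides the DDL-format string used above, the same schema can be expressed programmatically with StructType and StructField from pyspark.sql.types; a minimal sketch:

Python3
from pyspark.sql.types import (StructType, StructField, LongType,
                               DoubleType, StringType, DateType,
                               TimestampType)

# programmatic equivalent of
# 'a long, b double, c string, d date, e timestamp';
# pass it as schema=schema to createDataFrame
schema = StructType([
    StructField('a', LongType()),
    StructField('b', DoubleType()),
    StructField('c', StringType()),
    StructField('d', DateType()),
    StructField('e', TimestampType()),
])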

Creating a PySpark DataFrame from a pandas DataFrame

In this implementation, we create the PySpark DataFrame from a pandas DataFrame. For this, we provide a list of values for each feature that represents that column's value for every row, build the pandas DataFrame, and pass it to createDataFrame. After that, we display the DataFrame and its schema.

Python3

# datetime and date provide the date and timestamp column values
from datetime import datetime, date

# pandas is used to build the source DataFrame
import pandas as pd

# SparkSession is the entry point of PySpark
from pyspark.sql import SparkSession

# creating the session
spark = SparkSession.builder.getOrCreate()

# pandas DataFrame with one list of values per column
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [4., 8., 5.],
    'c': ['GFG1', 'GFG2', 'GFG3'],
    'd': [date(2000, 8, 1), date(2000, 6, 2),
          date(2000, 5, 3)],
    'e': [datetime(2000, 8, 1, 12, 0),
          datetime(2000, 6, 2, 12, 0),
          datetime(2000, 5, 3, 12, 0)]
})

# PySpark DataFrame from the pandas DataFrame
df = spark.createDataFrame(pandas_df)

# show table
df.show()

# show schema
df.printSchema()

Output:
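The conversion also works in the other direction with toPandas(); enabling Arrow-based transfer is an optional speed-up (the configuration key below assumes Spark 3.x):

Python3
# optional: Arrow-based columnar transfer (Spark 3.x config key)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# collect the PySpark DataFrame back into pandas on the driver
result_pdf = df.toPandas()
print(result_pdf)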

Creating a PySpark DataFrame from an RDD

In this implementation, we create the PySpark DataFrame from a list of tuples. For this, we create an RDD by providing the feature values of each row to the parallelize() method, then pass the RDD to createDataFrame together with the names of the variables (features). After that, we display the DataFrame and its schema.

Python3

# datetime and date provide the date and timestamp column values
from datetime import datetime, date

# SparkSession is the entry point of PySpark
from pyspark.sql import SparkSession

# creating the session
spark = SparkSession.builder.getOrCreate()

# RDD of tuples, one tuple per row
rdd = spark.sparkContext.parallelize([
    (1, 4., 'GFG1', date(2000, 8, 1), datetime(2000, 8, 1, 12, 0)),
    (2, 8., 'GFG2', date(2000, 6, 2), datetime(2000, 6, 2, 12, 0)),
    (3, 5., 'GFG3', date(2000, 5, 3), datetime(2000, 5, 3, 12, 0))
])

# PySpark DataFrame from the RDD; only the column
# names are given, so the types are inferred
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])

# show table
df.show()

# show schema
df.printSchema()

Output:
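Equivalently, an RDD of tuples can be converted with its toDF() helper once a SparkSession is active; a minimal sketch:

Python3
# same DataFrame via the RDD's toDF() helper
df = rdd.toDF(['a', 'b', 'c', 'd', 'e'])
df.show()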

Creating a PySpark DataFrame from a CSV file

In this implementation, we create the PySpark DataFrame from a CSV file. For this, we read the CSV file with pandas and pass the resulting DataFrame to createDataFrame. After that, we display the DataFrame and its schema.

CSV used: train_dataset

Python3

# pandas is used to read the CSV file
import pandas as pd

# SparkSession is the entry point of PySpark
from pyspark.sql import SparkSession

# creating the session
spark = SparkSession.builder.getOrCreate()

# PySpark DataFrame from a CSV file read via pandas
df = spark.createDataFrame(pd.read_csv('data.csv'))

# show table
df.show()

# show schema
df.printSchema()

Output:
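Note that pd.read_csv loads the whole file into driver memory first. For large files, Spark's native CSV reader scales better; a minimal sketch (the header and inferSchema settings are assumptions about this file):

Python3
# read the CSV in parallel with Spark's native reader
df = spark.read.csv('data.csv', header=True, inferSchema=True)
df.show()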

Creating a PySpark DataFrame from a text file

In this implementation, we create the PySpark DataFrame from a text file. For this, we read the text file containing tab-separated values with pandas and pass the result to createDataFrame. After that, we display the DataFrame and its schema.

File used:

Python3

# pandas is used to read the tab-separated text file
import pandas as pd

# SparkSession is the entry point of PySpark
from pyspark.sql import SparkSession

# creating the session
spark = SparkSession.builder.getOrCreate()

# PySpark DataFrame from a tab-separated text file read via pandas
df = spark.createDataFrame(pd.read_csv('data.txt', delimiter="\t"))

# show table
df.show()

# show schema
df.printSchema()

Output:
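Spark's CSV reader also handles tab-separated files via the sep option; a minimal sketch (assuming the file has a header row):

Python3
# read the tab-separated file with Spark's CSV reader
df = spark.read.csv('data.txt', sep='\t', header=True, inferSchema=True)
df.show()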

Creating a PySpark DataFrame from a JSON file

In this implementation, we create the PySpark DataFrame from a JSON file. For this, we read the JSON file with pandas and pass the result to createDataFrame. After that, we display the DataFrame and its schema.

JSON used:

Python3

# pandas is used to read the JSON file
import pandas as pd

# SparkSession is the entry point of PySpark
from pyspark.sql import SparkSession

# creating the session
spark = SparkSession.builder.getOrCreate()

# PySpark DataFrame from a JSON file read via pandas
df = spark.createDataFrame(pd.read_json('data.json'))

# show table
df.show()

# show schema
df.printSchema()

Output:
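Spark's native JSON reader expects one JSON object per line by default; for a single pretty-printed document, set multiLine=True. A minimal sketch (assuming data.json is a multi-line document):

Python3
# read the JSON directly with Spark's native reader
df = spark.read.json('data.json', multiLine=True)
df.show()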

So these are all the ways of creating a PySpark DataFrame. The following datasets were used in the above programs.