📜  PySpark Collect() – Retrieve Data from a DataFrame

📅  Last Modified: 2022-05-13 01:55:07.799000             🧑  Author: Mango

collect() is an action available on both RDDs and DataFrames, and it is used to retrieve data from a DataFrame. It gathers all the row elements from every partition and brings them to the driver node/program.

So, in this article, we will learn how to retrieve data from a DataFrame using the collect() action.
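Before the examples, here is a minimal sketch (the app name and sample values are only illustrative) of what collect() returns: a plain Python list of Row objects that can be indexed like any list. Because the entire result is pulled into the driver's memory, collect() should only be used when the data fits on the driver.

Python

# importing necessary libraries
from pyspark.sql import SparkSession

# minimal sketch of what collect() returns
spark = SparkSession.builder.appName("collect_sketch").getOrCreate()

df = spark.createDataFrame(
    [("Kerala", 153000), ("Karnataka", 147000)],
    ["State", "Cases"])

rows = df.collect()       # brings every row to the driver as a Python list
print(type(rows))         # <class 'list'>
print(rows[0])            # e.g. Row(State='Kerala', Cases=153000)
print(rows[0]["State"])   # a Row field can be accessed by column name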

Example 1: Retrieving all the data from the DataFrame using collect().



After creating the DataFrame, we retrieve all of its data with the collect() action by writing df.collect(). This returns an array (a Python list) of Row objects. The output below shows the schema of the DataFrame as well as the DataFrame itself.

Python
# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
  spk = SparkSession.builder \
      .appName("Corona_cases_statewise.com") \
      .getOrCreate()
  return spk
  
# function to create RDD
def create_RDD(sc_obj,data):
  rdd = sc_obj.parallelize(data)
  return rdd
  
  
if __name__ == "__main__":
      
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
  
  # calling function to create SparkSession
  spark = create_session()
  
  # creating spark context object
  sc = spark.sparkContext
  
  # calling function to create RDD
  rd_df = create_RDD(sc,input_data)
  
  schema_lst = ["State","Cases","Recovered","Deaths"]
  
  # creating the dataframe using createDataFrame function
  df = spark.createDataFrame(rd_df,schema_lst)
  # printing schema of the dataframe and showing the dataframe
  df.printSchema()
  df.show()
      
  # retrieving the data from the dataframe using collect() 
  df2 = df.collect()
  print("Retrieved Data is:-")
  print(df2)


Output:

Example 2: Retrieving the data of a specific row using collect().

After creating the DataFrame, we retrieve the data of the 0th row with the collect() action by writing print(df.collect()[0][0:]). Here a row index and a column slice are passed after collect(): the first [0] selects the 0th row, and the second [0:] selects the columns, where the colon (:) retrieves all of them. In short, we retrieve the 0th row with all of its column values.

Python

# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
  spk = SparkSession.builder \
      .appName("Corona_cases_statewise.com") \
      .getOrCreate()
  return spk
  
# function to create RDD
def create_RDD(sc_obj,data):
  rdd = sc_obj.parallelize(data)
  return rdd
  
  
if __name__ == "__main__":
      
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
  
  # calling function to create SparkSession
  spark = create_session()
  
  # creating spark context object
  sc = spark.sparkContext
  
  # calling function to create RDD
  rd_df = create_RDD(sc,input_data)
  
  schema_lst = ["State","Cases","Recovered","Deaths"]
  
  # creating the dataframe using createDataFrame function
  df = spark.createDataFrame(rd_df,schema_lst)
  
  # printing schema of the dataframe and showing the dataframe
  df.printSchema()
  df.show()
    
  print("Retrieved Data is:-")
    
  # Retrieving data from 0th row 
  print(df.collect()[0][0:])

Output:
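As a side note, a Row returned by collect() also supports attribute-style access and conversion to a Python dictionary, which can be more readable than positional slicing. A small sketch, reusing the df from the example above:

Python

# a Row can also be read by field name or converted to a dict
first_row = df.collect()[0]
print(first_row.State)      # attribute-style access
print(first_row["Cases"])   # key-style access
print(first_row.asDict())   # e.g. {'State': 'Uttar Pradesh', 'Cases': 122000, ...}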



Example 3: Retrieving the data of multiple rows using collect().

After creating the DataFrame, we retrieve the data of its first three rows with the collect() action and a for loop, by writing for row in df.collect()[0:3]. The slice [0:3] written after collect() specifies the rows we want: 0 is the starting row, the colon (:) separates it from the end index, and 3 is the end of the range.

So the rows we retrieve data from are rows 0, 1, and 2; the end index, 3, is always excluded.

Python

# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
  spk = SparkSession.builder \
      .appName("Corona_cases_statewise.com") \
      .getOrCreate()
  return spk
  
# function to create RDD
def create_RDD(sc_obj,data):
  rdd = sc_obj.parallelize(data)
  return rdd
  
  
if __name__ == "__main__":
      
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
  
  # calling function to create SparkSession
  spark = create_session()
  
  # creating spark context object
  sc = spark.sparkContext
  
  # calling function to create RDD
  rd_df = create_RDD(sc,input_data)
  
  schema_lst = ["State","Cases","Recovered","Deaths"]
  
  # creating the dataframe using createDataFrame function
  df = spark.createDataFrame(rd_df,schema_lst)
  
  # showing the dataframe and schema
  df.printSchema()
  df.show()
    
  print("Retrieved Data is:-")
    
  # Retrieving multiple rows using collect() and for loop
  for row in df.collect()[0:3]:
    print((row["State"]),",",str(row["Cases"]),",",
          str(row["Recovered"]),",",str(row["Deaths"]))

Output:
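If only the first few rows are needed, PySpark also provides take(n) (and head(n)), which return the first n rows as a list of Row objects without collecting the whole DataFrame to the driver first. A brief sketch, reusing the df from the example above:

Python

# alternative sketch: fetch only the first three rows
# instead of collecting everything and slicing afterwards
for row in df.take(3):    # same rows as df.collect()[0:3]
    print(row["State"], ",", row["Cases"], ",",
          row["Recovered"], ",", row["Deaths"])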

Example 4: Retrieving the data of a specific column using collect().

After creating the DataFrame, we retrieve the data of the 'Cases' column using the collect() action and a for loop. The loop iterates over df.collect(), which gives us the array of Row objects, and from each row we retrieve and print the value of the 'Cases' column by writing print(row["Cases"]).

Since the for loop hands us one Row at a time from that array, passing the column name 'Cases' to each row picks out only that column's value.

Python



# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
  spk = SparkSession.builder \
      .appName("Corona_cases_statewise.com") \
      .getOrCreate()
  return spk
  
# function to create RDD
def create_RDD(sc_obj,data):
  rdd = sc_obj.parallelize(data)
  return rdd
  
if __name__ == "__main__":
      
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
  
  # calling function to create SparkSession
  spark = create_session()
  
  # creating spark context object
  sc = spark.sparkContext
  
  # calling function to create RDD
  rd_df = create_RDD(sc,input_data)
  
  schema_lst = ["State","Cases","Recovered","Deaths"]
  
  # creating the dataframe using createDataFrame function
  df = spark.createDataFrame(rd_df,schema_lst)
  
  # showing the dataframe and schema
  df.printSchema()
  df.show()
    
  print("Retrieved Data is:-")
    
  # Retrieving data from the "Cases" column
  for row in df.collect():
    print(row["Cases"])

Output:
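When only one column is needed, it is often preferable to select that column before collecting, so that only its values are transferred to the driver. A small sketch, reusing the df from the example above:

Python

# alternative sketch: select the "Cases" column first,
# then collect only that column's values
for row in df.select("Cases").collect():
    print(row["Cases"])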

Example 5: Retrieving the data of multiple columns using collect().

After creating the DataFrame, we retrieve the data of multiple columns, namely 'State', 'Recovered', and 'Deaths'.

To retrieve the data of multiple columns, we first get the array of rows with the df.collect() action and then iterate over it with a for loop. Since the loop gives us one row at a time, we retrieve the values of the 'State', 'Recovered', and 'Deaths' columns from each row and print them by writing print(row["State"], ",", row["Recovered"], ",", row["Deaths"]).

Python

# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
  spk = SparkSession.builder \
      .appName("Corona_cases_statewise.com") \
      .getOrCreate()
  return spk
  
# function to create RDD
def create_RDD(sc_obj,data):
  rdd = sc_obj.parallelize(data)
  return rdd
  
  
if __name__ == "__main__":
      
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
  
  # calling function to create SparkSession
  spark = create_session()
  
  # creating spark context object
  sc = spark.sparkContext
  
  # calling function to create RDD
  rd_df = create_RDD(sc,input_data)
  
  schema_lst = ["State","Cases","Recovered","Deaths"]
  
  # creating the dataframe using createDataFrame function
  df = spark.createDataFrame(rd_df,schema_lst)
  
  # showing the dataframe and schema
  df.printSchema()
  df.show()
    
  print("Retrieved Data is:-")
    
  # Retrieving data of the "State",
  # "Recovered" and "Deaths" column
  for row in df.collect():
    print(row["State"], ",", row["Recovered"], ",", row["Deaths"])

Output:
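As with a single column, multiple columns can also be selected before collecting so that only the needed data reaches the driver. A brief sketch, reusing the df from the example above:

Python

# alternative sketch: select only the needed columns before collecting
for row in df.select("State", "Recovered", "Deaths").collect():
    print(row["State"], ",", row["Recovered"], ",", row["Deaths"])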