📜  如何遍历 PySpark 数据帧中的每一行?

📅  最后修改于: 2022-05-13 01:55:30.916000             🧑  作者: Mango

如何遍历 PySpark 数据帧中的每一行?

在本文中,我们将看到如何在 PySpark 中循环遍历 Dataframe 的每一行。循环遍历每一行有助于我们在 RDD 或 Dataframe 上执行复杂的操作。

创建用于演示的数据框:

Python3
# importing necessary libraries
import pyspark
from pyspark.sql import SparkSession
 
# function to create new SparkSession
def create_session():
    """Return the SparkSession produced by the session builder.

    The builder is configured with the ``local`` master and the
    application name ``employee_profile.com`` before ``getOrCreate()``
    is called.
    """
    builder = SparkSession.builder.master("local")
    builder = builder.appName("employee_profile.com")
    return builder.getOrCreate()
 
 
def create_df(spark, data, schema):
    """Create a DataFrame from ``data`` and ``schema`` using ``spark``.

    Thin wrapper that forwards both arguments, in order, to
    ``spark.createDataFrame`` and returns its result unchanged.
    """
    return spark.createDataFrame(data, schema)
 
 
if __name__ == "__main__":

    # Obtain the Spark session used to build the demo DataFrame.
    spark = create_session()

    # One tuple per employee: (Id, Name, Job Profile, City).
    input_data = [
        (1, "Shivansh", "Data Scientist", "Noida"),
        (2, "Rishabh", "Software Developer", "Banglore"),
        (3, "Swati", "Data Analyst", "Hyderabad"),
        (4, "Amar", "Data Analyst", "Noida"),
        (5, "Arpit", "Android Developer", "Pune"),
        (6, "Ranjeet", "Python Developer", "Gurugram"),
        (7, "Priyanka", "Full Stack Developer", "Banglore"),
    ]

    # Column names, in the same order as the tuple fields above.
    schema = ["Id", "Name", "Job Profile", "City"]

    # Build the demonstration DataFrame.
    df = create_df(spark, input_data, schema)

    # show() is all that is needed to display the demo data; no
    # collect() is issued here because nothing in this script would
    # consume the collected rows.
    df.show()


Python3
# Fetch every row of the DataFrame with collect() and keep the
# resulting list in a variable.
data_collect = df.collect()

# Walk the collected rows one at a time.
for record in data_collect:
    # Pull out the three fields of interest, then print
    # the Id, Name and City of the current row.
    emp_id, emp_name, emp_city = record["Id"], record["Name"], record["City"]
    print(emp_id, emp_name, "  ", emp_city)


Python
# Ask the DataFrame's RDD for an iterator over its rows.
data_itr = df.rdd.toLocalIterator()

# Consume the iterator row by row.
for record in data_itr:

    # Gather the Id, Job Profile and City (with their spacers)
    # and print them in a single call.
    fields = (record["Id"], " ", record["Job Profile"], "  ", record["City"])
    print(*fields)


Python
# Convert the PySpark DataFrame into a pandas DataFrame so that the
# pandas-only iterrows() method becomes available.
pd_df = df.toPandas()

# iterrows() yields one (index, Series) pair per DataFrame row.
for index, row in pd_df.iterrows():

    # Print the Id, Name and City of the row by position instead of by
    # column name (position 3 is City in the 4-column demo DataFrame).
    # .iloc is used because integer keys on a label-indexed Series
    # (row[0]) are deprecated positional access in pandas.
    print(row.iloc[0], row.iloc[1], " ", row.iloc[3])


Python
# importing necessary libraries
import pyspark
from pyspark.sql import SparkSession
 
# function to create new SparkSession
def create_session():
  """Return the SparkSession produced by the session builder.

  The builder uses the ``local`` master and the application name
  ``employee_profile.com``; the session comes from ``getOrCreate()``.
  """
  return (
      SparkSession.builder
      .master("local")
      .appName("employee_profile.com")
      .getOrCreate()
  )
 
def create_df(spark, data, schema):
  """Create a DataFrame from ``data`` and ``schema`` using ``spark``.

  Forwards both arguments, in order, to ``spark.createDataFrame``.
  """
  return spark.createDataFrame(data, schema)
 
if __name__ == "__main__":

  # Obtain the Spark session for this example.
  spark = create_session()

  # One tuple per employee: (Id, Name, Job Profile, Salary, City).
  input_data = [
      (1, "Shivansh", "Data Scientist", 2000000, "Noida"),
      (2, "Rishabh", "Software Developer", 1500000, "Banglore"),
      (3, "Swati", "Data Analyst", 1000000, "Hyderabad"),
      (4, "Amar", "Data Analyst", 950000, "Noida"),
      (5, "Arpit", "Android Developer", 1600000, "Pune"),
      (6, "Ranjeet", "Python Developer", 1800000, "Gurugram"),
      (7, "Priyanka", "Full Stack Developer", 2200000, "Banglore"),
  ]

  # Column names, in the same order as the tuple fields above.
  schema = ["Id", "Name", "Job Profile", "Salary", "City"]

  # Build the source DataFrame.
  df = create_df(spark, input_data, schema)

  # map() is called on the DataFrame's RDD (df.rdd); the lambda keeps
  # the Id, Name, Salary and City of every row.
  source_rdd = df.rdd
  rdd = source_rdd.map(
      lambda record: (
          record["Id"], record["Name"], record["Salary"], record["City"]
      )
  )

  # Turn the mapped RDD back into a DataFrame with the four kept
  # column names.
  df2 = rdd.toDF(["Id", "Name", "Salary", "City"])

  # Display the resulting DataFrame.
  df2.show()


Python
# Collect all rows through the DataFrame's RDD, then use a list
# comprehension to keep only each row's "Job Profile" value.
collected_rows = df.rdd.collect()
table = [record["Job Profile"] for record in collected_rows]

# Print the gathered job profiles, one per line.
for job_profile in table:
    print(job_profile)


Python
# importing necessary libraries
import pyspark
from pyspark.sql import SparkSession
 
# function to create new SparkSession
def create_session():
    """Return the SparkSession produced by the session builder.

    The builder is configured with the ``local`` master and the
    application name ``employee_profile.com`` before ``getOrCreate()``
    is called.
    """
    builder = SparkSession.builder.master("local")
    builder = builder.appName("employee_profile.com")
    return builder.getOrCreate()
 
 
def create_df(spark, data, schema):
    """Create a DataFrame from ``data`` and ``schema`` using ``spark``.

    Thin wrapper that forwards both arguments, in order, to
    ``spark.createDataFrame`` and returns its result unchanged.
    """
    return spark.createDataFrame(data, schema)
 
 
if __name__ == "__main__":

    # Obtain the Spark session for this example.
    spark = create_session()

    # One tuple per employee: (Id, Name, Job Profile, Salary, City).
    input_data = [
        (1, "Shivansh", "Data Scientist", 2000000, "Noida"),
        (2, "Rishabh", "Software Developer", 1500000, "Banglore"),
        (3, "Swati", "Data Analyst", 1000000, "Hyderabad"),
        (4, "Amar", "Data Analyst", 950000, "Noida"),
        (5, "Arpit", "Android Developer", 1600000, "Pune"),
        (6, "Ranjeet", "Python Developer", 1800000, "Gurugram"),
        (7, "Priyanka", "Full Stack Developer", 2200000, "Banglore"),
    ]

    # Column names, in the same order as the tuple fields above.
    schema = ["Id", "Name", "Job Profile", "Salary", "City"]

    # Build the source DataFrame.
    df = create_df(spark, input_data, schema)

    # Narrow the DataFrame to the 'Name' and 'Salary' columns, then
    # collect the resulting rows into a list.
    name_salary_df = df.select("Name", "Salary")
    rows_looped = name_salary_df.collect()

    # Every collected row holds exactly the two selected values, in
    # selection order, so it unpacks into (Name, Salary).
    for name, salary in rows_looped:
        print(name, salary)


输出:



方法 1:使用 collect()

我们可以使用 collect() 操作将数据集的所有元素检索到驱动程序,然后使用 for 循环遍历它们。

Python3

# Fetch every row of the DataFrame with collect() and keep the
# resulting list in a variable.
data_collect = df.collect()

# Walk the collected rows one at a time.
for record in data_collect:
    # Pull out the three fields of interest, then print
    # the Id, Name and City of the current row.
    emp_id, emp_name, emp_city = record["Id"], record["Name"], record["City"]
    print(emp_id, emp_name, "  ", emp_city)

输出:

方法 2:使用 toLocalIterator()

我们可以使用 toLocalIterator()。这将返回一个包含 DataFrame 中所有行的迭代器。它类似于collect()。唯一的区别是 collect() 返回列表,而 toLocalIterator() 返回迭代器。

Python

# Ask the DataFrame's RDD for an iterator over its rows.
data_itr = df.rdd.toLocalIterator()

# Consume the iterator row by row.
for record in data_itr:

    # Gather the Id, Job Profile and City (with their spacers)
    # and print them in a single call.
    fields = (record["Id"], " ", record["Job Profile"], "  ", record["City"])
    print(*fields)

输出:



注意:这个函数类似于上面例子中使用的 collect()函数,唯一的区别是这个函数返回迭代器,而 collect()函数返回列表。

方法 3:使用 iterrows()

迭代Dataframe每一行的iterrows()函数是pandas库的函数,所以首先我们必须使用toPandas()函数将PySpark Dataframe转换成Pandas Dataframe。然后使用 for 循环遍历它。

Python

# Convert the PySpark DataFrame into a pandas DataFrame so that the
# pandas-only iterrows() method becomes available.
pd_df = df.toPandas()

# iterrows() yields one (index, Series) pair per DataFrame row.
for index, row in pd_df.iterrows():

    # Print the Id, Name and City of the row by position instead of by
    # column name (position 3 is City in the 4-column demo DataFrame).
    # .iloc is used because integer keys on a label-indexed Series
    # (row[0]) are deprecated positional access in pandas.
    print(row.iloc[0], row.iloc[1], " ", row.iloc[3])

输出:

方法 4:使用 map()

map() 函数与 lambda 函数配合使用,可以迭代 DataFrame 的每一行。由于 map() 只能在 RDD 上执行,我们必须先将 PySpark 数据帧转换为 RDD,然后调用 map(),由 lambda 函数处理每一行,并将得到的新 RDD 存储在某个变量中;最后使用 toDF() 并传入模式(列名),将新的 RDD 转换回 DataFrame。

Python

# importing necessary libraries
import pyspark
from pyspark.sql import SparkSession
 
# function to create new SparkSession
def create_session():
  """Return the SparkSession produced by the session builder.

  The builder uses the ``local`` master and the application name
  ``employee_profile.com``; the session comes from ``getOrCreate()``.
  """
  return (
      SparkSession.builder
      .master("local")
      .appName("employee_profile.com")
      .getOrCreate()
  )
 
def create_df(spark, data, schema):
  """Create a DataFrame from ``data`` and ``schema`` using ``spark``.

  Forwards both arguments, in order, to ``spark.createDataFrame``.
  """
  return spark.createDataFrame(data, schema)
 
if __name__ == "__main__":

  # Obtain the Spark session for this example.
  spark = create_session()

  # One tuple per employee: (Id, Name, Job Profile, Salary, City).
  input_data = [
      (1, "Shivansh", "Data Scientist", 2000000, "Noida"),
      (2, "Rishabh", "Software Developer", 1500000, "Banglore"),
      (3, "Swati", "Data Analyst", 1000000, "Hyderabad"),
      (4, "Amar", "Data Analyst", 950000, "Noida"),
      (5, "Arpit", "Android Developer", 1600000, "Pune"),
      (6, "Ranjeet", "Python Developer", 1800000, "Gurugram"),
      (7, "Priyanka", "Full Stack Developer", 2200000, "Banglore"),
  ]

  # Column names, in the same order as the tuple fields above.
  schema = ["Id", "Name", "Job Profile", "Salary", "City"]

  # Build the source DataFrame.
  df = create_df(spark, input_data, schema)

  # map() is called on the DataFrame's RDD (df.rdd); the lambda keeps
  # the Id, Name, Salary and City of every row.
  source_rdd = df.rdd
  rdd = source_rdd.map(
      lambda record: (
          record["Id"], record["Name"], record["Salary"], record["City"]
      )
  )

  # Turn the mapped RDD back into a DataFrame with the four kept
  # column names.
  df2 = rdd.toDF(["Id", "Name", "Salary", "City"])

  # Display the resulting DataFrame.
  df2.show()

输出:

方法 5:使用列表推导式

我们可以使用列表推导式来循环遍历每一行,下面通过示例进行说明。



Python

# Collect all rows through the DataFrame's RDD, then use a list
# comprehension to keep only each row's "Job Profile" value.
collected_rows = df.rdd.collect()
table = [record["Job Profile"] for record in collected_rows]

# Print the gathered job profiles, one per line.
for job_profile in table:
    print(job_profile)

输出:

方法 6:使用select()

select() 函数用于选择指定的列。选择列后,我们使用 collect() 函数返回仅包含所选列数据的行列表。

Python

# importing necessary libraries
import pyspark
from pyspark.sql import SparkSession
 
# function to create new SparkSession
def create_session():
    """Return the SparkSession produced by the session builder.

    The builder is configured with the ``local`` master and the
    application name ``employee_profile.com`` before ``getOrCreate()``
    is called.
    """
    builder = SparkSession.builder.master("local")
    builder = builder.appName("employee_profile.com")
    return builder.getOrCreate()
 
 
def create_df(spark, data, schema):
    """Create a DataFrame from ``data`` and ``schema`` using ``spark``.

    Thin wrapper that forwards both arguments, in order, to
    ``spark.createDataFrame`` and returns its result unchanged.
    """
    return spark.createDataFrame(data, schema)
 
 
if __name__ == "__main__":

    # Obtain the Spark session for this example.
    spark = create_session()

    # One tuple per employee: (Id, Name, Job Profile, Salary, City).
    input_data = [
        (1, "Shivansh", "Data Scientist", 2000000, "Noida"),
        (2, "Rishabh", "Software Developer", 1500000, "Banglore"),
        (3, "Swati", "Data Analyst", 1000000, "Hyderabad"),
        (4, "Amar", "Data Analyst", 950000, "Noida"),
        (5, "Arpit", "Android Developer", 1600000, "Pune"),
        (6, "Ranjeet", "Python Developer", 1800000, "Gurugram"),
        (7, "Priyanka", "Full Stack Developer", 2200000, "Banglore"),
    ]

    # Column names, in the same order as the tuple fields above.
    schema = ["Id", "Name", "Job Profile", "Salary", "City"]

    # Build the source DataFrame.
    df = create_df(spark, input_data, schema)

    # Narrow the DataFrame to the 'Name' and 'Salary' columns, then
    # collect the resulting rows into a list.
    name_salary_df = df.select("Name", "Salary")
    rows_looped = name_salary_df.collect()

    # Every collected row holds exactly the two selected values, in
    # selection order, so it unpacks into (Name, Salary).
    for name, salary in rows_looped:
        print(name, salary)

输出: