
📅  Last modified: 2022-05-13 01:54:23.907000             🧑  Author: Mango

How to duplicate a row N times in a PySpark DataFrame?

In this article, we will learn how to duplicate a row N times in a PySpark DataFrame.

Method 1: Repeating rows based on a column value

In this method, we first create a PySpark DataFrame with createDataFrame(). In our example, the column "Y" holds a numeric value, which is what allows it to be used as the repetition count for each row. We then use the withColumn() function; its expr argument is explained below.

The colName here is "Y". The column expression we will use is:

explode(array_repeat(Y,int(Y)))
  • array_repeat is an expression that creates an array containing the column value repeated count times.
  • explode is an expression that returns a new row for each element in the given array or map (see the short sketch below).
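
To make the two steps concrete, here is a small sketch; it assumes the df with columns X, Y and Z that is built in the example below. The inner array_repeat call turns each Y value into an array of Y copies, and explode then flattens that array into separate rows.

Python
from pyspark.sql.functions import expr

# Step 1: array_repeat(Y, int(Y)) builds an array with the value of Y
# repeated Y times, e.g. Y = 3 becomes [3, 3, 3]
df.select("X", expr("array_repeat(Y, int(Y))").alias("Y_array")).show(truncate=False)

# Step 2: explode() then emits one output row per array element,
# which is what duplicates the original row Y times
df.select("X", expr("explode(array_repeat(Y, int(Y)))").alias("Y")).show()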

Example:

Python
# Importing PySpark and SQL functions
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,expr
 
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
 
# Data filled in our DataFrame
rows = [['a',1,'@'],
        ['b',3,'_'],
        ['c',2,'!'],
        ['d',6,'(']]
 
# Columns of our DataFrame
columns = ['X','Y','Z']
 
# DataFrame is created
df = Spark_Session.createDataFrame(rows,columns)
 
# Printing the DataFrame
df.show()
 
# Creating a new DataFrame with an
# expression that repeats each row Y times
new_df = df.withColumn(
  "Y", expr("explode(array_repeat(Y,int(Y)))"))
 
# Printing the new DataFrame
new_df.show()


Output:
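
As a quick sanity check on the result (a sketch reusing new_df from the code above), each value of X should now appear exactly Y times, which a small aggregation confirms:

Python
# Count how often each X value occurs in the repeated DataFrame;
# the counts should match the original Y column (1, 3, 2 and 6)
new_df.groupBy("X").count().orderBy("X").show()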

Method 2: Using collect() and appending a random row to the list

In this method, we first accept N from the user. Then we create a PySpark DataFrame using createDataFrame(). Next, we store the list of Row objects returned by the collect() method, whose syntax is:

DataFrame.collect()

in a variable. We then append a Row object to that list with Python's list append() method, inside a loop that runs N times. Finally, the list of Row objects is converted back into a PySpark DataFrame.
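
For reference, collect() returns plain Row objects on the driver. With the DataFrame built in the example below, the collected list looks roughly like this (an illustrative sketch, not captured output):

Python
row_list = df.collect()
# row_list is an ordinary Python list of Row objects, e.g.
# [Row(X='a', Y=1, Z='@'), Row(X='b', Y=3, Z='_'),
#  Row(X='c', Y=2, Z='!'), Row(X='d', Y=6, Z='(')]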

Example:

Python

# Importing PySpark and random
import pyspark
from pyspark.sql import SparkSession
import random
 
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
 
# Accepting n from the user.
n = int(input('Enter n : '))
 
# Data filled in our DataFrame
rows = [['a',1,'@'],
        ['b',3,'_'],
        ['c',2,'!'],
        ['d',6,'(']]
 
# Columns of our DataFrame
columns = ['X','Y','Z']
 
# DataFrame is created
df = Spark_Session.createDataFrame(rows,columns)
 
# Showing the DataFrame
df.show()
 
# Creating a list of rows and
# getting a random row from the list
row_list = df.collect()
repeated = random.choice(row_list)
 
# adding a row object to the list
# n times
for _ in range(n):
  row_list.append(repeated)
 
# Final DataFrame
df = Spark_Session.createDataFrame(row_list)
 
# Result
df.show()

Output:
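
A note on this method: collect() brings every row to the driver, so it is only practical for small DataFrames. The repetition can also stay entirely in Spark; the following is a minimal sketch (it assumes the df, n and Spark_Session from the example above, and repeats the row where X == 'a' instead of a random one):

Python
from pyspark.sql.functions import col

# Select the row to duplicate, cross join it with a range of n ids to
# produce n extra copies, then add the copies back to the original DataFrame
extra_rows = (df.filter(col("X") == "a")
                .crossJoin(Spark_Session.range(n))
                .drop("id"))
df_repeated = df.union(extra_rows)
df_repeated.show()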

Method 3: Converting the PySpark DataFrame to a Pandas DataFrame

In this method, we first accept N from the user. Then we create a PySpark DataFrame using createDataFrame() and convert it to a Pandas DataFrame with toPandas(). We take the first row of the DataFrame by slicing with the syntax DataFrame[:1]. Finally, we append that row to the Pandas DataFrame inside a loop using the append() function, whose syntax is:

DataFrame.append(other, ignore_index=False)
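
Note that DataFrame.append() was deprecated in pandas 1.4 and removed in pandas 2.0, so the example below only runs on older pandas versions. On current pandas the same repetition can be written with pd.concat; here is a minimal sketch, assuming the df_pandas, first_row and n variables defined in the example below:

Python
import pandas as pd

# Concatenate the original frame with n copies of the single-row frame,
# resetting the index just like ignore_index=True does for append()
df_pandas = pd.concat([df_pandas] + [first_row] * n, ignore_index=True)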

Example:

Python

# Importing PySpark and Pandas
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
 
# Session Creation
Spark_Session = SparkSession.builder.appName(
    'Spark Session'
).getOrCreate()
 
# Accepting n from the user.
n = int(input('Enter n : '))
 
# Data filled in our DataFrame
rows = [['a',1,'@'],
        ['b',3,'_'],
        ['c',2,'!'],
        ['d',6,'(']]
 
# Columns of our DataFrame
columns = ['X','Y','Z']
 
# DataFrame is created
df = Spark_Session.createDataFrame(rows,columns)
 
# Converting to a Pandas DataFrame
df_pandas = df.toPandas()
 
# The initial DataFrame
print('First DF')
print(df_pandas)
 
# the first row
first_row = df_pandas[:1]
 
# Appending the row n times
for _ in range(n):
  df_pandas = df_pandas.append(first_row,ignore_index = True)
 
# Final DataFrame
print('New DF')
print(df_pandas)

Output: