
📅  Last modified: 2022-05-13 01:55:39.196000             🧑  Author: Mango

How to add column sum as new column in PySpark Dataframe?

In this article, we will see how to add a new column to a PySpark DataFrame using several methods. The new column will hold the sum of all the values in each row. Let us now discuss the various ways to add this sum as a new column.

But first, let us create the DataFrame for the demonstration:

Python3
# import SparkSession from pyspark
from pyspark.sql import SparkSession
 
# build and create the SparkSession
# with the name "sum as new_col"
spark = SparkSession.builder.appName("sum as new_col").getOrCreate()
 
# creating the Spark DataFrame with one
# string column and three numeric columns
data = spark.createDataFrame([('x', 5, 3, 7),
                              ('Y', 3, 3, 6),
                              ('Z', 5, 2, 6)],
                             ['A', 'B', 'C', 'D'])
 
# print the schema of the DataFrame
# with printSchema()
data.printSchema()
 
# showing the DataFrame
data.show()


Output:
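Assuming the cell above runs as written, the printed schema and DataFrame should look roughly like this (integer literals become long columns; exact column widths may vary with the Spark version):

root
 |-- A: string (nullable = true)
 |-- B: long (nullable = true)
 |-- C: long (nullable = true)
 |-- D: long (nullable = true)

+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  x|  5|  3|  7|
|  Y|  3|  3|  6|
|  Z|  5|  2|  6|
+---+---+---+---+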



Now we will look at the different methods for adding this new sum column to a Spark DataFrame.

Method 1: Using a UDF

In this method, we define a function that takes the column values as arguments and returns the sum of the row. We then register it with Spark's UDF (user-defined function) mechanism, which is used to create reusable functions in Spark. Since this lets us create new functions according to our own requirements, it is called a user-defined function.

Now we define the return data type of the UDF and create the function that returns the sum of all the values in a row.

Python3

# import the functions module as F from pyspark.sql
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
 
# define sum_col, which adds up the
# values of the three columns
def sum_col(b, c, d):
    col_sum = b + c + d
    return col_sum
 
# wrap sum_col as a UDF whose
# return type is integer
new_f = F.udf(sum_col, IntegerType())
 
# calling the UDF and creating the
# new column as Udf_method_sum
df_col1 = data.withColumn("Udf_method_sum",
                          new_f("B", "C", "D"))
 
# printing the schema and showing the DataFrame
df_col1.printSchema()
df_col1.show()

Output:
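Assuming the UDF runs against the three-row DataFrame created above, the row sums should come out as 15, 12 and 13, and the output should look roughly like:

root
 |-- A: string (nullable = true)
 |-- B: long (nullable = true)
 |-- C: long (nullable = true)
 |-- D: long (nullable = true)
 |-- Udf_method_sum: integer (nullable = true)

+---+---+---+---+--------------+
|  A|  B|  C|  D|Udf_method_sum|
+---+---+---+---+--------------+
|  x|  5|  3|  7|            15|
|  Y|  3|  3|  6|            12|
|  Z|  5|  2|  6|            13|
+---+---+---+---+--------------+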

Method 2: Using the expr() function

expr(str) is a function that takes an expression as a string argument. PySpark parses this string as a SQL expression. For example, if you want the sum of the rows, pass the argument as "n1+n2+n3+n4……", where n1, n2, n3, ... are the column names.

Python3



# import expr from the functions module
from pyspark.sql.functions import expr
 
# create the new column with withColumn(),
# passing the column name
# 'expression_method_sum' and the expr()
# function, which takes the sum
# expression as a string
df_col1 = df_col1.withColumn('expression_method_sum',
                             expr("B + C + D"))
 
# printing the schema and
# showing the DataFrame
df_col1.printSchema()
df_col1.show()

Output:
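Since each method keeps adding a column to df_col1, the table keeps growing. Assuming the previous cells ran in order, printSchema() should now also list expression_method_sum as a long column, and show() should print roughly:

+---+---+---+---+--------------+---------------------+
|  A|  B|  C|  D|Udf_method_sum|expression_method_sum|
+---+---+---+---+--------------+---------------------+
|  x|  5|  3|  7|            15|                   15|
|  Y|  3|  3|  6|            12|                   12|
|  Z|  5|  2|  6|            13|                   13|
+---+---+---+---+--------------+---------------------+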

Method 3: Using a SQL operation

In this method, we first have to create a temporary view of the table; with the help of createTempView() we can create a temporary view. The lifetime of this view is tied to the lifetime of the SparkSession.

Then, after creating the view, we select from it with a SQL clause, passing the whole query as a string.

Python3

# creating the temporary view of the
# DataFrame as "temp"; createTempView()
# returns None, so we do not assign
# its result back to df_col1
df_col1.createTempView("temp")
 
# using a SQL clause to create the
# new column as sql_method
df_col1 = spark.sql('select *, B+C+D as sql_method from temp')
 
# printing the schema of the DataFrame
# and showing the DataFrame
df_col1.printSchema()
df_col1.show()

Output:
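Assuming the cells ran in sequence, the new sql_method column (a long in the schema) should again hold 15, 12 and 13:

+---+---+---+---+--------------+---------------------+----------+
|  A|  B|  C|  D|Udf_method_sum|expression_method_sum|sql_method|
+---+---+---+---+--------------+---------------------+----------+
|  x|  5|  3|  7|            15|                   15|        15|
|  Y|  3|  3|  6|            12|                   12|        12|
|  Z|  5|  2|  6|            13|                   13|        13|
+---+---+---+---+--------------+---------------------+----------+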

Method 4: Using select()

The select() method selects columns from the table. We pass '*' as the first argument to select the whole table, and as the second argument the sum of the columns; the alias() function gives the newly created column its name.

Python3

# select everything from table df_col1 and
# create the new sum column as "select_method_sum"
df_col1 = df_col1.select('*',
                         (df_col1["B"] + df_col1["C"] + df_col1["D"]).
                         alias("select_method_sum"))
 
# showing the schema and the table
df_col1.printSchema()
df_col1.show()

Output:
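With select_method_sum added (again a long column in the schema), show() should print roughly:

+---+---+---+---+--------------+---------------------+----------+-----------------+
|  A|  B|  C|  D|Udf_method_sum|expression_method_sum|sql_method|select_method_sum|
+---+---+---+---+--------------+---------------------+----------+-----------------+
|  x|  5|  3|  7|            15|                   15|        15|               15|
|  Y|  3|  3|  6|            12|                   12|        12|               12|
|  Z|  5|  2|  6|            13|                   13|        13|               13|
+---+---+---+---+--------------+---------------------+----------+-----------------+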



Method 5: Using withColumn()

withColumn() is a DataFrame transformation function used to change values, change a column's data type, and create new columns from existing ones.

This function takes the name of the new column and the sum of the columns as arguments.

Python3

# using the withColumn function; the columns
# are referenced from df_col1 itself (rather
# than the original data DataFrame) so that
# they resolve against the current plan
df_col1 = df_col1.withColumn('withcolum_Sum',
                             df_col1['B'] + df_col1['C'] + df_col1['D'])
 
# printing the schema and
# showing the DataFrame
df_col1.printSchema()
df_col1.show()

Output:
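Assuming everything above ran in sequence, the final DataFrame should contain all five sum columns, each holding 15, 12 and 13:

+---+---+---+---+--------------+---------------------+----------+-----------------+-------------+
|  A|  B|  C|  D|Udf_method_sum|expression_method_sum|sql_method|select_method_sum|withcolum_Sum|
+---+---+---+---+--------------+---------------------+----------+-----------------+-------------+
|  x|  5|  3|  7|            15|                   15|        15|               15|           15|
|  Y|  3|  3|  6|            12|                   12|        12|               12|           12|
|  Z|  5|  2|  6|            13|                   13|        13|               13|           13|
+---+---+---+---+--------------+---------------------+----------+-----------------+-------------+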