📜  Defining DataFrame Schema using StructField and StructType

📅  Last modified: 2022-05-13 01:55:32.573000             🧑  Author: Mango


In this article, we will learn how to define a DataFrame schema using StructField and StructType.

  • StructType and StructField are used to define the schema of a DataFrame, or part of it. They specify the name, data type, and nullable flag of each column.
  • A StructType object is a collection of StructField objects. It is a built-in data type that holds a list of StructFields.

To define a schema we use a StructType() object, inside which we define or pass StructField() objects that hold the column name, the column's data type, and the nullable flag. We can write:

schema = StructType([StructField(column_name1, datatype(), nullable_flag),
                     StructField(column_name2, datatype(), nullable_flag),
                     StructField(column_name3, datatype(), nullable_flag)])

Example 1: Defining a DataFrame with a schema using StructType and StructField.



Python
# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    input_data = [("Refrigerator", 112345, 4.0, 12499),
                  ("LED TV", 114567, 4.2, 49999),
                  ("Washing Machine", 113465, 3.9, 69999),
                  ("T-shirt", 124378, 4.1, 1999),
                  ("Jeans", 126754, 3.7, 3999),
                  ("Running Shoes", 134565, 4.7, 1499),
                  ("Face Mask", 145234, 4.6, 999)]
 
    # defining schema for the dataframe with
    # StructType and StructField
    schm = StructType([
        StructField("Product Name", StringType(), True),
        StructField("Product ID", LongType(), True),
        StructField("Rating", FloatType(), True),
        StructField("Product Price", IntegerType(), True),
    ])
 
    # calling function to create dataframe
    df = create_df(spark, input_data, schm)
 
    # visualizing dataframe and its schema
    df.printSchema()
    df.show()



Output:

In the code above we set the nullable flag to True. The point of making it True is that if any field value is NULL/None while the DataFrame is being created, the DataFrame will still be created, with that field left as null.
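
As a small illustration (this snippet is not part of the original example; the column names are reused from it), a row containing None for a column whose StructField is declared nullable is still accepted when the DataFrame is built:

Python
# minimal sketch: a None value is accepted for a nullable StructField
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

spark = SparkSession.builder.master("local").appName("nullable_demo").getOrCreate()

schema = StructType([
    StructField("Product Name", StringType(), True),   # nullable=True
    StructField("Rating", FloatType(), True),          # nullable=True
])

# the second row has no rating; the DataFrame is still created,
# with the missing value shown as null
data = [("Refrigerator", 4.0), ("LED TV", None)]
spark.createDataFrame(data, schema).show()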

Example 2: Defining a DataFrame schema with a nested StructType.

Python

# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    input_data = [(("Refrigerator", 112345), 4.0, 12499),
                  (("LED TV", 114567), 4.2, 49999),
                  (("Washing Machine", 113465), 3.9, 69999),
                  (("T-shirt", 124378), 4.1, 1999),
                  (("Jeans", 126754), 3.7, 3999),
                  (("Running Shoes", 134565), 4.7, 1499),
                  (("Face Mask", 145234), 4.6, 999)]
 
    # defining schema for the dataframe using
    # nested StructType
    schm = StructType([
        StructField('Product', StructType([
            StructField('Product Name', StringType(), True),
            StructField('Product ID', LongType(), True),
        ])),
       
        StructField('Rating', FloatType(), True),
        StructField('Price', IntegerType(), True)])
 
    # calling function to create dataframe
    df = create_df(spark, input_data, schm)
 
    # visualizing dataframe and its schema
    df.printSchema()
    df.show(truncate=False)

Output:



Example 3: Changing the structure of the DataFrame and adding a new column using the PySpark Column class.

Python

# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct, when
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    input_data = [("Refrigerator", 112345, 4.0, 12499),
                  ("LED TV", 114567, 4.2, 49999),
                  ("Washing Machine", 113465, 3.9, 69999),
                  ("T-shirt", 124378, 4.1, 1999),
                  ("Jeans", 126754, 3.7, 3999),
                  ("Running Shoes", 134565, 4.7, 1499),
                  ("Face Mask", 145234, 4.6, 999)]
 
    # defining schema for the dataframe using
    # nested StructType
    schm = StructType([
        StructField("Product Name", StringType(), True),
        StructField("Product ID", LongType(), True),
        StructField("Rating", FloatType(), True),
        StructField("Product Price", IntegerType(), True)])
 
    # calling function to create dataframe
    df = create_df(spark, input_data, schm)
 
    # copying the columns to the new struct
    # Product
    new_df = df.withColumn("Product",
                           struct(col("Product Name").alias("Name"),
                                  col("Product ID").alias("ID"),
                                  col("Rating").alias("Rating"),
                                  col("Product Price").alias("Price")))
 
    # adding new column according to the given
    # condition
    new_df = new_df.withColumn(
        "Product Range",
        when(col("Product Price").cast(IntegerType()) < 1000, "Low")
        .when(col("Product Price").cast(IntegerType()) < 7000, "Medium")
        .otherwise("High"))
 
    # dropping the columns as all column values
    # are copied in Product column
    new_df = new_df.drop("Product Name", "Product ID",
                         "Rating", "Product Price")
 
    # visualizing dataframe and its schema
    new_df.printSchema()
    new_df.show(truncate=False)

Output:

  • In the example above we change the structure of the DataFrame using the struct() function: the existing columns are copied into a new struct column 'Product', which is created with the withColumn() function.
  • "Product Name", "Product ID", "Rating", and "Product Price" are copied into the new struct 'Product'.
  • We add a new column "Product Range" with the withColumn() function, classifying each product into one of three categories, Low, Medium, and High, according to the given condition: if "Product Price" is less than 1000 the product falls into the Low category, if it is less than 7000 it falls into Medium, and otherwise it falls into High (the when()/otherwise() pattern is sketched in isolation after this list).
  • After creating the new struct 'Product' and adding the new column "Product Range", we drop the "Product Name", "Product ID", "Rating", and "Product Price" columns with the drop() function, and then print the schema of the modified DataFrame along with its added column.
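
For reference, a minimal self-contained sketch of that when()/otherwise() bucketing (the thresholds mirror the example above, but the tiny DataFrame here is invented for illustration):

Python
# standalone sketch of the when()/otherwise() condition used above
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = SparkSession.builder.master("local").appName("when_demo").getOrCreate()

# a made-up single-column DataFrame of prices
prices = spark.createDataFrame([(999,), (3999,), (49999,)], ["Product Price"])

prices.withColumn(
    "Product Range",
    when(col("Product Price") < 1000, "Low")
    .when(col("Product Price") < 7000, "Medium")
    .otherwise("High")
).show()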

Example 4: Defining a DataFrame schema using JSON format and StructType().

Python

# importing necessary libraries
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import json
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    input_data = [("Refrigerator", 4.0),
                  ("LED TV", 4.2),
                  ("Washing Machine", 3.9),
                  ("T-shirt", 4.1)
                  ]
 
    # defining schema for the dataframe with
    # StructType and StructField
    schm = T.StructType([
        T.StructField("Product Name", T.StringType(), True),
        T.StructField("Rating", T.FloatType(), True)
    ])
 
    # calling function to create dataframe
    df = create_df(spark, input_data, schm)
 
    # visualizing dataframe and its schema
    print("Original Dataframe:-")
    df.printSchema()
    df.show()
 
    print("-------------------------------------------")
    print("Schema in json format:-")
 
    # storing schema in json format using
    # schema.json() function
    schma = df.schema.json()
    print(schma)
 
    # loading the json format schema
    schm1 = T.StructType.fromJson(json.loads(schma))
 
    # creating dataframe using json format schema
    json_df = spark.createDataFrame(
        spark.sparkContext.parallelize(input_data), schm1)
    print("-------------------------------------------")
    print("Dataframe using json schema:-")
     
    # showing the created dataframe from json format
    # schema printing the schema of created dataframe
    json_df.printSchema()
    json_df.show()

Output:

Note: You can also store the JSON-format schema in a file and use that file to define the schema. The code stays the same as above, except that you pass the contents of the JSON file to the loads() function; in the example above the JSON-format schema is stored in a variable, and we use that variable to define the schema. A sketch of the file-based variant is shown below.
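
A minimal sketch of that file-based variant, assuming the schema JSON was previously written to a (hypothetical) file named schema.json, and reusing the spark session and input_data from the example above:

Python
# sketch: loading a previously saved JSON schema from a file
import json
import pyspark.sql.types as T

# assumes something like open("schema.json", "w").write(df.schema.json())
# was run earlier to produce the file
with open("schema.json", "r") as f:
    schm_from_file = T.StructType.fromJson(json.load(f))

df_from_file = spark.createDataFrame(input_data, schm_from_file)
df_from_file.printSchema()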

Example 5: Defining a DataFrame schema with StructType() together with ArrayType() and MapType().

Python

# importing necessary libraries
from pyspark.sql import SparkSession
import pyspark.sql.types as T
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    # Data containing the Array and Map- key,value pair
    input_data = [
        ("Alex", 'Buttler', ["Mathematics", "Computer Science"],
         {"Mathematics": 'Physics', "Chemistry": "Biology"}),
        ("Sam", "Samson", ["Chemistry, Biology"],
         {"Chemistry": 'Physics', "Mathematics": "Biology"}),
        ("Rossi", "Bryant", ["English", "Geography"],
         {"History": 'Geography', "Chemistry": "Biology"}),
        ("Sidz", "Murraz", ["History", "Environmental Science"],
         {"English": 'Environmental Science', "Chemistry": "Mathematics"}),
        ("Robert", "Cox", ["Physics", "English"],
         {"Computer Science": 'Environmental Science', "Chemistry": "Geography"})
    ]
 
    # defining schema with ArrayType and MapType()
    # using StructType() and StructField()
    array_schm = T.StructType([
        T.StructField('Firstname', T.StringType(), True),
        T.StructField('Lastname', T.StringType(), True),
        T.StructField('Subject', T.ArrayType(T.StringType()), True),
        T.StructField('Subject Combinations', T.MapType(
            T.StringType(), T.StringType()), True)
    ])
 
    # calling function for creating the dataframe
    df = create_df(spark, input_data, array_schm)
 
    # printing schema of df and showing dataframe
    df.printSchema()
    df.show(truncate=False)

Output: