自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

PySpark 數(shù)據(jù)類型定義 StructType & StructField

大數(shù)據(jù) 數(shù)據(jù)分析
在本文中,云朵君和大家一起學(xué)習(xí)了 SQL StructType、StructField 的用法,以及如何在運(yùn)行時(shí)更改 Pyspark DataFrame 的結(jié)構(gòu),將案例類轉(zhuǎn)換為模式以及使用 ArrayType、MapType。

PySpark StructType 和 StructField 類用于以編程方式指定 DataFrame 的schema并創(chuàng)建復(fù)雜的列,如嵌套結(jié)構(gòu)、數(shù)組和映射列。StructType是StructField的集合,它定義了列名、列數(shù)據(jù)類型、布爾值以指定字段是否可以為空以及元數(shù)據(jù)。

目錄

  • StructType--定義Dataframe的結(jié)構(gòu)
  • StructField--定義DataFrame列的元數(shù)據(jù)
  • 將 PySpark StructType & StructField 與 DataFrame 一起使用
  • 定義嵌套的StructType對(duì)象結(jié)構(gòu)
  • 添加和更改 DataFrame 結(jié)構(gòu)
  • 使用 SQL ArrayType 和 MapType
  • 從 JSON 文件創(chuàng)建 StructType 對(duì)象結(jié)構(gòu)
  • 從 DDL 字符串創(chuàng)建 StructType 對(duì)象結(jié)構(gòu)
  • 檢查 DataFrame 中是否存在列
  • PySpark StructType & StructField 完整示例

StructType--定義Dataframe的結(jié)構(gòu)

PySpark 提供從pyspark.sql.types import StructType類來定義 DataFrame 的結(jié)構(gòu)。其中,StructType 是 StructField 對(duì)象的集合或列表。

DataFrame 上的 PySpark printSchema()方法將 StructType 列顯示為struct。

DataFrame.printSchema()

StructField--定義DataFrame列的元數(shù)據(jù)

PySpark 提供pyspark.sql.types import StructField類來定義列,包括列名(String)、列類型(DataType)、可空列(Boolean)和元數(shù)據(jù)(MetaData)。

將 PySpark StructType & StructField 與 DataFrame 一起使用

在創(chuàng)建 PySpark DataFrame 時(shí),我們可以使用 StructType 和 StructField 類指定結(jié)構(gòu)。StructType 是 StructField 的集合,用于定義列名、數(shù)據(jù)類型和是否可為空的標(biāo)志。使用 StructField 我們還可以添加嵌套結(jié)構(gòu)模式、用于數(shù)組的 ArrayType 和用于鍵值對(duì)的 MapType ,我們將在后面的部分中詳細(xì)討論。

下面的示例演示了一個(gè)非常簡(jiǎn)單的示例,說明如何在 DataFrame 上創(chuàng)建 StructType 和 StructField 以及它與示例數(shù)據(jù)一起使用來支持它。

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

通過運(yùn)行上面的代碼片段,它會(huì)顯示在下面的輸出中。

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+

定義嵌套的StructType對(duì)象結(jié)構(gòu)

在處理 DataFrame 時(shí),我們經(jīng)常需要使用嵌套的結(jié)構(gòu)列,這可以使用 StructType 來定義。

在下面的示例列中,“name” 數(shù)據(jù)類型是嵌套的 StructType。

structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,
                            schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)

模式和 DataFrame 下方的輸出。

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith]    |36636|M     |3100  |
|[Michael, Rose, ]   |40288|M     |4300  |
|[Robert, , Williams]|42114|M     |1400  |
|[Maria, Anne, Jones]|39192|F     |5500  |
|[Jen, Mary, Brown]  |     |F     |-1    |
+--------------------+-----+------+------+

添加和更改 DataFrame 結(jié)構(gòu)

使用 PySpark SQL 函數(shù) struct(),我們可以更改現(xiàn)有 DataFrame 的結(jié)構(gòu)并向其添加新的 StructType。下面學(xué)習(xí)如何將列從一個(gè)結(jié)構(gòu)復(fù)制到另一個(gè)結(jié)構(gòu)并添加新列。PySpark Column 類還提供了一些函數(shù)來處理 StructType 列。

from pyspark.sql.functions import col,struct,when
updatedDF = df2.withColumn("OtherInfo", 
    struct(col("id").alias("identifier"),
    col("gender").alias("gender"),
    col("salary").alias("salary"),
    when(col("salary").cast(IntegerType()) < 2000,"Low")
      .when(col("salary").cast(IntegerType()) < 4000,"Medium")
      .otherwise("High").alias("Salary_Grade")
  )).drop("id","gender","salary")

updatedDF.printSchema()
updatedDF.show(truncate=False)

在這里,它將 gender,salary 和 id 復(fù)制到新結(jié)構(gòu) otherInfo,并添加一個(gè)新列 Salary_Grade。

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- Salary_Grade: string (nullable = false)

使用 SQL ArrayType 和 MapType

SQL StructType 還支持 ArrayType 和 MapType 來分別為數(shù)組和地圖集合定義 DataFrame 列。在下面的示例中,列hobbies定義為 ArrayType(StringType) ,列properties定義為 MapType(StringType, StringType),表示鍵和值都為字符串。

arrayStructureSchema = StructType([
    StructField('name', StructType([
       StructField('firstname', StringType(), True),
       StructField('middlename', StringType(), True),
       StructField('lastname', StringType(), True)
       ])),
       StructField('hobbies', ArrayType(StringType()), True),
       StructField('properties', MapType(StringType(),StringType()), True)
    ])

輸出以下模式。注意字段 Hobbies 是 array類型,properties是 map類型。

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- hobbies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

從 JSON 文件創(chuàng)建 StructType 對(duì)象結(jié)構(gòu)

如果有太多列并且 DataFrame 的結(jié)構(gòu)不時(shí)發(fā)生變化,一個(gè)很好的做法是從 JSON 文件加載 SQL StructType schema??梢允褂?nbsp;df2.schema.json() 獲取 schema 并將其存儲(chǔ)在文件中,然后使用它從該文件創(chuàng)建 schema。

print(df2.schema.json())
{
  "type" : "struct",
  "fields" : [ {
    "name" : "name",
    "type" : {
      "type" : "struct",
      "fields" : [ {
        "name" : "firstname",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "middlename",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "lastname",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      } ]
    },
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "dob",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "gender",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "salary",
    "type" : "integer",
    "nullable" : true,
    "metadata" : { }
  } ]
}

或者也可以使用 df.schema.simpleString()返回一個(gè)相對(duì)簡(jiǎn)單的schema 格式。

現(xiàn)在讓我們加載 json 文件并使用它來創(chuàng)建一個(gè) DataFrame。

import json
schemaFromJson = StructType.fromJson(json.loads(schema.json))
df3 = spark.createDataFrame(
        spark.sparkContext.parallelize(structureData),
        schemaFromJson)
df3.printSchema()

這將打印與上一節(jié)相同的輸出。還可以在逗號(hào)分隔的文件中為可為空的文件提供名稱、類型和標(biāo)志,我們可以使用這些以編程方式創(chuàng)建 StructType。

從 DDL 字符串創(chuàng)建 StructType 對(duì)象結(jié)構(gòu)

就像從 JSON 字符串中加載結(jié)構(gòu)一樣,我們也可以從 DLL 中創(chuàng)建結(jié)構(gòu)(通過使用SQL StructType 類 StructType.fromDDL 上的 fromDDL()靜態(tài)函數(shù))。還可以使用 toDDL() 從模式生成 DDL。結(jié)構(gòu)對(duì)象上的 printTreeString() 打印模式,類似于 printSchema() 函數(shù)返回的結(jié)果。

ddlSchemaStr = "`fullName` STRUCT<`first`: STRING, `last`: STRING,
 `middle`: STRING>,`age` INT,`gender` STRING"
  ddlSchema = StructType.fromDDL(ddlSchemaStr)
  ddlSchema.printTreeString()

檢查 DataFrame 中是否存在列

如果要對(duì)DataFrame的元數(shù)據(jù)進(jìn)行一些檢查,例如,DataFrame中是否存在列或字段或列的數(shù)據(jù)類型;我們可以使用 SQL StructType 和 StructField 上的幾個(gè)函數(shù)輕松地做到這一點(diǎn)。

print(df.schema.fieldNames.contains("firstname"))
print(df.schema.contains(
        StructField("firstname", StringType,true)))

此示例在兩種情況下都返回True。對(duì)于第二個(gè),如果是 IntegerType 而不是 StringType,它會(huì)返回 False,因?yàn)槊至械臄?shù)據(jù)類型是 String,因?yàn)樗鼤?huì)檢查字段中的每個(gè)屬性。同樣,還可以檢查兩個(gè)模式是否相等或更多。

PySpark StructType & StructField 完整示例

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,ArrayType,MapType
from pyspark.sql.functions import col,struct,when

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ 
    StructField("firstname",StringType(),True), 
    StructField("middlename",StringType(),True), 
    StructField("lastname",StringType(),True), 
    StructField("id", StringType(), True), 
    StructField("gender", StringType(), True), 
    StructField("salary", IntegerType(), True) 
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)


updatedDF = df2.withColumn("OtherInfo", 
    struct(col("id").alias("identifier"),
    col("gender").alias("gender"),
    col("salary").alias("salary"),
    when(col("salary").cast(IntegerType()) < 2000,"Low")
      .when(col("salary").cast(IntegerType()) < 4000,"Medium")
      .otherwise("High").alias("Salary_Grade")
  )).drop("id","gender","salary")

updatedDF.printSchema()
updatedDF.show(truncate=False)


""" Array & Map"""


arrayStructureSchema = StructType([
    StructField('name', StructType([
       StructField('firstname', StringType(), True),
       StructField('middlename', StringType(), True),
       StructField('lastname', StringType(), True)
       ])),
       StructField('hobbies', ArrayType(StringType()), True),
       StructField('properties', MapType(StringType(),StringType()), True)
    ])

寫在最后

在本文中,云朵君和大家一起學(xué)習(xí)了 SQL StructType、StructField 的用法,以及如何在運(yùn)行時(shí)更改 Pyspark DataFrame 的結(jié)構(gòu),將案例類轉(zhuǎn)換為模式以及使用 ArrayType、MapType。

責(zé)任編輯:武曉燕 來源: 數(shù)據(jù)STUDIO
相關(guān)推薦

2010-01-25 10:41:59

C++數(shù)據(jù)類型

2009-09-11 12:00:33

C#預(yù)定義數(shù)據(jù)類型

2011-04-20 16:14:09

LinuxWindows

2019-08-12 11:40:48

數(shù)據(jù)庫SQLite3數(shù)據(jù)類型

2016-08-18 14:13:55

JavaScript基本數(shù)據(jù)引用數(shù)據(jù)

2023-11-14 10:05:52

Java開發(fā)工具

2022-06-20 08:26:39

Spring容器類型轉(zhuǎn)換

2014-01-05 17:08:09

PostgreSQL數(shù)據(jù)類型

2010-07-22 17:57:40

2010-01-20 09:54:27

C++數(shù)據(jù)類型

2010-01-19 13:01:32

C++數(shù)據(jù)類型

2010-08-10 17:17:59

2010-10-15 13:28:34

MySql數(shù)據(jù)類型

2017-07-10 13:38:07

MySQL數(shù)據(jù)類型整數(shù)類型

2013-07-30 14:48:58

.NET數(shù)據(jù)類型

2013-07-30 14:00:46

.NET數(shù)據(jù)類型

2010-08-11 09:14:33

DB2數(shù)據(jù)類型

2009-11-10 11:04:09

VB.NET數(shù)據(jù)類型

2009-11-17 10:01:11

Oracle數(shù)據(jù)類型

2024-03-14 11:54:37

C++數(shù)據(jù)類型
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)