Преобразование типов данных в JSON из Kafka Spark Streaming

avatar
Adam
8 августа 2021 в 18:34
93
2
1

У меня есть JSON, который я читаю из темы kafka, используя потоковую передачу искры

{"COUNTRY_REGION": "United States",  "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7", "PARKS_CHANGE_PERC": "\\\\N",  "LAST_UPDATE_DATE": "05:31.7"}

Я понимаю, что сначала нам нужно создать схему, которую я сделал здесь, и проанализировать входной json, который мы получили от Kafka, то есть поле значения с помощью функции from_json.

schema = StructType([ 
    StructField("COUNTRY_REGION",StringType(),True), 
    StructField("PROVINCE_STATE",StringType(),True),
    StructField("ISO_3166_1",StringType(),True), 
    StructField("ISO_3166_2", StringType(), True), 
    StructField("DATE", DateType(), True), 
    StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", IntegerType(), True),
    StructField("PARKS_CHANGE_PERC", IntegerType(), True), 
    StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True), 
    StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),                    
    StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),  
    StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True), 
    StructField("LAST_UPDATE_DATE", DateType(), True),
    StructField("LAST_REPORTED_FLAG", BooleanType(), True),
    StructField("SUB_REGION_2", StringType(), True),
  ])

json_edit = df.select (from_json("value",schema).alias("json"))

Однако я понимаю, что GROCERY_AND_PHARMACY_CHANGE_PERC, PARKS_CHANGE_PERC и LAST_UPDATE_DATE становятся нулевыми.

display(json_edit)

{"COUNTRY_REGION": "United States",  "GROCERY_AND_PHARMACY_CHANGE_PERC": null, "PARKS_CHANGE_PERC": null, "LAST_UPDATE_DATE": null}

Я понял, что это из-за исходного JSON, например "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7", хотя должно быть "GROCERY_AND_PHARMACY_CHANGE_PERC": -7.

Есть ли какой-либо способ преобразовать строку в тип double/int, прежде чем я буду анализировать ее в схеме?

Спасибо!

Источник

Ответы (2)

avatar
werner
8 августа 2021 в 21:09
0

Вы можете изменить тип трех столбцов на StringType в schema, проанализировать json и затем обработать три столбца по отдельности позже:

df=...

schema = StructType([ 
    StructField("COUNTRY_REGION",StringType(),True), 
    StructField("PROVINCE_STATE",StringType(),True),
    StructField("ISO_3166_1",StringType(),True), 
    StructField("ISO_3166_2", StringType(), True), 
    StructField("DATE", DateType(), True), 
    StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", StringType(), True), #using StringType
    StructField("PARKS_CHANGE_PERC", StringType(), True), #using StringType
    StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True), 
    StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),                    
    StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),  
    StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True), 
    StructField("LAST_UPDATE_DATE", StringType(), True), #using StringType
    StructField("LAST_REPORTED_FLAG", BooleanType(), True),
    StructField("SUB_REGION_2", StringType(), True),
  ])
df2=df.select (from_json("value",schema).alias("json"))

После синтаксического анализа строки json преобразуйте структуру в отдельные столбцы верхнего уровня (select("json.*")), обработайте три столбца с помощью withColumn, а затем при необходимости переупакуйте структуру, используя этот ответ:

from pyspark.sql import functions as F

df2.select("json.*") \
    .withColumn("GROCERY_AND_PHARMACY_CHANGE_PERC", 
        F.col("GROCERY_AND_PHARMACY_CHANGE_PERC").cast(IntegerType())) \
    .withColumn("PARKS_CHANGE_PERC", 
        F.col("PARKS_CHANGE_PERC").cast(IntegerType())) \
    .withColumn("LAST_UPDATE_DATE", 
        F.to_timestamp("LAST_UPDATE_DATE", "HH:mm.s")) \
    .withColumn('json', F.struct(*[F.col(col) for col in df2.select('json.*').columns])) \
    .drop(*df2.select('json.*').columns) \
    .show(truncate=False)

Примечание: в примере данных для столбца LAST_UPDATE_DATE дана строка "05:31.7". В приведенном выше коде предполагается, что это временная метка в формате HH:mm.s. Поскольку дата отсутствует, в этом примере результатом будет 1970-01-01 05:31:07. Это можно исправить, используя другой формат даты в to_timestamp.

Adam
9 августа 2021 в 03:49
0

Я продолжаю получать запросы с источниками потоковой передачи, которые должны выполняться с помощью writeStream.start(); когда я пытаюсь запустить df2 = df2.select("json.*") \

Adam
9 августа 2021 в 06:27
1

Мне удалось сделать это так: json_edit = json_edit.select("json.*") \ .withColumn("GROCERY_AND_PHARMACY_CHANGE_PERC", F.col("GROCERY_AND_PHARMACY_CHANGE_PERC").cast(IntegerType())) \ .withColumn("PARKS_CHANGE_PERC ", F.col("PARKS_CHANGE_PERC").cast(IntegerType())) \ .withColumn("DATE", F.to_date("DATE","yyyy-MM-dd")) \ .withColumn("LAST_UPDATE_DATE" , F.to_timestamp("LAST_UPDATE_DATE", "HH:mm.s")) \ .withColumn('json', F.struct([F.col(col) for col in json_edit.select('json. ').columns]))\ json_edit = json_edit.drop('json')

Adam
9 августа 2021 в 06:28
0

Кажется, я не могу привязать дату к дате. .withColumn ("DATE", F.to_date ("DATE", "yyyy-MM-dd")) \ продолжает показывать значение null. Есть идеи?

werner
11 августа 2021 в 15:54
0

@ Адам, строка формата в to_date, вероятно, не соответствует вашим данным. Можете ли вы проверить, соответствуют ли ваши данные шаблону datetime yyyy-MM-dd?

avatar
Mohana B C
8 августа 2021 в 20:40
0

Если вы не уверены в схеме входящих данных, лучше не указывать ее в первую очередь. Что ж, мы можем справиться с такой ситуацией, если будем использовать реестр схемы с форматом данных avro и интегрировать spark с kafka.

Поскольку здесь используется формат JSON, мы можем динамически извлечь схему с помощью функции schema_of_json и передать этот вывод в from_json.

Вот код, использующий scala, надеюсь, вы сможете изменить его на python по мере необходимости.

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._
  spark.sparkContext.setLogLevel("ERROR")

  def dfOps(ds: DataFrame, n: Long) =
    ds.select(from_json('value,
      schema_of_json(ds.select('value).first().getString(0))).as("json"))
      .select("json.*").show()

  spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "mytopic")
    .option("startingOffsets", "earliest")
    .load()
    .select('value.cast("string"))
    .writeStream
    .foreachBatch(dfOps _)
    .start()
    .awaitTermination()