У меня есть JSON, который я читаю из темы kafka, используя потоковую передачу искры
{"COUNTRY_REGION": "United States", "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7", "PARKS_CHANGE_PERC": "\\\\N", "LAST_UPDATE_DATE": "05:31.7"}
Я понимаю, что сначала нам нужно создать схему, которую я сделал здесь, и проанализировать входной json, который мы получили от Kafka, то есть поле значения с помощью функции from_json.
schema = StructType([
StructField("COUNTRY_REGION",StringType(),True),
StructField("PROVINCE_STATE",StringType(),True),
StructField("ISO_3166_1",StringType(),True),
StructField("ISO_3166_2", StringType(), True),
StructField("DATE", DateType(), True),
StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", IntegerType(), True),
StructField("PARKS_CHANGE_PERC", IntegerType(), True),
StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True),
StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),
StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),
StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True),
StructField("LAST_UPDATE_DATE", DateType(), True),
StructField("LAST_REPORTED_FLAG", BooleanType(), True),
StructField("SUB_REGION_2", StringType(), True),
])
json_edit = df.select (from_json("value",schema).alias("json"))
Однако я понимаю, что GROCERY_AND_PHARMACY_CHANGE_PERC
, PARKS_CHANGE_PERC
и LAST_UPDATE_DATE
становятся нулевыми.
display(json_edit)
{"COUNTRY_REGION": "United States", "GROCERY_AND_PHARMACY_CHANGE_PERC": null, "PARKS_CHANGE_PERC": null, "LAST_UPDATE_DATE": null}
Я понял, что это из-за исходного JSON, например "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7"
, хотя должно быть "GROCERY_AND_PHARMACY_CHANGE_PERC": -7
.
Есть ли какой-либо способ преобразовать строку в тип double/int, прежде чем я буду анализировать ее в схеме?
Спасибо!
Я продолжаю получать запросы с источниками потоковой передачи, которые должны выполняться с помощью writeStream.start(); когда я пытаюсь запустить df2 = df2.select("json.*") \
Мне удалось сделать это так: json_edit = json_edit.select("json.*") \ .withColumn("GROCERY_AND_PHARMACY_CHANGE_PERC", F.col("GROCERY_AND_PHARMACY_CHANGE_PERC").cast(IntegerType())) \ .withColumn("PARKS_CHANGE_PERC ", F.col("PARKS_CHANGE_PERC").cast(IntegerType())) \ .withColumn("DATE", F.to_date("DATE","yyyy-MM-dd")) \ .withColumn("LAST_UPDATE_DATE" , F.to_timestamp("LAST_UPDATE_DATE", "HH:mm.s")) \ .withColumn('json', F.struct([F.col(col) for col in json_edit.select('json. ').columns]))\ json_edit = json_edit.drop('json')
Кажется, я не могу привязать дату к дате. .withColumn ("DATE", F.to_date ("DATE", "yyyy-MM-dd")) \ продолжает показывать значение null. Есть идеи?
@ Адам, строка формата в
to_date
, вероятно, не соответствует вашим данным. Можете ли вы проверить, соответствуют ли ваши данные шаблону datetimeyyyy-MM-dd
?