Как обрабатывать новые строки при загрузке CSV в Apache Beam?

avatar
Ryan Tom
1 июля 2021 в 18:28
151
1
0

Я столкнулся с проблемой, когда в некоторых из моих полей в тексте появляются новые строки. Мой текущий код выглядит следующим образом:

# Python's regular expression library
import re
import sys

# Beam and interactive Beam imports
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib


p = beam.Pipeline(InteractiveRunner())

def print_row(element):
    print(element)

def parse_file(element):
    for line in csv.reader([element], quotechar='"', delimiter=',', lineterminator='\n', quoting=csv.QUOTE_ALL, skipinitialspace=True):
        return line

parsed_csv = p | 'Read input file' >> beam.io.ReadFromText("gs://ny-springml-data/AB_NYC_2019.csv")| 'Parse file' >> beam.Map(parse_file) 

split = parsed_csv | beam.Map(lambda x: x[0]) | beam.Map(print)

p.run()

У меня проблемы, потому что часть текста выглядит так:

The BLUE OWL:
VEGETARIAN WBURG W PATIO & BACKYARD!

Есть мысли, как действовать дальше?

Источник

Ответы (1)

avatar
robertwb
1 июля 2021 в 23:04
1

ReadFromText считывает входные данные по одной строке за раз. Как было предложено ранее, вы можете использовать Dataframe read_csv или создать PCollection путей и открыть/прочитать их в DoFn.

Например, вы можете написать

def read_csv_file(file_metadata):
  with beam.io.filesystems.FileSystems.open(file_metadata.path) as fin:
    for row in csv.reader(fin):
        yield row

rows = (
    p
    | beam.io.fileio.MatchFiles('/pattern/to/files/*.csv')  # emits FileMetadatas
    | beam.FlatMap(read_csv_file))                          # emits rows
Ryan Tom
2 июля 2021 в 05:16
0

Спасибо за ответ. Я пытаюсь сделать это без использования кадров данных луча, хотя они кажутся очень полезными. Можете ли вы уточнить, что вы подразумеваете под созданием PColl путей и открытием/чтением их в DoFn?