Как настроить ReadablerStreamReader для обработки массива JSON

avatar
icelemon
9 августа 2021 в 06:25
316
1
1

У меня есть user.json (предположим, что это будет большой файл, я хочу, чтобы поток читал этот файл, но ограничьте размер фрагмента).

[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  },
  {
    "name": "Brian Flemming",
    "occupation": "teacher",
    "born": "1967-11-22"
  },
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  },
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977-10-31"
  }
]

Мой пример кода.

const fs = require('fs');
const stream = require('stream');

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log('---------start')
    console.log(chunk.toString());
    console.log('---------end')
  }
}

const readStream = fs.createReadStream('users.json', {highWaterMark: 120 })
logChunks(readStream)

Вывод выглядит следующим образом:

---------start
[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem
---------end
---------start
ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "ac
---------end
---------start
countant",
    "born": "1995-04-07"
  }
  ,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977
---------end
---------start
-10-31"
  }
]

---------end

Моя цель — извлечь объект json из нескольких фрагментов, чтобы он мог быть JSON.parse.

Я не нашел никакого JSONStreamParse для node.js, поэтому я надеюсь, что смогу почерпнуть здесь некоторые экспертные идеи. Спасибо


Обновление:

У меня есть один вариант решения: использовать стороннее решение. поток-json

await util.promisify(stream.pipeline)(
    readStream,
    StreamArray.withParser(),
    async function( parsedArrayEntriesIterable ){
      for await (const {key: arrIndex, value: arrElem} of parsedArrayEntriesIterable) {
        console.log("Parsed array element:", arrElem);
      }
    }
  )
Источник
waterloos
12 августа 2021 в 16:58
0

Репост этого комментария, так как я прокомментировал неправильный вопрос. Есть ли какая-то особая причина не сохранять все куски в буфере и не анализировать всю строку json в конце? Я могу легко показать вам этот ответ, иначе нам придется написать собственный синтаксический анализатор, чтобы разделить неполные строки json на две, такие как действительная часть и неполная часть. Ожидание всей строки json — не такая уж плохая идея, поскольку пользователь не блокируется на протяжении всего процесса чтения. Основной поток цикла событий JavaScript получает контроль над каждой итерацией цикла, поскольку каждая итерация является асинхронной.

Marc
13 августа 2021 в 10:38
0

Меня также интересует решение, в настоящее время у меня нет варианта использования, но мне любопытно, как этот празер будет работать. (и как расширить его для работы с массивами/вложенными объектами)

waterloos
15 августа 2021 в 08:54
0

@Summer Благодаря вашему обновлению я понял, что с опубликованной вами библиотекой есть лучшее решение. Вы также можете использовать эту библиотеку для другого вопроса coderhelper.com/questions/68705813/…. Я тоже обновлю этот ответ, когда у меня будет время.

Ответы (1)

avatar
waterloos
15 августа 2021 в 08:50
0

Я прочитал ваше обновление по вашему вопросу и понял, что комментарий, который я оставил по вашему вопросу, был совершенно не по делу. Поскольку вы используете поток, вы не хотите ждать всех данных, чтобы избежать исчерпания памяти. Я должен был заметить это в самом начале.

Позвольте мне привести несколько примеров для моих извинений. Надеюсь, это поможет понять, как использовать потоки.

Чтобы сделать примеры более реалистичными, давайте смоделируем получение json с удаленного сервера, как это делает node-fetch. node-fetch возвращает экземпляр ReadableStream, который также является asyncIterable. Мы можем легко создать его, передав функцию асинхронного генератора stream.Readable.from(), как показано ниже.

Определение fetch()

async function* asyncGenerator (chunks) {
  let counter = 1;
  for (const chunk of chunks) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log(`==== chunk ${counter++} transmitted =====================`);
    yield chunk;
  }
}

const stream = require('stream');

// simulates node-fetch
async function fetch (json) {
  const asyncIterable = asyncGenerator(json);
  // let the client wait for 0.5 sec.
  await new Promise(resolve => setTimeout(resolve, 500));
  return new Promise(resolve => {
    // returns the response object
    resolve({ body: stream.Readable.from(asyncIterable) });
  });
}

fetch() занимает 0,5 секунды, чтобы получить объект ответа. Он возвращает Promise, который разрешается в объект, из которого body предоставляет ReadableStream. Этот доступный для чтения поток продолжает отправлять порцию данных json в нисходящий поток каждую секунду, как определено в asyncGenerator().

.

Наша функция fetch() принимает в качестве параметра массив JSON вместо URL. Давайте воспользуемся тем, что вы предоставили, но разделим его немного в другом месте, поэтому после получения второго фрагмента мы получим два полных объекта.

const chunkedJson = [
  // chunk 1
  `[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem`,
  // chunk 2
  `ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  }`,
  // chunk 3
  `,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977`,
  // chunk 4
  `-10-31"
  }
]`
];

Теперь с этими данными вы можете подтвердить, как работает fetch() следующим образом.

Пример 1: Тестирование fetch()

async function example1 () {
  const response = await fetch(chunkedJson);
  for await (const chunk of response.body) {
    console.log(chunk);
  }
}

example1();
console.log("==== Example 1 Started ==============");

Вывод примера 1.

==== Example 1 Started ==============
==== chunk 1 transmitted =====================
[
  {
    "name": "John Doe",
    "occupation": "gardener",
    "born": "1992-03-02"
  }
  ,
  {
    "name": "Brian Flem
==== chunk 2 transmitted =====================
ming",
    "occupation": "teacher",
    "born": "1967-11-22"
  }
  ,
  {
    "name": "Lucy Black",
    "occupation": "accountant",
    "born": "1995-04-07"
  }
==== chunk 3 transmitted =====================
,
  {
    "name": "William Bean",
    "occupation": "pilot",
    "born": "1977
==== chunk 4 transmitted =====================
-10-31"
  }
]

Теперь давайте обработаем каждый элемент этих данных json, не дожидаясь поступления всех данных.

StraemArray является подклассом stream.Transform. Таким образом, он имеет интерфейс как ReadableStream, так и WritableStream. Если экземпляры потока связаны с pipe(), вам не нужно беспокоиться о противодавлении, поэтому мы направляем два потока, т.е. ReadableStream получено из fetch() и экземпляра StreamArray вместе как response.body.pipe(StreamArray.withParser()) в Примере 2 ниже.

pipe(StreamArray.withParser()) возвращает экземпляр самого StreamArray для цепочки методов, поэтому переменная pipeline теперь содержит ссылку на поток преобразования, который также является читаемым потоком. Мы можем присоединить к нему прослушиватель событий, чтобы использовать преобразованные данные.

StreamArray генерирует событие data, когда один объект анализируется из читаемого источника. Итак, pipiline.on('data', callback) обрабатывает фрагмент за фрагментом, не дожидаясь всех данных json.

Когда список событий зарегистрирован на событие data с помощью pipiline.on('data', callback), поток начинает течь.

Поскольку мы моделируем получение данных асинхронно, вы можете увидеть !!!! MAIN THREAD !!!! в консоли в середине передачи данных. Вы можете подтвердить, что основной поток не блокируется во время ожидания проанализированных данных.

Пример 2: Тестирование stream-json обработки каждого элемента массива по одному по мере поступления

const StreamArray = require('stream-json/streamers/StreamArray');

async function example2 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("====== stream-json StreamArray() RESULT ========");
    console.log(value); // do your data processing here
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}

example2();
console.log("==== Example 2 Started ==============");

Вывод примера 2.

==== Example 2 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== stream-json StreamArray() RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }

Поскольку все потоки являются экземплярами EventEmitter, вы можете просто присоединить обратный вызов к событию data для использования конечных данных, как в примере 2. Однако предпочтительнее использовать pipe() даже для конечного потребления данных, поскольку pipe() управляет противодавлением.

Проблема обратного давления возникает, когда потребление данных в нисходящем направлении медленнее, чем поток данных в восходящем направлении. Например, когда обработка данных требует времени, вы можете захотеть обрабатывать каждый фрагмент асинхронно. Если обработка следующего фрагмента завершается до обработки предыдущего фрагмента, следующий фрагмент помещается в нисходящий поток перед первым. Если нисходящий поток зависит от первого фрагмента перед обработкой следующего, это вызывает проблемы.

При использовании списка событий необходимо вручную управлять паузой и возобновлением, чтобы избежать противодавления (см. это в качестве примера). Однако, если вы соедините потоки с pipe(), проблема обратного давления будет решена внутри. Это означает, что когда нисходящий поток медленнее восходящего, pipe() автоматически приостановит подачу в нисходящий поток.

Давайте создадим собственный WritableStream, чтобы соединиться с StreamArray с помощью pipe(). В нашем случае мы получаем двоичные данные из восходящего потока (т.е. StreamArray), а не строку, мы должны установить objectMode на true. Мы переопределяем функцию _write(), которая будет внутренне вызываться из write(). Вы помещаете сюда всю логику обработки данных и после завершения вызываете callback(). Восходящий поток не передает следующие данные до тех пор, пока не будет вызван обратный вызов, когда потоки связаны с pipe().

.

Чтобы имитировать обратное давление, мы обрабатываем фрагменты 1 и 3 в течение 1,5 секунды, а фрагменты 0 и 4 в течение 0 секунд ниже.

Пример 3. Передача нашего собственного экземпляра потока

class MyObjectConsumerStream extends stream.Writable {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // receive from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log("====== Example 3 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // pipe() will pause the upstream until callback is called
    }, key % 2 === 0 ? 1500 : 0); // for second and fourth chunk it processes 0 sec!
  }
}

//--- Example 3: We write our own WritableStream to consume chunked data ------
async function example3 () {
  const response = await fetch(chunkedJson);
  response.body.pipe(StreamArray.withParser()).pipe(new MyObjectConsumerStream()).on('finish', () => {
    clearInterval(timer); // stop the main thread console.log
  });
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
}

example3();
console.log("==== Example 3 Started ==============");

Вывод примера 3.

==== Example 3 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ........... 
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
==== chunk 4 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }

Вы можете подтвердить, что полученные данные в порядке. Вы также можете видеть, что передача 2-го фрагмента начинается во время обработки первого объекта, поскольку мы установили его на 1,5 секунды. Теперь давайте сделаем то же самое, используя прослушиватель событий следующим образом.

Пример 4: Проблема противодавления с простым обратным вызовом

async function example4 () {
  const response = await fetch(chunkedJson);
  const pipeline = response.body.pipe(StreamArray.withParser());
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  pipeline.on('data', ({ key, value }) => {
    console.log("===== started to processing the chunk ........... ");
    setTimeout(() => {
      console.log(`====== Example 4 RESULT ========`);
      console.log(value); // do your data processing here
    }, key % 2 === 0 ? 1500 : 0); // for second and thrid chunk it processes 0 sec!
  }).on('close', () => {
    clearInterval(timer); // stop the main thread console.log
  });
}

example4();
console.log("==== Example 4 Started ==============");

Вывод примера 4.

==== Example 4 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
===== started to processing the chunk ........... 
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== Example 4 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
===== started to processing the chunk ........... 
====== Example 4 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }

Теперь мы видим, что второй элемент "Брайан" появляется перед "Джоном". Если время обработки увеличить до 3 секунд для фрагментов 1 и 3, последний элемент "Уильям" также прибудет перед третьим "Люси".

Таким образом, рекомендуется использовать pipe(), а не прослушиватели событий для потребления данных, когда важен порядок поступления данных.

Возможно, вам интересно, почему пример кода в документе API использует собственную функцию chain() для создания конвейера. Это рекомендуемый шаблон проектирования для обработки ошибок в потоковом программировании в Node. Если ошибка возникает в нисходящем потоке конвейера, она не распространяется на восходящий поток. Таким образом, вы должны прикрепить обратный вызов к каждому потоку в конвейере следующим образом (здесь предполагается наличие трех потоков: a, b, c).

a.on('error', callbackForA)
 .pipe(b).on('error', callbackForB)
 .pipe(c).on('error', callbackForC)

Это выглядит громоздко по сравнению с цепочкой Promise, которая может просто добавить .catch() в конец цепочки. Несмотря на то, что мы установили все обработчики ошибок, как указано выше, этого все равно недостаточно.

Когда возникает ошибка в нисходящем потоке, поток, вызванный ошибкой, отсоединяется от конвейера с unpipe(), однако восходящий поток не уничтожается автоматически. Это связано с тем, что существует возможность подключения нескольких потоков к восходящему потоку для ответвления линии потока. Таким образом, при использовании pipe().

вы должны самостоятельно закрыть все верхние потоки из каждого обработчика ошибок.

Для решения этой проблемы сообщество предоставило библиотеки для построения конвейеров. Я думаю, что chain() из stream-chain является одним из них. Начиная с версии Node 10 для этой функциональности добавлен stream.pipeline. Мы можем использовать этот официальный конструктор конвейера, поскольку все потоки в stream-json являются подклассом обычных экземпляров потока.

Прежде чем показывать использование stream.pipiline, давайте изменим класс MyObjectConsumerStream, чтобы он вызывал ошибку при обработке второго объекта.

Пользовательский поток, выдающий ошибку

class MyErrorStream extends MyObjectConsumerStream {
  _write(chunk, encoding, callback) {
    const { key, value } = chunk; // receive from StreamArray of stream-json
    console.log("===== started to processing the chunk ........... ");
    if (key === 2)
      throw new Error("Error in key 2");
    setTimeout(() => {
      console.log("====== Example 5 RESULT ========");
      console.log(value); // do your data processing here
      callback(); // pipe() will pause the upstream until callback is called
    }, key % 2 === 0 ? 1500 : 0); // for second and fourth chunk it processes 0 sec!
  };
}

stream.pipeline принимает несколько потоков по порядку вместе с обработчиком ошибок в конце. Обработчик ошибок получает экземпляр Error при возникновении ошибки и получает null при успешном завершении.

Пример 5: Использование stream.pipeline

async function example5 () {
  const response = await fetch(chunkedJson);
  const myErrorHandler = (timerRef) => (error) => {
    if (error)
      console.log("Error in the pipiline", error.message);
    else
      console.log("Finished Example 5 successfully");
    clearInterval(timerRef); // stop the main thread console.log
  }
  const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
  stream.pipeline(
    response.body,
    StreamArray.withParser(),
    new MyErrorStream(),
    myErrorHandler(timer)
  );
  console.log("==== Example 5 Started ==============");
}

example5();

Вывод примера 5

==== Example 5 Started ==============
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ........... 
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 5 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ........... 
====== Example 5 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ........... 
/Users/shito/Documents/git-repositories/javascript/coderhelper/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211
      throw new Error("Error in key 2");
      ^

Error: Error in key 2
    at MyErrorStream._write (/Users/shito/Documents/git-repositories/javascript/coderhelper/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211:13)
    at doWrite (internal/streams/writable.js:377:12)
    at clearBuffer (internal/streams/writable.js:529:7)
    at onwrite (internal/streams/writable.js:430:7)
    at Timeout._onTimeout (/Users/shito/Documents/git-repositories/javascript/coderhelper/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:215:7)
    at listOnTimeout (internal/timers.js:554:17)
    at processTimers (internal/timers.js:497:7)

При возникновении ошибки stream.pipeline() вызывает stream.destroy(error) для всех потоков, которые не были закрыты или не завершились должным образом. Так что нам не нужно беспокоиться об утечке памяти.

icelemon
15 августа 2021 в 18:40
0

Вау, спасибо, что попробовали разные варианты здесь. Но моя цель здесь - избавиться от использования сторонней библиотеки "StreamArray.withParser()". Я не хочу использовать здесь зависимость, я пытаюсь написать собственную реализацию, чтобы сэкономить больше времени, поскольку мне не нужно анализировать имя/значение, указанное в объекте JSON.

icelemon
15 августа 2021 в 19:05
0

Не могли бы вы помочь мне еще раз взглянуть на coderhelper.com/questions/68767486/…