Я прочитал ваше обновление по вашему вопросу и понял, что комментарий, который я оставил по вашему вопросу, был совершенно не по делу. Поскольку вы используете поток, вы не хотите ждать всех данных, чтобы избежать исчерпания памяти. Я должен был заметить это в самом начале.
Позвольте мне привести несколько примеров для моих извинений. Надеюсь, это поможет понять, как использовать потоки.
Чтобы сделать примеры более реалистичными, давайте смоделируем получение json с удаленного сервера, как это делает node-fetch
. node-fetch
возвращает экземпляр ReadableStream
, который также является asyncIterable
. Мы можем легко создать его, передав функцию асинхронного генератора stream.Readable.from()
, как показано ниже.
Определение fetch()
async function* asyncGenerator (chunks) {
let counter = 1;
for (const chunk of chunks) {
await new Promise(resolve => setTimeout(resolve, 1000));
console.log(`==== chunk ${counter++} transmitted =====================`);
yield chunk;
}
}
const stream = require('stream');
// simulates node-fetch
async function fetch (json) {
const asyncIterable = asyncGenerator(json);
// let the client wait for 0.5 sec.
await new Promise(resolve => setTimeout(resolve, 500));
return new Promise(resolve => {
// returns the response object
resolve({ body: stream.Readable.from(asyncIterable) });
});
}
fetch()
занимает 0,5 секунды, чтобы получить объект ответа. Он возвращает Promise
, который разрешается в объект, из которого body
предоставляет ReadableStream
. Этот доступный для чтения поток продолжает отправлять порцию данных json в нисходящий поток каждую секунду, как определено в asyncGenerator()
.
.
Наша функция fetch()
принимает в качестве параметра массив JSON вместо URL. Давайте воспользуемся тем, что вы предоставили, но разделим его немного в другом месте, поэтому после получения второго фрагмента мы получим два полных объекта.
const chunkedJson = [
// chunk 1
`[
{
"name": "John Doe",
"occupation": "gardener",
"born": "1992-03-02"
}
,
{
"name": "Brian Flem`,
// chunk 2
`ming",
"occupation": "teacher",
"born": "1967-11-22"
}
,
{
"name": "Lucy Black",
"occupation": "accountant",
"born": "1995-04-07"
}`,
// chunk 3
`,
{
"name": "William Bean",
"occupation": "pilot",
"born": "1977`,
// chunk 4
`-10-31"
}
]`
];
Теперь с этими данными вы можете подтвердить, как работает fetch()
следующим образом.
Пример 1: Тестирование fetch()
async function example1 () {
const response = await fetch(chunkedJson);
for await (const chunk of response.body) {
console.log(chunk);
}
}
example1();
console.log("==== Example 1 Started ==============");
Вывод примера 1.
==== Example 1 Started ==============
==== chunk 1 transmitted =====================
[
{
"name": "John Doe",
"occupation": "gardener",
"born": "1992-03-02"
}
,
{
"name": "Brian Flem
==== chunk 2 transmitted =====================
ming",
"occupation": "teacher",
"born": "1967-11-22"
}
,
{
"name": "Lucy Black",
"occupation": "accountant",
"born": "1995-04-07"
}
==== chunk 3 transmitted =====================
,
{
"name": "William Bean",
"occupation": "pilot",
"born": "1977
==== chunk 4 transmitted =====================
-10-31"
}
]
Теперь давайте обработаем каждый элемент этих данных json, не дожидаясь поступления всех данных.
StraemArray
является подклассом stream.Transform. Таким образом, он имеет интерфейс как ReadableStream
, так и WritableStream
. Если экземпляры потока связаны с pipe()
, вам не нужно беспокоиться о противодавлении, поэтому мы направляем два потока, т.е. ReadableStream
получено из fetch()
и экземпляра StreamArray
вместе как response.body.pipe(StreamArray.withParser())
в Примере 2 ниже.
pipe(StreamArray.withParser())
возвращает экземпляр самого StreamArray
для цепочки методов, поэтому переменная pipeline
теперь содержит ссылку на поток преобразования, который также является читаемым потоком. Мы можем присоединить к нему прослушиватель событий, чтобы использовать преобразованные данные.
StreamArray
генерирует событие data
, когда один объект анализируется из читаемого источника. Итак, pipiline.on('data', callback)
обрабатывает фрагмент за фрагментом, не дожидаясь всех данных json.
Когда список событий зарегистрирован на событие data
с помощью pipiline.on('data', callback)
, поток начинает течь.
Поскольку мы моделируем получение данных асинхронно, вы можете увидеть !!!! MAIN THREAD !!!!
в консоли в середине передачи данных. Вы можете подтвердить, что основной поток не блокируется во время ожидания проанализированных данных.
Пример 2: Тестирование stream-json
обработки каждого элемента массива по одному по мере поступления
const StreamArray = require('stream-json/streamers/StreamArray');
async function example2 () {
const response = await fetch(chunkedJson);
const pipeline = response.body.pipe(StreamArray.withParser());
const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
pipeline.on('data', ({ key, value }) => {
console.log("====== stream-json StreamArray() RESULT ========");
console.log(value); // do your data processing here
}).on('close', () => {
clearInterval(timer); // stop the main thread console.log
});
}
example2();
console.log("==== Example 2 Started ==============");
Вывод примера 2.
==== Example 2 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== stream-json StreamArray() RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
====== stream-json StreamArray() RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }
Поскольку все потоки являются экземплярами EventEmitter
, вы можете просто присоединить обратный вызов к событию data
для использования конечных данных, как в примере 2. Однако предпочтительнее использовать pipe()
даже для конечного потребления данных, поскольку pipe()
управляет противодавлением.
Проблема обратного давления возникает, когда потребление данных в нисходящем направлении медленнее, чем поток данных в восходящем направлении. Например, когда обработка данных требует времени, вы можете захотеть обрабатывать каждый фрагмент асинхронно. Если обработка следующего фрагмента завершается до обработки предыдущего фрагмента, следующий фрагмент помещается в нисходящий поток перед первым. Если нисходящий поток зависит от первого фрагмента перед обработкой следующего, это вызывает проблемы.
При использовании списка событий необходимо вручную управлять паузой и возобновлением, чтобы избежать противодавления (см. это в качестве примера). Однако, если вы соедините потоки с pipe()
, проблема обратного давления будет решена внутри. Это означает, что когда нисходящий поток медленнее восходящего, pipe()
автоматически приостановит подачу в нисходящий поток.
Давайте создадим собственный WritableStream
, чтобы соединиться с StreamArray
с помощью pipe()
. В нашем случае мы получаем двоичные данные из восходящего потока (т.е. StreamArray
), а не строку, мы должны установить objectMode
на true
. Мы переопределяем функцию _write()
, которая будет внутренне вызываться из write()
. Вы помещаете сюда всю логику обработки данных и после завершения вызываете callback()
. Восходящий поток не передает следующие данные до тех пор, пока не будет вызван обратный вызов, когда потоки связаны с pipe()
.
.
Чтобы имитировать обратное давление, мы обрабатываем фрагменты 1 и 3 в течение 1,5 секунды, а фрагменты 0 и 4 в течение 0 секунд ниже.
Пример 3. Передача нашего собственного экземпляра потока
class MyObjectConsumerStream extends stream.Writable {
constructor(options) {
super({ ...options, objectMode: true });
}
_write(chunk, encoding, callback) {
const { key, value } = chunk; // receive from StreamArray of stream-json
console.log("===== started to processing the chunk ........... ");
setTimeout(() => {
console.log("====== Example 3 RESULT ========");
console.log(value); // do your data processing here
callback(); // pipe() will pause the upstream until callback is called
}, key % 2 === 0 ? 1500 : 0); // for second and fourth chunk it processes 0 sec!
}
}
//--- Example 3: We write our own WritableStream to consume chunked data ------
async function example3 () {
const response = await fetch(chunkedJson);
response.body.pipe(StreamArray.withParser()).pipe(new MyObjectConsumerStream()).on('finish', () => {
clearInterval(timer); // stop the main thread console.log
});
const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
}
example3();
console.log("==== Example 3 Started ==============");
Вывод примера 3.
==== Example 3 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ...........
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
==== chunk 4 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
====== Example 3 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }
Вы можете подтвердить, что полученные данные в порядке. Вы также можете видеть, что передача 2-го фрагмента начинается во время обработки первого объекта, поскольку мы установили его на 1,5 секунды. Теперь давайте сделаем то же самое, используя прослушиватель событий следующим образом.
Пример 4: Проблема противодавления с простым обратным вызовом
async function example4 () {
const response = await fetch(chunkedJson);
const pipeline = response.body.pipe(StreamArray.withParser());
const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
pipeline.on('data', ({ key, value }) => {
console.log("===== started to processing the chunk ........... ");
setTimeout(() => {
console.log(`====== Example 4 RESULT ========`);
console.log(value); // do your data processing here
}, key % 2 === 0 ? 1500 : 0); // for second and thrid chunk it processes 0 sec!
}).on('close', () => {
clearInterval(timer); // stop the main thread console.log
});
}
example4();
console.log("==== Example 4 Started ==============");
Вывод примера 4.
==== Example 4 Started ==============
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
===== started to processing the chunk ...........
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
====== Example 4 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
!!!! MAIN THREAD !!!!
==== chunk 3 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 4 RESULT ========
{ name: 'Lucy Black', occupation: 'accountant', born: '1995-04-07' }
!!!! MAIN THREAD !!!!
==== chunk 4 transmitted =====================
===== started to processing the chunk ...........
====== Example 4 RESULT ========
{ name: 'William Bean', occupation: 'pilot', born: '1977-10-31' }
Теперь мы видим, что второй элемент "Брайан" появляется перед "Джоном". Если время обработки увеличить до 3 секунд для фрагментов 1 и 3, последний элемент "Уильям" также прибудет перед третьим "Люси".
Таким образом, рекомендуется использовать pipe()
, а не прослушиватели событий для потребления данных, когда важен порядок поступления данных.
Возможно, вам интересно, почему пример кода в документе API использует собственную функцию chain()
для создания конвейера. Это рекомендуемый шаблон проектирования для обработки ошибок в потоковом программировании в Node. Если ошибка возникает в нисходящем потоке конвейера, она не распространяется на восходящий поток. Таким образом, вы должны прикрепить обратный вызов к каждому потоку в конвейере следующим образом (здесь предполагается наличие трех потоков: a
, b
, c
).
a.on('error', callbackForA)
.pipe(b).on('error', callbackForB)
.pipe(c).on('error', callbackForC)
Это выглядит громоздко по сравнению с цепочкой Promise, которая может просто добавить .catch()
в конец цепочки. Несмотря на то, что мы установили все обработчики ошибок, как указано выше, этого все равно недостаточно.
Когда возникает ошибка в нисходящем потоке, поток, вызванный ошибкой, отсоединяется от конвейера с unpipe()
, однако восходящий поток не уничтожается автоматически. Это связано с тем, что существует возможность подключения нескольких потоков к восходящему потоку для ответвления линии потока. Таким образом, при использовании pipe()
.
вы должны самостоятельно закрыть все верхние потоки из каждого обработчика ошибок.
Для решения этой проблемы сообщество предоставило библиотеки для построения конвейеров. Я думаю, что chain()
из stream-chain является одним из них. Начиная с версии Node 10 для этой функциональности добавлен stream.pipeline. Мы можем использовать этот официальный конструктор конвейера, поскольку все потоки в stream-json
являются подклассом обычных экземпляров потока.
Прежде чем показывать использование stream.pipiline
, давайте изменим класс MyObjectConsumerStream
, чтобы он вызывал ошибку при обработке второго объекта.
Пользовательский поток, выдающий ошибку
class MyErrorStream extends MyObjectConsumerStream {
_write(chunk, encoding, callback) {
const { key, value } = chunk; // receive from StreamArray of stream-json
console.log("===== started to processing the chunk ........... ");
if (key === 2)
throw new Error("Error in key 2");
setTimeout(() => {
console.log("====== Example 5 RESULT ========");
console.log(value); // do your data processing here
callback(); // pipe() will pause the upstream until callback is called
}, key % 2 === 0 ? 1500 : 0); // for second and fourth chunk it processes 0 sec!
};
}
stream.pipeline
принимает несколько потоков по порядку вместе с обработчиком ошибок в конце. Обработчик ошибок получает экземпляр Error
при возникновении ошибки и получает null
при успешном завершении.
Пример 5: Использование stream.pipeline
async function example5 () {
const response = await fetch(chunkedJson);
const myErrorHandler = (timerRef) => (error) => {
if (error)
console.log("Error in the pipiline", error.message);
else
console.log("Finished Example 5 successfully");
clearInterval(timerRef); // stop the main thread console.log
}
const timer = setInterval(() => console.log("!!!! MAIN THREAD !!!!"), 500);
stream.pipeline(
response.body,
StreamArray.withParser(),
new MyErrorStream(),
myErrorHandler(timer)
);
console.log("==== Example 5 Started ==============");
}
example5();
Вывод примера 5
==== Example 5 Started ==============
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 1 transmitted =====================
===== started to processing the chunk ...........
!!!! MAIN THREAD !!!!
!!!! MAIN THREAD !!!!
==== chunk 2 transmitted =====================
!!!! MAIN THREAD !!!!
====== Example 5 RESULT ========
{ name: 'John Doe', occupation: 'gardener', born: '1992-03-02' }
===== started to processing the chunk ...........
====== Example 5 RESULT ========
{ name: 'Brian Flemming', occupation: 'teacher', born: '1967-11-22' }
===== started to processing the chunk ...........
/Users/shito/Documents/git-repositories/javascript/coderhelper/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211
throw new Error("Error in key 2");
^
Error: Error in key 2
at MyErrorStream._write (/Users/shito/Documents/git-repositories/javascript/coderhelper/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:211:13)
at doWrite (internal/streams/writable.js:377:12)
at clearBuffer (internal/streams/writable.js:529:7)
at onwrite (internal/streams/writable.js:430:7)
at Timeout._onTimeout (/Users/shito/Documents/git-repositories/javascript/coderhelper/JS/FailToParseJasonStream/ParseChunkedJsonAnswer.js:215:7)
at listOnTimeout (internal/timers.js:554:17)
at processTimers (internal/timers.js:497:7)
При возникновении ошибки stream.pipeline()
вызывает stream.destroy(error)
для всех потоков, которые не были закрыты или не завершились должным образом. Так что нам не нужно беспокоиться об утечке памяти.
Репост этого комментария, так как я прокомментировал неправильный вопрос. Есть ли какая-то особая причина не сохранять все куски в буфере и не анализировать всю строку json в конце? Я могу легко показать вам этот ответ, иначе нам придется написать собственный синтаксический анализатор, чтобы разделить неполные строки json на две, такие как действительная часть и неполная часть. Ожидание всей строки json — не такая уж плохая идея, поскольку пользователь не блокируется на протяжении всего процесса чтения. Основной поток цикла событий JavaScript получает контроль над каждой итерацией цикла, поскольку каждая итерация является асинхронной.
Меня также интересует решение, в настоящее время у меня нет варианта использования, но мне любопытно, как этот празер будет работать. (и как расширить его для работы с массивами/вложенными объектами)
@Summer Благодаря вашему обновлению я понял, что с опубликованной вами библиотекой есть лучшее решение. Вы также можете использовать эту библиотеку для другого вопроса coderhelper.com/questions/68705813/…. Я тоже обновлю этот ответ, когда у меня будет время.