Node.jsМодуль 3: Файловая система
Потоки (Streams)
Readable, Writable, Transform, pipe
Цель урока
В этом уроке ты научишься:
- Понимать типы потоков
- Использовать pipe для передачи данных
- Создавать свои потоки
Зачем нужны потоки
Потоки позволяют обрабатывать данные по частям, не загружая всё в память.
const fs = require('fs');
// Плохо: загружает весь файл в память
const data = fs.readFileSync('huge-file.txt');
// Хорошо: читает по частям
const stream = fs.createReadStream('huge-file.txt');
stream.on('data', (chunk) => {
console.log('Получен кусок:', chunk.length, 'байт');
});Типы потоков
| Тип | Описание | Пример |
|---|---|---|
| Readable | Чтение данных | fs.createReadStream |
| Writable | Запись данных | fs.createWriteStream |
| Duplex | Чтение и запись | net.Socket |
| Transform | Преобразование данных | zlib.createGzip |
Readable Stream
Создание
const fs = require('fs');
const readable = fs.createReadStream('file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // Размер буфера (64KB)
});События
readable.on('data', (chunk) => {
console.log('Данные:', chunk);
});
readable.on('end', () => {
console.log('Чтение завершено');
});
readable.on('error', (err) => {
console.error('Ошибка:', err.message);
});Режимы чтения
// Flowing mode (автоматическое чтение)
readable.on('data', (chunk) => {});
// Paused mode (ручное чтение)
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log(chunk);
}
});
// Управление потоком
readable.pause();
readable.resume();Writable Stream
Создание
const fs = require('fs');
const writable = fs.createWriteStream('output.txt', {
encoding: 'utf8'
});Запись
writable.write('Первая строка\n');
writable.write('Вторая строка\n');
writable.end('Последняя строка'); // Завершает поток
writable.on('finish', () => {
console.log('Запись завершена');
});
writable.on('error', (err) => {
console.error('Ошибка:', err.message);
});Backpressure
const writable = fs.createWriteStream('output.txt');
function write(data, callback) {
// write() возвращает false если буфер переполнен
if (!writable.write(data)) {
// Ждём освобождения буфера
writable.once('drain', callback);
} else {
process.nextTick(callback);
}
}pipe
Соединяет потоки:
const fs = require('fs');
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');
// Копирование файла
readable.pipe(writable);
// С обработкой завершения
readable
.pipe(writable)
.on('finish', () => console.log('Копирование завершено'));Цепочка pipe
const fs = require('fs');
const zlib = require('zlib');
// Сжатие файла
fs.createReadStream('file.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('file.txt.gz'));
// Распаковка
fs.createReadStream('file.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('file.txt'));Transform Stream
Преобразует данные на лету:
const { Transform } = require('stream');
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin
.pipe(upperCase)
.pipe(process.stdout);Практический пример
const { Transform } = require('stream');
const fs = require('fs');
// Нумерация строк
const lineNumberer = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
const numbered = lines
.map((line, i) => `${this.lineNum++}: ${line}`)
.join('\n');
this.push(numbered);
callback();
}
});
lineNumberer.lineNum = 1;
fs.createReadStream('input.txt')
.pipe(lineNumberer)
.pipe(fs.createWriteStream('numbered.txt'));pipeline (рекомендуется)
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');
async function compress(input, output) {
await pipeline(
fs.createReadStream(input),
zlib.createGzip(),
fs.createWriteStream(output)
);
console.log('Сжатие завершено');
}
compress('file.txt', 'file.txt.gz');Преимущество pipeline
pipeline автоматически обрабатывает ошибки и закрывает потоки.
Практика
Задание 1: Копирование файла
Задача: Скопируй большой файл используя потоки.
Решение:
const fs = require('fs');
const { pipeline } = require('stream/promises');
async function copyFile(src, dest) {
await pipeline(
fs.createReadStream(src),
fs.createWriteStream(dest)
);
}Задание 2: Transform
Задача: Создай поток, который заменяет слово.
Решение:
const { Transform } = require('stream');
function createReplacer(search, replace) {
return new Transform({
transform(chunk, encoding, callback) {
const result = chunk.toString().replaceAll(search, replace);
this.push(result);
callback();
}
});
}Проверь себя
- Когда использовать потоки вместо readFile?
- Что такое backpressure?
- Чем
pipelineлучшеpipe?