Rad sa Node.js Stream API-jem

Darko Milošević iz Florence Healthcare-a nam u tekstu piše o najnovijoj implementaciji „Stream 3″, kao i o novim i korisnim API-jima koji dolaze uz Node v10+.

 

DARKO MILOŠEVIĆ — 24/06/2021

 

Uvod

Reč „Stream” koristi se u kompjuterskim naukama da se opiše komadno prikupljanje podataka koji nisu dostupni izjednom već se preuzimaju tokom vremena. Stream je u suštini skup vrednosti sličan nizovima, ali obrnut iz prostorne do vremenske ose.

 

U Node.js, „stream” je naziv modula koji implementira API za rad sa streaming podacima.

 

Node.js stream API je prošao značajan razvojni put od svojih početaka u 2009. godini. API koji se konstantno razvija stvara zabunu vezanu za različite načine implementiranja kao i sposobnost mešanja različitih interfejsa.

 

Naš fokus biće na najnovijoj implementaciji „Stream 3”, zajedno sa novim i korisnim API-jima koji dolaze uz Node v10+.

 

Osnove Stream-a

Svaki stream je instanca EventEmitter-a. To znači da emituju događaje koji se mogu upotrebiti za pisanje i čitanje podataka.

 

Tipovi Stream-a

 

Postoje četiri osnovne vrste stream-a u okviru Node.js:

 

Očitavajući:

 

  • apstrakcija izvora koji može da se očitava i konzumira
  • primeri: HTTP odgovori na klijentu, HTTP zahtevi na serveru, fs čitajući stream-ovi, process.stdin i tako dalje.

 

Zapisni:

 

  • apstrakcija za odredište na kome se mogu pisati podaci
  • primeri: HTTP odgovori na serveru, HTTP zahtevi na klijentu, fs pišući stream-ovi, process.stderr itd.

 

Duplex:

 

  • Streams koji implementiraju interfejs i za očitavanje i za pisanje podataka
  • primer: TCP socket (net.Socket)

 

Transform:

 

  • Streams koji su srodni Duplex-ima, ali uz mogućnost da se podaci menjaju ili transformišu u toku čitanja ili pisanja
  • primer: kompresujući stream (zlib.createGzip)

 

Modovi stream-a

 

Postoje dva moda u kojima operiše Node.js:

 

Standardni mod:

 

  • ovaj mod je postavljen u podrazumevanim podešavanjima
  • operiše na `STRING` i `BUFFER` (ili `UInt8Array`) tipovima
  • jedini tip koji se koristi u internim implementacijama streama u Node-u

 

Objektni mod:

 

  • postavlja se „objectMode” zastavicom pri kreiranju Stream-a
  • unutrašnji algoritam za buffering (izdvajanje dela memorije u koji će se pohraniti podaci) koji broji objekte a ne bajtove.

 

 

Buffering

 

Svaki stream poseduje unutrašnji buffer koji se koristi za smeštanje podataka. Očitavajući i pišući streams imaju po jedan i njemu se može pristupiti kroz `readable.readableBuffer` i `writable.writableBuffer`.

 

Duplex i transform streams imaju po dva odvojena buffera, što svakom dozvoljava da radi samostalno.

 

Veličina buffera definiše se kroz `highWatermarkOption`. Za streams koji rade u standardnom modu njime se određuje veličina buffera dok se za streams u objektnom modu određuje broj objekata.

 

 

 

Backpressure – povratni pritisak

 

Backpressure je koncept koji je obično teško razumljiv za sve one koji započinju rad sa Stream API-jem, što ga čini čestim uzrokom grešaka. Bez povratnog pritiska, streamovi ne bi bili toliko efikasni jer je isti jedan od najvažnijih odlika streamova.

 

Backpressure je signal koji pišući stream šalje natrag očitavajućem stream-u. Signal se šalje kada očitavajući stream čita podatke suviše brzo, a interni buffer pišućeg streama (koji se podešava kroz `highWatermarkOption`) se ispuni brže nego što je moguće obraditi.

 

Signal upozorava očitavajući stream da treba da pauzira pre nego što pošalje još podataka. Povratni pritisak je ono što omogućava pouzdan, vučni transfer podataka između očitavajućeg i pišućeg stream-a.

 

Par stvari se mogu dogoditi ako se sistem za backpressure ne uzme u obzir pri transferu podataka:

 

  • memorija sistema biće potrošena
  • procesi koji su u toku biće usporeni
  • sakupljač otpada biće preopterećen

 

Backpressure rukovodi pouzdanim, bez-gubitnim i memorijski-efikasnim prenosom podataka, što je i primarna svrha Node.js Stream API-ja.

 

 

API za korisnike Stream-a

 

Mnoge Node.js aplikacije koriste streamove. Ako se upoznate sa API-jem za korisnike streamova, bićete u stanju da ispravno koristite i konzumirate streamove.

 

 

Konzumiranje pišućih streamova

 

Svaki pišući stream poseduje sledeće metode:

 

writable.write(chunk[, encoding][, callback])

 

  • Upisuje neke podatke u stream
  • Vraća vrednost „false” ako je unutrašnji buffer popunjen, a u suprotnom „true”

writable.end([chunk][, encoding][, callback])

 

  • Šalje signal kada nije moguće upisati više podataka. Moguće je upisati poslednju količinu podataka pre zatvaranja.

 

writable.cork()

 

  • Primorava sve pisane podatke da se bufferuju u memoriji

 

writable.uncork()

 

  • Čisti sve podatke koji su bufferovani od pozivanja `stream.cork()`

 

writable.destroy()

 

  • Uništava stream

 

Sledeći isečak koda daje primer jednostavne upotrebe pišućeg stream-a bez rukovanja povratnim pritiskom, što verovatno NE ŽELITE da uradite:

 

Ovo su događaji koji se mogu izostaviti upisnom instancom:

 

drain

 

  • Pošto `writable.write()` vrati vrednost `false` zbog povratnog pritiska i taj pritisak je uklonjen, ovaj događaj biće emitovan kada je moguće nastaviti pisanje podataka u stream

 

error

 

  • Emituje se ako je došlo do greške prilikom pisanja ili pipovanja podataka (stream se ne zatvara kada se emituje ovaj događaj)

 

finish

 

  • Emituje se pošto je `writable.end()` metod prizvan i svi podaci su očišćeni

 

close

 

  • Emituje se po zatvaranju stream-a ili bilo kog od njegovih pozadinskih resursa

 

pipe

 

  • Emituje se pošto je `.pipe()` metod prizvan na očitavajućem stream-u

 

unpipe

 

  • Emituje se pošto je `.unpipe()` metod prizvan na očitavajućem stream-u

 

Jednostavan primer kako napisati pišući stream ručno (bez `readable.pipe()` ), i dok se uzima u obzir backpressure:

 

Ovo je jednostavan primer pošto samo upisuje istu rečenicu u petlji. Najvažniji aspekt je upravljanje povratnim pritiskom.

 

Backpressure, tj. povratni pritisak je zgodno rešen kroz `readable.pipe()` metodu, što izgleda ovako:

 

Ulazimo u više detalja o `readable.pipe()` metodi kasnije u ovom tekstu.

Pored fokusa na backpressure kada se stvara ručno upisivanje u pišuću instancu stream-a, praćenje mogućih grešaka dok pišete je takođe važno.

Evo konkretnog primera za ručno upisivanje u pišući stream, uz osvrt na backpressure, ispravno upravljanje greškama i operacijama posle pisanja (u ovom slučaju logging):

 

 

Konzumiranje očitavajućih streamova 

 

Očitavajući streamovi mogu da rade u dva moda:

 

  • tekući (flowing) – gde se podaci čitaju iz pozadinskog sistema automatski i prosleđuju aplikaciji što je brže moguće
  • pauzirani — `readable.read()` metoda mora biti izričito prizvana da bi se očitali skupovi podataka

 

(U dokumentima se pominje i objekt mod ali to je odvojena funkcija gde i tekući i pauzirani streamovi mogu da budu u objektnom modu ili ne) 

 

Svi očitavajući streamovi počinju u pauziranom modu. Za prelazak iz pauziranog u tekući mod, mora se raditi jedna od sledećih operacija koje će u sledećem delu biti opširno pokrivene:

 

  • Dodati `data` osluškivač događaja (event listener)
  • Prizvati `readable.resume()` metodu
  • Priključiti čitajući pišućem uz `readable.pipe()`

 

Za povratak na pauzirani mod, mora se uraditi jedno od sledećih:

 

  • ako nema `pipe()` destinacija, prizvati`readable.pause()`
  • ako ima `pipe()` destinacija, ukloniti ih sve (`readable.unpipe()` je od velike pomoći u tome)

 

Postoje četiri načina konzumiranja čitajućih streamova. Developeri treba da odaberu jedan od metoda konzumiranja podataka. Mešanje API-ja može da dovede do neočekivanih ponašanja i nikada se ne treba praktikovati prilikom konzumiranja podataka iz pojedinog stream-a.

 

  • Upotrebom `readable.pause()`, `readable.resume()` i `data` događaja:

 

`data` događaj

 

  • Emituje se kada god stream prosleđuje skup podataka (automatski prebacuje stream u tekući mod kada se priključi slušaoc)

 

`readable.pause()`

 

  • pauzira stream, prebacuje ga u pauzirani mod

 

`readable.resume()`

 

  • prebacuje stream u tekući mod

 

Primer očitavajućeg streama koji se konzumira dok se podaci pišu u stdout. Ne nešto što je suviše korisno, ali poslužiće kao dobar primer:

 

 

  • Koristeći `readable.read()` i `readable` događaj:

 

`readable` događaj

 

  • ispaljuje se kada postoje pozadinski podaci koje treba očitati (priključivanje slušaoca u očitavajući prebacuje stream u pauzirani mod)

 

`readable.read([size])`

 

  • izvlači jedan deo podataka iz internog buffera i vraća ih. Vraća vrednost „null” ako nema više podataka za očitavanje. Po default-u, podaci će biti vraćeni kao „buffer” ako nije specifirano šifrovanje.

 

Ovo je sličan primer kao prethodni ali koristi drugi način konzimiranja očitavajućeg stream-a:

 

  • Korišćenje `readable.pipe()`:

 

`readable.pipe(writable[, options])`

 

  • priključuje pišući stream očitavajućem stream-u, prebacuje ga u flowing mod i čini da očitavajući prosledi sve svoje podatke priključenom pišućem stram-u. Tok podataka (drugim rečima backpressure) biće obavljen automatski.

 

Ovo je najpogodnije za konzumiranje očitavajućeg stream-a pošto nije opširan a backpressure i zatvaranje stream-a se automatsko obavlja po završetku.

Jednostavan primer iz prethodnih isečaka koda:

 

 

Jedna stvar koja se ne obavlja automatski je propagacija rukovanja greškom. Na primer, ako želimo da se svaki stream zatvori kada dođe do greške, moramo da priključimo osluškivače za greške u događajima. (error event listeners).

 

Primer potpune verzije konzumiranja očitavajućih streamova sa pipe-om uz ispravno rukovanje greškama:

 

 

  • Koristeći Async Iteraciju / Async Generatore:
  • očitavajući streamovi implementiraju [Symbol.asyncIterator] metodu tako da mogu da budu iterisani uz pomoć `for await of`

 

Async Generatori su zvanično dostupni u Node v10+. The async generatori su mešavina async funkcija i generator funkcija. Oni implementiraju [Symbol.asyncIterator] metodu i mogu se koristiti za async iteraciju. U opštem smislu streamovi su iskomadani skup podataka koji se prenose tokom vremena, tako da se Async Generatori savršeno uklapaju. Evo primera:

 

 

Konzumiranje Duplex i Transformirajućih Streamova

 

Duplex streamovi implementiraju i očitavajući i pišući interfejs. Jedna vrsta duplex stream-a je i `PassThrough` stream. Ova vrsta stream-a koristi se kada neki API-ji očekuju očitavajući stream kao parametar, a vi takođe želite ručno da unesete neke podatke.

 

Da biste postigli oba, potrebno je da:

 

  • Stvorite instancu `PassThrough` stream-a
  • Pošaljete stream API-ju (API će koristiti očitavajući interfejs stream-a)
  • Dodati količinu podataka stream-u (koristeći pišući interfejs stream-a)

 

Taj proces je prikazan ispod:

 

Transformišući streamovi su Duplex streamovi. Ovi streamovi poseduju i očitavajući i pišući interfejs, ali njihova glavna svrha je da transformišu prolazeće podatke.

 

Najuobičajeniji primer je kompresija podataka uz ugrađeni transformišući stream iz „zlib” modula:

 

 

Korisna klasa metoda (Node v10+)

 

`Stream.finished(stream, callback)`

 

  • omogućuje notifikacije kada stream više nije očitavajuć, pišući ili je naišao na grešku ili prerano zatvaranje.

 

Ova metoda korisna je za upravljanje greškama ili preduzimanje daljih postupaka kada je stream konzumiran. Primer:

 

 

Stream.pipeline(…streams[, callback])`

 

  • metod pipivanja između stream-a koji prosleđuje greške i vrši potrebno čišćenje, kao i prosleđuje poziv (callback) kada je pipeline kompletiran.

 

Ova metoda je najčistiji i najmanje opširan način izgradnje stream pipeline-a. U poređenju sa `readable.pipe()`, sve se obavlja automatski, uključujući i propagaciju greške i čišćenje resursa po završetku procesa. Primer:

 

 

API za one koji implementiraju Stream

 

Stream API je podložan proširivanju i nudi interfejs u kome developeri mogu da stvore sopstvene produžetke stream-a. Postoje dva načina da implementirate vaš stream:

 

  • Proširite ispravnu parent klasu:

 

Nova klasa stream-a mora da implementira jednu ili više specifičnih metoda koje zavise od vrste stream-a koji se stvara (one će biti navedene dok prolazimo kroz implementaciju svakog tipa stream-a)

Tim metodama prethodi podvučna linija kao prefiks i koriste se samo za implementiranje novih streamova. Ako se upotrebe tokom konzumiranja izazvaće neočekivana ponašanja.

 

  • Proširivanje streamova je pojednostavljeni način da se direktno stvore instance i pruže ispravne metode kao opcije za konstrukciju:

 

Dobro je zapamtiti da u ovom slučaju potrebnim metodama nije pridružena podvučna linija kao prefiks.

 

 

Implementiranje pišućih stream-ova

 

Da bismo implementirali pišući stream, moramo da obezbedimo `writable._write()` metodu za slanje podataka podložnom resursu:

 

`writable._write(chunk, encoding, callback)`

 

  • chunk — odeljak u koji će se pisati podaci
  • encoding — šifrovanje, potrebno ako je tip odeljka `String`
  • callback — povratni poziv, mora da se pozove da bi javio da je pisanje završeno ili nije uspelo.

 

Evo jedne jednostavne implementacije pišućeg stream-a:

 

 

Ovaj stream „pipuje” standardni unos u standardni iznos, osim kada se unese crtica napred, tada se stream baca. Ovaj primer služi za demonstraciju.

 

Implementiranje očitavajućeg stream-a

Da bi se implementirao novi očitavajući stream, moramo da prizovemo „readable constructor” funkciju i implementiramo `readable._read()` metodu (druge metode su opcionalne), dok unutar nje pozivamo `readable.push()`:

 

`readable._read(size)`

 

  • kada se ova metoda prizove, ako su podaci dostupni iz izvora, implementacija treba da počne da gura te podatke u red za očitavanje kroz `this.push(dataChunk)` metodu
  • size — količina bajtova koja treba da se asinhrono očita

 

`readable.push()`

 

  • ovaj metod namenjen je prizivanju samo od strane očitavajućih implementatora i samo iz okvira `readable._read()` metode. Kada je prizvana, grupa podataka biće dodata internom redu za korisnike stream-a da je konzumiraju (`null` je karakter za terminaciju).

 

Ova implementacija očitavajućeg stream-a prikazana ispod generisaće nasumično određene integere između 1 i 10 svake sekunde kroz jedan minut, a tada će stream da završi generaciju podataka i zatvori se.

 

 

Implementiranje Duplex stream-a

 

Duplex stream implementira i očitavajuće i pišuće interfejse nezavisno jedan od drugoga. Dupleks klasa obično nasleđuje iz stream.Readable i parazitski stream.Writable (JavaScript ne podržava višestruko nasleđivanje).

 

Da biste stvorili drugačiju implementaciju duplex stream-a, morate da implementirate sve potrebne metode za očitavajuće i pišuće streamove, a to su `readable._read()` i `writable._write()`.

 

Strim prikazan ispod loguje sve iz stdin (pišuće strane), i pipuje nasumične „smajlije” u stdout (očitavajuća strama) sve dok se „tužni smajli” ne pojavi, kada se očitavajući stream terminiše.

 

 

Implementacija transformirajućeg stream-a

 

Transformirajući stream sličan je duplex stream-u (on je vrsta duplex stream-a) ali poseduje jednostavniji interfejs. Iznos se izvodi iz unosa. Nema potrebe da iznos bude iste veličine kao unos, da ima isti broj grupa podataka ili da stigne u isto vreme.

 

Samo jedna metoda je potrebna za implementiranje transformirajućeg stream-a, a to je `transform._transform()` metoda (`transform._flush()` je opcionalna).

 

`transform._transform(chunk, encoding, callback)`

 

  • rukovodi bajtovima koji se pišu, sračunava iznos i zatim prosleđuje taj iznos u očitavajući deo uz pomoć `readable.push()` metode. Može se prizvati više puta da generiše iznos za jednu primljenu grupu podataka ili da uopšte ne generiše iznos:
  • chunk — deo podataka koji će se pisati
  • encoding — potrebno za grupe podataka tipa `String`
  • callback(err, transformedChunk)

 

`transform._flush(callback)` — optional.

 

  • U nekim slučajevima operacija transformisanja možda će morati da emituje dodatnu količinu podataka na kraju stream-a, pošto se izvrše neka izračunavanja. Pre nego što se stream okonča, ova metoda čisti te podatke.

 

 

Zaključak

 

U ovom članku naučili smo kako da konzumiramo sve tipove Node.js Streamova. Takođe smo naučili da implementiramo sopstvene streamove i koristimo njihove moćne funkcije.

 

Node.js Streamovi imaju reputaciju da je sa njima teško raditi, ali uz dobro razumevanje njihovih određenih API-ja postaće dragocena alatka za vaš rad.