Cumulative Stateful Transformation Sa Apache Spark Streaming



Tumatalakay ang post sa blog na ito ng mga makabuluhang pagbabago sa Spark Streaming. Alamin ang lahat tungkol sa pinagsama-samang pagsubaybay at up-skill para sa isang karera sa Hadoop Spark.

Ibinigay ni Prithviraj Bose

Sa aking nakaraang blog ay tinalakay ko ang mga makabuluhang pagbabago na ginagamit ang windowing konsepto ng Apache Spark Streaming. Maaari mo itong basahin dito .





Sa post na ito tatalakayin ko ang pinagsama-samang pagpapatakbo ng estado sa Apache Spark Streaming. Kung bago ka sa Spark Streaming pagkatapos ay masidhi kong inirerekumenda na basahin ang dati kong blog upang maunawaan kung paano gumagana ang windowing.

Mga uri ng Pagbabago ng Estado sa Spark Streaming (Patuloy…)

> Pagsubaybay na pang-agulasyon

Ginamit namin ang bawasanByKeyAndWindow (…) Ang API upang subaybayan ang mga estado ng mga susi, subalit ang windowing ay nagdudulot ng mga limitasyon para sa ilang mga kaso ng paggamit. Paano kung nais nating maipon ang mga estado ng mga susi sa buong halip na limitahan ito sa isang window ng oras? Sa kasong iyon kakailanganin naming gamitin updateStateByKey (…) APOY.



Ang API na ito ay ipinakilala sa Spark 1.3.0 at naging tanyag. Gayunpaman ang API na ito ay may ilang overhead ng pagganap, ang pagganap nito ay napapababa ng laki ng mga estado na tumataas sa paglipas ng panahon. Sumulat ako ng isang sample upang maipakita ang paggamit ng API na ito. Mahahanap mo ang code dito .

Ang Spark 1.6.0 ay nagpakilala ng isang bagong API mapWithState (…) na malulutas ang mga overhead ng pagganap na ipinakita ng updateStateByKey (…) . Sa blog na ito tatalakayin ko ang partikular na API na ito gamit ang isang sample na programa na isinulat ko. Mahahanap mo ang code dito .

Bago ako sumisid sa isang code walk-through, magtipid tayo ng ilang mga salita sa pag-checkpoint. Para sa anumang pagbabago ng estado, ang pag-checkpoint ay sapilitan. Ang checkpointing ay isang mekanismo upang maibalik ang estado ng mga susi kung sakaling mabigo ang programa ng driver. Kapag nag-restart ang driver, ang estado ng mga key ay naibalik mula sa mga file ng checkpoint. Ang mga lokasyon ng checkpoint ay karaniwang HDFS o Amazon S3 o anumang maaasahang imbakan. Habang sinusubukan ang code, maaari ding mag-imbak ang isa sa lokal na file system.



Sa sample na programa, nakikinig kami sa stream ng teksto ng socket sa host = localhost at port = 9999. Ito ay nagpapahiwatig ng papasok na stream sa (mga salita, hindi. Ng mga pangyayari) at sinusubaybayan ang bilang ng salita gamit ang 1.6.0 API mapWithState (…) . Bilang karagdagan, ang mga key na walang mga pag-update ay tinanggal gamit ang StateSpec.outout API Nagchecheckpoint kami sa HDFS at ang dalas ng checkpointing ay bawat 20 segundo.

Lumikha muna tayo ng sesyon ng Spark Streaming,

Spark-streaming-session

Lumilikha kami ng a checkpointDir sa HDFS at pagkatapos ay tawagan ang pamamaraan ng object getOrCreate (…) . Ang getOrCreate Sinusuri ng API ang checkpointDir upang makita kung mayroong anumang nakaraang mga estado upang maibalik, kung mayroon iyan, pagkatapos ay muling likha nito ang session ng Spark Streaming at ina-update ang mga estado ng mga susi mula sa data na nakaimbak sa mga file bago magpatuloy sa bagong data. Kung hindi man lumilikha ito ng isang bagong sesyon ng Spark Streaming.

Ang getOrCreate Kinukuha ang pangalan ng direktoryo ng checkpoint at isang pagpapaandar (na pinangalanan namin createFunc ) kaninong pirma dapat () => StreamingContext .

kung paano mag-cast ng doble sa int java

Suriin natin ang code sa loob createFunc .

Linya # 2: Lumilikha kami ng isang konteksto ng streaming na may pangalan ng trabaho sa 'TestMapWithStateJob' at batch interval = 5 segundo.

Linya # 5: Itakda ang direktoryo ng checkpoint.

Linya # 8: Itakda ang detalye ng estado gamit ang klase org.apache.streaming.StateSpec bagay Una naming itinakda ang pagpapaandar na susubaybay sa estado, pagkatapos ay itinakda namin ang bilang ng mga pagkahati para sa mga nagresultang DStream na mabubuo sa mga kasunod na pagbabago. Sa wakas itinakda namin ang timeout (sa 30 segundo) kung saan kung ang anumang pag-update para sa isang susi ay hindi natanggap sa loob ng 30 segundo kung gayon ang pangunahing estado ay aalisin.

Linya 12 #: I-set up ang socket stream, patagin ang papasok na data ng batch, lumikha ng isang pares ng key-halaga, tawag mapaWithState , itakda ang agwat ng checkpointing sa 20s at sa wakas ay mai-print ang mga resulta.

Ang balangkas ng Spark ay tumatawag sa ika at lumikhaFunc para sa bawat susi sa nakaraang halaga at kasalukuyang estado. Kinukuwenta namin ang kabuuan at na-update ang estado sa pinagsama-samang kabuuan at sa wakas ay ibabalik namin ang kabuuan para sa susi.

kung paano makahanap ng pinakamalaking bilang sa isang array java

Mga Pinagmulan ng Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

May tanong ba sa amin? Mangyaring banggitin ito sa seksyon ng mga komento at babalikan ka namin.

Mga Kaugnay na Post:

Magsimula sa Apache Spark & ​​Scala

Mga Stateful Transformation na may Windowing sa Spark Streaming