RDD gamit ang Spark: Ang Building Block ng Apache Spark

Ang blog na ito sa RDD gamit ang Spark ay magbibigay sa iyo ng isang detalyado at komprehensibong kaalaman tungkol sa RDD, na kung saan ay ang pangunahing yunit ng Spark & ​​Kung gaano ito kapaki-pakinabang.

, Ang salitang mismong iyon ay sapat na upang makabuo ng isang spark sa bawat isip ng Hadoop engineer. SA n sa memorya kasangkapan sa pagpoproseso na kung saan ay mabilis ang kidlat sa computing computing. Kung ikukumpara sa MapReduce, ang pagbabahagi ng data na nasa memorya ay gumagawa ng mga RDD 10-100x mas mabilis kaysa sa pagbabahagi ng network at disk at lahat ng ito ay posible dahil sa mga RDD (Mga matatag na Ibinahaging mga hanay ng Data). Ang mga pangunahing puntong pinagtutuunan natin ngayon sa RDD na ito gamit ang artikulong Spark ay:

Kailangan para sa RDDs?

Bakit kailangan natin ng RDD? -RDD gamit ang Spark





Ang mundo ay umuunlad kasama at Agham sa Data dahil sa pagsulong sa . Mga algorithm batay sa Pag-urong , , at na tumatakbo sa Ipinamahagi Iterative Comput ation fashion na kasama ang Reusing at Pagbabahagi ng data sa maraming mga yunit ng computing.

Ang tradisyunal mga diskarte na kailangan ng isang Matatag Intermediate at Ipinamahagi imbakan tulad HDFS na binubuo ng paulit-ulit na mga pagkalkula na may mga pagtitiklop ng data at serialization ng data, na ginawang mas mabagal ang proseso. Ang paghanap ng solusyon ay hindi kailanman madali.



Ito ay kung saan Mga RDD Ang (Resilient Distribution Datasets) ay may malaking larawan.

RDD Ang mga ito ay madaling gamitin at walang kahirap-hirap upang lumikha ng bilang ang data ay na-import mula sa mga mapagkukunan ng data at nahulog sa RDDs. Dagdag dito, ang pagpapatakbo ay inilalapat upang maproseso ang mga ito. Sila ay ibinahagi koleksyon ng memorya na may mga pahintulot bilang Basahin lamang at ang pinakamahalaga, sila ang Mapagparaya sa pagkakamali .



Kung mayroon man pagkahati ng data ng ang RDD ay nawala na , maaari itong muling buhayin sa pamamagitan ng paglalapat ng pareho pagbabago operasyon sa nawala na pagkahati sa angkan , sa halip na iproseso ang lahat ng data mula sa simula. Ang ganitong uri ng diskarte sa mga pangyayari sa real time ay maaaring maganap ang mga himala sa mga sitwasyon ng pagkawala ng data o kung ang isang system ay wala.

Ano ang mga RDD?

RDD o ( Nakatakdang nakabahaging hanay ng Data ) ay isang pangunahing kaalaman istraktura ng data sa Spark. Ang termino Matatag tumutukoy sa kakayahan na awtomatikong bumubuo ng data o data lumiligid pabalik sa orihinal na estado kapag ang isang hindi inaasahang kalamidad ay nangyayari na may posibilidad ng pagkawala ng data.

Ang data na nakasulat sa RDDs ay naghiwalay at nakaimbak sa maraming napatay na node . Kung ang isang pagpapatupad node nabigo sa oras ng pagpapatakbo, pagkatapos ay agad itong makakakuha ng back up mula sa susunod na maipapatupad na node . Ito ang dahilan kung bakit ang RDD ay isinasaalang-alang bilang isang advanced na uri ng mga istraktura ng data kung ihahambing sa iba pang mga tradisyonal na istruktura ng data. Maaaring iimbak ng mga RDD ang nakaayos na data, hindi istraktura at semi-istrukturang data.

Sumulong tayo sa ating RDD gamit ang Spark blog at alamin ang tungkol sa mga natatanging tampok ng RDD na nagbibigay nito ng higit sa iba pang mga uri ng istraktura ng data.

Mga tampok ng RDD

  • Sa alaala (RAM) Pagkuwenta : Ang konsepto ng pagkalkula ng In-Memory ay kumukuha ng pagproseso ng data sa isang mas mabilis at mahusay na yugto kung saan ang pangkalahatang pagganap ng system ay na-upgrade
  • L ang kanyang Evaluation : Ang katagang pagtatasa ng tamad na sinasabi ng mga pagbabago ay inilapat sa data sa RDD, ngunit ang output ay hindi nabuo. Sa halip, ang inilapat na mga pagbabago ay naka-log
  • Pagtitiyaga : Ang mga resulta RDD ay palaging magagamit muli
  • Magaspang na Grained Operations : Maaaring maglapat ang gumagamit ng mga pagbabago sa lahat ng mga elemento sa mga hanay ng data mapa, salain o pangkat ng operasyon.
  • Fault Tolerant : Kung may pagkawala ng data, magagawa ng system gumulong pabalik sa nito orihinal na estado sa pamamagitan ng paggamit ng naka-log mga pagbabago .
  • Kawalan ng kakayahan : Ang kahulugan ng data, nakuha o nilikha ay hindi maaaring nagbago sa sandaling naka-log in ito sa system. Kung sakaling kailanganin mong i-access at baguhin ang umiiral na RDD, dapat kang lumikha ng isang bagong RDD sa pamamagitan ng paglalapat ng isang hanay ng Pagbabago gumagana sa kasalukuyan o naunang RDD.
  • Naghahati : Ito ang mahalagang yunit ng parallelism sa Spark RDD. Bilang default, ang bilang ng mga partisyon na nilikha ay batay sa iyong mapagkukunan ng data. Maaari mo ring magpasya ang bilang ng mga partisyon na nais mong gawin gamit ang pasadyang pagkahati pagpapaandar

Paglikha ng RDD gamit ang Spark

Ang mga RDD ay maaaring malikha sa tatlong paraan:

  1. Pagbasa ng data mula sa mga parallelized na koleksyon
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .asonach (println)
  1. Paglalapat pagbabago sa mga nakaraang RDD
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'powerful', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Frontach (println)
  1. Pagbasa ng data mula sa panlabas na imbakan o mga file path tulad ng HDFS o HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Ang mga pagpapatakbo na isinagawa sa RDDs:

Higit sa lahat mayroong dalawang uri ng pagpapatakbo na isinasagawa sa RDDs, lalo:

  • Pagbabago
  • Mga kilos

Pagbabago : Ang operasyon nag-a-apply kami sa RDDs sa filter, pag-access at baguhin ang data sa magulang RDD upang makabuo ng a sunud-sunod na RDD ay tinatawag na pagbabago . Ang bagong RDD ay nagbabalik ng isang pointer sa nakaraang RDD na tinitiyak ang pagtitiwala sa pagitan nila.

Ang mga pagbabago ay Mga Tamad na Pagsusuri, sa madaling salita, ang mga pagpapatakbo na inilapat sa RDD na iyong pinagtatrabahuhan ay mai-log ngunit hindi pinatay. Ang system ay nagtatapon ng isang resulta o pagbubukod matapos na ma-trigger ang Kilos .

kung paano tapusin ang isang pamamaraan sa java

Maaari nating hatiin ang mga pagbabago sa dalawang uri tulad ng sa ibaba:

  • Makitid na Pagbabago
  • Malapad na Pagbabago

Makitid na Pagbabago Naglalapat kami ng makitid na pagbabago sa a solong pagkahati ng magulang RDD upang makabuo ng isang bagong RDD bilang kinakailangang data upang maproseso ang RDD ay magagamit sa isang solong pagkahati ng magulang ASD . Ang mga halimbawa para sa makitid na pagbabago ay:

  • mapa ()
  • filter ()
  • flatMap ()
  • pagkahati ()
  • mapPartitions ()

Malapad na Pagbabago: Nalalapat namin ang malawak na pagbabago maraming partisyon upang makabuo ng isang bagong RDD. Ang data na kinakailangan upang maproseso ang RDD ay magagamit sa maraming mga pagkahati ng magulang ASD . Ang mga halimbawa para sa malawak na pagbabago ay:

  • bawasan niBy ()
  • unyon ()

Mga kilos : Ang mga kilos ay nagtuturo sa Apache Spark na mag-apply pagkalkula at ipasa ang resulta o isang pagbubukod pabalik sa driver ng RDD. Ilan sa mga pagkilos ang kasama:

  • mangolekta ()
  • bilangin ()
  • kunin ()
  • una ()

Ipaalam sa amin praktikal na ilapat ang mga pagpapatakbo sa RDDs:

IPL (Indian Premier League) ay isang paligsahan sa cricket na may hipe ito sa isang pinakamataas na antas. Kaya, hinayaan ngayon na makuha ang ating mga kamay sa hanay ng data ng IPL at ipatupad ang aming RDD gamit ang Spark.

  • Una, mag-download tayo ng isang data ng pagtutugma ng CSV ng IPL. Pagkatapos i-download ito, nagsisimula itong magmukhang isang EXCEL file na may mga hilera at haligi.

Sa susunod na hakbang, sunugin namin ang spark at i-load ang mga file ng match.csv mula sa lokasyon nito, sa aking kaso mycsvang lokasyon ng file ay “/User/edureka_566977/test/matches.csv”

Ngayon ay magsimula tayo sa Pagbabago bahagi muna:

  • mapa ():

Gumagamit kami Pagbabago ng Mapa upang mag-apply ng isang tukoy na operasyon ng pagbabago sa bawat elemento ng isang RDD. Lumilikha kami dito ng isang RDD sa pangalang CKfile kung saan iimbak ang amingcsvfile Lilikha kami ng isa pang RDD na tinawag na Mga Estado itabi ang mga detalye ng lungsod .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) nakasaad.collect (). frontach (println)

  • filter ():

Pagsasaayos ng filter, ang pangalan mismo ay naglalarawan ng paggamit nito. Ginagamit namin ang operasyon ng pagbabago na ito upang i-filter ang pumipiling data mula sa isang koleksyon ng ibinigay na data. Nag-a-apply kami pagpapatakbo ng filter dito upang makuha ang mga tala ng mga tugma sa IPL ng taon 2017 at iimbak ito sa fil RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). precach (println)

  • flatMap ():

Nag-apply kami ng flatMap ay isang operasyon ng pagbabago sa bawat isa sa mga elemento ng isang RDD upang lumikha ng isang bagongRDD. Ito ay katulad ng pagbabago ng Mapa. dito kami nag applyFlatmapsa dumura ang mga laban ng lungsod ng Hyderabad at iimbak ang data safilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). mangolekta ()

  • pagkahati ():

Ang bawat data na isinusulat namin sa isang RDD ay nahahati sa isang tiyak na bilang ng mga pagkahati. Ginagamit namin ang pagbabagong ito upang hanapin ang bilang ng mga partisyon ang data ay talagang nahahati sa.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Isinasaalang-alang namin ang MapPatitions bilang isang kahalili ng Map () atunahan() magkasama. Gumagamit kami ng mga mapPartition dito upang hanapin ang bilang ng hanay mayroon kaming sa aming RD RD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • bawasanBy ():

Gumagamit kamiBawasanBy() sa Mga pares ng Key-Value . Ginamit namin ang pagbabagong ito sa amingcsvfile upang mahanap ang player na may pinakamataas na Man ng mga tugma .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH. Kunin (10) .foreach (println)

  • unyon ():

Ipinapaliwanag ng pangalan ang lahat, Ginagamit namin ang pagbabago ng unyon ay upang magkasama ang dalawang RDD . Dito lumilikha kami ng dalawang RDD namely fil at fil2. Naglalaman ang fil RDD ng mga tala ng mga tugma sa IPL 2017 at ang fil2 RDD ay naglalaman ng 2016 IPL record ng tugma.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Magsimula tayo sa Kilos bahagi kung saan ipinapakita namin ang aktwal na output:

ano ang kurso sa agham ng data

  • kolektahin ():

Kolektahin ang pagkilos na dati naming ginagamit ipakita ang mga nilalaman sa RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • bilangin ():

Bilanginay isang aksyon na ginagamit namin upang mabilang ang bilang ng mga talaan naroroon sa RDD.Ditoginagamit namin ang operasyong ito upang mabilang ang kabuuang bilang ng mga tala sa aming file na match.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • kunin ():

Ang pagkuha ay isang operasyon ng Pagkilos na katulad upang mangolekta ngunit ang pagkakaiba lamang ay maaari itong mai-print ang anuman pumipili na bilang ng mga hilera ayon sa kahilingan ng gumagamit. Dito inilalapat namin ang sumusunod na code upang mai-print ang nangungunang sampung nangungunang ulat.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). precach (println) statecountm. kunin ang (10) .asonach (println)

  • una ():

Ang Una () ay isang operasyon ng pagkilos na katulad ng pagkolekta () at gawin ()itoginamit upang mai-print ang pinakamataas na ulat ng output Dito ginagamit namin ang unang () operasyon upang hanapin ang maximum na bilang ng mga tugma na nilalaro sa isang partikular na lungsod at nakukuha namin ang Mumbai bilang output.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.achach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y). mapa (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Upang gawin ang aming proseso ng aming pag-aaral RDD gamit ang Spark, kahit na higit pa, kawili-wili, nakagawa ako ng isang kagiliw-giliw na kaso ng paggamit.

RDD gamit ang Spark: Pokemon Use Case

  • Una, I-download natin ang isang Pokemon.csv file at i-load ito sa spark-shell tulad ng ginawa namin sa file na Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). unahan (println)

Ang mga Pokemon ay talagang magagamit sa isang malaking pagkakaiba-iba, Makahanap kami ng ilang mga pagkakaiba-iba.

  • Inaalis ang schema mula sa Pokemon.csv file

Maaaring hindi natin kailangan ang Iskema ng Pokemon.csv file. Samakatuwid, inaalis namin ito.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Paghahanap ng bilang ng mga partisyon ang aming pokemon.csv ay ipinamamahagi sa.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Water Pokemon

Paghanap ng bilang ng Pokemon ng Tubig

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). precach (println)

  • Fire Pokemon

Paghanap ng bilang ng Fire pokemon

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). frontach (println)

  • Maaari din nating tuklasin ang populasyon ng ibang uri ng pokemon gamit ang bilang ng pag-andar
WaterRDD.count () FireRDD.count ()

  • Dahil gusto ko ang laro ng diskarte sa pagtatanggol hanapin natin ang pokemon na may maximum na pagtatanggol.
val defenceList = NoHeader.map {x => x.split (',')}. mapa {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Alam natin ang maximum halaga ng lakas ng pagtatanggol ngunit hindi namin alam kung aling pokemon ito. kaya, hanapin natin kung alin ito pokemon
val defWithPokemonName = NoHeader.map {x => x.split (',')}. mapa {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Pag-order ng [Dobleng] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Ngayon ay pag-uri-uriin natin ang pokemon pinakamaliit na Depensa
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .achach (println)

  • Ngayon tingnan natin ang Pokemon na may a hindi gaanong nagtatanggol na diskarte.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPemon .map {x => x.split (',')}. mapa {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Pag-order [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Kaya, sa pamamagitan nito, natapos namin ang RDD na ito gamit ang artikulong Spark. Inaasahan kong nag-ilaw kami ng kaunting ilaw sa iyong kaalaman tungkol sa RDD, kanilang mga tampok at iba't ibang uri ng pagpapatakbo na maaaring maisagawa sa kanila.

Ang artikulong ito batay sa ay dinisenyo upang ihanda ka para sa Cloudera Hadoop at Spark Developer Certification Exam (CCA175). Makakakuha ka ng isang malalim na kaalaman sa Apache Spark at Spark Ecosystem, na kasama ang Spark RDD, Spark SQL, Spark MLlib at Spark Streaming. Makakakuha ka ng komprehensibong kaalaman sa wika ng Scala Programming, HDFS, Sqoop, Flume, Spark GraphX ​​at Messaging System tulad ng Kafka.