python - Union RDDs after a loop in PySpark


I'm using PySpark, and I'm looking for a way to modify 4 RDDs that are contained in a list. When I display the list, I have:


So, given:

    for r in repartionned_rdd:
        print(r.collect())

this gives:

    [(u'_guid_ncw7sufnch_mfw3si3qtvbcbqxkd4mtsdjvwe7hngng=', (u'f', u'ksjakod2|ktc9zf9h'))]
    [(u'_guid_ocs2au-sknxzpe0urpdp4hg1vvhgpzraayjnwrqpkbw=', (u'f', u'kxrylzua|kpsxjwh2')), (u'_guid_txh15ulaeudbc4z_nleoj2xoybfa-08imqiblfyskps=', (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905')), (u'_guid_ehct6nyd9l3q3nv9zroawveo3bndt4tvbu_fmbren1g=', (u'm', u'537d69b4-743a-45b9-bed1-a25aa5926f13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723')), (u'_guid_9f4ph5gztln9ilwngzwkpmcct4n3je6-93im_130f-c=', (u'f', u'koqqbzhu|krdt5gc4')), (u'_guid_nple_f-zoohnyixjsgxwovryc1u4bnfxkow3p0mdufy=', (u'f', u'kh3tizr1|khs0trsh|k3gebqb_|kbrvncdx|jg2udy8m|529816a3-ee43-4423-961f-8aedaf25d58c')), (u'331d8410d4924e72b0f0585e888c85ce', (u'f', u'1f37807a-cbea-4b78-85d7-5a97b37b539e'))]
    [(u'28b195c271f14a329235c262e7baecbf', (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'))]
    [(u'c65ac2064bc14116a363125392dcc6f7', (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05')), (u'171f92200d634d62bdc6685bdb7a94e3', (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|k1umlb5m|639b02b4-24ad-4069-99a2-c68e8c8f7f06|kje3wxir')), (u'_guid_wqzizefxcix9cihupewof2euoic0jiosxvxn98_zch8=', (u'f', u'f0992237-2598-4b13-aa8a-c37d436b901c|c80d1a89-dd84-4734-838f-128f99ebdd20|kthpuvu0')), (u'_guid_ufocko48drwr50yjn26nrix5mlyonwmalxwcmly7oqq=', (u'f', u'kly10yxx|kycvx_km'))]

My objective is to add a kind of new "column" to every RDD in the list, so that each row contains the unique index of its RDD. My code:

    for i, rdd in enumerate(repartionned_rdd):
        new_rdd = rdd.map(lambda x: x + (float(i),))
        print(new_rdd.collect())

which gives:

    [(u'_guid_ncw7sufnch_mfw3si3qtvbcbqxkd4mtsdjvwe7hngng=', (u'f', u'ksjakod2|ktc9zf9h'), 0.0)]
    [(u'_guid_ocs2au-sknxzpe0urpdp4hg1vvhgpzraayjnwrqpkbw=', (u'f', u'kxrylzua|kpsxjwh2'), 1.0), (u'_guid_txh15ulaeudbc4z_nleoj2xoybfa-08imqiblfyskps=', (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 1.0), (u'_guid_ehct6nyd9l3q3nv9zroawveo3bndt4tvbu_fmbren1g=', (u'm', u'537d69b4-743a-45b9-bed1-a25aa5926f13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 1.0), (u'_guid_9f4ph5gztln9ilwngzwkpmcct4n3je6-93im_130f-c=', (u'f', u'koqqbzhu|krdt5gc4'), 1.0), (u'_guid_nple_f-zoohnyixjsgxwovryc1u4bnfxkow3p0mdufy=', (u'f', u'kh3tizr1|khs0trsh|k3gebqb_|kbrvncdx|jg2udy8m|529816a3-ee43-4423-961f-8aedaf25d58c'), 1.0), (u'331d8410d4924e72b0f0585e888c85ce', (u'f', u'1f37807a-cbea-4b78-85d7-5a97b37b539e'), 1.0)]
    [(u'28b195c271f14a329235c262e7baecbf', (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 2.0)]
    [(u'c65ac2064bc14116a363125392dcc6f7', (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0), (u'171f92200d634d62bdc6685bdb7a94e3', (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|k1umlb5m|639b02b4-24ad-4069-99a2-c68e8c8f7f06|kje3wxir'), 3.0), (u'_guid_wqzizefxcix9cihupewof2euoic0jiosxvxn98_zch8=', (u'f', u'f0992237-2598-4b13-aa8a-c37d436b901c|c80d1a89-dd84-4734-838f-128f99ebdd20|kthpuvu0'), 3.0), (u'_guid_ufocko48drwr50yjn26nrix5mlyonwmalxwcmly7oqq=', (u'f', u'kly10yxx|kycvx_km'), 3.0)]

So every row in new_rdd contains a new column, namely the RDD's index (as intended in the code!).

My objective is to put the new RDDs into one single RDD. I tried:

    all_rdds_list = []
    for i, rdd in enumerate(repartionned_rdd):
        new_rdd = rdd.map(lambda x: x + (float(i),))
        all_rdds_list.append(new_rdd)

But when I tried to display the RDDs with:

    for x in all_rdds_list:
        print(x.collect())

I got this result:

    [(u'_guid_ncw7sufnch_mfw3si3qtvbcbqxkd4mtsdjvwe7hngng=', (u'f', u'ksjakod2|ktc9zf9h'), 3.0)]
    [(u'_guid_ocs2au-sknxzpe0urpdp4hg1vvhgpzraayjnwrqpkbw=', (u'f', u'kxrylzua|kpsxjwh2'), 3.0), (u'_guid_txh15ulaeudbc4z_nleoj2xoybfa-08imqiblfyskps=', (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 3.0), (u'_guid_ehct6nyd9l3q3nv9zroawveo3bndt4tvbu_fmbren1g=', (u'm', u'537d69b4-743a-45b9-bed1-a25aa5926f13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 3.0), (u'_guid_9f4ph5gztln9ilwngzwkpmcct4n3je6-93im_130f-c=', (u'f', u'koqqbzhu|krdt5gc4'), 3.0), (u'_guid_nple_f-zoohnyixjsgxwovryc1u4bnfxkow3p0mdufy=', (u'f', u'kh3tizr1|khs0trsh|k3gebqb_|kbrvncdx|jg2udy8m|529816a3-ee43-4423-961f-8aedaf25d58c'), 3.0), (u'331d8410d4924e72b0f0585e888c85ce', (u'f', u'1f37807a-cbea-4b78-85d7-5a97b37b539e'), 3.0)]
    [(u'28b195c271f14a329235c262e7baecbf', (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 3.0)]
    [(u'c65ac2064bc14116a363125392dcc6f7', (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0), (u'171f92200d634d62bdc6685bdb7a94e3', (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|k1umlb5m|639b02b4-24ad-4069-99a2-c68e8c8f7f06|kje3wxir'), 3.0), (u'_guid_wqzizefxcix9cihupewof2euoic0jiosxvxn98_zch8=', (u'f', u'f0992237-2598-4b13-aa8a-c37d436b901c|c80d1a89-dd84-4734-838f-128f99ebdd20|kthpuvu0'), 3.0), (u'_guid_ufocko48drwr50yjn26nrix5mlyonwmalxwcmly7oqq=', (u'f', u'kly10yxx|kycvx_km'), 3.0)]

Any help? Thanks!

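You have two problems in your approach. First, you are using a variable that changes before the mapping is evaluated. map calls are transformations, which only execute when you apply an action (like collect). The lambda refers to the variable i itself, not to the value i had when map was called, so it sees whatever value i holds when the action finally runs. That is why, when you collect inside the enumerate loop, you see the correct additional column, whereas in the later example every mapping picks up the last value of i (3.0).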
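The same late-binding behaviour can be reproduced in plain Python, without Spark: functions created in a loop look up i when they are called, not when they are created. The sketch below builds four such functions and only calls them after the loop has finished, much as a Spark action runs the mapping later; the sample row and the names late and bound are illustrative only. Binding i as a default argument (lambda x, i=i: ...) copies the current value at definition time; this is a common Python idiom, not something the original answer uses.

```python
# Sample row shaped like the ones in the question: (key, (gender, ids)).
row = ("some_key", ("f", "id1|id2"))

# Late binding: every lambda refers to the variable i itself,
# so all of them see its final value (3) when called after the loop.
late = []
for i in range(4):
    late.append(lambda x: x + (float(i),))

# Default-argument binding: i=i stores the current value in each lambda.
bound = []
for i in range(4):
    bound.append(lambda x, i=i: x + (float(i),))

print([f(row)[-1] for f in late])   # [3.0, 3.0, 3.0, 3.0]
print([f(row)[-1] for f in bound])  # [0.0, 1.0, 2.0, 3.0]
```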

The second issue is that if you're trying to union the RDDs, you should use the union function rather than a list of RDDs. If you really wanted a list of RDDs, you can replace the union below with the list append you had before.

    full_rdd = None
    for i, rdd in enumerate(repartionned_rdd):
        new_rdd = rdd.map(lambda x: x + (float(i),))
        if full_rdd is None:
            full_rdd = new_rdd
        else:
            full_rdd = sc.union([full_rdd, new_rdd])
        # force the lazy evaluation to execute before `i` changes
        full_rdd.count()
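For comparison, here is a pure-Python sketch that mimics lazy RDDs so it can run without a Spark cluster. LazyRDD, UnionRDD and tag_and_union are hypothetical names invented for this illustration; they only model the one property that matters here, namely that map() is deferred until collect(). Instead of forcing evaluation with count(), it binds i through a default argument and performs a single union over the whole list. In real PySpark the corresponding step would presumably be sc.union(tagged), since SparkContext.union accepts a list of RDDs; treat that mapping as an untested assumption rather than a verified Spark program.

```python
class LazyRDD:
    """Hypothetical stand-in for a PySpark RDD: map() only records the
    function, and nothing is computed until collect() is called."""

    def __init__(self, data, funcs=()):
        self._data = list(data)
        self._funcs = list(funcs)

    def map(self, f):
        # Return a new object with f appended; no data is touched yet.
        return LazyRDD(self._data, self._funcs + [f])

    def collect(self):
        # The recorded functions only run here, like a Spark action.
        out = []
        for x in self._data:
            for f in self._funcs:
                x = f(x)
            out.append(x)
        return out


class UnionRDD:
    """Hypothetical stand-in for the result of sc.union(list_of_rdds)."""

    def __init__(self, parts):
        self._parts = list(parts)

    def collect(self):
        return [x for part in self._parts for x in part.collect()]


def tag_and_union(rdds):
    # i=i binds the current index into each lambda, so no count() is
    # needed to force evaluation before i changes.
    tagged = [rdd.map(lambda x, i=i: x + (float(i),)) for i, rdd in enumerate(rdds)]
    # One union over the whole list instead of a pairwise union per iteration.
    return UnionRDD(tagged)


parts = [
    LazyRDD([("a", ("f", "id1"))]),
    LazyRDD([("b", ("m", "id2")), ("c", ("f", "id3"))]),
]
print(tag_and_union(parts).collect())
# [('a', ('f', 'id1'), 0.0), ('b', ('m', 'id2'), 1.0), ('c', ('f', 'id3'), 1.0)]
```

Dropping the i=i default in tag_and_union makes every row come back tagged with 1.0 (the last index in this two-part example), which reproduces the problem from the question.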
