python - Union RDDs after a loop PySpark
I'm using PySpark, and I'm looking for a way to modify 4 RDDs that are stored in a list. Displaying the contents of the list with:
for r in repartionned_rdd:
    print r.collect()
gives :
[(u'_guid_ncw7sufnch_mfw3si3qtvbcbqxkd4mtsdjvwe7hngng=', (u'f', u'ksjakod2|ktc9zf9h'))] [(u'_guid_ocs2au-sknxzpe0urpdp4hg1vvhgpzraayjnwrqpkbw=', (u'f', u'kxrylzua|kpsxjwh2')), (u'_guid_txh15ulaeudbc4z_nleoj2xoybfa-08imqiblfyskps=', (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905')), (u'_guid_ehct6nyd9l3q3nv9zroawveo3bndt4tvbu_fmbren1g=', (u'm', u'537d69b4-743a-45b9-bed1-a25aa5926f13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723')), (u'_guid_9f4ph5gztln9ilwngzwkpmcct4n3je6-93im_130f-c=', (u'f', u'koqqbzhu|krdt5gc4')), (u'_guid_nple_f-zoohnyixjsgxwovryc1u4bnfxkow3p0mdufy=', (u'f', u'kh3tizr1|khs0trsh|k3gebqb_|kbrvncdx|jg2udy8m|529816a3-ee43-4423-961f-8aedaf25d58c')), (u'331d8410d4924e72b0f0585e888c85ce', (u'f', u'1f37807a-cbea-4b78-85d7-5a97b37b539e'))] [(u'28b195c271f14a329235c262e7baecbf', (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'))] [(u'c65ac2064bc14116a363125392dcc6f7', (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05')), (u'171f92200d634d62bdc6685bdb7a94e3', (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|k1umlb5m|639b02b4-24ad-4069-99a2-c68e8c8f7f06|kje3wxir')), (u'_guid_wqzizefxcix9cihupewof2euoic0jiosxvxn98_zch8=', (u'f', u'f0992237-2598-4b13-aa8a-c37d436b901c|c80d1a89-dd84-4734-838f-128f99ebdd20|kthpuvu0')), (u'_guid_ufocko48drwr50yjn26nrix5mlyonwmalxwcmly7oqq=', (u'f', u'kly10yxx|kycvx_km'))]
My objective is to add a kind of new "column" to every RDD in the list. Each row should contain the index of the RDD it belongs to, which is unique per RDD. The code:
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x : x + (float(i), ))
    print new_rdd.collect()
which gives :
[(u'_guid_ncw7sufnch_mfw3si3qtvbcbqxkd4mtsdjvwe7hngng=', (u'f', u'ksjakod2|ktc9zf9h'), 0.0)] [(u'_guid_ocs2au-sknxzpe0urpdp4hg1vvhgpzraayjnwrqpkbw=', (u'f', u'kxrylzua|kpsxjwh2'), 1.0), (u'_guid_txh15ulaeudbc4z_nleoj2xoybfa-08imqiblfyskps=', (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 1.0), (u'_guid_ehct6nyd9l3q3nv9zroawveo3bndt4tvbu_fmbren1g=', (u'm', u'537d69b4-743a-45b9-bed1-a25aa5926f13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 1.0), (u'_guid_9f4ph5gztln9ilwngzwkpmcct4n3je6-93im_130f-c=', (u'f', u'koqqbzhu|krdt5gc4'), 1.0), (u'_guid_nple_f-zoohnyixjsgxwovryc1u4bnfxkow3p0mdufy=', (u'f', u'kh3tizr1|khs0trsh|k3gebqb_|kbrvncdx|jg2udy8m|529816a3-ee43-4423-961f-8aedaf25d58c'), 1.0), (u'331d8410d4924e72b0f0585e888c85ce', (u'f', u'1f37807a-cbea-4b78-85d7-5a97b37b539e'), 1.0)] [(u'28b195c271f14a329235c262e7baecbf', (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 2.0)] [(u'c65ac2064bc14116a363125392dcc6f7', (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0), (u'171f92200d634d62bdc6685bdb7a94e3', (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|k1umlb5m|639b02b4-24ad-4069-99a2-c68e8c8f7f06|kje3wxir'), 3.0), (u'_guid_wqzizefxcix9cihupewof2euoic0jiosxvxn98_zch8=', (u'f', u'f0992237-2598-4b13-aa8a-c37d436b901c|c80d1a89-dd84-4734-838f-128f99ebdd20|kthpuvu0'), 3.0), (u'_guid_ufocko48drwr50yjn26nrix5mlyonwmalxwcmly7oqq=', (u'f', u'kly10yxx|kycvx_km'), 3.0)]
So every row in new_rdd contains the new column, namely the RDD's index (as specified in the code).
My next objective is to put these new RDDs into one single RDD. I tried:
all_rdds_list = []
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x : x + (float(i), ))
    all_rdds_list.append(new_rdd)
But when I tried to display the RDDs with:
for x in all_rdds_list:
    print x.collect()
the result was:
[(u'_guid_ncw7sufnch_mfw3si3qtvbcbqxkd4mtsdjvwe7hngng=', (u'f', u'ksjakod2|ktc9zf9h'), 3.0)] [(u'_guid_ocs2au-sknxzpe0urpdp4hg1vvhgpzraayjnwrqpkbw=', (u'f', u'kxrylzua|kpsxjwh2'), 3.0), (u'_guid_txh15ulaeudbc4z_nleoj2xoybfa-08imqiblfyskps=', (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 3.0), (u'_guid_ehct6nyd9l3q3nv9zroawveo3bndt4tvbu_fmbren1g=', (u'm', u'537d69b4-743a-45b9-bed1-a25aa5926f13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 3.0), (u'_guid_9f4ph5gztln9ilwngzwkpmcct4n3je6-93im_130f-c=', (u'f', u'koqqbzhu|krdt5gc4'), 3.0), (u'_guid_nple_f-zoohnyixjsgxwovryc1u4bnfxkow3p0mdufy=', (u'f', u'kh3tizr1|khs0trsh|k3gebqb_|kbrvncdx|jg2udy8m|529816a3-ee43-4423-961f-8aedaf25d58c'), 3.0), (u'331d8410d4924e72b0f0585e888c85ce', (u'f', u'1f37807a-cbea-4b78-85d7-5a97b37b539e'), 3.0)] [(u'28b195c271f14a329235c262e7baecbf', (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 3.0)] [(u'c65ac2064bc14116a363125392dcc6f7', (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0), (u'171f92200d634d62bdc6685bdb7a94e3', (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|k1umlb5m|639b02b4-24ad-4069-99a2-c68e8c8f7f06|kje3wxir'), 3.0), (u'_guid_wqzizefxcix9cihupewof2euoic0jiosxvxn98_zch8=', (u'f', u'f0992237-2598-4b13-aa8a-c37d436b901c|c80d1a89-dd84-4734-838f-128f99ebdd20|kthpuvu0'), 3.0), (u'_guid_ufocko48drwr50yjn26nrix5mlyonwmalxwcmly7oqq=', (u'f', u'kly10yxx|kycvx_km'), 3.0)]
Any help? Thanks!
You have two problems in your approach. First, you use a variable (i) that changes before the function you passed to map is evaluated. map calls are transformations, and they only execute when you apply an action (like collect). That is why you see the correct additional column when you collect inside the enumerate loop, whereas your later example uses the last value of i for each mapping.
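The late-binding behaviour described here is plain Python and can be reproduced without Spark. The following is a minimal sketch, not Spark code: the list named deferred is a hypothetical stand-in for the lazily evaluated map calls, and the one-element tuple ("row",) stands in for an RDD row.

```python
# Stand-in for lazy transformations: each lambda is stored now and called later.
deferred = []
for i in range(4):
    deferred.append(lambda x: x + (float(i),))

# The lambdas look up i only when they are called. By then the loop has
# finished, so every one of them sees the last value of i.
late_results = [f(("row",)) for f in deferred]

# Calling each lambda inside the loop, while i still holds the current value,
# mirrors calling collect inside the enumerate loop.
eager_results = []
for i in range(4):
    f = lambda x: x + (float(i),)
    eager_results.append(f(("row",)))

print(late_results)
print(eager_results)
```

Every entry of late_results carries 3.0, while eager_results carries 0.0 through 3.0, which matches the two outputs shown in the question.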
The second issue is that if you're trying to union RDDs, you should use the union function rather than a list of RDDs. If you did want a list of RDDs, you can replace the union below with the list append you had before.
full_rdd = None
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x : x + (float(i),))
    if full_rdd is None:
        full_rdd = new_rdd
    else:
        full_rdd = sc.union([full_rdd, new_rdd])
    # force the lazy evaluation to execute before `i` changes
    full_rdd.count()
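As an alternative that is not part of the answer above, the value of i can be bound when the mapping function is created, so that no action has to run before the next iteration. The sketch below assumes a hypothetical helper named tag_with_index and uses plain lists (fake_rdds) as stand-ins for the RDDs, so that it runs without a Spark installation.

```python
def tag_with_index(index):
    """Return a function that appends `index` (as a float) to a tuple row."""
    def tag(row):
        # `index` is fixed per call of tag_with_index, so later loop
        # iterations cannot change it.
        return row + (float(index),)
    return tag

# Plain lists stand in for the RDDs in this sketch (hypothetical data).
fake_rdds = [
    [("a", ("f", "x"))],
    [("b", ("m", "y")), ("c", ("f", "z"))],
]

# Build one mapping function per list; nothing is applied yet.
taggers = [tag_with_index(i) for i, _ in enumerate(fake_rdds)]

# Apply the functions only after the loop has finished, as a lazy
# transformation would.
tagged = [[tagger(row) for row in rows] for tagger, rows in zip(taggers, fake_rdds)]

# Flatten the tagged lists, playing the role of the union.
flat = [row for rows in tagged for row in rows]
print(flat)
```

With real RDDs, the same helper would presumably be used as rdd.map(tag_with_index(i)), and the resulting RDDs could then be combined either pairwise as above or by passing the whole list to sc.union.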