Spark join hangs


I have a table with n columns that I'll call a. In this table there are 3 columns that I'll need:

  • vat -> string
  • tax -> string
  • card -> string

vat or tax can be null, but not at the same time. For every unique pair of vat and tax there is at least one card.

I need to alter this table, adding a column card_count in which I put a text based on the number of cards that every unique combination of tax and vat has.

So I've done this:

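    import org.apache.spark.sql.functions.udf

    // number of rows (cards) per unique (tax, vat) pair
    val cardcount = a.groupBy("tax", "vat").count

    // "count" is a LongType column, so the UDF takes a Long
    val sqlcard = udf((count: Long) => {
      if (count > 1)
        "multi"
      else
        "mono"
    })

    val b = cardcount.withColumn(
        "card_count",
        sqlcard(cardcount.col("count"))
      ).drop("count")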

In table b I have 3 columns now:

  • vat -> string
  • tax -> string
  • card_count -> string

Every operation on this DataFrame runs smoothly.

Now, because I wanted to import the new column into table a, I performed the following join:

    // both null-safe conditions combined into a single join expression
    val result = a.join(b,
        (b.col("tax") <=> a.col("tax")) &&
        (b.col("vat") <=> a.col("vat"))
      ).drop(b.col("tax"))
       .drop(b.col("vat"))

I was expecting to get the original table plus the column card_count.

The problem is that the join hangs, taking all the system resources and blocking the PC.

Additional details:

  • table a has ~1.5M elements and is read from a Parquet file;
  • table b has ~1.3M elements;
  • the system has 8 threads and 30 GB of RAM.

Let me know what I'm doing wrong.
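One thing that may be worth checking, as an assumption that the post itself does not confirm: on some older Spark versions a `<=>` condition might not be planned as an equi-join, and the join could then degrade to a nested-loop or cartesian-style plan. A minimal sketch of a workaround is to replace nulls with a placeholder and join on plain equality. The object name `NullSafeJoinSketch`, the helper columns `tax_key` and `vat_key`, and the placeholder `__NULL__` are all hypothetical and not part of the original code.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, lit}

object NullSafeJoinSketch {
  // Assumed placeholder: it must never occur as a real tax or vat value.
  val Missing = "__NULL__"

  // Adds helper key columns in which nulls are replaced by the placeholder.
  private def withKeys(df: DataFrame): DataFrame =
    df.withColumn("tax_key", coalesce(col("tax"), lit(Missing)))
      .withColumn("vat_key", coalesce(col("vat"), lit(Missing)))

  // Joins a (vat, tax, card, ...) with b (vat, tax, card_count)
  // on the helper keys, using ordinary equality instead of <=>.
  def joinCardCount(a: DataFrame, b: DataFrame): DataFrame = {
    val bKeys = withKeys(b).select("tax_key", "vat_key", "card_count")
    withKeys(a)
      .join(bKeys, Seq("tax_key", "vat_key"))
      .drop("tax_key")
      .drop("vat_key")
  }
}
```

Calling explain() on the DataFrame returned by NullSafeJoinSketch.joinCardCount(a, b), and on the original `<=>` join, prints the physical plans, which should show whether the two versions are planned differently.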

In the end, I didn't find out what the issue was, so I changed approach:

    val cardcount = a.groupBy("tax", "vat").count

    // collect the (tax, vat) pairs with more than one card to the driver
    val cardcountset = cardcount.filter(cardcount.col("count") > 1)
      .rdd.map(r => r(0) + " " + r(1)).collect().toSet

    val udfcardcount = udf((tax: String, vat: String) => {
      if (cardcountset.contains(tax + " " + vat))
        "multi"
      else
        "mono"
    })

    val result = a.withColumn("card_count",
                udfcardcount(a.col("tax"), a.col("vat")))

If anyone knows a better approach, please let me know.
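One possible alternative, sketched here under the assumption of Spark 2.x or later, is a window function: it counts the rows of each (tax, vat) group directly on table a, so neither a join nor a collected set is needed. The names `CardCountSketch` and `withCardCount` are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, lit, when}

object CardCountSketch {
  // Adds card_count = "multi" when a (tax, vat) pair has more than one row,
  // and "mono" otherwise.
  def withCardCount(a: DataFrame): DataFrame = {
    // Rows with a null tax or vat fall into the same partition as other
    // rows with that null, the same way groupBy treats null keys.
    val byTaxVat = Window.partitionBy("tax", "vat")
    a.withColumn(
      "card_count",
      when(count(lit(1)).over(byTaxVat) > 1, "multi").otherwise("mono")
    )
  }
}
```

With this sketch, val result = CardCountSketch.withCardCount(a) would keep every original column of a and add card_count. It still shuffles the data by (tax, vat), so it is not free, but it avoids bringing the multi-card pairs to the driver.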

