Data Engineering

How can we resolve a timeout issue in Databricks?

Ravi96
New Contributor II

We are creating a denormalized table from ingested JSON. The complex (nested) table is generated, but when we try to flatten its JSON rows the job runs for more than 5 hours and then fails with a timeout error.

Is there any way we could resolve this issue? Kindly help!

Thanks

4 REPLIES

-werners-
Esteemed Contributor III

Did you enable the multiline option while reading the JSON? That could be the cause.

See also here.

If you can, try the single-line format, because then you can really leverage the parallel processing power of Spark.
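To make the single-line suggestion concrete: with the multiline option each file has to be parsed as a whole, whereas newline-delimited JSON (one record per line) can be split across tasks. Below is a minimal sketch of converting a pretty-printed JSON array into that layout. The helper name `to_json_lines` and the sample records are made up for illustration, and for large files this conversion would more likely belong in the upstream producer than on the driver.

```python
import json


def to_json_lines(multiline_text: str) -> str:
    """Convert a pretty-printed JSON array (or a single object) to one record per line."""
    data = json.loads(multiline_text)
    records = data if isinstance(data, list) else [data]
    # Compact separators keep every record on exactly one physical line.
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)


# Hypothetical sample input, shaped loosely like the nested payload in this thread.
pretty = """[
  {"batch_id": 1, "payload": {"orderNumber": "A"}},
  {"batch_id": 2, "payload": {"orderNumber": "B"}}
]"""

lines = to_json_lines(pretty)
print(lines)

# In the notebook (not executed here):
#   spark.read.json(path)                            # one record per line, the default
#   spark.read.option("multiLine", True).json(path)  # needed for the pretty-printed layout
```

Whether this helps depends on how the source files are laid out, which the thread does not show.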

Ravi96
New Contributor II

The notebook is still running after 3 hours. Below is the code that runs continuously; the error image is also attached.


import json
i=0
df=spark.sql(f"""select distinct batch_id from sap_ingest_{env}.i2726_batch_control_delta_ns_tbl where raw_status='Running' and year*10000+month*100+day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND 
            (year(now())*10000 + month(now())*100+ day(now())) """)
my_list=[int(row.batch_id) for row in df.collect()]
 
while i < len(my_list):
  #df_check = spark.sql(f'''select * from editrack_ingest_{env}.i1001_batch_control_ns_tbl where batch_id = {my_list[i]} ''')
  #df_check.show()
  #print(my_list[i])
  #i=i+1
  df_check = spark.sql(f'''select count(*) from sap_ingest_{env}.i2726_po_sto_ns_tbl where batch_id = {my_list[i]} and year*10000+month*100+day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND 
            (year(now())*10000 + month(now())*100+ day(now()))  ''')
  batchid_check=df_check.take(1)[0][0]
  print(batchid_check)
 
  if batchid_check==0:
    spark.sql(f''' insert into table sap_ingest_{env}.i2726_po_sto_ns_tbl
partition (year,month,day)
select
y.batchRecordCount,
y.correlationId,
y.createdTimestamp,
y.envelopeVersion,
y.interfaceId ,
y.sequenceId,
y.interfaceName ,
y.messageId,
y.etag,
y.payloadKey,
y.payloadName,
y.payloadVersion,
y.securityClassification,
y.sourceApplicationName,
c.businessUnit,
c.countryOfOrigin ,
c.fobDate,
c.forwardingAgentDesc,
c.forwardingAgentId,
c.forwardingAgentDelPt,
c.incoTerms1,
c.incoTerms2 ,
c.initialOrderDate ,
c.initialOrderRef ,
c.invoicingPartyId,
c.orderingPartyId,
c.rtmPartyId,
c.reprocessorPartyId,
m.boxHangingInd,
m.changeIndicator,
m.deliveryTime,
d.amount ,
d.currency ,
d.type ,
p.custOrderNumber,
p.outloadDate,
p.collection_date,
p.celebration_date,
p.cust_message,
m.legacyProductId ,
m.lineItemId ,
m.netweight ,
m.orderUnit ,
m.productId ,
m.quantityPerOrderUnit ,
m.recStorageLocation ,
m.season,
m.totalQuantity ,
m.ultRecSapCode ,
m.ultimateReceivingLocationId ,
m.unitsOfWeight ,
m.contractID,
m.contractLineItem,
m.dispatchPriority,
m.updateReason,
m.knittedWoven, 
m.year,
c.msgType ,
c.orderNumber ,
c.orderTypeSAP,
c.orderType ,
c.portOfLoading,
c.purchDept ,
c.purchaseOrderType ,
c.recSapCode ,
c.receivingLocationId ,
c.receivingLocationType ,
c.requestedDeliveryDate ,
c.sendSapCode,
c.sendingLocationId ,
c.sendingLocationType ,
c.shippingMethod ,
c.supplierFactoryDesc ,
c.supplierFactoryId ,
c.timestamp,
k.amount,
k.currency,
k.type,
x.load_timestamp,
x.batch_id ,
x.year ,
x.month ,
x.day 
from sap_ingest_{env}.i2726_complex_ns_tbl as x 
lateral view outer explode ((array(x.header))) explode_product as y
lateral view outer explode ((array(x.payload))) explode_product as c
lateral view outer explode(((c.totalPOValue))) explode_product as k
lateral view outer explode ((c.lineItems)) explode_product as po
lateral view outer explode ((po.itemValue)) explode_product as d
lateral view outer explode((x.payload.lineItems)) explode_product as m
lateral view outer explode((po.customer_order)) explode_product as p
 
where x.batch_id = "{my_list[i]}"
and x.year*10000+x.month*100+x.day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND (year(now())*10000 + month(now())*100+ day(now()))
 
''')    
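  else:
    print('Data already loaded for this batch')
    print(my_list[i])
  # Advance on every pass. With the increment only in the else branch, a batch whose
  # insert adds no rows is re-checked forever, so the loop never ends.
  i=i+1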
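One thing that may contribute to the runtime: in the query above, `c` comes from `array(x.payload)`, so `c.lineItems` (exploded as `po`) and `x.payload.lineItems` (exploded as `m`) appear to be the same array. Chained lateral views nest, so a document with n line items would yield n × n rows before the remaining explodes are applied. The following is a pure-Python model of that behaviour, not Spark code; `explode_outer` and the sample document are hypothetical and only illustrate the row counts.

```python
def explode_outer(rows, get_array, alias):
    """Model of LATERAL VIEW OUTER explode: one output row per array element,
    or a single row with None when the array is empty or missing."""
    out = []
    for row in rows:
        items = get_array(row) or [None]
        for item in items:
            out.append({**row, alias: item})
    return out


# Hypothetical document with three line items.
doc = {"lineItems": [{"lineItemId": i} for i in range(1, 4)]}
rows = [{"x": doc}]

# First explode of lineItems (alias po in the query).
po = explode_outer(rows, lambda r: r["x"]["lineItems"], "po")
# Second explode of the same array (alias m in the query), applied to every po row.
po_m = explode_outer(po, lambda r: r["x"]["lineItems"], "m")

# Rows that pair one line item with a different line item.
mismatched = [r for r in po_m if r["po"]["lineItemId"] != r["m"]["lineItemId"]]

print(len(po), len(po_m), len(mismatched))
```

If the pairing of different line items is not intended, selecting the `m.*` columns from `po` instead and dropping the second explode would keep one row per line item. Whether that matches the intended output cannot be determined from the post.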
 
    

-werners-
Esteemed Contributor III

Why do you use a loop to execute the SQL queries?

Spark will handle the parallel processing.
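As a rough sketch of what dropping the loop could look like: one query finds every 'Running' batch that is not yet in the target table, and one insert then handles all of them together. The table names and the date-window expression are taken from the post. The helper names, the `"dev"` and `3` values for `env` and `dayn`, the batch ids and the shortened select list are placeholders. In practice `select_list` and `lateral_views` would be the column list and lateral view clauses from the original query.

```python
def window_predicate(prefix: str, dayn: int) -> str:
    """Date-window filter from the post; prefix is a table alias such as 'x.'."""
    start = (f"year(date_sub(now(),{dayn}))*10000"
             f"+month(date_sub(now(),{dayn}))*100"
             f"+day(date_sub(now(),{dayn}))")
    end = "year(now())*10000+month(now())*100+day(now())"
    return (f"{prefix}year*10000+{prefix}month*100+{prefix}day "
            f"between {start} and {end}")


def pending_batches_sql(env: str, dayn: int) -> str:
    """One query for all 'Running' batches that have no rows in the target table yet."""
    ctl = f"sap_ingest_{env}.i2726_batch_control_delta_ns_tbl"
    tgt = f"sap_ingest_{env}.i2726_po_sto_ns_tbl"
    return (
        f"select distinct b.batch_id from {ctl} as b "
        f"where b.raw_status='Running' and {window_predicate('b.', dayn)} "
        f"and not exists (select 1 from {tgt} as t "
        f"where t.batch_id = b.batch_id and {window_predicate('t.', dayn)})"
    )


def single_insert_sql(env, dayn, batch_ids, select_list, lateral_views) -> str:
    """One insert covering every pending batch instead of one insert per batch."""
    if not batch_ids:
        raise ValueError("no pending batches")
    ids = ", ".join(str(int(b)) for b in batch_ids)
    return (
        f"insert into table sap_ingest_{env}.i2726_po_sto_ns_tbl "
        f"partition (year,month,day) "
        f"select {select_list} "
        f"from sap_ingest_{env}.i2726_complex_ns_tbl as x "
        f"{lateral_views} "
        f"where x.batch_id in ({ids}) and {window_predicate('x.', dayn)}"
    )


# Placeholder values, for illustration only.
pending_sql = pending_batches_sql("dev", 3)
insert_sql = single_insert_sql("dev", 3, [101, 102],
                               "x.batch_id, x.year, x.month, x.day", "")
print(pending_sql)
print(insert_sql)

# In the notebook (not executed here):
#   ids = [int(r.batch_id) for r in spark.sql(pending_sql).collect()]
#   if ids:
#       spark.sql(single_insert_sql(env, dayn, ids, select_list, lateral_views))
```

This keeps the "skip batches that are already loaded" check from the original loop but runs it once, so Spark can plan a single job over all pending batches rather than a count and an insert per batch.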

Anonymous
Not applicable

Hey @Raviteja Paluri

Hope all is well!

Just wanted to check in: were you able to resolve your issue? If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

Thanks!
