Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How can we resolve the timeout issue in Databricks?

Ravi96
New Contributor II

We are creating a denormalized table from a JSON ingestion, but only the complex (nested) table is getting generated. When we try to flatten the JSON rows, the job runs for more than 5 hours and then fails with a timeout error.

Is there any way we could resolve this issue? Kindly help!

Thanks

4 REPLIES

-werners-
Esteemed Contributor III

Did you enable the multiline option while reading the JSON? That could be the cause.

See also here.

If you can, try the single-line (JSON Lines) format, because then you can really leverage the parallel processing power of Spark.
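If it helps, here is a minimal sketch of the two read modes (the path is a hypothetical placeholder, not something from this thread):

raw_path = "/mnt/raw/sap/i2726/"  # hypothetical location of the ingested JSON files

# multiLine=True is needed when each file is a single pretty-printed JSON document
# spanning many lines, but then each file is parsed by a single task
df_multiline = spark.read.option("multiLine", "true").json(raw_path)

# with JSON Lines (one JSON object per line), Spark can split the input and
# read it in parallel across many tasks
df_jsonlines = spark.read.json(raw_path)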

Ravi96
New Contributor II

The notebook is still running after 3 hours. Below is the code that keeps running continuously; I have also attached the error image.


# env and dayn are assumed to be defined earlier in the notebook (e.g. via widgets)
# collect the distinct batch ids still marked as Running within the last `dayn` days
i = 0
df=spark.sql(f"""select distinct batch_id from sap_ingest_{env}.i2726_batch_control_delta_ns_tbl where raw_status='Running' and year*10000+month*100+day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND 
            (year(now())*10000 + month(now())*100+ day(now())) """)
my_list=[int(row.batch_id) for row in df.collect()]
 
while i < len(my_list):
  #df_check = spark.sql(f'''select * from editrack_ingest_{env}.i1001_batch_control_ns_tbl where batch_id = {my_list[i]} ''')
  #df_check.show()
  #print(my_list[i])
  #i=i+1
  # check whether this batch has already been loaded into the target table for the same date window
  df_check = spark.sql(f'''select count(*) from sap_ingest_{env}.i2726_po_sto_ns_tbl where batch_id = {my_list[i]} and year*10000+month*100+day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND 
            (year(now())*10000 + month(now())*100+ day(now()))  ''')
  batchid_check=df_check.take(1)[0][0]
  print(batchid_check)
 
  if batchid_check==0:
    # explode the nested header/payload/lineItems structures and insert them into the partitioned target table
    spark.sql(f''' insert into table sap_ingest_{env}.i2726_po_sto_ns_tbl
partition (year,month,day)
select
y.batchRecordCount,
y.correlationId,
y.createdTimestamp,
y.envelopeVersion,
y.interfaceId ,
y.sequenceId,
y.interfaceName ,
y.messageId,
y.etag,
y.payloadKey,
y.payloadName,
y.payloadVersion,
y.securityClassification,
y.sourceApplicationName,
c.businessUnit,
c.countryOfOrigin ,
c.fobDate,
c.forwardingAgentDesc,
c.forwardingAgentId,
c.forwardingAgentDelPt,
c.incoTerms1,
c.incoTerms2 ,
c.initialOrderDate ,
c.initialOrderRef ,
c.invoicingPartyId,
c.orderingPartyId,
c.rtmPartyId,
c.reprocessorPartyId,
m.boxHangingInd,
m.changeIndicator,
m.deliveryTime,
d.amount ,
d.currency ,
d.type ,
p.custOrderNumber,
p.outloadDate,
p.collection_date,
p.celebration_date,
p.cust_message,
m.legacyProductId ,
m.lineItemId ,
m.netweight ,
m.orderUnit ,
m.productId ,
m.quantityPerOrderUnit ,
m.recStorageLocation ,
m.season,
m.totalQuantity ,
m.ultRecSapCode ,
m.ultimateReceivingLocationId ,
m.unitsOfWeight ,
m.contractID,
m.contractLineItem,
m.dispatchPriority,
m.updateReason,
m.knittedWoven, 
m.year,
c.msgType ,
c.orderNumber ,
c.orderTypeSAP,
c.orderType ,
c.portOfLoading,
c.purchDept ,
c.purchaseOrderType ,
c.recSapCode ,
c.receivingLocationId ,
c.receivingLocationType ,
c.requestedDeliveryDate ,
c.sendSapCode,
c.sendingLocationId ,
c.sendingLocationType ,
c.shippingMethod ,
c.supplierFactoryDesc ,
c.supplierFactoryId ,
c.timestamp,
k.amount,
k.currency,
k.type,
x.load_timestamp,
x.batch_id ,
x.year ,
x.month ,
x.day 
from sap_ingest_{env}.i2726_complex_ns_tbl as x 
lateral view outer explode ((array(x.header))) explode_product as y
lateral view outer explode ((array(x.payload))) explode_product as c
lateral view outer explode(((c.totalPOValue))) explode_product as k
lateral view outer explode ((c.lineItems)) explode_product as po
lateral view outer explode ((po.itemValue)) explode_product as d
lateral view outer explode((x.payload.lineItems)) explode_product as m
lateral view outer explode((po.customer_order)) explode_product as p
 
where x.batch_id = "{my_list[i]}"
and x.year*10000+x.month*100+x.day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND (year(now())*10000 + month(now())*100+ day(now()))
 
''')
  else:
    print('Data already loaded for this batch')
    print(my_list[i])
  # advance to the next batch id in both branches so the loop always terminates
  i = i + 1
 
    

-werners-
Esteemed Contributor III

Why do you use a loop for executing the SQL queries?

Spark will handle the parallel processing.
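
As a rough illustration of what that could look like here, the sketch below assumes the same env variable and tables as the code above: it finds the batch ids that are not yet loaded with a single anti-join and then the original INSERT ... SELECT can be run once with an IN filter instead of once per batch (the date-window filter is left out for brevity):

# batch ids marked as Running that do not yet exist in the target table
pending = spark.sql(f"""
    select distinct b.batch_id
    from sap_ingest_{env}.i2726_batch_control_delta_ns_tbl b
    left anti join sap_ingest_{env}.i2726_po_sto_ns_tbl t
      on b.batch_id = t.batch_id
    where b.raw_status = 'Running'
""")

batch_ids = [str(row.batch_id) for row in pending.collect()]

if batch_ids:
    id_filter = ",".join(batch_ids)
    # the INSERT ... SELECT from the post above stays the same, except that the filter
    #   where x.batch_id = "{my_list[i]}"
    # becomes
    #   where x.batch_id in ({id_filter})
    # so Spark processes all pending batches in one pass instead of looping
    print(f"Loading {len(batch_ids)} pending batches: where x.batch_id in ({id_filter})")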

Anonymous
Not applicable

Hey @Raviteja Paluri,

Hope all is well!

Just wanted to check in: were you able to resolve your issue? If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

Thanks!
