<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How can we sort the timeout issue in Databricks in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19611#M13167</link>
    <description>&lt;P&gt;Still the notebook is running for 3 hours. below is the code that is running continously and also attached the error image&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="error"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/1844i835C91648B2A1105/image-size/large?v=v2&amp;amp;px=999" role="button" title="error" alt="error" /&gt;&lt;/span&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import json
i=0
df=spark.sql(f"""select distinct batch_id from sap_ingest_{env}.i2726_batch_control_delta_ns_tbl where raw_status='Running' and year*10000+month*100+day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND 
            (year(now())*10000 + month(now())*100+ day(now())) """)
my_list=[int(row.batch_id) for row in df.collect()]
&amp;nbsp;
while i &amp;lt; len(my_list):
  #df_check = spark.sql(f'''select * from editrack_ingest_{env}.i1001_batch_control_ns_tbl where batch_id = {my_list[i]} ''')
  #df_check.show()
  #print(my_list[i])
  #i=i+1
  df_check = spark.sql(f'''select count(*) from sap_ingest_{env}.i2726_po_sto_ns_tbl where batch_id = {my_list[i]} and year*10000+month*100+day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND 
            (year(now())*10000 + month(now())*100+ day(now()))  ''')
  batchid_check=df_check.take(1)[0][0]
  print(batchid_check)
&amp;nbsp;
  if batchid_check==0:
    spark.sql(f''' insert into table sap_ingest_{env}.i2726_po_sto_ns_tbl
partition (year,month,day)
select
y.batchRecordCount,
y.correlationId,
y.createdTimestamp,
y.envelopeVersion,
y.interfaceId ,
y.sequenceId,
y.interfaceName ,
y.messageId,
y.etag,
y.payloadKey,
y.payloadName,
y.payloadVersion,
y.securityClassification,
y.sourceApplicationName,
c.businessUnit,
c.countryOfOrigin ,
c.fobDate,
c.forwardingAgentDesc,
c.forwardingAgentId,
c.forwardingAgentDelPt,
c.incoTerms1,
c.incoTerms2 ,
c.initialOrderDate ,
c.initialOrderRef ,
c.invoicingPartyId,
c.orderingPartyId,
c.rtmPartyId,
c.reprocessorPartyId,
m.boxHangingInd,
m.changeIndicator,
m.deliveryTime,
d.amount ,
d.currency ,
d.type ,
p.custOrderNumber,
p.outloadDate,
p.collection_date,
p.celebration_date,
p.cust_message,
m.legacyProductId ,
m.lineItemId ,
m.netweight ,
m.orderUnit ,
m.productId ,
m.quantityPerOrderUnit ,
m.recStorageLocation ,
m.season,
m.totalQuantity ,
m.ultRecSapCode ,
m.ultimateReceivingLocationId ,
m.unitsOfWeight ,
m.contractID,
m.contractLineItem,
m.dispatchPriority,
m.updateReason,
m.knittedWoven, 
m.year,
c.msgType ,
c.orderNumber ,
c.orderTypeSAP,
c.orderType ,
c.portOfLoading,
c.purchDept ,
c.purchaseOrderType ,
c.recSapCode ,
c.receivingLocationId ,
c.receivingLocationType ,
c.requestedDeliveryDate ,
c.sendSapCode,
c.sendingLocationId ,
c.sendingLocationType ,
c.shippingMethod ,
c.supplierFactoryDesc ,
c.supplierFactoryId ,
c.timestamp,
k.amount,
k.currency,
k.type,
x.load_timestamp,
x.batch_id ,
x.year ,
x.month ,
x.day 
from sap_ingest_{env}.i2726_complex_ns_tbl as x 
lateral view outer explode ((array(x.header))) explode_product as y
lateral view outer explode ((array(x.payload))) explode_product as c
lateral view outer explode(((c.totalPOValue))) explode_product as k
lateral view outer explode ((c.lineItems)) explode_product as po
lateral view outer explode ((po.itemValue)) explode_product as d
lateral view outer explode((x.payload.lineItems)) explode_product as m
lateral view outer explode((po.customer_order)) explode_product as p
&amp;nbsp;
where x.batch_id = "{my_list[i]}"
and x.year*10000+x.month*100+x.day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND (year(now())*10000 + month(now())*100+ day(now()))
&amp;nbsp;
''')    
  else:
    print('Data already loaded for this batch')
    print(my_list[i])
    i=i+1
&amp;nbsp;
    &lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Thu, 26 May 2022 10:56:59 GMT</pubDate>
    <dc:creator>Ravi96</dc:creator>
    <dc:date>2022-05-26T10:56:59Z</dc:date>
    <item>
      <title>How can we sort the timeout issue in Databricks</title>
      <link>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19609#M13165</link>
      <description>&lt;P&gt;we are creating a denorm table based on a JSON ingestion but the complex table is getting generated .when we try to deflatten the JSON rows it is taking for more than 5 hours and the error message is timeout error&lt;/P&gt;&lt;P&gt;is there any way that we could resolve this issue... Kindly help!!&lt;/P&gt;&lt;P&gt;Thanks&lt;/P&gt;</description>
      <pubDate>Thu, 26 May 2022 04:55:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19609#M13165</guid>
      <dc:creator>Ravi96</dc:creator>
      <dc:date>2022-05-26T04:55:07Z</dc:date>
    </item>
    <item>
      <title>Re: How can we sort the timeout issue in Databricks</title>
      <link>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19610#M13166</link>
      <description>&lt;P&gt;did you enable the multiline option while reading the json, because that could be the cause?&lt;/P&gt;&lt;P&gt;See also &lt;A href="https://community.databricks.com/s/question/0D53f00001Q0Rq9CAF/bufferholder-exceeded-on-json-flattening" alt="https://community.databricks.com/s/question/0D53f00001Q0Rq9CAF/bufferholder-exceeded-on-json-flattening" target="_blank"&gt;here&lt;/A&gt;.&lt;/P&gt;&lt;P&gt;If you can, try with single-line format.  Because then you can really leverage the parallel processing power of spark.&lt;/P&gt;</description>
      <pubDate>Thu, 26 May 2022 05:59:27 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19610#M13166</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2022-05-26T05:59:27Z</dc:date>
    </item>
    <item>
      <title>Re: How can we sort the timeout issue in Databricks</title>
      <link>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19611#M13167</link>
      <description>&lt;P&gt;Still the notebook is running for 3 hours. below is the code that is running continously and also attached the error image&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="error"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/1844i835C91648B2A1105/image-size/large?v=v2&amp;amp;px=999" role="button" title="error" alt="error" /&gt;&lt;/span&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import json
i=0
df=spark.sql(f"""select distinct batch_id from sap_ingest_{env}.i2726_batch_control_delta_ns_tbl where raw_status='Running' and year*10000+month*100+day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND 
            (year(now())*10000 + month(now())*100+ day(now())) """)
my_list=[int(row.batch_id) for row in df.collect()]
&amp;nbsp;
while i &amp;lt; len(my_list):
  #df_check = spark.sql(f'''select * from editrack_ingest_{env}.i1001_batch_control_ns_tbl where batch_id = {my_list[i]} ''')
  #df_check.show()
  #print(my_list[i])
  #i=i+1
  df_check = spark.sql(f'''select count(*) from sap_ingest_{env}.i2726_po_sto_ns_tbl where batch_id = {my_list[i]} and year*10000+month*100+day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND 
            (year(now())*10000 + month(now())*100+ day(now()))  ''')
  batchid_check=df_check.take(1)[0][0]
  print(batchid_check)
&amp;nbsp;
  if batchid_check==0:
    spark.sql(f''' insert into table sap_ingest_{env}.i2726_po_sto_ns_tbl
partition (year,month,day)
select
y.batchRecordCount,
y.correlationId,
y.createdTimestamp,
y.envelopeVersion,
y.interfaceId ,
y.sequenceId,
y.interfaceName ,
y.messageId,
y.etag,
y.payloadKey,
y.payloadName,
y.payloadVersion,
y.securityClassification,
y.sourceApplicationName,
c.businessUnit,
c.countryOfOrigin ,
c.fobDate,
c.forwardingAgentDesc,
c.forwardingAgentId,
c.forwardingAgentDelPt,
c.incoTerms1,
c.incoTerms2 ,
c.initialOrderDate ,
c.initialOrderRef ,
c.invoicingPartyId,
c.orderingPartyId,
c.rtmPartyId,
c.reprocessorPartyId,
m.boxHangingInd,
m.changeIndicator,
m.deliveryTime,
d.amount ,
d.currency ,
d.type ,
p.custOrderNumber,
p.outloadDate,
p.collection_date,
p.celebration_date,
p.cust_message,
m.legacyProductId ,
m.lineItemId ,
m.netweight ,
m.orderUnit ,
m.productId ,
m.quantityPerOrderUnit ,
m.recStorageLocation ,
m.season,
m.totalQuantity ,
m.ultRecSapCode ,
m.ultimateReceivingLocationId ,
m.unitsOfWeight ,
m.contractID,
m.contractLineItem,
m.dispatchPriority,
m.updateReason,
m.knittedWoven, 
m.year,
c.msgType ,
c.orderNumber ,
c.orderTypeSAP,
c.orderType ,
c.portOfLoading,
c.purchDept ,
c.purchaseOrderType ,
c.recSapCode ,
c.receivingLocationId ,
c.receivingLocationType ,
c.requestedDeliveryDate ,
c.sendSapCode,
c.sendingLocationId ,
c.sendingLocationType ,
c.shippingMethod ,
c.supplierFactoryDesc ,
c.supplierFactoryId ,
c.timestamp,
k.amount,
k.currency,
k.type,
x.load_timestamp,
x.batch_id ,
x.year ,
x.month ,
x.day 
from sap_ingest_{env}.i2726_complex_ns_tbl as x 
lateral view outer explode ((array(x.header))) explode_product as y
lateral view outer explode ((array(x.payload))) explode_product as c
lateral view outer explode(((c.totalPOValue))) explode_product as k
lateral view outer explode ((c.lineItems)) explode_product as po
lateral view outer explode ((po.itemValue)) explode_product as d
lateral view outer explode((x.payload.lineItems)) explode_product as m
lateral view outer explode((po.customer_order)) explode_product as p
&amp;nbsp;
where x.batch_id = "{my_list[i]}"
and x.year*10000+x.month*100+x.day between year(date_sub(now(),{dayn}))*10000+month(date_sub(now(),{dayn}))*100+day(date_sub(now(),{dayn})) AND (year(now())*10000 + month(now())*100+ day(now()))
&amp;nbsp;
''')    
  else:
    print('Data already loaded for this batch')
    print(my_list[i])
    i=i+1
&amp;nbsp;
    &lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 26 May 2022 10:56:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19611#M13167</guid>
      <dc:creator>Ravi96</dc:creator>
      <dc:date>2022-05-26T10:56:59Z</dc:date>
    </item>
    <item>
      <title>Re: How can we sort the timeout issue in Databricks</title>
      <link>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19612#M13168</link>
      <description>&lt;P&gt;Why do you use a loop for executing the sql queries?&lt;/P&gt;&lt;P&gt;Spark will handle the parallel processing.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 30 May 2022 08:10:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19612#M13168</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2022-05-30T08:10:11Z</dc:date>
    </item>
    <item>
      <title>Re: How can we sort the timeout issue in Databricks</title>
      <link>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19613#M13169</link>
      <description>&lt;P&gt;Hey @Raviteja Paluri​&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Hope all is well! &lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Just wanted to check in if you were able to resolve your issue and would you be happy to share the solution or mark an answer as best? Else please let us know if you need more help.&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks!&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 25 Jul 2022 16:33:39 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-can-we-sort-the-timeout-issue-in-databricks/m-p/19613#M13169</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2022-07-25T16:33:39Z</dc:date>
    </item>
  </channel>
</rss>

