cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

StreamingQueryException: Read timed out // Reading from delta share'd dataset

charlieyou
New Contributor

I have a workspace in GCP that's reading from a delta-shared dataset hosted in S3. When trying to run a very basic DLT pipeline, I'm getting the below error. Any help would be awesome!

Code:

import dlt
 
 
@dlt.table
def fn():
    return (spark.readStream
                .format("deltaSharing")
                .table("table_name"))

Logs:

org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 20325fa3-ee40-465d-9063-95ebb74a8b6f, runId = bcf9ac55-adb2-4d94-ad60-31bac6585e1e] terminated with exception: Read timed out
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:395)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:257)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:257)
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-2)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476)
at sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1369)
at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:978)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:72)
at io.delta.sharing.spark.DeltaSharingRestClient.$anonfun$getResponse$1(DeltaSharingClient.scala:406)
at io.delta.sharing.spark.util.RetryUtils$.runWithExponentialBackoff(RetryUtils.scala:36)
at io.delta.sharing.spark.DeltaSharingRestClient.getResponse(DeltaSharingClient.scala:401)
at io.delta.sharing.spark.DeltaSharingRestClient.getNDJson(DeltaSharingClient.scala:342)
at io.delta.sharing.spark.DeltaSharingRestClient.getFiles(DeltaSharingClient.scala:251)
at io.delta.sharing.spark.DeltaSharingSource.getTableFileChanges(DeltaSharingSource.scala:274)
at io.delta.sharing.spark.DeltaSharingSource.maybeGetFileChanges(DeltaSharingSource.scala:211)
at io.delta.sharing.spark.DeltaSharingSource.createDataFrameFromOffset(DeltaSharingSource.scala:463)
at io.delta.sharing.spark.DeltaSharingSource.getBatch(DeltaSharingSource.scala:741)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:730)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:335)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:333)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$5(MicroBatchExecution.scala:375)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1024)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$2(MicroBatchExecution.scala:372)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:335)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:333)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:330)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:320)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:308)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:368)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1038)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:257)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:257)

1 REPLY 1

Anonymous
Not applicable

@Charlie You​ :

The error message you're encountering suggests a timeout issue when reading from the Delta-shared dataset hosted in S3. There are a few potential reasons and solutions you can explore:

  1. Network connectivity: Verify that the network connectivity between your GCP workspace and the S3 bucket hosting the Delta-shared dataset is stable
  2. Access credentials: Confirm that the GCP workspace has the necessary access credentials or permissions to read from the S3 bucket
  3. Dataset size and performance: If the Delta-shared dataset is large or contains a significant amount of data, the read operation may take longer and potentially exceed the default timeout settings
  4. Spark configuration: Review the Spark configuration in your GCP workspace and ensure it is optimized for handling the size and complexity of the Delta-shared dataset
Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.