Data Engineering

StreamingQueryException: Read timed out // Reading from a Delta-shared dataset

charlieyou
New Contributor

I have a workspace in GCP that's reading from a Delta-shared dataset hosted in S3. When I try to run a very basic DLT pipeline, I get the error below. Any help would be awesome!

Code:

import dlt

# Stream the shared table through the Delta Sharing connector.
@dlt.table
def fn():
    return (spark.readStream
                .format("deltaSharing")
                .table("table_name"))

Logs:

org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 20325fa3-ee40-465d-9063-95ebb74a8b6f, runId = bcf9ac55-adb2-4d94-ad60-31bac6585e1e] terminated with exception: Read timed out
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:395)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:257)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:257)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-2)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476)
at sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1369)
at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:978)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:72)
at io.delta.sharing.spark.DeltaSharingRestClient.$anonfun$getResponse$1(DeltaSharingClient.scala:406)
at io.delta.sharing.spark.util.RetryUtils$.runWithExponentialBackoff(RetryUtils.scala:36)
at io.delta.sharing.spark.DeltaSharingRestClient.getResponse(DeltaSharingClient.scala:401)
at io.delta.sharing.spark.DeltaSharingRestClient.getNDJson(DeltaSharingClient.scala:342)
at io.delta.sharing.spark.DeltaSharingRestClient.getFiles(DeltaSharingClient.scala:251)
at io.delta.sharing.spark.DeltaSharingSource.getTableFileChanges(DeltaSharingSource.scala:274)
at io.delta.sharing.spark.DeltaSharingSource.maybeGetFileChanges(DeltaSharingSource.scala:211)
at io.delta.sharing.spark.DeltaSharingSource.createDataFrameFromOffset(DeltaSharingSource.scala:463)
at io.delta.sharing.spark.DeltaSharingSource.getBatch(DeltaSharingSource.scala:741)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:730)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:335)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:333)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$5(MicroBatchExecution.scala:375)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1024)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$2(MicroBatchExecution.scala:372)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:335)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:333)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:330)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:320)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:308)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:368)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1038)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:257)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:257)

1 REPLY

Anonymous
Not applicable

@Charlie You:

The error message you're encountering indicates a network timeout while reading from the Delta-shared dataset hosted in S3: the stack trace shows the Delta Sharing REST client timing out while fetching file metadata from the sharing server (see DeltaSharingRestClient.getFiles). There are a few potential reasons and solutions you can explore:

  1. Network connectivity: verify that the network path between your GCP workspace and the Delta Sharing server (and the S3 bucket hosting the shared data) is stable.
  2. Access credentials: confirm that the sharing profile your workspace uses has valid, unexpired credentials to read the shared table.
  3. Dataset size and performance: if the shared dataset is large, the metadata and file-listing requests can take longer than the client's default timeout; raising the timeout may help (see the sketch after this list).
  4. Spark configuration: review the Spark configuration in your GCP workspace and make sure it is tuned for the size and complexity of the shared dataset.
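
If point 3 is the culprit, one concrete thing to try is raising the Delta Sharing client's network timeout and retry settings before the stream starts. Below is a minimal sketch; the spark.delta.sharing.network.* keys and their defaults come from the open-source delta-sharing Spark connector and should be verified against the connector version in your runtime:

import dlt

# Assumed config keys from the open-source delta-sharing Spark client;
# verify the exact names and defaults for your connector version.
spark.conf.set("spark.delta.sharing.network.timeout", "600s")   # HTTP read timeout
spark.conf.set("spark.delta.sharing.network.numRetries", "10")  # retries with exponential backoff

@dlt.table
def fn():
    return (spark.readStream
                .format("deltaSharing")
                .table("table_name"))

Note that in a DLT pipeline, spark.conf.set calls inside the notebook may not reach the stream; the same key/value pairs can instead be added to the pipeline's configuration map in the pipeline settings.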
