Data Engineering

"No module named google.cloud.spark" errors querying BigQuery

gweakliem
New Contributor

Personal cluster, Databricks Runtime 15.3 ML. Running the following notebook:

import pyspark.sql.functions as F
from datetime import datetime, timedelta

spark.sparkContext.addPyFile("gs://spark-lib/bigquery/spark-bigquery-support-0.26.0.zip")

target_hour = datetime(2024, 7, 24, 15)  # Year, Month, Day, Hour
start_time = target_hour.strftime("%Y-%m-%d %H:%M:%S")
end_time = (target_hour + timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")

df = spark.read.format("bigquery") \
  .option("project", "e24-prod-data") \
  .option("parentProject", "databricks-424220") \
  .option("table", input_table_name) \
  .option("filter", f"date_ingested >= '{start_time}' AND date_ingested < '{end_time}'") \
  .load() \
  .withColumns( {
            "date": F.date_format(F.col("date_ingested"), "yyyy-MM-dd"),
            "hour": F.date_format(F.col("date_ingested"), "HH"),
        }) \
  .load()
display(df)

I get the following error:

AssertionError: Undefined error message parameter for error class: CANNOT_PARSE_DATATYPE. Parameters: {'error': "No module named 'google.cloud.spark'"}
File /databricks/spark/python/pyspark/sql/dataframe.py:657, in DataFrame.schema(self)
    655 try:
    656     self._schema = cast(
--> 657         StructType, _parse_datatype_json_string(self._jdf.schema().json())
    658     )
    659 except Exception as e:
File /databricks/spark/python/pyspark/sql/types.py:1817, in _parse_datatype_json_string(json_string)
   1763 """Parses the given data type JSON string.
   1764 
   1765 Examples
   (...)
   1815 >>> check_datatype(complex_maptype)
   1816 """
-> 1817 return _parse_datatype_json_value(json.loads(json_string))
File /databricks/spark/python/pyspark/sql/types.py:1877, in _parse_datatype_json_value(json_value, fieldPath, collationsMap)
   1876         return MapType.fromJson(json_value, fieldPath, collationsMap)
-> 1877     return StructType.fromJson(json_value)
   1878 elif tpe == "udt":
File /databricks/spark/python/pyspark/sql/types.py:1361, in StructType.fromJson(cls, json)
   1270 """
   1271 Constructs :class:`StructType` from a schema defined in JSON format.
   1272 
   (...)
   1359 'struct<Person:struct<name:string,surname:string>>'
   1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:1361, in <listcomp>(.0)
   1270 """
   1271 Constructs :class:`StructType` from a schema defined in JSON format.
   1272 
   (...)
   1359 'struct<Person:struct<name:string,surname:string>>'
   1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:954, in StructField.fromJson(cls, json)
    948     metadata = {
    949         key: value for key, value in metadata.items() if key != _COLLATIONS_METADATA_KEY
    950     }
    952 return StructField(
    953     json["name"],
--> 954     _parse_datatype_json_value(json["type"], json["name"], collationsMap),
    955     json.get("nullable", True),
    956     metadata,
    957 )
File /databricks/spark/python/pyspark/sql/types.py:1877, in _parse_datatype_json_value(json_value, fieldPath, collationsMap)
   1876         return MapType.fromJson(json_value, fieldPath, collationsMap)
-> 1877     return StructType.fromJson(json_value)
   1878 elif tpe == "udt":
File /databricks/spark/python/pyspark/sql/types.py:1361, in StructType.fromJson(cls, json)
   1270 """
   1271 Constructs :class:`StructType` from a schema defined in JSON format.
   1272 
   (...)
   1359 'struct<Person:struct<name:string,surname:string>>'
   1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:1361, in <listcomp>(.0)
   1270 """
   1271 Constructs :class:`StructType` from a schema defined in JSON format.
   1272 
   (...)
   1359 'struct<Person:struct<name:string,surname:string>>'
   1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:954, in StructField.fromJson(cls, json)
    948     metadata = {
    949         key: value for key, value in metadata.items() if key != _COLLATIONS_METADATA_KEY
    950     }
    952 return StructField(
    953     json["name"],
--> 954     _parse_datatype_json_value(json["type"], json["name"], collationsMap),
    955     json.get("nullable", True),
    956     metadata,
    957 )
File /databricks/spark/python/pyspark/sql/types.py:1879, in _parse_datatype_json_value(json_value, fieldPath, collationsMap)
   1878 elif tpe == "udt":
-> 1879     return UserDefinedType.fromJson(json_value)
   1880 else:
File /databricks/spark/python/pyspark/sql/types.py:1565, in UserDefinedType.fromJson(cls, json)
   1564 pyClass = pyUDT[split + 1 :]
-> 1565 m = __import__(pyModule, globals(), locals(), [pyClass])
   1566 if not hasattr(m, pyClass):
ModuleNotFoundError: No module named 'google.cloud.spark'

During handling of the above exception, another exception occurred:
AssertionError                            Traceback (most recent call last)
File <command-2993807820048799>, line 26
     10 # use with .option("query", query) instead of option table and filter
     11 query = f"""SELECT *
     12 FROM ev24_ep_data_warehouse_v1_prd.metabase_news__2020_2029__common_v1__source_text_common_fields
     13 WHERE date_ingested BETWEEN '{start_time}' '{end_time}'
     14 """
     16 df = spark.read.format("bigquery") \
     17   .option("project", "e24-prod-data") \
     18   .option("parentProject", "databricks-424220") \
     19   .option("table", input_table_name) \
     20   .option("filter", f"date_ingested >= '{start_time}' AND date_ingested < '{end_time}'") \
     21   .load() \
     22   .withColumns( {
     23             "date": F.date_format(F.col("date_ingested"), "yyyy-MM-dd"),
     24             "hour": F.date_format(F.col("date_ingested"), "HH"),
     25         }) \
---> 26   .load()
     27 display(df)
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res
File /databricks/spark/python/pyspark/sql/dataframe.py:3745, in DataFrame.__getattr__(self, name)
   3712 def __getattr__(self, name: str) -> Column:
   3713     """Returns the :class:`Column` denoted by ``name``.
   3714 
   3715     .. versionadded:: 1.3.0
   (...)
   3743     +---+
   3744     """
-> 3745     if name not in self.columns:
   3746         raise PySparkAttributeError(
   3747             error_class="ATTRIBUTE_NOT_SUPPORTED", message_parameters={"attr_name": name}
   3748         )
   3749     jc = self._jdf.apply(name)
File /databricks/spark/python/pyspark/instrumentation_utils.py:75, in _wrap_property.<locals>.wrapper(self)
     71 @property  # type: ignore[misc]
     72 def wrapper(self: Any) -> Any:
     73     if hasattr(_local, "logging") and _local.logging:
     74         # no need to log since this should be internal call.
---> 75         return prop.fget(self)
     76     _local.logging = True
     77     try:
File /databricks/spark/python/pyspark/sql/dataframe.py:2592, in DataFrame.columns(self)
   2517 @property
   2518 def columns(self) -> List[str]:
   2519     """
   2520     Retrieves the names of all columns in the :class:`DataFrame` as a list.
   2521 
   (...)
   2590     False
   2591     """
-> 2592     return [f.name for f in self.schema.fields]
File /databricks/spark/python/pyspark/instrumentation_utils.py:75, in _wrap_property.<locals>.wrapper(self)
     71 @property  # type: ignore[misc]
     72 def wrapper(self: Any) -> Any:
     73     if hasattr(_local, "logging") and _local.logging:
     74         # no need to log since this should be internal call.
---> 75         return prop.fget(self)
     76     _local.logging = True
     77     try:
File /databricks/spark/python/pyspark/sql/dataframe.py:660, in DataFrame.schema(self)
    656         self._schema = cast(
    657             StructType, _parse_datatype_json_string(self._jdf.schema().json())
    658         )
    659     except Exception as e:
--> 660         raise PySparkValueError(
    661             error_class="CANNOT_PARSE_DATATYPE",
    662             message_parameters={"error": str(e)},
    663         )
    664 return self._schema
File /databricks/spark/python/pyspark/errors/exceptions/base.py:45, in PySparkException.__init__(self, message, error_class, message_parameters, query_contexts)
     42 self._error_reader = ErrorClassesReader()
     44 if message is None:
---> 45     self._message = self._error_reader.get_error_message(
     46         cast(str, error_class), cast(Dict[str, str], message_parameters)
     47     )
     48 else:
     49     self._message = message
File /databricks/spark/python/pyspark/errors/utils.py:39, in ErrorClassesReader.get_error_message(self, error_class, message_parameters)
     37 # Verify message parameters.
     38 message_parameters_from_template = re.findall("<([a-zA-Z0-9_-]+)>", message_template)
---> 39 assert set(message_parameters_from_template) == set(message_parameters), (
     40     f"Undefined error message parameter for error class: {error_class}. "
     41     f"Parameters: {message_parameters}"
     42 )
     44 def replace_match(match: Match[str]) -> str:
     45     return match.group().translate(str.maketrans("<>", "{}"))

Any idea how to resolve this?

1 REPLY

Kaniz_Fatma
Community Manager

Hi @gweakliem, first, ensure that the google-cloud-spark package is installed in your Python environment. This package is needed for integrating with Google Cloud services.

Next, ensure that your Spark environment is correctly configured to use the BigQuery connector. This means making the connector JAR available to your Spark job, which you can do by adding it to your Spark configuration.
