"No module named google.cloud.spark" errors querying BigQuery

gweakliem
New Contributor

I'm running the following notebook on a Personal Cluster (15.3 ML):

import pyspark.sql.functions as F
from datetime import datetime, timedelta

# Register the BigQuery connector's support zip with the SparkContext via addPyFile.
# NOTE(review): the ModuleNotFoundError for 'google.cloud.spark' in the traceback below
# suggests this zip is not being picked up (or does not provide that module) on this
# cluster — confirm the gs:// path is readable from here.
spark.sparkContext.addPyFile("gs://spark-lib/bigquery/spark-bigquery-support-0.26.0.zip")

# Build a one-hour window starting at target_hour, formatted as
# "YYYY-MM-DD HH:MM:SS" strings for use in the filter option below.
target_hour = datetime(2024, 7, 24, 15)  # Year, Month, Day, Hour
start_time = target_hour.strftime("%Y-%m-%d %H:%M:%S")
end_time = (target_hour + timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")

# Read the table through the "bigquery" data source, restricted to rows whose
# date_ingested falls in [start_time, end_time), then derive "date" and "hour"
# string columns from date_ingested.
# NOTE(review): `input_table_name` is not defined in this snippet — presumably it is
# set in an earlier cell; verify.
# NOTE(review): the trailing `.load()` is called on the DataFrame returned by
# withColumns(), not on a reader. The traceback shows it being resolved through
# DataFrame.__getattr__, which reads self.columns / self.schema — that is where the
# schema-parsing error surfaces. It looks unintended and can probably be dropped,
# though the underlying 'google.cloud.spark' import failure may still appear whenever
# the schema is accessed.
df = spark.read.format("bigquery") \
  .option("project", "e24-prod-data") \
  .option("parentProject", "databricks-424220") \
  .option("table", input_table_name) \
  .option("filter", f"date_ingested >= '{start_time}' AND date_ingested < '{end_time}'") \
  .load() \
  .withColumns( {
            "date": F.date_format(F.col("date_ingested"), "yyyy-MM-dd"),
            "hour": F.date_format(F.col("date_ingested"), "HH"),
        }) \
  .load()
display(df)

I get the following error:

AssertionError: Undefined error message parameter for error class: CANNOT_PARSE_DATATYPE. Parameters: {'error': "No module named 'google.cloud.spark'"}
File /databricks/spark/python/pyspark/sql/dataframe.py:657, in DataFrame.schema(self)
    655 try:
    656     self._schema = cast(
--> 657         StructType, _parse_datatype_json_string(self._jdf.schema().json())
    658     )
    659 except Exception as e:
File /databricks/spark/python/pyspark/sql/types.py:1817, in _parse_datatype_json_string(json_string)
   1763 """Parses the given data type JSON string.
   1764 
   1765 Examples
   (...)
   1815 >>> check_datatype(complex_maptype)
   1816 """
-> 1817 return _parse_datatype_json_value(json.loads(json_string))
File /databricks/spark/python/pyspark/sql/types.py:1877, in _parse_datatype_json_value(json_value, fieldPath, collationsMap)
   1876         return MapType.fromJson(json_value, fieldPath, collationsMap)
-> 1877     return StructType.fromJson(json_value)
   1878 elif tpe == "udt":
File /databricks/spark/python/pyspark/sql/types.py:1361, in StructType.fromJson(cls, json)
   1270 """
   1271 Constructs :class:`StructType` from a schema defined in JSON format.
   1272 
   (...)
   1359 'struct<Person:struct<name:string,surname:string>>'
   1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:1361, in <listcomp>(.0)
   1270 """
   1271 Constructs :class:`StructType` from a schema defined in JSON format.
   1272 
   (...)
   1359 'struct<Person:struct<name:string,surname:string>>'
   1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:954, in StructField.fromJson(cls, json)
    948     metadata = {
    949         key: value for key, value in metadata.items() if key != _COLLATIONS_METADATA_KEY
    950     }
    952 return StructField(
    953     json["name"],
--> 954     _parse_datatype_json_value(json["type"], json["name"], collationsMap),
    955     json.get("nullable", True),
    956     metadata,
    957 )
File /databricks/spark/python/pyspark/sql/types.py:1877, in _parse_datatype_json_value(json_value, fieldPath, collationsMap)
   1876         return MapType.fromJson(json_value, fieldPath, collationsMap)
-> 1877     return StructType.fromJson(json_value)
   1878 elif tpe == "udt":
File /databricks/spark/python/pyspark/sql/types.py:1361, in StructType.fromJson(cls, json)
   1270 """
   1271 Constructs :class:`StructType` from a schema defined in JSON format.
   1272 
   (...)
   1359 'struct<Person:struct<name:string,surname:string>>'
   1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:1361, in <listcomp>(.0)
   1270 """
   1271 Constructs :class:`StructType` from a schema defined in JSON format.
   1272 
   (...)
   1359 'struct<Person:struct<name:string,surname:string>>'
   1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:954, in StructField.fromJson(cls, json)
    948     metadata = {
    949         key: value for key, value in metadata.items() if key != _COLLATIONS_METADATA_KEY
    950     }
    952 return StructField(
    953     json["name"],
--> 954     _parse_datatype_json_value(json["type"], json["name"], collationsMap),
    955     json.get("nullable", True),
    956     metadata,
    957 )
File /databricks/spark/python/pyspark/sql/types.py:1879, in _parse_datatype_json_value(json_value, fieldPath, collationsMap)
   1878 elif tpe == "udt":
-> 1879     return UserDefinedType.fromJson(json_value)
   1880 else:
File /databricks/spark/python/pyspark/sql/types.py:1565, in UserDefinedType.fromJson(cls, json)
   1564 pyClass = pyUDT[split + 1 :]
-> 1565 m = __import__(pyModule, globals(), locals(), [pyClass])
   1566 if not hasattr(m, pyClass):
ModuleNotFoundError: No module named 'google.cloud.spark'

During handling of the above exception, another exception occurred:
AssertionError                            Traceback (most recent call last)
File <command-2993807820048799>, line 26
     10 # use with .option("query", query) instead of option table and filter
     11 query = f"""SELECT *
     12 FROM ev24_ep_data_warehouse_v1_prd.metabase_news__2020_2029__common_v1__source_text_common_fields
     13 WHERE date_ingested BETWEEN '{start_time}' '{end_time}'
     14 """
     16 df = spark.read.format("bigquery") \
     17   .option("project", "e24-prod-data") \
     18   .option("parentProject", "databricks-424220") \
     19   .option("table", input_table_name) \
     20   .option("filter", f"date_ingested >= '{start_time}' AND date_ingested < '{end_time}'") \
     21   .load() \
     22   .withColumns( {
     23             "date": F.date_format(F.col("date_ingested"), "yyyy-MM-dd"),
     24             "hour": F.date_format(F.col("date_ingested"), "HH"),
     25         }) \
---> 26   .load()
     27 display(df)
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res
File /databricks/spark/python/pyspark/sql/dataframe.py:3745, in DataFrame.__getattr__(self, name)
   3712 def __getattr__(self, name: str) -> Column:
   3713     """Returns the :class:`Column` denoted by ``name``.
   3714 
   3715     .. versionadded:: 1.3.0
   (...)
   3743     +---+
   3744     """
-> 3745     if name not in self.columns:
   3746         raise PySparkAttributeError(
   3747             error_class="ATTRIBUTE_NOT_SUPPORTED", message_parameters={"attr_name": name}
   3748         )
   3749     jc = self._jdf.apply(name)
File /databricks/spark/python/pyspark/instrumentation_utils.py:75, in _wrap_property.<locals>.wrapper(self)
     71 @property  # type: ignore[misc]
     72 def wrapper(self: Any) -> Any:
     73     if hasattr(_local, "logging") and _local.logging:
     74         # no need to log since this should be internal call.
---> 75         return prop.fget(self)
     76     _local.logging = True
     77     try:
File /databricks/spark/python/pyspark/sql/dataframe.py:2592, in DataFrame.columns(self)
   2517 @property
   2518 def columns(self) -> List[str]:
   2519     """
   2520     Retrieves the names of all columns in the :class:`DataFrame` as a list.
   2521 
   (...)
   2590     False
   2591     """
-> 2592     return [f.name for f in self.schema.fields]
File /databricks/spark/python/pyspark/instrumentation_utils.py:75, in _wrap_property.<locals>.wrapper(self)
     71 @property  # type: ignore[misc]
     72 def wrapper(self: Any) -> Any:
     73     if hasattr(_local, "logging") and _local.logging:
     74         # no need to log since this should be internal call.
---> 75         return prop.fget(self)
     76     _local.logging = True
     77     try:
File /databricks/spark/python/pyspark/sql/dataframe.py:660, in DataFrame.schema(self)
    656         self._schema = cast(
    657             StructType, _parse_datatype_json_string(self._jdf.schema().json())
    658         )
    659     except Exception as e:
--> 660         raise PySparkValueError(
    661             error_class="CANNOT_PARSE_DATATYPE",
    662             message_parameters={"error": str(e)},
    663         )
    664 return self._schema
File /databricks/spark/python/pyspark/errors/exceptions/base.py:45, in PySparkException.__init__(self, message, error_class, message_parameters, query_contexts)
     42 self._error_reader = ErrorClassesReader()
     44 if message is None:
---> 45     self._message = self._error_reader.get_error_message(
     46         cast(str, error_class), cast(Dict[str, str], message_parameters)
     47     )
     48 else:
     49     self._message = message
File /databricks/spark/python/pyspark/errors/utils.py:39, in ErrorClassesReader.get_error_message(self, error_class, message_parameters)
     37 # Verify message parameters.
     38 message_parameters_from_template = re.findall("<([a-zA-Z0-9_-]+)>", message_template)
---> 39 assert set(message_parameters_from_template) == set(message_parameters), (
     40     f"Undefined error message parameter for error class: {error_class}. "
     41     f"Parameters: {message_parameters}"
     42 )
     44 def replace_match(match: Match[str]) -> str:
     45     return match.group().translate(str.maketrans("<>", "{}"))

Any idea how to resolve this?