I'm running the following notebook on a personal cluster with Databricks Runtime 15.3 ML:
import pyspark.sql.functions as F
from datetime import datetime, timedelta

# Ship the connector's Python support zip (BigQuery UDT support) to the cluster.
spark.sparkContext.addPyFile("gs://spark-lib/bigquery/spark-bigquery-support-0.26.0.zip")

target_hour = datetime(2024, 7, 24, 15)  # year, month, day, hour
start_time = target_hour.strftime("%Y-%m-%d %H:%M:%S")
end_time = (target_hour + timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")

# input_table_name is set in an earlier cell (not shown).
df = spark.read.format("bigquery") \
    .option("project", "e24-prod-data") \
    .option("parentProject", "databricks-424220") \
    .option("table", input_table_name) \
    .option("filter", f"date_ingested >= '{start_time}' AND date_ingested < '{end_time}'") \
    .load() \
    .withColumns({
        "date": F.date_format(F.col("date_ingested"), "yyyy-MM-dd"),
        "hour": F.date_format(F.col("date_ingested"), "HH"),
    }) \
    .load()  # <- stray second .load(); this is the line the traceback below points at
display(df)
I get the following error. From the traceback, the trailing second .load() is what triggers it: since the object is already a DataFrame at that point, .load() goes through DataFrame.__getattr__, which checks self.columns and therefore parses the schema, and the schema parse fails because a UDT field in the BigQuery schema references the Python module google.cloud.spark, which apparently isn't importable despite the addPyFile call above:
AssertionError: Undefined error message parameter for error class: CANNOT_PARSE_DATATYPE. Parameters: {'error': "No module named 'google.cloud.spark'"}
File /databricks/spark/python/pyspark/sql/dataframe.py:657, in DataFrame.schema(self)
655 try:
656 self._schema = cast(
--> 657 StructType, _parse_datatype_json_string(self._jdf.schema().json())
658 )
659 except Exception as e:
File /databricks/spark/python/pyspark/sql/types.py:1817, in _parse_datatype_json_string(json_string)
1763 """Parses the given data type JSON string.
1764
1765 Examples
(...)
1815 >>> check_datatype(complex_maptype)
1816 """
-> 1817 return _parse_datatype_json_value(json.loads(json_string))
File /databricks/spark/python/pyspark/sql/types.py:1877, in _parse_datatype_json_value(json_value, fieldPath, collationsMap)
1876 return MapType.fromJson(json_value, fieldPath, collationsMap)
-> 1877 return StructType.fromJson(json_value)
1878 elif tpe == "udt":
File /databricks/spark/python/pyspark/sql/types.py:1361, in StructType.fromJson(cls, json)
1270 """
1271 Constructs :class:`StructType` from a schema defined in JSON format.
1272
(...)
1359 'struct<Person:struct<name:string,surname:string>>'
1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:1361, in <listcomp>(.0)
1270 """
1271 Constructs :class:`StructType` from a schema defined in JSON format.
1272
(...)
1359 'struct<Person:struct<name:string,surname:string>>'
1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:954, in StructField.fromJson(cls, json)
948 metadata = {
949 key: value for key, value in metadata.items() if key != _COLLATIONS_METADATA_KEY
950 }
952 return StructField(
953 json["name"],
--> 954 _parse_datatype_json_value(json["type"], json["name"], collationsMap),
955 json.get("nullable", True),
956 metadata,
957 )
File /databricks/spark/python/pyspark/sql/types.py:1877, in _parse_datatype_json_value(json_value, fieldPath, collationsMap)
1876 return MapType.fromJson(json_value, fieldPath, collationsMap)
-> 1877 return StructType.fromJson(json_value)
1878 elif tpe == "udt":
File /databricks/spark/python/pyspark/sql/types.py:1361, in StructType.fromJson(cls, json)
1270 """
1271 Constructs :class:`StructType` from a schema defined in JSON format.
1272
(...)
1359 'struct<Person:struct<name:string,surname:string>>'
1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:1361, in <listcomp>(.0)
1270 """
1271 Constructs :class:`StructType` from a schema defined in JSON format.
1272
(...)
1359 'struct<Person:struct<name:string,surname:string>>'
1360 """
-> 1361 return StructType([StructField.fromJson(f) for f in json["fields"]])
File /databricks/spark/python/pyspark/sql/types.py:954, in StructField.fromJson(cls, json)
948 metadata = {
949 key: value for key, value in metadata.items() if key != _COLLATIONS_METADATA_KEY
950 }
952 return StructField(
953 json["name"],
--> 954 _parse_datatype_json_value(json["type"], json["name"], collationsMap),
955 json.get("nullable", True),
956 metadata,
957 )
File /databricks/spark/python/pyspark/sql/types.py:1879, in _parse_datatype_json_value(json_value, fieldPath, collationsMap)
1878 elif tpe == "udt":
-> 1879 return UserDefinedType.fromJson(json_value)
1880 else:
File /databricks/spark/python/pyspark/sql/types.py:1565, in UserDefinedType.fromJson(cls, json)
1564 pyClass = pyUDT[split + 1 :]
-> 1565 m = __import__(pyModule, globals(), locals(), [pyClass])
1566 if not hasattr(m, pyClass):
ModuleNotFoundError: No module named 'google.cloud.spark'
During handling of the above exception, another exception occurred:
AssertionError Traceback (most recent call last)
File <command-2993807820048799>, line 26
10 # use with .option("query", query) instead of option table and filter
11 query = f"""SELECT *
12 FROM ev24_ep_data_warehouse_v1_prd.metabase_news__2020_2029__common_v1__source_text_common_fields
13 WHERE date_ingested BETWEEN '{start_time}' '{end_time}'
14 """
16 df = spark.read.format("bigquery") \
17 .option("project", "e24-prod-data") \
18 .option("parentProject", "databricks-424220") \
19 .option("table", input_table_name) \
20 .option("filter", f"date_ingested >= '{start_time}' AND date_ingested < '{end_time}'") \
21 .load() \
22 .withColumns( {
23 "date": F.date_format(F.col("date_ingested"), "yyyy-MM-dd"),
24 "hour": F.date_format(F.col("date_ingested"), "HH"),
25 }) \
---> 26 .load()
27 display(df)
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
45 start = time.perf_counter()
46 try:
---> 47 res = func(*args, **kwargs)
48 logger.log_success(
49 module_name, class_name, function_name, time.perf_counter() - start, signature
50 )
51 return res
File /databricks/spark/python/pyspark/sql/dataframe.py:3745, in DataFrame.__getattr__(self, name)
3712 def __getattr__(self, name: str) -> Column:
3713 """Returns the :class:`Column` denoted by ``name``.
3714
3715 .. versionadded:: 1.3.0
(...)
3743 +---+
3744 """
-> 3745 if name not in self.columns:
3746 raise PySparkAttributeError(
3747 error_class="ATTRIBUTE_NOT_SUPPORTED", message_parameters={"attr_name": name}
3748 )
3749 jc = self._jdf.apply(name)
File /databricks/spark/python/pyspark/instrumentation_utils.py:75, in _wrap_property.<locals>.wrapper(self)
71 @property # type: ignore[misc]
72 def wrapper(self: Any) -> Any:
73 if hasattr(_local, "logging") and _local.logging:
74 # no need to log since this should be internal call.
---> 75 return prop.fget(self)
76 _local.logging = True
77 try:
File /databricks/spark/python/pyspark/sql/dataframe.py:2592, in DataFrame.columns(self)
2517 @property
2518 def columns(self) -> List[str]:
2519 """
2520 Retrieves the names of all columns in the :class:`DataFrame` as a list.
2521
(...)
2590 False
2591 """
-> 2592 return [f.name for f in self.schema.fields]
File /databricks/spark/python/pyspark/instrumentation_utils.py:75, in _wrap_property.<locals>.wrapper(self)
71 @property # type: ignore[misc]
72 def wrapper(self: Any) -> Any:
73 if hasattr(_local, "logging") and _local.logging:
74 # no need to log since this should be internal call.
---> 75 return prop.fget(self)
76 _local.logging = True
77 try:
File /databricks/spark/python/pyspark/sql/dataframe.py:660, in DataFrame.schema(self)
656 self._schema = cast(
657 StructType, _parse_datatype_json_string(self._jdf.schema().json())
658 )
659 except Exception as e:
--> 660 raise PySparkValueError(
661 error_class="CANNOT_PARSE_DATATYPE",
662 message_parameters={"error": str(e)},
663 )
664 return self._schema
File /databricks/spark/python/pyspark/errors/exceptions/base.py:45, in PySparkException.__init__(self, message, error_class, message_parameters, query_contexts)
42 self._error_reader = ErrorClassesReader()
44 if message is None:
---> 45 self._message = self._error_reader.get_error_message(
46 cast(str, error_class), cast(Dict[str, str], message_parameters)
47 )
48 else:
49 self._message = message
File /databricks/spark/python/pyspark/errors/utils.py:39, in ErrorClassesReader.get_error_message(self, error_class, message_parameters)
37 # Verify message parameters.
38 message_parameters_from_template = re.findall("<([a-zA-Z0-9_-]+)>", message_template)
---> 39 assert set(message_parameters_from_template) == set(message_parameters), (
40 f"Undefined error message parameter for error class: {error_class}. "
41 f"Parameters: {message_parameters}"
42 )
44 def replace_match(match: Match[str]) -> str:
45 return match.group().translate(str.maketrans("<>", "{}"))
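For diagnosis, here is a quick probe to check whether the support zip is actually importable on the driver and on an executor (a minimal sketch; google.cloud.spark is just the module name taken from the ModuleNotFoundError above):

import importlib

# Driver-side check: can we import the UDT module the BigQuery schema refers to?
try:
    importlib.import_module("google.cloud.spark")
    print("driver: google.cloud.spark is importable")
except ModuleNotFoundError as e:
    print(f"driver: {e}")

# Executor-side check: addPyFile should have shipped the zip to the workers too.
def probe(_):
    try:
        import google.cloud.spark  # noqa: F401
        return "executor: importable"
    except ModuleNotFoundError as e:
        return f"executor: {e}"

print(spark.sparkContext.parallelize([0], 1).map(probe).collect())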
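For completeness, the query-based variant that is commented out in the failing cell (visible in the traceback) would look like this once the missing AND in the BETWEEN is fixed. This is only a sketch: as far as I understand the connector documentation, query-based reads also require viewsEnabled and a materializationDataset, and the dataset name below is a placeholder.

query = f"""
SELECT *
FROM ev24_ep_data_warehouse_v1_prd.metabase_news__2020_2029__common_v1__source_text_common_fields
WHERE date_ingested BETWEEN '{start_time}' AND '{end_time}'
"""

df = (
    spark.read.format("bigquery")
    .option("parentProject", "databricks-424220")
    .option("viewsEnabled", "true")                   # required for query-based reads
    .option("materializationDataset", "tmp_dataset")  # placeholder: dataset for materialized query results
    .option("query", query)
    .load()
)
display(df)

I expect this to hit the same schema-parsing error as long as google.cloud.spark cannot be imported, though.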
It looks like the spark-bigquery-support zip added via addPyFile isn't actually on the Python path when pyspark tries to import google.cloud.spark for the UDT. Any idea how to resolve this?