#### Code
# Import the Spark SQL data types used to declare the custom schema
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, StringType, FloatType, BooleanType, LongType
# Define Custom Schema
# Explicit schema for the raw telecom call-stream records.
# Declared as (column name, Spark type) pairs and expanded into
# nullable StructFields, so the column list reads as a flat table.
_CALL_COLUMNS = [
    ("RecordType", StringType()),
    ("SystemIdentity", StringType()),
    ("FileNum", StringType()),
    ("SwitchNum", StringType()),
    ("CallingNum", StringType()),
    ("CallingIMSI", StringType()),
    ("CalledNum", StringType()),
    ("CalledIMSI", StringType()),
    ("DateS", StringType()),
    ("TimeS", StringType()),
    ("TimeType", LongType()),
    ("CallPeriod", LongType()),
    ("CallingCellID", StringType()),
    ("CalledCellID", StringType()),
    ("ServiceType", StringType()),
    ("Transfer", LongType()),
    ("IncomingTrunk", StringType()),
    ("OutgoingTrunk", StringType()),
    ("MSRN", StringType()),
    ("CalledNum2", StringType()),
    ("FCIFlag", StringType()),
    ("callrecTime", TimestampType()),
    ("EventProcessedUtcTime", TimestampType()),
    ("PartitionId", LongType()),
    ("EventEnqueuedUtcTime", TimestampType()),
]

# Every field is nullable (True), matching the original declaration.
call_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in _CALL_COLUMNS]
)
# Define Delta Live Table
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
# ADLS mount point containing the raw call-stream JSON files.
json_path = "/mnt/adlspoc23/stream/"

@dlt.table(
    comment="The raw Telecom callstream dataset, ingested from ADLS.",
    schema=call_schema
)
def callstream_raw():
    """Ingest the raw telecom call-stream JSON files from ADLS.

    The schema is applied explicitly on the read with `.schema(call_schema)`.
    Without it, Spark infers the JSON schema, and the inferred column types
    (e.g. timestamp columns inferred as strings) need not match the
    user-specified table schema — which raises:
        org.apache.spark.sql.AnalysisException: Table 'callstream_raw' has a
        user-specified schema that is incompatible with the schema inferred
        from its query.
    Applying the same schema on both the read and the table declaration keeps
    the query output and the declared table schema identical.

    Returns:
        A DataFrame of raw call records conforming to `call_schema`.
    """
    return (
        spark.read
        .format("json")
        .schema(call_schema)  # enforce declared types instead of inference
        .load(json_path)
    )
# NOTE(review): the following error occurs when Spark is allowed to infer the
# JSON schema while the DLT table declares `schema=call_schema` — the inferred
# column types (e.g. timestamps read as strings) conflict with the declared ones:
#   org.apache.spark.sql.AnalysisException: Table 'callstream_raw' has a
#   user-specified schema that is incompatible with the schema inferred from
#   its query.
# Fix: apply the schema explicitly on the read, e.g.
#   spark.read.format("json").schema(call_schema).load(json_path)