Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
07-26-2022 03:24 AM
I mount an Azure Blob Storage container with this:
# Mount the blob container on DBFS at /mnt/client (best-effort: a failure is
# reported but does not stop the notebook).
# NOTE(review): the source URL hard-codes the account name "client" while the
# access-key config is keyed on `storage_account_name`; both must name the same
# storage account for the key to be picked up -- confirm they agree.
try:
    # Skip the mount when the mount point already exists, instead of relying on
    # a swallowed exception (presumably why the original used `except: pass`).
    _already_mounted = any(
        m.mountPoint == "/mnt/client" for m in dbutils.fs.mounts()
    )
    if not _already_mounted:
        dbutils.fs.mount(
            source="wasbs://client@client.blob.core.windows.net",
            mount_point="/mnt/client",
            extra_configs={
                "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net": storage_account_access_key
            },
        )
except Exception as e:
    # Surface the failure (bad key, wrong account, network, ...) rather than
    # hiding it; later reads from /mnt/client would otherwise fail obscurely.
    print(f"Could not mount /mnt/client: {e}")

# I then load the data JSON files with:
import ast
import json

from pyspark.sql import types
def extra_cat_name(categories):
    """Extract the category name(s), handling the mixed types left by data migration.

    A string input is parsed first: as JSON, and if that fails, as a Python
    literal. If the resulting value exposes
    ``['translatedNames'][0]['translation']``, that translation is returned
    wrapped in a one-element list; otherwise the (parsed) value is returned
    unchanged.

    :param categories: a string encoding of the categories, or an already
        structured value (mapping, list, ``None``, ...)
    :return: ``[translation]`` when the nested lookup succeeds, else the
        (parsed) input as-is
    :raises ValueError: if a string input is neither valid JSON nor a valid
        Python literal (``SyntaxError`` is also possible from the literal
        parser)
    """
    if isinstance(categories, str):
        # SECURITY: this used to call eval() on data read from files, which
        # executes arbitrary code. json.loads understands true/false/null
        # natively; ast.literal_eval keeps Python-repr strings (single quotes)
        # working without evaluating expressions.
        try:
            categories = json.loads(categories)
        except ValueError:
            categories = ast.literal_eval(categories)
    try:
        return [categories['translatedNames'][0]['translation']]
    except Exception:
        # Deliberately broad: the value may be a dict, list, None or another
        # container type, and any failed lookup means "not the nested shape",
        # so the value is passed through untouched.
        return categories
# Expose extra_cat_name to Spark as the UDF "cat_extract" (usable from SQL by
# that name and from Python via the returned handle); it yields array<string>.
cat_extract = spark.udf.register(
    "cat_extract",
    extra_cat_name,
    returnType=types.ArrayType(types.StringType()),
)
def add_time_from_filename(df, indicator):
    """Add a timestamp column extracted from the filename.

    Three columns are added to ``df``:

    * ``file_name`` -- the value of ``F.input_file_name()``
    * ``regtime`` -- the text between ``_<indicator>_`` and ``.json`` in
      ``file_name`` (regex group 2)
    * ``ftime`` -- ``regtime`` parsed as a date with format ``yyyy_MM_dd``

    :param df: the dataframe to work on
    :param indicator: the last word before the date in the file name
    :return: ``df`` with the three columns above added
    """
    # Raw f-string so that ``\.`` reaches the regex engine as written instead of
    # being treated as an (invalid) Python string escape.
    # NOTE: ``indicator`` is interpolated into the pattern unescaped, so regex
    # metacharacters in it are interpreted as regex syntax.
    date_pattern = rf'(_{indicator}_)(.*)(\.json)'
    # Parenthesised chain instead of backslash continuations: the original ended
    # with a dangling ``\`` that spliced the following statement into this
    # expression.
    return (
        df
        .withColumn('file_name', F.input_file_name())
        .withColumn('regtime', F.regexp_extract('file_name', date_pattern, 2))
        .withColumn('ftime', F.to_date(F.col("regtime"), "yyyy_MM_dd"))
    )
def load_data():
referential_files = f"/mnt/client/{GIVEN_CLIENT_NAME}/internal_api/referential_data/*.json"
user_files = f"/mnt/client/{GIVEN_CLIENT_NAME}/internal_api/users_data/*.json"
df_users = spark.read.json(user_files)
df = spark.read.json(referential_files)\
.withColumn("data", F.explode("skills")).select("data.*")\
.withColumn('categories', cat_extract('categories'))
df = add_time_from_filename(df, 'referential')