Exception when saving data to offline FS

Hello,

I am streaming data with Kafka and want to save it to the offline Feature Store at the end of the pipeline. Writing to the online feature store works fine, but with the offline FS I get an error that I cannot manage to resolve:

Terminated with exception: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/srv/hops/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 191, in call
    raise e
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 188, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "", line 12, in foreach_batch_header
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py", line 728, in insert
    write_options,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/feature_group_engine.py", line 111, in insert
    validation_id,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 209, in save_dataframe
    validation_id,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 286, in _save_offline_dataframe
    dataframe, self.APPEND, operation, write_options, validation_id
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/hudi_engine.py", line 102, in save_hudi_fg
    dataset, save_mode, operation, write_options
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/hudi_engine.py", line 129, in _write_hudi_dataset
    ).save(self._base_path)
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 734, in save
    self._jwrite.save(path)
  File "/srv/hops/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/srv/hops/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o214.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-0-0-33.eu-central-1.compute.internal, executor 2): org.apache.hudi.exception.HoodieException: The value of seq can not be null
    at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:436)
    at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$2.apply(HoodieSparkSqlWriter.scala:149)
    at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$2.apply(HoodieSparkSqlWriter.scala:147)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) ...

My setup for the offline FS is this:

from hops import hdfs  # needed for hdfs.project_path() in the checkpoint location below

def foreach_batch_header(batchDF, epoch_id):
    batchDF.persist()
    print(epoch_id)

    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5"
    }

    # Transform and write batchDF
    # imu_header.statistics_config = {"enabled": False, "histograms": False, "correlations": False}
    imu_header.insert(batchDF, write_options=extra_hudi_options, storage="offline")

    batchDF.unpersist()

offline_query_header = df_header.writeStream.foreachBatch(foreach_batch_header)\
    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-card")\
    .start()
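Here imu_header is the feature group handle that the batch function writes to. It is retrieved from the feature store along these lines (the name and version below are placeholders, the actual definition is in the notebook):

import hsfs

# Connect to the project's feature store from within the Hopsworks Jupyter environment
connection = hsfs.connection()
fs = connection.get_feature_store()

# Feature group written to in foreach_batch_header; name and version are placeholders
imu_header = fs.get_feature_group("imu_header", version=1)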

For further information, I have attached my Jupyter notebook.
Consumer_FS_Alex.ipynb (40.8 KB)
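
The part of the notebook that produces df_header reads from Kafka and parses the message value. Simplified, with the broker address, topic name, schema and security options as placeholders, it looks roughly like this (the "seq" field from the Hudi error message is included as an example column):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Placeholder schema for the parsed messages
header_schema = StructType([
    StructField("seq", StringType(), True),
    StructField("timestamp", DoubleType(), True)
])

df_header = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<broker:port>")  # placeholder, plus the usual Kafka security options
    .option("subscribe", "<topic>")                      # placeholder topic name
    .load()
    .selectExpr("CAST(value AS STRING) AS value")
    .select(from_json(col("value"), header_schema).alias("data"))
    .select("data.*")
)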

It would be great if you could give me any suggestions on what I am doing wrong.

Kind Regards,
Alex