Error when trying to save to online feature group

Hi,

I am trying to save a DataFrame to an online-enabled (‘online_enabled’) feature group. The respective ingestion job fails with the following error:

2021-07-07 15:43:01,317 INFO demo_fs_alexande,steri_features_1_true_1_insert_fg_07072021153825,1085,application_1625227013828_0026 PrometheusSink: role=driver, job=application_1625227013828_0026

Traceback (most recent call last):
  File "hsfs_utils-2.2.0.py", line 110, in <module>
    insert_fg(spark, job_conf)
  File "hsfs_utils-2.2.0.py", line 48, in insert_fg
    fg.insert(df, write_options=job_conf.pop("write_options", {}) or {})
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py", line 708, in insert
    write_options,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/feature_group_engine.py", line 101, in insert
    validation_id,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 220, in save_dataframe
    self._save_online_dataframe(feature_group, dataframe, online_write_options)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 300, in _save_online_dataframe
    feature_group, self._encode_complex_features(feature_group, dataframe)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 315, in _encode_complex_features
    for field in json.loads(feature_group.avro_schema)["fields"]
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py", line 1153, in avro_schema
    self._avro_schema = self._feature_group_engine.get_avro_schema(self)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/feature_group_engine.py", line 196, in get_avro_schema
    return self._kafka_api.get_topic_subject(feature_group._online_topic_name)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/kafka_api.py", line 32, in get_topic_subject
    return _client._send_request("GET", path_params, headers=headers)["schema"]
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/decorators.py", line 35, in if_connected
    return fn(inst, *args, **kwargs)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/client/base.py", line 147, in _send_request
    raise exceptions.RestAPIError(url, response)
hsfs.client.exceptions.RestAPIError: Metadata operation error: (url: https://hopsworks.glassfish.service.consul:8182/hopsworks-api/api/project/1149/kafka/topics/1149_1068_steri_features_1_true_1_onlinefs/subjects). Server response:
HTTP code: 404, HTTP reason: Not Found, error code: 190010, error msg: Topic is not shared with project, user msg: topic: 1149_1068_steri_features_1_true_1_onlinefs, project: demo_fs_alexande
2021-07-07 15:43:02,404 ERROR demo_fs_alexande,steri_features_1_true_1_insert_fg_07072021153825,1085,application_1625227013828_0026 ApplicationMaster: User application exited with status 1

I also tried to share the auto-generated topic with my project. However, I cannot find it in the topic selection dialog.

Can you please help me?

Kind regards
Alex

Hi @alex_s,

I suspect something went wrong during the feature group creation. Can you check whether the Kafka topic for the feature group exists in the Kafka section of the UI?
What happens if you trigger the job again manually from the UI?


Fabio

Hi Fabio,

Thank you for your feedback. The topic mentioned in the error message is indeed missing from the Kafka section. Also, if I manually rerun the job from the UI, it still fails with the same error message.

Kind regards
Alex

@alex_s, can you try to re-create the feature group? I’m wondering whether it was a transient error.

Do you have a sample notebook/python file you can share which we can use to investigate the issue?

Hi Fabio,

Thank you again for your help.

I think the problem with the Kafka topic was an error on my side. However, saving to an online feature group still does not work from my local Python environment. I created a small example:
import hsfs
import json
import pandas as pd

# Read the connection parameters from a local JSON config file
configFile = "ConfigFiles/hopsworksConfig.json"
dictConfig = json.load(open(configFile, "r"))

hostInstance = dictConfig["hopsworksHost"]
projectName = dictConfig["hopsworksProject"]
apiKey = dictConfig["hopsworksApiKey"]
featureStoreName = dictConfig["featureStoreName"]

# Connect to Hopsworks and get the feature store
conn = hsfs.connection(host=hostInstance, project=projectName, api_key_value=apiKey)
fs = conn.get_feature_store(featureStoreName)

# Create an online-enabled feature group
fg = fs.create_feature_group("test_feature_group",
                             version=1,
                             description="test",
                             primary_key=["Index"],
                             online_enabled=True,
                             statistics_config=None)

# Build a small DataFrame and save it to the feature group
dfFeatures = pd.DataFrame([[0, 4.5, 6.7], [1, 1.3, 2.4]],
                          columns=["Index", "Feature1", "Feature2"])
fg.save(dfFeatures)

The configuration parameters above have to be provided in a JSON configuration file.
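For reference, ConfigFiles/hopsworksConfig.json only contains the four values read above; with placeholder values (not my real credentials) it looks like this:

{
  "hopsworksHost": "<myInstanceId>.cloud.hopsworks.ai",
  "hopsworksProject": "<projectName>",
  "hopsworksApiKey": "<apiKey>",
  "featureStoreName": "<featureStoreName>"
}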

In my case the corresponding job starts in the UI, but it fails with the following errors in the output logs:

Driver stacktrace:
2021-07-13 09:44:59,797 INFO demo_fs_alexande,test_feature_group_1_insert_fg_13072021094131,1116,application_1625838568216_0015 DAGScheduler: Job 12 failed: save at NativeMethodAccessorImpl.java:0, took 1.351363 s
Traceback (most recent call last):
  File "hsfs_utils-2.2.0.py", line 110, in <module>
    insert_fg(spark, job_conf)
  File "hsfs_utils-2.2.0.py", line 48, in insert_fg
    fg.insert(df, write_options=job_conf.pop("write_options", {}) or {})
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py", line 728, in insert
    write_options,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/feature_group_engine.py", line 101, in insert
    validation_id,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 220, in save_dataframe
    self._save_online_dataframe(feature_group, dataframe, online_write_options)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 303, in _save_online_dataframe
    "topic", feature_group._online_topic_name
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 732, in save
  File "/srv/hops/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/srv/hops/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o215.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 4 times, most recent failure: Lost task 0.3 in stage 31.0 (TID 13515, demojun17-worker-2.internal.cloudapp.net, executor 2): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type IntegerType to Avro type "long".
	at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$newConverter(AvroSerializer.scala:197)
	at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:209)
	at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:208)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:208)
	at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:51)
	at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
	at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
	at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:45)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:89)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:87)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
	at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:254)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:268)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type IntegerType to Avro type "long".
	at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$newConverter(AvroSerializer.scala:197)
	at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:209)
	at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:208)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:208)
	at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:51)
	at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
	at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
	at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:45)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:89)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89)
	at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:87)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

Kind regards
Alex

@alex_s,

This is a known issue that we fixed recently. To apply the fix on your instance, you need to replace the Python job file on HopsFS that runs the ingestion.

On the machine itself, as the hdfs user, you can run:
/srv/hops/hadoop/bin/hdfs dfs -ls /user/spark

Among the files you should see one named /user/spark/hsfs_utils-2.3.0-SNAPSHOT.py. You might have a different version; instead of 2.3.0-SNAPSHOT, it might be 2.2.0.

You should remove that file:
/srv/hops/hadoop/bin/hdfs dfs -rm /user/spark/hsfs_utils-2.3.0-SNAPSHOT.py

Download the new version from here: https://repo.hops.works/master/hsfs_utils/hsfs_utils-3.0.0-SNAPSHOT.py

Copy the new version to HopsFS:
/srv/hops/hadoop/bin/hdfs dfs -copyFromLocal hsfs_utils-3.0.0-SNAPSHOT.py /user/spark/hsfs_utils-2.3.0-SNAPSHOT.py

It’s important that the file name on HopsFS stays the same as the original one, regardless of which version you upload. So if your original version was 2.2.0 and the file was named hsfs_utils-2.2.0.py, the replacement should be copied to that same name.

Finally, you need to fix the permissions:
/srv/hops/hadoop/bin/hdfs dfs -chmod 755 /user/spark/hsfs_utils-2.3.0-SNAPSHOT.py

After that the ingestion job should work correctly.
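As a side note, the IncompatibleSchemaException itself just means that the Spark job is trying to serialize a 32-bit IntegerType column into an Avro field that the online topic’s schema declares as long (64-bit). Independently of the job file fix, you can rule out a type issue on your side by making the integer columns 64-bit before saving. This is only a rough sketch and may not be sufficient on its own, since the ingestion job re-reads the data server-side:

import pandas as pd

# Sketch only: cast the integer column to 64-bit so Spark infers LongType,
# matching the Avro "long" declared in the online feature group schema.
dfFeatures = pd.DataFrame([[0, 4.5, 6.7], [1, 1.3, 2.4]],
                          columns=["Index", "Feature1", "Feature2"])
dfFeatures["Index"] = dfFeatures["Index"].astype("int64")

# fg is the feature group created earlier with fs.create_feature_group(...)
fg.save(dfFeatures)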


Fabio

Hi Fabio,

Thank you for the information.
How can I connect to HopsFS on my instance (via the UI or from my local Python environment) to run these commands?

Kind regards
Alex

You need to SSH into the instance itself. Where are you running Hopsworks?

I am running a 14-day demo instance of the enterprise edition on <myInstanceId>.cloud.hopsworks.ai

OK, then it’s on us to fix the demo instance. I’ll ping you back when it’s done.

@alex_s - can you try to run the job again?

I reran the job and also started a new version of the same job, but they are still failing with the following error message:
org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type IntegerType to Avro type "long".
at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$newConverter(AvroSerializer.scala:197)
at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:209)
at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:208)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:208)
at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:51)
at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:45)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:89)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

However, it is strange, because another job that stores different data to the online store succeeds.

@alex_s, apologies, the instance wasn’t fixed properly. It is now, if you want to try again.

Thank you for the information. However, I cannot try it anymore, because my 14-day trial is over.

@alex_s, we extended your demo period by two more weeks. You may have to recreate the project though, as it might already have been removed by the demo instance’s garbage collection mechanism.

@Fabio: Thank you for your efforts and for extending my demo period. I have now created a new project and triggered the same job from my local Python environment. This time I get the following error in the output.log of the job:

2021-07-20 10:21:36,855 INFO demo_fs_alexande2,test_feature_group_new4_1_insert_fg_20072021102057,1185,application_1625838568216_0102 StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
log4j:WARN No appenders could be found for logger (org.apache.hadoop.fs.FileSystem).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected. Call `.close()` to terminate connection gracefully.
Traceback (most recent call last):
  File "hsfs_utils-2.2.0.py", line 132, in 
    insert_fg(spark, job_conf)
  File "hsfs_utils-2.2.0.py", line 69, in insert_fg
    df = get_fg_spark_df(job_conf, fg.read().schema)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py", line 530, in read
    read_options,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/constructor/query.py", line 72, in read
    query = self._query_constructor_api.construct_query(self)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/query_constructor_api.py", line 28, in construct_query
    "PUT", path_params, headers=headers, data=query.json()
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/decorators.py", line 35, in if_connected
    return fn(inst, *args, **kwargs)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/client/base.py", line 147, in _send_request
    raise exceptions.RestAPIError(url, response)
hsfs.client.exceptions.RestAPIError: Metadata operation error: (url: https://hopsworks.glassfish.service.consul:8182/hopsworks-api/api/project/1188/featurestores/query). Server response: 
HTTP code: 404, HTTP reason: Not Found, error code: 270118, error msg: No data is available for feature group with this commit date, user msg: featureGroup: test_feature_group_new4 version 1

Hi @alex_s, did you manage to get around this error? According to the error message, you are retrieving data from a time-travel enabled feature group, but there is no commit available before the date being requested. Change the requested date.
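If I remember the hsfs 2.x API correctly, you can inspect which commits actually exist on the feature group with something like the snippet below (treat the exact signatures as an assumption and adjust if they differ in your version):

import hsfs

# Connect with the same placeholder parameters as in your earlier example
conn = hsfs.connection(host="<myInstanceId>.cloud.hopsworks.ai",
                       project="<projectName>",
                       api_key_value="<apiKey>")
fs = conn.get_feature_store()

fg = fs.get_feature_group("test_feature_group_new4", version=1)

# List the commits available for this time-travel enabled feature group
print(fg.commit_details())

# Reading without a wallclock time returns the latest available snapshot
df = fg.read()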

Let me know if this doesn’t solve the issues.

/Davit

Hi @Davit_Bzhalava,

I still get the above error, and it happens when inserting new data, not when querying data.

Kind regards
Alex

Can you send me a code snippet showing how you defined test_feature_group_new4 version 1, i.e. what the arguments to fs.create_feature_group were?

Was it like this when you created it:

fg = fs.create_feature_group("test_feature_group_new4",version=1,
                            description="test",
                            primary_key=["Index"],
                            online_enabled=True,
                            statistics_config=None)

dfFeatures = pd.DataFrame([[0,4.5,6.7],[1,1.3,2.4]],columns = ["Index","Feature1","Feature2"])
fg.save(dfFeatures) 

And when you try to insert, like this?

fg = fs.get_feature_group("test_feature_group_new4", version=1)
dfFeatures = pd.DataFrame([[0, 4.5, 6.7], [1, 1.3, 2.4]],
                          columns=["Index", "Feature1", "Feature2"])
fg.insert(dfFeatures)