How to stream structured data to online feature groups?

In my mind, online feature groups receive streaming data from Kafka and save it for near-real-time consumption.
However, I can’t find any example that does that.
The Kafka PySpark Producer Example shows how to write Kafka data into text files, yet the desired destination is the online feature groups. What am I supposed to do with those text files?
The Kafka PySpark Example, in turn, only covers sending simple text to Kafka, while the desired content is structured data. I tried sending JSON to Kafka and always got the error below when calling the insert_stream(streaming_df) method.
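Roughly, this is what I am doing (simplified; the topic and SSL options are the ones that appear in the error below, keystore/truststore passwords omitted, and fg is the online-enabled feature group handle):

# Read the raw stream from the Hopsworks Kafka topic
# (columns: key, value, topic, partition, offset, timestamp, timestampType)
streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "10.194.10.4:9091")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", "t_certificate")
    .option("kafka.ssl.keystore.location", "k_certificate")
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("subscribe", "topic1")
    .load()
)

# Pass the streaming dataframe straight to the feature group
fg.insert_stream(streaming_df)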
An error was encountered:
"cannot resolve 'cc_num' given input columns: [timestamptype, partition, key, value, timestamp, topic, offset];;\n'Project ['cc_num, 'num_trans_per_10m, 'avg_amt_per_10m, 'stdev_amt_per_10m]\n+- Project [key#56 AS key#70, value#57 AS value#71, topic#58 AS topic#72, partition#59 AS partition#73, offset#60L AS offset#74L, timestamp#61 AS timestamp#75, timestampType#62 AS timestamptype#76]\n +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@6e8c8a1b, kafka, Map(kafka.ssl.truststore.location -> t_certificate, subscribe -> topic1, kafka.ssl.key.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.keystore.location -> k_certificate, kafka.ssl.truststore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.bootstrap.servers -> 10.194.10.4:9091, kafka.ssl.keystore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.endpoint.identification.algorithm -> , kafka.security.protocol -> SSL), [key#56, value#57, topic#58, partition#59, offset#60L, timestamp#61, timestampType#62], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@29962279,kafka,List(),None,List(),None,Map(kafka.ssl.truststore.location -> t_certificate, subscribe -> topic1, kafka.ssl.key.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.keystore.location -> k_certificate, kafka.ssl.truststore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.bootstrap.servers -> 10.194.10.4:9091, kafka.ssl.keystore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.endpoint.identification.algorithm -> , kafka.security.protocol -> SSL),None), kafka, [key#49, value#50, topic#51, partition#52, offset#53L, timestamp#54, timestampType#55]\n"
Traceback (most recent call last):
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py", line 813, in insert_stream
    write_options,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/feature_group_engine.py", line 247, in insert_stream
    self.get_kafka_config(write_options),
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 237, in save_stream_dataframe
    feature_group, self._encode_complex_features(feature_group, dataframe)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 315, in _encode_complex_features
    for field in json.loads(feature_group.avro_schema)["fields"]
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1320, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/srv/hops/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve 'cc_num' given input columns: [timestamptype, partition, key, value, timestamp, topic, offset];;\n'Project ['cc_num, 'num_trans_per_10m, 'avg_amt_per_10m, 'stdev_amt_per_10m]\n+- Project [key#56 AS key#70, value#57 AS value#71, topic#58 AS topic#72, partition#59 AS partition#73, offset#60L AS offset#74L, timestamp#61 AS timestamp#75, timestampType#62 AS timestamptype#76]\n +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@6e8c8a1b, kafka, Map(kafka.ssl.truststore.location -> t_certificate, subscribe -> topic1, kafka.ssl.key.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.keystore.location -> k_certificate, kafka.ssl.truststore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.bootstrap.servers -> 10.194.10.4:9091, kafka.ssl.keystore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.endpoint.identification.algorithm -> , kafka.security.protocol -> SSL), [key#56, value#57, topic#58, partition#59, offset#60L, timestamp#61, timestampType#62], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@29962279,kafka,List(),None,List(),None,Map(kafka.ssl.truststore.location -> t_certificate, subscribe -> topic1, kafka.ssl.key.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.keystore.location -> k_certificate, kafka.ssl.truststore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.bootstrap.servers -> 10.194.10.4:9091, kafka.ssl.keystore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.endpoint.identification.algorithm -> , kafka.security.protocol -> SSL),None), kafka, [key#49, value#50, topic#51, partition#52, offset#53L, timestamp#54, timestampType#55]\n"

What’s the use of online feature groups if they do not work with Kafka?

Hi Tim,

it is not entirely clear to me what you mean by this:

I tried sending json to kafka and always got the following error while trying the insert_stream(streaming_df) method

What does the JSON contain, and what schema does it have? Does it have the same schema as the one you created the feature group with?
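If you are unsure, you can compare the two directly in the notebook, for example (assuming fg is your feature group object and streaming_df is the dataframe you pass to insert_stream()):

import json

# Schema of the streaming dataframe you are passing to insert_stream()
streaming_df.printSchema()

# Schema the feature group was created with, which is what insert_stream() expects
print(json.dumps(json.loads(fg.avro_schema), indent=2))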

Here is an example of how to ingest a Spark Streaming Dataframe into the online feature store.

The idea is that you read a stream of messages from your own Kafka topic into a Spark Streaming DataFrame and perform some aggregation/feature engineering on these messages, typically window aggregations. You can then pass the resulting streaming DataFrame to insert_stream(df). From what I see in the error, you are trying to pass a streaming dataframe coming directly from Kafka to the online feature store. However, the online feature store expects the dataframe to have the same schema as the one you created the feature group with. What you see there as [timestamptype, partition, key, value, timestamp, topic, offset] is the schema of a raw Kafka message; what you probably need to do is take what is contained in value and transform it into the schema of the feature group, as sketched below.
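As a minimal sketch, assuming the feature group was created with the four columns from your error (cc_num, num_trans_per_10m, avg_amt_per_10m, stdev_amt_per_10m; the types below are just a guess) and the JSON payload sits in the Kafka value field, the transformation could look roughly like this:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# Schema of the JSON payload inside the Kafka `value` field; it must match
# the schema the feature group was created with
payload_schema = StructType([
    StructField("cc_num", LongType(), True),
    StructField("num_trans_per_10m", LongType(), True),
    StructField("avg_amt_per_10m", DoubleType(), True),
    StructField("stdev_amt_per_10m", DoubleType(), True),
])

# streaming_df is the raw dataframe read from Kafka
# (key, value, topic, partition, offset, timestamp, timestampType)
parsed_df = (
    streaming_df
    .selectExpr("CAST(value AS STRING) AS value")                       # value arrives as binary
    .select(from_json(col("value"), payload_schema).alias("payload"))   # parse the JSON payload
    .select("payload.*")                                                # flatten to the feature group columns
)

# parsed_df now exposes exactly the feature group's columns
fg.insert_stream(parsed_df)

The important step is the final select("payload.*"): after it, the streaming dataframe has exactly the feature group's columns, which is what insert_stream() resolves against (and what fails in your stack trace).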

OK. That example checks out. Online feature groups work in the PySpark notebook.
However, in our case we are not going to use the Kafka cluster provided by Hopsworks; the engineering team uses their own Confluent Kafka. Do you have an example for that?
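For context, the connection on their side would look roughly like this (broker address, credentials, and topic name are placeholders; Confluent Cloud typically uses SASL_SSL with the PLAIN mechanism):

# Read the stream from the external Confluent cluster instead of Hopsworks Kafka
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<confluent-broker>:9092")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            'username="<API_KEY>" password="<API_SECRET>";')
    .option("subscribe", "<topic>")
    .load()
)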