As I understand it, online feature groups receive streaming data from Kafka and save it for near-real-time consumption.
However, I can’t find any example that does that.
The Kafka PySpark Producer Example shows how to write Kafka data into text files, but the destination I actually want is an online feature group. What am I supposed to do with those text files?
The Kafka PySpark Example, in turn, only covers sending plain text to Kafka, while what I need to send is structured data. I tried producing JSON to Kafka instead, and I always get the following error when calling the insert_stream(streaming_df) method.
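For context, this is roughly what I'm running (simplified: fg stands for my online-enabled feature group handle, and I've left out the kafka.ssl.* options that show up in the query plan below):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Subscribe to the topic my producer writes JSON records to
# (same bootstrap server and topic as in the error output).
streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "10.194.10.4:9091")
    .option("subscribe", "topic1")
    # ... plus the kafka.ssl.* / kafka.security.protocol options
    .load()
)

# Stream the records into the online feature group.
fg.insert_stream(streaming_df)
```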
An error was encountered:
"cannot resolve '`cc_num`' given input columns: [timestamptype, partition, key, value, timestamp, topic, offset];;\n'Project ['cc_num, 'num_trans_per_10m, 'avg_amt_per_10m, 'stdev_amt_per_10m]\n+- Project [key#56 AS key#70, value#57 AS value#71, topic#58 AS topic#72, partition#59 AS partition#73, offset#60L AS offset#74L, timestamp#61 AS timestamp#75, timestampType#62 AS timestamptype#76]\n   +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@6e8c8a1b, kafka, Map(kafka.ssl.truststore.location -> t_certificate, subscribe -> topic1, kafka.ssl.key.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.keystore.location -> k_certificate, kafka.ssl.truststore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.bootstrap.servers -> 10.194.10.4:9091, kafka.ssl.keystore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.endpoint.identification.algorithm -> , kafka.security.protocol -> SSL), [key#56, value#57, topic#58, partition#59, offset#60L, timestamp#61, timestampType#62], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@29962279,kafka,List(),None,List(),None,Map(kafka.ssl.truststore.location -> t_certificate, subscribe -> topic1, kafka.ssl.key.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.keystore.location -> k_certificate, kafka.ssl.truststore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.bootstrap.servers -> 10.194.10.4:9091, kafka.ssl.keystore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.endpoint.identification.algorithm -> , kafka.security.protocol -> SSL),None), kafka, [key#49, value#50, topic#51, partition#52, offset#53L, timestamp#54, timestampType#55]\n"
Traceback (most recent call last):
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py", line 813, in insert_stream
    write_options,
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/feature_group_engine.py", line 247, in insert_stream
    self.get_kafka_config(write_options),
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 237, in save_stream_dataframe
    feature_group, self._encode_complex_features(feature_group, dataframe)
  File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 315, in _encode_complex_features
    for field in json.loads(feature_group.avro_schema)["fields"]
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1320, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/srv/hops/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve '`cc_num`' given input columns: [timestamptype, partition, key, value, timestamp, topic, offset];;\n'Project ['cc_num, 'num_trans_per_10m, 'avg_amt_per_10m, 'stdev_amt_per_10m]\n+- Project [key#56 AS key#70, value#57 AS value#71, topic#58 AS topic#72, partition#59 AS partition#73, offset#60L AS offset#74L, timestamp#61 AS timestamp#75, timestampType#62 AS timestamptype#76]\n   +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@6e8c8a1b, kafka, Map(kafka.ssl.truststore.location -> t_certificate, subscribe -> topic1, kafka.ssl.key.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.keystore.location -> k_certificate, kafka.ssl.truststore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.bootstrap.servers -> 10.194.10.4:9091, kafka.ssl.keystore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.endpoint.identification.algorithm -> , kafka.security.protocol -> SSL), [key#56, value#57, topic#58, partition#59, offset#60L, timestamp#61, timestampType#62], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@29962279,kafka,List(),None,List(),None,Map(kafka.ssl.truststore.location -> t_certificate, subscribe -> topic1, kafka.ssl.key.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.keystore.location -> k_certificate, kafka.ssl.truststore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.bootstrap.servers -> 10.194.10.4:9091, kafka.ssl.keystore.password -> h2ZTD17PDqA0vtUgfuEW, kafka.ssl.endpoint.identification.algorithm -> , kafka.security.protocol -> SSL),None), kafka, [key#49, value#50, topic#51, partition#52, offset#53L, timestamp#54, timestampType#55]\n"
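If I'm reading the error right, insert_stream expects the dataframe to already have the feature group's columns (cc_num, num_trans_per_10m, and so on), but a raw Kafka source dataframe only exposes key, value, topic, partition, offset and the timestamps. So I'm guessing I have to parse the JSON out of the value column myself before inserting, along these lines (the field types here are just my guesses):

```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, LongType, StructField, StructType

# Schema matching the feature group's features; the exact types are a guess.
json_schema = StructType([
    StructField("cc_num", LongType(), True),
    StructField("num_trans_per_10m", LongType(), True),
    StructField("avg_amt_per_10m", DoubleType(), True),
    StructField("stdev_amt_per_10m", DoubleType(), True),
])

# Decode the Kafka 'value' bytes as a JSON string and flatten the fields
# so the dataframe columns line up with the feature group schema.
parsed_df = (
    streaming_df
    .select(from_json(col("value").cast("string"), json_schema).alias("record"))
    .select("record.*")
)

fg.insert_stream(parsed_df)
```

Is that the intended pattern, or is there a proper example of this somewhere?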
More generally, what's the use of online feature groups if they don't work with Kafka?