NoClassDefFoundError while calling .saveAsTable during FeatureStore Insert

Hello,

While running the TitanicTrainingData notebook with the Spark engine, I get a NoClassDefFoundError (caused by a ClassNotFoundException for HiveException) when inserting into the feature group.

import hsfs
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


conf_spark = SparkConf()

conf_spark.set("spark.driver.host", "127.0.0.1")
conf_spark.set("spark.hadoop.hops.ssl.keystores.passwd.name", "material_passwd")

conf_spark.set("spark.hadoop.fs.hopsfs.impl",                         "io.hops.hopsfs.client.HopsFileSystem")
conf_spark.set("spark.hadoop.hops.ipc.server.ssl.enabled",            "true")
conf_spark.set("spark.hadoop.hops.ssl.hostname.verifier",             "ALLOW_ALL")
conf_spark.set("spark.hadoop.hops.rpc.socket.factory.class.default",  "io.hops.hadoop.shaded.org.apache.hadoop.net.HopsSSLSocketFactory")
conf_spark.set("spark.hadoop.client.rpc.ssl.enabled.protocol",        "TLSv1.2")
conf_spark.set("spark.hadoop.hops.ssl.keystores.passwd.name",         "<path-to-pw>/material_passwd")
conf_spark.set("spark.hadoop.hops.ssl.keystore.name",                 "<path-to-store>/keyStore.jks")
conf_spark.set("spark.hadoop.hops.ssl.trustore.name",                 "<path-to-store>/trustStore.jks")

conf_spark.set("spark.sql.hive.metastore.jars",                       "<path-to-jars>/hopswork_jars")
conf_spark.set("spark.hadoop.hive.metastore.uris",                    "thrift://IP:9085")


# sc = SparkContext.getOrCreate(conf_spark)


spark_session = SparkSession \
               .builder \
               .enableHiveSupport() \
               .config(conf=conf_spark) \
               .getOrCreate()
connection = hsfs.connection(host=IP,
                             port=443,
                             engine='spark',
                             project = PROJECT_NAME,
                             api_key_value = API_KEY)
fs = connection.get_feature_store()

raw_df = spark_session.read.csv(training_csv_path, header=True)
clean_train_df = raw_df.select('survived', 'pclass', 'sex', 'fare', 'age', 'sibsp', 'parch') \
                    .fillna({'age': 30}) 
titantic_fg = fs.create_feature_group(name="titanic_training_all_features",
                                       version=1,
                                       description="titanic training dataset with clean features",
                                       time_travel_format=None,
                                     )


titantic_fg.insert(clean_train_df)

The code above throws an exception while saving the DataFrame into the feature group:

Py4JJavaError: An error occurred while calling o827.saveAsTable.
: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/metadata/HiveException
	at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
	at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3137)
	at java.base/java.lang.Class.getConstructors(Class.java:1943)
	at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:291)
	at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:492)
	at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:352)
	at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:71)
	at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:70)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:224)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:224)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:134)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:124)
	at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:44)
	at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$1(HiveSessionStateBuilder.scala:59)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:98)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:98)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:462)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:692)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:626)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.metadata.HiveException
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.doLoadClass(IsolatedClientLoader.scala:247)
	at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.loadClass(IsolatedClientLoader.scala:236)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	... 32 more

Reading from the feature group also fails:

tititanic_df = titantic_fg.read()

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/var/folders/dd/2rm5s2w54zs70s3d5mwsn1kh0000gq/T/ipykernel_57495/2691049954.py in <module>
----> 1 a = titantic_fg.read()

~/opt/anaconda3/envs/py38misc/lib/python3.8/site-packages/hsfs/feature_group.py in read(self, wallclock_time, online, dataframe_type, read_options)
    571             `RestAPIError`. No data is available for feature group with this commit date, If time travel enabled.
    572         """
--> 573         engine.get_instance().set_job_group(
    574             "Fetching Feature group",
    575             "Getting feature group: {} from the featurestore {}".format(

~/opt/anaconda3/envs/py38misc/lib/python3.8/site-packages/hsfs/engine/spark.py in set_job_group(self, group_id, description)
     76 
     77     def set_job_group(self, group_id, description):
---> 78         self._spark_session.sparkContext.setJobGroup(group_id, description)
     79 
     80     def register_on_demand_temporary_table(self, on_demand_fg, alias):

~/opt/anaconda3/envs/py38misc/lib/python3.8/site-packages/pyspark/context.py in setJobGroup(self, groupId, description, interruptOnCancel)
   1140         Cancelled
   1141         """
-> 1142         self._jsc.setJobGroup(groupId, description, interruptOnCancel)
   1143 
   1144     def setLocalProperty(self, key, value):

AttributeError: 'NoneType' object has no attribute 'setJobGroup'

Versions:

  • Python - 3.8
  • pyspark - 3.1.2
  • hsfs[hive] - 2.3.2
  • hopsworks server install - 2.3.0-SNAPSHOT
  • hive2 - 3.0.0.8-SNAPSHOT

Hi @pburman,

The first error suggests that your Spark configuration is missing required dependencies, or that some of them conflict. Could you describe where your Spark deployment is running and how you are launching your application?

The second error might be a consequence of the first one: it looks like the Spark context is not available.
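
One way to test the missing-dependency hypothesis is to check whether any jar in the directory passed to spark.sql.hive.metastore.jars actually contains the class named in the stack trace. The sketch below is only a diagnostic aid, not part of hsfs or Spark; the function name find_class_in_jars is hypothetical. HiveException is normally shipped in the hive-exec jar, so that is the one to look for.

```python
import zipfile
from pathlib import Path


def find_class_in_jars(jar_dir, class_name):
    """Return the names of the jars directly under jar_dir that contain class_name."""
    # A class a.b.C is stored inside a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(jar_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are not valid archives
            continue
    return hits
```

Calling find_class_in_jars("<path-to-jars>/hopswork_jars", "org.apache.hadoop.hive.ql.metadata.HiveException") should return at least one jar name if the dependency is present. If it is, the next thing to check is how the path is passed: as far as I know, Spark treats spark.sql.hive.metastore.jars as a JVM-style classpath, so a bare directory may not pick up the jars inside it. Please verify that against the Spark documentation for your version.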


Fabio

Hi @Fabio, I’m new to the Spark/Hive/Hadoop ecosystem, so I might be missing something.
My understanding is that I should be able to connect to Hopsworks remotely using an API key and call fg.insert_stream(df), where df is a Spark DataFrame. The code above runs locally on my Mac (including PySpark) inside a Jupyter notebook, connecting to the Hopsworks server over the VPN.
I have the pyspark library installed locally and configured through SparkConf (as shown above). I also copied the Hopsworks jars from ‘/srv/hops/spark/hopsworks-jars’ on the Hopsworks server to my Mac. I got the configs from the Spark integration page: here.

Is there another way to launch a Spark app and pass the instance to the Hopsworks client API?

@pburman - The above documentation is meant for running the feature store APIs on external Spark clusters like Cloudera, EMR or HDInsight.

If you are on your laptop, the easiest way to run feature engineering code on Spark is on Hopsworks itself. Hopsworks provides a Jupyter notebook already configured to run both PySpark and Scala Spark code.

Go to Hopsworks (Old UI) > Jupyter > Select Spark and run it. That gives you a running Jupyter notebook from which you can run Spark applications. You’ll also be able to track the application’s progress from the Spark UI, and see logs and application metrics in real time.


Fabio

Hi @Fabio, understood, that was my impression as well. But I wanted to develop a lightweight library that runs inside a microservice and pushes data from production into the feature store asynchronously. The idea was to use the streaming functionality (since the data from production arrives per prediction rather than in batches), so that the feature store is updated in near real time.
Hence, I was trying to use PySpark with a single-node Spark installation running inside the library.
Batching requests over a few hours and using ‘fg.insert()’ is definitely an option, but I was hoping for something more real-time. Is there a way to do streaming without a Cloudera/EMR/HDInsight setup? Thank you!

Hi @pburman,

OK, thanks, now I have a clearer idea of what you are trying to achieve.
Your use case is quite similar to our credit card transaction example. You can find the notebooks at https://examples.hopsworks.ai/ in the Online Feature Store end-to-end section.

Essentially, we have a notebook that generates credit card transactions (raw data) and sends them to a Kafka topic: Generate credit card transactions data and send to kafka topic. In our case it is a Python notebook, but in your case it can be replaced by your microservice.

Then we have another notebook (Windowed aggregations using spark streaming and ingestion to the online feature store) that runs the PySpark streaming application. It reads from the Kafka topic above, does the streaming feature engineering, and writes the result to the online and offline feature store. The PySpark streaming application runs on Hopsworks.

I believe this architecture has several benefits. First, it drastically simplifies your microservice. Instead of running a Spark application within the microservice itself, you can just embed a lightweight Python library that sends events to a Kafka topic (see the library we are using in the first notebook), which is more in line with the microservice philosophy.
Second, you’ll have a single PySpark application that processes the data from all running instances of your microservice. You won’t have to deal with local PySpark configuration, and you’ll be able to monitor the application through the tooling that Hopsworks provides.
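
The microservice side of this could look roughly like the sketch below. It is a minimal illustration under assumptions, not the code from the example notebooks: the function name send_prediction_event, the event fields, and the topic name are hypothetical, and producer is assumed to be any object exposing produce(topic, value=...), which is the shape of confluent_kafka.Producer.

```python
import json
import time


def send_prediction_event(producer, topic, features, prediction):
    """Serialize one prediction as JSON and hand it to a Kafka producer."""
    # Hypothetical event schema; adapt the fields to your own feature group
    event = {
        "event_time": int(time.time() * 1000),  # epoch milliseconds
        "features": features,
        "prediction": prediction,
    }
    payload = json.dumps(event).encode("utf-8")
    # Assumes a confluent_kafka-style producer, where produce() enqueues
    # the message and delivery happens asynchronously
    producer.produce(topic, value=payload)
    return payload
```

The microservice would call this once per prediction, for example send_prediction_event(producer, "predictions", {"age": 30}, 1), and flush the producer on shutdown. Passing the producer in as an argument keeps the function easy to test with a stub.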


Fabio