I wrote a code snippet on my local machine and now want to migrate it to Hopsworks via Jupyter Notebooks. This PySpark snippet needs an external Scala library called ABRiS. On my local machine, I pulled the library into PySpark like this:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
import os

# Must be set before the SparkSession is created so spark-submit pulls in
# the Kafka connector and ABRiS (plus the Confluent repository ABRiS needs).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,za.co.absa:abris_2.12:5.0.0 --repositories https://packages.confluent.io/maven/ pyspark-shell'
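The session itself is then created with the usual builder call, roughly like this (the app name is just a placeholder):

spark = SparkSession.builder \
    .appName("abris-local-test") \
    .getOrCreate()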
I figured out that there is an option to add Jars to the PySpark session when creating the Spark session for Jupyter. My two questions are:
Is there an option to include a whole folder of Jars? Clicking on every single one of the 30 Jars is very frustrating.
Or is there a better way to include libraries like ABRiS in the Spark session?
Hi Alex,
from the UI it’s currently not possible to attach a ZIP file with Jars or to add multiple Jars at once; as you said, every Jar has to be selected manually.
If you are using the current nightly version of Hopsworks (2.5.0-SNAPSHOT/master), we have added the possibility to define a default Spark configuration, so with that you would have to add the 30 Jars only once.
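To give a rough idea, such a default configuration could reference all the Jars through a single property, for example (the paths below are placeholders for wherever the Jars live in your project datasets):

spark.jars=hdfs:///Projects/<your_project>/Resources/jars/abris_2.12-5.0.0.jar,hdfs:///Projects/<your_project>/Resources/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar,...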
Alternatively, what you can try is to upload the Jars (e.g. as a ZIP file) to the local filesystem of all machines that run Spark applications (the workers); you will need SSH access for that. Then add them to the Spark classpath by setting the extra classpath properties sketched below:
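For example, using the standard Spark extra classpath properties (the directory is a placeholder for wherever you unpack the Jars on every machine):

# placeholder path: the directory holding the unpacked Jars on every machine
spark.driver.extraClassPath=/path/to/your/jars/*
spark.executor.extraClassPath=/path/to/your/jars/*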
If you have SSH access to all machines, you can also upload the Jars to a location that is already on the classpath, for example /srv/hops/spark/hopsworks-jars/ on all of the machines.
Let me know if any of these options seems suitable for you.
Thanks for the response. I am using the current version of Hopsworks and decided to add the Jars to the default Spark configuration once. It was a bit annoying, but at least it’s working.
But I am still struggling to get my code snippet to work with the Hopsworks integration. When I ran my Kafka consumer on my local machine everything worked fine, but when I try to match the settings to use Kafka and the Schema Registry from Hopsworks, I still get this error:
Traceback (most recent call last):
File "/home/student/ameise-data-handling/Code/Kafka_Stream/venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/home/student/ameise-data-handling/Code/Kafka_Stream/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o51.usingSchemaRegistry.
: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unrecognized field "type" (class io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage), not marked as ignorable (2 known properties: "error_code", "message"])
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 141] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage["type"]); error code: 50005
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:236)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
at za.co.absa.abris.avro.registry.ConfluentRegistryClient.getLatestSchemaMetadata(ConfluentRegistryClient.scala:39)
at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaMetadataBySubjectAndVersion(SchemaManager.scala:66)
at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaBySubjectAndVersion(SchemaManager.scala:55)
at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchema(SchemaManager.scala:46)
at za.co.absa.abris.config.FromSchemaDownloadingConfigFragment.usingSchemaRegistry(Config.scala:250)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
The error occurs the first time I use the library:
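The call is roughly along these lines, built on the imports from my first snippet (a sketch following the usual ABRiS PySpark wrapper pattern; the helper names, topic, and registry URL are placeholders, not my exact code):

def from_avro_abris_config(registry_url, topic, is_key=False):
    # Build the ABRiS reader config via the JVM gateway; the final
    # usingSchemaRegistry call is the one that fails in the traceback above.
    jvm = SparkContext._active_spark_context._gateway.jvm
    return jvm.za.co.absa.abris.config.AbrisConfig \
        .fromConfluentAvro() \
        .downloadReaderSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(registry_url)

def from_avro(col, abris_config):
    # Wrap ABRiS' Scala from_avro so it can be applied to a PySpark column
    jvm = SparkContext._active_spark_context._gateway.jvm
    return Column(jvm.za.co.absa.abris.avro.functions.from_avro(_to_java_column(col), abris_config))

# df is the streaming DataFrame read from Kafka
abris_config = from_avro_abris_config("https://<schema-registry-url>", "<my-topic>")
df = df.withColumn("value", from_avro("value", abris_config))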
I’m not sure, but this could be caused by a mismatch between the ABRiS library and the Schema Registry version used on Hopsworks. Is it possible to change the Schema Registry version used on Hopsworks? Or do you have any idea what else could cause this issue?