Adding external Library to Pyspark (multiple Jars)

Hello,

I wrote a code snippet on my local machine and now want to migrate it to Hopsworks via Jupyter notebooks. The snippet is PySpark code that needs an external library called ABRiS. On my local machine, I included this Scala library in PySpark like this:

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,za.co.absa:abris_2.12:5.0.0 --repositories https://packages.confluent.io/maven/ pyspark-shell'

Later, I call the Scala library like this:

    def from_avro_abris_config(config_map, topic, is_key):
        """
        Create from avro abris config with a schema url

        :param config_map (dict[str, str]): configuration map to pass to deserializer, ex: {'schema.registry.url': 'http://localhost:8081'}
        :param topic (str): kafka topic
        :param is_key (bool): boolean
        :return: za.co.absa.abris.config.FromAvroConfig
        """
        jvm_gateway = SparkContext._active_spark_context._gateway.jvm
        scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)

        return jvm_gateway.za.co.absa.abris.config \
            .AbrisConfig \
            .fromConfluentAvro() \
            .downloadReaderSchemaByLatestVersion() \
            .andTopicNameStrategy(topic, is_key) \
            .usingSchemaRegistry(scala_map)

I figured out that Jars can be added to the PySpark session when creating the Jupyter Spark session. My two questions are:
Is there an option to include a whole folder of Jars? Clicking on every single one of the 30 Jars is very frustrating.
Or is there a better way to include libraries like ABRiS in the Spark session?

Thanks for your help and Kind Regards,
Alex

Hi Alex,
From the UI, it’s currently not possible to attach a ZIP file of Jars or several Jars at once; as you said, the only way is to click each Jar manually.

If you are using the current nightly version of Hopsworks (2.5.0-SNAPSHOT/master), we have added the possibility to set a default Spark configuration, so you would have to add the 30 Jars only once.

Alternatively, you can try uploading the Jars as a ZIP file. For that, however, you need to upload them to the local filesystem of every machine that runs Spark applications (the workers), which requires SSH access. Then add them to the Spark classpath by setting the following extra properties:

spark.driver.extraClassPath=path/to/zip
spark.executor.extraClassPath=path/to/zip
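
As a rough illustration of how a whole folder of Jars could be turned into values for these two properties, here is a minimal Python sketch. The helper names `build_classpath` and `extra_classpath_properties` are made up for this example, and the sketch assumes the folder exists at the same path on every machine and that the machines run Linux (hence ':' as the classpath separator). It only builds the strings; it does not configure Hopsworks.

```python
import glob
import os


def build_classpath(jar_dir):
    """Return a ':'-separated string of every .jar file directly inside jar_dir."""
    # Sort so the resulting string is stable between runs.
    jars = sorted(glob.glob(os.path.join(jar_dir, "*.jar")))
    # ':' is the JVM classpath separator on Linux (assumption: Linux workers).
    return ":".join(jars)


def extra_classpath_properties(jar_dir):
    """Build the two Spark properties from the reply above for one Jar folder."""
    classpath = build_classpath(jar_dir)
    return {
        "spark.driver.extraClassPath": classpath,
        "spark.executor.extraClassPath": classpath,
    }
```

The returned dictionary can be printed and pasted into the extra Spark properties of the job or notebook configuration.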

If you have SSH access to all machines, you can also upload the Jars to a location that is already included in the classpath, for example /srv/hops/spark/hopsworks-jars/ on each of the machines.

Let me know if any of these options seems suitable for you.

Hello Moritz,

Thanks for the response. I am using the current version of Hopsworks and decided to add the Jars to the default Spark configuration once. It was a bit annoying, but at least it’s working :)

But I am still struggling to get my code snippet to work with the Hopsworks integration. When I ran my Kafka consumer on my local machine, everything worked fine, but when I try to adapt the settings to use Kafka and the schema registry from Hopsworks, I still get this error:

Traceback (most recent call last):
  File "/home/student/ameise-data-handling/Code/Kafka_Stream/venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/student/ameise-data-handling/Code/Kafka_Stream/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o51.usingSchemaRegistry.
: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unrecognized field "type" (class io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage), not marked as ignorable (2 known properties: "error_code", "message"])
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 141] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage["type"]); error code: 50005
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:236)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
	at za.co.absa.abris.avro.registry.ConfluentRegistryClient.getLatestSchemaMetadata(ConfluentRegistryClient.scala:39)
	at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaMetadataBySubjectAndVersion(SchemaManager.scala:66)
	at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaBySubjectAndVersion(SchemaManager.scala:55)
	at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchema(SchemaManager.scala:46)
	at za.co.absa.abris.config.FromSchemaDownloadingConfigFragment.usingSchemaRegistry(Config.scala:250)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)

The error occurs the first time I use the library:

def from_avro_abris_config(config_map, topic, is_key):
    """
    Create from avro abris config with a schema url

    :param config_map (dict[str, str]): configuration map to pass to deserializer, ex: {'schema.registry.url': 'http://localhost:8081'}
    :param topic (str): kafka topic
    :param is_key (bool): boolean
    :return: za.co.absa.abris.config.FromAvroConfig
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)

    return jvm_gateway.za.co.absa.abris.config \
        .AbrisConfig \
        .fromConfluentAvro() \
        .downloadReaderSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(scala_map)

from_avro_abris_settings = from_avro_abris_config({'schema.registry.url': registry_url}, conf['kafka']['topic'], False)

I’m not sure, but this could be caused by a mismatch between the ABRiS library and the schema registry version used on Hopsworks. Is it possible to change the schema registry version used on Hopsworks? Or do you have any idea what else could cause this issue?
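
One possible reading of the trace is that the registry answered with an error response whose JSON body contains a "type" field the Confluent client cannot parse, so the actual error message is hidden. A way to check this would be to request the latest schema directly and look at the raw response. The sketch below is only an assumption-laden diagnostic: the helper names are invented for this example, the subject is assumed to be `<topic>-value` (or `<topic>-key`) as with the topic name strategy, and any authentication headers the Hopsworks registry may require are not shown and would have to be passed in `headers`.

```python
import urllib.error
import urllib.parse
import urllib.request


def latest_schema_url(registry_url, topic, is_key=False):
    """Build the URL for the latest schema version of a topic's key or value subject."""
    # Assumption: subjects follow the topic name strategy, i.e. "<topic>-key" / "<topic>-value".
    subject = f"{topic}-{'key' if is_key else 'value'}"
    quoted = urllib.parse.quote(subject, safe="")
    return f"{registry_url.rstrip('/')}/subjects/{quoted}/versions/latest"


def fetch_raw_response(url, headers=None, timeout=10):
    """Return (status_code, body_text) so that error bodies can be inspected as-is."""
    request = urllib.request.Request(url, headers=headers or {})
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status, response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as err:
        # Non-2xx answers still carry a body; this is the part the client failed to parse.
        return err.code, err.read().decode("utf-8", errors="replace")
```

Calling `fetch_raw_response(latest_schema_url(registry_url, conf['kafka']['topic']))` with the same values as in the snippet above would show the status code and the unparsed body, which may help tell a version mismatch apart from, for example, a missing subject or an authentication problem.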