Python worker exited unexpectedly (crashed)

I’m getting the following error when I try to run experiment.grid_search(). How can I fix it?

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in stage 0.0 (TID 11, hadoop18, executor 6): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:593)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:593)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
	... 26 more

Traceback (most recent call last):
  File "/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/experiment.py", line 479, in grid_search
    tensorboard_logdir, param, metric = gs._grid_launch(sc, map_fun, grid_params, direction=direction, local_logdir=local_logdir, name=name)
  File "/srv/hops/anaconda/anaconda/envs/python36/lib/python3.6/site-packages/hops/grid_search.py", line 55, in _grid_launch
    nodeRDD.foreachPartition(_prepare_func(app_id, run_id, map_fun, args_dict, local_logdir))
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 806, in foreachPartition
    self.mapPartitions(func).count()  # Force evaluation
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/srv/hops/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/srv/hops/spark/python/lib/py4j-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in stage 0.0 (TID 11, hadoop18, executor 6): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

Could you send us a code sample to reproduce it?
The root cause is a java.io.EOFException (an IOException) - probably it can’t read/write some file.

It seems like it fails due to lack of GPU memory:

2020-03-15 20:48:13.624164: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcudart.so.10.0
2020-03-15 20:48:13.626876: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcublas.so.10.0
2020-03-15 20:48:13.629286: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcufft.so.10.0
2020-03-15 20:48:13.629847: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcurand.so.10.0
2020-03-15 20:48:13.632933: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcusolver.so.10.0
2020-03-15 20:48:13.635170: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcusparse.so.10.0
2020-03-15 20:48:13.641737: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcudnn.so.7
2020-03-15 20:48:13.642704: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1763] Adding visible gpu devices: 0
2020-03-15 20:48:13.642782: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcudart.so.10.0
2020-03-15 20:48:13.643788: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1181] Device interconnect StreamExecutor with strength 1 edge matrix:
2020-03-15 20:48:13.643812: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1187]      0 
2020-03-15 20:48:13.643822: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1200] 0:   N 
2020-03-15 20:48:13.644721: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1326] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 43 MB memory) -> physical GPU (device: 0, name: GeForce GTX 1080, pci bus id: 0000:82:00.0, compute capability: 6.1)
2020-03-15 20:48:14.290811: W tensorflow/compiler/jit/mark_for_compilation_pass.cc:1412] (One-time warning): Not using XLA:CPU for cluster because envvar TF_XLA_FLAGS=--tf_xla_cpu_global_jit was not set.  If you want XLA:CPU, either set that envvar, or use experimental_jit_scope to enable XLA:CPU.  To confirm that XLA is active, pass --vmodule=xla_compilation_cache=1 (as a proper command-line flag, not via TF_XLA_FLAGS) or set the envvar XLA_FLAGS=--xla_hlo_profile.
2020-03-15 20:48:14.296074: E tensorflow/stream_executor/cuda/cuda_driver.cc:828] failed to allocate 43.56M (45678592 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2020-03-15 20:48:14.296604: E tensorflow/stream_executor/cuda/cuda_driver.cc:828] failed to allocate 39.21M (41110784 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2020-03-15 20:48:14.378942: F tensorflow/stream_executor/cuda/cuda_driver.cc:175] Check failed: err == cudaSuccess || err == cudaErrorInvalidValue Unexpected CUDA error: out of memory
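
In case it is relevant: a minimal sketch of how TensorFlow's GPU pre-allocation could be limited at the top of the training function (TF 1.x session API, illustrative only - I have not confirmed this is the actual cause):

import tensorflow as tf
from tensorflow.keras import backend as K

# Allocate GPU memory on demand instead of grabbing the whole device up front.
gpu_config = tf.ConfigProto()
gpu_config.gpu_options.allow_growth = True
K.set_session(tf.Session(config=gpu_config))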

Here is my code:

def wrapper(lr, dr, eta):
    from tensorflow.keras.layers import Input, Dense, Dropout
    from tensorflow.keras.models import Model
    from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint
    from tensorflow.keras.optimizers import RMSprop, Adam, SGD
    from math import sqrt
    from tensorflow.keras import backend as K
    from hops import tensorboard
    from hops import hdfs

    activation='relu'
    seq_len=28
    n_features=1
    log_dir=tensorboard.logdir()
    adam_optimizer = Adam(lr=lr, decay=eta)    

    input_layer = Input(batch_shape=(config['batch_size'], seq_len * n_features))
    first_layer = Dense(30
                    , activation=activation
                    , name="first_dense_layer_1")(input_layer)

    first_layer = Dropout(dr)(first_layer, training=True)
    second_layer = Dense(15
                     , activation=activation
                     , name="second_dense_layer")(first_layer)
    out = Dense(n_features, name='output_layer')(second_layer)
    model = Model(input_layer, out)
    model.compile(loss='mean_squared_error', optimizer=adam_optimizer,)

    earlyStopping = EarlyStopping(monitor='val_loss', patience=50, verbose=1, mode='auto')
    mc = ModelCheckpoint(log_dir + 'best_model.h5', monitor='val_loss', mode='auto', verbose=1, save_best_only=True)
    tb_callback = TensorBoard(log_dir=tensorboard.logdir(), histogram_freq=0, write_graph=True, write_images=True)

    callbacks = [tb_callback]
    callbacks.append(earlyStopping)
    callbacks.append(mc)    

    history = model.fit(train_data, train_y
                  , epochs=config['epochs']
                  , callbacks=callbacks
                  , batch_size=config['batch_size']
                  , validation_data=(validation_data, validation_y)
                  , verbose=1
                  , shuffle=False)

    # save model
    model.save(log_dir + 'mlp_one_features_model.h5')
    model.save_weights(log_dir + 'mlp_one_features_weights.h5')

    hdfs.log("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx evaluate ")
    train_score = model.evaluate(train_data, train_y, verbose=0, batch_size=config['batch_size'])
    hdfs.log('Train Score Loss: MSE')
    hdfs.log(str(train_score))

    test_score = model.evaluate(test_data, test_y, verbose=0, batch_size=config['batch_size'])
    hdfs.log('Test Score Loss: MSE: ')
    hdfs.log(str(test_score))

    return test_score

args_dict = {'lr': [0.001, 0.01, 0.1, 0.5], 
             'dr': [0.1, 0.36, 0.41, 0.5],
             'eta': [0, 0.1, 0.01]  
            }

experiment.grid_search(wrapper
                       , args_dict
                       , direction='min'
                       , name='32_MLP_ONE_FEATURE'
                       , local_logdir=True
                      )

Thanks for the code.

I took a look at it, and it seems like your training function is missing some parts. Where are train_data, train_y, validation_data, validation_y, test_data, test_y and config defined?

In case you are defining these outside the wrapper: these definitions should be made inside the wrapper function, together with the rest of the training logic. The reason is that this code (the wrapper) is sent to the executors to train the individual models of your grid search, so it needs to be self-contained; variables defined outside the scope of the wrapper are not shipped to the executors.
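
As a concrete illustration, here is a minimal sketch of a self-contained wrapper. The data and config values below are random placeholders, purely to show where the definitions belong - replace them with however you actually load and prepare your dataset:

def wrapper(lr, dr, eta):
    # Everything the training needs is created inside the wrapper, because only
    # this function is serialized and shipped to the executors.
    import numpy as np

    # Placeholder data and config (made-up shapes and values) - replace with
    # your own loading/preprocessing logic.
    config = {'batch_size': 32, 'epochs': 100}
    seq_len, n_features = 28, 1
    train_data = np.random.rand(1024, seq_len * n_features)
    train_y = np.random.rand(1024, n_features)
    validation_data = np.random.rand(256, seq_len * n_features)
    validation_y = np.random.rand(256, n_features)
    test_data = np.random.rand(256, seq_len * n_features)
    test_y = np.random.rand(256, n_features)

    # ... then build, compile, fit and evaluate the model exactly as in your
    # code, and return test_score.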

The stack trace you sent in your original post - was that the complete trace printed in Jupyter, or are you running this as a job on Hopsworks? You can also send us the stdout/stderr of one of the Spark executors and of the Spark driver. Furthermore, how many resources (memory and cores) did you allocate to the driver and to each executor?
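
For reference, these are the settings I mean, expressed as Spark properties with made-up values; on Hopsworks you would normally set them in the Jupyter/job configuration instead of in code:

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.driver.memory", "4g")        # driver memory
        .set("spark.executor.memory", "8g")      # memory per executor
        .set("spark.executor.cores", "1")        # cores per executor
        .set("spark.executor.instances", "4"))   # number of executors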

On which environment are you running this, hops.site or your own deployment?