Closed Bug 1169534 Opened 9 years ago Closed 9 years ago

Spark crash

Categories

(Cloud Services Graveyard :: Metrics: Pipeline, defect)

defect
Not set
blocker

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: rvitillo, Assigned: rvitillo)

References

Details

Brendan reported a notebook [1] that causes Spark to crash with the following mysterious error:

Py4JJavaError: An error occurred while calling o70.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 186 in stage 2.0 failed 4 times, most recent failure: Lost task 186.3 in stage 2.0 (TID 237, ip-10-37-146-226.us-west-2.compute.internal): ExecutorLostFailure (executor 6 lost)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


The same notebook worked flawlessly just a week ago, so it seems like the configuration of the machines on AWS changed somehow.

[1] http://nbviewer.ipython.org/gist/vitillo/c06ef3fd7b8e77a6ab33
It seems this issue is related to Bug 1158175, as I switched to the new bucket with the per-build-id splits 3 days ago (see https://github.com/vitillo/python_moztelemetry/commit/3993419601b49cc61d4614ae67dcbdf46cf989fe).

The issue is reproducible on a single-machine Spark cluster with 16 workers. Even a simple count on the RDD fails because the machine runs out of memory while reading and parsing the Heka files. What might be happening is that when each of the 16 workers is reading and parsing a rather big file at the same time, memory blows up.
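To put rough numbers on the memory pressure (a sketch only: the 5x expansion factor for parsed Heka records is an assumption, not a measurement, and the file size is the pre-16th average from the figures in the next paragraph):

workers = 16
avg_file_mb = 100   # average file size up to the 16th of May (see below)
expansion = 5       # assumed in-memory blow-up when Heka records are parsed into Python dicts
print(workers * avg_file_mb * expansion / 1024.0)  # ~7.8 GB held concurrently, before Spark's own overhead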

Up to the 16th of May there are roughly 300 files per day with an average size of ~100 MB, while after the 16th we have over 9000 files per day with an average size of ~3.5 MB. That seems rather odd considering that the backfilled dates should all land in the same ballpark. Wesley, Mark, do you have any thoughts?

Brendan, please use only dates after the 16th for now and let me know if you keep seeing failures.
Flags: needinfo?(whd)
Flags: needinfo?(mreid)
The Pipeline's S3 output has two parameters that control the file size in S3: max_file_size and max_file_age.

Files are flushed to S3 as soon as they reach the max. size or age (age is determined using the server clock).

We recently increased max_file_size from 25 MB to 300 MB, with the specific aim of creating fewer, larger files in S3.

During the back-processing to add the buildId dimension, we process a large number of messages in a given time period on the server, so we collect many more messages before we hit max_file_age than we do when processing messages in "real time".

So it is not surprising that there are fewer, larger files during the backprocessing period than afterwards.
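For illustration only, the size-or-age flush rule could be sketched like this (this is not the actual output plugin code; the parameter names mirror the config options, the defaults are placeholders, and upload_to_s3 is a stand-in for the real uploader):

import time

class S3OutputBuffer:
    def __init__(self, upload_to_s3, max_file_size=300 * 2**20, max_file_age=300):
        self.upload = upload_to_s3          # placeholder upload callable
        self.max_file_size = max_file_size  # bytes
        self.max_file_age = max_file_age    # seconds, measured with the server clock
        self.buffer = bytearray()
        self.opened_at = time.time()

    def write(self, record):
        self.buffer.extend(record)
        too_big = len(self.buffer) >= self.max_file_size
        too_old = time.time() - self.opened_at >= self.max_file_age
        if too_big or too_old:
            self.flush()

    def flush(self):
        if self.buffer:
            self.upload(bytes(self.buffer))
        self.buffer = bytearray()
        self.opened_at = time.time()

During back-processing, write() hits the size limit long before the age limit, which is why those files end up large.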

Is it possible to read files in such a way that it does not require 16 x (max file size) all at once?
Flags: needinfo?(whd)
Flags: needinfo?(mreid)
(In reply to Mark Reid [:mreid] from comment #2)
> Is it possible to read files in such a way that it does not require 16 x
> (max file size) all at once?

I will look into it.
(In reply to Roberto Agostino Vitillo (:rvitillo) from comment #1)

> Brendan, please use only dates after the 16th for now and let me know
> if you keep seeing failures.

Hey guys, I am getting a similar error on a script that was working last week, now using
submission_date=("20150520","20150525")

http://nbviewer.ipython.org/gist/bcolloran/bdc3218920621ebd4bbe


Py4JJavaError: An error occurred while calling o105.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4942 in stage 1.0 failed 4 times, most recent failure: Lost task 4942.3 in stage 1.0 (TID 5842, ip-10-37-6-174.us-west-2.compute.internal): ExecutorLostFailure (executor 5 lost)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Blocks: 1169103
Severity: normal → blocker
cc'ing Katie, Benjamin and John--

If Roberto would be interested in having assistance (I don't want to put words in his mouth, because he and I have not talked about this yet), I wonder if there is anyone that could be recruited to help support Roberto's work on the Spark system. I bring this up for a couple reasons:
(1) At least for now, I think that the Telemetry Self-Serve Data Analysis Spark cluster infrastructure is pretty much the only window we have into FHR v4, so when that system is not working it's a hard blocker on the v4 data quality work I've been doing (bug 1169103), which is in turn a blocker on getting v4 into 40.
(2) Again, Roberto and I have not spoken about this, so I don't want to put words in his mouth, but if I were him I think I'd rather work on analyses than be responsible for supporting users and hunting down bugs, and I don't think he should have to shoulder that responsibility alone.

Anyway, if Roberto is open to having help, I think it'd make sense to have some more engineering eyes focused on making sure that our Spark setup for FHR v4 is "production grade" as we draw closer to 40.
Flags: needinfo?(rvitillo)
Brendan, I certainly welcome any help but I am not sure we have the manpower for it. 

There are several things going on with your notebook:

1) After max_file_size was increased from 25 MB to 300 MB, a handful of the files on S3 (after the 16th) happen to contain a much larger number of pings.

When I fetch a list of filenames from the index, I distribute those to the workers as partitions. By itself, having files of different sizes is not as bad as it looks, as each worker pulls a new file only when it finishes processing the previous one.

What tends to happen, though, is that bigger files (i.e. partitions with larger files) take longer to process, and the cluster might get into a situation where all workers are processing a big file at the same time. That again is generally not an issue, unless each worker is using a lot of memory, which is what happens in your case.

This explains why your job was working before max_file_size was increased. After a conversation with Mark it seems unlikely that we will reduce max_file_size as the change is required by Heka to handle the load on release.

2) In your notebook you are shuffling lots of data around, a lot more than advised:

pingsByPingId = pings.map(lambda p: (p.get("id","MISSING"), [{k:p[k] for k in p.keys() if k!="meta"}]))

What that line does is shuffle a copy of each ping. That snippet, combined with my previous point, generates a situation where each worker has a "big" list of parsed submissions in its working set.

You should shuffle only small subsets of submissions. Furthermore, after that shuffle, you cache the dataset in memory. That's also less than ideal: you should cache only a subset of the submission fields rather than the full submissions, or remove the caching layer entirely, as the dataset is unlikely to fit in memory and will spill to disk.
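For example (a sketch only; the fields kept here are placeholders for whatever subset you actually need):

slim = pings.map(lambda p: (p.get("id", "MISSING"),
                            {k: p[k] for k in ("clientId", "creationDate") if k in p}))
slim.cache()
slim.count()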

I understand that in this particular case you want the full pings though. One way to get around that could be to use the "repartition" method (see [1]). The RDD of pings you get back from get_pings has one partition per file. By invoking rdd.repartition(N), you will repartition your RDD into N partitions, and you can make each partition as small as you like. The smaller the partition, the smaller the working set of the worker processing it (i.e. the number of pings to process) and the less likely it is to run out of memory. The caveat is that repartitioning a dataset requires a shuffle, which is slow.
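A minimal sketch of that workaround (rdd.repartition(N) is standard Spark API; the partition count here is arbitrary):

N = 400                          # larger N => smaller partitions => smaller per-task working set
pings = pings.repartition(N)     # note: this triggers a shuffle, which is slow
pingsByPingId = pings.map(lambda p: (p.get("id", "MISSING"),
                                     [{k: p[k] for k in p.keys() if k != "meta"}]))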

In general, you should keep an eye on memory and CPU usage, as there are some corner cases that generate cryptic errors like yours for no apparent reason. You can monitor the cluster with e.g. htop on a single machine, and by opening the cluster dashboard with e.g. "lynx localhost:4040" on the master node.

In conclusion, I feel the use case of shuffling whole submissions around is a rare one, and for now we can address it with the above suggestions.

[1] https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
Flags: needinfo?(rvitillo)
Forgot to mention: instead of shuffling the full pings, you could also use a hash of the ping, or a hash of its main sections (e.g. the payload), to check whether submissions with the same id differ. Once you have some ids for which you know that the pings differ, you could then fetch only those submissions.
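A sketch of that idea, assuming pings is the RDD of parsed submissions (hashing the whole ping minus "meta" is just one option; hashing only the payload works the same way):

import hashlib
import json

def ping_hash(p):
    body = {k: v for k, v in p.items() if k != "meta"}  # or just p.get("payload")
    return hashlib.sha1(json.dumps(body, sort_keys=True).encode("utf-8")).hexdigest()

hashes = pings.map(lambda p: (p.get("id", "MISSING"), {ping_hash(p)}))
differing_ids = (hashes.reduceByKey(lambda a, b: a | b)    # distinct hashes per id
                       .filter(lambda kv: len(kv[1]) > 1)  # keep ids whose duplicates differ
                       .keys())

Only the ids in differing_ids then need their full submissions fetched.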
Thanks for the pointers Roberto! I'll try these out!
I have implemented a chunked read for S3 files, which should alleviate the issue in the future.
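For reference, a chunked read can look roughly like the following sketch (this is not the actual patch; it uses boto3 as an example S3 client, and the bucket/key names are up to the caller):

import boto3

def read_in_chunks(bucket, key, chunk_size=8 * 2**20):
    """Yield an S3 object in fixed-size chunks instead of loading it whole."""
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"]
    while True:
        chunk = body.read(chunk_size)
        if not chunk:
            break
        yield chunk

Each chunk can then be fed to the Heka record parser incrementally, so a worker never has to hold a whole (up to 300 MB) file in memory at once.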
Status: NEW → RESOLVED
Closed: 9 years ago
Resolution: --- → FIXED
Product: Cloud Services → Cloud Services Graveyard