Closed Bug 1468351 Opened 6 years ago Closed 6 years ago

Longitudinal Failing due to OOM on Executors

Categories

(Data Platform and Tools Graveyard :: Datasets: Longitudinal, defect, P1)

defect

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: frank, Assigned: frank)

References

(Blocks 1 open bug)

Details

Attachments

(2 files)

It looks like avro was updated from 1.7.7 to 1.8.2, which caused this failure. The new avro version is bundled with an existing jar we use (a sketch of one way to pin the version follows the stack trace).

Stacktrace:
java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:178)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
	at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
	at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:115)
	at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
	at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
	at com.mozilla.telemetry.parquet.ParquetFile$.serialize(ParquetFile.scala:25)
	at com.mozilla.telemetry.views.LongitudinalView$$anonfun$24.apply(Longitudinal.scala:228)
	at com.mozilla.telemetry.views.LongitudinalView$$anonfun$24.apply(Longitudinal.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
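For anyone hitting the same thing: a NoSuchMethodError like this means the avro on the runtime classpath doesn't match the avro that parquet-avro was compiled against. Below is a minimal sbt sketch of one way to force a single avro version across the build; the coordinates and versions are illustrative assumptions, not the fix that actually landed (that commit is linked in the next comment).

// build.sbt (sketch): force one avro version across the dependency graph so
// the compile-time and runtime classpaths agree. The versions shown are
// illustrative -- use whichever avro your parquet-avro build actually expects.
dependencyOverrides += "org.apache.avro" % "avro" % "1.8.2"

// Or exclude avro from the jar that bundles it and declare it explicitly:
libraryDependencies += ("org.apache.parquet" % "parquet-avro" % "1.8.1")
  .exclude("org.apache.avro", "avro")
libraryDependencies += "org.apache.avro" % "avro" % "1.8.2"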
Component: Datasets: General → Datasets: Longitudinal
Update: We fixed the initial issue with https://github.com/mozilla/telemetry-batch-view/commit/14741db20dd3873b94944b8238dfc48a003c744d

However, we've run into a separate, unrelated OOM error. Keeping this bug open until the runs are complete.
After looking over the memory usage of a failed cluster, it isn't using even half of the allotted amount. My current theory is that this is happening because of the bad greedy partitioning strategy. I ran with the partition-packing strategy on a 30-node cluster, but it failed due to OOM (it actually used all of the cluster memory), so I'm retrying now on a 40-node cluster [0]. A sketch of the difference between the two strategies is below the footnote.

If anyone has to pick this up later, note that you can log in with the dataops key.

[0] https://us-west-2.console.aws.amazon.com/elasticmapreduce/home?region=us-west-2#cluster-details:j-305HVI9UXHYDA
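For reference, here is a rough sketch of the difference between the two partitioning strategies mentioned above. The names (packBySize, greedyByCount) and the per-client size estimates are hypothetical; this is not the telemetry-batch-view implementation, just the shape of the idea.

import scala.collection.mutable.ArrayBuffer

object PartitionPacking {
  final case class Bin(clients: ArrayBuffer[String], var used: Long)

  // First-fit-decreasing "packing": biggest clients first, each into the
  // first partition with room left, opening a new partition when none fits.
  def packBySize(clients: Seq[(String, Long)], budgetBytes: Long): Seq[Seq[String]] = {
    val bins = ArrayBuffer.empty[Bin]
    for ((clientId, size) <- clients.sortBy(-_._2)) {
      bins.find(_.used + size <= budgetBytes) match {
        case Some(bin) => bin.clients += clientId; bin.used += size
        case None      => bins += Bin(ArrayBuffer(clientId), size)
      }
    }
    bins.map(_.clients.toSeq).toSeq
  }

  // Greedy split for comparison: a fixed number of clients per partition,
  // regardless of how much data each client actually carries.
  def greedyByCount(clients: Seq[String], perPartition: Int): Seq[Seq[String]] =
    clients.grouped(perPartition).toSeq
}

The point of packing by estimated size is that no single partition ends up holding several of the largest clients, which is exactly the case a count-based greedy split handles badly.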
Note that I sporadically ran into this Spark bug after the 2.3 upgrade for longitudinal: https://issues.apache.org/jira/browse/SPARK-22371?attachmentOrder=asc

The obvious solution is to downgrade to 2.2 until 2.3.1 is available in EMR.
Quick update: using the partition-packing strategy on a 40-node cluster ran "successfully", except that every client was ignored. It's not clear to me why. If you ssh into the machine at hadoop@ec2-52-89-52-144.us-west-2.compute.amazonaws.com and attach to the running tmux session, you can see the output of the most recent run. The spark.log contains >30k lines, but I see no errors towards the end (it includes multiple runs, the previous ones being failures - apologies for the overload).
Note that it was running in /mnt, using github.com/fbertsch/telemetry-batch-view, on branch long_test (which had the partition-packing strategy rather than the greedy algorithm).
It looks like 20180623 also failed.
I'm having trouble reproducing this bug now. It looks like the last three longitudinals succeeded. Robert, Ryan - does this suffice for your use cases? Is it acceptable if two historic longitudinal versions are not available? (To be clear, that does not mean data is missing, since the most recent version always has each client's full history.)
Flags: needinfo?(ryanvm)
Flags: needinfo?(robert.strong.bugs)
I spoke too quickly - the job reports success but there is no data (thanks, Ryan, for letting me know!). These compounding issues make me worried about how quickly a fix will land (obviously it hasn't been timely so far).
Flags: needinfo?(ryanvm)
Flags: needinfo?(robert.strong.bugs)
Here is the error from the run last Tuesday:

AnalysisException                          Traceback (most recent call last)
<ipython-input-12-247f6bd3d5f7> in <module>()
----> 1 summaryDF = sqlContext.sql(summary_sql)

/usr/lib/spark/python/pyspark/sql/context.py in sql(self, sqlQuery)
    382         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    383         """
--> 384         return self.sparkSession.sql(sqlQuery)
    385 
    386     @since(1.0)

/usr/lib/spark/python/pyspark/sql/session.py in sql(self, sqlQuery)
    554         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    555         """
--> 556         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    557 
    558     @since(2.0)

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'Path does not exist: s3://telemetry-parquet/longitudinal/v20180707;; line 1 pos 435'
Hey Frank, as I understand things, this data set has been inaccessible for over a month. Are we planning on getting this up and running again soonish?
Flags: needinfo?(fbertsch)
(In reply to Jim Mathies [:jimm] from comment #10)
> Hey Frank, as I understand things this data set has been inaccessible for
> over a month. Are we planning on getting this up and running again soonish?

Hey Jim, thanks for the note. We are hoping to get this up and running, but to be clear, the kind of error this job is hitting only occurs in production, when running on a 40-node cluster for 4+ hours. As such, debugging is a difficult and tedious process! We will hopefully have some updates soon.

Given the magnitude of this outage and the difficulty of fixing it, we'll be accelerating plans to replace longitudinal with something more maintainable. There are a few details in bug 1391308 if you're interested in taking a look.
Points: 2 → 5
Flags: needinfo?(fbertsch)
Aha! A breakthrough. This error is being raised:

Caused by: java.lang.NullPointerException
        at com.mozilla.telemetry.views.LongitudinalView$.getElemType(Longitudinal.scala:726)
        at com.mozilla.telemetry.views.LongitudinalView$.getStandardHistogramSchema(Longitudinal.scala:722)
        at com.mozilla.telemetry.views.LongitudinalView$.histograms2Avro(Longitudinal.scala:771)
        at com.mozilla.telemetry.views.LongitudinalView$.com$mozilla$telemetry$views$LongitudinalView$$buildRecord(Longitudinal.scala:1030)
        at com.mozilla.telemetry.views.LongitudinalView$$anonfun$24$$anonfun$25.apply(Longitudinal.scala:218)
        at com.mozilla.telemetry.views.LongitudinalView$$anonfun$24$$anonfun$25.apply(Longitudinal.scala:217)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
        at scala.collection.Iterator$class.isEmpty(Iterator.scala:330)
        at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
        at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1336)
        at com.mozilla.telemetry.views.LongitudinalView$$anonfun$24.apply(Longitudinal.scala:232)
        at com.mozilla.telemetry.views.LongitudinalView$$anonfun$24.apply(Longitudinal.scala:212)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2018-07-14 ran successfully. We will backfill the previous versions; closing this out.
Status: NEW → RESOLVED
Closed: 6 years ago
Resolution: --- → FIXED
We spoke too soon: 2018-07-14 only _almost_ ran successfully - there is data available in S3. What actually happened is that most of the executors wrote their parquet files, but a few specific partitions kept hitting OOM. This took long enough that the entire job eventually timed out, leaving those files sitting in S3 without ever being deleted.

We should be aware moving forward that timeouts on longitudinal will cause partial writes, though luckily they will not register as success in Airflow (a sketch of one possible guard is below).

I'm repurposing this bug because it seems that most people who care about longitudinal are already CCed.
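As a guard against this failure mode, one option would be to treat an output prefix as valid only when a success marker is present, and clear it otherwise. This is a hedged sketch using the Hadoop FileSystem API; the path layout and the _SUCCESS-marker convention are assumptions, not what the longitudinal job actually does.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch only: delete an output prefix that has data but no success marker,
// so a timed-out run can't leave partial parquet files behind for readers
// to stumble over.
def clearIfPartial(outputUri: String): Unit = {
  val path     = new Path(outputUri)
  val fs       = path.getFileSystem(new Configuration())
  val complete = fs.exists(new Path(path, "_SUCCESS"))
  if (fs.exists(path) && !complete) {
    fs.delete(path, true)  // recursive delete of the partial write
  }
}

// e.g. clearIfPartial("s3://telemetry-parquet/longitudinal/v20180714")

Since longitudinal writes its parquet files directly (via ParquetFile.serialize) rather than through a Spark writer, the marker would have to be written explicitly at the end of a fully successful run.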
Status: RESOLVED → REOPENED
Resolution: FIXED → ---
Summary: Longitudinal Failed on 20180610 → Longitudinal Failing due to OOM on Executors
This OOM comes from the following (deserializing an array using Kryo). Longitudinal uses mapPartitions, which may be more limited in the amount of memory it's allowed relative to the amount of data it processes, as compared to a traditional map, where each executor would handle many partitions (a sketch of this concern follows the trace):

java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOfRange(Arrays.java:3664)
	at java.lang.String.<init>(String.java:207)
	at com.esotericsoftware.kryo.io.Input.readString(Input.java:484)
	at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:195)
	at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:184)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
	at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
	at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:278)
	at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:158)
	at org.apache.spark.util.collection.ExternalSorter$SpillReader.org$apache$spark$util$collection$ExternalSorter$SpillReader$$readNextItem(ExternalSorter.scala:558)
	at org.apache.spark.util.collection.ExternalSorter$SpillReader$$anon$5.hasNext(ExternalSorter.scala:587)
	at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:1004)
	at org.apache.spark.util.collection.ExternalSorter$$anon$2.next(ExternalSorter.scala:384)
	at org.apache.spark.util.collection.ExternalSorter$$anon$2.next(ExternalSorter.scala:375)
	at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at com.mozilla.telemetry.views.ClientIterator.next(Longitudinal.scala:42)
	at com.mozilla.telemetry.views.ClientIterator.next(Longitudinal.scala:25)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at com.mozilla.telemetry.parquet.ParquetFile$.serialize(ParquetFile.scala:33)
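To make the mapPartitions point above concrete, here is a minimal sketch with toy types. Every name in it is a hypothetical stand-in rather than the Longitudinal code; it only illustrates how materializing a partition inside mapPartitions concentrates the whole partition's deserialized data on one executor's heap.

import org.apache.spark.rdd.RDD

object MapPartitionsMemory {
  type ClientId = String
  type Ping     = Array[Byte]

  // Toy record "builder": stands in for the real Avro record construction.
  def buildRecord(clientId: ClientId, pings: Seq[Ping]): String =
    s"$clientId:${pings.map(_.length).sum}"

  // Dangerous: the whole partition is materialized before any record is
  // emitted, so every deserialized ping in the partition sits on the heap.
  def eagerly(rdd: RDD[(ClientId, Ping)]): RDD[String] =
    rdd.mapPartitions { it =>
      it.toList.groupBy(_._1).iterator.map { case (c, ps) => buildRecord(c, ps.map(_._2)) }
    }

  // Safer: assuming the partition is sorted by client id, emit one client's
  // record at a time so only that client's pings are held in memory.
  def lazily(rdd: RDD[(ClientId, Ping)]): RDD[String] =
    rdd.mapPartitions { it =>
      val buf = it.buffered
      new Iterator[String] {
        def hasNext: Boolean = buf.hasNext
        def next(): String = {
          val client = buf.head._1
          val pings  = scala.collection.mutable.ArrayBuffer.empty[Ping]
          while (buf.hasNext && buf.head._1 == client) pings += buf.next()._2
          buildRecord(client, pings)
        }
      }
    }
}

Even with the lazy version, a single client with an enormous history still has to fit on the heap, which is what the next comments dig into.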
There could be two additional causes here:
1. Do we have some client or clients with histories much longer than we've seen before?
2. Have client payload sizes increased by large amounts?

The former could be a cause because we first deserialize every record for a client before deciding if we should discard it (we only store the past 1000 pings in longitudinal).

The latter could be a cause because this happens before we transform to a row, so we're deserializing the entire ping whether or not we end up using it to build the record.
I have confirmed both of the above hypotheses, though (2) to a lesser extent [0] [1]. The first iteration of a fix will be to trim long histories before unpacking them, so that the heap isn't filled with unused pings (a sketch of the idea follows the links below).

[0] Clients with the most pings: https://sql.telemetry.mozilla.org/queries/57453/source
[1] Main Ping Size over the past 6 months: https://sql.telemetry.mozilla.org/queries/57452#149787
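A rough sketch of what "trim before unpacking" could look like, assuming a per-client iterator of still-serialized payloads: RawPing and its fields are hypothetical stand-ins for the real record layout, the 1000-ping cap mirrors the limit mentioned above, and this is not the actual patch.

import scala.collection.mutable.PriorityQueue

final case class RawPing(timestamp: Long, payload: Array[Byte])

// Keep only the newest maxPings serialized payloads for one client, dropping
// older pings before they are ever deserialized. Memory stays O(maxPings)
// regardless of how long the client's history is.
def trimHistory(pings: Iterator[RawPing], maxPings: Int = 1000): Iterator[RawPing] = {
  // The queue's "maximum" is the oldest ping, so dequeue() always evicts the
  // oldest entry once the buffer exceeds maxPings.
  val newest = PriorityQueue.empty[RawPing](Ordering.by((p: RawPing) => -p.timestamp))
  pings.foreach { p =>
    newest.enqueue(p)
    if (newest.size > maxPings) newest.dequeue()
  }
  newest.toList.sortBy(_.timestamp).iterator  // oldest-to-newest among the survivors
}

Only the retained payloads would then go on to deserialization and record building.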
Depends on: 1478160
Blocks: 1475242
Blocks: 1478805
Depends on: 1482414
Blocks: 1469546
No longer blocks: 1469546
I've temporarily paused the longitudinal DAG in Airflow to conserve resources for this job (each failure currently results in the job timing out and retrying up to the maximum number of attempts).

We should re-enable it when it's back to normal.
Any status on this? Last I checked the data is missing for 6/9/18, 6/23/18, 6/30/18, 7/7/18, 8/25/18, and 9/1/18.
Hey Robert, thanks for commenting. We've been trying every trick in the book to get longitudinal running, but the errors keep coming. The YARN containers fail due to OOM, and the job keeps retrying until it times out. The previous PRs fixed some issues, but we keep running into new ones.

We recommend that you start looking into moving your dashboard to a different data source, for example clients_daily. Feel free to reach out if you think you could use some help on that front!
Depends on: 1490041
This is fixed and the data is now current. Thank you everyone for your patience while we resolved this.

We have also fixed bug 1478805 by rerunning every longitudinal since 03/01, so they are no longer missing clients.
Status: REOPENED → RESOLVED
Closed: 6 years ago
Resolution: --- → FIXED
Product: Data Platform and Tools → Data Platform and Tools Graveyard