Closed
Bug 1401816
Opened 7 years ago
Closed 7 years ago
Heavy Users failing with "too large frame"
Categories
(Data Platform and Tools :: General, defect, P1)
Tracking
(Not tracked)
RESOLVED
FIXED
People
(Reporter: frank, Assigned: frank)
Details
Attachments
(2 files)
The full error is: "spark org.apache.spark.shuffle.FetchFailedException too large frame".
Initial attempts at increasing spark.sql.shuffle.partitions and spark.default.parallelism did not solve the issue. I'll try again tomorrow on a larger cluster with more partitions.
I'm attaching the query plan; the stack trace indicates the problem is happening during the full outer SortMergeJoin.
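For reference, a minimal sketch of the settings I've been bumping (the values here are illustrative, not the exact ones used on the cluster):

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the configuration bump described above; the partition counts are
// illustrative placeholders, not the values used on the real cluster.
val spark = SparkSession.builder()
  .appName("HeavyUsersView")
  .config("spark.sql.shuffle.partitions", "2000") // partitions for SQL shuffles (the SortMergeJoin)
  .config("spark.default.parallelism", "2000")    // partitions for RDD-level shuffles
  .getOrCreate()
```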
Comment 1•7 years ago (Assignee)
Comment 2•7 years ago (Assignee)
Comment 3•7 years ago (Assignee)
I've confirmed that bumping up `spark.sql.shuffle.partitions` does increase the number of partitions before the SortMergeJoin, but it had limited effect on the size of the one problematic partition.
I'm wondering now if there is some inherent skew in the data. That doesn't make a whole lot of sense, because the data is:
1. 1 row per client
2. Hash partitioned on client-id
Will continue investigating.
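A quick way to check for that kind of skew is to count rows per client (a sketch; `heavyUsersPath` is a placeholder for the actual dataset location):

```scala
import org.apache.spark.sql.functions.desc

// Hypothetical skew check: if heavy_users really is one row per client, every
// group below should have count == 1; anything larger means the join key is
// skewed. heavyUsersPath is a placeholder for the real dataset location.
val heavyUsers = spark.read.parquet(heavyUsersPath)
heavyUsers
  .groupBy("client_id")
  .count()
  .orderBy(desc("count"))
  .show(20, truncate = false)
```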
Updated•7 years ago
Assignee: nobody → fbertsch
Priority: -- → P1
Comment 4•7 years ago (Assignee)
Looking into the distribution of data using the HashPartitioning scheme seen in the Physical Query Plan. Spark's code for that partitioning is here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala#L265
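The relevant piece is `partitionIdExpression`, which maps a row to `pmod(murmur3_hash(partition keys), numPartitions)`. Since `functions.hash` is the same Murmur3 hash, the shuffle partition a given client lands in can be reproduced as a plain column expression (a sketch; the key column and partition count are whatever the plan actually uses):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// HashPartitioning assigns each row to pmod(murmur3_hash(partition keys), numPartitions);
// functions.hash uses the same Murmur3 hash, so the target shuffle partition for a
// client_id can be computed directly. numPartitions must match spark.sql.shuffle.partitions.
def shufflePartition(numPartitions: Int): Column =
  pmod(hash(col("client_id")), lit(numPartitions))
```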
Comment 5•7 years ago (Assignee)
I made the job process each partition separately. With consistent hashing, heavy_users is split into N partitions (currently 16), each covering a range of consecutive sample_ids. The job now handles those partitions serially: for each one it reads only those sample_ids from main_summary, reads the previous day's partition, and joins the two to get the current active_ticks total.
It seems to be running, but it is *much* too slow. Running the partitions in parallel should be more performant; I'll try that tomorrow.
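Roughly what the serial version does (a sketch only; the table names, paths, and range arithmetic are placeholders, not the actual HeavyUsersView code):

```scala
import org.apache.spark.sql.functions.col

// Sketch of the serial approach: carve the 0-99 sample_id space into N consecutive
// ranges and process one range at a time. All names and paths are placeholders.
val mainSummary        = spark.read.parquet(mainSummaryPath)        // placeholder input paths
val previousHeavyUsers = spark.read.parquet(previousHeavyUsersPath)

val numBlocks = 16
val bounds = (0 to numBlocks).map(i => i * 100 / numBlocks) // 0, 6, 12, ..., 100

(0 until numBlocks).foreach { block =>
  val (lo, hi) = (bounds(block), bounds(block + 1))

  // Read only this block's sample_ids from main_summary and from yesterday's output.
  val mainSlice = mainSummary.filter(col("sample_id") >= lo && col("sample_id") < hi)
  val prevSlice = previousHeavyUsers.filter(col("sample_id") >= lo && col("sample_id") < hi)

  mainSlice
    .join(prevSlice, Seq("client_id"), "full_outer") // same full outer join the daily job does
    .write
    .mode("overwrite")
    .parquet(s"$outputPath/block=$block")            // hypothetical output layout
}
```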
Comment 6•7 years ago (Assignee)
Running in parallel results in the same "frame too large" exception. I've written the interim data out to Parquet in a previous job, and I'll run the hash partitioning function over it to find out which partition is causing the issue.
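Concretely, the plan is something like the following, reusing the `shufflePartition` helper sketched in comment 4 (the path and partition count are placeholders):

```scala
import org.apache.spark.sql.functions.desc

// Bucket the interim Parquet output with the same pmod(hash(client_id), N)
// expression the shuffle uses and count rows per bucket; the oversized
// partition(s) should stand out. interimPath and 2000 are placeholders.
spark.read.parquet(interimPath)
  .groupBy(shufflePartition(numPartitions = 2000).as("shuffle_partition"))
  .count()
  .orderBy(desc("count"))
  .show(10)
```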
Comment 7•7 years ago (Assignee)
Looking at the stages, most shuffle outputs are in the hundreds of MB. Two of them, however, are much larger: 2.6GB and 2.9GB.
What is odd is that they did not *read* more data than the other shuffle tasks. They read 306MB and 732MB respectively, about the same as every other shuffle task. Indeed, every other shuffle task wrote out less than it read in.
Comment 8•7 years ago (Assignee)
I'm currently running the job again, but having it write each of those N partition runs out to Parquet before the caching step. Since the caching succeeded on the previous run, the error is happening in a task after that point - i.e. one of the too-large shuffle blocks is a cached block - which means we can write those cached blocks to disk and inspect them.
Two of the written outputs should be much larger than the others.
Comment 9•7 years ago (Assignee)
Well this was a fun one: tl;dr - don't trust the clients.
The issue revolved around this line [0]. I assumed that profileCreationDate would be stable - that a single client would only ever have one. That was incorrect: some clients report a hundred pings a day, all with different profileCreationDates.
This caused an exponential increase in the number of rows for those clients, because of the join the current day's heavy_users table does with the previous day's heavy_users table. On the first day, such a client had 100 different rows. Those 100 rows were joined with the 100 rows from the next day, resulting in 10,000 rows. The day after that, another 100 rows brought it to 1,000,000. The fourth day was when the problem really came to a head: the shuffle block sizes grew too large for a single machine. Obviously it didn't matter how many shuffle partitions I created, because that single client's rows were all hashed to the same shuffle block.
Fix is at [1]. Running backfill now.
[0] https://github.com/mozilla/telemetry-batch-view/blob/eba35d67db7783087d29221c76ebbb5690216435/src/main/scala/com/mozilla/telemetry/views/HeavyUsersView.scala#L208
[1] https://github.com/mozilla/telemetry-batch-view/commit/1cc6c59e18a62595712bd8b2c88e8d57edb52ee4
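For posterity, a toy reproduction of the blow-up and the general shape of the fix (a deliberately simplified schema, assuming a SparkSession named `spark` is in scope; the real change is in [1]):

```scala
import spark.implicits._
import org.apache.spark.sql.functions.{min, sum}

// Toy reproduction, not the real HeavyUsersView schema: one client sends 100
// pings a day, each with a different profile creation date, so its daily
// frame has 100 rows instead of 1.
val day1 = Seq.tabulate(100)(i => ("client-a", 17000 + i, 10L))
  .toDF("client_id", "profile_creation_date", "active_ticks")
val day2 = Seq.tabulate(100)(i => ("client-a", 17100 + i, 12L))
  .toDF("client_id", "profile_creation_date", "active_ticks")

// Joining the two days on client_id multiplies the per-client row counts:
// 100 x 100 = 10,000 rows for this one client, and another x100 the next day.
val exploded = day1.join(day2, Seq("client_id"))
println(exploded.count()) // 10000

// General shape of the fix (simplified): collapse each day to a single row per
// client before joining, so the per-client cardinality can never multiply.
val day1Collapsed = day1
  .groupBy("client_id")
  .agg(min("profile_creation_date").as("profile_creation_date"),
       sum("active_ticks").as("active_ticks"))
```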
Status: NEW → RESOLVED
Closed: 7 years ago
Resolution: --- → FIXED
Updated•2 years ago
Component: Datasets: General → General