Closed Bug 1302264 Opened 8 years ago Closed 7 years ago

Verify all downstream consumers can handle new telemetry s3 object format

Categories

(Cloud Services Graveyard :: Metrics: Pipeline, defect, P1)

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: whd, Assigned: whd)

Details

(Whiteboard: [SvcOps])

Attachments

(5 files)

Namely, the switch from per-record snappy compression to whole-object gzip compression. The consumers include, at minimum, heka/hindsight-based reporting from ATMO/airflow and the current (and possibly future) versions of the Spark APIs for accessing telemetry records. In addition to making sure all current jobs support the new format, we need to make sure performance is acceptable.
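
For context, a rough sketch of what the format change means for readers, assuming python-snappy for the old per-record path (the real readers live in heka/hindsight and the Spark/Python APIs, and heka record framing is omitted):

import gzip

import snappy  # python-snappy, used here purely for illustration

def decompress_old_format(records):
    # Old format: each record inside the S3 object is snappy-compressed
    # individually, so records are decompressed one at a time.
    return [snappy.decompress(r) for r in records]

def decompress_new_format(blob):
    # New format: the whole S3 object is gzip-compressed; the heka
    # record framing inside the decompressed stream is not handled here.
    return gzip.decompress(blob)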
As I've been working through this verification, I have come across the following issues and proposed solutions.

Large integer literals are being generated in the new infra, and support for these in JSON libraries is not standard.

In the current edge infra (cjson) we simply truncate these values in lua/c. The new parser (rjson) can handle them, and the client appears to be generating values even larger than can fit in an unsigned 64-bit integer.

The Scala/Java libraries appear to have no issues dealing with these large numbers, but our Python parser (ujson) throws errors. My solution is to fall back to the standard Python json library when ujson fails on these large numbers; the standard parser, while slow, does not throw an exception.
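
Roughly, the fallback looks like the following, assuming ujson raises ValueError on integers wider than 64 bits (the exact call site in python-moztelemetry differs):

import json

import ujson

def parse_submission(payload):
    # Try the fast parser first; fall back to the standard library,
    # which is slower but accepts arbitrarily large integers.
    try:
        return ujson.loads(payload)
    except ValueError:
        return json.loads(payload)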

Another issue is that the way we remove objects from the main payload creates shadow NULL structures that make dict merging more difficult.

This wastes some space, but it is not a huge issue overall because the workaround is fairly simple: we can assume that the new submission field is the first heka field that is parsed (which happens to be true because of the way the code is structured). I only came across this issue because I was seeing fields set to NULL due to the following issue.
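
A minimal sketch of that ordering assumption (names here are hypothetical; the real merge happens in the message parsing code):

def merge_fields(submission, extracted):
    # The parsed submission, containing the shadow NULLs, is assumed to
    # come first; extracted non-NULL fields then overwrite those NULLs.
    merged = dict(submission)
    for name, value in extracted.items():
        if value is not None:
            merged[name] = value
    return merged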

We are using bytes fields instead of string fields for the extracted JSON fields.

Since we don't currently support bytes fields in telemetry-batch-view and python-moztelemetry, the fields above, which would normally take precedence over the NULLs in the submission, were not being used. From my testing the fix is simple: we can assume that bytes streams are UTF-8 encoded and convert them to strings. This is predicated on us not using actual byte streams for anything other than strings on the analysis side, which I think is safe. In some cases we do have binary data in bytes fields (such as landfill), but those will not be processed by downstream consumers.
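
In Python terms the fix amounts to something like the following (the function name is hypothetical; the Scala side does the equivalent conversion):

def field_to_string(value):
    # Treat heka bytes fields as UTF-8 encoded strings. Binary payloads
    # such as landfill are not converted because they are not read by
    # the downstream consumers in question.
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return value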
I've got the updated PRs up for the remainder of the work here.

Performance on both the Python and Scala versions seems to be roughly 10-15% slower with gzip than with snappy, which is in line with what we expected from switching to gzip compression. We should be moving to zstd compression in Q1-Q2, which will boost new infra performance considerably.

Of note, small jobs such as the sync parquet job take considerably longer (~4x) than their old infra counterparts in my current testing. This appears to be for two reasons:

We partition in kafka by client id in the old infra. An unintended consequence is that messages without a client id, such as the sync pings, all wind up in the same kafka partition, which means they are all processed by a single DWL node. This is convenient for batching, but it means we have potentially unevenly distributed data in kafka and a potential SPOF on the consumer side. We can investigate whether we want to continue doing this in the new infra, but for now it means there are roughly 5x as many objects in s3 in the new infra for data that does not have a client id. Since this is unique to small data sets, the performance overhead is negligible compared to the following issue.

To distribute data evenly when object sizes vary considerably, we group objects by size using an (arbitrary?) size threshold. The smaller object sizes cause our code to partition the data into fewer groups (2 vs. 7), fewer than the number of cores available on a single node. When I change the size threshold or let Spark do the partitioning across more cores (16), performance is better, in line with the 10-15% hit noted above.
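
For reference, a sketch of the kind of size-threshold grouping described above (the actual heuristic in our code may differ):

def group_by_size(objects, threshold):
    # Greedily bin (key, size) pairs into groups whose cumulative size
    # stays under the threshold, one group per partition. With the
    # smaller new-format objects a fixed threshold packs more objects
    # per group, yielding fewer groups than available cores.
    groups, current, total = [], [], 0
    for key, size in objects:
        if current and total + size > threshold:
            groups.append(current)
            current, total = [], 0
        current.append(key)
        total += size
    if current:
        groups.append(current)
    return groups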

Next steps are:

Fix and merge PRs
Test a few of the larger parquet generation jobs
Cut over the metadata prefixes and send an email to fhr-dev about the migration
Decommission the old infra

I plan on doing this over the course of the next week.
The PRs have landed. Bug #1335219 was introduced by these changes but didn't affect scheduled jobs and has been fixed. I'm marking this as done.
Status: NEW → RESOLVED
Closed: 7 years ago
Resolution: --- → FIXED
Product: Cloud Services → Cloud Services Graveyard