Prototype validation and transformation pipeline for storing UT pings as Parquet datasets

RESOLVED INVALID

Status

Product: Cloud Services
Component: Metrics: Pipeline
Reported: 2 years ago
Modified: 2 years ago

People

(Reporter: Sam Penrose, Assigned: Sam Penrose)

Tracking

Firefox Tracking Flags

(Not tracked)

Details

(Whiteboard: [loasis])

Description (Assignee)

2 years ago
Approach:
- I'm using the JSON Schemas at [1] and writing new routines at [2] to convert them to Avro.
- The conversion routines won't be an all-purpose library: to deal with impedance mismatches between JSON Schema and Avro, you have to put extra information in the JSON Schema.
- I'm taking advantage of file references in JSON Schema ($ref), a concept Avro doesn't have, so the generated Avro schema will be one big file.
- [1] should be moved to the mozilla-services or mozilla GitHub organization; I can't create repos there.

[1] https://github.com/SamPenrose/mozilla-pipeline-schemas

[2] https://github.com/SamPenrose/data-pipeline/tree/v4-baseset-schema
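To make the "one big file" point concrete, here is a minimal Scala sketch. It assumes each JSON Schema file has already been parsed into nested `Map`s and that each `$ref` names another file in a lookup table. `toAvro`, the `refs` table and the `"avroName"` key (one example of the extra information the JSON Schema has to carry, since Avro records need names) are hypothetical, and only a small subset of JSON Schema is handled; this is not the code at [2].

```scala
// Hypothetical sketch: convert a JSON Schema (already parsed into nested Maps)
// into an Avro schema (nested Maps and Lists). Every "$ref" is inlined, so the
// result is one self-contained Avro schema.
type Json = Map[String, Any]

def toAvro(schema: Json, refs: Map[String, Json]): Any =
  schema.get("$ref") match {
    // Avro has no file references: look the target up and inline it.
    case Some(ref: String) => toAvro(refs(ref), refs)
    case _ =>
      schema("type") match {
        case "string"  => "string"
        case "integer" => "long"
        case "boolean" => "boolean"
        case "object" =>
          val props = schema.getOrElse("properties", Map.empty).asInstanceOf[Json]
          val required = schema.getOrElse("required", Nil).asInstanceOf[Seq[String]].toSet
          // Sort property names so the generated schema is deterministic.
          val fields = props.toList.sortBy(_._1).map { case (name, sub) =>
            val t = toAvro(sub.asInstanceOf[Json], refs)
            // Optional properties become a ["null", T] union defaulting to null.
            if (required(name)) Map("name" -> name, "type" -> t)
            else Map("name" -> name, "type" -> List("null", t), "default" -> null)
          }
          // Avro records must be named; JSON Schema objects are not, so the
          // name has to be supplied as extra information ("avroName" here).
          Map("type" -> "record", "name" -> schema("avroName"), "fields" -> fields)
        case other =>
          throw new IllegalArgumentException(s"unsupported JSON Schema type: $other")
      }
  }
```

Each optional object adds a union and a record around its fields, which is where the extra nesting in Avro comes from.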

Comment 1 (Assignee)

2 years ago
This is going to be a little harder than we hoped. A single flat optional object in the ping:

  "optional": {"key": "value"}

is three layers of nesting in Avro:

  {
    "name": "optional",
    "type": [
      "null",
      {
        "type": "record",
        "name": "meaningless_placeholder",
        "fields": [
          {"name": "key", "type": "string"}
        ]
      }
    ],
    "default": null
  }

If "value" is not a string but an object, you get five layers, and so on. Extracting that information from JSON Schema reference files has slowed me down, but the real problem is that this didn't work:

  https://github.com/vitillo/telemetry-batch-view/blob/longitudinal/src/main/scala/streams/Longitudinal.scala#L63

and my JSON Schema doesn't try to cope:

  https://github.com/SamPenrose/mozilla-pipeline-schemas/blob/master/telemetry/main-v1.schema.json

so taking the exact same approach with a static Avro file isn't going to work either. We have to decide what we're going to cut from the pings, and filter the pings down to that, *before* we use static Avro declarations. Roberto's dynamic Avro appears to hide the complexity inside strings:

  .name("keyedHistograms").`type`().array().items().stringType().noDefault()
  ...
  .set("keyedHistograms", sorted.map(x => x.getOrElse("payload.keyedHistograms", "").asInstanceOf[String]).toArray)

(Am I missing something?) So maybe Roberto meant that the Avro files I generate should declare some trees in the ping as strings, and that we then re-encode those trees as JSON? Or are we:

  1) Walking the JSON in an RDD or Heka or whatever.
  2) Generating one or more clean subsets per ping.
  3) Saving those subsets as Parquet with a clean simple Avro schema.
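A minimal Scala sketch of the first reading (some trees declared as strings and re-encoded as JSON), assuming pings are already parsed into nested `Map`s. It is modeled loosely on the quoted `getOrElse("payload.keyedHistograms", "")` call rather than taken from Roberto's code; `toJson`, `lookup` and `cleanRow` are hypothetical helpers.

```scala
// Hypothetical sketch: turn a parsed ping (nested Maps) into one flat row.
// Chosen scalar paths become columns; hard subtrees are re-encoded as JSON strings.

// Minimal JSON encoder: escapes only quotes and backslashes, which is enough
// for this sketch but not for arbitrary input.
def toJson(v: Any): String = v match {
  case null         => "null"
  case s: String    => "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
  case m: Map[_, _] =>
    m.iterator.map { case (k, x) => toJson(k.toString) + ":" + toJson(x) }.mkString("{", ",", "}")
  case xs: Seq[_]   => xs.map(toJson).mkString("[", ",", "]")
  case other        => other.toString // numbers and booleans
}

// Look up a dotted path such as "payload.keyedHistograms" in nested Maps.
def lookup(ping: Map[String, Any], path: String): Option[Any] =
  path.split('.').foldLeft(Option[Any](ping)) {
    case (Some(m: Map[_, _]), key) => m.asInstanceOf[Map[String, Any]].get(key)
    case _                         => None
  }

// One flat row: scalar paths copied as-is (null when absent); blob paths
// stored as JSON strings ("" when absent, mirroring the quoted snippet).
def cleanRow(ping: Map[String, Any], scalars: Seq[String], blobs: Seq[String]): Map[String, Any] =
  scalars.map(p => p -> lookup(ping, p).orNull).toMap ++
    blobs.map(p => p -> lookup(ping, p).map(toJson).getOrElse("")).toMap
```

The row's shape is fixed by `scalars` and `blobs`, so a simple static schema fits it; the cost is that anything inside a blob must be parsed again at query time.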

The problem is not specifying a clean structure for data. The problem is generating data that maps to any clean structure at all. Can either of you tell me how we are going to approach that? Then I'll know how to write the Avro files, or whether we should be doing this at all.
Flags: needinfo?(rvitillo)
Flags: needinfo?(mreid)

Comment 2 (Assignee)

2 years ago
Here's an attempt to answer my own needinfo: although they interact, the diversity is a bigger problem than the optionality. I still believe that schemas are straightforward once you've cleaned the data and useless until you've cleaned it, and we haven't defined "clean" yet. Proposal:

1) I'll grab a sample of pings (10K? 1M?) and clean the data by deleting addons, histograms, childPayloads, and other sources of diversity that are revealed by inspection.

2) I'll process them in Spark to match a schema that has no optionality: missing dimensions will be added with a default value. I can add an array of strings that identifies which dimensions have been added.

3) I'll save that to Parquet. I forget whether the Avro schema can be extracted at that point or whether I have to write it, but either way we give Parquet what it needs.

4) We see how useful and performant the Parquet files are.

5) If we pass 4, next steps in some order:

  - move the processing out of PySpark to Scala and/or Heka
  - make dimensions optional if that is more effective
  - define a JSON Schema that matches step 1) (and maybe drives it) for use upstream from Spark
  - scale to 1% of clients
  - think about what we're going to do with the big nasty blobs
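Steps 1) and 2) could look roughly like this in Scala, assuming a ping is already parsed into nested `Map`s. `flatten`, `normalize`, the drop paths and the defaults are hypothetical examples, and flattening to dotted keys is one possible layout, not a decided one.

```scala
// Hypothetical sketch. Flatten nested Maps into dotted paths:
// {"a": {"b": 1}} becomes {"a.b": 1}.
def flatten(m: Map[String, Any], prefix: String = ""): Map[String, Any] =
  m.toList.flatMap {
    case (k, v: Map[_, _]) => flatten(v.asInstanceOf[Map[String, Any]], prefix + k + ".").toList
    case (k, v)            => List((prefix + k) -> v)
  }.toMap

// Steps 1) and 2): drop high-diversity subtrees, then give every dimension in
// `defaults` a value so the resulting schema has no optional fields.
def normalize(ping: Map[String, Any], drop: Seq[String], defaults: Map[String, Any]): Map[String, Any] = {
  val kept = flatten(ping).filterNot { case (k, _) =>
    drop.exists(p => k == p || k.startsWith(p + "."))
  }
  val added = defaults.keys.filterNot(kept.contains).toList.sorted
  // Record which dimensions were filled in rather than observed.
  (kept ++ added.map(k => k -> defaults(k))) + ("addedDimensions" -> added)
}
```

Because every row then carries the same dimensions, a single non-optional schema fits, and `addedDimensions` keeps filled-in values distinguishable from observed ones.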

How does that sound?

Comment 3

The final schema for the longitudinal dataset is going to look very different from the one you mentioned. There are some Spark bugs (e.g. SPARK-4502) that we have to work around to guarantee good read performance. After running some benchmarks with different layouts, it became clear to me that we can't design the schema for that particular derived dataset in advance. As I work on generating the longitudinal dataset, I am going to come up with a proper schema for it as well.

What we are looking for right now is a schema for the master dataset, which is our source of truth. That schema would mainly be used to validate incoming pings. The schema should also be convertible to Avro and used to store the master dataset with Parquet.
Flags: needinfo?(rvitillo)

Comment 4 (Assignee)

2 years ago
(In reply to Roberto Agostino Vitillo (:rvitillo) from comment #3)
> The final schema for the longitudinal dataset is going to look very
> different from the one you mentioned. There are some Spark bugs (e.g.
> SPARK-4502) that we have to work around to guarantee good read
> performance. After running some benchmarks with different layouts, it
> became clear to me that we can't design the schema for that particular
> derived dataset in advance. As I work on generating the longitudinal
> dataset, I am going to come up with a proper schema for it as well.

Wonderful.

> What we are looking for right now is a schema for the master dataset, which
> is our source of truth. That schema would mainly be used to validate
> incoming pings. The schema should also be convertible to Avro and used to
> store the master dataset with Parquet.

By "master dataset" do you mean the whole ping, or a defined subset? When we "validate" do we edit the ping to conform to a schema with minimal flexibility, or do we simply mark the ping as conforming or not and put it in a schema with thousands of nested optional records?

Comment 5

(In reply to Sam Penrose from comment #4)

> By "master dataset" do you mean the whole ping, or a defined subset?

I mean the whole ping.

> When we "validate" do we edit the ping to conform to a schema with minimal
> flexibility, or do we simply mark the ping as conforming or not and put it
> in a schema with thousands of nested optional records?

Let's discuss this during the meeting today.

Comment 6 (Assignee)

2 years ago
We met and restructured the project as follows:

  1) Validate the incoming stream with JSON-Schema, taking advantage of its "semantic" validation.
    i. E.g. clientId is not just a string, it's a UUID-4.
  2) Transform valid pings into a flatter structure that Spark/Parquet can handle more easily.
  3) See if the schema that DataFrames auto-create is smart enough to use as our Avro schema.
  4) If it isn't, generate a schema for the output of step 2) that uses Avro Maps for histograms, addons, and other areas of great key diversity.
  5) Save the output of either 3) or 4) as Parquet and evaluate its performance.
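A rough Scala sketch of steps 2) and 4), assuming pings are parsed into nested `Map`s: nested objects become flat columns, while subtrees listed as high-diversity stay intact as a single map-valued column, which is what an Avro `map` type (string keys, one value type) holds. Columns are joined with `_` because Avro field names cannot contain dots; `flattenPing`, `avroMapField` and the example paths are hypothetical.

```scala
// Hypothetical sketch: flatten nested Maps into "_"-joined columns, but keep
// the subtrees named in `asMaps` (e.g. "payload_histograms") as one
// Map-valued column instead of many sparse columns.
def flattenPing(m: Map[String, Any], asMaps: Set[String], prefix: String = ""): Map[String, Any] =
  m.toList.flatMap {
    case (k, v: Map[_, _]) if !asMaps(prefix + k) =>
      flattenPing(v.asInstanceOf[Map[String, Any]], asMaps, prefix + k + "_").toList
    case (k, v) =>
      List((prefix + k) -> v)
  }.toMap

// Avro field for a map-valued column: string keys, one value type.
def avroMapField(name: String, valueType: Any): Map[String, Any] =
  Map("name" -> name, "type" -> Map("type" -> "map", "values" -> valueType))
```

Whether DataFrame schema inference (step 3) already produces something equivalent is exactly what the plan proposes to check first.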

Next steps:

  - Sam is going to tackle 1)
  - I think Roberto is going to work on the Parquet performance of the hardest parts of the ping -- please correct.
  - Mark will join when he finishes his current P1

Please correct any points I have mis-remembered or omitted. Thanks!
Flags: needinfo?(mreid)
Summary: Generate Avro schemas for storing telemetry pings in Parquet files → Prototype validation and transformation pipeline for storing UT pings as Parquet datasets

Comment 7

2 years ago
seems like we need to peel off individual bugs for this?

Comment 8 (Assignee)

2 years ago
(In reply to thuelbert from comment #7)
> seems like we need to peel off individual bugs for this?

+1 to individual bugs and generally structuring this as a project with deliverables.

I have a simple JSON Schema that tests for clientIds being UUIDs and submissionDates being in the correct format. I've run it against 200,000 pings from 20160117 and every single one passed, which is reasonable given how few fields I am checking. Spark is a PITA with development code, so I plan to focus on demonstrating progress with data management (as opposed to factoring repos properly, decomposing schemas into smaller files, etc.).
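A sketch of the two checks in Scala, for anyone who wants to reproduce them outside JSON Schema. It assumes a flat record with `clientId` and `submissionDate` keys and a `YYYYMMDD` submission date (matching the `20160117` above); `isUuid4`, `isSubmissionDate` and `isValid` are hypothetical names, and the actual checks live in the JSON Schema itself.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.Try

// Version-4 UUID: 8-4-4-4-12 hex digits, version nibble "4",
// variant nibble 8, 9, a or b (case-insensitive).
val Uuid4 = "(?i)[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}"

def isUuid4(s: String): Boolean = s.matches(Uuid4)

// Assumed submissionDate format: YYYYMMDD, e.g. "20160117". BASIC_ISO_DATE
// resolves strictly, so impossible dates such as "20160231" are rejected.
def isSubmissionDate(s: String): Boolean =
  s.matches("[0-9]{8}") &&
    Try(LocalDate.parse(s, DateTimeFormatter.BASIC_ISO_DATE)).isSuccess

// Hypothetical flat ping record: only the two checked fields matter here.
def isValid(ping: Map[String, Any]): Boolean =
  (ping.get("clientId"), ping.get("submissionDate")) match {
    case (Some(id: String), Some(date: String)) => isUuid4(id) && isSubmissionDate(date)
    case _                                      => false
  }
```

Counting passes over a batch is then `pings.count(isValid)`.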

Updated

2 years ago
Whiteboard: [loasis]

Comment 9

2 years ago
see other bugs as needed
Status: NEW → RESOLVED
Last Resolved: 2 years ago
Resolution: --- → INVALID