Closed Bug 1568042 Opened 5 years ago Closed 5 years ago

Backfill production stable BigQuery tables with a year of data

Categories

(Data Platform and Tools :: General, task, P1)

Type: task
Points: 5

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: amiyaguchi, Unassigned)

References

Details

The raw tables in BigQuery are being populated by a BigQuery sink in gcp-ingestion. All of the documents are represented by schemas in mozilla-services/mozilla-pipeline-schemas. These raw tables effectively replace the previous system of accessing raw pings in AWS (through the Scala/Python moztelemetry APIs).

Not having a year of data is a regression with respect to the AWS data lake. We should backfill the BigQuery tables from the heka-protobuf data in the data lake.


Here is one approach to provide 1 year (365 days) of backfill to the raw tables in BigQuery.

First, we transfer heka-protobuf data for the last 365 days from AWS to GCP into a localized storage bucket.
Everything that follows can be billed in GCP.

From here, there are a few approaches to backfill, which have varying costs.

  1. Replay data into PubSub via hindsight and reprocess via the BigQuery sink
  2. Use moztelemetry to support a heka-protobuf source and reprocess via the BigQuery sink
  3. Use moztelemetry to support a heka-protobuf source, reprocess into Avro, and use a BigQuery external data source.

(1) may incur massive overhead costs relative to the stated goal.
(2) can be done iteratively on a per-doctype basis. The cost of backfilling main pings should approximately match a year of backfill on main_summary.
(3) is a more complex variant of (2). The JSON Schema transpiler and Avro encoder are designed around BigQuery compatibility.

The raw heka-protobuf data should be deleted once reprocessing has been performed and validated.

Especially for main ping, I suspect that an external Avro table would be quite slow to access since we'd always need to read out all columns.

(2) feels like a sweet spot. Do you have a clear idea of what (2) looks like? Would we have a spark-based step to turn heka-protobuf into ndjson files on GCS, then feed those through the BQ sink, or do you think we can use the moztelemetry library directly in gcp-ingestion?

The ideal situation is to import com.mozilla.telemetry.heka directly into gcp-ingestion, after modifying the S3Store object to read from GCS and updating the paths in the Dataset API. We might be able to write an alternative Dataset API that gives us a PCollection[Message] instead of an RDD[Message].
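As a rough sketch of the GCS half of that, assuming the google-cloud-storage Java client and that S3Store exposes something like getKey(bucket, key) returning an InputStream (the GcsStore name and method shape here are assumptions, not the real moztelemetry API):

import java.io.InputStream
import java.nio.channels.Channels

import com.google.cloud.storage.{BlobId, Storage, StorageOptions}

// Hypothetical GCS counterpart to moztelemetry's S3Store; the method shape is assumed.
object GcsStore {
  private lazy val storage: Storage = StorageOptions.getDefaultInstance.getService

  // Return the contents of gs://bucket/key as an InputStream, mirroring S3Store.getKey.
  def getKey(bucket: String, key: String): InputStream =
    Channels.newInputStream(storage.reader(BlobId.of(bucket, key)))
}

Streaming through a channel avoids buffering whole objects in memory, which matters for the larger heka blobs.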

A less ideal option is to rewrite moztelemetry and reimplement the decoding routines. We might have to go this way if we want to avoid the extra serialization/deserialization that comes with writing intermediate files to GCS.

There's an ndjson version of a single day of main pings that I've reserialized using the moztelemetry API. It takes several hours to process a single day of data -- I imagine reprocessing into BigQuery will add several more hours on top of that initial protobuf-to-JSON conversion.
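For reference, that reserialization step looks roughly like the sketch below, using moztelemetry's Dataset API on Spark; the where() dimensions, the messageToJson helper, and the output path are illustrative rather than exact:

import com.mozilla.telemetry.heka.{Dataset, Message}
import org.apache.spark.sql.SparkSession

object HekaToNdjson {
  // Hypothetical helper: render a decoded heka Message as a single JSON line.
  def messageToJson(m: Message): String = ???

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("heka-to-ndjson").getOrCreate()
    implicit val sc = spark.sparkContext

    // One day of main pings, read through the moztelemetry Dataset API.
    val pings = Dataset("telemetry")
      .where("docType") { case "main" => true }
      .where("submissionDate") { case "20190831" => true }
      .records()

    // One JSON document per line, written out as ndjson for a later BQ load.
    pings.map(messageToJson).saveAsTextFile("gs://some-backfill-bucket/main/20190831/")
  }
}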

Type: defect → task
Points: --- → 5
Priority: -- → P1
Blocks: 1572026
Depends on: 1579056
Depends on: 1579083

This seems like the canonical bug to update. I've renamed it to match precisely the intent to have the _stable tables include a year of data (as opposed to e.g. the payload_bytes_{raw,decoded} tables).

My current understanding from my work so far and conversations with :amiyaguchi and :klukas is the following:

We have a 1% sample of data (i.e. telemetry-sample-2) available in GCS at gs://moz-fx-data-prod-data/telemetry-sample-2.
We have a single full day of telemetry data (i.e. telemetry-3/20190831) available in GCS at gs://moz-fx-data-prod-data/telemetry-3/20190831.

We are in the midst of implementing a new beam input that will read heka-protobuf files from the above object store and convert them into a representation that can be output to BigQuery. It is presently undetermined whether this representation will need to pass through the decoder step as part of the input (i.e. whether the result is more akin to input from the edge or output from the decoder), but the output is expected to be BQ sink-compatible. There are some implementation considerations here, including metadata propagation, potential "double validation" inefficiency, and the fact that some data in the AWS data lake may not be considered valid by the GCP pipeline (due to schema strictness evolving over time) unless it is specifically excluded during processing.
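As a sketch of the rough shape of that input, assuming FileIO match/readMatches feeds matched files into a DoFn, and with parseHekaFrames as a hypothetical stand-in for the real heka framing/decompression code:

import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

// Reads a matched heka blob from GCS and emits one element per framed message.
class ReadHekaFileFn extends DoFn[FileIO.ReadableFile, Array[Byte]] {

  // Hypothetical stand-in for the actual heka framing/snappy handling.
  private def parseHekaFrames(blob: Array[Byte]): Iterator[Array[Byte]] = ???

  @ProcessElement
  def processElement(ctx: DoFn[FileIO.ReadableFile, Array[Byte]]#ProcessContext): Unit = {
    val blob = ctx.element().readFullyAsBytes()
    parseHekaFrames(blob).foreach(ctx.output)
  }
}

Whether the emitted message bytes then pass through the decoder or map directly to the sink's row representation is the open question above.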

The above code work is being tracked in bug #1572070 and https://github.com/mozilla/gcp-ingestion/pull/781 and the code being developed there should apply to all pings (not just crash pings). It is also presumed to be preparatory to further work on bug #1579083.

When this code is ready, we will run batch jobs (probably by day, but potentially by doctype/docversion) with input as spec'd and with output to the _live tables. We will need to consider concurrency carefully here. We also need to consider whether using a separate project with slots will improve performance and reduce cost. The performance characteristics of the beam job will likely also have an effect on how we decide to proceed.

We will then need to finalize these live tables into their final _stable destinations, which is currently done by query. We once again need to consider concurrency carefully here: since this query is known to take about an hour to run every day, running these query jobs serially for a year of data would potentially take weeks. It is worth investigating whether deduplication can be done on BQ output in beam for batch jobs (writing directly to _stable tables), saving the extra step and associated BQ scan costs. :klukas will investigate this.
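To make the beam-side option concrete, the kind of thing under investigation is roughly the following (sketch only; it assumes messages flow through the job as PubsubMessage with a document_id attribute, which is an assumption on my part, and nothing here is implemented):

import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import org.apache.beam.sdk.transforms.{Distinct, SerializableFunction}
import org.apache.beam.sdk.values.{PCollection, TypeDescriptors}

object DedupSketch {
  // Drop duplicate documents within the batch, keyed by the document_id attribute,
  // so the batch job could conceivably write directly to the _stable tables.
  def dedupByDocumentId(messages: PCollection[PubsubMessage]): PCollection[PubsubMessage] =
    messages.apply(
      "DedupByDocumentId",
      Distinct
        .withRepresentativeValueFn(new SerializableFunction[PubsubMessage, String] {
          override def apply(m: PubsubMessage): String = m.getAttribute("document_id")
        })
        .withRepresentativeType(TypeDescriptors.strings()))
}

Note this only deduplicates within the batch being processed, not against rows already in BigQuery, which is part of what needs investigating.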

Notably, we're not planning on backfilling the payload_bytes_decoded tables, except for a single special case (prio v4 pings). We will run a separate one-off beam job for backfilling prio that writes the payload_bytes_decoded representation which should be extremely cheap due to the low volume of that ping. Generally, we expect that any jobs that need to read from the payload_bytes_decoded tables (e.g. mozaggregator) will not need history predating the GCP epoch.

We're waiting to hear back from Google on their recommendations and time/cost estimates for the amount of data we're planning to copy. Based on my recent copy of a single day of data, the current estimate for copying a year of data is about a week. I'm therefore inclined to hold off on the full copy, which I originally planned for this week, until we have more information from Google and a better understanding of job performance and bottlenecks in GCP-side processing. We want to avoid paying to store this data in heka-protobuf form where that cost is reasonably avoidable. Given how little time the import itself takes, it may even make sense for the unit of backfill work to include both import and deletion (i.e. import data from AWS, run the beam job to write to BQ, run the query to populate the stable table, then delete the intermediate data).

We are independently working on https://github.com/mozilla/jsonschema-transpiler/issues/87, and its resolution may result in our backfilling of data beyond the current GCP epoch for consistency with that (hopefully final) change to schema transpiling. The decision of what to do here (we could in theory rewrite the data from GCP without using heka-protobuf from AWS) does not block any other steps outlined above.

Separate from the telemetry code path above is the process by which we import structured/D2P datasets. The amount of data is expected to be relatively small and inexpensive to process. Depending on the availability of that separate code path and procedure, the timeline for copying data for those datasets will likely differ from telemetry. I will consider this separately after discussing it with :klukas.

Summary: Backfill production raw BigQuery tables with a year of data → Backfill production stable BigQuery tables with a year of data

Let's move discussion of the backfill code here rather than keeping it in bug 1572070.

Update from my end: work continues on supporting heka ingestion via the bigquery sink job in PR #781. I have it successfully parsing a valid heka blob and outputting it to standard out via this invocation:

Hammersmith:ingestion-beam wlach$ ./bin/mvn compile exec:java -Dexec.args="\
    --inputType=heka \
    --input=ingestion-beam/tmp/test_telemetry_snappy.heka \
    --outputFileFormat=text \
    --outputType=stdout \
    --errorOutputType=stderr"

I am still working through a few issues with my code with Jeff, but think it should be ready to run on GCP very soon, unless I'm missing something.

We have a successful Dataflow job configuration now, so the general problem of parsing heka is now solved. We are moving on to prepping for validation and tuning the performance of the job in the next few days. We are at this point aiming to start a larger scale backfill on Tuesday of next week, targeting the full month of 2019-08. If that is successful and passes validation, we will continue backwards in time from there.

This project has been delayed due to persistent OOM (out of memory) exceptions causing jobs to fail, but we have now found a solution by reducing the size of buffers allocated for GCS uploads (see the relevant case with Google support).

This removes the main blocker and I anticipate we can start the large-scale backfill by the end of the week.
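For reference, one plausible way to apply that kind of buffer reduction is through Beam's GcsOptions (--gcsUploadBufferSizeBytes); this is a sketch under that assumption, and the 1 MiB value is illustrative rather than what we actually deployed:

import org.apache.beam.sdk.extensions.gcp.options.GcsOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory

object SmallerUploadBuffers {
  def main(args: Array[String]): Unit = {
    val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().as(classOf[GcsOptions])
    // Shrink per-upload buffers so many concurrent GCS file writes don't exhaust worker memory.
    options.setGcsUploadBufferSizeBytes(1024 * 1024)
    // ... construct and run the pipeline with `options` as usual
  }
}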

Depends on: 1588181

We have two cases still open with Google, but we are now able to achieve sufficient throughput to finish this effort within the next two weeks. We have run the first dataflow jobs with final production configuration today. We will continue to tune the job if we get more information from Google.

We are starting with 2019-08 and will be proceeding backwards in time month by month through 2018-10.

We now have a month and a half of backfill data fully staged, deduplicated, and ready to copy into the prod stable table. I am going to message data users to let them know that this data is going to start becoming available and that some of their query patterns may need to change.
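The exact copy mechanism isn't pinned down here (bq cp, per-partition copy jobs, or a query could all work); purely as an illustration, a per-partition copy with the BigQuery Java client might look like the following, with all project/dataset/table names made up:

import com.google.cloud.bigquery.{BigQueryOptions, CopyJobConfiguration, JobInfo, TableId}

object CopyStagingPartition {
  def main(args: Array[String]): Unit = {
    val bigquery = BigQueryOptions.getDefaultInstance.getService
    // Made-up names; "$20190831" addresses a single day partition.
    val src = TableId.of("backfill-project", "telemetry_stable_staging", "main_v4$20190831")
    val dst = TableId.of("prod-project", "telemetry_stable", "main_v4$20190831")
    val config = CopyJobConfiguration.newBuilder(dst, src)
      .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE) // replace only that partition
      .build()
    bigquery.create(JobInfo.of(config)).waitFor()
  }
}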

It got ugly in the end, but it is finished.

2019-03-27 is the day that simply would not complete successfully. The Dataflow job staged everything to temp files in GCS, but was issuing a single giant BQ load job that couldn't complete. I ended up issuing load jobs by hand on the temp files to get that tied up. I have updated Google about those difficulties and am awaiting more communication from their engineering team about ways forward for future backfills with better stability.
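For posterity, the by-hand step amounted to pointing BigQuery load jobs at the Dataflow job's staged temp files. A rough sketch with the BigQuery Java client, assuming newline-delimited JSON temp files; the URI pattern and table name below are made up:

import com.google.cloud.bigquery.{BigQueryOptions, FormatOptions, JobInfo, LoadJobConfiguration, TableId}

object LoadTempFiles {
  def main(args: Array[String]): Unit = {
    val bigquery = BigQueryOptions.getDefaultInstance.getService
    // Made-up URI pattern and table; the real temp files live under the job's --tempLocation.
    val config = LoadJobConfiguration
      .newBuilder(
        TableId.of("prod-project", "telemetry_live", "main_v4"),
        "gs://some-temp-bucket/BigQueryWriteTemp/some-job-id/*.json")
      .setFormatOptions(FormatOptions.json())
      .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
      .build()
    bigquery.create(JobInfo.of(config)).waitFor()
  }
}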

I am sending out an email to fx-data-dev to announce that things are complete, and will also post to #fx-metrics.

Status: NEW → RESOLVED
Closed: 5 years ago
Resolution: --- → FIXED