Closed Bug 1598387 Opened 6 years ago Closed 4 years ago

Use streaming engine for pipeline Dataflow jobs

Categories: Data Platform and Tools Graveyard :: Operations, enhancement
Priority: Not set
Severity: normal
Tracking: Not tracked
Status: RESOLVED MOVED
Reporter: klukas, Assigned: whd

Details
It's my understanding that :whd is planning to transition Dataflow jobs to use streaming engine over the next few weeks. This should make autoscaling more efficient and may lead to better stability per Google's recommendations. It may also significantly reduce cost, since we can stop provisioning large numbers of persistent disks.

Initial testing of streaming engine on the decoder job in stage (on a very small subset of data) shows performance and latency to be significantly worse, at least in the test configuration. I'm going to run through a couple of different configurations and do some more debugging before asking Google to investigate for us. Notably, I'm going to see if the live sink performs better, since that's the costly piece we were hoping streaming engine would improve, and some logs indicate that the issue with the decoder may be related to geoip decoding OOMing, as it uses a large (>100MB) artifact.

I originally set the instance type and disk per recommendations, but also tried using the same instance types we currently use, and both configurations had the same issues.

Results for the telemetry live sink on 100% of data have been mixed to negative. Here are some of the runs:

Using recommended defaults

This job has been the most successful, settling at 322 vCPUs (161 n1-standard-2) and using no persistent SSD. This is approximately double the number of vCPUs the current job uses, which is not in line with expectations of reduced vCPU usage. There are likely little to no cost savings here, since we pay roughly as much for persistent SSD as for vCPUs, and this isn't accounting for the price of streaming engine itself. Scaling appears to be relatively infrequent, and data freshness was significantly more variable than on the prod job, but it appeared to have settled in line with the prod job by the end of the run.

Using a recommendation from this blog post to halve the instance size when using custom instance types

This run uses n1-standard-4, since we currently use n1-standard-8. The job ended up using 400 vCPUs (100 n1-standard-4) and didn't scale very much or often. Notably, it ran from 6am-9am PT with data freshness increasing up to 3h latent (which would have caused alerting on the current jobs). I don't presently understand how that can happen while the job is still processing records at the nominal input rate, since the data freshness metric represents the watermark of the latest processed data rather than the age of the oldest unprocessed message.

Streaming engine seems to be sensitive to the initial num workers setting. The job above that ran with 100 workers had 100 workers set as the default. Ramp-up time when not specifying num workers (hence starting with 1 worker) can be very long (for instance, my current run has been going for over half an hour with a single worker but hasn't scaled up). I'm guessing the job is somehow configured to disable autoscaling, but I can't confirm that. For comparison, the current dataflow job starts with 50 workers and usually settles between 30-40 n1-standard-8. To be fair, non-streaming-engine dataflow may also exhibit an inability to scale when under-provisioned at job start. Overall I have not seen any significant improvement in autoscaling performance in any of the configurations I've tried so far; it is empirically usually worse. Cost is probably worse as well.
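For reference, here is a minimal sketch of the worker and autoscaling knobs in question, expressed as Beam Dataflow pipeline options in Java. The option names are standard Beam/Dataflow runner options; the specific values are illustrative rather than our actual production settings.

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StreamingEngineOptionsSketch {
  public static DataflowPipelineOptions build() {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    // Opt in to streaming engine via the documented experiment flag.
    ExperimentalOptions.addExperiment(options, "enable_streaming_engine");
    // Initial worker count matters: starting at 1 ramps up very slowly.
    options.setNumWorkers(100);            // illustrative value
    options.setMaxNumWorkers(200);         // illustrative value
    options.setWorkerMachineType("n1-standard-4");
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
    return options;
  }
}

If I recall correctly, newer Beam releases also expose streaming engine as a first-class pipeline option rather than an experiment, but the experiment flag above is the one documented for the versions we've been testing with.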

I'll continue to test a few other configurations, but current results suggest this is not going to be the silver bullet for SSD cost that we were hoping for.

One potentially good note: the jobs have all appeared to drain correctly, except for my initial tests with the decoder earlier in the week.

MaxMind DB memory usage

Currently, we initialize our GeoCity db reader by passing an InputStream that points at GCS. The MaxMind Java API can operate in a mode where it loads the entire DB into memory, or it can memory-map a file on disk, but the constructor accepting an InputStream only allows loading the entire stream into memory.

We should change this code to copy the GCS object to local disk, and then construct the reader against that local file so that it can do memory mapping. I'd also love to make sure we only have one db reader per worker, but I don't think the Beam API provides hooks for constructing objects per worker and sharing them between threads.
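A rough sketch of what that could look like (a hypothetical helper, not the actual gcp-ingestion code): it assumes the database is read via Beam's FileSystems API, and uses a static field as the per-JVM workaround for sharing one reader across threads on a worker.

import com.maxmind.db.Reader;
import com.maxmind.geoip2.DatabaseReader;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import org.apache.beam.sdk.io.FileSystems;

public class GeoCityReaderFactory {

  // One reader per JVM (i.e. per worker process), shared across threads.
  private static DatabaseReader sharedReader;

  public static synchronized DatabaseReader getOrCreateReader(String gcsPath)
      throws IOException {
    if (sharedReader == null) {
      // Copy the GCS object to a local temp file so the reader can mmap it.
      Path localCopy = Files.createTempFile("GeoLite2-City", ".mmdb");
      try (InputStream in = Channels.newInputStream(
          FileSystems.open(FileSystems.matchSingleFileSpec(gcsPath).resourceId()))) {
        Files.copy(in, localCopy, StandardCopyOption.REPLACE_EXISTING);
      }
      // Memory-map the local file instead of loading the whole DB onto the heap.
      sharedReader = new DatabaseReader.Builder(localCopy.toFile())
          .fileMode(Reader.FileMode.MEMORY_MAPPED)
          .build();
    }
    return sharedReader;
  }
}

The static singleton only guarantees one reader per JVM rather than strictly one per worker, but in practice that's one per worker process, which is the best we can do without per-worker construction hooks in the Beam API.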

https://github.com/mozilla/gcp-ingestion/pull/1061 is now merged and should significantly reduce the amount of memory used by the MaxMind DatabaseReader. It may be worth going ahead and trying out streaming engine for the Decoder job again to see if memory usage is no longer a blocker.

I'll launch a test streaming decoder job in stage later today.

The test job on stage data (1%) is now working well. I'm going to run it with some backlog tomorrow and then with a prod subscription to observe streaming engine efficacy at scale.

Streaming decoder on 100% of data is now running:
stage
prod for comparison

Some observations:
vCPU and memory are comparable, with streaming engine slightly smaller (this may be due to granularity of instance types)
persistent disk usage is significantly smaller on streaming engine (~30x, as expected)
latencies are comparable in the normal case, with autoscaling performance to be determined

Overall, I expect that if autoscaling performance and cost improve or remain the same (reduced PD usage being offset by the streaming engine premium), we should be good to move forward with migrating the non-sink dataflow jobs to streaming engine. I'm aiming to run the decoder test in stage for a few more days before switching to streaming engine in prod next week.

Comparing the decoder job running this week in stage, we see consistently (slightly) better autoscaling performance and lower compute and memory usage. The premium for using streaming engine amounts to about the same cost as is saved by the lowered compute usage, so I plan to move forward with migrating to streaming engine for non-sink jobs next week.

https://github.com/mozilla/gcp-ingestion/compare/streaming_engine
https://github.com/mozilla-services/cloudops-infra/compare/data_streaming_engine

I plan to roll these out on Tuesday, assuming our schema woes are fully resolved.

We also have a meeting with Google scheduled for Wednesday to discuss BQ live sinks and streaming engine. I had a test job running for them to analyze, but it is very expensive to run (scales up to max workers and doesn't scale down) so I've disabled it for now. We should have a clearer story on how/whether to continue using dataflow with live sinks after that meeting.

Finally getting back to this, but I'm seeing once again that streaming engine is not playing nicely with the decoder: https://console.cloud.google.com/dataflow/jobs/us-west1/2020-02-11_15_51_05-3559276725899219957?project=moz-fx-data-beam-nonprod-b5f4&organizationId=442341870013, where essentially no data is processed and errors include lots of "processing stuck for 5m". I am trying to bisect the differences between the 100% test in https://bugzilla.mozilla.org/show_bug.cgi?id=1598387#c7 and the stage/production config in https://bugzilla.mozilla.org/show_bug.cgi?id=1598387#c8 but haven't found anything meaningful as yet.

We're not doing this explicitly, but if it happens as part of the Flex templates or Dataflow Prime work, it will be tracked there. Closing this bug.

Status: NEW → RESOLVED
Closed: 4 years ago
Resolution: --- → MOVED
Product: Data Platform and Tools → Data Platform and Tools Graveyard