Closed Bug 1357277 Opened 9 years ago Closed 8 years ago

Deploy telemetry-streaming to production

Categories

(Data Platform and Tools :: General, enhancement, P1)

Points:
3

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: whd, Assigned: whd)

Details

(Whiteboard: [SvcOps])

Repository: https://github.com/mozilla/telemetry-streaming

Among other things this involves setting up build and provisioning logic that does not rely on external resources, deploy logic, determining a method for performing code updates, and exposing the streaming system's output to the appropriate places. Logs should be made available using Logging 2.0 and/or via EMR logs in S3.

I believe the hard deadline for having this in production is the end of May.
Priority: P1 → P2
Priority: P2 → P1
I have been somewhat remiss in updating this bug, so here's a (rather large) update.

Progress here: https://github.com/mozilla-services/cloudops-deployment/compare/telemetry_streaming?expand=1

I will outline the two major deployment cases that I am working towards. One is to update the application code, and the other is to update the EMR cluster, e.g. after an EMR cluster failure or for a Spark upgrade. In the case of an application failure there is also the need to roll back to a previously built stack. I am designing the deployment structure to support these various cases. I have a pretty good idea how this should work, but I am still ironing out some details.

The deployment strategy is as follows. We maintain a separate checkpoint location per application version, to facilitate the rollback and cluster changes described above. Depending on whether S3 has acceptable latency, we can use EMRFS for this, or otherwise run an S3 sync from HDFS to S3, or use one of myriad other options (see [1][2][3]). Our batch window appears to be 5 minutes, so I'm hoping EMRFS will suffice.

Within the application we run a background thread that checks for a marker object on S3, and if it sees the object, it initiates a graceful shutdown. This is to deal with issues related to stopping streaming applications mid-batch and when moving to a new cluster.

The main parameters to the build logic are application version (VERSION), EMR stack version (EMRSTACK), and previous application version (PREVIOUS, optional). The relevant assets for a particular build are located at:

JAR: BUCKET/VERSION/telemetry-streaming.jar
Kafka checkpoints: BUCKET/VERSION/checkpoints
Shutdown flag: BUCKET/VERSION/shutdown

Here's a verbose rendering of the deployment steps and the various associated cases.

To create the first EMR cluster and streaming job, the deployment logic will perform the following steps (VERSION=1, PREVIOUS=undef, EMRSTACK=1).

1. Build BUCKET/VERSION/telemetry-streaming.jar, if necessary
   Since BUCKET/1/telemetry-streaming.jar does not exist, it is created
2. Create EMRSTACK cluster if necessary
   Since EMR stack 1 does not exist, it is created
3. Create BUCKET/PREVIOUS/shutdown and wait for EMR step completion, if necessary
   Since PREVIOUS is unset, this is unnecessary
4. Write kafka checkpoints from BUCKET/PREVIOUS/checkpoints to BUCKET/VERSION/checkpoints, if necessary
   Since PREVIOUS is unset, this is unnecessary
5. Run an EMR Spark step on EMRSTACK pointing at BUCKET/VERSION/telemetry-streaming.jar, BUCKET/VERSION/checkpoints
   As there are no checkpoints in BUCKET/1/checkpoints, the application will start from the latest offsets and write its offsets to BUCKET/1/checkpoints
6. Clean up any bad artifacts generated by PREVIOUS (manual process)
   This is unnecessary

To update the application code on the existing EMR cluster, the deployment logic will perform the following steps (VERSION=2, PREVIOUS=1, EMRSTACK=1).

1. Build BUCKET/VERSION/telemetry-streaming.jar, if necessary
   Since BUCKET/2/telemetry-streaming.jar does not exist, it is created
2. Create EMRSTACK cluster if necessary
   Since EMR stack 1 already exists, this is unnecessary
3. Create BUCKET/PREVIOUS/shutdown and wait for EMR step completion, if necessary
   Since PREVIOUS is specified, write the shutdown flag to BUCKET/1/shutdown. The application will read the shutdown flag and stop after finishing the current batch. The EMR step will signal success. Poll EMRSTACK for cluster status until it is WAITING (application shut down). Remove BUCKET/1/shutdown.
4. Write kafka checkpoints from BUCKET/PREVIOUS/checkpoints to BUCKET/VERSION/checkpoints, if necessary
   Since PREVIOUS was specified, and BUCKET/2/checkpoints does not exist, write BUCKET/1/checkpoints to BUCKET/2/checkpoints
5. Run an EMR Spark step on EMRSTACK pointing at BUCKET/VERSION/telemetry-streaming.jar, BUCKET/VERSION/checkpoints
   The application will start from BUCKET/2/checkpoints, which is where PREVIOUS left off
6. Clean up any bad artifacts generated by PREVIOUS (manual process)
   This is unnecessary

Now let's assume application version 2 is bad and we need to roll back to version 1. The following are the steps to accomplish that (VERSION=1, PREVIOUS=2, EMRSTACK=1).

1. Build BUCKET/VERSION/telemetry-streaming.jar, if necessary
   Since BUCKET/1/telemetry-streaming.jar exists, this is unnecessary
2. Create EMRSTACK cluster if necessary
   Since EMR stack 1 already exists, this is unnecessary
3. Create BUCKET/PREVIOUS/shutdown and wait for EMR step completion, if necessary
   If EMR stack 1 is WAITING, this is unnecessary. If the job does not respond to the shutdown flag after a predetermined time, kill the cluster and build a new one (EMRSTACK is incremented until a cluster version is found that does not exist, which in this case is 2). At this point there should be a cluster that is WAITING. Remove BUCKET/2/shutdown if it was created.
4. Write kafka checkpoints from BUCKET/PREVIOUS/checkpoints to BUCKET/VERSION/checkpoints, if necessary
   Since BUCKET/1/checkpoints exists, this is unnecessary
5. Run an EMR Spark step on EMRSTACK pointing at BUCKET/VERSION/telemetry-streaming.jar, BUCKET/VERSION/checkpoints
   The application will start from BUCKET/1/checkpoints, where it left off
6. Clean up any bad artifacts generated by PREVIOUS (manual process)
   Depending on the failure mode of version 2, this may involve deleting objects from S3

Finally, in the case where we need to deploy a new EMR cluster, the following are the steps (VERSION=3, PREVIOUS=1, EMRSTACK=3):

1. Build BUCKET/VERSION/telemetry-streaming.jar, if necessary
   Since BUCKET/3/telemetry-streaming.jar does not exist, it is created
2. Create EMRSTACK cluster if necessary
   Since EMR stack 3 does not exist, it is created
3. Create BUCKET/PREVIOUS/shutdown and wait for EMR step completion, if necessary
   Since PREVIOUS is specified, write the shutdown flag to BUCKET/1/shutdown. The application will read the shutdown flag and stop after finishing the current batch. The EMR step will signal success. Poll EMRSTACK for cluster status until it is WAITING (application shut down). Remove BUCKET/1/shutdown.
4. Write kafka checkpoints from BUCKET/PREVIOUS/checkpoints to BUCKET/VERSION/checkpoints, if necessary
   Since PREVIOUS was specified, and BUCKET/3/checkpoints does not exist, write BUCKET/1/checkpoints to BUCKET/3/checkpoints
5. Run an EMR Spark step on EMRSTACK pointing at BUCKET/VERSION/telemetry-streaming.jar, BUCKET/VERSION/checkpoints
   The application will start from BUCKET/3/checkpoints, where PREVIOUS left off
6. Clean up any bad artifacts generated by PREVIOUS (manual process)
   This is unnecessary

So that's the general flow I am working towards implementing. This has taken longer than anticipated, and I now expect to finish this work next week or the week after.

One pain point I've encountered is that Spark SQL streaming 2.1.0 doesn't appear to be production ready for our use case: when building and running the application myself I've come across at least 3 bugs that are fixed by Spark 2.1.1 ([4][5][6]).
Two of these were discovered by :rvitillo and subsequently patched, and the third is related to my usage of EMRFS for checkpointing. Unfortunately this makes deployment much more difficult because EMR only supports up to 2.1.0 currently.

:mdoglio gave me access to the dev version that uses SBT and custom JAR paths, but unfortunately it was killed by reaper before I could take a proper look and :mdoglio is currently rebuilding it. I need access to a working dev deployment in order to figure out how to build a version that works with the production deployment strategy detailed above. NIing :mdoglio to let me know when the dev cluster is up and running again.

[1] https://stackoverflow.com/questions/42006664/apache-spark-structured-streaming-s3-checkpoint-support
[2] https://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
[3] https://stackoverflow.com/questions/36987875/writing-spark-checkpoints-to-s3-is-too-slow
[4] https://issues.apache.org/jira/browse/SPARK-19677
[5] https://issues.apache.org/jira/browse/SPARK-19407
[6] https://issues.apache.org/jira/browse/SPARK-19268
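To make the shutdown-flag mechanism described above concrete, here is a minimal sketch of what the in-application background thread could look like. The bucket name, flag key, and poll interval are illustrative assumptions, not what's in the repo:

import java.util.concurrent.{Executors, TimeUnit}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.apache.spark.sql.streaming.StreamingQuery

object ShutdownSentinel {
  // Illustrative values; the real bucket/key come from the deploy logic (BUCKET/VERSION/shutdown).
  val bucket = "example-deploy-bucket"
  val flagKey = "1/shutdown"

  // Poll S3 for the shutdown marker and stop the query when it appears.
  def watch(query: StreamingQuery): Unit = {
    val s3 = AmazonS3ClientBuilder.defaultClient()
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        if (s3.doesObjectExist(bucket, flagKey)) {
          // Signal the running query to stop; exactly how gracefully this happens
          // mid-batch is one of the open issues discussed in this bug.
          query.stop()
          scheduler.shutdown()
        }
      }
    }, 0, 30, TimeUnit.SECONDS)
  }
}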
Component: Metrics: Pipeline → Datasets: General
Flags: needinfo?(mdoglio)
Product: Cloud Services → Data Platform and Tools
And it's up again! It's called MissionControl and you can access it using the dataops keys. It's currently running on spark 2.1.1 and it's using sbt to launch the job.
Flags: needinfo?(mdoglio)
I've finally got the Spark 2.1.1 patching logic in place via a spark step, and am able to build the application into a fat jar that actually runs via spark-submit [1], is able to use HDFS/EMRFS, and can restart without failure. I've decoupled the EMR cluster generation from the step deploy logic; the former is mostly complete and the jar build logic in the latter is complete.

I still need to do, among other things:
- finish the deploy portion of the EMR step deploy logic
- performance test EMRFS vs HDFS, and determine syncing logic based on said testing
- add shutdown flag support to the application
- make the application checkpoint configurable
- deploy production stack assets
- add monitoring/spark-specific stats to the WIP datadog dashboard [2][3]

I am now reasonably confident that this will be deployable to production some time next week.

[1] https://github.com/mozilla/telemetry-streaming/compare/prod?expand=1
[2] http://docs.datadoghq.com/integrations/spark/
[3] https://app.datadoghq.com/dash/290159/telemetry-streaming
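As an illustration of the "make the application checkpoint configurable" item above, a minimal sketch of plumbing a checkpoint path option through to the sink; the flag names, defaults, and kafka settings here are hypothetical, the real changes are in the prod branch [1]:

import org.apache.spark.sql.SparkSession

object ErrorAggregatesJob {
  def main(args: Array[String]): Unit = {
    // Hypothetical flags; the real job uses its own option parser.
    val checkpointPath = args.sliding(2).collectFirst {
      case Array("--checkpointPath", p) => p
    }.getOrElse("/tmp/checkpoint")
    val outputPath = args.sliding(2).collectFirst {
      case Array("--outputPath", p) => p
    }.getOrElse("/tmp/parquet")

    val spark = SparkSession.builder.appName("telemetry-streaming").getOrCreate()
    val pings = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092") // placeholder
      .option("subscribe", "telemetry")
      .load()

    pings.writeStream
      .format("parquet")
      .option("path", outputPath)
      .option("checkpointLocation", checkpointPath) // per-version path supplied by the deploy logic
      .start()
      .awaitTermination()
  }
}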
:whd that sounds awesome! \o/ Once you are done, how complicated is it to create a dev/test instance with the same setup?
(In reply to Mauro Doglio [:mdoglio] from comment #4)
> Once you are done, how complicated is it to create a dev/test instance with
> the same setup?

It is low effort, and in fact I am considering using dev as the staging environment since in stage we do not currently have a kafka cluster that receives live data.
This has been somewhat slow going and I now expect to have this ready for production next week. There are many small issues such as https://issues.apache.org/jira/browse/SPARK-10789 and https://github.com/mozilla/telemetry-streaming/commit/a7933b5ef59759843b7f4b0980f9fb0a7079fe8f that have taken a long time to debug and resolve, but I am now at the point I thought I was at the end of last week: a jar that is runnable via spark-submit, that actually works, and that fully utilizes the EMR cluster.

I moved the stage environment to dev. It's now available as TelemetryStreaming with output to s3://net-mozaws-dev-us-west-2-data-telemetry-streaming/test4/.

I think for the first pass I am going to leave the application code more or less as-is, and instead set spark.streaming.stopGracefullyOnShutdown to true and yarn.nodemanager.sleep-delay-before-sigkill.ms to a sufficiently high value to allow the program to exit gracefully. The sentinel logic will live as a separate cron that polls for a shutdown object in S3 and uses yarn to kill the application container (alternately, set spark.yarn.submit.waitAppCompletion and use a separate EMR step to shut down the application); see the sketch below.

In the first iteration, checkpoints will be shared among application versions running on the same EMR cluster and not across EMR clusters, unless my testing of EMRFS with consistent view ends up working out better than it did without it. This means we will not be resilient to total EMR or HDFS failures, but we will be alerted in these cases and can reprocess the data for affected days if necessary. I'm also now assuming that the checkpointing information will be portable across application upgrades, which isn't necessarily true. In the interest of getting this running, and because it's an assumption that has held true so far and whose failure would be immediately apparent from dev deploys, I think making this assumption is acceptable for the first pass.

I have a spark-stats specific dashboard for dev running at https://app.datadoghq.com/dash/298014/, which will supplement or be merged with the other WIP dashboard with generic stats.

Some links about yarn and spark streaming with particular discussion about shutdown:
https://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/
https://stackoverflow.com/questions/31684323/what-is-the-correct-way-to-start-stop-spark-streaming-jobs-in-yarn
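A minimal sketch of what that sentinel cron could look like, assuming it knows the shutdown object's location and shells out to the yarn CLI; the bucket, key, and application-name filter are illustrative, not the actual implementation:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import scala.sys.process._

object Sentinel {
  def main(args: Array[String]): Unit = {
    val bucket = "example-deploy-bucket"          // placeholder
    val flagKey = "telemetry-streaming/shutdown"  // placeholder
    val s3 = AmazonS3ClientBuilder.defaultClient()

    if (s3.doesObjectExist(bucket, flagKey)) {
      // Find the running application id(s) for the job and ask YARN to kill them.
      val running = Seq("yarn", "application", "-list", "-appStates", "RUNNING").!!
      running.split("\n")
        .filter(_.contains("telemetry-streaming"))
        .map(_.split("\\s+")(0))
        .foreach(appId => Seq("yarn", "application", "-kill", appId).!)
    }
  }
}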
After several iterations I've landed at the following implementation. It makes some simplifying assumptions that the original implementation in comment #1 doesn't, but it is actually running and can eventually be improved upon to include some of the elements I've omitted in the interest of celerity.

The deploy step builds the application jar and runs an EMR step on the previously built EMR cluster, and this step enters a PENDING state if a previous version of the job is running. I've in theory hooked the repository up to auto-deploy the master branch in dev (cluster is TelemetryStreaming), but this may need more tweaking to work correctly. I then have a sentinel service on the EMR master that reads the pending job queue and kills the currently running application. This is done via yarn, and I haven't debugged how cleanly this shutdown occurs, but the application will shut down and the EMR cluster will enter the WAITING state. EMR will then start the next step, which runs a wrapper script to spark-submit called telemetry-streaming. The wrapper reads the latest kafka offsets from the previous run's checkpoint directory and uses those to start the new application (which uses a different checkpoint directory); I have determined empirically that checkpoint data is indeed not necessarily portable across application versions. When the graceful shutdown issues are resolved this should become a non-issue because there won't be much state to save between versions, but as it stands there may be some data blips on deploys due to bad shutdown behavior.

The current version of the application I'm using in dev is the current master after https://github.com/mozilla/telemetry-streaming/pull/51 was merged. However, the application appears to no longer process any data (it is still running and reading from kafka, however). Previously I was applying the changes in the above PR to https://github.com/mozilla/telemetry-streaming/commit/d7fc30eed4e8bb5c5e6823b8c4b5c67babffe201, which worked. The only intervening changes are from https://github.com/mozilla/telemetry-streaming/pull/41, and https://github.com/mozilla/telemetry-streaming/pull/41#issuecomment-306186513 looks suspiciously similar to my problem. :mdoglio (NI), what was the resolution of the above issue? I need to determine if this is an application issue or a clustering/provisioning one before proceeding with tests in production.

Speaking of which, my plan is to deploy this to production early next week with output to the test production bucket for tuning with the production workload. After the remaining kinks are ironed out I will switch it to point at the production data bucket, and we can consider this first phase complete.
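For illustration, here is roughly how a wrapper could seed a new run from the previous run's offsets: Structured Streaming's Kafka source accepts a startingOffsets JSON string, so the wrapper only needs to pull the offsets JSON out of the newest file under the old checkpoint's offsets/ directory. The file-layout assumption and all paths here are mine, not the actual wrapper script:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import scala.io.Source

object ResumeFromPreviousCheckpoint {
  // Assumes the Spark 2.1 offset log layout: one file per batch under
  // <checkpoint>/offsets/, whose last line is the per-source offsets JSON,
  // e.g. {"telemetry":{"0":58403377,...}}.
  def latestOffsetsJson(checkpointDir: String, conf: Configuration): Option[String] = {
    val offsetsPath = new Path(checkpointDir, "offsets")
    val fs = offsetsPath.getFileSystem(conf)
    if (!fs.exists(offsetsPath)) None
    else fs.listStatus(offsetsPath).map(_.getPath)
      .filter(p => p.getName.forall(_.isDigit))
      .sortBy(_.getName.toLong)
      .lastOption
      .map(p => Source.fromInputStream(fs.open(p)).getLines().toSeq.last)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("telemetry-streaming").getOrCreate()
    val starting = latestOffsetsJson("s3://example-bucket/1/checkpoints", // placeholder path
      spark.sparkContext.hadoopConfiguration).getOrElse("latest")
    val pings = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092") // placeholder
      .option("subscribe", "telemetry")
      .option("startingOffsets", starting) // offsets JSON from the previous version, or "latest"
      .load()
    // ... rest of the job unchanged, writing to its own (new) checkpointLocation
  }
}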
Flags: needinfo?(mdoglio)
The issue I had in https://github.com/mozilla/telemetry-streaming/pull/41#issuecomment-306186513 was due to an error in the definition of the incoming data schema. Basically all the pings had been rejected because they didn't pass validation. I don't think that's your case, but you can verify it by passing the --raiseOnError parameter when you start up the job. The job should fail on the first validation error and a stacktrace should appear in the logs.

In order to make this kind of problem easier to debug, I just changed the log4j configuration to report a summary of each query executed, e.g.:

17/06/09 09:30:14 INFO StreamExecution: Streaming query made progress: {
  "id" : "64d55d61-86aa-4c93-aa3a-d0b1950dcd68",
  "runId" : "eb9e08ab-421d-4e2a-ac9b-45306c13963f",
  "name" : null,
  "timestamp" : "2017-06-09T09:26:42.000Z",
  "numInputRows" : 157724,
  "inputRowsPerSecond" : 606.8828590337525,
  "processedRowsPerSecond" : 742.3738227140295,
  "durationMs" : {
    "addBatch" : 212318,
    "getBatch" : 6,
    "getOffset" : 86,
    "queryPlanning" : 22,
    "triggerExecution" : 212459,
    "walCommit" : 22
  },
  "eventTime" : {
    "avg" : "2017-06-09T09:24:30.288Z",
    "max" : "2017-06-09T09:26:40.506Z",
    "min" : "2017-06-09T09:22:18.142Z",
    "watermark" : "2017-06-09T09:21:20.473Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 24866,
    "numRowsUpdated" : 19368
  } ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[telemetry]]",
    "startOffset" : {
      "telemetry" : {
        "92" : 58405926,
        --- data truncated ---
        "0" : 58403377
      }
    },
    "endOffset" : {
      "telemetry" : {
        "92" : 58407496,
        --- data truncated ---
        "0" : 58404954
      }
    },
    "numInputRows" : 157724,
    "inputRowsPerSecond" : 606.8828590337525,
    "processedRowsPerSecond" : 742.3738227140295
  } ],
  "sink" : {
    "description" : "FileSink[s3n://telemetry-parquet/error_aggregates/v1]"
  }
}
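(For what it's worth, the same progress summaries can also be captured programmatically rather than via log4j; a minimal sketch using Spark's StreamingQueryListener, with the metric forwarding left as a stub:)

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Register once per SparkSession; onQueryProgress fires after every micro-batch
// with the same JSON shown above (numInputRows, durationMs, per-source offsets, ...).
class ProgressLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Could also push selected fields (e.g. numInputRows) to Datadog here.
    println(event.progress.json)
  }
}

// usage: spark.streams.addListener(new ProgressLogger)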
Flags: needinfo?(mdoglio)
Regarding your idea of using a different checkpoint location for each version of the job, I'm not sure how that is gonna work. The checkpoint directory contains not only the last kafka offsets read, but also a snapshot of the data being aggregated. I don't think we want to lose that. There are a couple of cases that we should consider to handle updates correctly:

1- Addition of a new stats column: in https://github.com/mozilla/telemetry-streaming/issues/19 I verified that the job recovers successfully in this case, provided that the checkpoint and s3 metadata folders are available. This is the common case.

2- Addition of a new dimension column: I haven't verified this yet, but I suspect this is not going to be easy to handle. Given that this is not a common case we can solve it by bumping up the dataset version and eventually backfilling the data. Not a big deal because the job can also be executed as a daily batch.

Anyway, this is tracked by https://github.com/mozilla/telemetry-streaming/issues/35 and I'll try to tackle it in the next few weeks.
According to https://databricks.com/blog/2017/05/18/taking-apache-sparks-structured-structured-streaming-to-production.html you are right and I'm wrong. The Databricks people suggest starting a new streaming job if you change significant parts of the code, like the output schema. In any case, we still have the batch job option as a backup to fill eventual gaps in the data if something goes wrong.
When I add the raiseOnError flag, the exception that is raised is "Doctype should be one of main,crash", and in this case the application exits afterwards (instead of consuming the kafka stream without actually processing it). I will continue to investigate this later today.
raiseOnError makes the job stop on the first failure and report the error. That Doctype error sounds legitimate and it's probably showing up because the stream contains non-main/crash pings, which is fine. Did you update your code to latest master? Can you see some info like the one in comment 8?
I poked at this over the course of the afternoon and have not come to a conclusion on what is going wrong.

I restarted the telemetry-streaming process on the dev "MissionControl" box managed by :mdoglio and it now exhibits the same issue I see on the box managed by me ("TelemetryStreaming"), which is now running but not processing data (numInputRows: 0). I am not sure why this is. Notably, the code running via sbt on "MissionControl" appears to be an active development version and not necessarily representative of what's in master, but as I didn't change anything except for restarting the process I am not sure why it is acting up. I tried removing the checkpoint data but that didn't help.

I have been able to get variants of the application working at times while cherry-picking the checkpointPath and outputPath options into a copy of the no-longer-working version on MissionControl, but I'm not sure what causes the application to work or not. I also sometimes get stack overflow errors and the dreaded 0 input rows processed with no failures, even with --raiseOnError. At this point, I can't get the application to run reliably via fat jar/yarn or sbt.

I'm kind of stumped at this point, but I will continue to look into it next week. :mdoglio can you take a look at the MissionControl dev box and see if you can figure out what I did to cause it to fail? Notably, I tried simply rebuilding a version of the application from the branch point I mentioned in comment 7 with my jar and checkpoint changes and even that wasn't working.

If I still can't find what's up I will start testing all the previous jar builds to find the last one that was working, and try to see how that one was built. These jars are available at s3://net-mozaws-prod-us-west-2-ops-rpmrepo-apps/pipelines/telemetry-streaming/.
I found a few issues:

- running the job from sbt doesn't work anymore because of [1]. I guess I need to add the spark dependencies to the lib folder? If we can't find a way to differentiate the build config between dev and prod, it may be sufficient to add some instructions to the README.
- running the job from sbt doesn't work anymore because of [2]. This should probably be reverted.
- with the above commits reverted, the job runs but it seems to be stuck. I changed the log verbosity in log4j.properties to DEBUG and I see it keeps spitting out the following lines: [3]. I bet something is off with kafka, which would explain why both the dev job and the soon-to-become-prod job cannot process data.

[1] https://github.com/mozilla/telemetry-streaming/commit/783220fa63a7b5396ac966cb69d5da17b7bbcf6f
[2] https://github.com/mozilla/telemetry-streaming/commit/096935ea5bccf32fccd888094ca15c9bec2d1d8e
[3] https://pastebin.mozilla.org/9024361
The kafka issues should be resolved; thanks to :mdoglio for catching that. I'll add better monitoring for dev at some later point.

We had a meeting today where we decided we're going to hold off on fixing the "run from sbt" issues until after I can get production properly working. That said, regarding the possible reversion of https://github.com/mozilla/telemetry-streaming/commit/096935ea5bccf32fccd888094ca15c9bec2d1d8e: I made a note of the ramifications of removing that line in the PR that introduced it. Instead of reverting the change we should probably make the master address configurable.

I spent several hours today tracking down why the application runs successfully via sbt in certain environments, but not others. I finally determined that https://github.com/mozilla/emr-bootstrap-spark/blob/master/ansible/files/bootstrap/telemetry.sh#L225 is the operative line that I was missing, since the production configuration doesn't run the analysis bootstrap script nor are EMR steps run via a hadoop user's interactive shell (however, part of the inconsistency in my results has been because I have been debugging using both methods). I can now (finally!) reliably start the application in standalone mode from any cluster.

The next step is to get clustering working again, which sadly still isn't working. I anticipate that this involves tweaking a few more YARN parameters. Assuming that goes well, I'll have the production test set up tomorrow.
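A sketch of the "make the master address configurable" idea: only set a local master when nothing else (spark-submit/YARN) has provided one, optionally reading an override from the environment. The env var name and fallback are illustrative assumptions:

import org.apache.spark.sql.SparkSession

object SparkSessionFactory {
  // If spark.master was already set by spark-submit (e.g. --master yarn on EMR),
  // leave it alone; otherwise fall back to an env override or local mode for sbt runs.
  def create(appName: String): SparkSession = {
    val builder = SparkSession.builder.appName(appName)
    val fromEnv = sys.env.get("SPARK_MASTER")            // hypothetical override
    val alreadySet = sys.props.contains("spark.master")  // set when launched via spark-submit
    (if (alreadySet) builder
     else builder.master(fromEnv.getOrElse("local[*]"))).getOrCreate()
  }
}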
I've gotten the master version of the application to run in clustered mode on YARN. However, I've been testing node failure scenarios and am beginning to suspect that HDFS failures cannot be handled gracefully with Spark in EMR (see the links below). This is unfortunate, because it means the application basically can't be HA on EMR, but I am going to stop working on HA configuration and instead focus on making it at least work under optimal conditions. I will deploy a cluster to production tomorrow.

I also suspect that part of the reason the application sometimes stops outputting data has something to do with application version/checkpoint changes in the checkpoint location (hdfs) that are distinct from the output sink's (s3) "_spark_metadata" folder. I will dig into that later.

https://issues.apache.org/jira/browse/SPARK-20608
https://community.hortonworks.com/questions/9586/spark-in-yarn-with-namenode-ha.html
https://stackoverflow.com/questions/24762672/how-does-apache-spark-handles-system-failure-when-deployed-in-yarn
Two updates. First: it is indeed the temporary state stored in the s3 _spark_metadata folder, and not the checkpoint state in hdfs, that is causing the application to process the kafka stream but not output parquet data. This metadata folder presumably exists to allow idempotency (de-duping) in the case where the application crashes and no application version change is in progress. For that it works well, but it has been the source of many hours of debug time, and obviously it doesn't play nicely with application version changes.

We could deal with this in a few ways, one of which is to bump the output prefix on every version change that would cause the state to become inconsistent. This is probably technically the correct thing to do, but it effectively creates a new presto table and will require backfilling each time we do it (even when, e.g., a new output parquet schema is backwards-compatible). For now the option I'm going to employ is to blow away the temporary state in s3 on application upgrades, just as I am already doing for the checkpoint path; see the sketch below. This has the potential 5-minute data gap as discussed in this bug, but we've deemed that acceptable for the first pass.

The second update is that in the time it's taken me to get the production configuration set up, Amazon has released EMR 5.6.0 with Spark 2.1.1 [1], which means I don't have to patch Spark anymore. This is advantageous because it's no longer a hack and, e.g., at minimum the debugging UIs will work properly now. I've left the logic for patching spark in, but disabled. Hopefully we never use it again!

[1] https://aws.amazon.com/about-aws/whats-new/2017/06/updates-to-apache-spark-and-in-transit-encryption-for-presto-in-amazon-emr-release-5-6-0/
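A minimal sketch of that cleanup step, deleting the sink's _spark_metadata prefix (and the old per-version checkpoint path) before starting the new version; bucket names and prefixes are placeholders, not the real deploy logic:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import scala.collection.JavaConverters._

object CleanupStaleState {
  // Delete every object under the given prefix, paging through listings.
  def deletePrefix(bucket: String, prefix: String): Unit = {
    val s3 = AmazonS3ClientBuilder.defaultClient()
    var listing = s3.listObjects(bucket, prefix)
    var done = false
    while (!done) {
      listing.getObjectSummaries.asScala.foreach(o => s3.deleteObject(bucket, o.getKey))
      if (listing.isTruncated) listing = s3.listNextBatchOfObjects(listing)
      else done = true
    }
  }

  def main(args: Array[String]): Unit = {
    // Sink-side temporary state that is inconsistent across application versions.
    deletePrefix("example-data-bucket", "error_aggregates/_spark_metadata/")
    // Old per-version checkpoint path, as already done for upgrades.
    deletePrefix("example-deploy-bucket", "1/checkpoints/")
  }
}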
The service is now running in production, on 2 c3.2xls (comfortably) and with output to s3://net-mozaws-prod-us-west-2-pipeline-data/error_aggregates. I've still got some monitoring, cleanup, and review work to do but the first pass is now mostly complete. I made a README for spinning up a dev environment using the deployment logic I've been working on: https://github.com/mozilla-services/cloudops-deployment/blob/telemetry_streaming/projects/data/ansible/README-dev.md. It should be possible to e.g. run backfill by spinning up such an instance. Until I have all the CI stuff set up I want to use the dev cluster that is provisioned by jenkins as "stage" and have actual development/backfill done on a separate cluster.
I'm closing this. There's still more work to do to make it a truly production service, for which separate bugs can be filed.
Status: NEW → RESOLVED
Closed: 8 years ago
Resolution: --- → FIXED
Component: Datasets: General → General