Bug 1431777 (Closed) - Opened 7 years ago, Closed 7 years ago

Merge experiments_daily into clients_daily

Categories

(Data Platform and Tools :: General, enhancement, P1)

Points: 3

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: mreid, Assigned: relud)

References

Details

Attachments

(2 files)

The experiments_daily job has been failing for some time; we should add more logging and figure out how to fix it.
Assignee: nobody → dthorn
Points: --- → 1
Priority: -- → P1
I found an issue with a missing column, which I filed a PR to fix: https://github.com/mozilla/python_mozetl/pull/184

After that I finally managed to produce a traceback for this issue, which I think is the source of the problems:

Traceback (most recent call last):
  File "/tmp/tmp.Qbu9wyxawM/runner.py", line 2, in <module>
    cli.entry_point(auto_envvar_prefix="MOZETL")
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "build/bdist.linux-x86_64/egg/mozetl/experimentsdaily/rollup.py", line 102, in main
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o445.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:509)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:861)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:859)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:859)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1929)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1842)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:108)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
    ... 45 more
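[Editorial note: missing-column failures like the one fixed in the PR above are commonly handled by adding the column as all-null when it is absent. A minimal sketch of that pattern follows; the column name is purely illustrative and is not the column fixed in the PR.]

```python
# Hypothetical guard for a column that may be absent from older input data.
# `scalar_parent_example_probe` is an illustrative name, not the actual
# column addressed in https://github.com/mozilla/python_mozetl/pull/184.
from pyspark.sql import functions as F
from pyspark.sql.types import DataType, LongType


def ensure_column(df, name, dtype=LongType()):
    """Add `name` as an all-null column if the DataFrame lacks it."""
    if name not in df.columns:
        df = df.withColumn(name, F.lit(None).cast(dtype))
    return df


# Usage (assuming a `results` DataFrame as in the rollup job):
# results = ensure_column(results, "scalar_parent_example_probe")
```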
Points: 1 → 2
Points: 2 → 3
Priority: P1 → P2
Dropping the missing probe works as expected. `results.repartition(1000).write...` didn't fix anything. Writing one experiment at a time took 42 minutes just to extract the list of experiment ids, and then failed before finishing even a single experiment. I also tried writing with `partitionBy('experiment_id')`. I'm open to suggestions here, but all I can really tell is that this job seems unreasonably slow.
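[Editorial note: for reference, a minimal sketch of the write strategies described above. The DataFrame name, session setup, and S3 paths are placeholders, not the actual rollup code.]

```python
# Hedged sketch of the repartition + partitionBy write attempts described
# above. Paths and the `results` DataFrame are illustrative placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("experiments_daily_sketch").getOrCreate()

# Placeholder input; the real job builds `results` inside rollup.py.
results = spark.read.parquet("s3://example-bucket/experiments_input/")

(results
    .repartition(1000)               # spread the write across more tasks
    .write
    .mode("overwrite")
    .partitionBy("experiment_id")    # one output subdirectory per experiment
    .parquet("s3://example-bucket/experiments_daily/v1/"))
```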
Mark, what do you think about trying to do this in Scala (from telemetry-batch-view instead of python_mozetl)? IIUC, Scala runs directly on the JVM, instead of going through a py4j coprocess. The exception above looks to me like the failure is happening on the Java side, and is potentially being hidden behind the py4j connection. From what I see in experiments_daily, the job isn't actually that complex, so it should be easy enough to port. The biggest issue is that I'd be duplicating EXPERIMENT_FIELD_AGGREGATORS.
Flags: needinfo?(mreid)
Adding my most recent traceback; it looks like this is coming from a lost connection to a slave, or maybe a lost connection to Java:

Traceback (most recent call last):
  File "/tmp/tmp.V0yg1H9PMU/runner.py", line 2, in <module>
    cli.entry_point(auto_envvar_prefix="MOZETL")
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/mnt/anaconda2/lib/python2.7/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "build/bdist.linux-x86_64/egg/mozetl/experimentsdaily/rollup.py", line 92, in main
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o447.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:509)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 68 in stage 5.0 failed 4 times, most recent failure: Lost task 68.3 in stage 5.0 (TID 39780, ip-172-31-25-21.us-west-2.compute.internal, executor 16): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Slave lost
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
    ... 45 more
(In reply to Daniel Thorn [:relud] from comment #3)
> Mark, what do you think about trying to do this in Scala (from
> telemetry-batch-view instead of python_mozetl)? IIUC, Scala runs directly
> on the JVM, instead of going through a py4j coprocess. The exception above
> looks to me like the failure is happening on the Java side, and is
> potentially being hidden behind the py4j connection. From what I see in
> experiments_daily, the job isn't actually that complex, so it should be
> easy enough to port. The biggest issue is that I'd be duplicating
> EXPERIMENT_FIELD_AGGREGATORS.

I'm thinking the best way to address this issue may be to drop experiments_daily as a separate dataset, and instead add the {experiment: experiment_branch} map as a field in the clients_daily dataset. Queries for experiments could then use a WHERE clause against clients_daily rather than maintaining another table (see the sketch below). Ryan, Dave, do you have use cases for experiments_daily that would not be covered by such a change?
Flags: needinfo?(rharter)
Flags: needinfo?(mreid)
Flags: needinfo?(dzeber)
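[Editorial note: a hedged sketch of how an experiment query might look once the map field exists on clients_daily. The column name `experiments`, the experiment slug, and the paths are illustrative assumptions, not a final schema.]

```python
# Hypothetical query against clients_daily once it carries an
# {experiment: experiment_branch} map column. Names below are assumptions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

clients_daily = spark.read.parquet("s3://example-bucket/clients_daily/v1/")
clients_daily.createOrReplaceTempView("clients_daily")

# Clients enrolled in a given experiment, with their branch, via a plain
# WHERE clause instead of a separate experiments_daily table.
enrolled = spark.sql("""
    SELECT client_id,
           submission_date_s3,
           experiments['pref-flip-example'] AS branch
    FROM clients_daily
    WHERE experiments['pref-flip-example'] IS NOT NULL
""")
enrolled.show()
```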
Spoke to Dave offline; he's OK with this proposed change.
Flags: needinfo?(dzeber)
Sounds like a good plan!
Flags: needinfo?(rharter)
Priority: P2 → P1
Summary: Investigate experiments_daily job failures → Merge experiments_daily into clients_daily
Status: NEW → RESOLVED
Closed: 7 years ago
Resolution: --- → FIXED
Blocks: 1426172
Component: Datasets: Experiments → General