Closed Bug 1365077 Opened 7 years ago Closed 4 years ago

[meta] Anomaly detection and prioritization in telemetry-streaming

Categories

(Data Platform and Tools :: Monitoring & Alerting, enhancement, P3)

x86_64
Linux
enhancement

Tracking

(Not tracked)

RESOLVED WONTFIX

People

(Reporter: amiyaguchi, Assigned: amiyaguchi)

References

Details

MacroBase is an analysis system for fast data, structured as a dataflow pipeline. [1] At a high level, the paper outlines a system that contains:

* An API for defining an analytics pipeline for fast data
* Suggestions for robustly estimating the distribution of data
* A method of training and retraining classifiers for anomaly detection
* A risk-ratio metric to explain outlier data through summarization and prioritization
* Algorithms and data structures to support streaming summarizations

This could be used to automatically surface anomalies such as the notorious Aurora 51 build. [2]

# Setup

The experimental setup used to benchmark MacroBase consisted of a single server with a Xeon E5-4657L (4 CPUs, 12 cores each) and 1 TB of RAM. This setup was able to achieve roughly 1-2 million points/second on streaming explanations of anomalies, depending on the data. According to the CEP monitor for docTypes and the main_summary dataset, main pings arrive at the ingestion endpoint at an average rate of 200k pings per minute over the day. [3]

While this is well within the bounds of MacroBase's capabilities, it may require some work to fit this within our Spark Streaming environment. MacroBase was benchmarked against a single executor with fairly heavy specs, whereas our current infrastructure is built on top of EC2 c3.4xlarge machines running in a cluster configuration. In Appendix D of [1], an experimental scale-out was performed by partitioning the dataset across independent workers and taking the union of the results. Throughput increased linearly, but accuracy suffered significantly. We may want to consider the resources required for processing the full stream if we utilize the library.

# Points to follow up on

Here are a few of the questions that I currently have and would like to explore in more detail.

> What throughput can we expect from a typical machine hosted in aws (e.g. c3.4xlarge)?
I expect throughput to scale roughly linearly with the amount of resources available. This is a more realistic approximation of the setup that would be productionized.

> Is it possible to achieve horizontal scaling through parallelization?
A few of the algorithms have possible parallel implementations. For example, median absolute deviation (MAD) can be computed through approximate quantiles (see the sketch below). There is also an implementation of parallel FPGrowth for frequent itemset mining in MLlib that can be used in a batched setting.

Spark does have a few streaming machine learning algorithms implemented such as linear regression and k-means, but none relevant to MacroBase. It would be interesting to look into parallel algorithms for identifying frequent itemsets over sliding windows. 
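
To make the MAD-via-approximate-quantiles idea concrete, here is a minimal sketch using Spark's `approxQuantile`; the DataFrame, column name, and cutoff are placeholders rather than anything from the telemetry schema.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{abs, col, lit}

// Label rows whose metric deviates from the median by more than `cutoff` MADs.
// Both the median and the MAD are estimated with approximate quantiles.
def labelWithMad(df: DataFrame, metric: String, cutoff: Double = 3.0): DataFrame = {
  val median = df.stat.approxQuantile(metric, Array(0.5), 0.01).head
  val withDev = df.withColumn("dev", abs(col(metric) - lit(median)))
  val mad = withDev.stat.approxQuantile("dev", Array(0.5), 0.01).head
  withDev.withColumn("is_outlier", col("dev") > lit(cutoff * mad))
}
```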


# Next steps

As of this writing, macrobase-lib is a work in progress, lacking some of the operators needed to reproduce the entirety of the default streaming pipeline described in [1].

However, it should be possible to pass labeled data to the library and use the IncrementalSummarizer to see what kinds of anomalies surface. 

Concretely, what should happen next is the following:

* Choose a metric:
    - Ping volume rate is probably the most interesting, but it requires a count_approx_distinct over the cube of relevant dimensions in order to come up with a distribution of the rate. The sliding window required for results at client_id granularity is on the order of days because of the time between submissions.
    - Client latency is a metric that occurs on a per-ping basis and is a better candidate for this drill-down type of analysis. It is simple to implement (datediff(submission_date, subsession_date)) and could lead to interesting insights.
    - Other key metrics supporting mission-control-related work

* Generate a dataset of labeled data over main_summary for the month of April 2017 using median absolute deviation over the chosen metric (see the sketch after this list).

* Batch-feed the dataset to the legacy MacroBase setup to explore the data

* Set up Spark Streaming to read the labeled data and feed it to the IncrementalSummarizer
    - Collect streamed explanations
    - Determine if the defaults are adequate for our ping volume
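
As a rough illustration of the latency metric and MAD labeling steps above, here is a sketch against main_summary; the S3 paths and the submission_date/subsession_date column names are assumptions, and labelWithMad refers to the helper sketched earlier.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, datediff, to_date}

val spark = SparkSession.builder().appName("latency-mad-labeling").getOrCreate()

// Placeholder path and a date filter covering April 2017.
val mainSummary = spark.read.parquet("s3://telemetry-parquet/main_summary/v4")
  .filter(col("submission_date").between("2017-04-01", "2017-04-30"))

// Client latency: days between the subsession and its submission.
val withLatency = mainSummary.withColumn(
  "client_latency_days",
  datediff(to_date(col("submission_date")), to_date(col("subsession_date"))))

// Label outliers with the MAD helper from the earlier sketch and persist them.
val labeled = labelWithMad(withLatency, "client_latency_days")
labeled.write.mode("overwrite").parquet("s3://example-bucket/labeled_latency/2017-04")
```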


[1] https://arxiv.org/abs/1603.00567
[2] https://chuttenblog.wordpress.com/2017/02/23/data-science-is-hard-anomalies-and-what-to-do-about-them/
[3] https://pipeline-cep.prod.mozaws.net/dashboard_output/graphs/analysis.trink.doctypes.message_per_minute.html
[4] https://github.com/stanford-futuredata/macrobase
[5] https://github.com/stanford-futuredata/macrobase/blob/master/lib/src/test/java/edu/stanford/futuredata/macrobase/StreamingSummarizationTest.java#L129
Assignee: nobody → amiyaguchi
Priority: -- → P2
I've been working through an initial implementation of the MacroBase operators within a Spark Streaming context [1]. It is not yet in a working state.

Stateful stream processing in Spark is a bit tricky. I'd like to be able to accomplish the following during this stage of the work.

* Maintain global state for training samples, classifiers, and incremental summarizer
* Checkpoint global state
* Gather explanations on a periodic and/or on-demand basis 

`mapWithState` persists arbitrary data types between batches of data. This creates an RDD[T] with each key assigned to an executor. While this is great for things like counting items over a large number of keys, it might not be suited to holding the global state of a single key (a sketch follows).
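
For reference, a minimal sketch of what holding "global" state under a single key with `mapWithState` looks like; the state type and update logic here are placeholders, not the MacroBase operators.

```scala
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Stand-in for the classifier/summarizer state we would actually carry.
case class SummarizerState(count: Long)

// Update the single shared state and emit a (placeholder) explanation.
def updateState(key: Int, point: Option[Double], state: State[SummarizerState]): String = {
  val current = state.getOption.getOrElse(SummarizerState(0L))
  val updated = SummarizerState(current.count + 1)
  state.update(updated)
  s"seen ${updated.count} points"
}

// Keying everything by 0 pins the state to a single partition, which is the
// concern raised above: one executor ends up owning all of the global state.
def attachState(points: DStream[Double]): DStream[String] =
  points.map(p => (0, p)).mapWithState(StateSpec.function(updateState _))
```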

Streaming programs should be able to tolerate failures and restart with previous state. Checkpointing is not available for state outside of `updateStateByKey` or `mapWithState`, so it needs to be done manually. This would require some serialization/deserialization of relevant class instances on a regular interval.

Finally, it would be interesting to have periodic triggers for stateful processing that are independent of batch intervals. However, it is much simpler to think about the pipeline when everything is running on the same clock. I can probably just add a counter to output results on the nth batch (a sketch follows). This could be used to track things like an exponential moving average processed every 15 seconds but reported every minute.
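
A small sketch of the nth-batch counter idea, assuming a DStream of explanation strings; the reporting call is just a placeholder.

```scala
import org.apache.spark.streaming.dstream.DStream

// Process every batch, but only report on every nth one. The counter lives on
// the driver, where the foreachRDD body runs once per batch interval.
def reportEveryNthBatch(explanations: DStream[String], n: Int): Unit = {
  var batchCounter = 0L
  explanations.foreachRDD { rdd =>
    batchCounter += 1
    if (batchCounter % n == 0) {
      // With a 15-second batch interval and n = 4, this fires once a minute.
      rdd.take(10).foreach(println)
    }
  }
}
```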


[1] https://github.com/acmiyaguchi/telemetry-streaming/tree/macrobase
I've made some minor progress. First is a batched implementation of this system in pyspark. [1] I wanted to try this out to see if the ideas would translate well with our chosen streaming framework. I've also collected a day's worth of measurements that might be interesting to look at from the raw data dumps. [2] 

PLUGIN_HANG_NOTICE_COUNT   - scalar
SUBPROCESS_LAUNCH_FAILURE  - scalar
SUBPROCESS_ABNORMAL_ABORT  - scalar
GC_MAX_PAUSE_MS            - linear        high: 1000,  nbuckets: 50
CYCLE_COLLECTOR_MAX_PAUSE  - exponential   high: 10000, nbuckets: 50

Bug 1367506 will automatically add linear and exponential histograms to the main summary dataset. Once this lands, histogram data will be available via main_summary for the last 180 days. Cerberus, the histogram regression detection service, can provide a list of regressions in telemetry data via the Bhattacharyya distance [3]. The Spark streaming context can read directly from a parquet data store, reducing the friction in getting data into the system (a sketch follows).
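
One way to do this is with Structured Streaming's file source; a hedged sketch of reading the main_summary parquet store as a stream, where the path is a placeholder and the schema is borrowed from a batch read since streaming file sources require one up front.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("main-summary-stream").getOrCreate()
val basePath = "s3://telemetry-parquet/main_summary/v4"  // placeholder path

// Streaming file sources need an explicit schema; borrow it from a batch read.
val schema = spark.read.parquet(basePath).schema
val mainSummaryStream = spark.readStream.schema(schema).parquet(basePath)
```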

I've been considering an in-memory key-value store for stateful stream processing. Apache Ignite [4] provides an RDD-compatible caching layer, while RedisLabs [5] maintains a Spark binding to Redis. This extra memory layer would solve the problem of sharing state across machines and batches, and provide a way to persist accumulated state to disk for checkpointing.

[1] https://gist.github.com/acmiyaguchi/69d082193877967703a9b231a4442d15
[2] https://gist.github.com/acmiyaguchi/3c6996f37954a2deab10d0b004a8841e
[3] http://mozilla.github.io/cerberus/dashboard/
[4] https://ignite.apache.org/
[5] https://github.com/RedisLabs/spark-redis
An interesting constraint to put on this problem is to monitor all telemetry probes for anomalies. This is probably out of the scope of this particular bug, but is a nice problem to have as the design of this system is fleshed out. 

As of Firefox 55, there are 1636 probes, 271 of which are opt-out.[1] There are a few different ways to handle this case. Since many of these measures are bound to be heavily correlated or noisy, one approach treats all measures as a single feature vector and attempts to reduce the dimensionality. This loses information about the individual measures themselves, but would be able to take advantage of correlations between measures.

Another is to have a model for each probe, or for each small group of probes. Many models would be trained and evaluated in parallel, an approach not unprecedented in Spark.[2] This would probably make the most sense in the MacroBase pipeline, since initial scale-out is suggested via multiplexing. Processing bandwidth is limited by infrastructure costs, which are significant for services that are online 24 hours a day, 365 days a year. There are several ways of multiplexing processing of each individual telemetry measure -- for example, a very simple scheme would involve round-robin training of models, but evaluation against all models every time period (a sketch follows). This is based on the notion that roughly 50% of processing time is spent training models.
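
A toy sketch of that round-robin scheme, assuming models keyed by probe name; Model, score, and the retraining rule are placeholders, not MacroBase APIs.

```scala
// Placeholder model and classifier; a real pipeline would carry MAD statistics
// or a trained classifier here instead.
case class Model(probe: String, trainedAt: Long)

def score(model: Model, value: Double): Boolean = value > 3.0  // placeholder

// Each period, retrain only the probes in the group matching the batch index,
// but score every probe's data against its (possibly stale) model.
def step(models: Map[String, Model],
         batch: Map[String, Seq[Double]],
         batchIndex: Long,
         numGroups: Int): (Map[String, Model], Map[String, Seq[Boolean]]) = {
  val retrained = models.keys.toSeq.sorted.zipWithIndex.collect {
    case (probe, i) if i % numGroups == (batchIndex % numGroups).toInt =>
      probe -> Model(probe, trainedAt = batchIndex)
  }.toMap
  val updated = models ++ retrained
  val labels = batch.collect { case (probe, values) if updated.contains(probe) =>
    probe -> values.map(v => score(updated(probe), v))
  }
  (updated, labels)
}
```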

But not all telemetry measures are made equal. Models that are more likely to be anomalous should be trained and retrained at a higher frequency, whereas measures that haven't been affected in a while should still be monitored but retrained after a longer period. The pipeline could write its labels to disk, allowing us to figure out which measures are correlated when anomalies are detected. This could form the basis for monitoring all 1500+ telemetry measures with reasonable latency and alerting.

[1] http://georgf.github.io/fx-data-explorer/index.html
[2] https://github.com/databricks/spark-sklearn
[3] https://en.wikipedia.org/wiki/Multiplexing#Types
This looks exciting!  Can it be made into a service that we can deliver large cubes of time-series data to?  

We (the performance improvement team) track over 30K tests, over tens of platforms and branches, for a total of a couple billion data points per week. We would like to automatically monitor this data for changes in behaviour; the most egregious and confident listed first.
(In reply to Anthony Miyaguchi [:amiyaguchi] from comment #3)
> Another is have a model for each probe, or a small subset of probes. Many
> models would be trained and evaluated in parallel, an approach not
> unprecedented in spark.[2] This would probably make the most sense in the
> macrobase pipeline, since initial scale-out is suggested via multiplexing.
> Processing bandwidth is limited to infrastructure costs, which is
> significant for services that are online 24 hours a day, 365 days a year.
> There are several ways of multiplexing processing of each individual
> telemetry measure -- for example, a very simple scheme would involve round
> robin training of models, but evaluation against all models every time
> period. This is based on the notion that roughly 50% of processing time is
> spent training models.

Heh, yeah, that's an interesting problem, but probably out of scope for this bug.

However, one aspect to consider is how much this wins us compared to other solutions. Together with Roberto, last year we looked into using martingales for anomaly detection [2], and built up a prototype that ingests publicly available histogram data [1]. This technique is particularly well suited for real-time, streaming data and is often used for anomaly detection in other contexts.
We also started digging into deep learning networks (RNNs in particular) to do the same. Many solutions :-D

[1] - https://github.com/Dexterp37/martingale-change-detector
[2] - https://robertovitillo.com/2016/12/15/a-martingale-approach-to-detect-changes-in-histograms/

:amiyaguchi, Correct me if I am wrong, but the power in the MacroBase strategy is the combination of anomaly detection **and prioritization**.  The latter is of primary importance due to the large number of (independent) time series datasets we must monitor; we will never have the human resources to investigate every statistically significant anomaly, so we must have an automated way to make a (rough) list of the top N most egregious anomalies; which we humans can review and prioritize against our other responsibilities.

:Dexter, martingales for anomaly detection are nowhere close to something useful; they are inferior to basic statistical change detection methods; they require human tweaking; and they have no way to compare two anomalies in either gross amplitude or statistical confidence. MacroBase is exciting because it is the first time I have seen anyone at Mozilla putting effort toward prioritizing anomalies. Our current math is detecting more anomalies than we can act on; we do not need to spend more effort improving our anomaly detection. We are missing egregious anomalies now because we are not performing basic change detection, and if we are, we are not prioritizing what we do find.

I also like that :amiyaguchi has started to deal with the problems of managing state between batch runs of MacroBase: This is an important step for distilling prior distributions rather than recalculating them; and also important for communicating data distributions to external systems that may need to display and store them.
(In reply to Kyle Lahnakoski [:ekyle] from comment #6)
> :Dexter, Martingales for anomaly detection is no where close to something
> useful; it is inferior to basic statistical change detection methods; it
> requires human tweaking; and has no way to compare two anomalies in either
> gross amplitude nor statistical confidence. MacroBase is exciting because it
> is the first time I have seen anyone at Mozilla putting effort toward
> prioritizing anomalies. Our current math is detecting more anomalies than we
> can act on; we do not need to spend more effort improving our anomaly
> detection.

The power of MacroBase is indeed the prioritization. I'm not saying let's all jump on martingales; I'm saying let's measure and evaluate the best option and the use cases, and whether any effort is required to scale up :-)

With that said, I'm excited about seeing MacroBase being part of our pipeline and very happy about the work Anthony has been doing so far.

> We are missing egregious anomalies now because we are not
> performing basic change detection, and if we are, we are not prioritizing
> what we do find.

We as in the performance improvement team?
(In reply to Alessio Placitelli [:Dexter] from comment #7)
 
> We as in the performance improvement team?

Sure.
(In reply to Kyle Lahnakoski [:ekyle] from comment #4)
> This looks exciting!  Can it be made into a service that we can deliver
> large cubes of time-series data to?  
> 
> We (the performance improvement team) track over 30K tests, over tens of
> platforms and branches, for a total of a couple billion data points per
> week. We would like to automatically monitor this data for changes in
> behaviour; the most egregious and confident listed first.

I'm excited that there are use cases for a system that can track anomalies across a wide set of measures. The challenges on the performance team have a familiar ring to them -- the data platform team also manages a few billion data points a week across the full set of browser measurements. When it's appropriate, I would like to take the opportunity to expose these ideas to the wider data community at Mozilla. I try to share what I can; this bug is one way of keeping tabs on the effort to evaluate a prototype and bring it online.


(In reply to Alessio Placitelli [:Dexter] from comment #5)
> However, one aspect to consider is how much this wins us compared to other
> solutions. Together with Roberto, last year we looked into using martingales
> for anomaly detection [2], and built up a prototype that ingests publicly
> available histogram data [1]. This technique is particularly well suited for
> real-time, streaming data and often used for anomaly detection in other
> contexts.
> We also started digging into deep learning networks (RNNs in particular) to
> do the same. Many solutions :-D

I've read through the available materials about anomaly detection systems in our data pipeline. I like the idea of forecasting to detect anomalies in time series data, and I think the martingale approach is interesting. I'm really curious about what you've done with RNNs, because I've been caught up with them myself. I read about a forecasting method using RNNs and some heavy feature engineering involving wavelets that caught my attention late last year, and it'd be cool to explore in that direction again.[1] In hindsight, symbolic time series (iSAX) also seems like an intriguing approach for detecting anomalies in streams, opening a pathway to incorporating tools like string similarity measures and motif databases in our workflow. This tutorial [2] was a good primer on the kinds of problems that can be tackled with these tools for modeling time series data.


I think MacroBase is relevant regardless of what method we use to label the data. The biggest takeaway for me is the idea of being able to explain anomalies when working with data -- it seems to be a missing piece of the debugging cycle when working with large fluxes of data. I've written some helpful utilities for automating large swathes of dataset validation by labeling and explaining differences between datasets of the same schema (bug 1347705, enhanced with some of the ideas I've explored in this bug). I think that MacroBase as an API is flexible enough for us to explore different routes for labeling incoming data, while providing a consistent framework for classifying and explaining the data.

However, the real-time component is also very applicable to our domain. Meeting the requirements for high-volume real-time anomaly detection will probably involve the data structures and algorithms outlined in the paper. That said, there's no real use discussing this until I can provide some concrete data on the efficacy of the pipeline in the context of telemetry in a batched setting.


[1] http://www.cs.au.dk/~cstorm/students/Chong_Jul2009.pdf
[2] http://www.cs.ucr.edu/~eamonn/iSAX/tutorial_ICDM06.ppt
Depends on: 1378435
Depends on: 1379806
Summary: Add anomaly detection and prioritization to telemetry-streaming → [meta] Anomaly detection and prioritization in telemetry-streaming
Priority: P2 → P3
Status: NEW → RESOLVED
Closed: 4 years ago
Resolution: --- → WONTFIX