Closed Bug 1627758 Opened 6 years ago Closed 5 years ago

Consider turning off decoder deduplication

Categories

(Data Platform and Tools :: General, task, P2)

Points:
5

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: relud, Unassigned)

References

Details

No description provided.

Or change it to only dedupe republished messages.

See Also: → 1630336

Have we considered using the deduplication built into Beam's PubsubIO?

Beam provides a withIdAttribute method where you can specify the name of an attribute to use as a unique ID for deduplicating input messages. In the Dataflow case, I believe that deduplication is handled with the "windmill" service that replaces the PubsubIO implementation in the Beam SDK.
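As a rough mental model (not the actual windmill implementation, which Dataflow keeps internal), attribute-based deduplication amounts to remembering each ID for a fixed window and dropping repeats. A minimal single-worker sketch, with the 10-minute window borrowed from Beam's default:

```python
import time

class IdAttributeDeduper:
    """Toy model of withIdAttribute-style deduplication: remember each id
    for a fixed window and drop any message whose id was already seen."""

    def __init__(self, window_seconds=600):  # ~10 min, Beam's default window
        self.window = window_seconds
        self.seen = {}  # id -> first-seen timestamp

    def accept(self, message_id, now=None):
        """Return True if the message should be kept, False if it's a dupe."""
        now = time.time() if now is None else now
        # Expire ids that have aged out of the window.
        self.seen = {i: t for i, t in self.seen.items() if now - t < self.window}
        if message_id in self.seen:
            return False  # duplicate within the window
        self.seen[message_id] = now
        return True

dedup = IdAttributeDeduper()
print(dedup.accept("doc-1", now=0))    # True, first sighting
print(dedup.accept("doc-1", now=60))   # False, duplicate inside the window
print(dedup.accept("doc-1", now=700))  # True again, the window has expired
```

Note this also illustrates the key limitation discussed below: the deduper keeps whichever copy arrives first, with no guarantee that's the first one published.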

This could be relevant at least for the republisher step where we'd expect incoming messages to have a document_id attribute populated.

We would not have any guarantee that the message we process is the first one published, though.

I can't remember whether we previously considered this and rejected the feature for some performance reason; I may be misremembering a discussion about the performance implications of withTimestampAttribute instead.

If we can determine that deduplication is only needed for the republisher, then that seems like a good approach. Otherwise I don't expect it to be useful for telemetry where the document id isn't in the URL.

> Otherwise I don't expect it to be useful for telemetry where the document id isn't in the URL.

Both structured ingestion and telemetry have the document_id in the URL. Many telemetry pings also duplicate this value in the payload.

I would even be so bold as to say we might want to investigate moving URL parsing into the edge service so that document_id will be present to use for deduplication at the Decoder step.

(In reply to Jeff Klukas [:klukas] (UTC-4) from comment #5)

> > Otherwise I don't expect it to be useful for telemetry where the document id isn't in the URL.
>
> Both structured ingestion and telemetry have the document_id in the URL. Many telemetry pings also duplicate this value in the payload.

Ah, my bad; that's super useful.

> I would even be so bold as to say we might want to investigate moving URL parsing into the edge service so that document_id will be present to use for deduplication at the Decoder step.

Even better: I think we could just use the URL directly.

> even better, I think we could just use the url directly.

Wow. Yes. I don't see any reason that wouldn't be perfectly sufficient. If a client is pathologically resending a payload with the document_id staying the same, it seems unlikely that other aspects of the URI would differ.
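To make the two options concrete, here is a toy Python sketch; the URI path layout and the UUID-shaped document_id are illustrative assumptions, not the exact edge formats. It shows deduplicating on a parsed document_id versus simply using the full URI as the key:

```python
import re

# document_id appears as a UUID path segment in both telemetry and
# structured-ingestion submission URIs (exact layouts assumed here).
DOCUMENT_ID = re.compile(
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.I)

def dedup_key(uri):
    """Prefer the parsed document_id, falling back to the full URI.

    In practice the full URI alone works: a pathologically resending
    client keeps the same document_id, and the other URI components are
    unlikely to differ between the original and the duplicate."""
    m = DOCUMENT_ID.search(uri)
    return m.group(0) if m else uri

uri = "/submit/telemetry/5e2c4d66-0d91-4a14-8c95-6b2a1d0f3c77/main/Firefox/96.0/release"
print(dedup_key(uri))  # 5e2c4d66-0d91-4a14-8c95-6b2a1d0f3c77
```

Using the whole URI as the idAttribute value skips the parsing step entirely, at the cost of slightly larger IDs for the service to track.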

Points: --- → 5
Priority: -- → P2
Blocks: 1646497

I dug through the Beam codebase a bit and found that the default PubsubIO implementation ends up calling Deduplicate.WithRepresentativeValues if withIdAttribute is set. That transform has some window parameters (which aren't exposed at the PubsubIO level) which look like they default to 10 minutes.

All that said, Cloud Dataflow replaces the implementation of PubsubIO completely, so it's not calling this codepath. But it may be a starting point for reasoning about what Dataflow may be doing. It's likely worth opening a GCP case to ask for details of how this works in Dataflow.

I opened a GCP case to ask for details about the behavior of deduplication in Dataflow streaming jobs, including details about our particular use case.

The GCP case makes it sound like what we're proposing here should be reasonable, so there's probably nothing blocking us from moving forward with a test.

Do we care about metrics surrounding deduplication?

Of note: if we rely on PubsubIO.Read.withIdAttribute for deduplication, we don't get any metrics around it. We could probably infer how many messages we're dropping as dupes by counting the number of messages published to various topics, but that's certainly lacking in detail compared to our current situation where we publish metadata of deduped messages to payload_bytes_error and we can inspect the URIs of the dropped messages.

If we wanted to maintain metrics or send deduped messages to the error stream, we would likely need to implement our own transform based on Deduplicate.WithRepresentativeValues. This probably wouldn't be too hard to maintain, but I expect it would not perform as well; Dataflow likely has some optimized logic for deduping at the PubsubIO level within its streaming service component.
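A hand-rolled transform of that shape, sketched in plain Python (a real Beam DoFn would use windowed state; names and structure here are hypothetical), would look roughly like this, with the dropped messages retained so they could be republished to the error stream:

```python
from collections import Counter

def dedup_with_metrics(messages, key_fn):
    """Sketch of a custom deduper that, unlike PubsubIO's built-in
    idAttribute handling, keeps per-drop metrics and yields the dropped
    messages (candidates for payload_bytes_error)."""
    seen = set()
    metrics = Counter()
    kept, dropped = [], []
    for msg in messages:
        key = key_fn(msg)
        if key in seen:
            metrics["duplicates_dropped"] += 1
            dropped.append(msg)
        else:
            seen.add(key)
            metrics["messages_kept"] += 1
            kept.append(msg)
    return kept, dropped, metrics

msgs = [{"id": "a"}, {"id": "b"}, {"id": "a"}]
kept, dropped, metrics = dedup_with_metrics(msgs, key_fn=lambda m: m["id"])
print(metrics["duplicates_dropped"], metrics["messages_kept"])  # 1 2
```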

Note that we can still estimate the number of duplicates dropped by comparing the raw and decoded tables. This is imperfect, because ingestion-sink also introduces duplicates, but we could probably account for that by checking whether submission_timestamp is the same in addition to just the uri being the same.
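The counting trick works because sink-introduced copies share both uri and submission_timestamp, while a client resend gets a fresh timestamp at the edge. A small sketch of the estimate (in real life this would be a BigQuery COUNT DISTINCT, not Python; the sample rows are made up):

```python
def estimated_beam_duplicates(raw_rows, decoded_rows):
    """Estimate how many duplicates Beam dropped by diffing raw vs decoded.

    Counting distinct (uri, submission_timestamp) pairs on each side
    factors out sink-introduced copies, which share both fields."""
    raw_keys = {(r["uri"], r["submission_timestamp"]) for r in raw_rows}
    decoded_keys = {(r["uri"], r["submission_timestamp"]) for r in decoded_rows}
    return len(raw_keys) - len(decoded_keys)

raw = [
    {"uri": "/submit/a", "submission_timestamp": "t1"},
    {"uri": "/submit/b", "submission_timestamp": "t2"},
    {"uri": "/submit/b", "submission_timestamp": "t3"},  # client resend, new timestamp
]
decoded = [
    {"uri": "/submit/a", "submission_timestamp": "t1"},
    {"uri": "/submit/a", "submission_timestamp": "t1"},  # sink-introduced copy
    {"uri": "/submit/b", "submission_timestamp": "t2"},  # Beam dropped the t3 resend
]
print(estimated_beam_duplicates(raw, decoded))  # 1
```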

(In reply to Jeff Klukas [:klukas] (UTC-4) from comment #8)

> I dug through the Beam codebase a bit and found that the default PubsubIO implementation ends up calling Deduplicate.WithRepresentativeValues if withIdAttribute is set. That transform has some window parameters (which aren't exposed at the PubsubIO level) which look like they default to 10 minutes.

I can't find where Deduplicate.WithRepresentativeValues gets added in, but in our case I would expect deduplication to be invoked even when withIdAttribute isn't set, because we are using PubsubIO.readMessagesWithAttributesAndMessageId() and the default implementation sets recordId to messageId when idAttribute is null.

Not that the default implementation is actually meaningful, because, as you said:

> All that said, Cloud Dataflow replaces the implementation of PubsubIO completely, so it's not calling this codepath. But it may be a starting point for reasoning about what Dataflow may be doing. It's likely worth opening a GCP case to ask for details of how this works in Dataflow.

> I can't find where Deduplicate.WithRepresentativeValues gets added in

Deduplicate.<ValueWithRecordId<T>, byte[]>withRepresentativeValueFn is called in Read.java. I concur that the messageId is used if no idAttribute is set.

I expect that whatever mechanism Dataflow uses for deduping, it follows that same pattern on using the messageId unless idAttribute is set. So I expect we won't really be hitting any different code paths; we'll just be enlarging the size of the IDs that need to be maintained.

https://github.com/mozilla/gcp-ingestion/pull/1542 landed on the 16th and didn't appear to have a noticeable performance impact. Notably, it's enabled in tandem with redis (for telemetry). I think we're now in a position to use methods like those in comment #11 to analyze Beam's dedup performance, but perhaps we want to test with redis entirely disabled as well (which is how structured is now configured, though structured clients behave differently).

In our weekly pipeline health check meeting, we noticed a precipitous decline in several of the top schema errors, starting on Feb 17th. It seems likely this is connected to this dedupe change, but more analysis is needed to validate. See https://sql.telemetry.mozilla.org/dashboard/all-schema-errors

Drop in errors other than DuplicateIdException

I am seeing ~6x reduction in volume in moz-fx-data-shared-prod.payload_bytes_error.telemetry for error types other than DuplicateIdException. The number of distinct URIs per day is reasonably constant, though. See https://sql.telemetry.mozilla.org/queries/78162/source

Previously, we did no deduplication on error messages: the Redis-based deduplication does not occur until after schema validation, so documents that failed validation were all passed along to the error table. The new PubsubIO deduplication happens at the very start of the Decoder, so it applies to all messages whether they validate or not.
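The effect of the ordering change can be shown with a tiny simulation (the message shape and counts are made up; the point is only where dedup sits relative to validation):

```python
def validate(msg):
    return msg["valid"]

def dedup(msgs):
    """Keep only the first message per id, like PubsubIO's idAttribute dedup."""
    seen, out = set(), []
    for m in msgs:
        if m["id"] not in seen:
            seen.add(m["id"])
            out.append(m)
    return out

# One client resends the same invalid ping 1000 times.
msgs = [{"id": "doc-1", "valid": False}] * 1000

# Old ordering: Redis dedup ran after validation, so every copy of a
# failing document reached payload_bytes_error.
errors_before = [m for m in msgs if not validate(m)]

# New ordering: PubsubIO dedup runs at the start of the Decoder, before
# validation, so only the first copy can fail validation.
errors_after = [m for m in dedup(msgs) if not validate(m)]

print(len(errors_before), len(errors_after))  # 1000 1
```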

I'm surprised that the duplicate rate for failing documents was so high. This appears to be driven by UnwantedDataException linking to https://bugzilla.mozilla.org/show_bug.cgi?id=1618684 which is FirefoxOS pings. We apparently have a number of clients sending 1000 to 2000 pings per day with identical IDs, adding up to millions of pings per day.

Drop in DuplicateIdException

The number of DuplicateIdException goes down by ~3x: https://sql.telemetry.mozilla.org/queries/78165/source

We're also able to explain at least one of the major error drops due to the dedupe change. This demonstrates perf/topsites_search_shortcuts behavior: https://sql.telemetry.mozilla.org/queries/78166/source

(In reply to Wesley Dawson [:whd] from comment #15)

> I think we're now in a position to used methods like comment #11 to analyze beam's dedup performance, but perhaps we want to test with redis entirely disabled as well

I think we are good to turn off redis at this point. In telemetry, the duplicates removed by copy_deduplicate are 4-5% of stable table rows and didn't significantly change, while the duplicates removed by redis went from ~0.50% to ~0.13% of stable table rows, per https://sql.telemetry.mozilla.org/queries/75213/source

I'm planning on disabling redis for the telemetry decoder next week. After that change, we're going to run a few more queries to document observed deduplication rates before closing this out.

I've filed bug #1694764 to separately decide whether it's appropriate to keep this enabled for stub installer pings or not.

I'm preparing a branch to remove ops redis configuration.

Here's the planned procedure, for the day after bug #1688633 lands:

  1. Disable automated jenkins jobs
  2. in stage beam resources (on the aforementioned branch):
    a. terraform-11.14 state rm module.resources.google_redis_instance.deduplication
    b. terraform-11.14 state rm module.resources.google_project_service.cache
    c. terraform-11.14 state mv module.resources.module.structured_dataflow.google_dataflow_job.decoder_no_deduplication module.resources.module.structured_dataflow.google_dataflow_job.decoder
    d. terraform-11.14 refresh to pick up output changes
  3. Run https://ops-master.jenkinsv2.prod.mozaws.net/job/gcp-pipelines/job/beam/job/beam-deploy-stage/ and verify
  4. Merge branch
  5. Repeat the above steps for prod on default branch
  6. Re-enable automated jenkins jobs
  7. Merge https://github.com/mozilla/gcp-ingestion/pull/1583, let stage auto-deploy
  8. Deploy beam prod code change as per usual
  9. Manually delete stage and prod memorystore instances

In classic memorystore fashion, the redis instance became unavailable for over an hour with no indication as to why (instance uptime of course didn't drop to 0, but you can see the giant gap in metrics). I put in https://github.com/mozilla-services/cloudops-infra/pull/2940 to disable redis in the beam job configuration to unblock prod deploys, which means we're no longer using redis as of March 10th UTC. I expect to properly remove redis using the above procedure later this week (as of now we're still paying for it).

All the above steps have been completed and we're no longer deduplicating with redis. Closing this out.

Status: NEW → RESOLVED
Closed: 5 years ago
Resolution: --- → FIXED
Component: Pipeline Ingestion → General