Create generic ingestion parquet s3 output

RESOLVED FIXED

Status

Data Platform and Tools
Pipeline Ingestion
P1
normal
RESOLVED FIXED
11 months ago
3 months ago

People

(Reporter: whd, Assigned: whd)

Tracking

(Blocks: 2 bugs)

Details

(Assignee)

Description

11 months ago
This isn't a complete spec but should be enough for someone to get started on implementation.

Prior art to base this work on: https://github.com/mozilla-services/lua_sandbox_extensions/blob/master/parquet/sandboxes/heka/output/s3_parquet.lua

The output from this plugin should wind up on the file system looking something like:

$BATCH_DIR/$NAMESPACE-$DOCTYPE-parquet/v$DOCVERSION/submission_date_s3=YYYYMMDD+file.parquet.done

and wind up going to S3 looking like:

s3://$BUCKET/$NAMESPACE-$DOCTYPE-parquet/v$DOCVERSION/submission_date_s3=YYYYMMDD/file.parquet

The key addition being that a single output will handle all the namespace-doctype-docversion parquetmr schemas available, instead of just one.
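As a rough illustration of the two layouts above, here is a minimal Python sketch (the real output would be a Lua sandbox; Python is used only for readability). The function names and the example values are hypothetical, and the date is assumed to be an eight-digit YYYYMMDD string.

```python
from datetime import date


def batch_file_path(batch_dir, namespace, doctype, docversion, day, filename):
    """Local path of a finished file, following the $BATCH_DIR layout above."""
    prefix = f"{namespace}-{doctype}-parquet/v{docversion}"
    # The date dimension and the file name are joined with '+', and
    # '.done' marks the file as complete, as in the layout above.
    return f"{batch_dir}/{prefix}/submission_date_s3={day:%Y%m%d}+{filename}.done"


def s3_url(bucket, namespace, doctype, docversion, day, filename):
    """Destination URL in S3, following the s3://$BUCKET layout above."""
    prefix = f"{namespace}-{doctype}-parquet/v{docversion}"
    return f"s3://{bucket}/{prefix}/submission_date_s3={day:%Y%m%d}/{filename}"


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    day = date(2018, 1, 2)
    print(batch_file_path("/var/batch", "telemetry", "main", 4, day, "file.parquet"))
    print(s3_url("example-bucket", "telemetry", "main", 4, day, "file.parquet"))
```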

I will describe more or less the changes that I think need to be made to the existing output to accomplish the above.

For configuration, parquet_schema should be replaced with schema_path, which should be used to load all available parquetmr schemas from the filesystem on initialization and possibly again on a timer interval. Instead of writing the parquet files directly to the batch directory, the output will need to generate the various prefixes inside the batch directory. This can be done during schema load, and the directories should be as above: $BATCH_DIR/$NAMESPACE-$DOCTYPE-parquet/v$DOCVERSION/.
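A sketch of what the schema load could do, in Python for illustration only. It assumes schema files live at schema_path/NAMESPACE/DOCTYPE/DOCTYPE.DOCVERSION.parquetmr.txt; that layout, and the name load_schemas, are assumptions and not part of this spec.

```python
import os
import re

# Assumed file naming: <doctype>.<docversion>.parquetmr.txt
SCHEMA_FILE = re.compile(r"^(?P<doctype>.+)\.(?P<docversion>\d+)\.parquetmr\.txt$")


def load_schemas(schema_path, batch_dir):
    """Load every parquetmr schema and create its batch directory prefix.

    Returns a dict keyed by (namespace, doctype, docversion).
    """
    schemas = {}
    for namespace in sorted(os.listdir(schema_path)):
        ns_dir = os.path.join(schema_path, namespace)
        if not os.path.isdir(ns_dir):
            continue
        for doctype in sorted(os.listdir(ns_dir)):
            dt_dir = os.path.join(ns_dir, doctype)
            if not os.path.isdir(dt_dir):
                continue
            for name in sorted(os.listdir(dt_dir)):
                match = SCHEMA_FILE.match(name)
                if not match or match.group("doctype") != doctype:
                    continue  # not a parquetmr schema for this doctype
                docversion = int(match.group("docversion"))
                with open(os.path.join(dt_dir, name)) as handle:
                    schemas[(namespace, doctype, docversion)] = handle.read()
                # $BATCH_DIR/$NAMESPACE-$DOCTYPE-parquet/v$DOCVERSION/
                os.makedirs(
                    os.path.join(batch_dir, f"{namespace}-{doctype}-parquet", f"v{docversion}"),
                    exist_ok=True,
                )
    return schemas
```

Calling the same function again from a timer would pick up newly added schemas, since creating an existing directory is a no-op here.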

The rest of the parameters should remain as they are, but process_message needs a few changes. At a high level, the logic needs to be parameterized so that there is a writer per namespace-doctype-docversion trio, selected by the message being processed. At a minimum this will include changes to get_s3_path, and probably other noodling around.

Of note to :mreid, in this variant the output path order for parquet differs from the heka output as we're using the docversion also as the parquet table version. We could redundantly add docversion to the submission path after submissiondate I suppose too.

I think we can simply drop messages for which a parquet schema does not exist. If a JSON schema also doesn't exist, the data will wind up in both the heka error stream and the parquet error stream. If a JSON schema does exist, it might mean we don't want a parquet output.
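The per-trio writer lookup and the drop-on-missing-schema behaviour could be sketched as follows. This is Python for illustration, not the Lua plugin; the WriterPool class, the message field names, and the open_writer callback are all hypothetical.

```python
class WriterPool:
    """Keeps one writer per (namespace, doctype, docversion) trio."""

    def __init__(self, schemas, open_writer):
        self.schemas = schemas          # dict: trio -> parquetmr schema text
        self.open_writer = open_writer  # callback: (trio, schema) -> writer
        self.writers = {}
        self.dropped = 0

    def process_message(self, msg):
        # Field names are assumptions made for this sketch.
        key = (msg["namespace"], msg["doctype"], msg["docversion"])
        schema = self.schemas.get(key)
        if schema is None:
            # No parquet schema for this trio: drop the message.
            self.dropped += 1
            return False
        writer = self.writers.get(key)
        if writer is None:
            # Open writers lazily, the first time a trio is seen.
            writer = self.writers[key] = self.open_writer(key, schema)
        writer.append(msg["payload"])
        return True
```

Any object with an append method can stand in for a writer, so a plain list is enough to exercise the lookup logic.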
(Assignee)

Comment 1

11 months ago
NI :mreid for review.
Flags: needinfo?(mreid)

Comment 2

11 months ago
(In reply to Wesley Dawson [:whd] from comment #0)
> Of note to :mreid, in this variant the output path order for parquet differs
> from the heka output as we're using the docversion also as the parquet table
> version. We could redundantly add docversion to the submission path after
> submissiondate I suppose too.

I don't quite follow this bit. Can you give an example?

> I think we can simply drop messages for which a parquet schema does not
> exist. If a JSON schema also doesn't exist, the data will wind up in both
> the heka error stream and the parquet error stream. If a JSON schema does
> exist, it might mean we don't want a parquet output.

Agreed. I think the usual case should be that both a JSON Schema exists (for validation) and a parquet schema exists (for output). If the former doesn't exist, there is an error. If the JSON Schema exists but the parquet schema does not, we default to only having access to the data for realtime analysis. If we want output in a form other than parquet (i.e. Heka-framed protobuf), that would be a special case.
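Spelled out as a small decision function, purely as an illustration of the cases in this comment (the Route names are made up for this sketch):

```python
from enum import Enum


class Route(Enum):
    ERROR = "error"                  # no JSON Schema: validation cannot happen
    REALTIME_ONLY = "realtime_only"  # validated, but no parquet output
    PARQUET = "parquet"              # validated and written as parquet


def route(has_json_schema, has_parquet_schema):
    """Map schema availability to what happens to a message."""
    if not has_json_schema:
        return Route.ERROR
    if not has_parquet_schema:
        return Route.REALTIME_ONLY
    return Route.PARQUET
```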
Points: --- → 3
Flags: needinfo?(mreid)
Priority: -- → P2
(Assignee)

Comment 3

11 months ago
(In reply to Mark Reid [:mreid] from comment #2)

> I don't quite follow this bit. Can you give an example?

I'm pointing out that the dimension order in parquet, e.g.

s3://$BUCKET/$NAMESPACE-$DOCTYPE-parquet/v$DOCVERSION/submission_date_s3=YYYYMMDD/file.parquet

differs in order from the heka output that we had in the doc:

submission_date/namespace/doctype/version/<files> 

in that we use docversion as table version and it's not necessarily available as docversion in queries. In the google doc the parquet schema was not made explicit.
(Assignee)

Updated

10 months ago
Blocks: 1242017
(Assignee)

Comment 4

8 months ago
Instead of implementing a multiplexed parquet output, I think it would be easier if we do something along the lines of https://github.com/mozilla-services/mozilla-pipeline-schemas/pull/70 to dynamically generate the d2p configs. To that end, I've prepared https://github.com/mozilla-services/svcops/tree/generic_ingestion and https://github.com/mozilla-services/puppet-config/tree/generic_ingestion for testing this approach.

NI :mreid for an opinion on whether this is an acceptable alternate approach.
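To make the templating idea concrete, here is a minimal Python sketch that renders one config per schema. The config keys, the file naming, and the function name are invented for illustration and are not the real d2p config format; the actual templates are puppet erb files in the branches linked above.

```python
from string import Template

# Hypothetical config body; not the real d2p config format.
CONFIG_TEMPLATE = Template(
    'name = "$namespace-$doctype-v$docversion"\n'
    'schema_file = "$schema_file"\n'
    's3_prefix = "$namespace-$doctype-parquet/v$docversion"\n'
)


def render_configs(schema_files):
    """Render one config per (namespace, doctype, docversion) trio.

    schema_files maps each trio to the path of its parquetmr schema.
    Returns a dict of config file name -> config text.
    """
    configs = {}
    for (namespace, doctype, docversion), schema_file in sorted(schema_files.items()):
        filename = f"{namespace}_{doctype}_v{docversion}.cfg"
        configs[filename] = CONFIG_TEMPLATE.substitute(
            namespace=namespace,
            doctype=doctype,
            docversion=docversion,
            schema_file=schema_file,
        )
    return configs
```

With this shape each generated config drives an unmodified single-schema output, which is the appeal of this approach over a multiplexed output.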
Assignee: nobody → whd
Flags: needinfo?(mreid)
Priority: P2 → P1

Comment 5

7 months ago
Templated / auto-generated configs seem like a reasonable approach.

If we go with putting the generated configs into the schema repo, I mildly dislike the puppet erb templates being separated from the variables they depend on (@max_age, etc) but I don't see a good way to avoid that.
Flags: needinfo?(mreid)
(Assignee)

Comment 6

3 months ago
https://github.com/mozilla-services/puppet-config/pull/2687/files
https://github.com/mozilla-services/svcops/pull/1392/files

We have plans for a v2 that should remove some of the ugliness of this approach but it is (self)-serviceable for now.
Status: NEW → RESOLVED
Last Resolved: 3 months ago
Resolution: --- → FIXED