Closed Bug 1304412 Opened 8 years ago Closed 8 years ago

Implement Parquet sink for Hindsight

Categories

(Cloud Services Graveyard :: Metrics: Pipeline, defect, P1)


Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: rvitillo, Assigned: trink)

References

Details

User Story

It would be great if Hindsight could store data in Parquet form so that we can easily import it into Spark or Presto without the need for custom jobs.

Ideally we would like to process JSON messages for which a schema was defined and registered a priori in a schema service. Using Confluent's schema registry [1] would be a plus, as Ops is planning to use that for their logging service as well.

Each message should end up in a dataset on S3. The layout convention of a dataset is the following (a small sketch follows the list): s3://prefix/name/version/partition(/partition)*/file, where:
- prefix is an arbitrary prefix, e.g: “telemetry-parquet”
- name is the name of the dataset expressed in snake_case notation, e.g. “main_summary”
- version is the version of the dataset, e.g. “v3”; a version number is expressed as an integer preceded by the "v" character
- partition is a partition of the dataset formatted as “name=value” expressed in snake_case notation, e.g. “submission_date=20160919”
- file is the name of the Parquet file, expressed in snake_case notation; it could be, for example, a UUID
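
As a concrete illustration of the layout above, here is a minimal Lua sketch that assembles a destination key from those pieces. The function and argument names are purely illustrative and not part of any plugin API:

-- Illustrative only: builds an S3 object key following the layout convention above.
local function s3_key(prefix, name, version, partitions, file)
  local parts = { prefix, name, "v" .. version }
  for _, p in ipairs(partitions) do
    parts[#parts + 1] = string.format("%s=%s", p.name, p.value)
  end
  parts[#parts + 1] = file
  return table.concat(parts, "/")
end

-- prints s3://telemetry-parquet/main_summary/v3/submission_date=20160919/4f2c0b8e.parquet
print(s3_key("s3://telemetry-parquet", "main_summary", 3,
  {{name = "submission_date", value = "20160919"}}, "4f2c0b8e.parquet"))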

The Parquet writer should support the following options (a hypothetical configuration sketch follows the list):

1) partitioning scheme
The partitioning layout decides where the message will be stored on S3; it should be expressed as a list of field names, e.g. “[submission_date, sample_id]”. Note that those fields should *not* be stored in the Parquet files. The Hive metastore maps the partitions of a dataset to columns of the logical table that represents the dataset on S3, which means one can’t have a partition and a field in a Parquet file with the same name.

2) flush size
The maximum number of records or bytes to store in a Parquet file before flushing it to S3.

3) flush interval
The time interval, in milliseconds, at which a Parquet file is flushed to S3. This ensures that a flush happens at least once per configured interval, which is useful when the data ingestion rate is low.
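
Putting the three options together, a hypothetical Hindsight output plugin configuration could look roughly like the following sketch; every option name here is an assumption for illustration, not the final plugin interface:

-- Hypothetical .cfg sketch for a Parquet/S3 output plugin; all names are illustrative.
filename        = "parquet_s3.lua"            -- assumed plugin filename
message_matcher = "Type == 'telemetry'"
ticker_interval = 60

s3_prefix       = "s3://telemetry-parquet"
dataset_name    = "main_summary"
dataset_version = "v3"

-- 1) partitioning scheme: selects the S3 path; these fields are NOT written
--    into the Parquet files themselves (they become Hive partition columns)
partition_fields = {"submission_date", "sample_id"}

-- 2) flush size: finalize and upload the file once either limit is reached
flush_max_records = 100000
flush_max_bytes   = 128 * 1024 * 1024

-- 3) flush interval (ms): force a flush this often, useful at low ingestion rates
flush_interval = 5 * 60 * 1000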

[1] - https://github.com/confluentinc/schema-registry
User Story: (updated)
Points: --- → 2
Priority: -- → P2
Depends on: 1304439
Assignee: nobody → mtrinkala
Points: 2 → 5
Priority: P2 → P1
No longer depends on: 1304439
> process JSON messages for which a schema was defined and registered a priori

For Ops' logging the schema is mozlog, which alone isn't a sufficiently defined Parquet schema (I think). Ops will eventually need to be able to build a schema from the messages, because mozlog is the only restriction we get on logs.

> Using confluent's schema registry would be a plus as Ops is planning to use that for their logging service as well.

To clarify, my plan is to set up something sending logs to the Confluent REST proxy in Avro format until this bug is implemented. It makes no difference for Ops' logging whether or not the implementation of this bug uses Confluent's schema registry (as Confluent's REST proxy does).
Another bug will be created for the actual Hindsight output plugin. 

The high-level tasks for the initial version of the parquet module follow (a rough usage sketch appears after the list):

- define writer API
- implement schema parser (data output by the parquet-dump-schema utility)
- implement schema object
    - record creation
    - column creation
    - finalize
- implement a loader to iterate the schema parse table and create the corresponding schema object 
- implement writer property builder
    - support for the following properties
        -  global
            - enable_dictionary (bool)
            - dictionary_pagesize_limit (size_t)
            - write_batch_size (size_t)
            - data_pagesize (size_t)
            - version ("1.0", "2.0")
            - created_by (string)
            - encoding ("plain", "rle")
            - compression ("uncompressed", "snappy", "gzip", "lzo", "brotli")
            - enable_statistics (bool)
        - column_properties
            - col_name1
                - enable_dictionary (bool)
                - encoding ("plain", "rle")
                - compression ("uncompressed", "snappy", "gzip", "lzo", "brotli")
                - enable_statistics (bool)
            - col_nameN
- implement Dremel column striping
- implement writer append_row (column output from each message)
- implement writer close (outputs the row group and closes the file)
    - Note: this design will only allow one row group per file
- testing/documentation
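
To make the intended flow concrete, a rough usage sketch covering the tasks above follows. Only append_row, close, and the property names come from the list itself; the module name and the loader/constructor names are assumptions, not the shipped API:

-- Rough sketch of the intended flow; names other than append_row/close and the
-- property keys are assumptions and not the final module API.
local parquet = require "parquet"              -- assumed module name

-- parse the output of the parquet-dump-schema utility and build the schema object
local fh     = assert(io.open("main_summary.schema"))
local schema = parquet.load_schema(fh:read("*a"))  -- hypothetical loader from the list
fh:close()

-- writer properties: global settings plus per-column overrides
local properties = {
  version           = "2.0",
  compression       = "snappy",
  enable_statistics = true,
  column_properties = {
    client_id = { enable_dictionary = false, compression = "gzip" }
  }
}

local writer = parquet.writer("main_summary.parquet", schema, properties)

-- one append_row call per message; the module performs the Dremel column striping
writer:append_row({ document_id = "4f2c0b8e", submission_date = "20160919" })

-- close outputs the single row group and finalizes the file
writer:close()
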
Blocks: 1314446
Depends on: 1304439
No longer blocks: 1314446
Reopening the module bug instead; this bug will track the HS output plugin work.
Blocks: 1316027
The number of partitions could be a problem; how many partitions do we currently have?  Potential issues:

File handles:
The Heka stream S3 file partitioner/uploader could just close file handles and reopen them to append more data, but this is not possible with the Parquet writer. We would have to finalize the file and upload it as-is. This may produce a lot of small files.

Recommendation: increase the number of open file handles on the system and make sure we keep the partition count well below it, since we will have multiple partitioned writers running on the same box.

Memory:
The Parquet partitioner could hold up to partitions * rowgroup_size records; so if the rowgroup_size is 10000 and there are 1000 partitions, that is a worst case of 10MM records in memory at once (even with a small 1KiB telemetry record, that is roughly 10GB). In contrast, the Heka stream partitioner only needs to hold one record in memory at a time, regardless of how many partitions there are.

Recommendation: write out several rowgroups until the file reaches its desired size; this will at least reduce our working memory set.
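
For reference, the back-of-envelope arithmetic behind the worst-case figure above:

-- Worst-case working set for the Parquet partitioner (back-of-envelope only).
local partitions    = 1000
local rowgroup_size = 10000            -- records buffered per partition before a flush
local record_bytes  = 1024             -- small 1KiB telemetry record
print(partitions * rowgroup_size * record_bytes)  -- 10240000000 bytes, roughly 10GB
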
I think that for this first iteration we could live with a simple time-based partitioning scheme, i.e. day and hour (e.g. s3://prefix/name/version/submission_date=X/submission_hour=Y/foo.parquet). That's acceptable for many of the smaller use cases and is also in line with our OKR to reduce latency to the order of a few hours.
https://github.com/mozilla-services/lua_sandbox_extensions/pull/48
Status: NEW → RESOLVED
Closed: 8 years ago
Resolution: --- → FIXED
Product: Cloud Services → Cloud Services Graveyard