Closed Bug 1390104 Opened 7 years ago Closed 7 years ago

Deploy health ping to parquet output

Categories

(Data Platform and Tools Graveyard :: Operations, defect)

Type: defect
Priority: Not set
Severity: normal

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: katejimmelon, Assigned: whd)

References

Details

-- configuration file
filename        = "health_parquet.lua"
message_matcher = "Type == 'telemetry' && Fields[docType] == 'health'"
ticker_interval = 60
preserve_data   = false

parquet_schema = [=[
   message health {
     required binary type (UTF8);
     required binary id (UTF8);
     required binary creationDate (UTF8);
     required double version;
     required binary clientId (UTF8);
     required group application {
       required binary architecture (UTF8);
       required binary buildId (UTF8);
       required binary channel (UTF8);
       required binary name (UTF8);
       required binary platformVersion (UTF8);
       required binary version (UTF8);
       optional binary displayVersion (UTF8);
       required binary vendor (UTF8);
       required binary xpcomAbi (UTF8);
     }
     required group payload {
       required group os {
           required binary name (UTF8);
           required binary version (UTF8);
       }
       required binary reason (UTF8);
       optional group sendFailure (MAP) {
           repeated group key_value {
               required binary key (UTF8);
               required int64 value;
           }
       }
       optional group pingDiscardedForSize (MAP) {
           repeated group key_value {
               required binary key (UTF8);
               required int64 value;
           }
       }
     }
     required group metadata {
         optional binary Date (UTF8);
         required binary normalizedChannel (UTF8);
         required binary appUpdateChannel (UTF8);
         required binary submissionDate (UTF8);
         required binary geoCity (UTF8);
         required binary geoCountry (UTF8);
         required binary documentId (UTF8);
         required binary appBuildId (UTF8);
         required binary appName (UTF8);
     }
   }
]=]
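-- Illustrative only: the (MAP) groups in the schema above are populated from
-- JSON objects in the ping; e.g. a hypothetical payload.sendFailure of
-- {"timeout": 3, "abort": 1} becomes the repeated key_value pairs
-- ("timeout", 3) and ("abort", 1) in the sendFailure column.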

-- The name of a top level parquet group used to specify additional information
-- to be extracted from the message (using read_message). If the column name
-- matches a Heka message header name the data is extracted from 'msg.name',
-- otherwise the data is extracted from msg.Fields[name].
metadata_group = "metadata"
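-- Illustrative only: none of the "metadata" column names above match a Heka
-- header name (Timestamp, Type, Hostname, ...), so per the rule above each one
-- is read from the corresponding message field, e.g. geoCountry from
-- Fields[geoCountry] and submissionDate from Fields[submissionDate].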

-- Array of Heka message variables containing JSON strings. The decoded JSON
-- objects are assembled into a record that is dissected based on the parquet
-- schema. This provides a generic way to cherry-pick and re-combine segmented
-- JSON structures like the Mozilla telemetry pings. A table can be passed as
-- the first value, either empty or with some pre-seeded values.
-- If not specified, the schema is applied directly to the Heka message.
json_objects = {"Fields[submission]"}
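-- Illustrative only: with json_objects set as above, each record is assembled
-- from the decoded JSON in Fields[submission] (the ping body), so e.g. the
-- schema's payload.reason column comes from payload.reason in the submission,
-- while the "metadata" group is filled in separately via metadata_group.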

s3_path_dimensions  = {
    -- access message data using read_message()
    -- {name = "_submission_date", source = "Fields[submissionDate]"},
    -- access Timestamp using read_message() and then encode it with the dateformat string.
    -- The source value is multiplied by scaling_factor to convert it to seconds;
    -- the default is 1e-9, which implies that source is in nanoseconds.
    {name = "_submission_date", source = "Timestamp", dateformat = "%Y-%m-%d-%H", scaling_factor = 1e-9},
    -- access the record data with a path array
    -- {name = "_submission_date", source = {"metadata", "submissionDate"}}
}
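-- Illustrative only: with the dimension above, a message whose Timestamp
-- corresponds to 2017-08-21 13:05 UTC (a made-up example) lands in the
-- _submission_date partition "2017-08-21-13": the nanosecond Timestamp is
-- scaled by 1e-9 to seconds and formatted with "%Y-%m-%d-%H".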

-- directory location to store the intermediate output files
batch_dir       = "/var/tmp/parquet"

-- Specifies how many parquet writers can be open at once. If this value is
-- exceeded, the least-recently used writer will have its data finalized and be
-- closed. The default is 100. A value of 0 means no maximum. **Warning**: if
-- there are a large number of partitions this can easily run the system out of
-- file handles and/or memory.
max_writers         = 100

-- Specifies how many records to aggregate before creating a rowgroup
-- (default 10000)
max_rowgroup_size   = 10000

-- Specifies how much data (in bytes) can be written to a single file before
-- it is finalized. The file size is only checked after each rowgroup write
-- (default 300MiB).
max_file_size       = 1024 * 1024 * 300

-- Specifies how long (in seconds) to wait before the file is finalized
-- (default 1 hour).  Idle files are only checked every ticker_interval seconds.
max_file_age        = 60 * 60

-- This option causes the field name to be converted to a hive compatible column
-- name in the parquet output. The conversion snake cases the field name and
-- replaces any non [-_a-z0-9] characters with an underscore.
-- e.g. FooBar? -> foo_bar_
hive_compatible     = true -- default false
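
For reference, here is a rough Lua sketch of the name conversion that hive_compatible describes (the hive_name helper below is hypothetical, not the plugin's code, and the real conversion may differ in edge cases):

-- hypothetical sketch of the hive_compatible column name conversion
local function hive_name(name)
    -- snake-case: insert an underscore at lower/upper case boundaries, then lowercase
    local s = name:gsub("(%l)(%u)", "%1_%2"):lower()
    -- replace any character outside [-_a-z0-9] with an underscore
    return (s:gsub("[^%-_a-z0-9]", "_"))
end

assert(hive_name("FooBar?") == "foo_bar_")  -- example from the comment above
assert(hive_name("pingDiscardedForSize") == "ping_discarded_for_size")
assert(hive_name("clientId") == "client_id")
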
Summary: Deploy testpilot to parquet output → Deploy health ping to parquet output
(In reply to Kate Ustiuzhanina from comment #0)
Update schema: change version from double -> int64

-- configuration file
filename        = "health_parquet.lua"
message_matcher = "Type == 'telemetry' && Fields[docType] == 'health'"
ticker_interval = 60
preserve_data   = false

parquet_schema = [=[
   message health {
     required binary type (UTF8);
     required binary id (UTF8);
     required binary creationDate (UTF8);
     required int64 version;
     required binary clientId (UTF8);
     required group application {
       required binary architecture (UTF8);
       required binary buildId (UTF8);
       required binary channel (UTF8);
       required binary name (UTF8);
       required binary platformVersion (UTF8);
       required binary version (UTF8);
       optional binary displayVersion (UTF8);
       required binary vendor (UTF8);
       required binary xpcomAbi (UTF8);
     }
     required group payload {
       required group os {
           required binary name (UTF8);
           required binary version (UTF8);
       }
       required binary reason (UTF8);
       optional group sendFailure (MAP) {
           repeated group key_value {
               required binary key (UTF8);
               required int64 value;
           }
       }
       optional group pingDiscardedForSize (MAP) {
           repeated group key_value {
               required binary key (UTF8);
               required int64 value;
           }
       }
     }
     required group metadata {
         optional binary Date (UTF8);
         required binary normalizedChannel (UTF8);
         required binary appUpdateChannel (UTF8);
         required binary submissionDate (UTF8);
         required binary geoCity (UTF8);
         required binary geoCountry (UTF8);
         required binary documentId (UTF8);
         required binary appBuildId (UTF8);
         required binary appName (UTF8);
     }
   }
]=]

-- The name of a top level parquet group used to specify additional information
-- to be extracted from the message (using read_message). If the column name
-- matches a Heka message header name the data is extracted from 'msg.name',
-- otherwise the data is extracted from msg.Fields[name].
metadata_group = "metadata"

-- Array of Heka message variables containing JSON strings. The decoded JSON
-- objects are assembled into a record that is dissected based on the parquet
-- schema. This provides a generic way to cherry-pick and re-combine segmented
-- JSON structures like the Mozilla telemetry pings. A table can be passed as
-- the first value, either empty or with some pre-seeded values.
-- If not specified, the schema is applied directly to the Heka message.
json_objects = {"Fields[submission]"}

s3_path_dimensions  = {
    -- access message data using read_message()
    -- {name = "_submission_date", source = "Fields[submissionDate]"},
    -- access Timestamp using read_message() and then encode it with the dateformat string.
    -- The source value is multiplied by scaling_factor to convert it to seconds;
    -- the default is 1e-9, which implies that source is in nanoseconds.
    {name = "_submission_date", source = "Timestamp", dateformat = "%Y-%m-%d-%H", scaling_factor = 1e-9},
    -- access the record data with a path array
    -- {name = "_submission_date", source = {"metadata", "submissionDate"}}
}

-- directory location to store the intermediate output files
batch_dir       = "/var/tmp/parquet"

-- Specifies how many parquet writers can be open at once. If this value is
-- exceeded, the least-recently used writer will have its data finalized and be
-- closed. The default is 100. A value of 0 means no maximum. **Warning**: if
-- there are a large number of partitions this can easily run the system out of
-- file handles and/or memory.
max_writers         = 100

-- Specifies how many records to aggregate before creating a rowgroup
-- (default 10000)
max_rowgroup_size   = 10000

-- Specifies how much data (in bytes) can be written to a single file before
-- it is finalized. The file size is only checked after each rowgroup write
-- (default 300MiB).
max_file_size       = 1024 * 1024 * 300

-- Specifies how long (in seconds) to wait before the file is finalized
-- (default 1 hour).  Idle files are only checked every ticker_interval seconds.
max_file_age        = 60 * 60

-- This option causes the field name to be converted to a hive compatible column
-- name in the parquet output. The conversion snake cases the field name and
-- replaces any non [-_a-z0-9] characters with an underscore.
-- e.g. FooBar? -> foo_bar_
hive_compatible     = true -- default false
This should go out the 21st. I agree with comment #2, so I'll add a monitor with an ingestion error threshold of 1%, absent any better number(s) we might have for projected size, volume, etc. I'll have it email at minimum myself and :gfritzsche.

We're working on making alerting more self-service (and mandatory) per https://github.com/mozilla-services/mozilla-pipeline-schemas/pull/70 but it's a WIP.
https://github.com/mozilla-services/puppet-config/pull/2627
https://sql.telemetry.mozilla.org/queries/21960

Technically the schemas rpm was built from dev for this deploy, but it can be built out-of-band from the rest of the packages. The alerting might need tweaking, but the only ingestion error so far was generated by my testing, which leads me to believe the schema is sound.

There's currently a d2p->athena issue that's causing this data to be available only in Presto (see the example query), but :robotblake is looking into that.
Status: NEW → RESOLVED
Closed: 7 years ago
Resolution: --- → FIXED
Product: Data Platform and Tools → Data Platform and Tools Graveyard