Bug 1368196 Opened 7 years ago Closed 7 years ago

Create generic ingestion decoder

Categories

(Data Platform and Tools :: General, enhancement, P1)

Points: 3

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: whd, Assigned: trink)

This is an attempt to write a more detailed spec for the generic ingestion decoder. It is not complete, but hopefully complete enough to begin implementation.

Prior art to base this work on: https://github.com/mozilla-services/lua_sandbox_extensions/blob/master/moz_telemetry/io_modules/decoders/moz_telemetry/ping.lua.

The input message to the decoder will be populated with the following fields in addition to the top-level fields (and possibly others based on headers, which can be ignored):

Fields[remote_addr]: (ignore)
Fields[uri]: /submit/<namespace>/<doctype>/<docversion>[/<docid>]
Fields[protocol]: (ignore)
Fields[Content-Length]: (ignore)
Fields[host]: (ignore)
Fields[User-Agent]: (ignore)
Fields[X-Forwarded-For]: IP[, IP2...]
Fields[content]: (payload, assumed to be JSON or gzipped JSON)
Fields[args]: (optional) ?other=foo&stuff=bar&here=baz
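
For concreteness, a submission might arrive looking like the following Lua table (a hypothetical example; all values are illustrative, and the header-derived fields marked (ignore) above are omitted):

    -- Hypothetical input message; values are illustrative only.
    local example_input = {
        Fields = {
            uri                 = "/submit/mynamespace/mydoctype/1/0f5ab1e0-d815-4b21-ba8c-d2b9e4c8d9e5",
            ["X-Forwarded-For"] = "192.0.2.1, 198.51.100.7",
            content             = '{"payload": {}}', -- JSON or gzipped JSON
            args                = "?other=foo&stuff=bar&here=baz",
        }
    }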

The decoder should have the following properties.

Configuration options:
GeoIP database location
Schema location on disk

Decoder initialization logic:
Load schemas into a structure
Load GeoIP database
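
A minimal initialization sketch, assuming the lua_sandbox read_config() API plus the lfs (luafilesystem) and rjson modules; the configuration option names and the on-disk layout (<schema_location>/<namespace>/<doctype>.<docversion>.schema.json) are assumptions, not part of this spec:

    local lfs   = require "lfs"
    local rjson = require "rjson"

    -- placeholder option names; the spec only says "GeoIP database location"
    -- and "schema location on disk"
    local schema_location = read_config("schema_location")
    local geoip_db_file   = read_config("geoip_db_file")

    -- schemas[namespace][doctype][docversion] = parsed schema object
    local schemas

    local function load_schemas(root)
        local t = {}
        for ns in lfs.dir(root) do
            local ns_path = root .. "/" .. ns
            if ns:sub(1, 1) ~= "." and lfs.attributes(ns_path, "mode") == "directory" then
                for fn in lfs.dir(ns_path) do
                    -- assumed layout: <namespace>/<doctype>.<docversion>.schema.json
                    local doctype, version = fn:match("^(.+)%.(%d+)%.schema%.json$")
                    if doctype then
                        local fh = assert(io.open(ns_path .. "/" .. fn))
                        local schema = rjson.parse_schema(fh:read("*a"))
                        fh:close()
                        t[ns] = t[ns] or {}
                        t[ns][doctype] = t[ns][doctype] or {}
                        t[ns][doctype][version] = schema
                    end
                end
            end
        end
        return t
    end

    schemas = load_schemas(schema_location)
    -- the GeoIP database would be opened here as well; the telemetry decoder
    -- uses the lua-geoip city module for this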

Error throwing logic:
Set the following fields:
Type = "moz_generic.error"
Fields[decodeErrorType] = (an appropriate value)
Fields[decodeError] = (an appropriate value)

Propagate the rest of the fields as they are, including Fields[uri], inject the error message, and exit.
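
A minimal sketch of this path, assuming the lua_sandbox inject_message() API; the helper name inject_error and its arguments are illustrative:

    -- msg is the partially decoded message; all fields already set on it
    -- (including Fields[uri]) pass through unchanged
    local function inject_error(msg, err_type, err)
        msg.Type = "moz_generic.error"
        msg.Fields.decodeErrorType = err_type
        msg.Fields.decodeError     = err
        inject_message(msg)
    end

    -- callers then simply: return inject_error(msg, "uri", "invalid uri")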

Process message logic (a consolidated sketch of steps 1-8 follows the numbered list below):
As a preliminary, all decoded messages should have the following top level values (except as overwritten in the error throwing logic):
Logger = "moz_generic"
Type = "moz_generic"

1. Split Fields[uri] on "/" and set the decoded message Fields[namespace], Fields[doctype], Fields[docversion], Fields[docid] based on them.

   We may want to use decoded field names that align with our telemetry usage or the spec that :mreid has written/is working on. The names I'm using are placeholders and don't particularly matter.

   If Fields[uri] is missing, or any of the required components is missing or contains invalid characters [^0-9a-zA-Z_-], or the URI is otherwise unparseable, set the bad fields to UNKNOWN and throw a "uri" error.

2. Propagate Fields[args] as-is (?)

   We could process these into key-value pairs or similar, but as currently spec'd we don't use this information and would probably need to jsonify it to Fields[args_json] or similar to make it available to the parquet output.

3. Check the schema structure for a schema matching the namespace, doctype, and docversion.

   If no such schema is found, throw a "missing-schema" error.

4. Perform json schema validation on Fields[content] using the found schema.

   The logic in this step is specific to json/compressed-json documents and presumably can be branched if/when we eventually support other formats and schemas.

   If Fields[content] is gzip compressed, decompress it. If decompression fails, throw a "gzip" error. If the result is invalid json, throw a "json" error (note: these might currently be the same step due to optimizations we've made; see the telemetry decoder). If the schema validation fails, throw a "schema" error.

5. Set Fields[submission] in the output message to the serialized, decompressed, validated json string.

6. Perform a GeoIP lookup (?) and set Fields[geoCountry] and Fields[geoCity].

   This wasn't spec'd but we might want to do it. The logic in the telemetry decoder can probably be used as-is.

7. Set Fields[submissionDate] to the submission date derived from Timestamp.

   This is needed for the current s3 heka output, as the parquet one can use Timestamp natively if configured.

8. Inject the message and exit.
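
Pulling steps 1-8 together, here is a consolidated sketch. It assumes the lua_sandbox read_message()/inject_message() APIs, the lua-zlib and rjson modules, and the schemas table and inject_error helper from the sketches above; the module calls are written from memory of the lua_sandbox_extensions APIs and should be checked against the real docs:

    local zlib  = require "zlib"
    local rjson = require "rjson"

    local UNKNOWN = "UNKNOWN"

    function process_message()
        local msg = {
            Logger = "moz_generic",
            Type   = "moz_generic",
            Fields = {},
        }

        -- 1. split the URI into its components; a fuller implementation would
        --    validate each component individually and set only the bad ones
        --    to UNKNOWN
        local uri = read_message("Fields[uri]") or ""
        local ns, dt, dv, did =
            uri:match("^/submit/([%w_%-]+)/([%w_%-]+)/([%w_%-]+)/?([%w_%-]*)$")
        msg.Fields.uri        = uri
        msg.Fields.namespace  = ns or UNKNOWN
        msg.Fields.doctype    = dt or UNKNOWN
        msg.Fields.docversion = dv or UNKNOWN
        if did and #did > 0 then msg.Fields.docid = did end
        if not ns then return inject_error(msg, "uri", "invalid uri") end

        -- 2. propagate args as-is
        msg.Fields.args = read_message("Fields[args]")

        -- 3. schema lookup
        local schema = schemas[ns] and schemas[ns][dt] and schemas[ns][dt][dv]
        if not schema then
            return inject_error(msg, "missing-schema", "no schema for uri")
        end

        -- 4. decompress (if gzipped), then parse and validate the content
        local content = read_message("Fields[content]") or ""
        if content:sub(1, 2) == "\031\139" then -- gzip magic bytes
            local ok
            ok, content = pcall(function() return zlib.inflate()(content) end)
            if not ok then return inject_error(msg, "gzip", "decompression failed") end
        end
        local ok, doc = pcall(rjson.parse, content)
        if not ok then return inject_error(msg, "json", doc) end
        local valid, verr = doc:validate(schema)
        if not valid then return inject_error(msg, "schema", verr) end

        -- 5. serialized, decompressed, validated submission
        msg.Fields.submission = content

        -- 6. a GeoIP lookup would set Fields[geoCountry] and Fields[geoCity]
        --    here; the telemetry decoder's lua-geoip logic can likely be
        --    reused as-is

        -- 7. submission date derived from the message Timestamp (nanoseconds)
        local ts = read_message("Timestamp")
        msg.Fields.submissionDate = os.date("%Y%m%d", math.floor(ts / 1e9))

        -- 8. inject and exit
        inject_message(msg)
        return 0
    end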

Timer interval logic:
Reload the schemas from disk
Reload the GeoIP database
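
A minimal sketch, assuming the host plugin exposes the lua_sandbox timer_event() entry point; load_schemas() is the helper from the initialization sketch above:

    function timer_event(ns, shutdown)
        schemas = load_schemas(schema_location)
        -- the GeoIP database would be reopened here as well, mirroring
        -- initialization
    end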


The end result of all this should be a message that has at least the following properties:

Fields[submissionDate]
Fields[namespace]
Fields[doctype]
Fields[docversion]
Fields[submission]

which will allow us to write a generic ingestion parquet output that will not require reconfiguration. Or, in the case of an error, we should be able to write it out to the error s3 and parquet streams using a static parquet schema that is generic across error types.
NI :mreid for review.
Flags: needinfo?(mreid)
Points: --- → 3
Priority: -- → P2
Blocks: 1242017
Assignee: nobody → whd
Status: NEW → ASSIGNED
Priority: P2 → P1
Assigning to :trink per IRC discussion.
Assignee: whd → mtrinkala
This will become moz_ingest (handling all output from nginx_moz_ingest). Common functionality like de-duping will be handled in the top-level decoder, and sub-decoder support will be added for any decoder exposing transform_message(), e.g. telemetry, pioneer, etc.
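
For illustration, a hypothetical sub-decoder skeleton; this bug does not pin down the transform_message() signature, so the argument list below is an assumption:

    local M = {}

    -- msg is the generically decoded message produced by the top-level
    -- moz_ingest decoder; the sub-decoder extends or re-types it before
    -- it is injected
    function M.transform_message(msg)
        msg.Type = "telemetry" -- e.g. re-type for the telemetry stream
        -- doctype-specific parsing would go here
        return msg
    end

    return M
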
https://github.com/mozilla-services/lua_sandbox_extensions/commit/f1c7c50178e18d621579f11a19178d89ccbd583d
Status: ASSIGNED → RESOLVED
Closed: 7 years ago
Resolution: --- → FIXED
The previous closing of this bug covered the refactor; :trink is adding the json validation portion of this today.
Status: RESOLVED → REOPENED
Flags: needinfo?(mreid)
Resolution: FIXED → ---
https://github.com/mozilla-services/lua_sandbox_extensions/commit/3b71e3b5c6c9434a4a381c34565a6fd1d4dc7822
Status: REOPENED → RESOLVED
Closed: 7 years ago
Resolution: --- → FIXED
Component: Pipeline Ingestion → General