Closed
Bug 1305087
Opened 9 years ago
Closed 7 years ago
spark-hyperloglog functions should be callable from pyspark
Categories
(Data Platform and Tools :: General, defect, P1)
Tracking
(Not tracked)
RESOLVED
FIXED
People
(Reporter: rvitillo, Assigned: amiyaguchi)
References
Details
(Whiteboard: [DataPlatform])
User Story
The spark-hyperloglog [1] package is used by telemetry-batch-view [2] to generate the client_count dataset. We would like to invoke the same functionality from Python as well. Ideally, that package would be accessible by default on Spark clusters launched by our next-gen ATMO. Saptarshi made an initial attempt (quoting from his e-mail):

========================================================================================
1. Use emr-5.0.0
2. Start the shell with "pyspark --packages vitillo:spark-hyperloglog:1.1.1"
3. Run the following code:

from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row
from pyspark.sql.types import LongType
import pyspark.sql.functions as F

def hlm(col):
    sc = SparkContext._active_spark_context
    _hll_merge = sc._jvm.com.mozilla.spark.sql.hyperloglog.aggregates.HyperLogLogMerge().apply
    return Column(_hll_merge(_to_seq(sc, [col], _to_java_column)))

def hlc(col):
    sc = SparkContext._active_spark_context
    _hll_cardinality = sc._jvm.com.mozilla.spark.sql.hyperloglog.functions.package.hllCardinality
    return Column(_hll_cardinality(col))

sqlContext.registerFunction("hllCardinality", hlc, returnType=LongType())

ff = sqlContext.read.parquet("s3://telemetry-parquet/client_count/v2016032120160921/")
ff1 = ff.filter(ff.activity_date == '2016-09-01')
ff2 = ff1.agg(hlm("hll").alias("mhll"))

>>> ff2
DataFrame[mhll: binary]

ff2.selectExpr("hllCardinality(mhll)").collect()

and then I get (after the job is nearly 100% done):

  File "<stdin>", line 3, in hlc
AttributeError: 'NoneType' object has no attribute '_jvm'

Because 'sc' is not present on worker nodes.
========================================================================================

[1] https://github.com/vitillo/spark-hyperloglog
[2] https://github.com/mozilla/telemetry-batch-view/blob/master/src/main/scala/com/mozilla/telemetry/views/ClientCountView.scala
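A minimal, self-contained sketch of the failure mode (assuming the same pyspark shell session as above; the needs_jvm name is made up for illustration): a Python function registered with registerFunction executes in the executors' Python workers, where no SparkContext is active, so sc._jvm cannot be reached.

from pyspark.sql.types import LongType

def needs_jvm(value):
    from pyspark import SparkContext
    sc = SparkContext._active_spark_context   # None inside an executor's Python worker
    return sc._jvm.java.lang.Math.abs(value)  # would work on the driver, never reached on executors

sqlContext.registerFunction("needs_jvm", needs_jvm, returnType=LongType())
sqlContext.range(1).selectExpr("needs_jvm(id)").collect()
# -> AttributeError: 'NoneType' object has no attribute '_jvm', same as above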
Attachments
(2 files)
No description provided.
Reporter
Updated•9 years ago
User Story: (updated)
Reporter
Updated•9 years ago
User Story: (updated)
Reporter
Updated•9 years ago
Assignee: nobody → jezdez
Comment 1•9 years ago
One more update: on the driver node,

u = ff2.collect()
# an active SparkContext (and hence _jvm) exists on the driver, so the Scala function can be called directly
finalCount = sc._jvm.com.mozilla.spark.sql.hyperloglog.functions.package.hllCardinality(u[0].mhll)

finalCount is the correct number.
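For reference, the two working pieces above can be folded into a small driver-side helper. A rough sketch, assuming the same pyspark shell session and the class/function paths quoted earlier (hll_merge and hll_count are names made up here):

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def hll_merge(col):
    # Build the JVM aggregate expression on the driver; the resulting Column
    # is an ordinary Catalyst expression, so executors never touch sc._jvm.
    sc = SparkContext._active_spark_context
    merge = sc._jvm.com.mozilla.spark.sql.hyperloglog.aggregates.HyperLogLogMerge().apply
    return Column(merge(_to_seq(sc, [col], _to_java_column)))

def hll_count(df, hll_col="hll"):
    # Merge the sketches with the JVM aggregate, collect the single binary
    # result, and compute the cardinality on the driver, where _jvm is available.
    sc = SparkContext._active_spark_context
    merged = df.agg(hll_merge(hll_col).alias("mhll")).collect()[0].mhll
    return sc._jvm.com.mozilla.spark.sql.hyperloglog.functions.package.hllCardinality(merged)

# e.g. hll_count(ff1) should match finalCount above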
Updated•9 years ago
Status: NEW → ASSIGNED
Comment 2•9 years ago
Comment 3•9 years ago
When this is complete, we'll need to update the documentation for client_count: https://wiki.mozilla.org/Telemetry/Available_Telemetry_Datasets_and_their_Applications
Updated•9 years ago
Points: --- → 3
Priority: -- → P1
Comment 5•9 years ago
Hey Jannis - 3 points is "3 days or more" - see our sizing guidelines here: https://docs.google.com/document/d/1YEPaLLYX9CZD0IDenNgHb_hU1_gPTPjdcoRsvORxc1M/edit
Updated•9 years ago
Flags: needinfo?(thuelbert)
Updated•9 years ago
Assignee: jezdez → nobody
Status: ASSIGNED → NEW
Reporter
Updated•9 years ago
Priority: P1 → --
Updated•9 years ago
Priority: -- → P3
Updated•8 years ago
Component: Metrics: Pipeline → Telemetry APIs for Analysis
Product: Cloud Services → Data Platform and Tools
Assignee
Updated•8 years ago
Assignee: nobody → amiyaguchi
Assignee
Comment 6•8 years ago
I've found a solution to this, which involves registering the UDFs on the Scala side of the code.
Steps to get this working:
1. Remove the package object from com.mozilla.spark.sql.hyperloglog.functions
- py4j doesn't have visibility into functions at this scope for some reason
2. Add sbt-assembly for fat-jar compilation
3. Add a function on the Scala side to register all UDFs
4. Add the assembled jar to spark-submit or pyspark using --jars (and --driver-class-path if in local mode)
5. Before calling the register function, initialize the Spark context by building an empty DataFrame
- spark.sparkContext._jvm.com.mozilla.spark.sql.hyperloglog.functions.registerUdf()
- I'm not sure why the SQL context doesn't initialize when registering the UDF
Patches and code snippets to follow; the patchset for spark-hyperloglog is still a work in progress. I'd like to write the wrapper library in a way that's compatible with a pip installation of Spark.
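A sketch of what the pyspark side could look like once those steps land. The jar name and the registered UDF names (hllMerge, hllCardinality) are assumptions for illustration; only the registerUdf entry point and the dataset path come from this bug:

# Launch with the assembled jar, e.g.
#   pyspark --jars spark-hyperloglog-assembly.jar   (plus --driver-class-path in local mode)
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.getOrCreate()

# Step 5: touch the SQL machinery so the underlying SQLContext exists,
# then register the UDFs from the Scala side.
spark.createDataFrame([], StructType([]))
spark.sparkContext._jvm.com.mozilla.spark.sql.hyperloglog.functions.registerUdf()

# With the UDFs registered in the JVM, they can be used straight from SQL,
# so no Python code has to run on the executors.
ff = spark.read.parquet("s3://telemetry-parquet/client_count/v2016032120160921/")
ff.where("activity_date = '2016-09-01'") \
  .selectExpr("hllCardinality(hllMerge(hll)) as client_count") \
  .show()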
Assignee
Updated•8 years ago
Priority: P3 → P1
Assignee
Comment 7•8 years ago
I've packaged this library so it is installable via pip. There are a few more tweaks and some documentation left before it is deployable.
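Usage could then look roughly like the following. Every name here (the pip package, the module, the register helper, the UDF names) is a placeholder for illustration; the final interface isn't documented in this bug:

#   pip install pyspark-hyperloglog      (placeholder package name)
import pyspark_hyperloglog               # placeholder module name
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
pyspark_hyperloglog.register(spark)      # placeholder: registers the Scala UDFs on the session
df = spark.read.parquet("s3://telemetry-parquet/client_count/v2016032120160921/")
df.selectExpr("hllCardinality(hllMerge(hll))").show()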
Assignee
Comment 8•8 years ago
Assignee
Updated•7 years ago
Priority: P1 → P2
Updated•7 years ago
Whiteboard: [DataPlatform]
Assignee
Updated•7 years ago
Status: NEW → ASSIGNED
Priority: P2 → P1
Assignee
Updated•7 years ago
Status: ASSIGNED → RESOLVED
Closed: 7 years ago
Resolution: --- → FIXED
Updated•3 years ago
Component: Telemetry APIs for Analysis → General