Closed Bug 1305087 Opened 9 years ago Closed 7 years ago

spark-hyperloglog functions should be callable from pyspark

Categories

(Data Platform and Tools :: General, defect, P1)

Points: 3

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: rvitillo, Assigned: amiyaguchi)

References

Details

(Whiteboard: [DataPlatform])

User Story

The spark-hyperloglog [1] package is used by telemetry-batch-view [2] to generate the client_count dataset. We would like to be able to invoke the same functionality from Python as well. Ideally, the package would be accessible by default on Spark clusters launched by our next-gen atmo.

Saptarshi made an initial attempt (quoting from his e-mail):

========================================================================================

1. Use emr-5.0.0
2. Start the shell with "pyspark --packages vitillo:spark-hyperloglog:1.1.1"
3. Run the following code:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row
from pyspark.sql.types import LongType
import pyspark.sql.functions as F

def hlm(col):
    # Wrap the Scala HyperLogLogMerge aggregator as a Column expression.
    # This works because it is evaluated on the driver, where the JVM gateway exists.
    sc = SparkContext._active_spark_context
    _hll_merge = sc._jvm.com.mozilla.spark.sql.hyperloglog.aggregates.HyperLogLogMerge().apply
    return Column(_hll_merge(_to_seq(sc, [col], _to_java_column)))


def hlc(col):
    # Look up the Scala hllCardinality function through py4j.
    sc = SparkContext._active_spark_context
    _hll_cardinality = sc._jvm.com.mozilla.spark.sql.hyperloglog.functions.package.hllCardinality
    return Column(_hll_cardinality(col))

sqlContext.registerFunction("hllCardinality", hlc, returnType=LongType())

ff = sqlContext.read.parquet("s3://telemetry-parquet/client_count/v2016032120160921/")
ff1 = ff.filter(ff.activity_date == '2016-09-01')
ff2 = ff1.agg(hlm("hll").alias("mhll"))

>>> ff2

DataFrame[mhll: binary]

ff2.selectExpr("hllCardinality(mhll)").collect()

and then I get (when the job is nearly 100% done):

  File "<stdin>", line 3, in hlc
AttributeError: 'NoneType' object has no attribute '_jvm'

Because 'sc' is not present on worker nodes.

========================================================================================


[1] https://github.com/vitillo/spark-hyperloglog
[2] https://github.com/mozilla/telemetry-batch-view/blob/master/src/main/scala/com/mozilla/telemetry/views/ClientCountView.scala

Attachments

(2 files)

No description provided.
User Story: (updated)
User Story: (updated)
Blocks: 1255748
Assignee: nobody → jezdez
One more update: on the driver node,

u = ff2.collect()
finalCount = sc._jvm.com.mozilla.spark.sql.hyperloglog.functions.package.hllCardinality(u[0].mhll)

finalCount is the correct number.
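For reference, a minimal sketch of that driver-only workaround, reusing ff2, the mhll column, and the hlm() helper from the snippet quoted above; it assumes spark-hyperloglog is still on the driver classpath via --packages:

from pyspark import SparkContext

# Aggregate the HLLs on the cluster, then compute the cardinality on the driver,
# where the py4j gateway to the JVM is available.
rows = ff2.collect()                     # a single row holding the merged HLL bytes
sc = SparkContext._active_spark_context  # valid here because this runs on the driver
final_count = sc._jvm.com.mozilla.spark.sql.hyperloglog.functions.package.hllCardinality(rows[0].mhll)
print(final_count)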
Status: NEW → ASSIGNED
When this is complete, we'll need to update the documentation for client_count: https://wiki.mozilla.org/Telemetry/Available_Telemetry_Datasets_and_their_Applications
Points: --- → 3
Priority: -- → P1
@thuelbert: What do 3 points mean exactly?
Flags: needinfo?(thuelbert)
Hey Jannis - 3 points means "3 days or more"; see our sizing guidelines here: https://docs.google.com/document/d/1YEPaLLYX9CZD0IDenNgHb_hU1_gPTPjdcoRsvORxc1M/edit
Flags: needinfo?(thuelbert)
Assignee: jezdez → nobody
Status: ASSIGNED → NEW
Priority: P1 → --
Priority: -- → P3
Component: Metrics: Pipeline → Telemetry APIs for Analysis
Product: Cloud Services → Data Platform and Tools
Assignee: nobody → amiyaguchi
I've found a solution to this, which involves registering the UDFs on the Scala side of the code. Steps to reproduce:

1. Remove the package object from com.mozilla.spark.sql.hyperloglog.functions - py4j doesn't have visibility into functions at this scope for some reason.
2. Add sbt-assembly for fat-jar compilation.
3. Add a function on the Scala side that registers all UDFs.
4. Add the package to spark-submit or pyspark using --jars (and --driver-class-path if in local mode).
5. Before calling the register function, initialize the Spark context by building an empty dataframe, then call spark.sparkContext._jvm.com.mozilla.spark.sql.hyperloglog.functions.registerUdf() - I'm not sure why the SQL context doesn't initialize when registering the UDF. A pyspark-side sketch follows below.

Patches and code snippets to follow -- however, the patchset to spark-hyperloglog is a work in progress. I'd like to write the wrapper library in a way that's compatible with a pip installation of Spark.
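For illustration, a minimal pyspark-side sketch of the steps above, assuming the fat jar was passed via --jars and that registerUdf() exposes the functions under SQL names such as hll_merge and hll_cardinality (those names are assumptions, not confirmed in this bug):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("hll-example").getOrCreate()

# Build an empty dataframe first so the SQL context is fully initialized
# before the Scala-side registration call, as described in step 5 above.
spark.createDataFrame([], StructType([]))

# Register the spark-hyperloglog UDFs from the Scala side via py4j.
spark.sparkContext._jvm.com.mozilla.spark.sql.hyperloglog.functions.registerUdf()

# The UDF names below are assumed for illustration.
df = spark.read.parquet("s3://telemetry-parquet/client_count/v2016032120160921/")
df.createOrReplaceTempView("client_count")
spark.sql("""
    SELECT activity_date, hll_cardinality(hll_merge(hll)) AS client_count
    FROM client_count
    GROUP BY activity_date
""").show()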
Priority: P3 → P1
I've packaged this library so that it is installable via pip. There are a few more tweaks and some documentation remaining before it is deployable.
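A hypothetical usage sketch of the pip-installed wrapper; the module name pyspark_hyperloglog and the register() helper are assumptions for illustration, not an API confirmed in this bug:

from pyspark.sql import SparkSession
from pyspark_hyperloglog import hll  # assumed module layout

spark = SparkSession.builder.getOrCreate()
hll.register(spark)  # assumed helper wrapping the Scala registerUdf() call

spark.sql("SELECT hll_cardinality(hll_merge(hll)) AS client_count FROM client_count").show()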
Blocks: 1408225
Priority: P1 → P2
Blocks: 1450329
Whiteboard: [DataPlatform]
Status: NEW → ASSIGNED
Priority: P2 → P1
Status: ASSIGNED → RESOLVED
Closed: 7 years ago
Resolution: --- → FIXED
See Also: → 1465240
Component: Telemetry APIs for Analysis → General