Closed
Bug 1355241
Opened 8 years ago
Closed 8 years ago
[churn] dataset generation occasionally fails due to schema inference failure
Categories
(Cloud Services Graveyard :: Metrics: Pipeline, enhancement, P1)
Tracking
(Not tracked)
RESOLVED
FIXED
People
(Reporter: amiyaguchi, Assigned: amiyaguchi)
Details
Attachments
(1 file)
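In churn, the following line converts an RDD to a DataFrame without an explicit schema, so Spark has to infer one.
> records_df = aggregated.map(lambda x: x[0] + x[1]).toDF(record_columns)
However, this occasionally fails due to empty rows: when toDF is given only a list of column names, PySpark infers the column types by reading the first row of the RDD (see _inferSchema in the traceback below). Instead of providing a list of column names, we should pass an explicit schema that requires every field to contain a value.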
```
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-8-323c5bf0cf07> in <module>()
2 week_start = fmt(week_start_date),
3 bucket = bucket,
----> 4 prefix = prefix)
<ipython-input-6-d4b2006554d4> in compute_churn_week(df, week_start, bucket, prefix)
141 aggregated = countable.reduceByKey(reduce_func)
142
--> 143 records_df = aggregated.map(lambda x: x[0] + x[1]).toDF(record_columns)
144
145 # Write to s3 as parquet, file size is on the order of 40MB. We bump the version
/usr/lib/spark/python/pyspark/sql/session.py in toDF(self, schema, sampleRatio)
55 [Row(name=u'Alice', age=1)]
56 """
---> 57 return sparkSession.createDataFrame(self, schema, sampleRatio)
58
59 RDD.toDF = toDF
/usr/lib/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
520
521 if isinstance(data, RDD):
--> 522 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
523 else:
524 rdd, schema = self._createFromLocal(map(prepare, data), schema)
/usr/lib/spark/python/pyspark/sql/session.py in _createFromRDD(self, rdd, schema, samplingRatio)
358 """
359 if schema is None or isinstance(schema, (list, tuple)):
--> 360 struct = self._inferSchema(rdd, samplingRatio)
361 converter = _create_converter(struct)
362 rdd = rdd.map(converter)
/usr/lib/spark/python/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio)
329 :return: :class:`pyspark.sql.types.StructType`
330 """
--> 331 first = rdd.first()
332 if not first:
333 raise ValueError("The first row in RDD is empty, "
/usr/lib/spark/python/pyspark/rdd.py in first(self)
1326 ValueError: RDD is empty
1327 """
-> 1328 rs = self.take(1)
1329 if rs:
1330 return rs[0]
```
Updated•8 years ago
Assignee: nobody → amiyaguchi
Points: --- → 1
Priority: -- → P1
Comment 1•8 years ago
Updated•8 years ago
Status: NEW → RESOLVED
Closed: 8 years ago
Resolution: --- → FIXED
Updated•7 years ago
Product: Cloud Services → Cloud Services Graveyard