Closed Bug 1355241 Opened 8 years ago Closed 8 years ago

[churn] dataset generation occasionally fails due to schema inference failure

Categories

(Cloud Services Graveyard :: Metrics: Pipeline, enhancement, P1)

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: amiyaguchi, Assigned: amiyaguchi)

Details

Attachments

(1 file)

In churn, the following line converts an RDD to a DataFrame without an explicit schema, relying on inference:

> records_df = aggregated.map(lambda x: x[0] + x[1]).toDF(record_columns)

However, this occasionally fails due to empty rows. Schema inference reads the first row of the RDD (rdd.first() in the traceback below), so it breaks when that row is missing or empty. Instead of providing a list of column names, there should be a schema that enforces that all fields contain values.

```
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-8-323c5bf0cf07> in <module>()
      2     week_start = fmt(week_start_date),
      3     bucket = bucket,
----> 4     prefix = prefix)

<ipython-input-6-d4b2006554d4> in compute_churn_week(df, week_start, bucket, prefix)
    141     aggregated = countable.reduceByKey(reduce_func)
    142
--> 143     records_df = aggregated.map(lambda x: x[0] + x[1]).toDF(record_columns)
    144
    145     # Write to s3 as parquet, file size is on the order of 40MB. We bump the version

/usr/lib/spark/python/pyspark/sql/session.py in toDF(self, schema, sampleRatio)
     55         [Row(name=u'Alice', age=1)]
     56         """
---> 57         return sparkSession.createDataFrame(self, schema, sampleRatio)
     58
     59     RDD.toDF = toDF

/usr/lib/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    520
    521         if isinstance(data, RDD):
--> 522             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    523         else:
    524             rdd, schema = self._createFromLocal(map(prepare, data), schema)

/usr/lib/spark/python/pyspark/sql/session.py in _createFromRDD(self, rdd, schema, samplingRatio)
    358         """
    359         if schema is None or isinstance(schema, (list, tuple)):
--> 360             struct = self._inferSchema(rdd, samplingRatio)
    361             converter = _create_converter(struct)
    362             rdd = rdd.map(converter)

/usr/lib/spark/python/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio)
    329         :return: :class:`pyspark.sql.types.StructType`
    330         """
--> 331         first = rdd.first()
    332         if not first:
    333             raise ValueError("The first row in RDD is empty, "

/usr/lib/spark/python/pyspark/rdd.py in first(self)
   1326         ValueError: RDD is empty
   1327         """
-> 1328         rs = self.take(1)
   1329         if rs:
   1330             return rs[0]
```
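The proposed fix could look roughly like the sketch below. It is not the patch attached to this bug. The field names and types in `RECORD_FIELDS` are hypothetical stand-ins, since the contents of `record_columns` are not shown here, and `is_complete`, `build_schema`, and `to_records_df` are illustrative helper names. The sketch relies on the branch visible in the traceback: `_createFromRDD` only calls `_inferSchema` when the schema is `None`, a list, or a tuple, so passing a `StructType` avoids the `rdd.first()` call.

```python
# Hypothetical columns and types; the real record_columns list is not shown in this bug.
RECORD_FIELDS = [
    ("channel", "string"),
    ("geo", "string"),
    ("n_profiles", "long"),
    ("usage_hours", "double"),
]


def is_complete(record, n_fields=len(RECORD_FIELDS)):
    """Return True when the record has exactly n_fields values and none is None."""
    return len(record) == n_fields and all(value is not None for value in record)


def build_schema(fields=RECORD_FIELDS):
    """Build a StructType in which every field is declared non-nullable."""
    # Imported lazily so is_complete can be used on machines without Spark installed.
    from pyspark.sql.types import (
        DoubleType, LongType, StringType, StructField, StructType)

    type_map = {"string": StringType, "long": LongType, "double": DoubleType}
    return StructType([
        StructField(name, type_map[kind](), nullable=False)
        for name, kind in fields
    ])


def to_records_df(spark, aggregated):
    """Convert the (key, value) RDD to a DataFrame using an explicit schema.

    `spark` is a SparkSession; `aggregated` is the RDD from reduceByKey.
    Dropping incomplete rows first is an assumption of this sketch; the
    non-nullable schema alone would instead raise on a None value.
    """
    rows = aggregated.map(lambda x: x[0] + x[1]).filter(is_complete)
    return spark.createDataFrame(rows, schema=build_schema())
```

With this shape, `records_df = to_records_df(spark, aggregated)` would replace the `toDF(record_columns)` call, and an empty `aggregated` RDD should yield an empty DataFrame with the declared columns rather than an inference error.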
Assignee: nobody → amiyaguchi
Points: --- → 1
Priority: -- → P1
Status: NEW → RESOLVED
Closed: 8 years ago
Resolution: --- → FIXED
Product: Cloud Services → Cloud Services Graveyard