from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType, IntegerType, ArrayType
# Sample sentences; each string becomes one row of the DataFrame.
data = ['athshgthsc asl','sdf sdfdsadf sdf', 'arasdfa sdf','aa bb','aaa bbb ccc','dd aa bbb']
# Single-column DataFrame of strings; the column is auto-named 'value'.
# NOTE(review): assumes `sqlContext` is provided by the session (e.g. a
# Spark shell / notebook) — it is not defined in this snippet.
df = sqlContext.createDataFrame(data,StringType())
def getLenghts(lst):
    """Return a list with the length of each element of *lst*.

    Used below as a Spark UDF over the array-of-words column, where each
    element is a word and the result is the per-word character count.

    NOTE(review): the name keeps the original 'Lenghts' misspelling
    because the udf registration further down references it.
    """
    # Comprehension replaces the original append loop (same result).
    return [len(ele) for ele in lst]
# Split a sentence into words. BUGFIX: str.split() returns a *list* of
# strings, so the UDF's declared return type must be
# ArrayType(StringType()) — declaring StringType() here makes Spark
# (>= 2.x) return null/garbled values for the column.
getList = udf(lambda data: data.split(), ArrayType(StringType()))
# Per-word character counts (array<int>).
getListLen = udf(getLenghts, ArrayType(IntegerType()))
# Longest word length in the row.
getMaxLen = udf(lambda data: max(data), IntegerType())

df = (df.withColumn('splitWords', getList(df.value))
        .withColumn('lengthList', getListLen(col('splitWords')))
        .withColumn('maxLen', getMaxLen('lengthList')))

# Keep only rows whose longest word is shorter than 5 characters.
df.filter(df.maxLen < 5).select('value').show()
+----------------+
| value|
+----------------+
| athshgthsc asl|
|sdf sdfdsadf sdf|
| arasdfa sdf|
| aa bb|
| aaa bbb ccc|
| dd aa bbb|
+----------------+
+----------------+--------------------+----------+------+
| value| splitWords|lengthList|maxLen|
+----------------+--------------------+----------+------+
| athshgthsc asl| [athshgthsc, asl]| [10, 3]| 10|
|sdf sdfdsadf sdf|[sdf, sdfdsadf, sdf]| [3, 8, 3]| 8|
| arasdfa sdf| [arasdfa, sdf]| [7, 3]| 7|
| aa bb| [aa, bb]| [2, 2]| 2|
| aaa bbb ccc| [aaa, bbb, ccc]| [3, 3, 3]| 3|
| dd aa bbb| [dd, aa, bbb]| [2, 2, 3]| 3|
+----------------+--------------------+----------+------+
+-----------+
| value|
+-----------+
| aa bb|
|aaa bbb ccc|
| dd aa bbb|
+-----------+
しきい値を 15 など別の値に変更することもできます。データセットを分割する前に、さらに多くの前処理を実行することもできます。ここでは、フィルターをかけて最大単語長が 5 未満の行だけを残しています。