2017-01-13 15 views
0

spark 2.0.2を使用してJSONファイルを寄木張りに変換しようとしています。無効な文字を含むフィールドをSpark 2からParquetにエクスポートする

  • JSONファイルは外部ソースに由来するため、スキーマは到着する前に変更することができません。
  • ファイルには属性のマップが含まれています。ファイルを受け取る前に知っている属性名はありません。
  • 属性名には、寄木細工では使用できない文字が含まれています。
{ 
    "id" : 1, 
    "name" : "test", 
    "attributes" : { 
     "name=attribute" : 10, 
     "name=attribute with space" : 100, 
     "name=something else" : 10 
    } 
} 

両方のスペースと文字が寄木細工で使用することができないに等しく、私は次のエラーを取得する:

 org.apache.spark.sql.AnalysisException: Attribute name "name=attribute" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
  • これらはフィールドをネストされているとして、私が使用してそれらの名前を変更することはできませんエイリアス、これは本当ですか?
  • ここに示唆しているように、スキーマ内のフィールドの名前を変更しようとしました:How to rename fields in an DataFrame corresponding to nested JSON。これはしかし、私は今、次のstackoverflowを取得し、いくつかのファイルのために働く:
私は、次のいずれかの操作を実行したい
 
java.lang.StackOverflowError 

at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65) 
at org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:258) 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1563) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1579) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1578) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1576) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1576) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1579) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1578) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1576) 
at scala.collection.immutable.List.foreach(List.scala:381) 
... 
repeat 
... 

:私は負荷としてフィールド名から

  • ストリップ無効な文字をスタックを引き起こすことなく、スパーク
  • 変更にデータスキーマの列名が
  • どういうわけか、元のデータをロードするためのスキーマを変更するが、内部的に以下を使用をオーバーフロー:
{ 
    "id" : 1, 
    "name" : "test", 
    "attributes" : [ 
     {"key":"name=attribute", "value" : 10}, 
     {"key":"name=attribute with space", "value" : 100}, 
     {"key":"name=something else", "value" : 10} 
    ] 
} 
+0

解決策はありましたか? – Bhavesh

+0

投稿したソリューションをまだ使用しています – roblovelock

答えて

0

私が働くことを発見した唯一の解決策は、これまでのところ、変更されたスキーマとデータを再ロードすることです。新しいスキーマは属性をマップにロードします。

Dataset<Row> newData = sql.read().json(path); 
StructType newSchema = (StructType) toMapType(newData.schema(), null, "attributes"); 
newData = sql.read().schema(newSchema).json(path); 

private DataType toMapType(DataType dataType, String fullColName, String col) { 
    if (dataType instanceof StructType) { 
     StructType structType = (StructType) dataType; 

     List<StructField> renamed = Arrays.stream(structType.fields()).map(
      f -> toMapType(f, fullColName == null ? f.name() : fullColName + "." + f.name(), col)).collect(Collectors.toList()); 
     return new StructType(renamed.toArray(new StructField[renamed.size()])); 
    } 
    return dataType; 
} 

private StructField toMapType(StructField structField, String fullColName, String col) { 
    if (fullColName.equals(col)) { 
     return new StructField(col, new MapType(DataTypes.StringType, DataTypes.LongType, true), true, Metadata.empty()); 
    } else if (col.startsWith(fullColName)) { 
     return new StructField(structField.name(), toMapType(structField.dataType(), fullColName, col), structField.nullable(), structField.metadata()); 
    } 
    return structField; 

} 
0

私は@:と同じ問題を抱えています。

私たちの場合、私たちはDataFrameのフラット化を解決しました。

val ALIAS_RE: Regex = "[_.:@]+".r 
    val FIRST_AT_RE: Regex = "^_".r 

    def getFieldAlias(field_name: String): String = { 
    FIRST_AT_RE.replaceAllIn(ALIAS_RE.replaceAllIn(field_name, "_"), "") 
    } 

    def selectFields(df: DataFrame, fields: List[String]): DataFrame = { 
    var fields_to_select = List[Column]() 
    for (field <- fields) { 
     val alias = getFieldAlias(field) 
     fields_to_select +:= col(field).alias(alias) 
    } 

    df.select(fields_to_select: _*) 
    } 

したがって、次のJSON:[オブジェクト、スキーマ@タイプ、[email protected]] を形質転換する

{ 
    object: 'blabla', 
    schema: { 
    @type: 'blabla', 
    [email protected]: 'blabla' 
    } 
} 

@ドット(お客様の場合は=)は、SparkSQLに問題が発生します。

SelectFieldsの後には、 [object、schema_type、schema_name_id]で終了することができます。 Flatter DataFrame。

関連する問題