1
私はSparkのデータフレームからCassandraに書き込もうとしています。私は、単純なデータフレームのスキーマを持っている場合は、例のように、それが動作します:spark-cassandra-connector - Dataframe - StructTypeからテーブルを作成していますか?
root
|-- id: string (nullable = true)
|-- url: string (nullable = true)
をしかし、私はこのようなスキーマで、StructTypesを含んでデータフレームを書き込みしようとすると:
root
|-- crawl: struct (nullable = true)
| |-- id: string (nullable = true)
その後私は、次の例外を取得:
val df = sqlContext.read.parquet(input)
df.createCassandraTable(keyspace, table)
df.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> table, "keyspace" -> keyspace))
.save()
:
Exception in thread "main" java.lang.IllegalArgumentException: Unsupported type: StructType(StructField(id,StringType,true))
at com.datastax.spark.connector.types.ColumnType$.unsupportedType$1(ColumnType.scala:132)
at com.datastax.spark.connector.types.ColumnType$.fromSparkSqlType(ColumnType.scala:155)
at com.datastax.spark.connector.mapper.DataFrameColumnMapper$$anonfun$1.apply(DataFrameColumnMapper.scala:18)
at com.datastax.spark.connector.mapper.DataFrameColumnMapper$$anonfun$1.apply(DataFrameColumnMapper.scala:16)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at com.datastax.spark.connector.mapper.DataFrameColumnMapper.newTable(DataFrameColumnMapper.scala:16)
at com.datastax.spark.connector.cql.TableDef$.fromDataFrame(Schema.scala:215)
at com.datastax.spark.connector.DataFrameFunctions.createCassandraTable(DataFrameFunctions.scala:26)
私のコードは次のようになります
ヘルプ?
私は同じ問題を抱えていますが、「あなたの構造に合わせて手動で新しいタイプを作成してください」ということはどうですか?これについてもっと詳しく教えてください。 – Omid
意味ネストされた構造体はudtを定義します。 c *でUDTを定義し、そのUDTを含む表を手動で作成する必要があります。 – RussS