2016-06-01 17 views
1

データを以下次のよう
Apache Sparkは非構造化マルチラインデータを処理しますか?

make,Model,MPG,Cylinders,Engine Disp,Horsepower,Weight,Accelerate,Year,Origin<br> 
amc,amc ambassador dpl,15,8,390,190,3850,8.5,70,Indian<br> 
amc,amc gremlin,21,6,199,90,2648,15,70,Indian<br> 
amc,amc hornet,18,6,199,97,2774,15.5,70,Indian<br> 
amc,amc rebel sst,16,8,304,150,3433,12,70,Indian<br> 
............. 
............. 
............. 

次に上記

val rawData=sc.textFile("/hdfs/spark/cars2.txt") <br> 
case class cars(make:String, model:String, mpg:Integer, cylinders :Integer, engine_disp:Integer, horsepower:Integer,weight:Integer ,accelerate:Double, year:Integer, origin:String)<br> 
val carsData=rawData.map(x=>x.split(",")).map(x=>cars(x(0).toString,x(1).toString,x(2).toInt,x(3).toInt,x(4).toInt,x(5).toInt,x(6).toInt,x(7).toDouble,x(8).toInt,x(9).toString))<br> 
carsData.take(2)<br> 
carsData.cache()<br> 
carsData.map(x=>(x.origin,1)).reduceByKey((x,y)=>x+y).collect<br> 
val indianCars=carsData.filter(x=>(x.origin=="Indian"))<br> 
indianCars.count() <br> 
val makeWeightSum=indianCars.map(x=>(x.make,x.weight.toInt)).combineByKey((x:Int) => (x, 1),(acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),(acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))<br> 
makeWeightSum.collect()<br> 
val makeWeightAvg=makeWeightSum.map(x=>(x._1,(x._2._1/x._2._2)))<br> 
makeWeightAvg.collect()<br> 
makeWeightAvg.saveAsTextFile(“carsMakeWeightAvg.txt”)<br> 

下に示すように、私は今はHIVEにこの分析を行うことができスカラとスパークと幸せ処理した純粋に構造化されたデータでありますまた、なぜ私はスパークが必要なのですか(スパークは速くて、本当にロケットで旅行したいと思うかもしれません)。そこで問題は、以下に示すようにSPARKプロセス、マルチライン、非構造化データを行いますされています データ:

Brand:Nokia, Model:1112, price:100, time:201604091,<br> 
redirectDomain:xyz.com, type:online,status:completed,<br> 
tx:credit,country:in,<br> 

Brand:samsung, Model:s6, price:5000, time:2016045859,<br> 
redirectDomain:abc.com, type:online,status:completed,<br> 

.....thousands of records... 

答えて

2

はい、スパークはそれを行うために使用しなければなりません。

DataFrameは名前付き列に編成されたデータの分散コレクションです。 Spark SQLは、DataFrame interfaceを介してさまざまなデータソースでの操作をサポートしています。 このようなデータのデータソースはManually Specify Optionsです。

は参照してください:Spark DataFramesmutli-line input in spark

注:あなたのデータには、構造化されていないではありません。それはcsvファイルのようなもので、基本的な変換をほとんど実行しないと、データセット/データフレームに変換される可能性があります。

可能なさまざまなツールやフレームワークをテストしているだけの場合は、Apache Flinkもお勧めします。

+0

そのcsvファイルが、私の質問はちょうど記事のようなparagrapgh方法で編成された複数行のデータである非構造化データないスパークプロセスであり、ニュースペーパーで? – user2192023

+0

はい、いくつかの変換を実行するだけで済みます。 –

1

通常、スパークは行ごとに行を読み取ります。したがって、rawData.mapは、"、"各テキスト行で分割されます。したがって、構造化されていない複数行のデータは失敗します。

複数行のCSVを使用している場合は、すべてのファイルをまとめて読み取り、独自のCSVパーサーを実装してマルチラインを処理する必要があります。これは、O'really目的から

Learning Sparkブック以下のアプローチ:

val input = sc.wholeTextFiles(inputFile) 
val result = input.flatMap{ case (_, txt) => 
    val reader = new CSVReader(new StringReader(txt)); 
    reader.readAll().map(x => IndianCar(x(0), x(1), ...))) 
} 
関連する問題