2017-11-22 12 views
0

保存の方法&読み取りスパークDataFrame/DataSetはApache Igniteにありますか?私は他の同様の質問で与えられた様々な解決策を試しましたが、最新の点火とスパークバージョンでは何も働いていません。 (私はスカラ2.11を使用しています) ありがとうございました。Apache IgniteでSpark DataFrame/DataSetを保存して読み込むには?

更新(コードを追加):

<?xml version="1.0" encoding="UTF-8"?>   
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation=" 
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd"> 

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> 
    <property name="cacheConfiguration"> 
     <!-- SharedRDD cache example configuration (Atomic mode). --> 
     <bean class="org.apache.ignite.configuration.CacheConfiguration"> 
      <!-- Set a cache name. --> 
      <property name="name" value="sharedRDD"/> 
      <!-- Set a cache mode. --> 
      <property name="cacheMode" value="PARTITIONED"/> 

      <!-- Set atomicity mode. --> 
      <property name="atomicityMode" value="ATOMIC"/> 
      <!-- Configure a number of backups. --> 
      <property name="backups" value="1"/> 
     </bean> 
    </property> 

    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> 
    <property name="discoverySpi"> 
     <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> 
      <property name="ipFinder"> 
       <!-- 
        Ignite provides several options for automatic discovery that can be used 
        instead os static IP based discovery. For information on all options refer 
        to our documentation: http://apacheignite.readme.io/docs/cluster-config 
       --> 
       <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> 
       <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> 
       <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> 
        <property name="addresses"> 
         <list> 
          <!-- In distributed environment, replace with actual host IP address. --> 
          <value>127.0.0.1:47500..47509</value> 
         </list> 
        </property> 
       </bean> 
      </property> 
     </bean> 
    </property> 
</bean> 

IgniteCacheコード(これは、DFを置き、また、RDDに変換し、それを読み取るしようとした):

object SparkIgniteCache { 
private val CONFIG = "config/cache.xml" 

import org.apache.ignite.IgniteCache 
import org.apache.ignite.binary.BinaryObject 
import org.apache.ignite.cache.CacheAtomicityMode 
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction 
import org.apache.ignite.configuration.CacheConfiguration 


private[sample] def set(sc: SparkContext, df: DataFrame, KEY: String){ 
val ic = new IgniteContext(sc, CONFIG, false) 

// FAILED ATTEMPT OF SETTING CONFIG : 1 
// val cacheConfiguration: CacheConfiguration[String, Row] = new CacheConfiguration[String, Row](KEY) 
//  .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0) 
//  .setAffinity(new RendezvousAffinityFunction(false, 2)) 
//  .setIndexedTypes(classOf[String], classOf[Row]) 
// 
// val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration) 

// FAILED ATTEMPT OF SETTING CONFIG : 2 
// val cacheConfiguration: CacheConfiguration[String, BinaryObject] = new CacheConfiguration[String, BinaryObject](KEY) 
//  .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0) 
//  .setAffinity(new RendezvousAffinityFunction(false, 2)) 
//  .setIndexedTypes(classOf[String], classOf[BinaryObject]) 
// 
// val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration) 

val sharedRDD = ic.fromCache[String, Row](KEY) 
sharedRDD.saveValues(df.rdd) 
} 

private[sample] def get(sc: SparkContext, KEY: String) = { 
    val ic = new IgniteContext(sc, CONFIG, false) 
    // val cacheConfiguration: CacheConfiguration[String, Row] = new CacheConfiguration[String, Row](KEY) 
    //  .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0) 
    //  .setAffinity(new RendezvousAffinityFunction(false, 2)) 
    //  .setIndexedTypes(classOf[String], classOf[Row]) 
    // 
    // val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration) 
ic.fromCache[String, Row](KEY) 
} 
} 
+0

https://stackoverflow.com/questions/47087005/cannot-write-save-data-to-ignite-directly-from-a-spark-rdd私はそれがあなたを助けると思う。 –

+0

ありがとうございますが、動作しません – aks

+0

何か問題がありますか? –

答えて

0

私ができる午前上記の問題を次の方法で解決してください:

XMLファイルundキャッシュコンフィグレーションノード:

<property name="indexedTypes"> 
    <list> 
     <value>java.lang.String</value> 
     <value>org.apache.spark.sql.Row</value> 
    </list> 
</property> 

DataFrame [行]タイプのデータフレームを保存したいが、これはまだigniteではできません。ただし、RDD [行]を保存して保存するには、ペア形式で保存する必要があります。ですから、私はRDD [行]をRDD [(文字列、行)]に変換する必要があります。 CacheConfigurationでそれを表現するために、上記のようにIndexTypesを追加しました。

また、データフレームのスキーマを後でデータフレームに変換できるように、データと共に保存する必要があります。ここ

保存するためのコードである/ DFを読み取る:

object SparkIgniteCache { 
    private val CONFIG = "config/cache.xml" 
    private val schemaCacheConfig = makeSchemaCacheConfig("schemas") 


    private[sample] def set(sc: SparkContext, df: DataFrame, KEY: String){ 
     val ic = new IgniteContext(sc, CONFIG, false) 
     val sharedRDD = ic.fromCache[String, Row](KEY) 

     val rddSchemaCache = ic.ignite.getOrCreateCache(schemaCacheConfig) 

     rddSchemaCache.put(KEY+"_schema", df.schema) 

     sharedRDD.saveValues(df.rdd) 
    } 

    private[sample] def get(sc: SparkContext, KEY: String) 
         : (StructType, IgniteRDD[String, Row]) = 
    { 
     val ic = new IgniteContext(sc, CONFIG, false) 
     val rddSchemaCache = ic.ignite.getOrCreateCache(schemaCacheConfig) 
     (rddSchemaCache.get(KEY+"_schema"), ic.fromCache[String, Row](KEY))  
    } 

    private def makeSchemaCacheConfig(name: String) = 
     new CacheConfiguration[String, StructType](name) 
      .setAtomicityMode(CacheAtomicityMode.ATOMIC) 
      .setBackups(1) 
      .setAffinity(new RendezvousAffinityFunction(false, 1)) 

}上記のコードで

、Iはまた、作成した動的CacheConfiguraitonをタイプたSchemaTypeのデータフレームのスキーマを保存します。

 // Set data/dataframe for KEY=input_data 
    SparkIgniteCache.set(spark.sparkContext, df, "input_data") 

    //Get dataframe 
    val (schema, igniteRDD) = SparkIgniteCache.get(spark.sparkContext, "input_data") 
    val rdd1: RDD[Row] = igniteRDD.map(_._2) //Getting Row from (String,Row) 
    val df = spark.sqlContext.createDataFrame(rdd1, schema) 

ありがとう:

は今、あなただけの設定を呼び出し、以下のようなメソッドを取得する必要があります。

関連する問題