2017-08-28 19 views
1

Ignite 2.1.0を使用していて、DataStreamerを試すための簡単なプログラムを作成しましたが、このようなエラーが発生しました。 "[診断]パーティションマップの交換を待つことができませんでした"または "書き込みロックを保持していない状態で解放します。 私は2つのローカルノードを開始しました.1つはWindows xml構成でのCMDで起動し、もう1つはEclipseで起動しました。DataStreamerが正常に動作しない

public class TestDataStreamer { 

    public static void main(String[] args) { 
     // TODO Auto-generated method stub 
     long bgn,end; 
     IgniteConfiguration cfg = new IgniteConfiguration(); 
     cfg.setPeerClassLoadingEnabled(true); 
     Ignite ignite = Ignition.start(cfg); 
     CacheConfiguration<Long, Map> cacheConf = new CacheConfiguration(); 
     cacheConf.setName("TestDataStreamer").setCacheMode(CacheMode.REPLICATED); 
     cacheConf.setBackups(0); 
     IgniteCache cache = ignite.getOrCreateCache(cacheConf); 
     cache.clear(); 
     File dataFile = new File("D:/data/1503307171374.data"); //10,000,000 rows text data 
     bgn = System.currentTimeMillis(); 
     try { 
      loadByStreamer(dataFile,ignite,"TestDataStreamer"); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } finally { 
      end = System.currentTimeMillis(); 
      System.out.println("---------------"); 
      System.out.println((end-bgn)/1000.0+" s"); 
     } 
     cache.destroy(); 
     System.out.println("cache destroy..."); 
     ignite.close(); 
     System.out.println("finish"); 
    } 

    private static void loadByStreamer(File dataFile, Ignite ignite, String cacheName) throws Exception { 
     IgniteDataStreamer<Long,TestObj> ds = ignite.dataStreamer(cacheName); 
     //ds.allowOverwrite(true); 
     ds.autoFlushFrequency(10000); 
     ds.perNodeBufferSize(4096); 
     BufferedReader br = new BufferedReader(new InputStreamReader(
       new FileInputStream(dataFile),"UTF-8")); 
     String line = null; 
     long count = 0; 
     while((line=br.readLine())!=null){ 
      ds.addData(System.currentTimeMillis(), parseData(line, Constants.DEFAULT_SEPARATOR, 
        "id,sn,type_code,trade_ts,bill_ts,company_code,company_name,biz_type,charge_amt,pay_mode".split(","))); 
      if(++count%10000==0){ 
       System.out.println(count+" loaded..."); 
      } 
      //System.out.println(count+":"+line); 
     } 
     System.out.println("flushing..."); 
     ds.flush(); 
     System.out.println("flushed"); 
     br.close(); 
     ds.close(); 
     System.out.println("file handled..."); 
    } 

    private static TestObj parseData(String data, String saperator, String[] fields){ 
     TestObj obj = new TestObj(); 
     if(data!=null && saperator.trim().length()>0){ 
      String[] values = data.split(saperator); 
      obj.setId(values[0]); 
      obj.setSn(values[1]); 
      obj.setType_code(values[2]); 
      obj.setTrade_ts(values[3]); 
      obj.setBill_ts(values[4]); 
      obj.setCompany_code(values[5]); 
      obj.setCompany_name(values[6]); 
      obj.setBiz_type(values[7]); 
      obj.setCharge_amt(values[8]); 
      obj.setPay_mode(values[9]); 
     } 
     return obj; 
    } 
} 

class TestObj { 
    private String id; 
    private String sn; 
    private String type_code; 
    private String trade_ts; 
    private String bill_ts; 
    private String company_code; 
    private String company_name; 
    private String biz_type; 
    private String charge_amt; 
    private String pay_mode; 
    public String getId() { 
     return id; 
    } 
    public void setId(String id) { 
     this.id = id; 
    } 
    public String getSn() { 
     return sn; 
    } 
    public void setSn(String sn) { 
     this.sn = sn; 
    } 
    public String getType_code() { 
     return type_code; 
    } 
    public void setType_code(String type_code) { 
     this.type_code = type_code; 
    } 
    public String getTrade_ts() { 
     return trade_ts; 
    } 
    public void setTrade_ts(String trade_ts) { 
     this.trade_ts = trade_ts; 
    } 
    public String getBill_ts() { 
     return bill_ts; 
    } 
    public void setBill_ts(String bill_ts) { 
     this.bill_ts = bill_ts; 
    } 
    public String getCompany_code() { 
     return company_code; 
    } 
    public void setCompany_code(String company_code) { 
     this.company_code = company_code; 
    } 
    public String getCompany_name() { 
     return company_name; 
    } 
    public void setCompany_name(String company_name) { 
     this.company_name = company_name; 
    } 
    public String getBiz_type() { 
     return biz_type; 
    } 
    public void setBiz_type(String biz_type) { 
     this.biz_type = biz_type; 
    } 
    public String getCharge_amt() { 
     return charge_amt; 
    } 
    public void setCharge_amt(String charge_amt) { 
     this.charge_amt = charge_amt; 
    } 
    public String getPay_mode() { 
     return pay_mode; 
    } 
    public void setPay_mode(String pay_mode) { 
     this.pay_mode = pay_mode; 
    } 
} 

ノードをCMDで起動して1つのノードでプログラムを実行すると、正常に動作します。 誰かが私を助けることができますか?

+0

使用しているjdkのバージョンはどれですか? –

+0

1.8.0_144,64ビットの – CrazyRen

+0

両方のノードが同じバージョンの1.8.0_144,64ビットを使用していることを確認してください。 –

答えて

1

例えば1.8.0_144のような同じバージョンの両方のノードのjdkを更新する(すでにインストール済み)か、少なくともidkを1.7の最新バージョンに更新するようにしてください。 。

人がかなり同じ例外に直面し、javaバージョンの更新が彼らを修正するのを助けたとき、thread on Ignite user listがあります。

+0

私はWindows CMDの代わりにVMMでLinuxを使用してみました.LinuxとEclipseの両方でjdk 1.8.0_144を使用してもOKです。どうもありがとう。 – CrazyRen

関連する問題