2016-05-04 4 views
0

PigがUDFオブジェクトをインスタンス化する方法を教えてください。 Pigを使ってデータを処理するパイプラインを構築しました。私はマルチノードHadoopクラスタにパイプラインを導入しました。そして、パイプラインの各ステップの後に生成されるすべての中間結果を保存します。そこで私は初期化時にHTTP接続を開き、データをexecに送信するJavaでUDFを書いた。また、オブジェクトのfinalizeの接続を閉じます。PigがUDFオブジェクトをインスタンス化する方法

私のスクリプトは以下のように簡略化することができます。

REGISTER MyPackage.jar; 
DEFINE InterStore test.InterStore('localhost', '58888'); 
DEFINE Clean  test.Clean(); 

raw = LOAD 'mydata'; 
cleaned = FILTER (FOREACH raw GENERATE FLATTEN(Clean(*))) BY NOT ($0 MATCHES ''); 
cleaned = FOREACH cleaned GENERATE FLATTEN(InterStore(*)); 
named = FOREACH cleaned GENERATE $1 AS LocationID, $2 AS AccessCount; 
named = FOREACH named GENERATE FLATTEN(InterStore(*)) AS (LocationID, AccessCount); 
grp = GROUP named BY LocationID; 
grp = FOREACH grp GENERATE FLATTEN(InterStore(*)) AS (group, named:{(LocationID, AccessCount)}); 
sum = FOREACH grp GENERATE group AS LocationID, SUM(named.AccessCount) AS TotalAccesses; 
sum = FOREACH sum GENERATE FLATTEN(InterStore(*)) AS (LocationID, TotalAccesses); 
ordered = ORDER sum BY TotalAccesses DESC; 
STORE ordered INTO 'result'; 

そしてInterStoreのためのコードは以下のように簡略化することができます。

class InterStore extends EvalFunc<Tuple>{ 
    HttpURLConnection con; //Avoid redundant connection establishment in exec 
    public InterStore(String ip, String port) throws IOException 
    { 
    URL url = new URL("http://" + ip + ':' + port); 
    con = (HttpURLConnection)url.openConnection(); 
    con.setRequestMethod("PUT"); 
    con.setDoOutput(true); 
    con.setDoInput(true); 
    } 
    public Tuple exec(Tuple input) throws IOException 
    { 
    con.getOutputStream().write((input.toDelimitedString(",")+'\n').getBytes()); 
    return input; 
    } 
    @Override 
    protected void finalize() throws Throwable 
    { 
    con.getOutputStream().close(); 
    int respcode = con.getResponseCode(); 
    BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); 
    System.out.printf("Resp Code:%d, %s\n", respcode, in.readLine()); 
    in.close(); 
    } 
} 

はしかし、私は、HTTP接続が成功したとしてデータを送信することができないことがわかりましたローカルモードで動作します。それに対処する方法?

+0

こんにちは、試した場所でスクリプトを共有できるのであればあなたのUDFを使用して助けてください。 – kecso

+0

いくつかのコードサンプルを追加しました。 Thx〜 – Trams

答えて

0

'localhost'、 '58888'でリッスンするサービスはありますか?ローカルホストは、各実行ノードによって異なっている、あなたはこれをしたいかもしれないと

注:

%default LHOST `localhost` 

、パラメータとして一般的に

DEFINE InterStore test.InterStore('$LHOST', '58888'); 

を、この変数を使用する私はいくつかのプリントアウトを行うだろうUDFで渡されたパラメータを再確認し、接続をテストする(pingのように、ハープープノードからポートにアクセスできるかどうかをチェックする)

関連する問題