PigがUDFオブジェクトをインスタンス化する方法を教えてください。 Pigを使ってデータを処理するパイプラインを構築しました。私はマルチノードHadoop
クラスタにパイプラインを導入しました。そして、パイプラインの各ステップの後に生成されるすべての中間結果を保存します。そこで私は初期化時にHTTP接続を開き、データをexec
に送信するJavaでUDFを書いた。また、オブジェクトのfinalize
の接続を閉じます。PigがUDFオブジェクトをインスタンス化する方法
私のスクリプトは以下のように簡略化することができます。
REGISTER MyPackage.jar;
DEFINE InterStore test.InterStore('localhost', '58888');
DEFINE Clean test.Clean();
raw = LOAD 'mydata';
cleaned = FILTER (FOREACH raw GENERATE FLATTEN(Clean(*))) BY NOT ($0 MATCHES '');
cleaned = FOREACH cleaned GENERATE FLATTEN(InterStore(*));
named = FOREACH cleaned GENERATE $1 AS LocationID, $2 AS AccessCount;
named = FOREACH named GENERATE FLATTEN(InterStore(*)) AS (LocationID, AccessCount);
grp = GROUP named BY LocationID;
grp = FOREACH grp GENERATE FLATTEN(InterStore(*)) AS (group, named:{(LocationID, AccessCount)});
sum = FOREACH grp GENERATE group AS LocationID, SUM(named.AccessCount) AS TotalAccesses;
sum = FOREACH sum GENERATE FLATTEN(InterStore(*)) AS (LocationID, TotalAccesses);
ordered = ORDER sum BY TotalAccesses DESC;
STORE ordered INTO 'result';
そしてInterStoreのためのコードは以下のように簡略化することができます。
class InterStore extends EvalFunc<Tuple>{
HttpURLConnection con; //Avoid redundant connection establishment in exec
public InterStore(String ip, String port) throws IOException
{
URL url = new URL("http://" + ip + ':' + port);
con = (HttpURLConnection)url.openConnection();
con.setRequestMethod("PUT");
con.setDoOutput(true);
con.setDoInput(true);
}
public Tuple exec(Tuple input) throws IOException
{
con.getOutputStream().write((input.toDelimitedString(",")+'\n').getBytes());
return input;
}
@Override
protected void finalize() throws Throwable
{
con.getOutputStream().close();
int respcode = con.getResponseCode();
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
System.out.printf("Resp Code:%d, %s\n", respcode, in.readLine());
in.close();
}
}
はしかし、私は、HTTP接続が成功したとしてデータを送信することができないことがわかりましたローカルモードで動作します。それに対処する方法?
こんにちは、試した場所でスクリプトを共有できるのであればあなたのUDFを使用して助けてください。 – kecso
いくつかのコードサンプルを追加しました。 Thx〜 – Trams