2013-02-01 3 views
7

顧客コールのデータを保持するハイブテーブルがあります。 簡略化のため、2つの列があり、最初の列は顧客IDを保持し、2番目の列は呼び出しのタイムスタンプ(UNIXのタイムスタンプ)を保持します。それが返すハイブクエリを作成することが可能です各顧客のために、ハイブクエリでのHadoopの連続レコードの相違の計算

Customer1 timestamp11 
Customer1 timestamp12 
Customer1 timestamp13 
Customer2 timestamp21 
Customer3 timestamp31 
Customer3 timestamp32 
... 

SELECT * FROM mytable SORT BY customer_id, call_time; 

結果は次のとおりです。

私は、各顧客のすべてのコールを見つけるために、このテーブルを照会することができます2回目の呼び出しから、2回の連続した呼び出しの時間間隔を開始しますか?上記の例の場合 そのクエリが返す必要があります:

Customer1 timestamp12-timestamp11 
Customer1 timestamp13-timestamp12 
Customer3 timestamp32-timestamp31 
... 

を私はsql solutionからソリューションを適応しようとしましたが、私はハイブの制限がこだわっている:it accepts subqueries only in FROMjoins must contain only equalities

ありがとうございます。

EDIT1:

public class DeltaComputerUDF extends UDF { 
private String previousCustomerId; 
private long previousCallTime; 

public String evaluate(String customerId, LongWritable callTime) { 
    long callTimeValue = callTime.get(); 
    String timeDifference = null; 

    if (customerId.equals(previousCustomerId)) { 
     timeDifference = new Long(callTimeValue - previousCallTime).toString(); 
    } 

    previousCustomerId = customerId; 
    previousCallTime = callTimeValue; 

    return timeDifference; 
}} 

と名 "デルタ" とそれを使用します。

私はハイブUDF関数を使用しようとしました。

しかし、(ログと結果から)MAP時に使用されているようです。 2つの問題はここから生じ:

まず:表のデータは、この機能を使用する前に、顧客IDとタイムスタンプでソートする必要があります。クエリ:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time; 

ソート部分は、私の機能が使用されてから長い間、REDUCE時に実行されるため、機能しません。

関数を使用する前にテーブルデータを並べ替えることができますが、これはオーバーヘッドであるため避けたいと思っています。

セカンド:分散Hadoopの構成の場合には、データが利用可能なジョブトラッカー間で分割される。したがって、この機能の複数のインスタンスが各マッパーに1つずつ存在すると考えられます。したがって、同じ顧客データを2つのマッパーの間で分割することは可能です。この場合、私は顧客の電話を紛失しますが、それは受け入れられません。

この問題の解決方法はわかりません。 DISTRIBUTE BYは、特定の値を持つすべてのデータが同じレデューサーに送信されることを保証します(したがって、SORTが期待通りに機能することを保証します)。

次は、libjackがreduceスクリプトを使用することを提案します。他のハイブクエリの間にこの "計算"が必要なので、Balaswamy vaddemanの示唆するように、別のツールに移る前にハイブのすべてのものを試してみたい。

EDIT2:

私は、カスタムスクリプトソリューションを検討し始めました。

ストリーミングは通常、同等のUDFまたは のInputFormatオブジェクトをコーディングするよりも効率が低い。しかし、プログラミングハイブブックの章14(この章では、カスタムスクリプトを提示)の最初のページに、私は、次の段落を発見しました。それを渡すためにデータをシリアライズおよびデシリアライズし、パイプから を外すことは比較的非効率的です。統合された方法で プログラム全体をデバッグすることも難しくなります。ただし、高速プロトタイピング と、Javaで書かれていない既存のコードを活用するのに便利です。 Javaコードを作成したくないユーザーHive の場合は、非常に効果的な アプローチになります。

したがって、カスタムスクリプトは効率の点で最適なソリューションではないことが明らかでした。

しかし、私はUDFの機能をどのように保つべきですが、分散型Hadoop構成では正常に動作することを確認する必要がありますか?この質問に対する答えは、Language Manual UDF wikiページのUDF Internalsセクションにあります。私は私のクエリを記述する場合:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

は、それは時間を短縮し、通話のために、同じ顧客からのすべてのレコードが同じ減速によって処理されていることを保証する構築物によりBYおよびSORT、DISTRIBUTEで実行されます。

上記のUDFとこのクエリの構成は私の問題を解決します。

(リンクを追加しないため申し訳ありませんが、私は十分評判ポイントを持っていないので、私はそれを行うことは許されないよ)

+0

これは[この質問]と非常に似ていると思います(http://stackoverflow.com/questions/14028796/reduce-a-set-of-rows-in-hive-to-another-set-ofrows )私はハイブでカスタムマップ/リダクションを使って答えました。適切なreduceスクリプトを用意するだけで済みます。 – libjack

+0

ハイブでこれを行う方法はわかりませんが、カスケーディングするapiがあります。カスケードのバッファと呼ばれるものがあります.http://docs.cascading.org/cascading/2.0/userguide/html/ch05s05.html –

答えて

11

それは古い質問ですが、将来の参照のために、私はここに別の提案を書く:

ハイブWindowing functionsクエリで次/前の値を使用することができます。

模造コードクエリであってもよい。

SELECT CUSTOMER_ID、LAG(call_time、1、0)(call_time行1先行することによってCUSTOMER_ID ORDER BY PARTITION)OVER - MYTABLE FROM call_time。

0

たぶん誰かが同様の要件に遭遇し、私が見つけた解決策は以下の通りであります:

package com.example; 
// imports (they depend on the hive version) 
@Description(name = "delta", value = "_FUNC_(customer id column, call time column) " 
    + "- computes the time passed between two succesive records from the same customer. " 
    + "It generates 3 columns: first contains the customer id, second contains call time " 
    + "and third contains the time passed from the previous call. This function returns only " 
    + "the records that have a previous call from the same customer (requirements are not applicable " 
    + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS" 
    + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable " 
    + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;") 
public class DeltaComputerUDTF extends GenericUDTF { 
private static final int NUM_COLS = 3; 

private Text[] retCols; // array of returned column values 
private ObjectInspector[] inputOIs; // input ObjectInspectors 
private String prevCustomerId; 
private Long prevCallTime; 

@Override 
public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException { 
    if (ois.length != 2) { 
     throw new UDFArgumentException(
       "There must be 2 arguments: customer Id column name and call time column name"); 
    } 

    inputOIs = ois; 

    // construct the output column data holders 
    retCols = new Text[NUM_COLS]; 
    for (int i = 0; i < NUM_COLS; ++i) { 
     retCols[i] = new Text(); 
    } 

    // construct output object inspector 
    List<String> fieldNames = new ArrayList<String>(NUM_COLS); 
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS); 
    for (int i = 0; i < NUM_COLS; ++i) { 
     // column name can be anything since it will be named by UDTF as clause 
     fieldNames.add("c" + i); 
     // all returned type will be Text 
     fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); 
    } 

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); 
} 

@Override 
public void process(Object[] args) throws HiveException { 
    String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]); 
    Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]); 

    if (customerId.equals(prevCustomerId)) { 
     retCols[0].set(customerId); 
     retCols[1].set(callTime.toString()); 
     retCols[2].set(new Long(callTime - prevCallTime).toString()); 
     forward(retCols); 
    } 

    // Store the current customer data, for the next line 
    prevCustomerId = customerId; 
    prevCallTime = callTime; 
} 

@Override 
public void close() throws HiveException { 
    // TODO Auto-generated method stub 

} 

} 

2)この関数を含むjarファイルを作成します。

1)は、カスタム関数を作成します。 jarnameがmyjar.jarであるとします。

3)Hiveでマシンにジャーをコピーします。クエリを実行する)

ADD JAR /tmp/myjar.jar; 
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF'; 

5:ハイブ内のカスタム機能を定義する)は、それが/ tmpに

4が配置されていると

SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM 
    (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

備考:

a。私は、call_time列がデータをbigintとして格納すると仮定しました。それが文字列の場合、プロセス関数で文字列として取得し(customerIdと同様に)、それを解析してLongに変換します

b。 UDFの代わりにUDTFを使用することにしました。これは、必要なすべてのデータを生成するためです。それ以外の場合(UDFの場合)、生成されたデータをフィルタリングしてNULL値をスキップする必要があります。だから、元の記事の最初の編集で説明したUDF機能(DeltaComputerUDF)と、クエリは次のようになります。

SELECT customer_id, call_time, time_difference 
FROM 
    (
    SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) 
    FROM 
     (
     SELECT customer_id, call_time FROM mytable 
     DISTRIBUTE BY customer_id 
     SORT BY customer_id, call_time 
     ) t 
    ) u 
WHERE time_difference IS NOT NULL; 

C。テーブル内の行の順番に関係なく、両方の関数(UDFとUDTF)は必要に応じて機能します(したがって、デルタ関数を使用する前に顧客IDと呼び出し時間でテーブルデータをソートする必要はありません)

1

MAP-REDUCEはJavaやPythonのような他のプログラミング言語と明示的に関係しています。 マップ{cutomer_id,call_time}から出て、減速機では{customer_id,list{time_stamp}}が得られます。減速機では、これらのタイムスタンプをソートしてデータを処理できます。

関連する問題