2017-02-23 9 views
0

私はKafkaと方解石を統合しようとしています。私はCsvStreamableTableをリフレッシュしました。Apache CalciteとKafkaの統合

各ConsumerRecordがfowlloingコードを使用して、[]オブジェクトに変換される:

static class ArrayRowConverter extends RowConverter<Object[]> { 
    private List<Schema.Field> fields; 

    public ArrayRowConverter(List<Schema.Field> fields) { 
     this.fields = fields; 
    } 

    @Override 
    Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) { 
     Object[] objects = new Object[fields.size()+1]; 
     int i = 0 ; 
     objects[i++] = consumerRecord.timestamp(); 
     for(Schema.Field field : this.fields) { 
      Object obj = consumerRecord.value().get(field.name()); 
      if(obj instanceof Utf8){ 
       objects[i ++] = obj.toString(); 
      }else { 
       objects[i ++] = obj; 
      } 
     } 
     return objects; 
    } 
} 

列挙子は、次のように実装され、1つのスレッド常にカフカからのポーリングレコードで、キュー、のGetRecord()メソッドの世論調査にそれらを置きますそのキューから:

public E current() { 
    return current; 
} 

public boolean moveNext() { 
for(;;) { 
    if(cancelFlag.get()) { 
     return false; 
    } 
    ConsumerRecord<String, GenericRecord> record = getRecord(); 
    if(record == null) { 
     try { 
      Thread.sleep(200L); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     continue; 
    } 
    current = rowConvert.convertRow(record); 
    return true; 
    } 
} 

私はSELECT STREAM * FROM Kafka.clicksをテストしましたが正常に動作します。 rowtimeは明示的に追加された最初の列で、値はKafkaのレコードのタイムスタンプです。

しかし、私は

SELECT STREAM FLOOR(rowtime TO HOUR) 
AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks GROUP BY FLOOR(rowtime TO HOUR), ip 

をしようとしたとき、それはあなたが "ROWTIME" 列が単調であることを宣言する必要がある例外

java.sql.SQLException: Error while executing SQL "SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line 1, column 119: Streaming aggregation requires at least one monotonic expression in GROUP BY clause 
    at org.apache.calcite.avatica.Helper.createException(Helper.java:56) 
    at org.apache.calcite.avatica.Helper.createException(Helper.java:41) 

答えて

0

を投げました。 MockCatalogReaderでは、 "ORDERS"と "SHIPMENTS"ストリームで "ROWTIME"が単調に宣言されていることに注意してください。そのため、SqlValidatorTest.testStreamGroupBy()の一部のクエリは有効で、それ以外のクエリは有効ではありません。バリデーターに依存する主要なメソッドはSqlValidatorTable.getMonotonicity(String columnName)です。

+0

ありがとうございましたJulian、列を単調に宣言する簡単な方法がありますか、またはMockTableとして実装するだけですか? – user2283216

関連する問題