2016-08-09 22 views
3

スナップシェルからkafkaを使用してsparkストリーミングテーブルを作成する際に問題が発生しました。SnappyData - Kafkaストリーミングテーブルを作成中にエラーが発生しました

「例外 '無効な入力 'C'、dmlOperation、挿入を期待し、withIdentifier、選択またはプット(ライン1、列1):'

参考:ここでhttp://snappydatainc.github.io/snappydata/streamingWithSQL/#spark-streaming-overview

は私のsqlです:

CREATE STREAM TABLE if not exists sensor_data_stream 
(sensor_id string, metric string) 
using kafka_stream 
options (
    storagelevel 'MEMORY_AND_DISK_SER_2', 
    rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter', 
    zkQuorum 'localhost:2181', 
    groupId 'streamConsumer', 
    topics 'test:01'); 

シェルは最初の文字 'C'でスクリプトが似ていないようです。次のコマンドを使用してスクリプトを実行しようとしています:

snappy> run '/scripts/my_test_sensor_script.sql'; 

助けてください!

+0

こんにちはマイク、これはあなたのために今すぐ返されます – plambre

答えて

3

いくつかの矛盾を文書化し、実際のsyntax.The正しい構文であるがある:あなたがする必要がある

CREATE STREAM TABLE sensor_data_stream if not exists (sensor_id string, 
metric string) using kafka_stream 
options (storagelevel 'MEMORY_AND_DISK_SER_2', 
rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter', 
zkQuorum 'localhost:2181', 
groupId 'streamConsumer', topics 'test:01'); 

もう一つは、あなたのデータの行コンバータ

+1

ありがとうサシン!更新された構文が働いた。行コンバータの作成に関するあなたのコメントに基づいて、io.snappydata.app.streaming.KafkaStreamToRowsConverterでClassNotFound例外が発生しています。その問題を解決するためにドキュメントを検索します。 –

0

マイクを書くことで、あなたのことが必要

trait StreamToRowsConverter extends Serializable { 
    def toRows(message: Any): Seq[Row] 
} 

を実装して、独自のrowConverterクラスを作成し、次にそのDDLのrowConverter完全修飾クラス名を指定します。 rowConverterはスキーマ固有のものです。 'io.snappydata.app.streaming.KafkaStreamToRowsConverter'は単なるプレースホルダクラス名で、独自のrowConverterクラスで置き換える必要があります。

+0

ありがとうYogesh。私はKafkaに送信するJavaオブジェクトを変換しようとしています。私は行コンバータを使用してこのオブジェクトを変換する方法を理解しようとしています。このプロセスについて説明することをお勧めする文書はありますか? –

関連する問題