Cassandra DBの作成/削除キースペースとテーブルにはCassandraセッションが必要です。 Sparkアプリケーションでは、Cassandra Sessionを作成するためにSparkConfをCassandraConnectorに渡す必要があります。 Spark 2.0では以下のようにすることができます。あなたがデータフレームを既存しているなら、あなたにもDataFrameFunctions.createCassandraTable(Df)
を使用してカサンドラにテーブルを作成することができます
SparkSession spark = SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.master("local[2]")
.getOrCreate();
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");
。 APIの詳細hereを参照してください。
下記のようにspark-cassandra-connectorが提供するapiを使用して、Cassandra DBからデータを読み取ることができます。
Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).load();
dataset.show();
あなたは以下のようなスパークカサンドラコネクタによって返されたデータフレームで作成された一時テーブルでクエリを実行するためにSparkSession.sql()メソッドを使用することができます。
dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();
ありがとうございました。これはまさに私が探していたものです。 SparkやCassandraを使用してクエリを実行する際には、SQLの制限事項には何の制限もありませんか?キースペース/テーブルを作成することを理解するには、cassandraを使用する必要があります。 –
また、SparkSession.sql() - Cassandraテーブルに対してではなく、一時テーブルに対してのみクエリを実行できますか。 Sparkの文書にはそのことは記載されていません。 "SQL構文解析に使用される方言は、 'spark.sql.dialect'を使用して構成できます。SQLContextの場合、利用できる唯一の方言は" SQL "です。このSQL構文は、Spark SQLが提供する単純なSQLパーサを使用します。HiveContext 、デフォルトは "hiveql"ですが、 "sql"も利用できます。HiveQLパーサーははるかに完全であるため、ほとんどのユースケースでお勧めします。 –
SparkSession.sqlは一時テーブルに限定されません。 Spark JDBCを使用して、異なるデータベースに接続することができます。 SparkCassandraConnectorを使用すると、Cassandra DBを使用するのがはるかに簡単です。 – abaghel