2017-02-26 8 views
0

私はSpark 2.1.0を使用しており、Cassandraクラスタに接続しようとしています。私は最新のスパークリヤを使用しました。私は デフォルト以下のようにデフォルト設定を設定していますスパークセッションsparklyrとRを使用してCassandraテーブルを読み込めません

# local-only configuration 
    sparklyr.cores.local: !expr parallel::detectCores() 
    spark.sql.shuffle.partitions.local: !expr parallel::detectCores() 

    # cassandra settings 
spark.cassandra.connection.host:<Cassandra IP> 
spark.cassandra.auth.username: <uid> 
spark.cassandra.auth.password:<pass> 

sparklyr.defaultPackages: 
- com.databricks:spark-csv_2.11:1.5.0 
- com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-RC1 
- com.datastax.cassandra:cassandra-driver-core:3.1.4 

瓶は、ソースファイルが配置されているルートディレクトリに配置されています。

以下の操作を実行しました。私はread関数を呼び出そうとするまで、すべてうまくいった。 jarの場所を明示的に設定しました。

> library(sparklyr) 
    > config <- spark_config() 
    Warning message: 
    In readLines(input, encoding = "UTF-8") : 
     incomplete final line found on '/home/bsc/BSCAnalytics/config.yml' 
    > config[["sparklyr.jars.default"]] <- c("/home/bsc/BSCAnalytics/cassandra-driver-core-3.1.4.jar") 
    > 
    > sc <- spark_connect(master = "local", version = "2.1.0") 
    Warning message: 
    In readLines(input, encoding = "UTF-8") : 
     incomplete final line found on '/home/bsc/BSCAnalytics/config.yml' 
    > Spark.session <- sparklyr::invoke_static(sc, "org.apache.spark.sql.SparkSession", "builder") %>% sparklyr::invoke("config", "spark.cassandra.connection.host", "<Cassandra IP>") %>% sparklyr::invoke("getOrCreate") 

read関数を呼び出そうとすると、実行時にjarファイルを見つけることができません。私は次のエラーを目撃:

> event_df <- invoke(Spark.session, "read") %>% invoke("format", "org.apache.spark.sql.cassandra") %>% invoke("option", "keyspace", "kps") %>% invoke("option", "table", "tab_event") %>% invoke("load") 
Error: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at http://spark.apache.org/third-party-projects.html 
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:569) 
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86) 
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86) 
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at sparklyr.Invoke$.invoke(invoke.scala:94) 
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89) 
    at sparklyr.StreamHandler$.read(stream.scala:55) 
    at sparklyr.BackendHandler.channelRead0(handler.scala:49) 
    at sparklyr.BackendHandler.channelRead0(handler.scala:14) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554) 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554) 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554) 
    at scala.util.Try.orElse(Try.scala:84) 
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:554) 
    ... 39 more 

答えて

0

あなたはこのようなものを使用することができます

library(sparklyr) 

config <- spark_config() 
config[["sparklyr.defaultPackages"]] <- c(
    "datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11") 

sc <- spark_connect(master = "local", version = "1.6.1", config = config) 

df <- sparklyr:::spark_data_read_generic(
    sc, 
    "org.apache.spark.sql.cassandra", 
    "format", list(
    keyspace = "dev", 
    table = "emp" 
)) %>% invoke("load") 

cassandra_tbl <- sparklyr:::spark_partition_register_df(
    sc, 
    df, 
    name = "emp", 
    repartition = 0, 
    memory = FALSE) 

cassandra_tbl 

も参照してくださいhttps://github.com/rstudio/sparklyr/issues/520

+0

こんにちはハビエル、 感謝を。私はあなたの閲覧のために、次の詳細情報を共有し、あなたの提言を求めたいと思います: 私たちのスパーク(2.1)クラスタが共同カサンドラ(3.xの)に配置されていない カサンドラは、ユーザーIDとパスワード で固定されたノードを私に聞かせてください。次のことを理解してください。 Spark Cassandraコネクタジャーはどこに配置しますか?Sparlyrはどのように参照しますか? configパラメータを設定する関数にmaven属性を渡しているようです。 ここでは、クラスタIP、ユーザID、およびパスワードを渡しますか? さらなる分析のために、DFに凍結または他のカッサンドラ複合オブジェクトをどのようにロードするのですか? – SCB

関連する問題