2017-03-09 7 views
1

SparklyRのSQL関数のラッパーを作成しようとしています。私は次の関数を作成しました:sparklyR spack SQL用ラッパー:sqlContext.sql

newsqlData <- sqlfunction(sc, "select 
          substr(V1,1,2), 
          substr(V1,3,3), 
          substr(V1,6,6), 
          substr(V1,12,4), 
          substr(V1,16,4) 
          FROM TABLE1 WHERE V1 IS NOT NULL") 

しかし、私は次のエラーを取得する:

sqlfunction <- function(sc, block) { 
    spark_context(sc) %>% 
invoke("sqlContext.sql", block) } 

は、その後、私はそれは、以下のものを使用して呼び出す

Error: java.lang.IllegalArgumentException: invalid method sqlContext.sql for object 12 
at sparklyr.Invoke$.invoke(invoke.scala:113) 
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89) 
at sparklyr.StreamHandler$.read(stream.scala:55) 
at sparklyr.BackendHandler.channelRead0(handler.scala:49) 
at sparklyr.BackendHandler.channelRead0(handler.scala:14) 
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) 
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) 
at java.lang.Thread.run(Thread.java:745) 

任意の提案や修正が大幅になります感謝。

答えて

1

それがあるべき:scspark_connection(出力:spark_connect(master = master_url))である

sqlfunction <- function(sc, block) { 
    spark_session(sc) %>% invoke("sql", block) 
} 

これ:

  • spark_session(sc) - 接続オブジェクトからSparkSessionを取り出します。
  • invoke("sql", block) - blockを引数としてSparkSessionインスタンスのsqlメソッドを呼び出します。使用例と

library(sparklyr) 

sc <- spark_connect(master = "local[*]") 
sqlfunction(sc, "SELECT SPLIT('foo,bar', ',')") 
<jobj[11]> 
    class org.apache.spark.sql.Dataset 
    [split(foo,bar, ,): array<string>] 

これは、あなたのJavaオブジェクトへの参照を提供します。あなたがしたい場合は、たとえばレジスタは一時テーブルとしてあることができます。

tbl
... %>% invoke("createOrReplaceTempView", "some_name_for_the_view") 

とアクセス:あなたが使用

library(dplyr) 

tbl(sc, "some_name_for_the_view") 

コード:

  • spark_contextからSparkContextインスタンスを抽出します。
  • invoke("sqlContext.sql", block) - 存在しないメソッド(sqlContext.sql)を呼び出しようとします。
関連する問題