2017-03-29 17 views
0

pythonスクリプトを使用してページ(例:facebookページ)をスクラップし、各投稿をファイルに書き込む(gettwitterプロセスのようなもの)。 ExecuteScriptは、nifiデータフローの最初のプロセッサです。 session.create()を使用してフローファイルを作成しましたが、問題はありませんでした。Pythonでnifi addscriptフローファイルを作成する際の問題

しかし、私がFacebookから読み込んだデータをoutputstreamCallbackに入れる方法については混乱しています。私が見た例のほとんどは、Javaオーバーライドを使用していますが、私はPythonを使用しなければなりません。

私は、フローファイルの読み込みで多くの例を見つけましたが、それほど多くはありませんでした。以下はPythonでやりたいJavaのものです。

FlowFile flowFile = session.create(); 
flowFile = session.write(flowFile, new OutputStreamCallback() { 
    @Override 
    public void process(final OutputStream out) throws IOException { 
     out.write(tweet.getBytes(StandardCharsets.UTF_8)); 

その他の方法がある場合は、こちらをご覧ください。ありがとう。


@Jamesによって提案された変更を採用した後、以下に示すスニペットを書きましたが、フローファイルは転送されません。しかしコンパイルエラーはありません。以下は

import urllib2 
import json 
import datetime 
import csv 
import time 
import sys 
import traceback 
from org.apache.nifi.processor.io import OutputStreamCallback 
from org.python.core.util import StringUtil 

class WriteContentCallback(OutputStreamCallback): 
    def __init__(self, content): 
     self.content_text = content 

    def process(self, outputStream): 
     try: 
      outputStream.write(StringUtil.toBytes(self.content_text)) 
     except: 
      traceback.print_exc(file=sys.stdout) 
      raise 

#app_id = "<FILL IN>" 
#app_secret = "<FILL IN>" # DO NOT SHARE WITH ANYONE! 
page_id = "dsssssss" 
#page_id = raw_input("Please Paste Public Page Name:") 

#access_token = app_id + "|" + app_secret 

access_token = "sdfsdfsf%sdfsdf" 

#access_token = raw_input("Please Paste Your Access Token:") 


def scrapeFacebookPageFeedStatus(page_id, access_token): 
     flowFile = session.create() 
     flowFile = session.write(flowFile, WriteContentCallback("Hello there this is my data")) 
     flowFile = session.write() 
     session.transfer(flowFile, REL_SUCCESS) 

     has_next_page = False 
     num_processed = 0 # keep a count on how many we've processed 
     scrape_starttime = datetime.datetime.now() 


     while has_next_page: 
      print "Scraping %s Facebook Page: %s\n" % (page_id, scrape_starttime) 
      has_next_page = False 

     print "\nDone!\n%s Statuses Processed in %s" % \ 
       (num_processed, datetime.datetime.now() - scrape_starttime) 


if __name__ == '__main__': 
    scrapeFacebookPageFeedStatus(page_id, access_token) 
    flowFile = session.create() 
    flowFile = session.write(flowFile, WriteContentCallback("and your data")) 
    session.transfer(flowFile, REL_SUCCESS) 

nifi-app.log

> [[email protected] logs]# tail -100 nifi-app.log 2017-04-03 14:08:07,989 
> INFO [StandardProcessScheduler Thread-6] 
> o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled 
> ExecuteScript[id=a62f4b97-8fd7-15cd-95b9-505e1b960805] to run with 1 
> threads 2017-04-03 14:08:08,938 INFO [Flow Service Tasks Thread-2] 
> o.a.nifi.controller.StandardFlowService Saved flow controller 
> [email protected] // Another save 
> pending = false 2017-04-03 14:08:13,789 INFO [StandardProcessScheduler 
> Thread-3] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled 
> PutFile[id=a62f4b8e-8fd7-15cd-7517-56593deabf55] to run with 1 threads 
> 2017-04-03 14:08:14,296 INFO [Flow Service Tasks Thread-2] 
> o.a.nifi.controller.StandardFlowService Saved flow controller 
> [email protected] // Another save 
> pending = false 

答えて

0

ここからの出力は、Python ExecuteScriptでNiFi OutputStreamCallbackの簡単な実装です:ジェームズたくさん

import sys 
import traceback 
from org.apache.nifi.processor.io import OutputStreamCallback 
from org.python.core.util import StringUtil 

class WriteContentCallback(OutputStreamCallback): 
    def __init__(self, content): 
     self.content_text = content 

    def process(self, outputStream): 
     try: 
      outputStream.write(StringUtil.toBytes(self.content_text)) 
     except: 
      traceback.print_exc(file=sys.stdout) 
      raise 

# Create new FlowFile with content 
flowFile = session.create() 
flowFile = session.write(flowFile, WriteContentCallback("This is the flowfile content")) 
session.transfer(flowFile, REL_SUCCESS) 
+0

感謝。それは動作し、あなたは揺れます! – omer

+0

あなたが共有したスニペットは、スタンドアロンで実行された場合にうまく機能します。しかし、私はPythonコードでそれを豊かにしようとすると、コンパイルエラーはありませんが、私は次のプロセッサ(0バイト転送)に転送されたフローファイルを取得しません。 – omer

関連する問題