2017-02-05 15 views
0

添付した画像には、入力と期待する出力のサンプルが含まれています。PySparkを使用して文字列内の部分文字列を検索する方法

たとえば、文に「John」と「drives」が含まれている場合、Johnは車を持っていて運転するという意味になります。そのために使っているコードを添付します。しかし、このコードは正しく動作せず、複雑すぎます。ご助力いただければ幸いです。

%pyspark 
# Read the input file as an RDD of text lines.
# NOTE(review): `sc` is not defined in this snippet; presumably the notebook
# environment provides the SparkContext - confirm.
rdd = sc.textFile("./sample.txt") 
# Split every line on tab characters into a list of column values.
col = rdd.map(lambda line: line.split('\t')) 
# Remove the header: the first record is taken to be the header row.
header = col.first() #extract header 
# Keep only records that differ from the header. Any data record that is
# identical to the header is dropped as well.
col = col.filter(lambda line: line != header) 
def convertToRow(line):
    """Build a Row with ``Name`` and ``Text`` fields from a split record.

    ``line`` is an indexable record; element 0 becomes ``Name`` and
    element 1 becomes ``Text``.
    """
    name, text = line[0], line[1]
    return Row(Name=name, Text=text)
# Apply convertToRow to every record of `col`, then turn the resulting RDD
# into a DataFrame.
df = col.map(convertToRow).toDF() 
from pyspark.sql.functions import udf 
def splitParagraphIntoSentences(paragraph):
    """Return the result of NLTK's ``sent_tokenize`` applied to ``paragraph``."""
    return nltk.tokenize.sent_tokenize(paragraph)

def tokenize(text):
    """Normalise ``text`` and break it into sentences.

    The text is lower-cased and stripped of newline characters and commas.
    If the cleaned text holds more than one whitespace-separated word it is
    split into sentences via ``splitParagraphIntoSentences``; otherwise the
    (empty or single-element) word list is returned as is.
    """
    cleaned = text.lower().replace('\n', '').replace(',', '')
    words = cleaned.split()
    if len(words) <= 1:
        return words
    return splitParagraphIntoSentences(cleaned)

from pyspark.sql.types import ArrayType, StringType

# Hand the plain Python function itself to udf(). Wrapping it in
# `lambda text: tokenize(text)` is a bug: the name `tokenize` is rebound to
# the UDF on this very line, so when the lambda eventually runs it would call
# the UDF wrapper instead of the original function.
# The return type is declared explicitly because the function returns a list
# of strings; without it udf() defaults to a string return type.
tokenize = udf(tokenize, ArrayType(StringType()))
# Keep the name column and replace Text by its tokenized form.
data = df.select('Name', tokenize(df.Text).alias("Text"))
def how(name, paragraph):
    """Return how the person called ``name`` travels, judging by ``paragraph``.

    Matching is done on whole words, case-insensitively, ignoring punctuation
    attached to a word (so "drives." matches "drives").

    Args:
        name: the person's name, either a string (possibly several words)
            or a list of word tokens.
        paragraph: iterable of sentence strings.

    Returns:
        "Drives", "Walks" or "Coming with" for the first sentence that
        mentions both the name and one of these phrases (phrases are tried
        in that order within a sentence), or None when no sentence matches
        or when ``name``/``paragraph`` is empty or None.
    """
    import string

    def words(sentence):
        # Lower-case, split on whitespace and trim surrounding punctuation.
        trimmed = (w.strip(string.punctuation) for w in sentence.lower().split())
        return [w for w in trimmed if w]

    def contains(tokens, phrase):
        # True when `phrase` occurs as a contiguous run inside `tokens`.
        width = len(phrase)
        return any(tokens[i:i + width] == phrase
                   for i in range(len(tokens) - width + 1))

    if not name or not paragraph:
        return None

    # Accept both a plain string and an already tokenized name.
    name_tokens = words(name if hasattr(name, "split") else " ".join(name))
    if not name_tokens:
        # An empty phrase would trivially "match" every sentence.
        return None

    # Each phrase is a list of single words so it can be compared against
    # slices of the tokenized sentence ("coming with" is two tokens).
    phrases = (
        ("Drives", ["drives"]),
        ("Walks", ["walks"]),
        ("Coming with", ["coming", "with"]),
    )
    for sentence in paragraph:
        tokens = words(sentence)
        if not contains(tokens, name_tokens):
            continue
        for label, phrase in phrases:
            if contains(tokens, phrase):
                return label
    return None

def checkYesNo(name, paragraph):
    """Return "No" if any sentence says the person walks or is coming with
    someone, otherwise "Yes".

    Matching is done on whole words, case-insensitively, ignoring punctuation
    attached to a word. Every sentence is examined; "Yes" is only returned
    once none of them matched.
    NOTE(review): presumably this is the "has a car" flag - confirm.

    Args:
        name: accepted for signature compatibility; not used by the check.
        paragraph: iterable of sentence strings.

    Returns:
        "No", "Yes", or None when ``paragraph`` is empty or None.
    """
    import string

    def words(sentence):
        # Lower-case, split on whitespace and trim surrounding punctuation.
        trimmed = (w.strip(string.punctuation) for w in sentence.lower().split())
        return [w for w in trimmed if w]

    def contains(tokens, phrase):
        # True when `phrase` occurs as a contiguous run inside `tokens`.
        width = len(phrase)
        return any(tokens[i:i + width] == phrase
                   for i in range(len(tokens) - width + 1))

    if not paragraph:
        return None

    # Phrases are lists of single words ("coming with" is two tokens).
    negative_phrases = (["coming", "with"], ["walks"])
    for sentence in paragraph:
        tokens = words(sentence)
        if any(contains(tokens, phrase) for phrase in negative_phrases):
            return "No"
    return "Yes"

# Hand the plain Python functions themselves to udf(). Wrapping them in
# lambdas that look the names up again is a bug: `how` and `checkYesNo` are
# rebound to the UDFs on these very lines, so the lambdas would end up
# calling the UDF wrappers instead of the original functions.
how = udf(how)
checkYesNo = udf(checkYesNo)

# One output row per person: name, the Yes/No flag and the travel method.
final_df = data.select('Name', checkYesNo(data.Name, data.Text), how(data.Name, data.Text))

答えて

0

私なら次のようにします:

import socket 

class SparkUtil(object):
    """Helper for creating a SparkContext."""

    @staticmethod
    def get_spark_context(host, venv, framework_name, parts):
        """Create a SparkContext configured for the given master and app name.

        Args:
            host: master URL handed to ``SparkConf.setMaster``.
            venv: path of a virtualenv; ``<venv>/bin/python`` is exported
                through the ``PYSPARK_PYTHON`` environment variable.
            framework_name: application name handed to ``setAppName``.
            parts: not used; kept so existing callers keep working.

        Returns:
            A new ``SparkContext``.
        """
        # `os` was used without being imported; import it locally, in line
        # with the other function-scope imports here.
        import os

        # Set before the context is created.
        # NOTE(review): presumably selects the Python interpreter Spark uses
        # for its workers - confirm against the Spark docs.
        os.environ['PYSPARK_PYTHON'] = "{0}/bin/python".format(venv)

        from pyspark import SparkConf, SparkContext

        spark_conf = SparkConf().setMaster(host).setAppName(framework_name)
        return SparkContext(conf=spark_conf)

# Sample input: one [name, free-text description] pair per person.
input_txt = [ 
    [ "John", "John usually drives to work. He usually gets up early and drinks coffee. Mary usually joining him." ], 
    [ "Sam", "As opposed to John, Sam doesn't like to drive. Sam usually walks there." ], 
    [ "Mary", "Mary doesn't have driving license. Mary usually coming with John which picks her up from home." ] 
] 

def has_car(text):
    """Return True when ``text`` contains the substring "drives"."""
    mentions_driving = "drives" in text
    return mentions_driving

def get_method(text):
    """Return the first of "drives", "walks", "coming with" found in ``text``.

    The candidates are tried in that order using substring matching; None is
    returned when none of them occurs.
    """
    candidates = ("drives", "walks", "coming with")
    return next((phrase for phrase in candidates if phrase in text), None)

def process_row(row):
    """Turn a ``[name, text]`` record into ``[name, has_car, method]``.

    The second and third entries are the results of ``has_car`` and
    ``get_method`` applied to the record's text.
    """
    name, text = row[0], row[1]
    return [name, has_car(text), get_method(text)]

# Create the Spark context. `parts` is accepted by get_spark_context but is
# not used inside its body.
sc = SparkUtil.get_spark_context (host   = "local[2]", 
            venv   = "../starshome/venv", 
            framework_name = "app", 
            parts   = 2) 

# Distribute the sample records, apply process_row to each of them, collect
# the results back to the driver and print them.
print (sc.parallelize (input_txt).map (process_row).collect()) 

SparkUtilクラスはおそらく無視してかまいません。私はノートブックを使用していません。これは単純なSparkアプリケーションです。

関連する問題