2016-06-21 3 views
0

オブジェクト_thread.lockピクルスIは同時にCassandraのマルチは、

http://www.datastax.com/dev/blog/datastax-python-driver-multiprocessing-example-for-improved-bulk-data-throughput

の例に基づいて、これは私のコード

class QueryManager(object): 

concurrency = 100 # chosen to match the default in execute_concurrent_with_args 

def __init__(self, session, process_count=None): 
    self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,)) 

@classmethod 
def _setup(cls, session): 
    cls.session = session 
    cls.prepared = cls.session.prepare(""" 
INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?) 
""") 

def close_pool(self): 
    self.pool.close() 
    self.pool.join() 

def get_results(self, params): 
    results = self.pool.map(_multiprocess_write, (params[n:n+self.concurrency] for n in range(0, len(params), self.concurrency))) 
    return list(itertools.chain(*results)) 

@classmethod 
def _results_from_concurrent(cls, params): 
    return [results[1] for results in execute_concurrent_with_args(cls.session, cls.prepared, params)] 


def _multiprocess_write(params): 
    return QueryManager._results_from_concurrent(params) 


if __name__ == '__main__': 

    processes = 2 

    # connect cluster 
    cluster = Cluster(contact_points=['127.0.0.1'], port=9042) 
    session = cluster.connect() 

    # database name is a concatenation of client_id and system_id 
    keyspace_name = 'unit_test_0' 

    # drop keyspace if it already exists in a cluster 
    try: 
     session.execute("DROP KEYSPACE IF EXISTS " + keyspace_name) 
    except: 
     pass 

    create_keyspace_query = "CREATE KEYSPACE " + keyspace_name \ 
         + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};" 
    session.execute(create_keyspace_query) 

    # use a session's keyspace 
    session.set_keyspace(keyspace_name) 

    # drop table if it already exists in the keyspace 
    try: 
     session.execute("DROP TABLE IF EXISTS " + "test_table") 
    except: 
     pass 

    # create a table for invoices in the keyspace 
    create_test_table = "CREATE TABLE test_table(" 

    keys = "key1 text,\n" \ 
      "key2 text,\n" \ 
      "key3 text,\n" \ 
      "key4 text,\n" \ 
      "key5 text,\n" 

    create_invoice_table_query += keys 
    create_invoice_table_query += "PRIMARY KEY (key1))" 
    session.execute(create_test_table) 

    qm = QueryManager(session, processes) 

    params = list() 
    for row in range(100000): 
     key = 'test' + str(row) 
     params.append([key, 'test', 'test', 'test', 'test']) 

    start = time.time() 
    rows = qm.get_results(params) 
    delta = time.time() - start 
    log.info(fm('Cassandra inserts 100k dummy rows for ', delta, ' secs')) 
である行(ダミーデータ)を挿入する Cassandramultiprocessingを使用しようとすることはできません私は、コードを実行

は、私は次のエラーを得た

TypeError: can't pickle _thread.lock objects 

あなたはIPCの境界を超えるロックをシリアル化しようとしていることをお勧め

self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,)) 
+0

ここに到着した他の人に役立つ可能性があります。 https://stackoverflow.com/questions/44005212/picklingerror-when-copying-a-very-large-cassandra-table-using-cqlsh/45698179#45698179 – bpgriner

答えて

1

で指摘しています。私はそれがあなたがSession初期化関数への引数としてSessionオブジェクトを供給しているからかもしれないと思います。 init関数が各ワーカープロセスで新しいセッションを作成するようにします(引用したblog postの「セッションごとのセッション」を参照)。