2016-04-05 10 views
0

私はPython2.7、Celery、cx_Oracleを使ってOracleデータベースにアクセスしています。同時にセロリのタスク間でOracleデータベース接続を共有する

私は多くのタスクを作成します。各タスクは、cx_Oracleを介してクエリを実行します。このタスクの多くは同時に実行されます。すべてのタスクが同じデータベース接続を共有する必要があります。

1つのタスクのみを起動すると、クエリが正しく実行されます。しかし、いくつかのクエリを起動すると、次のエラーメッセージが表示されます。

[2016-04-04 17:12:43,846: ERROR/MainProcess] Task tasks.run_query[574a6e7f-f58e-4b74-bc84-af4555af97d6] raised unexpected: DatabaseError('<cx_Oracle._Error object at 0x7f9976635580>',) 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task 
    R = retval = fun(*args, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__ 
    return self.run(*args, **kwargs) 
    File "/home/ric/workspace/dbw_celery/tasks.py", line 39, in run_query 
    column_names = get_column_names(oracle_conn, table_info["table_name"]) 
    File "/home/ric/workspace/dbw_celery/utilities.py", line 87, in get_column_names 
    cursor.execute(query_str) 
DatabaseError: <cx_Oracle._Error object at 0x7f9976635580> 

私のコードを見てみましょう。

これは私がOracleデータベース接続を作成し、私のtasks.pyファイル、セロリのインスタンスであり、ユーザがデータベース接続を言っただろう、私のタスク定義:

# tasks.py 
import celeryconfig 
from celery import Celery 
from utilities import connect_to_db, get_new_rows, write_output_rows 

# Define a Celery instance 
dbwapp = Celery('tasks') 
dbwapp.config_from_object(celeryconfig) 
dbwapp.conf["CELERYBEAT_SCHEDULE"] = {} 

# Define an Oracle connection as a global variable to be used by all tasks 
oracle_conn = connect_to_db(db_user, db_pass, db_host, db_port, db_name) 

# Define the task function that each Celery worker will run 
@dbwapp.task() 
def run_query(table_info, output_description): 
    """Run a query on a given table. Writes found rows to output file.""" 
    global oracle_conn 

    column_names = get_column_names(oracle_conn, table_info["table_name"]) 

    new_rows, last_check_timestamp = get_new_rows(oracle_conn, table_info) 

    write_result_to_output_file(output_file, new_rows) 


def load_celerybeat_schedule(): 
    """Loads the CELERYBEAT_SCHEDULE dictionary with the tasks to run.""" 

    new_task_dict = { 
     "task": "tasks.run_query", 
     "schedule": timedelta(seconds=table_config["check_interval"]), 
     "args": (table_config, output_description) 
    } 
    new_task_name = "task-" + table_config["table_name"] 
    dbwapp.conf["CELERYBEAT_SCHEDULE"][new_task_name] = new_task_dict 

これは私がutilities.pyファイル内のデータベースに接続する方法ですが:

012:

# utilities.py 
def connect_to_db(db_user, db_password, db_host, db_port, db_name): 
    """Connect to DB.""" 
    connection_str = "%s/%[email protected]%s:%s/%s" % (db_user, db_password, db_host, db_port, db_name) 

    try: 
     db_connection = cx_Oracle.connect(connection_str) 
    except cx_Oracle.DatabaseError: 
     logger.error("Couldn't connect to DB %s" % db_name) 
     return None 
    logging.info("Succesfully connected to the DB: %s" % db_name) 

    return db_connection 

これは、クエリが実際に実行されます別のファイルで定義さget_new_rows_functionあります

私はこのような私のコードを実行します。私はそれが簡単に理解できるようにするために自分のコードを簡素化することを試みたcelery -A tasks worker -B

を。

エラーが発生する原因は、異なるタスクが同時に実行され、同じデータベース接続を共有しているためです。それらの同時実行は「混ざり合っている」かそのようなものになります。

異なるセロリのタスク間でデータベース接続を共有する正しい方法は何ですか?

誰かが私が間違っていることを知っていますか?

答えて

3

複数のスレッドが同じ接続を共有したい場合は、スレッドモードを有効にする必要があります。このようなもの:

conn = cx_Oracle.connect(connection_str, threaded = True) 

あなたが面白い問題に遭遇することができます!

関連する問題