2016-06-12 8 views
0

マイクラスタサイズが6機であると私はしばしばこのエラーメッセージが表示されたと私は本当にこの問題を解決する方法がわからないカサンドラクラスタでエラーがタイムアウトになりました操作が

batch.add(schedule_remove_stmt, (source, type, row['scheduled_for'],row['id']));session.execute(batch,30) 

完全なコード:

cluster = Cluster(['localhost']) 
session = cluster.connect('keyspace') 
d = datetime.utcnow() 
scheduled_for = d.replace(second=0, microsecond=0) 
rowid=[] 
stmt = session.prepare('SELECT * FROM schedules WHERE source=? AND type= ? AND scheduled_for = ?') 
schedule_remove_stmt = session.prepare("DELETE FROM schedules WHERE source = ? AND type = ? AND scheduled_for = ? AND id = ?") 
schedule_insert_stmt = session.prepare("INSERT INTO schedules(source, type, scheduled_for, id) VALUES (?, ?, ?, ?)") 
schedules_to_delete = [] 
articles={} 
source='' 
type='' 
try: 
    rows = session.execute(stmt, [source,type, scheduled_for]) 
    article_schedule_delete = '' 
    for row in rows: 
     schedules_to_delete.append({'id':row.id,'scheduled_for':row.scheduled_for}) 
     article_schedule_delete=article_schedule_delete+'\''+row.id+'\',' 
     rowid.append(row.id) 
    article_schedule_delete = article_schedule_delete[0:-1] 
    cql = 'SELECT * FROM articles WHERE id in (%s)' % article_schedule_delete 
    articles_row = session.execute(cql) 
    for row in articles_row: 
     articles[row.id]=row.created_at 
except Exception as e: 
    print e 
    log.info('select error is:%s' % e) 
try: 
    for row in schedules_to_delete: 
     batch = BatchStatement() 
     batch.add(schedule_remove_stmt, (source, type, row['scheduled_for'],row['id'])) 
     try: 
      if row['id'] in articles.keys(): 
       next_schedule =d 
       elapsed = datetime.utcnow() - articles[row['id']] 
       if elapsed <= timedelta(hours=1): 
        next_schedule += timedelta(minutes=6) 
       elif elapsed <= timedelta(hours=3): 
        next_schedule += timedelta(minutes=18) 
       elif elapsed <= timedelta(hours=6): 
        next_schedule += timedelta(minutes=36) 
       elif elapsed <= timedelta(hours=12): 
        next_schedule += timedelta(minutes=72) 
       elif elapsed <= timedelta(days=1): 
        next_schedule += timedelta(minutes=144) 
       elif elapsed <= timedelta(days=3): 
        next_schedule += timedelta(minutes=432) 
       elif elapsed <= timedelta(days=30) : 
        next_schedule += timedelta(minutes=1440) 
       if not next_schedule==d: 
        batch.add(schedule_insert_stmt, (source,type, next_schedule.replace(second=0, microsecond=0),row['id'])) 
        #log.info('schedule id:%s' % row['id']) 
     except Exception as e: 
      print 'key error:',e 
      log.info('HOW IT CHANGES %s %s %s %s ERROR:%s' % (source,type, next_schedule.replace(second=0, microsecond=0), row['id'],e)) 
     session.execute(batch,30) 
except Exception as e: 
    print 'schedules error is =======================>',e 
    log.info('schedules error is:%s' % e) 

おかげらエラーメッセージが表示されるコードの一部は、このです私は本当にこれを解決する方法を知りません!

答えて

2

バッチを使用して異なるパーティションキーに対して多数の操作を実行しようとしているため、この場合はバッチステートメントを使用しないでください。タイムアウト例外が発生します。バッチを使用してテーブルを同期させ、パフォーマンスの最適化を維持する必要があります。 バッチの不正使用に関する詳細はこちらthis article

asynchronous driver apiを使用すると、大量の削除クエリを実行するのに適しています。コードのパフォーマンスを維持し、コーディネーターのオーバーロードを避けることができます。