sqlite3 - Thread-safe counter using Python's multiprocessing starmap


At the moment I'm trying to handle results from a calculation which come in very fast. At first I inserted each simulation result into an sqlite database, but that turned out to be the bottleneck of the entire calculation. I ended up using cursor.executemany instead of cursor.execute, which is much faster.
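For reference, a minimal sketch of the batched form mentioned above. It uses an in-memory database and made-up rows purely for illustration; the table layout mirrors the sample further down, and the batch size of 1000 is only an example.

```python
import sqlite3

# In-memory database, used here only so the sketch is self-contained.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE test (one INT, two INT, three INT, four INT, five INT)")

# Illustrative data: 1000 rows of five integers each.
rows = [tuple(x * i for x in range(5)) for i in range(1000)]

# One executemany call and one commit for the whole batch,
# instead of a separate execute and commit per row.
con.executemany(
    "INSERT INTO test (one, two, three, four, five) VALUES (?, ?, ?, ?, ?)", rows)
con.commit()

count = con.execute("SELECT COUNT(*) FROM test").fetchone()[0]
print(count)
con.close()
```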

My problem is that I'm somehow not able to implement a thread-safe counter.

The executemany task should run every 1000 calculations. Therefore I implemented an initializer with a multiprocessing.Value. I also tried this solution (http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing), but somehow some values of the counter are duplicated, which ends in running the executemany task too often or not at all.

If anybody has an idea how to solve this issue I'd really appreciate it.

Here's a minimum sample:

    import multiprocessing, sqlite3
    from multiprocessing import Value, Lock
    from itertools import repeat

    def worker(testvalues, totalvalues):
        mp_counter.value += 1
        counter.increment()

        con = sqlite3.connect("test.db", timeout=30.0)
        cur = con.cursor()
        # minimum sample:
        helper = list(range(5))
        helper = [x * testvalues for x in helper]
        glist.append(helper)

        execute_every = 10
        print("counter class: %d" % (counter.value()))
        print("mp_counter: %d" % (mp_counter.value))

        if counter.value() % execute_every == 0 or counter.value() == totalvalues - 1:
            print("execute query")
            print("counter class: %d" % (counter.value()))
            print("mp_counter: %d" % (mp_counter.value))

            helper = [tuple(row) for row in glist[:execute_every]]
            del glist[:execute_every]
            cur.executemany(
                "INSERT INTO test (one, two, three, four, five) VALUES (?, ?, ?, ?, ?);", helper)
            con.commit()

        con.close()

    def setup(t, g, c):
        global mp_counter
        global glist
        global counter
        mp_counter = t
        glist = g
        counter = c

    class Counter(object):
        def __init__(self, initval=0):
            self.val = Value('i', initval)
            self.lock = Lock()

        def increment(self):
            with self.lock:
                self.val.value += 1

        def value(self):
            with self.lock:
                return self.val.value

    if __name__ == '__main__':
        m = multiprocessing.Manager()
        cpus = multiprocessing.cpu_count()
        mp_counter = multiprocessing.Value('i', 0)
        glist = m.list([])
        thread_safe_counter = Counter(0)

        l = multiprocessing.Lock()
        workers = multiprocessing.Pool(initializer=setup, initargs=[mp_counter, glist, thread_safe_counter], processes=cpus)

        con = sqlite3.connect("test.db", timeout=30.0)
        cur = con.cursor()
        cur.execute('PRAGMA journal_mode=WAL')
        sqlcommand = "CREATE TABLE IF NOT EXISTS test (one INT, two INT, three INT, four INT, five INT);"
        cur.execute(sqlcommand)
        con.close()

        totalvalues = 100
        testvalues = list(range(totalvalues))

        workers.starmap(worker, zip(testvalues, repeat(totalvalues)))
        workers.close()
        workers.join()
        # check if list is empty
        print(glist)

Thank you guys :)

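Your counter has an increment() and a value() method, which need to be called separately. Another process can increment the counter between your two calls, so the value you read is not necessarily the one your own increment produced. To make this safe you'd have to perform both operations while holding the lock. Your increment() method should return the new value after incrementing it, and you should use that without further calls to value(), e.g.: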

    class Counter(object):
        def __init__(self, initval=0):
            self.val = Value('i', initval)
            self.lock = Lock()

        def increment(self):
            with self.lock:
                self.val.value += 1
                return self.val.value

    ...

    def worker(testvalues, totalvalues):
        counter_value = counter.increment()
        # use only counter_value from here on
        ...

Also, a Value is already created with a default RLock, which can be overridden in the constructor call with a different lock type if needed. So you don't really need to allocate your own lock; you could just use:

    class Counter(object):
        def __init__(self, initval=0):
            self.val = Value('i', initval)
            # or Value('i', initval, lock=Lock())

        def increment(self):
            with self.val.get_lock():
                self.val.value += 1
                return self.val.value
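To see the effect end to end, here is a minimal self-contained sketch along the same lines. The names init_worker, record, main and BATCH_SIZE are made up for this illustration and do not come from the question, and the database work is left out so that only the counter logic remains. Each task takes its counter value once from increment() and decides from that single value whether it would trigger a flush.

```python
import multiprocessing
from multiprocessing import Value

BATCH_SIZE = 10  # hypothetical: a flush would happen on every 10th result


class Counter(object):
    """Process-safe counter that reuses the lock a Value already carries."""

    def __init__(self, initval=0):
        self.val = Value('i', initval)

    def increment(self):
        # Increment and read under one lock acquisition, so no other
        # process can change the value between the two steps.
        with self.val.get_lock():
            self.val.value += 1
            return self.val.value


def init_worker(shared_counter):
    # Pool initializer: keep the shared counter as a global in each worker.
    global counter
    counter = shared_counter


def record(task_id):
    # Take the counter value once and reuse it for every decision.
    n = counter.increment()
    return n, n % BATCH_SIZE == 0


def main(total=100):
    shared = Counter(0)
    with multiprocessing.Pool(initializer=init_worker, initargs=(shared,)) as pool:
        results = pool.map(record, range(total))
    numbers = sorted(n for n, _ in results)
    flushes = sum(1 for _, flush in results if flush)
    return numbers, flushes


if __name__ == '__main__':
    numbers, flushes = main()
    # Expect no duplicates and no gaps in the handed-out values.
    print(numbers == list(range(1, 101)), flushes)
```

Because every value from 1 to total is handed out exactly once, the flush condition is met exactly total // BATCH_SIZE times, no matter how the tasks are distributed over the workers.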
