sqlite3 - Thread-safe counter using Python's multiprocessing starmap
At the moment I'm trying to handle calculation results that come in very fast. At first I inserted each simulation result into the sqlite database individually, but that turned out to be the bottleneck of the entire calculation. I ended up using cursor.executemany instead of cursor.execute, which is much faster.
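In short, the change looks roughly like this (a minimal sketch of the idea only; the table and column names here are placeholders, not my real schema):

import sqlite3

con = sqlite3.connect("Test.db", timeout=30.0)
cur = con.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS Test (One INT, Two INT);")

rows = [(i, i * i) for i in range(1000)]  # pretend these are simulation results

# slow: one INSERT statement per result
# for row in rows:
#     cur.execute("INSERT INTO Test (One, Two) VALUES (?, ?);", row)

# faster: one executemany call for the whole batch
cur.executemany("INSERT INTO Test (One, Two) VALUES (?, ?);", rows)
con.commit()
con.close()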
My problem is that I'm somehow not able to implement a thread-safe counter.
The executemany task should run every 1000 calculations. Therefore I implemented an initializer with multiprocessing.Value and also tried the solution from http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing, but somehow the counter produces duplicate values, which ends in the executemany task being run several times in a row or not at all.
If anyone has an idea how to solve this issue I'd appreciate it.
Here's a minimum sample:
import multiprocessing
import sqlite3
from multiprocessing import Value, Lock
from itertools import repeat

def worker(testvalues, totalvalues):
    mp_counter.value += 1
    counter.increment()
    con = sqlite3.connect("test.db", timeout=30.0)
    cur = con.cursor()

    # Minimum sample: build one result row and queue it in the shared list
    helper = list(range(5))
    helper = [x * testvalues for x in helper]
    glist.append(helper)

    execute_every = 10
    print("Counter class: %d" % (counter.value()))
    print("mp_counter: %d" % (mp_counter.value))

    if counter.value() % execute_every == 0 or counter.value() == totalvalues - 1:
        print("Execute query")
        print("Counter class: %d" % (counter.value()))
        print("mp_counter: %d" % (mp_counter.value))
        helper = [tuple(row) for row in glist[:execute_every]]
        del glist[:execute_every]
        cur.executemany(
            "INSERT INTO test (one, two, three, four, five) VALUES (?, ?, ?, ?, ?);",
            helper)
        con.commit()
    con.close()

def setup(t, g, c):
    global mp_counter
    global glist
    global counter
    mp_counter = t
    glist = g
    counter = c

class Counter(object):
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    def value(self):
        with self.lock:
            return self.val.value

if __name__ == '__main__':
    m = multiprocessing.Manager()
    cpus = multiprocessing.cpu_count()
    mp_counter = multiprocessing.Value('i', 0)
    glist = m.list([])
    thread_safe_counter = Counter(0)
    l = multiprocessing.Lock()  # (unused in this sample)
    workers = multiprocessing.Pool(initializer=setup,
                                   initargs=[mp_counter, glist, thread_safe_counter],
                                   processes=cpus)

    con = sqlite3.connect("test.db", timeout=30.0)
    cur = con.cursor()
    cur.execute('PRAGMA journal_mode=wal')
    sqlcommand = ("CREATE TABLE IF NOT EXISTS test "
                  "(one INT, two INT, three INT, four INT, five INT);")
    cur.execute(sqlcommand)
    con.close()

    totalvalues = 100
    testvalues = list(range(totalvalues))
    workers.starmap(worker, zip(testvalues, repeat(totalvalues)))
    workers.close()
    workers.join()

    # Check if the shared list is empty at the end
    print(glist)
Thank you guys :)
Your Counter has increment() and value() methods which need to be called separately. To make this safe you'd have to call both operations while holding the lock. Instead, the increment() method should return the new value after incrementing it, and you should use that returned value without any further calls to value(), e.g.:
class Counter(object):
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1
            return self.val.value

    ...

def worker(testvalues, totalvalues):
    counter_value = counter.increment()
    # use counter_value from here on
    ...
Also, a Value is created with a default RLock, which can be overridden in the constructor call with a different lock type if needed. So you don't need to allocate your own lock, you could just use:
class Counter(object):
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        # or: Value('i', initval, lock=Lock())

    def increment(self):
        with self.val.get_lock():
            self.val.value += 1
            return self.val.value
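For example, you could sanity-check that version with something like this (the Pool/initializer wiring below is just an assumption that mirrors your sample, not something your code needs verbatim):

import multiprocessing
from multiprocessing import Value

class Counter(object):
    def __init__(self, initval=0):
        self.val = Value('i', initval)

    def increment(self):
        with self.val.get_lock():
            self.val.value += 1
            return self.val.value

def setup(c):
    global counter
    counter = c

def worker(_):
    # every call gets a unique value: no duplicates, no gaps
    return counter.increment()

if __name__ == '__main__':
    shared_counter = Counter(0)
    with multiprocessing.Pool(initializer=setup, initargs=[shared_counter]) as pool:
        values = pool.map(worker, range(100))
    print(sorted(values) == list(range(1, 101)))  # True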