Skip to content

Thread Pool în Python

16 January 2011

Majoritatea problemelor de la Hacker Cup primeau input de forma un număr N urmat de N linii, fiecare linie conținând un test case. Având în vedere intervalul de 6 minute de când ai downloadat fișierul input în care ai voie să trimiți înapoi soluția și faptul că desktop-ul meu e 8 core, m-am gândit că e o ocazie bună să paralelizez execuția. Problema se pune în felul următor: avem soluția la problemă (să zicem mysolution), care primește ca argument la linia de comandă un fișier input de forma descrisă mai sus (./mysolution input.in) și scrie rezultatele în StdOut – cum paralelizăm? Soluția naivă, pentru două core-uri de exemplu, e să împarți fișierul input în două jumătăți și să execuți soluția deodată, în două procese, pe fiecare jumătate. Dacă, însă, un test case ia considerabil mai mult timp decât celelalte, soluția nu e optimă – un proces termină mult mai repede și moare iar celălalt rămâne să lucreze la jumătatea sa mult mai mult timp. Cât timp nu avem garanția că distribuția test case-urilor asigură un timp de execuție asemănător pentru fiecare bucată, această abordare nu e bună. Și nu avem garanția.

Soluția: un thread pool. Împărțim fișierul input în N bucăți și le introducem într-o coadă. Pornim C thread-uri, unde C e numărul de core-uri pe care le avem la dispoziție. Fiecare thread apelează mysolution cu prima bucată din coadă până când coada e epuizată. Timpul de execuție este de C ori mai mic decât apelul ./mysolution input.in. Două lucruri de care am vrut să țin cont cu această soluție: 1. thread pool-ul poate fi refolosit la orice problemă (cât timp input-ul este de froma descrisă); 2. soluția la o problemă nu necesită nicio modificare – adică dă același rezultat și rulată ne-paralelizat, desigur, în mai mult timp.

Așa că am implementat primul meu thread pool în Python:

#!/usr/bin/env python
import sys
from threading import Thread, Condition, Lock
from subprocess import Popen, PIPE

Avem nevoie de sys pentru argumentele la linia de comandă; din threading avem nevoie de Thread pentru thread pool și de Condition și Lock pentru sincronizare; din subprocess avem nevoie de Popen și PIPE ca să executăm mysolution și să capturăm output-ul.

Avem nevoie de o funcție care să ia un fișier input și să îl împartă în N bucăți, păstrând formatul:

# split input file
def split(input_file):
    in_file = open(input_file)
    n = int(in_file.readline())
    for i in range(n):
        slice_file = open(str(i) + '.in', 'w')
        slice_file.write('1\n' + in_file.readline())
        slice_file.close()
    in_file.close()
    return n

Funcția citește (și întoarce N), apoi crează N fișiere 0.in, 1.in … fiecare având pe prima linie “1”, urmat de o linie din fișierul input. Acum aplul ./mysolution input.in poate fi transformat în N apeluri: ./mysolution 0.in, ./mysolution 1.in

Construim apoi thread pool-ul:

# thread pool
class ThreadPool:
    def __init__(self, pool_size, app, slice_count):
        # script to execute
        self.app = app

        # input slices
        self.__slice_count = slice_count
        self.__current_slice = 0

        # thread pool and slice lock
        self.__pool_size = pool_size
        self.__threads = []
        self.slice_lock = Condition(Lock())
      
        for i in range(self.__pool_size):
            self.__threads.append(ExecutorThread(self))

Constructorul primește ca argumente numărul de thread-uri dorite (pool_size), aplicația pe care să o apeleze fiecare thread (app) și numărul de fișiere input (slice_count). Tot aici inițializăm mecanismul de sincronizare (slice_lock) și inițializăm thread-urile.

Următoara funcție (tot parte din ThreadPool), e simplă: pornește toate thread-urile din pool și așteaptă ca toate să termine execuția.

    def start(self):
        # start all threads
        for thread in self.__threads:
            thread.start()

        # wait for all threads to finish
        for thread in self.__threads:
            thread.join()

Ultima funcție din ThreadPool întoarce prima felie din coadă – practic primul număr de la 0 la N-1 neprocesat încă – sau None dacă toate feliile au fost procesate.

    def get_next_slice(self):
        # return next slice if available
        if self.__current_slice < self.__slice_count:
            result = self.__current_slice
            self.__current_slice += 1
        else:
            result = None

        return result

ExecutorThread e definit astfel:

# thread class wrapping process
class ExecutorThread(Thread):
    def __init__(self, pool):
        Thread.__init__(self)
        self.__pool = pool

    def run(self):
        while True:
            # critical section
            self.__pool.slice_lock.acquire()
            # get next slice from pool
            input_slice = self.__pool.get_next_slice()
            self.__pool.slice_lock.release()

            # if None, input is exhausted
            if input_slice is None:
                break

            # run script
            output = Popen([self.__pool.app, str(input_slice) + '.in'], stdout=PIPE).communicate()[0].decode('utf-8')

            # write output
            out_file = open(str(input_slice) + '.out', 'w')
            out_file.write(output)
            out_file.close()

ExecutorThread e derivat din Thread. Constructor-ul doar inițializează o referință la thread pool. Funcția run are 3 pași:

  1. Determină felia ce urmează a fi procesată. Aici e nevoie de sincronizare pentru că mai multe thread-uri ar putea să apeleze get_next_slice simultan, cu rezultate nedorite. Dacă nu mai există felii (get_next_slice întoarce None), se iese din ciclu și thread-ul se oprește.
  2. Apelează app, pasând ca argument felia (./myscript 0.in, ./myscrpit 1.in etc.) și capturează StdOut.
  3. Scrie StdOut rezultat în urma apelului într-un fișier .out, (0.out, 1.out etc.).

Mai avem nevoie de o funcție care să pună cap la cap cele N fișiere out rezultate, în ordinea numerelor:

# splice output files
def splice(n):
    result = open("result.txt", "w")
    for i in range(n):
        out_file = open(str(i) + ".out")
        result.writelines(out_file.readlines())
        out_file.close()
    result.close()

Asamblăm totul într-un script care primește la linia de comandă trei argumente: numărul de thread-uri dorite, soluția pe care să o apeleze și numele fișierului input:

# split input file
n = split(input_file)

# start thread pool
pool = ThreadPool(threads, script, n)
pool.start()

# splice output files
splice(n)

În loc de ./mysolution input.in > result.txt, acum putem apela ./threadpool 8 ./mysolution input.in cu același rezultat, de data asta obținut de 8 ori mai repede.

Implementarea de mai sus e pentru Python 3, cred că trebuie modificată puțin ca să meargă cu Python 2. Codul sursă complet aici: http://pastebin.com/Gmr1HLDs

From → code complete

4 Comments
  1. interesant…
    insa nu cred ca implementarea algoritmul pt problemele astora ar tb sa ruleze mai mult de 1 minut pe intrarea care ti-o dau.
    Alea 6 minute is fix de aiurea. Aveau sens oarecum in runda de calificare, insa si acolo…
    Din moment ce trebuie sa dai si codul nu conteaza daca is 6 minute sau 60 de minute.

  2. vladr permalink

    Să știi că și în runda 1 e limita de timp de 6 minute. Desigur, dacă ai un algoritm prost, chestia de mai sus nu te ajută mare lucru dar când ești contra timp, m-am gândit că nu strică dacă pot face lucrurile să meargă mai repede. A, și dacă trebuie să trimiți codul sursă, nu trebuie neapărat să trimiți și thread pool-ul😉

  3. nu am zis ca nu au limita de timp… am zis ca e destul de retard sa ai o limita de timp in momentul in care ceri si codul

  4. vladr permalink

    Spune-mi ceva ce NU e retard la concursul ăsta :))

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: