Thread synchronization ในภาษา Python

ในบทนี้ คุณจะได้เรียนรู้เกี่ยวกับ Thread synchronization ในภาษา Python ซึ่งเป็นปัญหาที่สามารถเกิดขึ้นได้เมื่อโปรแกรมทำงานพร้อมกันแบบหลาย Thread มาดูว่ามันคืออะไร และจะสามารถแก้ไขปัญหานี้ได้อย่างไรบ้าง นี่เป็นเนื้อหาของบทนี้

  • Thread synchronization
  • Lock object
  • RLock object
  • Semaphor object

Thread synchronization

Thread synchronization คือการที่ตั้งแต่สอง Thread ขึ้นไปมีการทำงานแบบขนานกันและใช้งานทรัพยากรบางอย่างพร้อมกัน เพื่อแก้ไขปัญหาการเข้าถึงทรัพยากรพร้อมกัน Thread จะต้องได้รับแก้ไขให้ทำงานแบบลำดับในการเข้าถึงทรัพยากรเหล่านั้น ไม่เช่นนั้น Thread ที่ทำงานแบบขนานกันอาจทำงานไม่ถูกต้องตามที่คาดหวัง ยกตัวอย่างเช่น การเปลี่ยนแปลงหรืออ่านค่าจากตัวแปรในเวลาเดียวกัน เป็นต้น

ในภาษา Python นั้นมีคลาสที่สามารถใช้จัดการกับ Thread synchronization ได้ ซึ่งเราจะแนะนำให้รู้จักกับคลาสที่สำคัญและใช้บ่อยในการแก้ไขปัญหานี้ นั่นคือ Lock, RLock และ Semaphor

Lock object

Lock นั้นเป็นคลาส Synchronization แบบพื้นฐานและเรียบง่ายที่สุดในภาษา Python ออบเจ็คของคลาสไม่ได้เป็นเจ้าของโดย Thread ใดๆ เมื่อมันถูกล็อค นั่นหมายความว่าทุก Thread สามารถที่จะล็อคและปลดล็อคออบเจ็ค Lock ได้ มาดูตัวอย่างโปรแกรมสำหรับแสดงผลตัวเลขจาก 1 - 10 ออกทางหน้าจอจากหลาย Thread โดยการใช้ Lock เพื่อควบคุมการทำงานแบบพร้อมกันของ Thread นี่เป็นตัวอย่าง

thread_lock.py
from threading import Thread, Lock

# This variable shared for all threads
count = 1
# Create lock object
lock = Lock()

class Counter(Thread):

    def run(self):
        global count

        while count <= 10:
            # Accessing shared resource
            lock.acquire()
            n = count
            count = count + 1
            lock.release()

            print("%s: %d" % (self.name, n))

thr1 = Counter()
thr2 = Counter()

thr1.start()
thr2.start()

นี่เป็นผลลัพธ์การทำงานของโปรแกรม

Thread-1: 1
Thread-1: 2
Thread-2: 3
Thread-2: 5
Thread-1: 4
Thread-2: 6
Thread-1: 7
Thread-2: 8
Thread-1: 9
Thread-2: 10

ในตัวอย่างเป็นโปรแกรมแสดงตัวเลขจาก 1 - 10 ออกทางหน้าจอโดยการใช้สอง Thread สำหรับการทำงานดังกล่าว

# This variable shared for all threads
count = 1
# Create lock object
lock = Lock()

ตัวแปร count สำหรับเก็บตัวเลขเพื่อนับในการแสดงผลที่มีค่าเริ่มจาก 1 มันเป็นตัวแปรที่จะถูกเข้าถึงโดยสอง Thread หรือกล่าวอีกนัยหนึ่ง มันเป็นทรัพยากรที่ถูกใช้ร่วมกันจากหลาย Thread ส่วน lock เป็นออบเจ็คจากคลาส Lock ที่ใช้สำหรับควบคุมให้เพียงแค่หนึ่ง Thread เท่านั้นที่สามารถใช้งานตัวแปร count ได้ในขณะนั้น

จากนั้นเราสร้างคลาส Counter ซึ่งเป็นคลาสที่สืบทอดมาจากคลาส Thread สำหรับแสดงตัวเลขวนแสดงตัวเลข 1 - 10 ออกทางหน้าจอ

def run(self):
    global count
...

เมธอด run() จะเริ่มทำงานเมื่อเรียกใช้เมธอด start() บนออบเจ็ค Thread และเนื่องจากตัวแปร count นั้นอยู่นอกคลาส เพื่อใช้งานภายในคลาสเราต้องอ้างถึงมันด้วยคำสั่ง global จากนั้นใช้คำสั่ง while loop เพื่อตรวจสอบว่าในขณะที่ค่าในตัวแปร count น้อยกว่าหรือเท่ากับ 10 ให้โปรแกรมทำงานในลูป

while count <= 10:
    # Accessing shared resource
    lock.acquire()
    n = count
    count = count + 1
    lock.release()

    print("%s: %d" % (self.name, n))

การเข้าถึงตัวแปร count และการอัพเดทค่าในตัวแปรอาจทำให้เกิด Synchronization ขึ้น ซึ่งเป็นโค้ดที่อยู่ระหว่างคำสั่ง lock.acquire() และ lock.release() เราเรียกส่วนการทำงานนี้ว่า Critical section ดังนั้นก่อนการเข้าถึงเราได้ทำการล็อคส่วนนี้การเข้าถึงด้วยเมธอด acquire() และเมื่อทำงานเสร็จแล้วปลดล็อคมันด้วยเมธอด release()

ในขณะเดียวกัน ถ้าหากในตอนที่ Thread เรียกใช้งานเมธอด acquire() นั้นออบเจ็คถูกล็อกอยู่ด้วย Thread อื่น คำสั่งนี้จะบล็อคการทำงานและรอจนกว่า Thread อื่นจะมีการเรียกเมธอด release() เพื่อปลอดล็อคก่อนก่อนที่มันจะสามารถใช้ได้

ต่อไปมาวิเคราะห์ปัญหาที่แท้จริงว่าเกิดอะไรขึ้น และทำไมเราจึงต้องแก้ไขมันด้วย Thread Synchronization และถ้าหากไม่ทำจะเกิดผลอย่างไรกับโปรแกรม

n = count # (1) <- Thread 1, Thread 2
count = count + 1

การเข้าถึงตัวแปรพร้อมกันสามารถเกิดขึ้นได้จากคำสั่งสองบรรทัดนี้ เนื่องจากว่าทั้งสอง Thread นั้นทำงานแบบขนานกัน ลองจินตนาการว่า Thread ทั้งสองนั้นทำงานมาถึงบรรทัด (1) พร้อมกัน Thread แรกอ่านค่าจากตัวแปร count มาเก็บไว้ในตัวแปร n

n = count # (1) Thread 2
count = count + 1 # (2) <- Thread 1

หลังจากที่ Thread แรกอ่านข้อมูลเสร็จสิ้นแล้ว เราคาดหวังว่าค่าในตัวแปร count จะเพิ่มขึ้นเพื่อให้ Thread ที่สองอ่านต่อ แต่ในการทำงานจริงของคอมพิวเตอร์ เนื่องจากทั้ง Thread ทำงานพร้อมกัน มันเป็นไปได้ว่าในขณะที่ Thread แรกยังไม่ได้เพิ่มค่าในบรรทัด (2) Thread ที่สองอาจอ่านค่าจากตัวแปร count มาแล้ว ซึ่งเป็นค่าเดียวกับที่ Thread แรกอ่านไปก่อนหน้า ซึ่งนี่ทำให้ทั้งสอง Thread ได้ตัวเลขเป็นค่าเดียวกัน ซึ่งไม่ใช่สิ่งที่เราต้องการให้มันเป็น

การทำงานที่ถูกต้องคือ Thread แรกควรเพิ่มค่าในตัวแปร count ก่อน ก่อนที่ Thread ที่สองจะสามารถอ่านได้ นั่นหมายความว่าเราจะต้องป้องกันส่วนนี้โดยการใช้ Lock ออบเจ็คเข้ามาช่วย

lock.acquire()
n = count
count = count + 1
lock.release()

ดังนั้นเพื่อป้องกันปัญดังกล่าว เราต้องทำให้ส่วนนี้สามารถทำงานได้ทีละ Thread นั่นหมายความว่าคำสั่งที่อยู่ระหว่างเมธอด lock.acquire() และ lock.release() จะมีเพียงแค่หนึ่ง Thread เท่านั้นที่สามารถทำงานได้ในขณะนั้น นั่นเพื่อเป็นการยืนยันว่าค่าในตัวแปร count จะถูกเพิ่มขึ้นก่อนเสมอ ก่อนที่ Thread อื่นจะมาอ่านค่าต่อไป

เนื่องจากออบเจ็ค Lock ไม่ได้เป็นเจ้าของโดย Thread ใดๆ เมื่อมันถูกล็อก มันสามารถถูกปลดล็อคจาก Thread อื่นได้ ดังนั้นคุณต้องออกแบบลำดับการทำงานของโปรแกรมให้ดีในการใช้งาน

RLock object

RLock หรือ Re-entrant lock นั้นเป็นคลาสสำหรับจัดการ Synchronization เหมือนกับ Lock แต่สิ่งที่แตกต่างกันคือ Thread ที่ทำการล็อคจะเป็นเจ้าของ RLock ออบเจ็คนั้นและมีเพียง Thread ดังกล่าวเท่านั้นที่จะสามารถปลดล็อคได้ และนอกจากนีิ้ RLock ยังสามารถที่จะให้ Thread ทำการล็อคแบบซ้อนกันเข้าไปได้ แต่เราจะไม่พูดถึงมันในบทนี้

สำหรับการใช้งานโดยทั่วไปนั้นเราสามารถใช้คลาส RLock แทน Lock ได้และการใช้ RLock จะช่วยให้เราแน่ใจว่า Thread ที่ทำการล็อคเท่านั้นที่จะสามารถปลดล็อคได้ ในตัวอย่างนี้จะคล้ายกับตัวอย่างก่อนหน้า แต่จะเป็นการหาจำนวนเฉพาะจาก 1 - 100 แทน

thread_rlock.py
from threading import Thread, RLock

count = 1
lock = RLock()

def is_prime(n):
    if n == 1:
        return False
    for i in range(2, n + 1):
        if n % i == 0 and i != n:
            return False
    return True

class Counter(Thread):

    def run(self):
        global count

        while count <= 100:
            # Accessing shared resource
            lock.acquire()
            n = count
            count = count + 1
            lock.release()

            if is_prime(n):
                print("%s: %d" % (self.name, n))

thr1 = Counter()
thr2 = Counter()

print("Prime numbers from 1 to 100")
thr1.start()
thr2.start()

นี่เป็นผลลัพธ์การทำงานของโปรแกรม

Prime numbers from 1 to 100
Thread-1: 2
Thread-1: 3
Thread-2: 5
Thread-1: 7
Thread-2: 11
Thread-1: 13
Thread-2: 17
Thread-1: 19
Thread-2: 23
Thread-1: 29
...

ในตัวอย่างนี้เป็นโปรแกรมสำหรับค้นหาจำนวนเฉพาะที่มีค่าอยู่ระหว่าง 1 - 100 โดยการใช้ Thread ในภาษา Python ในการทำงานนั้นแต่ละ Thread จะนำค่าในตัวแปร count มาคำนวณและเพิ่มค่าขึ้นไป 1 สำหรับการทำงานในรอบถัดไป

lock.acquire()
n = count
count = count + 1
lock.release()

และเช่นเดิม ตัวแปร count เป็นค่าที่ Thread ใช้ร่วมกัน ดังนั้นก่อนเข้าถึงเราได้เรียกใช้เมธอด acquire() เพื่อทำการล็อคให้มันสามารถเข้าถึงได้ทีละ Thread และเมื่อได้ค่าตัวเลขสำหรับนำไปหาจำนวนเฉพาะมาเก็บไว้ในตัวแปร n แล้วเพิ่มค่าในตัวแปร count ขึ้น 1 และเรียกใช้เมธอด release() เพื่อปลดล็อคเพื่อให้ Thread อื่นเข้าถึงได้ต่อไป

def is_prime(n):
    if n == 1:
        return False
    for i in range(2, n + 1):
        if n % i == 0 and i != n:
            return False
    return True

ฟังก์ชัน is_prime ใช้สำหรับตรวจสอบว่าตัวเลขเป็นจำนวนเฉพาะหรือไม่ เนื่องจากจำนวนเฉพาะคือจำนวนที่มีเพียง 1 และตัวมันเองเท่านั้นที่สามารถหารลงตัวได้ ดังนั้นเราใช้คำสั่ง for loop เพื่อวนหาว่ามีตัวเลขที่มากกว่า 1 และไม่เท่ากับ n หารลงตัวหรือไม่ ถ้าใช่หมายความว่าตัวเลขดังกล่าวไม่เป็นจำนวนเฉพาะ แต่ถ้าหารแล้วไม่มีตัวเลขที่หารลงตัวเลยแสดงว่าตัวเลขเป็นจำนวนเฉพาะ

thr1 = Counter()
thr2 = Counter()

thr1.start()
thr2.start()

จากนั้นสร้างออบเจ็ค Thread เพื่อให้ Thread ทั้งสองเริ่มทำงานเพื่อค้นหาจำนวนเฉพาะจาก 1 - 100 และแสดงผลออกทางหน้าจอ คุณสามารถสร้างมากกว่าสอง Thread เพื่อให้ช่วยกันทำงานได้

Semaphore object

Semaphore นั้นเป็นวิธีที่เก่าแก่ที่สุดในการทำ Synchronization ในการเขียนโปรแกรมคอมพิวเตอร์ มันถูกคิดค้นโดย Edsger W. Dijkstra เรามักจะใช้มันสำหรับควบคุมการทำงานของ Thread ในการเข้าถึงทรัพยากรที่มีอยู่อย่างจำกัด ยกตัวอย่างเช่น หน่วยความจำ ฐานข้อมูล หรือเน็ตเวิร์ค เป็นต้น

Semaphore ถูกใช้สำหรับเพื่อกำหนดให้ N Thread เท่านั้นที่สามารถทำงานได้ในขณะนั้น ในตัวอย่างนี้จะเป็นการใช้งาน Semaphore เพื่อจัดการปัญหาการใช้งานคอมพิวเตอร์ในห้องสมุดของโรงเรียนแห่งหนึ่ง โดยที่คอมพิวเตอร์นั้นมีอยู่อย่างจำกัด นี่เป็นตัวอย่าง

thread_semaphore.py
from threading import Thread, Semaphore
from time import sleep, gmtime, strftime

number_of_computers = 5
number_of_students = 20

semaphore = Semaphore(number_of_computers)

def current_time():
    return strftime("%H:%M:%S", gmtime())

class Student(Thread):

    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        semaphore.acquire()
        print("%s: started using a computer [%s]" % 
        (self.name, current_time()))
        sleep(1)
        print("%s: finished using a computer [%s]" % 
        (self.name, current_time()))
        semaphore.release()

for i in range(1, number_of_students + 1):
    Student("Student " + str(i)).start()

ในตัวอย่าง เป็นโปรแกรมจำลองการใช้งานคอมพิวเตอร์ในห้องสมุดของโรงเรียนแห่งหนึ่ง ที่มีคอมพิวเตอร์อยู่อย่างจำกัดนั่นคือ 5 เครื่อง ซึ่งนักเรียนที่ต้องการใช้งานคอมพิวเตอร์นั้นจะเหมือน Thread ที่ต้องการใช้งานทรัพยากร นั่นก็คือคอมพิวเตอร์ที่มีอยู่อย่างจำกัดนั่นเอง

number_of_computers = 5
number_of_students = 20

ในตอนแรกเราได้กำหนดจำนวนของคอมพิวเตอร์และนักเรียนที่มีไว้ในตัวแปร ตัวแปร number_of_computers ใช้สำหรับกำหนดการทำงานพร้อมกันของ Thread และตัวแปร number_of_students เป็นการกำหนดจำนวน Thread ที่จะสร้างขึ้นเพื่อใช้งานคอมพิวเตอร์

def current_time():
    return strftime("%H:%M:%S", gmtime())

ฟังก์ชัน current_time ใช้สำหรับรับเอาค่าเวลาปัจจุบันเพื่อแสดงเวลาการใช้งานคอมพิวเตอร์ เพื่อช่วยให้เราสามารถเข้าใจการทำงานของโปรแกรมได้ง่ายขึ้น

semaphore = Semaphore(number_of_computers)

จากนั้นสร้างออบเจ็คจากคลาส Semaphore ที่รับพารามิเตอร์เป็นจำนวนของ Thread ที่สามารถทำงานได้พร้อมกัน ออบเจ็ค semaphore จะทำหน้าที่เป็นเหมือนตัวบอกสัญญาณว่าตอนนี้ Thread ที่ทำงานอยู่ครบ 5 หรือไม่และอนุญาติให้ทำงานหากยังไม่ครบ

def run(self):
    semaphore.acquire()
    print("%s: started using a computer [%s]" % 
    (self.name, current_time()))
    sleep(1)
    print("%s: finished using a computer [%s]" % 
    (self.name, current_time()))
    semaphore.release()

ในคลาส Student ออบเจ็ค semaphore จะเป็นเหมือนตัวบอกสัญญาณสำหรับควบคุมการเข้าถึงคอมพิวเตอร์ เมื่อเราเรียกใช้เมธอด acquire() โปรแกรมจะทำการนับการใช้งานขึ้น 1 และถ้าหากตอนนี้มีการใช้งานครบ 5 แล้ว Thread ที่เรียกใช้จะต้องรอจนกว่า Thread อื่นจะเรียกใช้เมธอด release() นั่นทำให้ตัวนับลดลง 1 เพื่อให้ Thread อื่นเข้าใช้งานได้ต่อไปนั่นเอง

sleep(1)

ในการใช้งานคอมพิวเตอร์นั้น เราสมมติให้แต่ละ Thread จะใช้เวลา 1 วินาที โดยใช้ฟังก์ชัน sleep เพื่อหน่วงเวลาการทำงานเอาไว้

for i in range(1, number_of_students + 1):
    Student("Student " + str(i)).start()

สุดท้ายเป็นการใช้คำสั่ง for loop เพื่อวนสร้าง 20 Thread จากค่าในตัวแปร number_of_students และนี่เป็นผลลัพธ์การทำงานของโปรแกรม นี่เป็นการจำลองว่านักเรียนทั้ง 20 คนต้องการใช้คอมพิวเตอร์ และจะสามารถเข้าใช้ได้เพียงทีละ 5 คนเท่านั้น

Student 1: started using a computer [12:37:53]
Student 2: started using a computer [12:37:53]
Student 3: started using a computer [12:37:53]
Student 4: started using a computer [12:37:53]
Student 5: started using a computer [12:37:53]
Student 1: finished using a computer [12:37:54]
Student 6: started using a computer [12:37:54]
Student 5: finished using a computer [12:37:54]
Student 3: finished using a computer [12:37:54]
Student 4: finished using a computer [12:37:54]
Student 7: started using a computer [12:37:54]
Student 8: started using a computer [12:37:54]
Student 9: started using a computer [12:37:54]
Student 2: finished using a computer [12:37:54]
Student 10: started using a computer [12:37:54]
Student 6: finished using a computer [12:37:55]
Student 11: started using a computer [12:37:55]
Student 8: finished using a computer [12:37:55]
Student 7: finished using a computer [12:37:55]
Student 12: started using a computer [12:37:55]
Student 13: started using a computer [12:37:55]
Student 9: finished using a computer [12:37:55]
Student 10: finished using a computer [12:37:55]
...

จากผลลัพธ์การทำงานของโปรแกรมจะเห็นว่าโปรแกรมจะทำงานพร้อมกันทีละ 5 Thread เท่านั้น และเมื่อมี Thread ใดๆ ทำงานเสร็จ Thread ที่เหลือก็จะทำงานต่อไปเรื่อยๆ คนครบทั่งหมด 20 Thread

นอกจากนี้ เรายังสามารถใช้งาน Semaphore เพื่อให้ทำงานเหมือนกับคลาส Lock และ RLock ได้โดยการกำหนดจำนวนของ Thread เป็น 1 เหมือนกับในคำสั่งต่อไปนี้

number_of_computers = 1

นี่จะทำให้การทำงานของ Semaphore เหมือนสองวิธีที่คุณได้เรียนรู้ไปก่อนหน้า คือมีเพียงหนึ่ง Thread เท่านั้นที่สามารถเข้าถึงทรัพยากรได้ในขณะนั้น อย่างไรก็ตามแต่ละคลาสนั้นถูกออกแบบมาสำหรับวัตถุประสงค์การใช้งานที่แตกต่างกัน ถ้าคุณต้องการให้การเข้าถึงทรัพยากรทำได้ทีละ Thread การใช้ Lock และ RLock จะง่ายกว่า

และนี่ก็เป็นตัวอย่างสำหรับการจัดการ Synchronization ด้วย Semaphore ในภาษา Python อย่างไรก็ตาม ในการใช้งานในการเขียนโปรแกรมจริงของ Semaphore นั้นมักจะเป็นการควบคุมการเชื่อมต่อกับฐานข้อมูลด้วย Database Connection Pool โดยการกำหนดจำนวนการเชื่อมต่อสูงสุดที่สามารถใช้งานได้ในขณะนั้น เมื่อฐานข้อมูลของเราสามารถรองรับการเชื่อมต่อได้อย่างจำกัด

Thread Synchronization นั้นถูกใช้ในหลายด้านเพื่อแก้ไขปัญหาต่างๆ ในการเขียนโปรแกรม ยกตัวอย่างเช่น ในระบบไฟล์ของระบบปฏิบัติการใช้มันเพื่อควบคุมให้เพียง 1 โปรเซสเท่านั้นที่สามารถใช้งานไฟล์ หรือ I/O ได้ หรือในระบบจัดการฐานข้อมูลเพื่อควบคุมการทำงานของ Transaction

ในบทนี้ คุณได้เรียนรู้เกีี่ยวกับ Thread Synchronization ในภาษา Python เราได้แนะนำวิธีการต่างๆ ที่คุณสามารถใช้ได้ซึ่งเป็นคลาสจากไลบรารี่มาตรฐานของภาษา เช่น Lock RLock และ Semaphore