Concurrency in Python - Pool of Processes
A pool of processes can be created and used in the same way as a pool of threads. A process pool is a group of pre-instantiated, idle processes that stand ready to be given work. When there is a large number of tasks to run, creating a process pool is preferable to instantiating a new process for every task, because the cost of starting a process is paid once per worker rather than once per task.
Python Module – concurrent.futures
The Python standard library includes a module called concurrent.futures. It was added in Python 3.2 to give developers a high-level interface for launching asynchronous tasks. It is an abstraction layer on top of Python's threading and multiprocessing modules that runs tasks using a pool of threads or processes.
In the following sections, we will look at the classes provided by the concurrent.futures module.
Executor Class
Executor is an abstract class of the concurrent.futures Python module. It cannot be used directly and we need to use one of the following concrete subclasses −
- ThreadPoolExecutor
- ProcessPoolExecutor
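Because both concrete classes implement the same Executor interface, code written against submit() can usually switch between them. The following is a minimal sketch of that idea; the names square and run_squares are illustrative and not part of the library.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def square(n):
    # Defined at module level so that a process pool can pickle it.
    return n * n


def run_squares(executor, numbers):
    # Uses only submit() and result(), so any Executor subclass works here.
    futures = [executor.submit(square, n) for n in numbers]
    return [future.result() for future in futures]


if __name__ == '__main__':
    for pool_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        with pool_class(max_workers=2) as executor:
            print(pool_class.__name__,
                  issubclass(pool_class, Executor),
                  run_squares(executor, [1, 2, 3]))
```

The helper receives the executor as an argument, so the choice between threads and processes stays with the caller.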
ProcessPoolExecutor – A concrete subclass
It is one of the concrete subclasses of the Executor class. It uses the multiprocessing module to provide a pool of processes to which tasks can be submitted. The pool assigns tasks to the available processes and schedules them to run.
How to create a ProcessPoolExecutor?
With the help of the concurrent.futures module and its concrete class ProcessPoolExecutor, we can easily create a pool of processes. For this, we construct a ProcessPoolExecutor with the number of processes we want in the pool. If max_workers is not given, it defaults to the number of processors on the machine. We then submit a task to the process pool.
Example
We will now consider the same example that we used while creating a thread pool, the only difference being that we now use ProcessPoolExecutor instead of ThreadPoolExecutor.
from concurrent.futures import ProcessPoolExecutor
from time import sleep

def task(message):
    sleep(2)
    return message

def main():
    executor = ProcessPoolExecutor(5)
    future = executor.submit(task, ("Completed"))
    print(future.done())
    sleep(2)
    print(future.done())
    print(future.result())

if __name__ == '__main__':
    main()
Output
False
False
Completed
In the above example, a ProcessPoolExecutor is constructed with 5 worker processes. Then a task, which waits for 2 seconds before returning the message, is submitted to the executor. The first call to done() returns False because the task has only just been submitted. In this run the second call also returns False: starting the worker process adds overhead on top of the task's 2-second sleep, so the task has not quite finished when the main process wakes up. The call to result() then blocks until the task completes and returns its message.
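Polling done() after a fixed sleep depends on timing. The following sketch shows two alternatives that Future objects provide: add_done_callback() and result() with a timeout. The helper names report and run_task are illustrative, and the 0.5-second sleep is an arbitrary choice.

```python
import concurrent.futures
from time import sleep


def task(message):
    sleep(0.5)
    return message


def report(future):
    # Runs in the process that submitted the task, once the future is finished.
    print('callback received:', future.result())


def run_task(executor, message):
    future = executor.submit(task, message)
    future.add_done_callback(report)
    try:
        # Deliberately shorter than the task's 0.5-second sleep.
        future.result(timeout=0.1)
    except concurrent.futures.TimeoutError:
        print('still running after 0.1 seconds')
    # Without a timeout, result() blocks until the task has finished.
    return future.result()


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        print(run_task(executor, 'Completed'))
```

Neither approach needs a guess about how long the worker process takes to start.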
Instantiating ProcessPoolExecutor – Context Manager
Another way to instantiate ProcessPoolExecutor is with a context manager. It works like the method used in the above example. The main advantage of the context manager is that the pool is shut down automatically when the with block exits, which also keeps the code concise. The instantiation can be done with the following code −
with ProcessPoolExecutor(max_workers=5) as executor:
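To make clear what the with statement handles, the sketch below writes the same work twice: once with an explicit shutdown() call in a finally block, and once with the context manager. The function names are illustrative only.

```python
import concurrent.futures


def double(n):
    return 2 * n


def with_explicit_shutdown(executor_class, n):
    # The manual form: we must remember to shut the pool down ourselves.
    executor = executor_class(max_workers=2)
    try:
        return executor.submit(double, n).result()
    finally:
        # Waits for pending work, then releases the workers.
        executor.shutdown(wait=True)


def with_context_manager(executor_class, n):
    # shutdown(wait=True) is called automatically when the block exits.
    with executor_class(max_workers=2) as executor:
        return executor.submit(double, n).result()


if __name__ == '__main__':
    pool_class = concurrent.futures.ProcessPoolExecutor
    print(with_explicit_shutdown(pool_class, 21))
    print(with_context_manager(pool_class, 21))
```

Both functions return the same value; the second simply cannot forget the cleanup step.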
Example
For better understanding, we take the same example that was used while creating a thread pool. We start by importing the concurrent.futures module. Then a function named load_url() is defined, which loads the requested URL. A ProcessPoolExecutor with 5 worker processes is then created and used as a context manager. Each task is submitted with submit(), and as_completed() yields the futures as they finish. We get the result of a future by calling the result() method on it; if the task raised an exception, result() raises it again in the calling process.
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    # Fetch one page and return its raw bytes.
    with urllib.request.urlopen(url, timeout = timeout) as conn:
        return conn.read()

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        # Map each future back to the URL it was created for.
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
    main()
Output
The above Python script will generate the following output −
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
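The output order differs from the order of the URLS list because as_completed() yields futures as they finish. The sketch below reproduces this without network access; slow_echo is a hypothetical stand-in for a download, and the delays are arbitrary.

```python
import concurrent.futures
from time import sleep


def slow_echo(delay):
    # Stand-in for a download: wait, then return the input.
    sleep(delay)
    return delay


def completion_order(executor, delays):
    # Map each future back to its input, as the URL example does.
    future_to_delay = {executor.submit(slow_echo, d): d for d in delays}
    order = []
    for future in concurrent.futures.as_completed(future_to_delay):
        order.append(future_to_delay[future])
    return order


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        # The shortest delay is normally reported first.
        print(completion_order(executor, [0.9, 0.3, 0.6]))
```

With enough workers to run every task at once, the tasks are normally reported from the shortest delay to the longest, whatever order they were submitted in.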
Use of the Executor.map() function
The built-in Python map() function applies a function to every element of one or more iterables. Similarly, Executor.map() applies a function to every element of an iterable, but submits each call to the ProcessPoolExecutor as an independent job. Consider the following Python script to understand this.
Example
We will consider the same example that we used while creating a thread pool using the Executor.map() function. In the example given below, the map function is used to apply the square() function to every value in the values list.
from concurrent.futures import ProcessPoolExecutor

values = [2,3,4,5]

def square(n):
    return n * n

def main():
    with ProcessPoolExecutor(max_workers = 3) as executor:
        results = executor.map(square, values)
        for result in results:
            print(result)

if __name__ == '__main__':
    main()
Output
The above Python script will generate the following output −
4
9
16
25
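Executor.map() also accepts several iterables, one per function parameter, and returns results in input order even if the calls finish out of order. The sketch below shows this together with the chunksize argument; the power and powers names and the chunk size of 2 are illustrative choices.

```python
import concurrent.futures


def power(base, exponent):
    return base ** exponent


def powers(executor, bases, exponents):
    # One iterable per parameter of power(); results follow the input order.
    # chunksize groups several items into each job sent to a worker process;
    # it has no effect on a thread pool.
    return list(executor.map(power, bases, exponents, chunksize=2))


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        print(powers(executor, [2, 3, 4, 5], [2, 2, 2, 2]))
```

For long input lists, a larger chunksize can reduce the overhead of passing data between processes.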
When to use ProcessPoolExecutor and ThreadPoolExecutor?
Now that we have studied both Executor classes, ThreadPoolExecutor and ProcessPoolExecutor, we need to know when to use which. Choose ProcessPoolExecutor for CPU-bound workloads and ThreadPoolExecutor for I/O-bound workloads.
If we use ProcessPoolExecutor, we do not need to worry about the GIL, because each worker process has its own interpreter and its own GIL. As a result, CPU-bound work can run in parallel on multiple cores and typically takes less time than with ThreadPoolExecutor. Consider the following Python script examples to understand this.
Example
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
    # CPU-bound loop; returns how long the countdown took.
    start = time.time()
    while n > 0:
        n -= 1
    return time.time() - start

def main():
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, time_taken in zip(value, executor.map(counting, value)):
            print('Start: {} Time taken: {}'.format(number, time_taken))
    print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
    main()
Output
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example - Python script with ThreadPoolExecutor
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
    # CPU-bound loop; returns how long the countdown took.
    start = time.time()
    while n > 0:
        n -= 1
    return time.time() - start

def main():
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for number, time_taken in zip(value, executor.map(counting, value)):
            print('Start: {} Time taken: {}'.format(number, time_taken))
    print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
    main()
Output
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
From the outputs of the two programs above, we can see the difference in execution time: about 2.08 seconds in total with ProcessPoolExecutor against about 3.85 seconds with ThreadPoolExecutor for the same CPU-bound work.
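To repeat this comparison on another machine, the two scripts can be folded into one helper that takes the executor class as a parameter. This is only a sketch: countdown and timed_map are hypothetical names, the input sizes are arbitrary, and the timings will vary with the hardware and the Python build.

```python
import time
import concurrent.futures


def countdown(n):
    # CPU-bound: pure Python arithmetic, no waiting on I/O.
    while n > 0:
        n -= 1
    return n


def timed_map(executor_class, func, inputs):
    # Returns (results, elapsed seconds) for one executor class.
    start = time.perf_counter()
    with executor_class(max_workers=2) as executor:
        results = list(executor.map(func, inputs))
    return results, time.perf_counter() - start


if __name__ == '__main__':
    inputs = [2000000, 2000000]
    for executor_class in (concurrent.futures.ThreadPoolExecutor,
                           concurrent.futures.ProcessPoolExecutor):
        _, elapsed = timed_map(executor_class, countdown, inputs)
        print('{}: {:.3f} s'.format(executor_class.__name__, elapsed))
```

Running both executors over identical inputs in one script keeps the comparison fair, since the workload and the measurement code are shared.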