StarPU Handbook
24. Python Interface

This chapter presents the StarPU Python Interface, which provides users familiar with the Python language a more concise and easier-to-use interface to StarPU.

This interface supports most of the main StarPU functionalities. While not all features of the C API are replicated in the Python Interface, additional functions tailored for Python's ease of use have been incorporated.

Several examples using the Python API are provided in the directory starpupy/examples/.

24.1 Installation of the Python Interface

Calling configure enables the StarPU Python Interface by default. You can also explicitly specify the option --enable-starpupy, in which case configure will fail if some requirements are missing. For now, the only requirement is the availability of the python3 interpreter.

The Python modules joblib and cloudpickle are mandatory for running parallel code.

The Python module numpy is recommended, but not mandatory.

$ pip3 install joblib
$ pip3 install cloudpickle
$ pip3 install numpy
$ ../configure --enable-starpupy --enable-blocking-drivers --prefix=$HOME/usr/starpu
$ make
$ make install

You can then go to the directory in which StarPU is installed, and test the provided Python examples.

$ cd $HOME/usr/starpu
$ . ./bin/starpu_env
Setting StarPU environment for ...
$ cd lib/starpu/python
$ python3 starpu_py.py
Example 1:
Hello, world!
...
$

24.2 Python Parallelism

Python interpreters share the Global Interpreter Lock (GIL), which requires that at any time one and only one thread executes Python code. With Python versions up to 3.11, a pure Python application cannot run in parallel, even with multiple interpreters: the shared GIL makes the execution of multiple interpreters serial rather than parallel, so the execution of a Python program is essentially single-threaded.

For pure Python scripts with Python versions up to 3.11, the only way to achieve parallelism is to use the master-slave mechanism (Section Master Slave Support). Parallelism may be implemented with multiple interpreters in future Python versions; details can be found in Section Multiple Interpreters. Otherwise, parallelism can be achieved when external C applications are called, or when an external API, e.g. the BLAS API for numpy objects, is used.

Starting from Python version 3.12, each interpreter can use a separate GIL, which allows parallelism of pure Python code. This can be enabled by setting STARPUPY_OWN_GIL to 1. Some corner cases are however not yet supported in Python 3.12, notably the usage of futures.
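For instance, assuming a pure Python script named my_script.py (a hypothetical name used only for illustration), the separate-GIL mode can be enabled from the shell as follows:

```shell
$ STARPUPY_OWN_GIL=1 python3 my_script.py
```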

24.3 Using StarPU in Python

The starpu module must be imported by any Python code using the StarPU Python interface.

import starpu

Before using any StarPU functionality, it is necessary to call starpu.init(). The function starpu.shutdown() should be called after all StarPU functions have been called.

import starpu
starpu.init()
# ...
starpu.shutdown()

24.3.1 Submitting Tasks

One of the fundamental aspects of StarPU is the task submission. The Python Interface greatly simplifies this process, allowing for direct calls to the submission function without any extra complexities.

The Python function used for task submission follows the format: task_submit(options)(func, *args, **kwargs). In this structure:

  • func represents any Python function.
  • args and kwargs denote the function's arguments.

You can also provide the function as a string.

By submitting tasks through this function, you enable StarPU to perform optimizations for your program's execution. It's recommended to submit all tasks to ensure StarPU's efficient scheduling of the underlying tasks. It's important to note that submitted tasks do not execute immediately, and you can retrieve the return value only after the task execution.

The first set of parentheses allows specifying various options. Keep in mind that each option has a default value; even if you are not providing any options, the parentheses must be retained. The options are as follows:

  • name (string, default: None) : Set the name of the task. This can be useful for debugging purposes.
  • synchronous (unsigned, default: 0) : If this flag is set, task_submit() only returns when the task has been executed (or if no worker is able to process the task). Otherwise, task_submit() returns immediately.
  • priority (int, default: 0) : Set the priority level of the task. This is an integer whose value must be greater than the return value of the function starpu.sched_get_min_priority() (for the least important tasks), and lower than or equal to the return value of the function starpu.sched_get_max_priority() (for the most important tasks). The default priority is 0, in order to allow static task initialization. Scheduling strategies that take priorities into account can use this parameter to make better scheduling decisions, but the scheduling policy may also ignore it.
  • color (unsigned, default: None) : Set the color of the task to be used in dag.dot.
  • flops (double, default: None) : Set the number of floating-point operations that the task will have to achieve. This is useful for easily getting GFlops/s curves from the function starpu.perfmodel_plot(), and for the hypervisor load balancing.

  • perfmodel (string, default: None) : Set the name of the performance model. This name will be used as the filename where the performance model information will be saved. After the task is executed, one can call the function starpu.perfmodel_plot() by giving the symbol of perfmodel to view its performance curve.
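As an illustration of this calling convention only (not StarPU's actual implementation; fake_task_submit is a hypothetical name), the two-step call can be mimicked in pure Python with a closure: the first call records the options, the second one receives the function and its arguments.

```python
# A pure-Python sketch of the task_submit(options)(func, *args) convention.
# This is NOT the real StarPU implementation; it only illustrates the shape
# of the API: options first, then the function and its arguments.
def fake_task_submit(**options):
    def submit(func, *args, **kwargs):
        # a real runtime would enqueue the task and return a Future/handle;
        # this sketch simply runs the function immediately
        return func(*args, **kwargs)
    return submit

def add(a, b):
    return a + b

# even with no options, the first pair of parentheses is kept
print(fake_task_submit()(add, 1, 2))                        # -> 3
print(fake_task_submit(name="add", priority=1)(add, 2, 3))  # -> 5
```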

24.3.2 Returning Future Object

To support asynchronous programming, the task_submit() function returns a Future object. This is an extension of StarPU provided by the Python interface. A Future represents the eventual result of an asynchronous operation. It is an awaitable object: coroutines can await a Future until it has a result or an exception set, or until it is canceled. Some basic examples are available in the script starpupy/examples/starpu_py.py.

This feature needs the asyncio module to be imported.

import starpu
import asyncio
starpu.init()
def add(a, b):
    return a+b

async def main():
    fut = starpu.task_submit()(add, 1, 2)
    res = await fut
    print("The result of function is", res)

asyncio.run(main())
starpu.shutdown()

Execution:

The result of function is 3

With Python version 3.8 or later, one can also start the interpreter with the option -m asyncio, which allows using await directly instead of asyncio.run().

$ python3 -m asyncio
>>> import asyncio
>>> import starpu
>>> starpu.init()
>>> def add(a, b):
...     print("The result is ready!")
...     return a+b
...
>>> fut = starpu.task_submit()(add, 1, 2)
The result is ready!
>>> res = await fut
>>> res
3

You can also use the decorator starpu.delayed to wrap a function. The function can then directly be submitted to StarPU and will automatically create a Future object.

>>> @starpu.delayed
... def add_deco(a, b):
...     print("The result is ready!")
...     return a+b
...
>>> fut = add_deco(1, 2)
The result is ready!
>>> res = await fut
>>> res
3

To specify options when using the decorator, just do as follows:

>>> @starpu.delayed(name="add", color=2, perfmodel="add_deco")
... def add_deco(a, b):
...     print("The result is ready!")
...     return a+b
...
>>> fut = add_deco(1, 2)
The result is ready!
>>> res = await fut
>>> res
3

A Future object can also be used for the next step calculation even before being ready. The calculation will be postponed until the Future has a result.

In this example, after submitting the first task, a Future object fut1 is created, and it is used as an argument of a second task. The second task is submitted even without having the return value of the first task.

>>> import asyncio
>>> import starpu
>>> import time
>>> starpu.init()
>>> def add(a, b):
...     time.sleep(10)
...     print("The first result is ready!")
...     return a+b
...
>>> def sub(x, a):
...     print("The second result is ready!")
...     return x-a
...
>>> fut1 = starpu.task_submit()(add, 1, 2)
>>> fut2 = starpu.task_submit()(sub, fut1, 1)
The first result is ready!
The second result is ready!
>>> res = await fut2
>>> res
2

24.3.3 Submit Python Objects Supporting The Buffer Protocol

The Python buffer protocol is a framework in which Python objects can expose raw byte arrays to other Python objects. This can be extremely useful to efficiently store and manipulate large arrays of data. The StarPU Python Interface allows users to use such objects as task parameters.
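As a reminder of what the buffer protocol itself provides (standard Python, independent of StarPU): a buffer-supporting object exposes its raw memory, so views can read and write it without copying.

```python
import array

# array.array supports the buffer protocol, like numpy arrays do
a = array.array('i', [1, 2, 3])
view = memoryview(a)   # zero-copy view over the array's underlying buffer
view[0] = 10           # writing through the view mutates the original object
print(a[0])            # -> 10
print(view.nbytes == len(a) * a.itemsize)  # -> True: raw bytes are exposed
```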

>>> import asyncio
>>> import starpu
>>> import time
>>> import numpy as np
>>> starpu.init()
>>> def add(a,b):
...     c = np.zeros(np.size(a))
...     for i in range(np.size(a)):
...         c[i] = a[i] + b[i]
...     return c
...
>>> a = np.array([1, 2, 3])
>>> b = np.array([4, 5, 6])
>>> fut = starpu.task_submit()(add, a, b)
>>> res = await fut
>>> res
array([5., 7., 9.])

StarPU uses a specific data interface to handle Python objects supporting the buffer protocol. Such objects are managed by the StarPU data management library, which minimizes data transfers between accelerators and avoids copying the object each time.

We show below the performance of the numpy addition (numpy.add, running the script test_perf.sh) with different array sizes (10, 20, ..., 100, 200, ..., 1000, 2000, ..., 10000, 20000, ..., 100000, 200000, ..., 1000000, 2000000, ..., 10000000, ..., 50000000). We compare two cases:

  1. Using StarPU,
  2. Without using StarPU tasks, but directly calling the numpy.add function.

The first plot compares the task submission time when using StarPU with the program execution time without StarPU. We can see an obvious gain when using StarPU with large array sizes. Moreover, as shown in the second figure, the task has not finished its execution yet at submission time; this time can be used to perform other operations.

We can also define our own function to do the numpy operation, e.g. the element addition:

def add(a, b):
    for i in range(np.size(a)):
        a[i] = a[i] + b[i]

We will compare operation performances with the same two cases, but based on our custom function add(a, b).

We can see that the custom function is not as efficient as the numpy function overall. The optimization for large arrays is the same when using StarPU.

24.3.3.1 Access Mode Annotation

StarPU defines different access modes for a piece of data: it can be readable (access mode R), writable (access mode W), or both readable and writable (access mode RW). The default access mode is R.

For the Python interface, these modes can be defined as shown below.

  1. Using the decorator starpu.access(arg="R/W/RW") to wrap the function.

    >>> a = np.array([1, 2, 3, 4, 5, 6])
    >>> e = np.array([0, 0, 0, 0, 0, 0, 0])
    >>> @starpu.access(a="R", b="W")
    ... def assign(a,b):
    ...     for i in range(min(np.size(a), np.size(b))):
    ...         b[i]=a[i]
    ...
    >>> fut = starpu.task_submit()(assign, a, e)
    >>> starpu.acquire(e)
    array([1, 2, 3, 4, 5, 6, 0])
    >>> starpu.release(e)

  2. Using the decorator starpu.delayed(options, arg="R/W/RW").

    >>> @starpu.delayed(a="R", b="W")
    ... def assign(a,b):
    ...     for i in range(min(np.size(a), np.size(b))):
    ...         b[i]=a[i]
    ...
    >>> fut = assign(a, e)
    >>> starpu.acquire(e)
    array([1, 2, 3, 4, 5, 6, 0])
    >>> starpu.release(e)

  3. Using the method starpu.set_access(func, arg="R/W/RW") that will create a new function.

    >>> def assign(a,b):
    ...     for i in range(min(np.size(a), np.size(b))):
    ...         b[i]=a[i]
    ...
    >>> assign_access = starpu.set_access(assign, a="R", b="W")
    >>> fut = starpu.task_submit()(assign_access, a, e)
    >>> starpu.acquire(e)
    array([1, 2, 3, 4, 5, 6, 0])
    >>> starpu.release(e)

24.3.3.2 Methods

Once the access mode of an argument is set to at least W, it may be modified during the task execution. Note that before the task is finished, we cannot get the up-to-date value of this argument simply by using the print function. For example:

>>> import asyncio
>>> import starpu
>>> import time
>>> import numpy as np
>>> starpu.init()
>>> a = np.array([1, 2, 3, 4, 5, 6])
>>> e = np.array([0, 0, 0, 0, 0, 0, 0])
>>> @starpu.access(a="R", b="W")
... def assign(a,b):
...     time.sleep(10)
...     for i in range(min(np.size(a), np.size(b))):
...         b[i]=a[i]
...
>>> fut = starpu.task_submit()(assign, a, e)
>>> print(e) # before the task is finished
[0 0 0 0 0 0 0]

We print argument e right after submitting the task, but since the task is not finished yet, we can only get its unchanged value. If we want to get its up-to-date value, we need extra functions.

In order to access data registered to StarPU outside tasks, we provide an acquire and release mechanism.

  • The starpu.acquire(data, mode) method should be called to access registered data outside tasks (Refer to the C API starpu_data_acquire()). StarPU will ensure that the application gets an up-to-date copy of the data in main memory, located where the data was originally registered, and that all concurrent accesses (e.g. from tasks) are consistent with the given access mode (R, the default mode, W or RW).

  • The starpu.release(data) method must be called once the application no longer needs to access the piece of data (Refer to the C API starpu_data_release()).

  • The starpu.unregister(data) method must be called to unregister the Python object from StarPU. (Refer to the C API starpu_data_unregister()). This method waits for all calculations to be finished before unregistering data.

With acquire, even if we ask to access the argument right after submitting the task, the up-to-date value will be printed once the task is finished.

>>> starpu.acquire(e) # before the task is finished
array([1, 2, 3, 4, 5, 6, 0])

To complete the addition operation example, the execution steps are:

>>> import asyncio
>>> import starpu
>>> import time
>>> import numpy as np
>>> starpu.init()
>>> @starpu.access(a="RW", b="R")
... def add(a,b):
...     time.sleep(10)
...     for i in range(np.size(a)):
...         a[i] = a[i] + b[i]
...
>>> a = np.array([1, 2, 3])
>>> b = np.array([4, 5, 6])
>>> starpu.acquire(a, mode="R")
array([1, 2, 3])
>>> starpu.release(a)
>>> fut = starpu.task_submit()(add, a, b)
>>> starpu.acquire(b, mode="R")
array([4, 5, 6])
>>> starpu.acquire(a, mode="R") # before the task is finished
array([5, 7, 9])
>>> starpu.release(a)
>>> starpu.release(b)
>>> starpu.unregister(a)
>>> starpu.unregister(b)

The value of b is printed directly right after calling acquire, but the up-to-date value of a is printed only after the task is finished. Note that if we want to modify an argument during the task execution and then use its up-to-date value in further operations, we should set the access mode of this argument to at least W; otherwise the argument is not kept synchronized, and the next task needing this object will not wait for its up-to-date value before executing.

If we call acquire but not release before the task submission, the task will not start to execute until the object is released.

An example is shown below:

>>> import asyncio
>>> import starpu
>>> import numpy as np
>>> import time
>>> starpu.init()
>>> @starpu.access(a="RW")
... def add(a,b):
...     print("This is the addition function")
...     time.sleep(10)
...     for i in range(np.size(a)):
...         a[i] = a[i] + b[i]
...
>>> a = np.array([1, 2, 3])
>>> b = np.array([4, 5, 6])
>>> starpu.acquire(a, mode="R")
array([1, 2, 3])
>>> fut = starpu.task_submit()(add, a, b)
>>> starpu.release(a)
This is the addition function   # The task will not start until "a" is released
>>> starpu.acquire(a, mode="R") # Before the task is finished
array([5, 7, 9])                # After the task is finished
>>> starpu.release(a)
>>> starpu.unregister(a)
>>> starpu.unregister(b)

24.4 StarPU Data Interface for Python Objects

StarPU uses data handles to manage a piece of data. A data handle keeps track of replicates of the same data (registered by the application) over various memory nodes. The data management library manages to keep them coherent. That also allows minimizing the data transfers, and avoids copying the object each time. Data handles are managed through specific data interfaces. Some examples applying this specific interface are available in script starpupy/examples/starpu_py_handle.py.

24.4.1 Interface for Ordinary Python Objects

A specific data interface has been defined to manage ordinary Python objects, such as constants (integer, float...), strings, lists, etc. This interface is defined with the class Handle. When submitting a task, instead of specifying a function and its arguments, we specify a function and the handles of its arguments.

In addition to returning a Future object, it is also possible to get a StarPU handle object back when submitting a function. To do so, set the starpu.task_submit() option ret_handle to True (its default value is False).

>>> import starpu
>>> from starpu import Handle
>>> starpu.init()
>>> def add(x, y):
...     return x + y
...
>>> x = Handle(2)
>>> y = Handle(3)
>>> res = starpu.task_submit(ret_handle=True)(add, x, y)

We then need to call the method get() to get the latest version of this Python Object.

>>> res.get()
5

When not setting the parameter ret_handle, the return object is a Future.

>>> res_fut = starpu.task_submit()(add, x, y)
>>> await res_fut

If the Python object is immutable (such as int, float, str, tuple...), registering the same object several times is authorized. That means you can do this:

>>> x = Handle(2)
>>> x1 = Handle(2)

x and x1 are two different Handle objects.

24.4.2 Interface for Python Objects Supporting Buffer Protocol

This StarPU data interface can also be used to manage Python objects supporting the buffer protocol, i.e. numpy arrays, bytes, bytearray, array.array and memoryview objects.

>>> import numpy as np
>>> import starpu
>>> from starpu import Handle
>>> starpu.init()
>>> def add(a,b):
...     for i in range(np.size(a)):
...         a[i] = a[i] + b[i]
...     return a
...
>>> a = np.array([1, 2, 3])
>>> b = np.array([2, 4, 6])
>>> a_h = Handle(a)
>>> b_h = Handle(b)
>>> res = starpu.task_submit(ret_handle=True)(add, a_h, b_h)
>>> res.get()
array([3, 6, 9])

Unlike immutable Python objects, all Python objects supporting the buffer protocol are mutable, and registering the same object more than once is not authorized. If you do this:

>>> a = np.array([1, 2, 3])
>>> a_h = Handle(a)
>>> a1_h = Handle(a)

You will get an error message:

starpupy.error: Should not register the same mutable python object once more.

As explained in Section Submit Python Objects Supporting The Buffer Protocol, the StarPU Python interface uses data handles to manage Python objects supporting the buffer protocol by default. These objects are usually relatively large, such as a big numpy matrix. To avoid multiple copies and transfers of this data over the various memory nodes, the starpu.task_submit() option arg_handle defaults to True, so that applications get the most optimization. To deactivate the use of this data interface, set the option arg_handle to False.

Since data handles are used by default, registration is performed at task submission time. Therefore, you should be careful not to register the same object again after the task submission, like this:

>>> a = np.array([1, 2, 3])
>>> b = np.array([2, 4, 6])
>>> res = starpu.task_submit(ret_handle=True)(add, a, b)
>>> a_h = Handle(a)

You will get the error message:

starpupy.error: Should not register the same mutable python object once more.

Regarding performance, as in Section Submit Python Objects Supporting The Buffer Protocol, we add one case to compare with the other two. We still test the numpy addition (numpy.add, running the script test_handle_perf.sh) with different array sizes (10, 20, ..., 100, 200, ..., 1000, 2000, ..., 10000, 20000, ..., 100000, 200000, ..., 1000000, 2000000, ..., 10000000, ..., 50000000). The three cases are:

  1. Using StarPU and returning future object,
  2. Using StarPU and returning handle object,
  3. Without using StarPU tasks, but directly calling the numpy.add function.

The first plot compares the task submission time when using StarPU (returning either a Future or a handle object) with the program execution time without StarPU. We can see an obvious gain using StarPU, whether returning a Future or a handle object, when the test array size is large. Moreover, as shown in the second figure, the task has not finished its execution yet at submission time; this time can be used to perform other operations. When the array size is not very large, returning a handle gives better execution performance than returning a Future.

We can also define our own function to do the numpy operation, e.g. the element addition:

def add(a, b):
    for i in range(np.size(a)):
        a[i] = a[i] + b[i]

We will compare operation performances with the same three cases but based on our custom function add(a, b).

We can see that the custom function is not as efficient as the numpy function overall. The optimization for large arrays is the same when using StarPU.

24.4.2.1 Methods

As in Section Methods, the Handle class defines methods to provide an acquire and release mechanism.

  • The method Handle::acquire(mode) should be called before accessing the object outside tasks (Refer to the C API starpu_data_acquire()). The access mode can be "R", "W" or "RW"; the default value is "R". Calling this method gives an up-to-date copy of the Python object.

  • The method Handle::release() must be called once the application no longer needs to access the registered data (Refer to the C API starpu_data_release()).

  • The method Handle::unregister() must be called to unregister the Python object handle from StarPU (Refer to the C API starpu_data_unregister()). This method waits for all calculations to be finished before unregistering the data.

The previous example can be coded as follows:

>>> import numpy as np
>>> import starpu
>>> from starpu import Handle
>>> starpu.init()
>>> @starpu.access(a="RW", b="R")
... def add(a,b):
...     for i in range(np.size(a)):
...         a[i] = a[i] + b[i]
...
>>> a = np.array([1, 2, 3])
>>> b = np.array([2, 4, 6])
>>> a_h = Handle(a)
>>> b_h = Handle(b)
>>> a_h.acquire(mode="R")
array([1, 2, 3])
>>> a_h.release()
>>> starpu.task_submit(ret_handle=True)(add, a_h, b_h)
>>> a_h.acquire(mode="R") # we get the up-to-date value
array([3, 6, 9])
>>> a_h.release()
>>> a_h.unregister()

24.4.3 Interface for Empty Numpy Array

We can register an empty numpy array by calling HandleNumpy(size, type). The default value for type is float64.

You will find below an example which defines the function assign taking two arrays as parameters, the second one being an empty array which will be assigned the values of the first array.

>>> import numpy as np
>>> import starpu
>>> from starpu import Handle
>>> from starpu import HandleNumpy
>>> starpu.init()
>>> @starpu.access(b="W")
... def assign(a,b):
...     for i in range(min(np.size(a,0), np.size(b,0))):
...         for j in range(min(np.size(a,1), np.size(b,1))):
...             b[i][j] = a[i][j]
...     return b
...
>>> a = np.array([[1, 2, 3], [4, 5, 6]])
>>> a_h = Handle(a)
>>> e_h = HandleNumpy((5,10), a.dtype)
>>> res = starpu.task_submit(ret_handle=True)(assign, a_h, e_h)
>>> e_h.acquire()
array([[1, 2, 3, 0, 0, 0, 0, 0, 0, 0],
       [4, 5, 6, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> e_h.release()

24.4.4 Array Partitioning

An n-dim numpy array can be split into several sub-arrays by calling the method Handle::partition(nchildren, dim, chunks_list) (Refer to the C API starpu_data_partition_plan()).

  • nchildren is the number of sub-handles,
  • dim is the dimension along which to partition: it can be 0 for the vertical dimension, 1 for the horizontal dimension, 2 for the depth dimension, 3 for the time dimension, etc.
  • chunks_list is a list containing the size of each segment. The total length of segments in this list must be equal to the length of the selected dimension.

The method returns a list of sub-handles, each of which can be used when submitting a task with task_submit(). This allows processing an array in parallel; once the execution on each sub-handle is finished, the result is directly reflected in the original n-dim array.

When the sub-handles are no longer needed, the method Handle::unpartition(handle_list, nchildren) should be called to clear the partition and unregister all the sub-handles (Refer to the C API starpu_data_partition_clean()).

  • handle_list is the sub-handle list which was previously returned by the method Handle::partition(),
  • nchildren is the number of sub-handles.

Here is an example to use these methods.

>>> import numpy as np
>>> import starpu
>>> from starpu import Handle
>>> starpu.init()
>>> @starpu.access(a="RW", b="R")
... def add(a,b):
...     np.add(a,b,out=a)
...
>>> n, m = 20, 10
>>> arr = np.arange(n*m).reshape(n, m)
>>> arr_h = Handle(arr)
>>> arr_h.acquire(mode='RW')
[[  0   1   2   3   4   5   6   7   8   9]
 [ 10  11  12  13  14  15  16  17  18  19]
 [ 20  21  22  23  24  25  26  27  28  29]
 [ 30  31  32  33  34  35  36  37  38  39]
 [ 40  41  42  43  44  45  46  47  48  49]
 [ 50  51  52  53  54  55  56  57  58  59]
 [ 60  61  62  63  64  65  66  67  68  69]
 [ 70  71  72  73  74  75  76  77  78  79]
 [ 80  81  82  83  84  85  86  87  88  89]
 [ 90  91  92  93  94  95  96  97  98  99]
 [100 101 102 103 104 105 106 107 108 109]
 [110 111 112 113 114 115 116 117 118 119]
 [120 121 122 123 124 125 126 127 128 129]
 [130 131 132 133 134 135 136 137 138 139]
 [140 141 142 143 144 145 146 147 148 149]
 [150 151 152 153 154 155 156 157 158 159]
 [160 161 162 163 164 165 166 167 168 169]
 [170 171 172 173 174 175 176 177 178 179]
 [180 181 182 183 184 185 186 187 188 189]
 [190 191 192 193 194 195 196 197 198 199]]
>>> arr_h.release()
>>> split_num = 3
>>> arr_h_list = arr_h.partition(split_num, 1, [3,2,5]) # split into 3 sub-handles, and partition along the horizontal dimension
>>> for i in range(split_num):
...     res = starpu.task_submit(ret_handle=True)(add, arr_h_list[i], arr_h_list[i])
...
>>> arr_h.acquire(mode='RW')
[[  0   2   4  12  16  40  48  56  64  72]
 [ 80  88  96 104 112 120 128 136 144 152]
 [160 168 176 184 192 200 208 216 224 232]
 [240 248 256 264 272 280 288 296 304 312]
 [320 328 336 172 176 180 184 188 192 196]
 [200 204 208 212 216 220 224 228 232 236]
 [120 122 124 126 128 130 132 134 136 138]
 [140 142 144 146 148 150 152 154 156 158]
 [160 162 164 166 168 170 172 174 176 178]
 [180 182 184 186 188 190 192 194 196 198]
 [200 202 204 206 208 105 106 107 108 109]
 [110 111 112 113 114 115 116 117 118 119]
 [120 121 122 123 124 125 126 127 128 129]
 [130 131 132 133 134 135 136 137 138 139]
 [140 141 142 143 144 145 146 147 148 149]
 [150 151 152 153 154 155 156 157 158 159]
 [160 161 162 163 164 165 166 167 168 169]
 [170 171 172 173 174 175 176 177 178 179]
 [180 181 182 183 184 185 186 187 188 189]
 [190 191 192 193 194 195 196 197 198 199]]
>>> arr_h.release()
>>> arr_h.unpartition(arr_h_list, split_num)
>>> arr_h.unregister()

The method Handle::get_partition_size(handle_list) can be used to get the array size of each sub-array.

>>> arr_h_list = arr_h.partition(split_num, 1, [3,2,5])
>>> arr_h.get_partition_size(arr_h_list)
[60, 40, 100]
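For intuition only, the relationship between chunks_list and the reported sub-array sizes can be reproduced on a plain numpy array with np.split; this is not what StarPU does internally, which keeps the sub-handles coherent with the original data.

```python
import numpy as np

n, m = 20, 10
arr = np.arange(n * m).reshape(n, m)
chunks = [3, 2, 5]                 # must sum to the length of dimension 1
assert sum(chunks) == arr.shape[1]

# split boundaries 3 and 5 -> column blocks of widths 3, 2 and 5
subs = np.split(arr, np.cumsum(chunks)[:-1], axis=1)
print([s.shape for s in subs])   # -> [(20, 3), (20, 2), (20, 5)]
print([s.size for s in subs])    # -> [60, 40, 100], as get_partition_size reports
```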

The full script is available in starpupy/examples/starpu_py_partition.py.

24.5 Benchmark

This benchmark gives a glimpse into how long a task should be (in µs) for the StarPU Python interface overhead to be low enough to keep efficiency. Running starpupy/benchmark/tasks_size_overhead.sh generates a plot of the speedup of tasks of various sizes, depending on the number of CPUs being used.

In the first figure, the return value is a handle object. In the second figure, the return value is a future object. In the third figure, the return value is None.

For example, in the figure of returning handle object, for a 571 µs task (the green line), StarPU overhead is low enough to guarantee a good speedup if the number of CPUs is not more than 12. But with the same number of CPUs, a 314 µs task (the blue line) cannot have a correct speedup. We need to decrease the number of CPUs to about 8 if we want to keep efficiency.

(1) Returning handle object
(2) Returning future object
(3) Returning None

24.6 Running Python Functions as Pipeline Jobs (Imitating Joblib Library)

The StarPU Python interface also provides parallel computing for loops using multiprocessing, similarly to the joblib library, which can simply turn Python code into parallel computing code and thus increase the computing speed.

24.6.1 Examples

  • The most basic usage is to parallelize a simple iteration.

    from math import log10
    [log10(10 ** i) for i in range(10)]
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
    

    In order to spread it over several CPUs, you need to import the starpu.joblib module, and use its Parallel class:

    import starpu.joblib
    from math import log10
    starpu.init()
    starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(log10)(10**i) for i in range(10))
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
    

    It is also possible to first create an object of the Parallel class, and then call starpu.joblib.delayed to execute the generator expression.

    import starpu.joblib
    from math import log10
    starpu.init()
    parallel = starpu.joblib.Parallel(n_jobs=2)
    parallel(starpu.joblib.delayed(log10)(10**i) for i in range(10))
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
    

  • Instead of a generator expression, a list of functions can also be submitted as a task through the Parallel class.

    import starpu.joblib
    starpu.init()

    # generate a list to store functions
    g_func = []

    # function with no input and no output, printing hello world
    def hello():
        print("Example 1: Hello, world!")
    g_func.append(starpu.joblib.delayed(hello)())

    # function with 2 int inputs and 1 int output
    def multi(a, b):
        res_multi = a*b
        print("Example 2: The result of ",a,"*",b,"is",res_multi)
        return res_multi
    g_func.append(starpu.joblib.delayed(multi)(2, 3))

    # function with 4 float inputs and 1 float output
    def add(a, b, c, d):
        res_add = a+b+c+d
        print("Example 3: The result of ",a,"+",b,"+",c,"+",d,"is",res_add)
        return res_add
    g_func.append(starpu.joblib.delayed(add)(1.2, 2.5, 3.6, 4.9))

    # function with 2 int inputs, 1 float input, 1 float output and 1 int output
    def sub(a, b, c):
        res_sub1 = a-b-c
        res_sub2 = a-b
        print("Example 4: The result of ",a,"-",b,"-",c,"is",res_sub1,"and the result of",a,"-",b,"is",res_sub2)
        return res_sub1, res_sub2
    g_func.append(starpu.joblib.delayed(sub)(6, 2, 5.9))

    # the input is an iterable list of functions
    starpu.joblib.Parallel(n_jobs=2)(g_func)

    Execution:

    Example 3: The result of  1.2 + 2.5 + 3.6 + 4.9 is 12.200000000000001
    Example 1: Hello, world!
    Example 4: The result of  6 - 2 - 5.9 is -1.9000000000000004 and the result of 6 - 2 is 4
    Example 2: The result of  2 * 3 is 6
    [None, 6, 12.200000000000001, (-1.9000000000000004, 4)]
    

  • The function can also take array parameters.

    import starpu.joblib
    import numpy as np
    starpu.init()
    def multi_array(a, b):
        for i in range(len(a)):
            a[i] = a[i]*b[i]
    A = np.arange(10)
    B = np.arange(10, 20, 1)
    starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(multi_array)((i for i in A), (j for j in B)))
    A

    Here the array A has not been modified.

    array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
    

    If we pass A directly as an argument, its value is updated

    starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(multi_array)(A, B))
    A
    array([ 0, 11, 24, 39, 56, 75, 96, 119, 144, 171])
    

    In the next call, the value of A is also updated.

    starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(multi_array)(b=(j for j in B), a=A))
    A
    array([ 0, 121, 288, 507, 784, 1125, 1536, 2023, 2592, 3249])
    

    The above three ways of writing are equivalent, and their execution times are very close. However, when a numpy array is passed directly, its value is updated; this does not happen when generators are provided. When a numpy array is used, it is handled by StarPU with a data interface.

  • Here is an example mixing scalar objects with numpy arrays or generator expressions.

    import starpu.joblib
    import numpy as np
    starpu.init()
    def scal(a, t):
        for i in range(len(t)):
            t[i] = t[i]*a
    A = np.arange(10)
    starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(scal)(2, (i for i in A)))
    starpu.joblib.Parallel(n_jobs=2)(starpu.joblib.delayed(scal)(2, A))

    Again, the value of A is modified by the second call.

    A
    array([ 0,  2,  4,  6,  8, 10, 12, 14, 16, 18])
    

The full script is available in starpupy/examples/starpu_py_parallel.py.

24.6.2 Parallel Parameters

The starpu.joblib.Parallel class accepts the following parameters:

  • mode (string, default: "normal")

    A string with the value "normal" or "future". With the "normal" mode, you can call starpu.joblib.Parallel directly without using the asyncio module, and you get the result once the tasks are executed. With the "future" mode, calling starpu.joblib.Parallel returns a Future object. If the parameter end_msg is set, the given message is displayed when the result is ready, and you can then use await to get the result. The asyncio module must be imported in this case.

    import starpu
    import asyncio
    from math import log10
    starpu.init()
    fut = starpu.joblib.Parallel(mode="future", n_jobs=3, end_msg="The result is ready!")(starpu.joblib.delayed(log10)(10**i) for i in range(10))
    The result is ready! <_GatheringFuture finished result=[[0.0, 1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]>
    await fut
    [[0.0, 1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
    

  • end_msg (string, default: None)

    A message that will be displayed when the tasks are executed and the result is ready. When this parameter is unset, no message is displayed. In either case, you need to await the Future to get the result.

  • n_jobs (int, default: None)

    The maximum number of concurrently running jobs. If -1 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus, for n_jobs = -2, all CPUs but one are used. None is a marker for ‘unset’ that will be interpreted as n_jobs=1 (sequential execution). n_cpus is the number of CPUs detected by StarPU on the running device.

  • perfmodel (string, default: None)

    Set the name of the performance model. This name is used as the filename in which the performance model information is saved. After the tasks have been executed, you can call the function starpu.perfmodel_plot() with the perfmodel name to view its performance curve.
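The n_jobs mapping described above can be sketched as a small helper. Note that effective_n_jobs below is a hypothetical function written for illustration only; it is not part of the starpu.joblib API.

```python
import os

def effective_n_jobs(n_jobs, n_cpus=None):
    # Hypothetical helper mirroring the n_jobs semantics described above.
    if n_cpus is None:
        n_cpus = os.cpu_count() or 1
    if n_jobs is None:
        return 1                    # unset: sequential execution
    if n_jobs == -1:
        return n_cpus               # all CPUs
    if n_jobs < -1:
        return n_cpus + 1 + n_jobs  # e.g. -2 -> all CPUs but one
    return n_jobs                   # explicit job count

# Assuming a machine where StarPU detects 8 CPUs:
print(effective_n_jobs(None, 8))  # 1
print(effective_n_jobs(-1, 8))    # 8
print(effective_n_jobs(-2, 8))    # 7
print(effective_n_jobs(3, 8))     # 3
```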

24.6.3 Performances

  • We compare the performance of two methods for passing arguments to the starpu.joblib.delayed function. The first method defines a function performing only scalar computations and passes it a generator expression as argument. The second method defines a function performing array computations and passes it either numpy arrays or generators as arguments. The second method takes less time.

    import starpu.joblib
    import numpy as np
    import time
    starpu.init()
    N=1000000
    def multi(a, b):
        res_multi = a*b
        return res_multi
    print("--First method")
    A = np.arange(N)
    B = np.arange(N, 2*N, 1)
    start_exec1 = time.time()
    start_cpu1 = time.process_time()
    starpu.joblib.Parallel(n_jobs=-1)(starpu.joblib.delayed(multi)(i,j) for i,j in zip(A,B))
    end_exec1 = time.time()
    end_cpu1 = time.process_time()
    print("the program execution time is", end_exec1-start_exec1)
    print("the cpu execution time is", end_cpu1-start_cpu1)
    def multi_array(a, b):
        for i in range(len(a)):
            a[i] = a[i]*b[i]
        return a
    print("--Second method with Numpy arrays")
    A = np.arange(N)
    B = np.arange(N, 2*N, 1)
    start_exec2 = time.time()
    start_cpu2 = time.process_time()
    starpu.joblib.Parallel(n_jobs=-1)(starpu.joblib.delayed(multi_array)(A, B))
    end_exec2 = time.time()
    end_cpu2 = time.process_time()
    print("the program execution time is", end_exec2-start_exec2)
    print("the cpu execution time is", end_cpu2-start_cpu2)
    print("--Second method with generators")
    A = np.arange(N)
    B = np.arange(N, 2*N, 1)
    start_exec3 = time.time()
    start_cpu3 = time.process_time()
    starpu.joblib.Parallel(n_jobs=-1)(starpu.joblib.delayed(multi_array)((i for i in A), (j for j in B)))
    end_exec3 = time.time()
    end_cpu3 = time.process_time()
    print("the program execution time is", end_exec3-start_exec3)
    print("the cpu execution time is", end_cpu3-start_cpu3)

    Execution:

    --First method
    the program execution time is 3.000865936279297
    the cpu execution time is 5.17138062
    --Second method with Numpy arrays
    the program execution time is 0.7571873664855957
    the cpu execution time is 0.9166007309999991
    --Second method with generators
    the program execution time is 0.7259719371795654
    the cpu execution time is 1.1182918959999988
    

  • Performance can also be shown with the performance model. Here is an example with the function log10.

    import starpu.joblib
    from math import log10
    starpu.init()
    for x in [10, 100, 1000, 10000, 100000, 1000000]:
        for X in range(x, x*10, x):
            starpu.joblib.Parallel(n_jobs=-1, perfmodel="log_list")(starpu.joblib.delayed(log10)(i+1) for i in range(X))
    starpu.perfmodel_plot(perfmodel="log_list")

    If we use a numpy array as parameter, the computation can handle larger sizes, as shown below.

    import starpu.joblib
    import numpy as np
    from math import log10
    starpu.init()
    def log10_arr(t):
        for i in range(len(t)):
            t[i] = log10(t[i])
        return t
    for x in [10, 100, 1000, 10000, 100000, 1000000, 10000000]:
        for X in range(x, x*10, x):
            A = np.arange(1, X+1, 1)
            starpu.joblib.Parallel(n_jobs=-1, perfmodel="log_arr")(starpu.joblib.delayed(log10_arr)(A))
    starpu.perfmodel_plot(perfmodel="log_arr")

24.7 Multiple Interpreters

It is possible to use multiple interpreters when running Python applications. To do so, you need to set the environment variable STARPUPY_MULTI_INTERPRETER when running a StarPU Python application.

Python interpreters share the Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python code. In other words, the GIL makes the execution of multiple Python interpreters effectively serial rather than parallel: the execution of a Python program is essentially single-threaded. Therefore, if the application is a pure Python script, the program cannot be executed in parallel even with multiple interpreters, unless an external C application is called.
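This effect can be observed with a plain Python sketch (no StarPU involved): running a CPU-bound pure-Python function in two threads is not noticeably faster than running it twice serially, because both threads contend for the GIL.

```python
import threading
import time

def busy(n):
    # pure-Python CPU-bound loop; the thread holds the GIL while computing
    s = 0
    for i in range(n):
        s += i
    return s

N = 2_000_000

# serial: run the work twice in the main thread
t0 = time.perf_counter()
busy(N)
busy(N)
serial = time.perf_counter() - t0

# threaded: run the same work in two threads
t0 = time.perf_counter()
threads = [threading.Thread(target=busy, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - t0

# Because of the GIL, the threaded run is not ~2x faster than the serial one.
print(f"serial: {serial:.3f}s, threaded: {threaded:.3f}s")
```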

Fortunately, there are positive developments on this front. Python developers are preparing to stop sharing the GIL between interpreters (https://peps.nogil.dev/pep-0684/) and even to make the GIL optional so that Python code can run without it (https://peps.nogil.dev/pep-0703/), which will enable true parallelism in future Python versions.

To transfer data between interpreters, the cloudpickle module is used to serialize Python objects into a contiguous byte array. This mechanism increases the overhead of the StarPU Python interface, as shown in the following plots, to be compared with the plots given in Benchmark.
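The serialization round-trip can be sketched as follows. The stdlib pickle module is used here for illustration; cloudpickle extends it so that, e.g., lambdas and interactively defined functions can also be serialized.

```python
import pickle

# Serialize a Python object into a contiguous byte array, as done before
# transferring it to another interpreter, then deserialize it on the
# receiving side.
obj = {"operation": "numpy.add", "args": [1, 2, 3]}
payload = pickle.dumps(obj)        # contiguous bytes, ready for transfer
restored = pickle.loads(payload)   # reconstruction in the other interpreter
print(isinstance(payload, bytes))  # True
print(restored == obj)             # True
```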

In the first figure, the return value is a handle object. In the second figure, the return value is a future object. In the third figure, the return value is None.

(1) Returning handle object
(2) Returning future object
(3) Returning None

To illustrate this influence more concretely, we make a performance comparison.

By default, StarPU uses a virtually shared memory manager for Python objects supporting the buffer protocol, which minimizes data transfers. But in the multi-interpreter case, if the virtually shared memory manager is not used, data transfers can only be realized with the help of cloudpickle.

We show the operation performance below (running test_handle_perf_pickle.sh). The tested operation is numpy addition (numpy.add), and the array sizes are 10, 20, ..., 100, 200, ..., 1000, 2000, ..., 10000, 20000, ..., 100000, 200000, ..., 1000000, 2000000, ..., 10000000, ..., 50000000. We compare three cases: first, using the virtually shared memory manager; second, without the virtually shared memory manager; third, without StarPU task submission, directly calling the numpy.add function.

In the first figure, we compare the submission time when using StarPU with the execution time without StarPU. There is still a clear benefit from the StarPU virtually shared memory manager when the array size is large. However, when only cloudpickle is used, the StarPU Python interface cannot provide an effective optimization. And in the second figure, we can see that the same operation takes more time to complete when only cloudpickle is used.

We can also define our own function to do the numpy operation, e.g. the element addition:

def add(a, b):
    for i in range(np.size(a)):
        a[i] = a[i] + b[i]

We compare the operation performance of the same three cases, but based on the custom function add(a, b).

Overall, the custom function takes more time than the numpy function. Although, when only cloudpickle is used, the same operation still takes more time to submit than with the virtually shared memory manager, the benefit of StarPU is larger here: the operation takes less time than directly calling the custom function, even when the array is not very large.

24.8 Master Slave Support

The StarPU Python interface provides MPI master-slave support as well. Please refer to Section MPI Master Slave Support for the specific usage.

When you write your Python script, make sure to import all required functions before importing the starpu module. Functions imported after the starpu module can only be submitted by passing their name as a string to task_submit(), which decreases the submission efficiency.

(TODO)

24.9 StarPUPY and Simgrid

In simgrid mode, the Python interpreter is not aware of simgrid and thus does not notify it when a thread is blocked waiting for something to happen in another thread. This notably means that the asyncio mode and waiting for a Future will not work; one thus has to use the StarPUPY-provided functions to wait for completion, such as starpupy.task_wait_for_all() or data.acquire().

Also, we have not yet implemented skipping the actual execution of the task functions, so the execution time will be longer than in a real execution: the computations are actually performed, and sequentially, on top of the simulation overhead.