
Distributed Maple

Distributed Maple is an environment for writing parallel programs in the computer algebra system Maple. It allows one to create concurrent tasks and have them executed by Maple kernels running on various machines of a network. In the following, we sketch the use of the system, its programming interface, and its software architecture. For more details, see [3].

A Sample Session

[Figure: A Distributed Maple Session]

The user interacts with Distributed Maple via a conventional Maple frontend (text or graphical), i.e., she operates within the familiar Maple environment for writing and executing parallel programs. Maple commands establish a distributed session in which tasks are created for execution on any connected machine (see Figure *). The session trace below demonstrates the use of the environment with a simple example:

milkyway!> maple
    |\^/|     Maple V Release 5 (Universitae...
._|\|   |/|_. Copyright (c) 1981-1997 by Wat...
 \  MAPLE  /  reserved. Maple and Maple V ar...
 <____ ____>  Waterloo Maple Inc.
      |       Type ? for help.
> read `dist.maple`;
Distributed Maple V1.0.7 (c) 1998 Wolfgang S...
See http://www.risc.uni-linz.ac.at/software/...
> dist[initialize]
>   ([[gintonic,linux], [andromeda,octane]]);
connecting gintonic...
connecting andromeda...
                          okay

> t1 := dist[start](int, x^n, x);
                        t1 := 0

> t2 := dist[start](int, x^n, n);
                        t2 := 1

> dist[wait](t1) + dist[wait](t2);     
                     (n + 1)     n
                    x           x
                    -------- + -----
                     n + 1     ln(x)

> dist[terminate]();
                         okay

> quit;

We first load the file dist.maple, which implements the interface to the distributed backend as a Maple package dist. By issuing the command dist[initialize], we ask the system to start the distributed backend and to create two additional Maple kernels, one on machine gintonic of type linux and one on machine andromeda of type octane. The machine types are used to look up the system-specific startup information located in the file dist.systems in the current working directory.

After the distributed session has been successfully established, calls of dist[start] create two tasks evaluating the Maple expressions int(x^n, x) and int(x^n, n), respectively. The two dist[wait] calls block the current execution until the corresponding tasks have terminated and then return their results. Finally, the distributed session is closed by a call of dist[terminate]. If the user issues the command dist[visualize] during the session, a window pops up that displays online the state of each machine and the total utilization, as shown in Figure *. By issuing the command dist[trace], a trace file is produced that may be used after program execution to generate corresponding diagrams for printing (such as those in Section *).

[Figure: Session Visualization]

Programming Interface

Session Initialization

In addition to the commands dist[initialize] and dist[terminate] that establish a distributed session, we need a way to initialize the Maple kernels on all connected machines by loading user code and program libraries.
dist[all](command)
lets the Maple statement command be executed on every Maple kernel connected to the distributed session. If this command refers to a particular file, (a copy of) this file must be visible on every machine participating in the session.
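For example, a helper procedure can be made known to every kernel before tasks that use it are created. The following sketch assumes that the statement is passed to dist[all] as a Maple string (back-quoted name); myf is a hypothetical user procedure:

> dist[all](`myf := proc(x) x^2 + 1 end;`);
> t := dist[start](myf, 3);
> dist[wait](t);
                            10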

Functional Parallelism

The parallel programming model is essentially based on functional principles, which suffices for many kinds of computer algebra algorithms:
dist[start](f, a, ...)
creates a task evaluating the expression f(a, ...) and returns a reference t to this task. The execution of f may take place on any machine connected to the distributed session and should therefore not cause any side effects. Tasks may create other tasks, and arbitrary Maple objects (including task references) may be passed as task arguments and returned as task results.
dist[wait](t)
blocks the execution of the current task until the task represented by t has terminated and returns its result. Multiple tasks may independently wait for and retrieve the result of the same task t.
When and on which machine a task is scheduled for execution is entirely the responsibility of the underlying runtime system.
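As an illustration of the model, the following sketch sums a list by recursive task creation; psum is a hypothetical procedure (not part of the package) that must be made known to all kernels, e.g. via dist[all], since its recursive calls may run on any machine:

psum := proc(l)
  local m, t;
  # sum short lists sequentially
  if nops(l) < 1000 then RETURN(convert(l, `+`)) fi;
  m := iquo(nops(l), 2);
  # a child task sums the first half; we sum the second half
  t := dist[start](psum, [op(1..m, l)]);
  RETURN(psum([op(m+1..nops(l), l)]) + dist[wait](t));
end;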

Non-Determinism and Speculation

The performance of a parallel program may be improved by processing the results of a set of tasks not in a predetermined order but in the order in which they happen to arrive. We therefore need a non-deterministic form of task synchronization; this is especially useful in speculative algorithms, where the results of some tasks may turn out to be no longer of interest:
dist[select](tlist)
blocks the execution of the current task until any task t in the list of task handles tlist has terminated and returns a list r such that r[1] is the result of t and r[2] is the index of t in tlist.
dist[delete](t)
announces that the result of task t is no longer required. If this task has not yet started execution, it will be deleted from the system (however, if it has already started, it will continue execution until termination).
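For example, a speculative computation may try two solution methods in parallel and keep whichever result arrives first; in this sketch, method1 and method2 are hypothetical procedures known to all kernels:

tl := [dist[start](method1, p), dist[start](method2, p)];
r := dist[select](tl);       # r[1]: the first result, r[2]: its index in tl
dist[delete](tl[3 - r[2]]);  # the other result is no longer of interest
result := r[1];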

Shared Data Objects

If a parallel program processes large data in multiple phases with task interaction between phases, it may be more efficient to let tasks preserve their state across phases rather than to create new tasks for every phase and pass the corresponding data to them. We therefore introduce a concept that allows tasks to interact in a safe way by side effects.
dist[data]()
creates an empty shared data object and returns its handle d. Any task may use d to read from or write to the shared data object no matter on which machine the task is executed.
dist[get](d)
blocks the execution of the current task until the data object referenced by d is non-empty and then returns its content. Multiple tasks may independently wait for and retrieve the content of the same data object d.
dist[put](d, v)
writes the value v (which may be any Maple object including tasks and data handles) into the shared data object referenced by d (overwriting any previously written value). All tasks blocked on d get released.
dist[clear](d)
empties the shared data object referenced by d.
Shared data objects may be used to implement various forms of inter-task communication, such as shared memory, single assignment objects, communication channels and non-strict lists (streams).
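As a sketch, a shared data object can serve as a single-slot channel between two phases: one task publishes the result of the first phase, and any number of tasks may block until it becomes available. Here producer is a hypothetical procedure that must be known to all kernels (e.g. via dist[all]):

producer := proc(d)
  # phase 1: compute a value and publish it in the shared object
  dist[put](d, expand((x + y)^8));
  0
end;
d := dist[data]();
t := dist[start](producer, d);
# blocks until the producer has written its value
firstPhase := dist[get](d);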

Software Architecture

The core of Distributed Maple is a scheduler program implemented in Java (package dist with main class Scheduler); it is completely independent of and even unaware of Maple and can in fact embed and schedule tasks from any kind of computation kernel that implements a specific communication protocol. Correspondingly, each node connected to a Distributed Maple session comprises two components (see Figure *):

Scheduler

This program coordinates the interaction between nodes and schedules tasks among nodes. The initial scheduler process (invoked from the Maple kernel attached to the user frontend) reads all application-specific information from the configuration file dist.systems; it then starts instances of the scheduler on other machines and communicates with them via Internet sockets.

Maple Interface

The Maple file dist.maple read by every Maple kernel implements the interface between the Maple kernel and the scheduler. Communication between the two components is based on Unix pipes to which and from which messages are written; these messages may embed Maple expressions (in the compact linear format that Maple uses for library files).

After a distributed session has been established, every scheduler instance accepts tasks from the attached computation kernel and schedules these tasks among all machines connected to the session. A task is a pair (taskid, exp), where taskid is an integer identifying the task and exp is an uninterpreted string to be submitted to some kernel for evaluation. Initially, every scheduler informs its kernel about the range of task identifiers it may use for new tasks, so that all kernels in the system can create new tasks independently. Each instance of the scheduler holds a local task queue, a result table, and a result cache; their functions are explained below.

Multi-Threading

The scheduler is implemented by a number of concurrent threads, as shown in Figure *. Threads listening on the input channels put the received messages into a central buffer, from which a server thread takes and processes them, creating new messages that are placed in some of the output buffers. Threads listening on these buffers take the messages and send them to the corresponding output channels. The multithreaded implementation of the communication interface is intended to maximize the overall throughput of the system.

[Figure: The Task Scheduler]

Task Scheduling

All remote schedulers send new tasks to the central scheduler, which distributes them among all machines. Currently a simple load balancing scheme is used: the central scheduler assigns new tasks to remote schedulers until the number of uncompleted tasks reaches an upper bound, and a remote scheduler asks for new tasks whenever the number of received but not yet started tasks falls below a lower bound. By this "watermark" scheme, the communication latency for the transfer of a new task (after termination of a task) can be masked by the execution of an already received task.

Task Results

When a new task is created, the scheduler allocates a corresponding "empty" slot in the result table (which will be filled with the result value once the corresponding task has completed execution). Thus the result of a task is always stored on the machine where the task was created (not necessarily where it is eventually executed); from the identifier of a task, the scheduler can determine the node holding the task result and route requests accordingly. Additionally, each scheduler holds a cache of all results it has ever seen, so that some requests for non-local task results can be satisfied immediately.

Recursive Server Loops

If the scheduler cannot immediately deliver a task result requested by the attached computation kernel, it sends a new task for execution instead. The Maple kernel continues with the execution of the new task (by a recursive invocation of the server loop) until this task has been completed. If the originally requested result is then available, the kernel continues with the execution of the previous task; otherwise it receives yet another task for execution. In this way every kernel (including the kernel connected to the user interface) computes permanently as long as sufficiently many tasks are available.

Watchdogs

A watchdog thread in every remote scheduler checks at regular intervals whether messages have been received from the central scheduler. If no message has been received during the last control period, the watchdog sends a "ping" message to the central scheduler. If no reply arrives during the subsequent control period, the watchdog assumes that the connection is broken and aborts the external application process and the scheduler process. Thus we ensure that broken sessions do not lead to stalled remote processes (a common problem in distributed environments).


Maintained by: Wolfgang Schreiner
Last Modification: April 22, 1999
