Introduction

Hello.

These are notes for a 3 hour course (6 x 25 minute sessions) for experienced Parsl users who want to level up their ability to use Parsl, or to hack on the Parsl codebase, by learning more about how Parsl works on the inside. This is in contrast to the user guide, which focuses on what Parsl looks like from the outside.

This text is not intended to be a comprehensive guide to all parts of the Parsl codebase: there is plenty more to learn about. This text is intended to give you a few places to begin exploring.

I’ll try to include links to relevant external resources: source code, other Parsl documentation, community talks, github issues, papers and other research work.

You don’t need any particular environment to follow along with these notes, but you might like to have a throw-away installation of Parsl 2024.09.02 that you can mess with when you find something interesting.

A sample task execution path

In this section, I’ll walk through the code path of a single Parsl task, from invoking an app to running on a HighThroughputExecutor worker, and then sending the result back.

Here’s a simple workflow that you can run on a single computer. I’ll point out as we go which bits would run on a worker node when running on an HPC system - but otherwise, as far as this section is concerned there isn’t much difference between a single node and multiple node workflow run.

import parsl

def fresh_config():
  return parsl.Config(
    executors=[parsl.HighThroughputExecutor()],
  )

@parsl.python_app
def add(x: int, y: int) -> int:
  return x+y

with parsl.load(fresh_config()):
  print(add(5,3).result())

This is deliberately nothing fancy: the config is in my preferred style, with almost every parameter getting a suitable default value - the only change is to use the High-Throughput Executor, which is much more interesting than the default Thread Pool Executor. There is a single, almost trivial app definition, and the code invokes it once, without any novelties like dependencies.

Now I will pick apart what happens in that expression near the end where app execution actually happens:

add(5,3).result()

I’m going to ignore quite a lot, though: the startup/shutdown process (for example, what happens with parsl.load() and what happens at the end of the with block), and I’m going to defer batch system interactions to another section (TODO: hyperlink blocks)

A python_app

@parsl.python_app
def add(x: int, y: int) -> int:
  return x+y

Normally def defines a function (or a method) in Python. With the python_app decorator, Parsl gets to change that into something slightly different: a “Python app” which mostly looks like a function but with fancy Parsl bits added. The relevant fancy bit for this example is that instead of returning an int when invoked, add will instead return a Future[int] that will, some time later, receive the result of the underlying function.

What happens when making this definition is that Python does make a regular function, but instead of binding the name add to that function, it passes it to a decorating function, parsl.python_app. That decorating function is allowed to do pretty much anything, but in Parsl it replaces the function definition with a new PythonApp object, constructed from that underlying regular function (and a few other parameters).
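
If decorators are unfamiliar, it may help to see that the decorated definition above is ordinary Python sugar: this sketch (not Parsl internals, just standard decorator semantics) is roughly what it expands to.

import parsl

# Roughly what the @parsl.python_app form above desugars to:
def add(x: int, y: int) -> int:
  return x + y

add = parsl.python_app(add)  # the name `add` is now bound to a PythonApp object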

[TODO: link to Python decorator description, perhaps in python docs?]

link to parsl.python_app source code:

https://github.com/Parsl/parsl/blob/3f2bf1865eea16cc44d6b7f8938a1ae1781c61fd/parsl/app/app.py#L108

[TODO: link to PythonApp code]

Later on, when the workflow executes this expression:

add(5,3)

what is being invoked here is the add PythonApp, not the underlying function that the workflow seemed to be defining.

What does it mean to call an object instead of a function (or method)? What happens is that Python looks on that object for a method called __call__ and invokes that method with all the parameters. Double-underscore (“dunder”) methods are the standard way in Python to customise how objects behave: the most common one is probably __repr__, but there are loads of them described throughout https://docs.python.org/3/reference/datamodel.html
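
As a toy illustration (nothing Parsl-specific), here is a class whose instances can be called like functions because it defines __call__:

class Doubler:
  def __call__(self, x: int) -> int:
    # Python invokes this when an instance is "called" like a function
    return 2 * x

d = Doubler()
print(d(21))  # 42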

The PythonApp implementation of __call__ doesn’t do too much: it massages arguments a bit but ultimately delegates all the work to the next component along, the Data Flow Kernel. The DFK’s submit method returns immediately, also without executing anything: it returns a Future, app_fut, which PythonApp.__call__ then returns to its own caller.

TODO: some different syntax highlighting/background to indicate this is from Parsl source code?

app_fut = dfk.submit(func, app_args=args,
                     executors=self.executors,
                     cache=self.cache,
                     ignore_for_cache=self.ignore_for_cache,
                     app_kwargs=invocation_kwargs,
                     join=self.join)

return app_fut

So what the decorator has mostly done is overload Python function syntax, so that it can be used to submit tasks to the Data Flow Kernel, which handles most of the interesting stuff to do with a task.

The three important parameters here are func - the underlying function that we want to execute, app_args - a list of positional arguments to be passed to that function, and app_kwargs - a dict of keyword arguments to be passed to that function. We’ll be moving these three structures around all over the place (and sometimes changing them) until the task is eventually executed.

The Data Flow Kernel

We can have a look at that submit call and see that “invoking an app” really means calling a method on the DataFlowKernel (DFK), the core object for a workflow (historically following the God-object antipattern).

Inside the DFK, synchronously:

  • create a task record and an AppFuture, and return that AppFuture to the user

  • (TODO: hyperlink to TaskRecord and describe it a bit more)

Then asynchronously:

  • perform “elaborations” - see elaborations chapter, but this is stuff like waiting for dependencies, and hooking in file staging

  • send the task to an Executor (TODO: hyperlink class docstring). In this case we aren’t specifying multiple executors, so the task will go to the default single executor, which is an instance of the High Throughput Executor (TODO: hyperlink class docstring) - this generates an executor-level future

  • wait for completion of execution (success or failure), signalled via the executor-level future

  • a bit more post-execution elaboration

  • set the AppFuture result

dflow.py, where the Data Flow Kernel lives, is the longest source file in the Parsl codebase, but most of what it does will be covered later on. For this example workflow, it pretty much sends the task straight on to the configured HighThroughputExecutor.

This is a callback-driven state machine, which can be a bit hard to follow, especially when taking into account the various elaborations that happen.
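
To make the callback pattern concrete, here is a much-simplified, non-Parsl sketch using plain concurrent.futures: the DFK attaches a callback to the executor-level future, and that callback (after any elaborations) eventually sets the result on the AppFuture the user is holding.

from concurrent.futures import Future

app_future: Future = Future()   # stand-in for the AppFuture returned by add(5, 3)
exec_future: Future = Future()  # stand-in for the executor-level future

def handle_executor_done(fut: Future) -> None:
  # in real Parsl this is where post-execution elaboration (e.g. retries) hooks in
  try:
    app_future.set_result(fut.result())
  except Exception as e:
    app_future.set_exception(e)

exec_future.add_done_callback(handle_executor_done)

exec_future.set_result(8)    # pretend the executor finished the task
print(app_future.result())   # 8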

HighThroughputExecutor.submit

So now let’s dig into the high throughput executor. The data flow kernel hands over control to whichever executor the user configured (the other common options are the thread pool executor (link) and Work Queue (link), although there are a few others included), but for this example we’re going to concentrate on the high throughput executor. If you’re a Globus Compute fan, this is the layer at which the Globus Compute endpoint attaches to the guts of Parsl - so everything before this isn’t relevant for Globus Compute, but this bit about the high throughput executor is.

The data flow kernel will have performed some initialization on the high throughput executor when it started up, in addition to the user-specified configuration at construction time (TODO: perhaps this is in enough of one place to link to in the DFK code?). For now, I’m going to assume that all the parts of the high throughput executor have started up correctly.

htex consists of a small part that runs in the user workflow process (TODO: do I need to define that as a process name earlier on in this chapter? it’s something that should be defined, and perhaps there should be a glossary or index for this document for terms like that?) and several other processes.

The first process is the interchange (TODO: link to source code). This runs on the same host as the user workflow process and offloads task and result routing.

Beyond that, on each worker node on our HPC system, a copy of the process worker pool will be running. In this example workflow, our local system is the only worker node, so we should only expect to see one process worker pool, on the local system.

These worker pools connect back to the interchange using two network connections (ZMQ over TCP) - so the interchange process needs 2 fds per node, which is a common limit on the “number of nodes” scalability of Parsl. (see issue #3022 for a proposal to use one network connection per worker pool)
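
As a rough illustration of that limit (a back-of-the-envelope sketch, not Parsl code, and the 64-fd allowance for other uses is a made-up number), you can compare the per-process file descriptor limit against two descriptors per worker pool:

import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
# allow a made-up ~64 fds for the interchange's own sockets, logs, etc.
print(f"fd soft limit {soft}: room for very roughly {(soft - 64) // 2} worker pools")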

So inside htex.submit, we’re going to:

  • serialize the details of the function invocation (the function, the positional args and the keyword args) into a sequence of bytes (sketched after this list). This is non-trivial, even though everyone likes to believe it is magic and simple; in a later chapter I’ll talk about this in much more depth (TODO: link pickle)

  • send that byte sequence to the interchange over ZMQ

  • create and return an executor future back to the invoking DFK - this is how we’re going to signal to the DFK that the task is completed (with a result or failure) so it is part of the propagation route of results all the way back to the user.
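
As a sketch of the shape of those first two steps (using dill directly rather than Parsl’s own serialization helpers, so this is illustrative only): the submit side bundles the function and its arguments into one byte sequence, and the worker side later unpacks and invokes it.

import dill

def add(x, y):
  return x + y

# submit side: one byte sequence ready to ship over ZMQ to the interchange
payload = dill.dumps((add, (5, 3), {}))

# worker side (covered later in this walkthrough): unpack and invoke
func, args, kwargs = dill.loads(payload)
print(func(*args, **kwargs))  # 8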

The Interchange

The interchange matches up tasks with available workers: it has a queue of tasks, and a queue of process worker pool managers which are ready for work. Whenever a new task arrives from the user workflow process, or a manager announces it is ready for work, a match is made. There won’t always be available work or available workers, which is why the interchange keeps those queues.
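
A toy sketch of that matching idea (not the real interchange code, which also deals with heartbeats, capacity and so on): two queues, and a match whenever both have something available.

from collections import deque

pending_tasks = deque()
ready_managers = deque()

def maybe_match():
  while pending_tasks and ready_managers:
    task = pending_tasks.popleft()
    manager = ready_managers.popleft()
    print(f"dispatching {task} to {manager}")

pending_tasks.append("task-0")
maybe_match()                      # nothing happens: no manager is ready yet
ready_managers.append("manager-a")
maybe_match()                      # now task-0 goes to manager-a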

The matching process has so far been fairly arbitrary, but we have been doing some research on better ways to match workers and tasks. (TODO: what link here? if more stuff merged into Parsl, then the PR can be linkable. otherwise later on maybe a SuperComputing 2024 publication - but still unknown)

So now the interchange sends the task over one of those two ZMQ-over-TCP connections I talked about earlier… and we’re now on the worker node where we’re going to run the task.

The Process Worker Pool

Generally, a copy of the process worker pool runs on each worker node (other configurations are possible). It consists of a few closely linked processes:

  • the manager process, which interfaces to the interchange (this is why you’ll see a jumble of references to managers or worker pools in the code: the manager is the externally facing interface to the worker pool)

  • worker processes - each worker runs one task at a time. There are a bunch of configuration parameters and algorithms to decide how many workers to run - this happens near the start of the process worker pool, in the manager code (TODO: link to worker pool code that calculates number of workers); see the sketch below
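
A hedged, simplified sketch of the flavour of that calculation (the real code in the process worker pool also accounts for available memory, and the option names here - cores_per_worker, max_workers_per_node - are from memory and may differ between versions):

import math
import multiprocessing

cores_per_worker = 1
max_workers_per_node = float("inf")   # i.e. no explicit cap configured

available_cores = multiprocessing.cpu_count()
workers = int(min(max_workers_per_node,
                  math.floor(available_cores / cores_per_worker)))
print(f"this node would run {workers} workers")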

The task arrives at the manager, and the manager dispatches it to a free worker. It is possible there isn’t a free worker, because of the prefetching feature for high throughput (TODO: link to docstring) - in which case the task will have to wait in another queue here - but that is a rarely used feature.

The worker then deserializes the byte package that was originally serialized all the way back in the user submit process: now we’ve got Python objects for the function to run, the positional arguments and the keyword arguments.

So at this point, we invoke the function with those arguments (link to the f(*args, **kwargs) line).

And the user code runs! Almost, but not quite, as if all of that hadn’t happened and we’d just invoked the underlying function without Parsl.

It’s probably going to end in one of two ways: a result or an exception. (Actually there is a common third way, which is that the task kills the Unix-level worker process - for example by using far too much memory or by a library segfault - or the batch job containing the worker pool reaches the end of its run time. That case is handled, but I’m ignoring it here.)

Now we’ve got the task outcome - either a Python object that is the result, or a Python object that is the exception. We pickle that object and send it back to the manager, then to the interchange (over the other ZMQ-over-TCP socket), and then to the high throughput executor submit side in the user workflow process.

Back on the submit side, the high throughput executor is listening on that socket. It gets the result package and sets the result into the executor future (TODO: code reference). That is the mechanism by which the DFK sees that the executor has finished its work, and so that’s where the final bit of “task elaboration” (TODO: link to elaboration chapter) happens - the big elaboration here would be retries on failure, which basically means doing that whole HTEX submission again and getting a new executor future for the next try. (Other, less common elaborations would be storing checkpointing info for this task, and file staging.)

When that elaboration is finished (and didn’t do a retry), we can set that same result value into the AppFuture which, all that long time ago, was given to the user. And so now future.result() returns that result (or raises that exception) back in the user workflow, and the user can see the result.

So now we’re at the end of our simple workflow, and we pass out of the parsl context manager. That causes Parsl to do various bits of shutdown, and then the user workflow process falls off the bottom and ends.

TODO: label the various TaskRecord state transitions (there are only a few relevant here) throughout this doc - it will play nicely with the monitoring DB chapter later, as they are reflected not only in the log but also in the monitoring database.

Blocks

In the task overview, I assumed that process worker pools magically existed in the right place: on the local machine with the example configuration, but on HPC worker nodes when running a more serious workflow.

The theme of this section is: how to get process worker pools running on the nodes where we want to do the work.

In this section, I’ll talk a little bit more about how that actually happens, using Parsl’s provider abstraction.

The configuration mechanisms talked about here are usually the most non-portable pieces of a Parsl workflow, because they are closely tied to the behaviour of particular HPC machines. And so it’s one of the most useful areas for admins and users to contribute documentation: for example, the Parsl user guide has a section with configurations for different machines, and ALCF and NERSC both maintain their own Parsl examples.

We don’t need to describe the work to be performed by the workers (at least, not much), because once the workers are running they’ll get their own work from the interchange, as I talked about in the previous section.

themes:

  • LRM providers

  • launchers

  • batch jobs

  • scaling strategies and error handling (two parts of the same feedback loop)

  • batch job environments (esp worker_init)

caveats:

launchers: note that in some batch systems, the batch script doesn’t run on a worker node but on a separate management node, and anything big/serious should be launched with something like mpiexec or aprun - so that those things run on the allocated worker nodes.
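
To tie the themes above together, here is a hedged sketch of what a Slurm-flavoured configuration typically looks like. The partition name and worker_init line are made up, and exact parameter names and sensible values should be checked against the provider documentation and your site’s own example configurations.

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SrunLauncher
from parsl.providers import SlurmProvider

config = Config(
  executors=[
    HighThroughputExecutor(
      provider=SlurmProvider(
        partition="debug",                 # hypothetical partition name
        nodes_per_block=2,                 # nodes per batch job ("block")
        init_blocks=0,
        min_blocks=0,
        max_blocks=4,                      # the scaling strategy works within these bounds
        walltime="00:30:00",
        worker_init="module load python; source ~/parsl-env/bin/activate",
        # the launcher is what runs the worker pool on the allocated nodes,
        # rather than on the node where the batch script itself runs:
        launcher=SrunLauncher(),
      ),
    )
  ],
)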

Serializing tasks with Pickle and dill

TODO: an emphasis on the common parsl problems: (un)installed packages, functions and exceptions

The intro should refer to not regarding this as magic, despite most people desperately hoping it is magic and then not trying to understand what’s happening. This needs a bit of programming-language thinking, way more than routing “tasks as quasi-commandlines”.

I’ll use the terms pickling and serializing fairly interchangeably: serialization is the general word for turning something like an object (or graph of objects) into a stream of bytes. Pickling is a more specific form, using Python’s built-in pickle library (TODO: hyperlink pickle).

As I mentioned in an earlier section, (TODO: backlink hyperlink?) when htex wants to send a function invocation to a worker, it serializes the function and its arguments into a byte sequence, and routes that to a worker, where that byte sequence is turned back into objects that are in some sense equivalent to the original objects. Task results follow a similar path, in reverse.

That serialization is actually mostly pluggable, but basically everyone uses some variant of pickle (most often the dill library) because that’s the default and there isn’t much reason to change.

For most things that look like simple data structures, pickling is pretty simple. For example, plain pickle won’t have a problem with almost anything that has an obvious representation in JSON.
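
For example, a JSON-ish structure round-trips through pickle without any fuss:

import pickle

record = {"name": "add", "args": [5, 3], "ok": True}
payload = pickle.dumps(record)              # a plain byte sequence
print(pickle.loads(payload) == record)      # True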

There are a few areas where it helps to have some deeper understanding of what’s going on, so that you don’t run into “mystery pickling errors because the magic is broken.”

Functions

You’ve probably got some notion of what it means to send a function across the network, and those preconceptions are almost certainly not how pickle, dill and Parsl do it. So you need to put those preconceptions aside.
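
A small sketch of the distinction (deliberately simplified; exactly when dill serializes by value depends on its options and on where the function is defined):

import pickle
import dill

def add(x, y):
  return x + y

# Plain pickle serializes a module-level function *by reference*: the payload
# is essentially "import this module, look up this name" - the function body
# is not in the byte stream, so the receiving side must be able to import
# the same code.
by_reference = pickle.dumps(add)

# dill can also serialize a function *by value*, capturing its code object,
# so it can be rebuilt on a worker that never imported your script
# (functions defined in __main__ are the classic by-value case).
by_value = dill.dumps(add, recurse=True)

print(len(by_reference), len(by_value))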

Exceptions

The big deal here is having custom data types that exist only on the remote side, and then not realising that a raised exception is also an instance of a custom data type.
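
A sketch of how that goes wrong (the class and package names here are hypothetical): the exception pickles fine on the worker, but unpickling it on the submit side requires importing the class by name.

import pickle

class DataQualityError(Exception):   # imagine: defined in workerpkg.errors,
  pass                               # installed only in the worker environment

# on the worker: the failed task's exception is pickled to send back; the
# payload refers to the class by module and name, not by value
payload = pickle.dumps(DataQualityError("bad input row"))

# on the submit side: unpickling tries to import that class. If workerpkg
# isn't installed there, .result() surfaces a confusing import/lookup error
# instead of the original exception.
err = pickle.loads(payload)
print(type(err), err)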

TODO: review my pickle talk, figure out what is relevant or not. maybe don’t need to talk about pickle VM opcodes, just the remote-execution facility at a higher level? and the import facility at a higher level? no need to talk about recursive objects - that’s not a user facing problem (unless you’re trying to build your own pickle scheme)

TODO: also mention cloudpickle as a dill-like pickle extension. They are both installable alongside each other… and people mostly haven’t given me decent arguments for cloudpickle, because people don’t dig much into understanding what’s going on.

More info

I’ve talked about Pickle in more depth and outside of the Parsl context at PyCon Lithuania (TODO: link slides and video)

Proxystore - reference its use in Parsl, and reference a citation for just proxystore. TODO

Serialising functions is a hard part of programming languages, especially in a language that wasn’t designed for this, and Parsl is constantly pushing up against those limits. Have a look at https://www.unison-lang.org/ if you’re interested in languages which are trying to do this from the start.

Elaborating tasks

stuff that the DFK does to a task that isn’t “just run this task”

In this section, I’m briefly going to talk about a few of these things. They might seem like quite different features, but from the perspective of the DFK they all have some “fiddle with the task the user submitted” feel - enough so that one way forwards is to abstract the internal architecture so they all look more similar to the DFK.

themes:

  • dependencies (including rich dependency resolving - but that should be an onwards mention of plugin points?) - see the sketch after this list

These next two are good to introduce together, to motivate the concept of tries: rather than a 1:1 relationship with task submission, a task can have zero or many tries…

  • retries

  • checkpointing

  • file staging (mention how these are a bit like fancy dependency substitution)

  • join_app joining

  • monitoring resource wrapper
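
Dependencies are the elaboration most visible to users, so here is what they look like from the outside, reusing the add app from the walkthrough earlier: pass one app’s AppFuture as an argument to another app, and the DFK holds the second task back until the first future completes, substituting the real value before submission.

a = add(1, 2)      # an AppFuture whose result will eventually be 3
b = add(a, 10)     # not sent to an executor until `a` has completed
print(b.result())  # 13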

TODO: mention bash_apps which are a similar elaboration, but happen inside the bash_app decorator: beyond the decorator, no part of Parsl has any notion of a “bash app”

Summarise by pointing out that, in my mind (not necessarily in the architecture of Parsl), from a core perspective these are all quite similar, even though the user-visible effects are all very different. Which is a nice way to have an abstraction. And maybe that’s an interesting forwards architecture for Parsl one day…

Understanding the monitoring database

This should focus on making use of data in the monitoring database, not on how monitoring is architected or implemented.

give example of the visualizer

give examples of plotting in Python

give example of raw SQL
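
As a hedged sketch of the raw SQL route (assuming a run with monitoring enabled wrote runinfo/monitoring.db; the file location and the task_func_name column are from memory and may differ between Parsl versions, so list the schema first):

import sqlite3

conn = sqlite3.connect("runinfo/monitoring.db")

# what tables are actually there in this version?
print([row[0] for row in conn.execute(
  "SELECT name FROM sqlite_master WHERE type='table'")])

# e.g. how many tasks of each app were run
for func_name, n in conn.execute(
    "SELECT task_func_name, COUNT(*) FROM task GROUP BY task_func_name"):
  print(func_name, n)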

go through each table (and most fields in the tables) and try to put it in context of what we’ve seen before

Modularity and Plugins

which bits you can swap for other plugins: how and why

structuring of code within the Parsl GitHub repo. “why” includes sustainability work on different quality of code/maintenance

In the blocks section, (TODO crossref) I showed how different environments need different providers and launchers, but that the scaling code doesn’t care about how those providers and launchers do their work. This interface is a straightforward way to add support for new batch systems, either in the Parsl codebase itself, or following the interface but defined outside of the Parsl codebase.

If there’s a decision point that looks like a multi-way if statement, having a bunch of choices is a suggestion that choices you might not have implemented could also exist, and someone might want to put those in. Various plugin points then look like “expandable if” statements. A good contrast is the launcher plugin interface vs the hard-coded MPI plugin interface (cross-reference issue to fix that).

It’s also a place to plug in “policies” - that is, user-specified decisions (such as how to retry, using retry handlers) that take advantage of our users’ ability to write Python code as policy specifications.
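
A hedged sketch of that kind of policy hook, using the retry handler as the example (the signature and semantics here are from memory of the retries documentation, so check it for your Parsl version): user-written Python decides how much each failure should count against the retry budget.

import parsl

def retry_cost(exception: Exception, task_record) -> float:
  # a user-written policy: treat out-of-memory failures as not worth retrying
  if isinstance(exception, MemoryError):
    return 100
  return 1  # ordinary failures count once against `retries`

config = parsl.Config(
  executors=[parsl.HighThroughputExecutor()],
  retries=3,
  retry_handler=retry_cost,
)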

Parsl exists as a library within the python ecosystem. Python exists as a user-facing language, not an internal implementation language. Our users are generally Python users (of varying degree of experience) and we can make use of that fact.

Doing that sort of stuff is what I’d expect as part of moving from being a tutorial-level user to a power user.

place for research/hacking - e.g. you want to do some research on doing X with workflows. Parsl has a role there as the workflow system that already exists and that you can then modify to try out X, rather than writing your own toy workflow system. (Example from ParslFest: Matthew Chung’s work involved very minimal changes to Parsl - including a new plugin interface! - for a nice outcome.)

place for power users - see policies and decision points paragraph

place for supporting other non-core uses: for example, Globus Compute uses only the htex and LRM provider parts of Parsl, and can do that because of the plugin API - it becomes its own plugin host for the relevant plugins.

Colophon

Written in rst

Rendered with sphinx

Edited with vi and vscode

This text was prepared against Parsl version 2024.09.02