Developer's Guide

funflow provides a few task types (SimpleTask, StoreTask, and DockerTask) that will suffice for many pipelines, but the package facilitates creation of new task types as needed.

This tutorial aims to help prospective funflow developers get started with task type creation.

1. Creating your own task

In this tutorial, we will create a task called CustomTask by defining its type. We will then define our own flow type and write the functions needed to run it.

Defining the new task

To define a task for our users, we first have to define a type that represents the task.

A task is represented by a generalized algebraic data type (GADT) of kind * -> * -> *.

In [1]:
-- Required language extensions
{-# LANGUAGE GADTs, StandaloneDeriving #-}

-- Define the representation of a custom task with some String and Int parameters
data CustomTask i o where
    CustomTask :: String -> Int -> CustomTask String String

-- Necessary in order to display it
deriving instance (Show i, Show o) => Show (CustomTask i o)

Here, we create a type CustomTask with type constructor CustomTask i o, and a value constructor CustomTask of type String -> Int -> CustomTask String String.

The type String -> Int -> CustomTask String String means that, given a String and an Int, the constructor yields a task that takes a String as input and produces a String as output.
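To illustrate how GADT constructors pin down the input and output types, here is a minimal sketch with two constructors. ExampleTask, Repeat, Measure, and describe are hypothetical names chosen for this illustration; they are not part of funflow.

```haskell
{-# LANGUAGE GADTs #-}

-- Hypothetical task type, for illustration only (not part of funflow).
-- Each constructor fixes the input type i and the output type o.
data ExampleTask i o where
  Repeat  :: Int -> ExampleTask String String  -- String in, String out
  Measure :: ExampleTask String Int            -- String in, Int out

-- A function over the task type works for any i and o;
-- pattern matching reveals which constructor was used.
describe :: ExampleTask i o -> String
describe (Repeat n) = "repeat the input " ++ show n ++ " times"
describe Measure    = "measure the length of the input"

main :: IO ()
main = do
  putStrLn (describe (Repeat 3))
  putStrLn (describe Measure)
```

With an ordinary data declaration, every constructor would return ExampleTask i o for arbitrary i and o; the GADT syntax is what lets Measure commit to an Int output while Repeat commits to a String output.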

A new task can be created by using the value constructor:

In [2]:
-- An example of instantiation
CustomTask "someText" 42
CustomTask "someText" 42

However, a value created this way is a task, not a flow. To use this value in a flow, we need some more work.

From a task to a flow

The Flow type in fact comes from restricting the more general ExtendedFlow type, specifying a fixed collection of task types to support. These task types are those defined in funflow itself: SimpleTask, StoreTask, and DockerTask, which are declared as RequiredStrands in Funflow.Flow.

In other words, a pipeline/workflow typed specifically as Flow may comprise tasks of these three types (and only these three), capturing the notion that it's these types with which a Flow is compatible. In order to manipulate a flow that can run our custom task (i.e., a value of a new task type), we need to create our own new flow type using ExtendedFlow, which is also defined in Funflow.Flow:

In [3]:
{-# LANGUAGE DataKinds, RankNTypes #-}
import Funflow.Flow (ExtendedFlow)

type MyFlow input output = ExtendedFlow '[ '("custom", CustomTask) ] input output

Prefixing the leading bracket or parenthesis with an apostrophe, i.e. '[ ... ] and '( ... ), denotes a type-level list or tuple, respectively. This syntax is enabled by the DataKinds extension; the apostrophe marks the promoted, type-level form of the ordinary [] and () data constructors, which otherwise build values rather than types.

So with '[ '("custom", CustomTask) ], we build a type-level list of type-level tuples, "labeling" our custom task type with a name.

In kernmantle, such a tuple is called a strand, and the label facilitates disambiguation among different tasks with the same type.
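To make the type-level list and tuple syntax concrete, here is a minimal sketch that uses only base. FirstLabel and ExampleStrands are hypothetical names introduced for this illustration, and Either merely stands in for a task type of kind * -> * -> *; kernmantle is not involved, and this is not how kernmantle itself reads labels.

```haskell
{-# LANGUAGE DataKinds, KindSignatures, PolyKinds, TypeFamilies, TypeOperators #-}

import Data.Proxy (Proxy (..))
import GHC.TypeLits (Symbol, symbolVal)

-- Hypothetical helper: extract the label of the first entry in a
-- type-level list of (label, task type) pairs.
type family FirstLabel (strands :: [(Symbol, k)]) :: Symbol where
  FirstLabel ('(label, task) ': rest) = label

-- A type-level list with one labeled entry; Either stands in for a task type.
type ExampleStrands = '[ '("custom", Either) ]

-- Bring the type-level label back down to a value-level String.
firstLabel :: String
firstLabel = symbolVal (Proxy :: Proxy (FirstLabel ExampleStrands))

main :: IO ()
main = putStrLn firstLabel  -- prints: custom
```

The label lives entirely in the type, so a mismatch between a label and the list of strands can be caught at compile time rather than at run time.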

Now that we have our own type of flow that uses our custom task, we can define how a value of our custom task should be stranded, using kernmantle:

In [4]:
{-# LANGUAGE OverloadedLabels #-}
import Control.Kernmantle.Rope (strand)

someCustomFlow :: String -> Int -> MyFlow String String
someCustomFlow x y = strand #custom (CustomTask x y)

This function is called a smart constructor. It lets users create a flow without having to think about strands.

The #custom value is a Haskell label, and must match the string label associated to our task type in the flow type definition (here "custom").
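As background on what a label is, the sketch below shows GHC's OverloadedLabels mechanism on its own: #custom is resolved through the IsLabel class from GHC.OverloadedLabels, with the label's name available as a type-level string. LabelName is a hypothetical type made up for this example; how kernmantle consumes labels internally is not shown here.

```haskell
{-# LANGUAGE DataKinds, FlexibleInstances, MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedLabels, ScopedTypeVariables #-}

import Data.Proxy (Proxy (..))
import GHC.OverloadedLabels (IsLabel (..))
import GHC.TypeLits (KnownSymbol, symbolVal)

-- Hypothetical type, for illustration: records the name a label was written with.
newtype LabelName = LabelName String
  deriving (Eq, Show)

-- Any label #name can be used where a LabelName is expected.
instance KnownSymbol name => IsLabel name LabelName where
  fromLabel = LabelName (symbolVal (Proxy :: Proxy name))

customLabel :: LabelName
customLabel = #custom

main :: IO ()
main = print customLabel  -- prints: LabelName "custom"
```

Because the name is carried in the type, the compiler can check it against the "custom" entry in the flow type's list of strands.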

In [5]:
myFlow :: MyFlow String String
myFlow = someCustomFlow "woop!" 7

Interpret a task

A strength of funflow is the separation of the representation of a computation (the task) from the implementation of that task. More concretely, once it is created, a task value has fixed input and output types, but what it does is not fixed. To specify that, we write an interpreter function.

An interpreter function is executed before running the flow. It takes a value of the task type that matches a particular strand (identified by the strand's label) and produces an actual implementation of the task, in compliance with the task's fixed input and output types.

In our case, we could define that our custom task CustomTask s n appends the string s to the input (which is a String) n times:

In [6]:
import Control.Arrow (Arrow, arr)

-- Helper function that repeats a string n times
duplicate :: String -> Int -> String
duplicate s n = concat (replicate n s)

-- Our interpreter
interpretCustomTask :: (Arrow a) => CustomTask i o -> a i o
interpretCustomTask customTask = case customTask of
    CustomTask s n -> arr (\input -> input ++ duplicate s n)

What happens here is:

  1. We get the customTask of our type CustomTask.
  2. We consider the possible values. As we've defined it, CustomTask has only one value constructor, but in general a GADT may have multiple value constructors.
  3. Since our function is pure, we can simply wrap it inside of an Arrow using arr.

\input -> input ++ duplicate s n is the actual function that will be executed when running the pipeline.
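Because the interpreter is polymorphic in the arrow, it can be tried out without any flow machinery by choosing the plain function arrow (->). The following sketch repeats the definitions from the cells above so that it stands alone; runAsFunction is a hypothetical helper added for this illustration.

```haskell
{-# LANGUAGE GADTs #-}

import Control.Arrow (Arrow, arr)

data CustomTask i o where
  CustomTask :: String -> Int -> CustomTask String String

-- Helper function that repeats a string n times
duplicate :: String -> Int -> String
duplicate s n = concat (replicate n s)

interpretCustomTask :: Arrow a => CustomTask i o -> a i o
interpretCustomTask customTask = case customTask of
  CustomTask s n -> arr (\input -> input ++ duplicate s n)

-- Hypothetical helper: choosing a = (->) turns the interpreted task
-- into an ordinary function from input to output.
runAsFunction :: CustomTask i o -> i -> o
runAsFunction = interpretCustomTask

main :: IO ()
main = putStrLn (runAsFunction (CustomTask "woop!" 7) "Kangaroo goes ")
-- prints: Kangaroo goes woop!woop!woop!woop!woop!woop!woop!
```

This is only a way to exercise the interpreter in isolation; in a real pipeline the arrow is chosen by funflow when the flow is run.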

In funflow, pure computations should be wrapped in an Arrow, while IO operations should be wrapped in a Kleisli IO.

Wrapping in an Arrow is done by using arr, while wrapping in a Kleisli IO is done by using liftKleisliIO.
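The difference between the two kinds of wrapping can be sketched with base alone. The example below uses the Kleisli constructor from Control.Arrow as a stand-in for liftKleisliIO, which is not used here; shout, announce, and pipeline are hypothetical names for this illustration.

```haskell
import Control.Arrow (Arrow, Kleisli (..), arr, (>>>))
import Data.Char (toUpper)

-- Pure step: arr lifts an ordinary function into any Arrow.
shout :: Arrow a => a String String
shout = arr (map toUpper)

-- Effectful step: an IO action wrapped with base's Kleisli constructor
-- (a stand-in for liftKleisliIO in this sketch).
announce :: Kleisli IO String Int
announce = Kleisli (\s -> putStrLn s >> pure (length s))

-- Composing both forces the pure step to run in Kleisli IO as well.
pipeline :: Kleisli IO String Int
pipeline = shout >>> announce

main :: IO ()
main = runKleisli pipeline "woop!" >>= print
-- prints WOOP! and then 5
```

Keeping the pure step polymorphic means it can also be used as a plain function, for example shout "abc", which is convenient for testing.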

funflow's interpreter functions are defined in the Funflow.Run module and can serve as examples as you write your own interpreter functions.

Run your custom flow

Now that we have defined a way to run our task, we might as well run our pipeline!

To run a pipeline typed as Flow, funflow provides runFlow. However, to include our custom task type we built a different type of pipeline (MyFlow), so we need one additional step before we can use runFlow. We will use the weave' function from kernmantle.

In kernmantle, interpreting a task with a function is called weaving a strand.

There are multiple functions available to weave strands (weave, weave', weave'', weaveK). Almost always, the one you want is weave'.

In [7]:
import Control.Kernmantle.Rope ((&), weave')
import Funflow.Flow (Flow)

weaveMyFlow myFlow = myFlow & weave' #custom interpretCustomTask

kernmantle's & operator allows us to "weave in," or "chain," multiple strands, e.g.:

weaveMyFlow myFlow = myFlow & weave' #custom1 interpretCustomTask1 & weave' #custom2 interpretCustomTask2
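The chaining pattern can be sketched with plain functions, under the assumption that kernmantle's & behaves like the reverse-application operator (&) from Data.Function. The names stepOne and stepTwo are hypothetical stand-ins for two weaving steps; no real strands are woven here.

```haskell
import Data.Function ((&))

-- Hypothetical stand-ins for two weaving steps: each transforms the value it receives.
stepOne :: String -> String
stepOne s = s ++ " -> custom1 woven"

stepTwo :: String -> String
stepTwo s = s ++ " -> custom2 woven"

-- x & f & g means g (f x): steps are applied from left to right.
chained :: String
chained = "flow" & stepOne & stepTwo

main :: IO ()
main = putStrLn chained  -- prints: flow -> custom1 woven -> custom2 woven
```

Read this way, each weave' call in the chain receives the result of the previous one.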

Now, we can run the resulting flow:

In [8]:
:opt no-lint
import Funflow.Run (runFlow)

runMyFlow :: MyFlow i o -> i -> IO o
runMyFlow myFlow input = runFlow (weaveMyFlow myFlow) input

In [9]:
runMyFlow myFlow "Kangaroo goes " :: IO String
"Kangaroo goes woop!woop!woop!woop!woop!woop!woop!"

We have to specify the type of the result, IO String, because of some issues with type inference when using GADTs.

Going further

See more about kernmantle here: https://github.com/tweag/kernmantle