class: middle, right

# Reproducible Data Pipelines Using Controlled Effects

### A General Framework for Composable Task and Data Workflows

**Yves Parès**
![logo](tweag_logo.svg)

---

# Where do we come from?

## Tweag

- A functional programming consultancy company: dev, infra-as-code & data-engineering projects
- No preferred project scope (from a few weeks to a few years) or domain

## Me

- Haskell dev & data engineer
- Bioinformatics projects

---

.smalltitle[A quick categorization of workflow tools (1/4)]

## _Task_ workflow

- A DAG of tasks, eagerly executed one after the other
- Similar to build systems
- Parallelism possible when two tasks are independent

Ex: Luigi, Airflow

## _Data_ pipeline

- A DAG of data processors
- Similar to lazy functions processing lists
- All processors can run in parallel

Ex: Apache Beam

---

.smalltitle[A quick categorization of workflow tools (2/4)]

## Fine-grained tasks

- Each node of the DAG: one elementary operation

Ex: TensorFlow

## Coarse-grained tasks

- Each node of the DAG: a script, a docker container...

Ex: Luigi, Airflow

---

.smalltitle[A quick categorization of workflow tools (3/4)]

## Static graphs

The whole set of tasks to execute is known in advance
(if there are conditional branches, all possible branches must be known)

Ex: TensorFlow (< 2.0), Luigi, Airflow

## Dynamic graphs

Nodes may be added to the compute graph as processing goes

Ex: PyTorch, Apache Spark

---

.smalltitle[A quick categorization of workflow tools (4/4)]

## Config-time

- Configuration (endpoints, data sources/sinks...)
- Ahead-of-time checks (network, resources...)
- Optimizations, graph rewriting
- Static scheduling

_Requires prior knowledge (metadata) about the computations that will be performed_

## Process-time

- Actual processing of the data
- Dynamic scheduling

---

# Scope of the work

## Our contribution

An approach (_Kernmantle_) usable to implement both task and data workflows, at any level of granularity, and which can exploit prior knowledge...

...although only for static graphs.

## Application domain

Simulation and data analytics applications in health sciences (pharmacology, biochemistry)

Developed through projects with Pfizer & NovaDiscovery

---

# Some buzzwords

- Reproducibility (run twice, get the same result)
- Homogeneity (task and workflow: same type)
- Composability (gluing two tasks together --> new task)
- Configurability (tasks not tied to one environment)
- Reusability (tasks can be imported and reused in many projects)
- Separation of concerns (dev's and analyst's worlds kept separate)

To sum it up: tasks usable like plain old functions

## How?

Abstract effects and Arrows

---

# Effects = Control over what tasks can do

#### Train a model

- Download dataset
- Log stuff
- Write result model

#### Classify data

- Download dataset
- Download pre-trained model
- Log stuff

#### Simulate a system of equations

- Log stuff
- Write result matrix

---

# Benefits of controlling effects

- Uncontrollable, opaque, arbitrary IO? Gone
- Execution logic spreading through the pipeline? Gone
- Tasks writing to databases/filesystems they shouldn't? Gone
- Like static typing: more execution errors become compilation errors
- High inspectability at config-time ==> _early failure_

**==> Reproducibility**

---

# Abstract binary effects

An effect = _one API_ & _several possible implementations_

APIs expressed as Haskell data structures (with 2 type params)

Jargon: Implementation = _interpretation function_ or _handler_

```haskell
-- | An effect
data Log a b where
  Log :: Log Text ()

-- | Another effect
data FileIO a b where
  Read :: FilePath -> FileIO () Text
  Write :: FilePath -> FileIO Text ()

-- | A type that can actually do some work.
-- EAGER, so only for _tasks_
newtype Core a b = Core (a -> IO b)

-- | Two basic interpretation functions (handlers)
log :: Log a b -> Core a b
log Log = Core putStrLn

fileIO :: FileIO a b -> Core a b
fileIO (Read path) = Core (\_ -> readFile path)
fileIO (Write path) = Core (\content -> writeFile path content)
```

---

# Less trivial effect

```haskell
-- | The name/URL of some pre-trained model on TensorFlow Hub/PyTorch Hub
type ModelName = String

data Classify a b where
  Classify :: ModelName -> Classify (Vector Double) Int

interpretClassify :: Classify a b -> Core a b
interpretClassify (Classify modelName) = Core $ \inputData -> do
  -- Here we can do any IO we want, fetch URLs etc
  model <- downloadModel modelName
  classifyWithModel model inputData
```

The interpreter here does everything at process-time

What if I want to know the model name in advance, check whether it exists, pre-download models, etc. _at config-time_?

---

# Chaining effects together

## Arrows

```haskell
type T x y

task1a :: T A B
task1b :: T A B'
task2  :: T (B,B') C
```

```
            .--> task1a --.
           /               \
[input] ---                 ---> task2 --> [output]
           \               /
            °--> task1b --°
```

```haskell
task1 :: T A (B,B')
task1 = task1a &&& task1b  -- Parallel composition & passthrough

fullWorkflow :: T A C
fullWorkflow = task1 >>> task2  -- Sequential composition
```

For big workflows, a more convenient syntax is available (arrow notation)

---

# Effects and tasks (1/2)

![Hmmm](hmmm.jpg)

---

# Effects and tasks (2/2)

#### _Triggering_ an effect

Triggering = converting an effect into a task:

```haskell
triggerEffect :: someEffect a b -> T a b
triggerEffect = ...

-- | Just read and log the contents of a file
taskEcho = triggerEffect (Read "stuff.txt") >>> triggerEffect Log
```

#### Summary

```
          triggering          Arrow operators
EFFECTS ------------> TASKS -----------------> WORKFLOW -----> CORE TYPE
    \                                                             /
     °--------->----------interpretation---------->--------------°
```

#### Remaining question

What should type `T` (for tasks and workflows) be?

---

# Our type for Tasks: `Rope`

Works through Dependency Injection:

```haskell
newtype Rope core mantle a b = Rope (Handlers core mantle -> core a b)

instance (Arrow core) => Arrow (Rope core mantle)
```

- **Mantle:** Type-level collection of named effects ("strands")
- **Handlers:** A collection of handlers (1 per strand)
- **Core:** Target type of all handlers
- The `strand` function triggers effects

#### Example task:

```haskell
taskEcho :: (Arrow core, InMantle mantle "files" FileIO, InMantle mantle "log" Log)
         => Rope core mantle () ()
taskEcho = strand #files (Read "input.txt") >>> strand #log Log
```

---

# The Kernmantle Rope

![Kernmantle Rope](./kernmantle-rope.jpg)

---

# Extracting info at config-time

```haskell
data FileIO a b where
  Read :: FilePath -> FileIO () Text
  Write :: FilePath -> FileIO Text ()

data Classify a b where
  Classify :: ModelName -> Classify (Vector Double) Int
```

`FilePath`s and `ModelName`s knowable at config-time

## Two possible ways

- Two-pass: write two sets of interpretation functions:
  - one for accumulating metadata
  - one for running
- One-pass: change Core so that it can _both_ accumulate metadata AND perform computations

_One-pass_ actually simpler.
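For comparison, the two-pass route would pair the normal `Core` with a second, metadata-only core. A rough sketch of that second core (illustrative names, not Kernmantle's API; it assumes the usual `Control.Category` / `Control.Arrow` imports):

```haskell
-- Metadata-only core: ignores the data flow entirely and just records
-- which paths the pipeline would touch (essentially `Const [FilePath]`)
newtype PathsOnly a b = PathsOnly [FilePath]

instance Category PathsOnly where
  id = PathsOnly []
  PathsOnly ps2 . PathsOnly ps1 = PathsOnly (ps1 <> ps2)

instance Arrow PathsOnly where
  arr _ = PathsOnly []
  first (PathsOnly ps) = PathsOnly ps

collectPaths :: FileIO a b -> PathsOnly a b
collectPaths (Read  path) = PathsOnly [path]
collectPaths (Write path) = PathsOnly [path]
```

Interpreting the workflow once with `collectPaths` yields the file list; interpreting it a second time with the `fileIO` handler actually runs it. Hence two passes, and two sets of handlers to keep in sync.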
---

# Extracting info at config-time

#### Example task

```haskell
task = strand #files (Read "source.txt")
   >>> computeStuff
   >>> strand #files (Write "sink.txt")
```

#### Change Core so it can (1) gather file paths and (2) allow rebinding them:

```haskell
-- old:
newtype Core a b = Core (a -> IO b)

-- new:
newtype CoreWithPaths a b = CoreWithPaths
  ( HashMap FilePath FilePath -> a -> IO b
  , [FilePath] )

-- new handler:
interpFileIO2 :: FileIO a b -> CoreWithPaths a b
interpFileIO2 (Read path) = CoreWithPaths (findAndRead, [path])
  where findAndRead mappings () = readFile (mappings ! path)
interpFileIO2 (Write path) = ...
```

**But can `CoreWithPaths` still be an Arrow?**

---

# Enter the Black Magic...

```haskell
newtype CoreWithPaths a b = CoreWithPaths
  (HashMap FilePath FilePath -> a -> IO b, [FilePath])
```

is isomorphic to:

```haskell
type CoreWithPaths2 =
  Cayley (Writer [FilePath])
         (Cayley (Reader (HashMap FilePath FilePath))
                 (Kleisli IO))
```

...which is an Arrow, because:

```haskell
instance (Monad m) => Arrow (Kleisli m)                  -- `base` package
instance (Applicative f, Arrow a) => Arrow (Cayley f a)  -- `profunctors`
instance Applicative (Reader r)                          -- `transformers`
instance (Monoid w) => Applicative (Writer w)            -- `transformers`
```

---

# Moar Black Magic to Save the Day

#### "BUT THIS IS UGLY AS \*\*\*\*!!!!"
_-- You._

#### "THIS IS MADNESS!!!!"
_-- You, again._

#### "OVER MY DEAD BODY!!!"
_-- Still you._

And I agree, but...

#### _...no need to let that ugly type invade the codebase:_

- Task/workflow users **only** see effects (not the core type)
- Devs don't have to deal with it repeatedly, thanks to _DerivingVia_:

```haskell
newtype CoreWithPaths a b = CoreWithPaths
  (HashMap FilePath FilePath -> a -> IO b, [FilePath])
  deriving Arrow via Cayley (Writer [FilePath])
                            (Cayley (Reader (HashMap FilePath FilePath))
                                    (Kleisli IO))
```

---

# Example

[github.com/GuillaumeDesforges/kernmantle-pytorch](https://github.com/GuillaumeDesforges/kernmantle-pytorch)

Uses pre-trained PyTorch models to classify pictures:

```haskell
-- Uses the aforementioned arrow notation:
workflow = proc () -> do
  image <- strand #pictures GetPicture -< "dog.jpg"
  -- Some task that will succeed:
  predictWith "inception_v3" -< image
  -- Some task that will fail, the model doesn't exist:
  predictWith "rubbish" -< image
 where
  predictWith model = strand #predict $ Predict $
    TorchHubModel "pytorch/vision:v0.6.0" model
  [...]
  dockerSpec = DockerSpec "pytorch/pytorch" ["scipy"]  -- Used by handlers
```

- Handlers call python scripts (in a docker container)
- Needed docker images are _built_ at config-time
- Needed models are downloaded at config-time (see the sketch below)
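The config-time building and downloading reuses the same `Writer`-based trick shown two slides earlier, accumulating models and docker specs instead of file paths. A minimal sketch of what such a core could look like (illustrative only; the actual types in the repository differ):

```haskell
-- Sketch: the Writer part accumulates every model a workflow mentions, so
-- handlers can check/download them (and build docker images) at config-time,
-- before any picture is processed
newtype CoreWithModels a b = CoreWithModels (a -> IO b, [ModelName])
  deriving Arrow via Cayley (Writer [ModelName]) (Kleisli IO)
```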
---

.smalltitle[Conclusion]

#### Unmentioned features

- _Re-interpretation_ of effects (effect handlers can trigger the same or other effects)
- Different environments (test, production) = different sets of handlers
- Caching
- Integration with common Haskell libs/abstractions
- Other structures than Arrow usable (ArrowChoice/Loop, etc.)

#### Ongoing & future work

- `Kernmantle`'s v1 release
- [`Funflow2`](https://github.com/tweag/funflow2) (high-level API for datasci _task_ workflows with predefined effects)
- Benchmarking (the overhead is small, but still)
- Distributed tasks: WIP
- Experiments on more flexible frameworks than Arrows

#### Refs

- Kernmantle source: [github.com/tweag/kernmantle](https://github.com/tweag/kernmantle)
- Paper: [richarde.dev/papers/2020/workflows/workflows.pdf](https://richarde.dev/papers/2020/workflows/workflows.pdf)
- Tweag: [tweag.io](http://tweag.io)
- Me: [LinkedIn](https://www.linkedin.com/in/yves-par%C3%A8s-88315549/) ; [github.com/YPares](https://github.com/YPares)

---

# Leverage existing Applicatives/Monads

```haskell
newtype ArrowFromEffects f m a b = ArrowFromEffects (f (a -> m b))
```

Combination of:

- Applicative effects `f` (config-time)
- Monadic effects `m` (process-time)

Defining applicative and monadic effects is easy: lots of them around, lots of libraries.

### Very relevant for code decomposition and reusability

---

# Comparison with Applicative

_In case of no monadic effects_, one could argue that:

```haskell
type Arr a b = f (a -> b)
```

has the same power. Do we _really_ need arrows?
(i.e., couldn't we get an equivalent for every Arrow class this way?)

### Let's try to implement the Arrow stack with that type:

```haskell
Category    -- Yes
Arrow       -- Yes
ArrowZero   -- Alternative is there, so yes
ArrowPlus   -- Yes, for the same reason
ArrowLoop   -- Yes
ArrowChoice -- Yes... but it doesn't have the right semantics
ArrowApply  -- Clearly no
```

`ArrowChoice` doesn't have the right operational semantics (though it satisfies the Arrow laws): you can't shortcut effects.

---

# Comparison with Applicative (2)

### Wait! Now we have `Selective`!!

...Yeah. Still no ArrowChoice, sorry...

```haskell
branch :: (Selective f)
       => f (a -> c) -> f (b -> d) -> f (Either a b) -> f (Either c d)
(|||)  :: (ArrowChoice arr)
       => arr a c -> arr b d -> arr (Either a b) (Either c d)
```

If `arr a b = f (a -> b)`, that gives:

```haskell
branch :: f (a -> c) -> f (b -> d) -> f (Either a b) -> f (Either c d)
(|||)  :: f (a -> c) -> f (b -> d) -> f (Either a b -> Either c d)
```

`(|||)` combined with `(<*>)` gives you `branch`, but there is no way back.

#### Every `ArrowChoice` can give you a `Selective`, but not the reverse :/

---

# Comparison with Applicative (3): Lifting pure and impure functions

### ...with Applicatives

```haskell
pure   :: (a -> b)    -> F (a -> b)
impure :: (a -> IO b) -> F (a -> b) -- IMPOSSIBLE
```

To get the same end type, we need to force everything into `IO`:

```haskell
pure' :: (a -> b) -> F (a -> IO b)
pure' f = pure (return . f)

impure' :: (a -> IO b) -> F (a -> IO b)
impure' = pure
```

...but then you can't abstract over the underlying monad anymore (the user sees it in the type)

### ...with Arrows

```haskell
arr       :: (a -> b)    -> Arrow a b
impureArr :: (a -> IO b) -> Arrow a b
```
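Tying this back to the `ArrowFromEffects` wrapper from two slides ago: both lifts are definable for it, and the monad stays a parameter of the arrow type instead of leaking into the values. A minimal sketch (the names `liftPure` and `liftImpure` are illustrative, not part of Kernmantle):

```haskell
-- Sketch, reusing: newtype ArrowFromEffects f m a b = ArrowFromEffects (f (a -> m b))
liftPure :: (Applicative f, Applicative m) => (a -> b) -> ArrowFromEffects f m a b
liftPure g = ArrowFromEffects (pure (pure . g))

liftImpure :: (Applicative f) => (a -> IO b) -> ArrowFromEffects f IO a b
liftImpure g = ArrowFromEffects (pure g)
```

Unlike `pure'`/`impure'` above, `IO` only shows up as a parameter of the arrow type, not in the values flowing through it.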