{-# LANGUAGE Arrows #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
module Funflow.Run
( runFlow,
runFlowWithConfig,
RunFlowConfig (..),
)
where
import Control.Arrow (Arrow, arr)
import Control.Exception.Safe (throw, throwString)
import Control.Kernmantle.Caching (localStoreWithId)
import Control.Kernmantle.Parallel (performP)
import Control.Kernmantle.Rope
  ( HasKleisliIO,
    LooseRopeWith,
    liftKleisliIO,
    runReader,
    untwine,
    weave',
    (&),
    type (~>),
  )
import Control.Monad (unless)
import Control.Monad.Except (runExceptT)
import Data.CAS.ContentHashable (DirectoryContent (DirectoryContent))
import qualified Data.CAS.ContentStore as CS
import qualified Data.CAS.RemoteCache as RC
import Data.Either (isLeft, lefts, partitionEithers)
import qualified Data.HashSet as HashSet
import qualified Data.Map.Lazy as Map
import Data.Maybe (mapMaybe)
import Data.Profunctor.Trans (Reader, Writer, reading, runWriter, writing)
import Data.Set (fromList)
import Data.String (IsString (fromString))
import qualified Data.Text as T
import Docker.API.Client
  ( ContainerLogType (..),
    ContainerSpec (cmd),
    OS (OS),
    awaitContainer,
    defaultContainerSpec,
    hostVolumes,
    newDefaultDockerManager,
    printContainerLogs,
    pullImage,
    runContainer,
    saveContainerArchive,
    workingDir,
  )
import Funflow.Config (ConfigKeysBySource (..), Configurable (..), ExternalConfig (..), missing, readEnvs, readYamlFileConfig)
import Funflow.Flow (RequiredCore, RequiredStrands)
import Funflow.Run.Orphans ()
import Funflow.Tasks.Docker
  ( Arg (Arg, Placeholder),
    DockerTask (DockerTask),
    DockerTaskConfig (DockerTaskConfig),
    DockerTaskInput (DockerTaskInput),
    VolumeBinding (VolumeBinding),
    getIdFromArg,
    renderArg,
  )
import qualified Funflow.Tasks.Docker as DE
import Funflow.Tasks.Simple (SimpleTask (IOTask, PureTask))
import Funflow.Tasks.Store (StoreTask (GetDir, PutDir))
import GHC.Stack (HasCallStack)
import Network.HTTP.Client (Manager)
import Path (Abs, Dir, File, Path, absdir, parseAbsDir, parseRelDir, toFilePath, (</>))
import Path.IO (copyDirRecur)
import System.Directory (removeDirectory)
import System.Directory.Funflow (moveDirectoryContent)
import System.IO.Temp (withSystemTempDirectory)
import System.Info (os)
import System.PosixCompat.User (getEffectiveGroupID, getEffectiveUserID)
-- | Settings that control how a flow is executed by 'runFlowWithConfig'.
data RunFlowConfig = RunFlowConfig
  { -- | Directory of the content store that is opened for the duration of
    --   the run.
    storePath :: !(Path Abs Dir),
    -- | Optional YAML file to read file-based configuration from; with
    --   'Nothing' the file configuration is left empty.
    configFile :: !(Maybe (Path Abs File))
  }
-- | Run a flow, resolving every externally configurable value before the
--   flow itself is executed.
--
--   The run proceeds in the following order; a failure at any step aborts it:
--
--     1. Weave the @docker@, @store@ and @simple@ strands into the core and
--        collect the docker images and config keys the flow declares.
--     2. Load configuration from the optional YAML file and from the
--        environment.
--     3. Throw if any required config key is missing.
--     4. Render the configurable values; throw if rendering reported errors.
--     5. Pull every docker image the flow declared.
--     6. Run the flow on the given input.
runFlowWithConfig ::
  -- | Store location and optional config file
  RunFlowConfig ->
  -- | The flow to run
  LooseRopeWith RequiredStrands (RequiredCore IO) input output ->
  -- | The input to evaluate the flow against
  input ->
  IO output
runFlowWithConfig config flow input =
  let RunFlowConfig {storePath, configFile} = config
      -- Caching id given to 'localStoreWithId' for the whole run.
      defaultCachingId = Just 1
   in CS.withStore storePath $ \store -> do
        manager <- newDefaultDockerManager (OS os)
        let -- Interpret each strand, then strip the (now empty) list of strands.
            weavedPipeline =
              flow
                & weave' #docker (interpretDockerTask manager store)
                & weave' #store (interpretStoreTask store)
                & weave' #simple interpretSimpleTask
                & untwine
            -- Peel off the two writer layers: images first, then config keys.
            (dockerImages, pipelineWithConfigKeys) = runWriter weavedPipeline
            (dockerConfigs, pipelineWithConfigReader) = runWriter pipelineWithConfigKeys
            requiredConfigs = mconcat [dockerConfigs]

        -- Load configuration from the optional file and from the environment.
        loadedFileConfig <- maybe (pure mempty) (readYamlFileConfig . toFilePath) configFile
        loadedEnvConfig <- readEnvs $ HashSet.toList $ envConfigKeys requiredConfigs
        let externalConfig =
              ExternalConfig
                { fileConfig = loadedFileConfig,
                  envConfig = loadedEnvConfig
                }
            missingConfigs = missing externalConfig requiredConfigs

        -- Every key the flow asked for must have been found.
        unless (null missingConfigs) $
          throwString $ "Missing the following required config keys: " ++ show missingConfigs

        -- Render the configurable values and collect rendering errors.
        let (configErrors, configuredPipeline) =
              runWriter $ runReader externalConfig pipelineWithConfigReader
        unless (null configErrors) $
          throwString $ "Configuration failed with errors: " ++ show configErrors

        let core = runReader (localStoreWithId store defaultCachingId) configuredPipeline

        -- Pull each distinct image once before running anything.
        unless (null dockerImages) $ do
          putStrLn "Found docker images, pulling..."
          let pullDockerImage image = do
                putStrLn $ "Pulling docker image: " ++ T.unpack image
                pullResult <- runExceptT $ pullImage manager image
                case pullResult of
                  Left ex -> do
                    putStrLn "Error pulling docker image; is Internet connected? Is Docker running?"
                    throw ex
                  Right () -> pure ()
          mapM_ pullDockerImage (fromList dockerImages)

        -- At last, run the core.
        performP input core
-- | Run a flow with the default configuration: the content store is located
--   at @/tmp/funflow/store/@ and no config file is read.
--
--   Use 'runFlowWithConfig' to choose a different store path or to supply a
--   config file.
runFlow ::
  -- | The flow to run
  LooseRopeWith RequiredStrands (RequiredCore IO) input output ->
  -- | The input to evaluate the flow against
  input ->
  IO output
runFlow = runFlowWithConfig defaultConfig
  where
    defaultConfig =
      RunFlowConfig
        { storePath = [absdir|/tmp/funflow/store/|],
          configFile = Nothing
        }
-- | Interpret a 'SimpleTask' in any arrow that can lift @IO@ functions.
--
--   A 'PureTask' is lifted with 'arr'; an 'IOTask' is lifted with
--   'liftKleisliIO'.
interpretSimpleTask :: (Arrow a, HasKleisliIO m a) => SimpleTask i o -> a i o
interpretSimpleTask (PureTask f) = arr f
interpretSimpleTask (IOTask f) = liftKleisliIO f
-- | Interpret a 'StoreTask' against the given content store.
--
--   * 'PutDir' puts the given directory into the store via 'CS.putInStore'
--     (with 'RC.NoCache' as the remote cacher) and yields the resulting item.
--   * 'GetDir' maps a store item to its path inside the store.
interpretStoreTask :: (Arrow a, HasKleisliIO m a, RC.Cacher m RC.NoCache) => CS.ContentStore -> StoreTask i o -> a i o
interpretStoreTask store PutDir =
  liftKleisliIO $ \dirPath ->
    let -- Error callback handed to 'CS.putInStore'; it receives a content hash.
        handleError hash =
          throwString $
            "Could not put directory " <> show dirPath <> " in store item " <> show hash
        -- Fill callback handed to 'CS.putInStore': copies the source
        -- directory into the destination path it is given.
        copy :: Path Abs Dir -> DirectoryContent -> IO ()
        copy destinationPath (DirectoryContent sourcePath) =
          copyDirRecur sourcePath destinationPath
     in CS.putInStore store RC.NoCache handleError copy (DirectoryContent dirPath)
interpretStoreTask store GetDir = arr (CS.itemPath store)
interpretDockerTask ::
(Arrow core, HasKleisliIO m core, HasCallStack) =>
Manager ->
CS.ContentStore ->
DockerTask i o ->
(Writer [T.Text] ~> Writer ConfigKeysBySource ~> Reader ExternalConfig ~> Writer [String] ~> core) i o
interpretDockerTask :: Manager
-> ContentStore
-> DockerTask i o
-> (~>)
(Writer [Text])
(Writer ConfigKeysBySource
~> (Reader ExternalConfig ~> (Writer [String] ~> core)))
i
o
interpretDockerTask Manager
manager ContentStore
store (DockerTask (DockerTaskConfig {Text
image :: DockerTaskConfig -> Text
image :: Text
DE.image, Text
command :: DockerTaskConfig -> Text
command :: Text
DE.command, [Arg]
args :: DockerTaskConfig -> [Arg]
args :: [Arg]
DE.args})) =
let requiredConfigs :: ConfigKeysBySource
requiredConfigs = [ConfigKeysBySource] -> ConfigKeysBySource
forall a. Monoid a => [a] -> a
mconcat ([ConfigKeysBySource] -> ConfigKeysBySource)
-> [ConfigKeysBySource] -> ConfigKeysBySource
forall a b. (a -> b) -> a -> b
$ (Arg -> Maybe ConfigKeysBySource) -> [Arg] -> [ConfigKeysBySource]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe Arg -> Maybe ConfigKeysBySource
getIdFromArg [Arg]
args
in
[Text]
-> (Writer ConfigKeysBySource
~> (Reader ExternalConfig ~> (Writer [String] ~> core)))
:-> (Writer [Text]
~> (Writer ConfigKeysBySource
~> (Reader ExternalConfig ~> (Writer [String] ~> core))))
forall w (eff :: * -> * -> *). w -> eff :-> (Writer w ~> eff)
writing [Text
image] (Cayley
(Writer ConfigKeysBySource)
(Reader ExternalConfig ~> (Writer [String] ~> core))
DockerTaskInput
Item
-> (~>)
(Writer [Text])
(Writer ConfigKeysBySource
~> (Reader ExternalConfig ~> (Writer [String] ~> core)))
DockerTaskInput
Item)
-> Cayley
(Writer ConfigKeysBySource)
(Reader ExternalConfig ~> (Writer [String] ~> core))
DockerTaskInput
Item
-> (~>)
(Writer [Text])
(Writer ConfigKeysBySource
~> (Reader ExternalConfig ~> (Writer [String] ~> core)))
DockerTaskInput
Item
forall a b. (a -> b) -> a -> b
$
ConfigKeysBySource
-> (Reader ExternalConfig ~> (Writer [String] ~> core))
:-> (Writer ConfigKeysBySource
~> (Reader ExternalConfig ~> (Writer [String] ~> core)))
forall w (eff :: * -> * -> *). w -> eff :-> (Writer w ~> eff)
writing ConfigKeysBySource
requiredConfigs (Cayley
(Reader ExternalConfig)
(Writer [String] ~> core)
DockerTaskInput
Item
-> Cayley
(Writer ConfigKeysBySource)
(Reader ExternalConfig ~> (Writer [String] ~> core))
DockerTaskInput
Item)
-> Cayley
(Reader ExternalConfig)
(Writer [String] ~> core)
DockerTaskInput
Item
-> Cayley
(Writer ConfigKeysBySource)
(Reader ExternalConfig ~> (Writer [String] ~> core))
DockerTaskInput
Item
forall a b. (a -> b) -> a -> b
$
(ExternalConfig
-> Cayley (Writer [String]) core DockerTaskInput Item)
-> Cayley
(Reader ExternalConfig)
(Writer [String] ~> core)
DockerTaskInput
Item
forall t (eff :: * -> * -> *) a b.
(t -> eff a b) -> (~>) (Reader t) eff a b
reading ((ExternalConfig
-> Cayley (Writer [String]) core DockerTaskInput Item)
-> Cayley
(Reader ExternalConfig)
(Writer [String] ~> core)
DockerTaskInput
Item)
-> (ExternalConfig
-> Cayley (Writer [String]) core DockerTaskInput Item)
-> Cayley
(Reader ExternalConfig)
(Writer [String] ~> core)
DockerTaskInput
Item
forall a b. (a -> b) -> a -> b
$ \ExternalConfig
externalConfig ->
let ([String]
configErrors, [Arg]
argsRenderedWithConfig) = [Either String Arg] -> ([String], [Arg])
forall a b. [Either a b] -> ([a], [b])
partitionEithers ([Either String Arg] -> ([String], [Arg]))
-> [Either String Arg] -> ([String], [Arg])
forall a b. (a -> b) -> a -> b
$ (Arg -> Either String Arg) -> [Arg] -> [Either String Arg]
forall a b. (a -> b) -> [a] -> [b]
map (ExternalConfig -> Arg -> Either String Arg
renderArg ExternalConfig
externalConfig) [Arg]
args
in
[String] -> core :-> (Writer [String] ~> core)
forall w (eff :: * -> * -> *). w -> eff :-> (Writer w ~> eff)
writing [String]
configErrors (core DockerTaskInput Item
-> Cayley (Writer [String]) core DockerTaskInput Item)
-> core DockerTaskInput Item
-> Cayley (Writer [String]) core DockerTaskInput Item
forall a b. (a -> b) -> a -> b
$
(DockerTaskInput -> IO Item) -> core DockerTaskInput Item
forall (m :: * -> *) (eff :: * -> * -> *) a b.
HasKleisliIO m eff =>
(a -> IO b) -> eff a b
liftKleisliIO ((DockerTaskInput -> IO Item) -> core DockerTaskInput Item)
-> (DockerTaskInput -> IO Item) -> core DockerTaskInput Item
forall a b. (a -> b) -> a -> b
$ \(DockerTaskInput {[VolumeBinding]
inputBindings :: DockerTaskInput -> [VolumeBinding]
inputBindings :: [VolumeBinding]
DE.inputBindings, Map String Text
argsVals :: DockerTaskInput -> Map String Text
argsVals :: Map String Text
DE.argsVals}) ->
let argsFilled :: [Either String Text]
argsFilled =
[ ( case Arg
arg of
Arg Configurable Text
configValue -> case Configurable Text
configValue of
Literal Text
value -> Text -> Either String Text
forall a b. b -> Either a b
Right Text
value
ConfigFromEnv Text
k -> String -> Either String Text
forall a b. a -> Either a b
Left (String -> Either String Text) -> String -> Either String Text
forall a b. (a -> b) -> a -> b
$ String
"interpretDockerTask encountered an unrendered externally configurable value at key: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Text -> String
T.unpack Text
k
ConfigFromFile Text
k -> String -> Either String Text
forall a b. a -> Either a b
Left (String -> Either String Text) -> String -> Either String Text
forall a b. (a -> b) -> a -> b
$ String
"interpretDockerTask encountered an unrendered externally configurable value at key: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Text -> String
T.unpack Text
k
Placeholder String
label ->
let maybeVal :: Maybe Text
maybeVal = String -> Map String Text -> Maybe Text
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup String
label Map String Text
argsVals
in case Maybe Text
maybeVal of
Maybe Text
Nothing -> String -> Either String Text
forall a b. a -> Either a b
Left (String -> Either String Text) -> String -> Either String Text
forall a b. (a -> b) -> a -> b
$ String
"Unfilled label (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
label String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
Just Text
val -> Text -> Either String Text
forall a b. b -> Either a b
Right Text
val
)
| Arg
arg <- [Arg]
argsRenderedWithConfig
]
in
if (Either String Text -> Bool) -> [Either String Text] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any Either String Text -> Bool
forall a b. Either a b -> Bool
isLeft [Either String Text]
argsFilled
then
let labelAndConfigErrors :: [String]
labelAndConfigErrors = [Either String Text] -> [String]
forall a b. [Either a b] -> [a]
lefts [Either String Text]
argsFilled
in String -> IO Item
forall (m :: * -> *) a.
(MonadThrow m, HasCallStack) =>
String -> m a
throwString (String -> IO Item) -> String -> IO Item
forall a b. (a -> b) -> a -> b
$ String
"Docker task failed with configuration errors: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ [String] -> String
forall a. Show a => a -> String
show [String]
labelAndConfigErrors
else do
let argsFilledChecked :: [Text]
argsFilledChecked = [Text
argVal | (Right Text
argVal) <- [Either String Text]
argsFilled]
UserID
uid <- IO UserID
getEffectiveUserID
GroupID
gid <- IO GroupID
getEffectiveGroupID
let
defaultWorkingDirName :: String
defaultWorkingDirName = String
"workdir"
defaultContainerWorkingDirPath :: String
defaultContainerWorkingDirPath = String
"/" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
defaultWorkingDirName
container :: ContainerSpec
container =
(Text -> ContainerSpec
defaultContainerSpec Text
image)
{ workingDir :: Text
workingDir = String -> Text
T.pack String
defaultContainerWorkingDirPath,
cmd :: [Text]
cmd = [Text
command] [Text] -> [Text] -> [Text]
forall a. Semigroup a => a -> a -> a
<> [Text]
argsFilledChecked,
hostVolumes :: [Text]
hostVolumes = (String -> Text) -> [String] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map String -> Text
forall a. IsString a => String -> a
fromString [(Path Abs Dir -> String
forall b t. Path b t -> String
toFilePath (Path Abs Dir -> String) -> Path Abs Dir -> String
forall a b. (a -> b) -> a -> b
$ ContentStore -> Item -> Path Abs Dir
CS.itemPath ContentStore
store Item
item) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
":" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> (Path Abs Dir -> String
forall b t. Path b t -> String
toFilePath Path Abs Dir
mount) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
":ro" | VolumeBinding {Item
item :: VolumeBinding -> Item
item :: Item
DE.item, Path Abs Dir
mount :: VolumeBinding -> Path Abs Dir
mount :: Path Abs Dir
DE.mount} <- [VolumeBinding]
inputBindings]
}
Either DockerClientError String
runDockerResult <-
ExceptT DockerClientError IO String
-> IO (Either DockerClientError String)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT DockerClientError IO String
-> IO (Either DockerClientError String))
-> ExceptT DockerClientError IO String
-> IO (Either DockerClientError String)
forall a b. (a -> b) -> a -> b
$ do
String
containerId <- Manager -> ContainerSpec -> ExceptT DockerClientError IO String
runContainer Manager
manager ContainerSpec
container
Manager
-> ContainerLogType -> String -> ExceptT DockerClientError IO ()
printContainerLogs Manager
manager ContainerLogType
Both String
containerId
Manager -> String -> ExceptT DockerClientError IO ()
awaitContainer Manager
manager String
containerId
String -> ExceptT DockerClientError IO String
forall (m :: * -> *) a. Monad m => a -> m a
return String
containerId
case Either DockerClientError String
runDockerResult of
Left DockerClientError
ex -> DockerClientError -> IO Item
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw DockerClientError
ex
Right String
containerId ->
let
-- Callback handed to CS.putInStore below (in its `ContentHash -> m ()` slot).
-- It never returns normally: it throws a string exception whose message
-- embeds the `show`n form of the value it receives.
-- NOTE(review): presumably putInStore calls this when the item cannot be
-- stored; confirm against the cas-store documentation.
handleError unstoredHash =
  throwString ("Could not put in store item " <> show unstoredHash)
-- Given a temporary directory path, export the finished container's working
-- directory into it and register the result in the content store, yielding
-- the resulting store item.
--
-- Steps, in order:
--   1. Run saveContainerArchive for `containerId`, passing the effective
--      uid/gid, the in-container working directory path and the temporary
--      directory. A Docker client error is rethrown as an exception.
--   2. Parse the temporary path as an absolute directory and build
--      `<tmp>/<defaultWorkingDirName>` beneath it.
--   3. Call moveDirectoryContent from that nested directory to the temporary
--      root, then remove the nested directory.
--      NOTE(review): presumably the archive is unpacked as a nested copy of
--      the working directory, so this hoists its content one level up;
--      confirm against saveContainerArchive.
--   4. Call CS.putInStore on the temporary root with no remote cache
--      (RC.NoCache), `handleError` as its hash callback, and
--      moveDirectoryContent (arguments swapped) as its directory callback.
copyDockerContainer scratchDir = do
  archiveOutcome <-
    runExceptT
      ( saveContainerArchive
          manager
          uid
          gid
          defaultContainerWorkingDirPath
          scratchDir
          containerId
      )
  -- Surface a Docker client failure as an exception; otherwise carry on.
  either throw pure archiveOutcome
  scratchRoot <- parseAbsDir scratchDir
  nestedDirName <- parseRelDir defaultWorkingDirName
  let nestedWorkdir = scratchRoot </> nestedDirName
  moveDirectoryContent nestedWorkdir scratchRoot
  removeDirectory (toFilePath nestedWorkdir)
  CS.putInStore
    store
    RC.NoCache
    handleError
    (\storeDir contentDir -> moveDirectoryContent contentDir storeDir)
    scratchRoot
in String -> (String -> IO Item) -> IO Item
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
String -> (String -> m a) -> m a
withSystemTempDirectory String
"funflow" String -> IO Item
copyDockerContainer