summaryrefslogtreecommitdiff
path: root/src/Control/Funflow/Exec/Simple.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Control/Funflow/Exec/Simple.hs')
-rw-r--r--src/Control/Funflow/Exec/Simple.hs115
1 files changed, 62 insertions, 53 deletions
diff --git a/src/Control/Funflow/Exec/Simple.hs b/src/Control/Funflow/Exec/Simple.hs
index 9e60966..1d810f7 100644
--- a/src/Control/Funflow/Exec/Simple.hs
+++ b/src/Control/Funflow/Exec/Simple.hs
@@ -21,7 +21,6 @@ module Control.Funflow.Exec.Simple
, withSimpleLocalRunner
) where
-import Control.Arrow (returnA)
import Control.Arrow.Async
import Control.Arrow.Free (type (~>), eval)
import Control.Concurrent.Async (withAsync)
@@ -37,12 +36,14 @@ import Control.Funflow.External
import Control.Funflow.External.Coordinator
import Control.Funflow.External.Coordinator.Memory
import Control.Funflow.External.Executor (executeLoop)
+import qualified Control.Funflow.RemoteCache as Remote
+import Control.Monad.Catch (MonadCatch,
+ MonadMask)
import Control.Monad.IO.Class
import Control.Monad.Trans.Control (MonadBaseControl)
-import Control.Monad.Catch (MonadCatch
- ,MonadMask)
import qualified Data.ByteString as BS
import Data.Foldable (traverse_)
+import Data.Maybe
import Data.Monoid ((<>))
import Data.Void
import Katip
@@ -50,20 +51,24 @@ import Path
import System.IO (stderr)
-- | Simple evaulation of a flow
-runFlowEx :: forall m c eff ex a b.
+runFlowEx :: forall m c eff ex a b remoteCache.
(Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m
- ,MonadCatch m, MonadMask m, KatipContext m)
+ ,MonadCatch m, MonadMask m, KatipContext m, Remote.Cacher m remoteCache)
=> c
-> Config c
-> CS.ContentStore
+ -> remoteCache
-> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects
- -> Int -- ^ Flow configuration identity. This forms part of the caching
- -- system and is used to disambiguate the same flow run in
- -- multiple configurations.
+ -> Maybe Int -- ^ Flow configuration identity. This forms part of the
+ -- caching system and is used to disambiguate the same
+ -- flow run in multiple configurations. If Nothing,
+ -- then it means this flow has no identity, this
+ -- implies that steps will be executed without cache,
+ -- and external tasks will all be considered impure.
-> Flow eff ex a b
-> a
-> m b
-runFlowEx _ cfg store runWrapped confIdent flow input = do
+runFlowEx _ cfg store cacher runWrapped confIdent flow input = do
hook <- initialise cfg
runAsyncA (eval (runFlow' hook) flow) input
where
@@ -71,32 +76,27 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
$ CS.itemPath store item </> [relfile|out|]
withStoreCache :: forall i o. Cacher i o
-> AsyncA m i o -> AsyncA m i o
- withStoreCache NoCache f = f
- withStoreCache c f = let
- chashOf i = cacherKey c confIdent i
- checkStore = AsyncA $ \chash ->
- CS.constructOrWait store chash >>= \case
- CS.Pending void -> absurd void
- CS.Complete item -> do
- bs <- liftIO . BS.readFile $ simpleOutPath item
- return . Right . cacherReadValue c $ bs
- CS.Missing fp -> return $ Left fp
- writeStore = AsyncA $ \(chash, fp, res) ->
- do
- liftIO $ BS.writeFile (toFilePath $ fp </> [relfile|out|])
- . cacherStoreValue c $ res
- _ <- CS.markComplete store chash
- return res
- `onException`
- CS.removeFailed store chash
- in proc i -> do
- let chash = chashOf i
- mcontents <- checkStore -< chash
- case mcontents of
- Right contents -> returnA -< contents
- Left fp -> do
- res <- f -< i
- writeStore -< (chash, fp, res)
+ withStoreCache c@Cache{} (AsyncA f)
+ | Just confIdent' <- confIdent = AsyncA $ \i -> do
+ let chash = cacherKey c confIdent' i
+ computeAndStore fp = do
+ res <- f i -- Do the actual computation
+ liftIO $ BS.writeFile (toFilePath $ fp </> [relfile|out|])
+ . cacherStoreValue c $ res
+ return $ Right res
+ readItem item = do
+ bs <- liftIO . BS.readFile $ simpleOutPath item
+ return . cacherReadValue c $ bs
+ CS.withConstructIfMissing store cacher chash computeAndStore >>= \case
+ CS.Missing e -> absurd e
+ CS.Pending _ ->
+ liftIO (CS.waitUntilComplete store chash) >>= \case
+ Just item -> readItem item
+ Nothing -> throwM $ CS.FailedToConstruct chash
+ CS.Complete (Just a, _) -> return a
+ CS.Complete (_, item) -> readItem item
+ withStoreCache _ f = f
+
writeMd :: forall i o. ContentHash
-> i
-> o
@@ -113,13 +113,16 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
. AsyncA $ \x -> do
let out = f x
case cache props of
- NoCache -> return ()
- Cache key _ _ -> writeMd (key confIdent x) x out $ mdpolicy props
+ Cache key _ _ | Just confIdent' <- confIdent ->
+ writeMd (key confIdent' x) x out $ mdpolicy props
+ _ -> return ()
return out
runFlow' _ (StepIO props f) = withStoreCache (cache props)
. AsyncA $ liftIO . f
runFlow' po (External props toTask) = AsyncA $ \x -> do
- chash <- liftIO $ case (ep_impure props) of
+ let purity | Just _ <- confIdent = ep_impure props
+ | otherwise = alwaysRecompile
+ chash <- liftIO $ case purity of
EpPure -> contentHash (toTask x)
EpImpure fn -> do
salt <- fn
@@ -149,7 +152,9 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
submitTask po td
wait chash td
wait chash td = do
- KnownTask _ <- awaitTask po chash
+ awaitTask po chash >>= \case
+ KnownTask _ -> pure ()
+ _ -> error "[Control.Funflow.Exec.Simple.runFlowEx] Expected KnownTask."
CS.waitUntilComplete store chash >>= \case
Just item -> return item
Nothing -> do
@@ -159,13 +164,15 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
throwM $ ExternalTaskFailed td ti mbStdout mbStderr
runFlow' _ (PutInStore f) = AsyncA $ \x -> katipAddNamespace "putInStore" $ do
chash <- liftIO $ contentHash x
- CS.constructOrWait store chash >>= \case
+ CS.constructOrWait store cacher chash >>= \case
CS.Pending void -> absurd void
CS.Complete item -> return item
CS.Missing fp ->
do
liftIO $ f fp x
- CS.markComplete store chash
+ finalItem <- CS.markComplete store chash
+ _ <- Remote.push cacher (CS.itemHash finalItem) (Just chash) (CS.itemPath store finalItem)
+ pure finalItem
`onException`
(do $(logTM) WarningS . ls $ "Exception in construction: removing " <> show chash
CS.removeFailed store chash
@@ -178,40 +185,42 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
$ runWrapped w
-- | Run a flow in a logging context.
-runFlowLog :: forall m c eff ex a b.
+runFlowLog :: forall m c eff ex a b remoteCache.
(Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m
- ,MonadCatch m, MonadMask m, KatipContext m)
+ ,MonadCatch m, MonadMask m, KatipContext m, Remote.Cacher m remoteCache)
=> c
-> Config c
-> CS.ContentStore
+ -> remoteCache
-> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects
- -> Int -- ^ Flow configuration identity. This forms part of the caching
+ -> Maybe Int -- ^ Flow configuration identity. This forms part of the caching
-- system and is used to disambiguate the same flow run in
-- multiple configurations.
-> Flow eff ex a b
-> a
-> m (Either ex b)
-runFlowLog c cfg store runWrapped confIdent flow input =
- try $ runFlowEx c cfg store runWrapped confIdent flow input
+runFlowLog c cfg store cacher runWrapped confIdent flow input =
+ try $ runFlowEx c cfg store cacher runWrapped confIdent flow input
-- | Run a flow, discarding all logging.
-runFlow :: forall m c eff ex a b.
+runFlow :: forall m c eff ex a b remoteCache.
(Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m
- ,MonadCatch m, MonadMask m)
+ ,MonadCatch m, MonadMask m, Remote.Cacher (KatipContextT m) remoteCache)
=> c
-> Config c
-> CS.ContentStore
+ -> remoteCache
-> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects
- -> Int -- ^ Flow configuration identity. This forms part of the caching
+ -> Maybe Int -- ^ Flow configuration identity. This forms part of the caching
-- system and is used to disambiguate the same flow run in
-- multiple configurations.
-> Flow eff ex a b
-> a
-> m (Either ex b)
-runFlow c cfg store runWrapped confIdent flow input = do
+runFlow c cfg store cacher runWrapped confIdent flow input = do
le <- liftIO $ initLogEnv "funflow" "production"
runKatipContextT le () "runFlow"
- $ runFlowLog c cfg store (liftAsyncA . runWrapped) confIdent flow input
+ $ runFlowLog c cfg store cacher (liftAsyncA . runWrapped) confIdent flow input
-- | Run a simple flow. Logging will be sent to stderr
runSimpleFlow :: forall m c a b.
@@ -224,7 +233,7 @@ runSimpleFlow :: forall m c a b.
-> a
-> m (Either SomeException b)
runSimpleFlow c ccfg store flow input = do
- handleScribe <- liftIO $ mkHandleScribe ColorIfTerminal stderr InfoS V2
+ handleScribe <- liftIO $ mkHandleScribe ColorIfTerminal stderr (permitItem InfoS) V2
let mkLogEnv = liftIO $
registerScribe "stderr" handleScribe defaultScribeSettings =<< initLogEnv "funflow" "production"
bracket mkLogEnv (liftIO . closeScribes) $ \le -> do
@@ -232,7 +241,7 @@ runSimpleFlow c ccfg store flow input = do
initialNamespace = "executeLoop"
runKatipContextT le initialContext initialNamespace
- $ runFlowLog c ccfg store runNoEffect 12345 flow input
+ $ runFlowLog c ccfg store Remote.NoCache runNoEffect (Just 12345) flow input
-- | Create a full pipeline runner locally. This includes an executor for
-- executing external tasks.