summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornclarke <>2019-10-09 08:30:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2019-10-09 08:30:00 (GMT)
commit31eaf2c2a3b07a6553aef61572aa685426d9f5c2 (patch)
tree0b6efd77935436a69661949149e6f7534abe8a95
parent3ceb65118bcf26cd628c9e0a0aba32cde5e9b4bf (diff)
version 1.5.0HEAD1.5.0master
-rw-r--r--TestFunflow.hs48
-rw-r--r--app/FFExecutorD.hs3
-rw-r--r--funflow.cabal15
-rw-r--r--src/Control/Arrow/AppArrow.hs39
-rw-r--r--src/Control/Arrow/Async.hs3
-rw-r--r--src/Control/Arrow/Free.hs36
-rw-r--r--src/Control/Funflow/Class.hs16
-rw-r--r--src/Control/Funflow/ContentHashable.hs5
-rw-r--r--src/Control/Funflow/ContentStore.hs64
-rw-r--r--src/Control/Funflow/Diagram.hs4
-rw-r--r--src/Control/Funflow/Exec/Simple.hs115
-rw-r--r--src/Control/Funflow/External/Executor.hs8
-rw-r--r--src/Control/Funflow/Lock.hs12
-rw-r--r--src/Control/Funflow/Pretty.hs2
-rw-r--r--src/Control/Funflow/RemoteCache.hs134
-rw-r--r--src/Control/Funflow/Steps.hs15
-rw-r--r--test/Funflow/ContentStore.hs76
17 files changed, 461 insertions, 134 deletions
diff --git a/TestFunflow.hs b/TestFunflow.hs
index 93828dc..09a8373 100644
--- a/TestFunflow.hs
+++ b/TestFunflow.hs
@@ -1,6 +1,7 @@
-{-# LANGUAGE Arrows #-}
-{-# LANGUAGE GADTs #-}
-{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
import Control.Arrow
import Control.Arrow.Free
@@ -9,7 +10,9 @@ import Control.Funflow
import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External.Coordinator.Memory
import Control.Funflow.Pretty
+import Data.Default
import Data.Monoid ((<>))
+import Path
import Path.IO
mkError :: String -> SomeException
@@ -32,22 +35,33 @@ flow2caught = retry 100 0 flow2
flow3 :: SimpleFlow [Int] [Int]
flow3 = mapA (arr (+1))
+runFailingFlow :: Path Abs Dir -> IO ()
+runFailingFlow storeDir = withSimpleLocalRunner storeDir $ \runner -> do
+ r <- runner cachedFailStep ()
+ print r
+
+testFailingFlow :: IO ()
+testFailingFlow =
+ withSystemTempDir "test_output" $ \storeDir ->
+ runFailingFlow storeDir >> runFailingFlow storeDir
+
main :: IO ()
-main =
+main = do
+ testFailingFlow
withSystemTempDir "test_output" $ \storeDir ->
- CS.withStore storeDir $ \store -> do
- memHook <- createMemoryCoordinator
- res <- runSimpleFlow MemoryCoordinator memHook store flow2 ()
- print res
- res' <- runSimpleFlow MemoryCoordinator memHook store flow2caught ()
- print res'
- putStrLn $ showFlow myFlow
- putStrLn $ showFlow flow2
- res1 <- runSimpleFlow MemoryCoordinator memHook store flow3 [1..10]
- print res1
--- main = redisTest
- externalTest
- storeTest
+ CS.withStore storeDir $ \store -> do
+ memHook <- createMemoryCoordinator
+ res <- runSimpleFlow MemoryCoordinator memHook store flow2 ()
+ print res
+ res' <- runSimpleFlow MemoryCoordinator memHook store flow2caught ()
+ print res'
+ putStrLn $ showFlow myFlow
+ putStrLn $ showFlow flow2
+ res1 <- runSimpleFlow MemoryCoordinator memHook store flow3 [1..10]
+ print res1
+ -- main = redisTest
+ externalTest
+ storeTest
externalTest :: IO ()
externalTest = let
diff --git a/app/FFExecutorD.hs b/app/FFExecutorD.hs
index cf445fb..70ae7cd 100644
--- a/app/FFExecutorD.hs
+++ b/app/FFExecutorD.hs
@@ -19,6 +19,7 @@ import Control.Funflow.External.Executor
import Control.Monad (void)
import Data.Monoid ((<>))
import qualified Database.Redis as R
+import Network.Socket (PortNumber)
import qualified Options.Applicative as Opt
import Path
import System.Clock
@@ -64,7 +65,7 @@ argsParser = Opt.subparser
<> Opt.help "Password for the Redis instance, if needed." ))
useRedis store host port pw = Args store $ UseRedis R.defaultConnectInfo
{ R.connectHost = host
- , R.connectPort = R.PortNumber port
+ , R.connectPort = R.PortNumber (port :: PortNumber)
, R.connectAuth = pw
}
sqliteParser = useSQLite
diff --git a/funflow.cabal b/funflow.cabal
index a1bf17f..31d5013 100644
--- a/funflow.cabal
+++ b/funflow.cabal
@@ -1,5 +1,5 @@
Name: funflow
-Version: 1.4.0
+Version: 1.5.0
Synopsis: Workflows with arrows
Description:
An arrow with resumable computations and logging
@@ -24,7 +24,8 @@ Library
default-language: Haskell2010
Exposed-modules:
- Control.Arrow.Async
+ Control.Arrow.AppArrow
+ , Control.Arrow.Async
, Control.Arrow.Free
, Control.Funflow
, Control.Funflow.Cache.TH
@@ -40,6 +41,7 @@ Library
, Control.Funflow.External.Coordinator.SQLite
, Control.Funflow.Lock
, Control.Funflow.Orphans
+ , Control.Funflow.RemoteCache
, Control.Funflow.Steps
, Control.Funflow.Pretty
, Control.Funflow.Exec.Simple
@@ -67,7 +69,7 @@ Library
, hedis
, hostname
, integer-gmp
- , katip >= 0.5.0.1
+ , katip >= 0.8.0.0
, lens
, lifted-async
, memory
@@ -83,6 +85,7 @@ Library
, sqlite-simple
, stm
, store
+ , tar
, template-haskell >= 2.11
, text
, time
@@ -108,7 +111,8 @@ Executable ffexecutord
, bytestring
, clock
, funflow
- , hedis
+ , hedis >= 0.12.5
+ , network
, path
, text
, unix
@@ -122,9 +126,10 @@ Test-suite test-funflow
main-is: TestFunflow.hs
ghc-options: -Wall -threaded
build-depends: base >=4.6 && <5
+ , data-default
, funflow
, filepath
- , hedis
+ , hedis >= 0.12.5
, path
, path-io
, text
diff --git a/src/Control/Arrow/AppArrow.hs b/src/Control/Arrow/AppArrow.hs
new file mode 100644
index 0000000..fe88236
--- /dev/null
+++ b/src/Control/Arrow/AppArrow.hs
@@ -0,0 +1,39 @@
+-- | This modules defines the composition of an applicative functor and an
+-- arrow, which is always an arrow.
+
+module Control.Arrow.AppArrow
+ ( AppArrow(..)
+ , appArrow
+ ) where
+
+import Control.Category
+import Control.Arrow
+import Prelude hiding (id, (.))
+
+newtype AppArrow app arr a b = AppArrow { unAppArrow :: app (arr a b) }
+
+instance (Applicative app, Category cat) => Category (AppArrow app cat) where
+ id = appArrow id
+ AppArrow a1 . AppArrow a2 = AppArrow $ (.) <$> a1 <*> a2
+
+instance (Applicative app, Arrow arr) => Arrow (AppArrow app arr) where
+ arr = appArrow . arr
+ first (AppArrow a) = AppArrow $ first <$> a
+ second (AppArrow a) = AppArrow $ second <$> a
+ AppArrow a1 *** AppArrow a2 = AppArrow $ (***) <$> a1 <*> a2
+
+instance (Applicative app, ArrowChoice arr) => ArrowChoice (AppArrow app arr) where
+ left (AppArrow a) = AppArrow $ left <$> a
+ right (AppArrow a) = AppArrow $ right <$> a
+ AppArrow a1 +++ AppArrow a2 = AppArrow $ (+++) <$> a1 <*> a2
+ AppArrow a1 ||| AppArrow a2 = AppArrow $ (|||) <$> a1 <*> a2
+
+instance (Applicative f, Arrow arr) => Functor (AppArrow f arr t) where
+ fmap f = (>>> arr f)
+
+instance (Applicative app, Arrow arr) => Applicative (AppArrow app arr t) where
+ pure = arr . const
+ a <*> b = a &&& b >>> arr (uncurry ($))
+
+appArrow :: (Applicative app) => arr a b -> AppArrow app arr a b
+appArrow = AppArrow . pure
diff --git a/src/Control/Arrow/Async.hs b/src/Control/Arrow/Async.hs
index 360668b..4a27473 100644
--- a/src/Control/Arrow/Async.hs
+++ b/src/Control/Arrow/Async.hs
@@ -40,8 +40,7 @@ instance MonadBaseControl IO m => ArrowChoice (AsyncA m) where
instance (Exception ex, MonadBaseControl IO m, MonadCatch m)
=> ArrowError ex (AsyncA m) where
- AsyncA arr1 `catch` AsyncA arr2 = AsyncA $ \x ->
- arr1 x `Control.Exception.Safe.catch` curry arr2 x
+ try (AsyncA a) = AsyncA $ Control.Exception.Safe.try . a
-- | Lift an AsyncA through a monad transformer of the underlying monad.
liftAsyncA :: (MonadTrans t, Monad m)
diff --git a/src/Control/Arrow/Free.hs b/src/Control/Arrow/Free.hs
index 7bae9f2..149ab13 100644
--- a/src/Control/Arrow/Free.hs
+++ b/src/Control/Arrow/Free.hs
@@ -32,6 +32,7 @@ module Control.Arrow.Free
, eval
-- * ArrowError
, ArrowError(..)
+ , catch
-- * Arrow functions
, mapA
, mapSeqA
@@ -40,9 +41,13 @@ module Control.Arrow.Free
) where
import Control.Arrow
+import Control.Arrow.AppArrow
import Control.Category
import Control.Exception.Safe (Exception, MonadCatch)
import qualified Control.Exception.Safe
+import Control.Monad.Trans.Reader
+import Control.Monad.Trans.Writer
+import qualified Control.Monad.Trans.Writer.Strict as SW
import Data.Bool (Bool)
import Data.Constraint (Constraint, Dict (..), mapDict, weaken1,
weaken2)
@@ -50,7 +55,8 @@ import Data.Either (Either (..))
import Data.Function (const, flip, ($))
import Data.List (uncons)
import Data.Maybe (maybe)
-import Data.Tuple (curry, uncurry)
+import Data.Monoid (Monoid)
+import Data.Tuple (uncurry)
-- | A natural transformation on type constructors of two arguments.
type x ~> y = forall a b. x a b -> y a b
@@ -159,12 +165,32 @@ instance FreeArrowLike Choice where
-- | ArrowError represents those arrows which can catch exceptions within the
-- processing of the flow.
class ArrowError ex a where
- catch :: a e c -> a (e, ex) c -> a e c
+ try :: a e c -> a e (Either ex c)
+
+instance (ArrowError ex arr) => ArrowError ex (AppArrow (Reader r) arr) where
+ try (AppArrow act) = AppArrow $ reader $ \r ->
+ try $ runReader act r
+
+instance (ArrowError ex arr, Monoid w) => ArrowError ex (AppArrow (Writer w) arr) where
+ try (AppArrow act) = AppArrow $ writer (try a, w)
+ where (a, w) = runWriter act
+
+instance (ArrowError ex arr, Monoid w) => ArrowError ex (AppArrow (SW.Writer w) arr) where
+ try (AppArrow act) = AppArrow $ SW.writer (try a, w)
+ where (a, w) = SW.runWriter act
+
+catch :: (ArrowError ex a, ArrowChoice a) => a e c -> a (e, ex) c -> a e c
+catch a onExc = proc e -> do
+ res <- try a -< e
+ case res of
+ Left ex ->
+ onExc -< (e, ex)
+ Right r ->
+ returnA -< r
instance (Arrow (Kleisli m), Exception ex, MonadCatch m)
=> ArrowError ex (Kleisli m) where
- Kleisli arr1 `catch` Kleisli arr2 = Kleisli $ \x ->
- arr1 x `Control.Exception.Safe.catch` curry arr2 x
+ try (Kleisli a) = Kleisli $ Control.Exception.Safe.try . a
-- | Freely generated arrows with both choice and error handling.
newtype ErrorChoice ex eff a b = ErrorChoice {
@@ -188,7 +214,7 @@ instance ArrowChoice (ErrorChoice ex eff) where
(ErrorChoice a) ||| (ErrorChoice b) = ErrorChoice $ \f -> a f ||| b f
instance ArrowError ex (ErrorChoice ex eff) where
- (ErrorChoice a) `catch` (ErrorChoice h) = ErrorChoice $ \f -> a f `catch` h f
+ try (ErrorChoice a) = ErrorChoice $ \f -> try $ a f
instance FreeArrowLike (ErrorChoice ex) where
type Ctx (ErrorChoice ex) = Join ArrowChoice (ArrowError ex)
diff --git a/src/Control/Funflow/Class.hs b/src/Control/Funflow/Class.hs
index 833afc1..ec121ba 100644
--- a/src/Control/Funflow/Class.hs
+++ b/src/Control/Funflow/Class.hs
@@ -16,6 +16,7 @@
module Control.Funflow.Class where
import Control.Arrow
+import Control.Arrow.AppArrow
import Control.Arrow.Free
import qualified Control.Funflow.Base as Base
import Control.Funflow.ContentHashable
@@ -24,7 +25,7 @@ import Control.Funflow.External
import Data.Default (def)
import Path
-class (ArrowChoice arr, ArrowError ex arr) => ArrowFlow eff ex arr | arr -> eff ex where
+class (Arrow arr, ArrowError ex arr) => ArrowFlow eff ex arr | arr -> eff ex where
-- | Create a flow from a pure function.
step' :: Base.Properties a b -> (a -> b) -> arr a b
-- | Create a flow from an IO action.
@@ -64,3 +65,16 @@ stepIO = stepIO' def
wrap :: ArrowFlow eff ex arr => eff a b -> arr a b
wrap = wrap' def
+
+instance ( Applicative app
+ , ArrowError ex (AppArrow app (arr eff ex))
+ , ArrowFlow eff ex (arr eff ex) )
+ => ArrowFlow eff ex (AppArrow app (arr eff ex)) where
+ step' props f = appArrow $ step' props f
+ stepIO' props f = appArrow $ stepIO' props f
+ external f = appArrow $ external f
+ external' props f = appArrow $ external' props f
+ wrap' props eff = appArrow $ wrap' props eff
+ putInStore f = appArrow $ putInStore f
+ getFromStore f = appArrow $ getFromStore f
+ internalManipulateStore f = appArrow $ internalManipulateStore f
diff --git a/src/Control/Funflow/ContentHashable.hs b/src/Control/Funflow/ContentHashable.hs
index 6fb41fa..53a6aca 100644
--- a/src/Control/Funflow/ContentHashable.hs
+++ b/src/Control/Funflow/ContentHashable.hs
@@ -52,6 +52,7 @@ module Control.Funflow.ContentHashable
import Control.Exception.Safe (catchJust)
import Control.Funflow.Orphans ()
import Control.Monad (foldM, mzero, (>=>))
+import Control.Monad.IO.Class (MonadIO, liftIO)
import Crypto.Hash (Context, Digest, SHA256,
digestFromByteString,
hashFinalize, hashInit,
@@ -459,9 +460,9 @@ instance ContentHashable IO FileContent where
-- The path to the directory is ignored.
newtype DirectoryContent = DirectoryContent (Path.Path Path.Abs Path.Dir)
-instance ContentHashable IO DirectoryContent where
+instance MonadIO m => ContentHashable m DirectoryContent where
- contentHashUpdate ctx0 (DirectoryContent dir0) = do
+ contentHashUpdate ctx0 (DirectoryContent dir0) = liftIO $ do
(dirs, files) <- Path.IO.listDir dir0
ctx' <- foldM hashFile ctx0 (sort files)
foldM hashDir ctx' (sort dirs)
diff --git a/src/Control/Funflow/ContentStore.hs b/src/Control/Funflow/ContentStore.hs
index 86d774e..b784f29 100644
--- a/src/Control/Funflow/ContentStore.hs
+++ b/src/Control/Funflow/ContentStore.hs
@@ -1,4 +1,5 @@
{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
@@ -123,15 +124,16 @@ import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Exception.Safe (Exception, MonadMask,
- bracket, bracket_,
- bracketOnError,
+ bracket, bracketOnError,
+ bracket_,
displayException, throwIO)
import Control.Funflow.ContentStore.Notify
import Control.Funflow.Orphans ()
import Control.Lens
-import Control.Monad (forever, forM_, unless,
+import Control.Monad (forM_, forever, unless,
void, when, (<=<), (>=>))
import Control.Monad.IO.Class (MonadIO, liftIO)
+import Control.Monad.Trans.Control (MonadBaseControl)
import Crypto.Hash (hashUpdate)
import Data.Aeson (FromJSON, ToJSON)
import Data.Bits (complement)
@@ -166,6 +168,7 @@ import Control.Funflow.ContentHashable (ContentHash,
encodeHash, pathToHash,
toBytes)
import Control.Funflow.Lock
+import qualified Control.Funflow.RemoteCache as Remote
-- | Status of an item in the store.
@@ -465,16 +468,17 @@ waitUntilComplete store hash = lookupOrWait store hash >>= \case
-- It should be constructed in the given @buildDir@,
-- and then marked as complete using 'markComplete'.
constructOrAsync
- :: MonadIO m
+ :: forall m remoteCache.
+ (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache)
=> ContentStore
+ -> remoteCache
-> ContentHash
-> m (Status (Path Abs Dir) (Async Update) Item)
-constructOrAsync store hash = liftIO . withStoreLock store $
- internalQuery store hash >>= \case
+constructOrAsync store cacher hash =
+ constructIfMissing store cacher hash >>= \case
Complete item -> return $ Complete item
- Missing () -> withWritableStore store $
- Missing <$> internalMarkPending store hash
- Pending _ -> Pending <$> internalWatchPending store hash
+ Missing path -> return $ Missing path
+ Pending _ -> Pending <$> liftIO (internalWatchPending store hash)
-- | Atomically query the state under the given key and mark pending if missing.
-- Wait for the item to be completed, if already pending.
@@ -485,11 +489,12 @@ constructOrAsync store hash = liftIO . withStoreLock store $
-- It should be constructed in the given @buildDir@,
-- and then marked as complete using 'markComplete'.
constructOrWait
- :: MonadIO m
+ :: (MonadIO m, MonadMask m, MonadBaseControl IO m, Remote.Cacher m remoteCache)
=> ContentStore
+ -> remoteCache
-> ContentHash
-> m (Status (Path Abs Dir) Void Item)
-constructOrWait store hash = constructOrAsync store hash >>= \case
+constructOrWait store cacher hash = constructOrAsync store cacher hash >>= \case
Pending a -> liftIO (wait a) >>= \case
Completed item -> return $ Complete item
-- XXX: Consider extending 'Status' with a 'Failed' constructor.
@@ -503,30 +508,38 @@ constructOrWait store hash = constructOrAsync store hash >>= \case
-- | Atomically query the state under the given key and mark pending if missing.
constructIfMissing
- :: MonadIO m
+ :: (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache)
=> ContentStore
+ -> remoteCache
-> ContentHash
-> m (Status (Path Abs Dir) () Item)
-constructIfMissing store hash = liftIO . withStoreLock store $
+constructIfMissing store cacher hash = withStoreLock store $
internalQuery store hash >>= \case
Complete item -> return $ Complete item
+ Missing () -> withWritableStore store $ do
+ let destDir :: Path Abs Dir = mkItemPath store hash
+ Remote.pull cacher hash destDir >>= \case
+ Remote.PullOK () -> return $ Complete (Item hash)
+ Remote.NotInCache ->
+ Missing <$> liftIO (internalMarkPending store hash)
+ Remote.PullError _ ->
+ Missing <$> liftIO (internalMarkPending store hash)
Pending _ -> return $ Pending ()
- Missing () -> withWritableStore store $
- Missing <$> internalMarkPending store hash
-- | Atomically query the state under the given key and mark pending if missing.
-- Execute the given function to construct the item, mark as complete on success
-- and remove on failure. Forcibly removes if an uncaught exception occurs
-- during item construction.
withConstructIfMissing
- :: (MonadIO m, MonadMask m)
+ :: (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache)
=> ContentStore
+ -> remoteCache
-> ContentHash
-> (Path Abs Dir -> m (Either e a))
-> m (Status e () (Maybe a, Item))
-withConstructIfMissing store hash f =
+withConstructIfMissing store cacher hash f =
bracketOnError
- (constructIfMissing store hash)
+ (constructIfMissing store cacher hash)
(\case
Missing _ -> removeForcibly store hash
_ -> return ())
@@ -539,6 +552,7 @@ withConstructIfMissing store hash f =
return (Missing e)
Right x -> do
item <- markComplete store hash
+ _ <- Remote.push cacher (itemHash item) (Just hash) (itemPath store item)
return (Complete (Just x, item)))
-- | Mark a non-existent item as pending.
@@ -781,7 +795,7 @@ metadataPath = (</> [reldir|metadata|])
-- | Holds a lock on the global 'MVar' and on the global lock file
-- for the duration of the given action.
-withStoreLock :: ContentStore -> IO a -> IO a
+withStoreLock :: MonadBaseControl IO m => ContentStore -> m a -> m a
withStoreLock store = withLock (storeLock store)
prefixHashPath :: C8.ByteString -> ContentHash -> Path Rel Dir
@@ -895,25 +909,25 @@ internalWatchPending store hash = do
-- Stop watching when it arrives.
async $ takeMVar update <* stopWatching
-setRootDirWritable :: Path Abs Dir -> IO ()
-setRootDirWritable storeRoot =
+setRootDirWritable :: MonadIO m => Path Abs Dir -> m ()
+setRootDirWritable storeRoot = liftIO $
setFileMode (fromAbsDir storeRoot) writableRootDirMode
writableRootDirMode :: FileMode
writableRootDirMode = writableDirMode
-setRootDirReadOnly :: Path Abs Dir -> IO ()
-setRootDirReadOnly storeRoot =
+setRootDirReadOnly :: MonadIO m => Path Abs Dir -> m ()
+setRootDirReadOnly storeRoot = liftIO $
setFileMode (fromAbsDir storeRoot) readOnlyRootDirMode
readOnlyRootDirMode :: FileMode
readOnlyRootDirMode = writableDirMode `intersectFileModes` allButWritableMode
-withWritableStoreRoot :: Path Abs Dir -> IO a -> IO a
+withWritableStoreRoot :: (MonadMask m, MonadIO m) => Path Abs Dir -> m a -> m a
withWritableStoreRoot storeRoot =
bracket_ (setRootDirWritable storeRoot) (setRootDirReadOnly storeRoot)
-withWritableStore :: ContentStore -> IO a -> IO a
+withWritableStore :: (MonadMask m, MonadIO m) => ContentStore -> m a -> m a
withWritableStore ContentStore {storeRoot} =
withWritableStoreRoot storeRoot
diff --git a/src/Control/Funflow/Diagram.hs b/src/Control/Funflow/Diagram.hs
index 9581911..7305c9b 100644
--- a/src/Control/Funflow/Diagram.hs
+++ b/src/Control/Funflow/Diagram.hs
@@ -33,7 +33,7 @@ data Diagram ex a b where
Seq :: Diagram ex a b -> Diagram ex b c -> Diagram ex a c
Par :: Diagram ex a b -> Diagram ex c d -> Diagram ex (a,c) (b,d)
Fanin :: Diagram ex a c -> Diagram ex b c -> Diagram ex (Either a b) c
- Catch :: Diagram ex a b -> Diagram ex (a,ex) b -> Diagram ex a b
+ Try :: Diagram ex a b -> Diagram ex a (Either ex b)
instance Category (Diagram ex) where
id = Node emptyNodeProperties Proxy Proxy
@@ -51,7 +51,7 @@ instance ArrowChoice (Diagram ex) where
f ||| g = Fanin f g
instance ArrowError ex (Diagram ex) where
- f `catch` g = Catch f g
+ try = Try
-- | Construct a labelled node
node :: forall arr a b ex. Arrow arr => arr a b -> [T.Text] -> (Diagram ex) a b
diff --git a/src/Control/Funflow/Exec/Simple.hs b/src/Control/Funflow/Exec/Simple.hs
index 9e60966..1d810f7 100644
--- a/src/Control/Funflow/Exec/Simple.hs
+++ b/src/Control/Funflow/Exec/Simple.hs
@@ -21,7 +21,6 @@ module Control.Funflow.Exec.Simple
, withSimpleLocalRunner
) where
-import Control.Arrow (returnA)
import Control.Arrow.Async
import Control.Arrow.Free (type (~>), eval)
import Control.Concurrent.Async (withAsync)
@@ -37,12 +36,14 @@ import Control.Funflow.External
import Control.Funflow.External.Coordinator
import Control.Funflow.External.Coordinator.Memory
import Control.Funflow.External.Executor (executeLoop)
+import qualified Control.Funflow.RemoteCache as Remote
+import Control.Monad.Catch (MonadCatch,
+ MonadMask)
import Control.Monad.IO.Class
import Control.Monad.Trans.Control (MonadBaseControl)
-import Control.Monad.Catch (MonadCatch
- ,MonadMask)
import qualified Data.ByteString as BS
import Data.Foldable (traverse_)
+import Data.Maybe
import Data.Monoid ((<>))
import Data.Void
import Katip
@@ -50,20 +51,24 @@ import Path
import System.IO (stderr)
-- | Simple evaulation of a flow
-runFlowEx :: forall m c eff ex a b.
+runFlowEx :: forall m c eff ex a b remoteCache.
(Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m
- ,MonadCatch m, MonadMask m, KatipContext m)
+ ,MonadCatch m, MonadMask m, KatipContext m, Remote.Cacher m remoteCache)
=> c
-> Config c
-> CS.ContentStore
+ -> remoteCache
-> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects
- -> Int -- ^ Flow configuration identity. This forms part of the caching
- -- system and is used to disambiguate the same flow run in
- -- multiple configurations.
+ -> Maybe Int -- ^ Flow configuration identity. This forms part of the
+ -- caching system and is used to disambiguate the same
+ -- flow run in multiple configurations. If Nothing,
+ -- then it means this flow has no identity, this
+ -- implies that steps will be executed without cache,
+ -- and external tasks will all be considered impure.
-> Flow eff ex a b
-> a
-> m b
-runFlowEx _ cfg store runWrapped confIdent flow input = do
+runFlowEx _ cfg store cacher runWrapped confIdent flow input = do
hook <- initialise cfg
runAsyncA (eval (runFlow' hook) flow) input
where
@@ -71,32 +76,27 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
$ CS.itemPath store item </> [relfile|out|]
withStoreCache :: forall i o. Cacher i o
-> AsyncA m i o -> AsyncA m i o
- withStoreCache NoCache f = f
- withStoreCache c f = let
- chashOf i = cacherKey c confIdent i
- checkStore = AsyncA $ \chash ->
- CS.constructOrWait store chash >>= \case
- CS.Pending void -> absurd void
- CS.Complete item -> do
- bs <- liftIO . BS.readFile $ simpleOutPath item
- return . Right . cacherReadValue c $ bs
- CS.Missing fp -> return $ Left fp
- writeStore = AsyncA $ \(chash, fp, res) ->
- do
- liftIO $ BS.writeFile (toFilePath $ fp </> [relfile|out|])
- . cacherStoreValue c $ res
- _ <- CS.markComplete store chash
- return res
- `onException`
- CS.removeFailed store chash
- in proc i -> do
- let chash = chashOf i
- mcontents <- checkStore -< chash
- case mcontents of
- Right contents -> returnA -< contents
- Left fp -> do
- res <- f -< i
- writeStore -< (chash, fp, res)
+ withStoreCache c@Cache{} (AsyncA f)
+ | Just confIdent' <- confIdent = AsyncA $ \i -> do
+ let chash = cacherKey c confIdent' i
+ computeAndStore fp = do
+ res <- f i -- Do the actual computation
+ liftIO $ BS.writeFile (toFilePath $ fp </> [relfile|out|])
+ . cacherStoreValue c $ res
+ return $ Right res
+ readItem item = do
+ bs <- liftIO . BS.readFile $ simpleOutPath item
+ return . cacherReadValue c $ bs
+ CS.withConstructIfMissing store cacher chash computeAndStore >>= \case
+ CS.Missing e -> absurd e
+ CS.Pending _ ->
+ liftIO (CS.waitUntilComplete store chash) >>= \case
+ Just item -> readItem item
+ Nothing -> throwM $ CS.FailedToConstruct chash
+ CS.Complete (Just a, _) -> return a
+ CS.Complete (_, item) -> readItem item
+ withStoreCache _ f = f
+
writeMd :: forall i o. ContentHash
-> i
-> o
@@ -113,13 +113,16 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
. AsyncA $ \x -> do
let out = f x
case cache props of
- NoCache -> return ()
- Cache key _ _ -> writeMd (key confIdent x) x out $ mdpolicy props
+ Cache key _ _ | Just confIdent' <- confIdent ->
+ writeMd (key confIdent' x) x out $ mdpolicy props
+ _ -> return ()
return out
runFlow' _ (StepIO props f) = withStoreCache (cache props)
. AsyncA $ liftIO . f
runFlow' po (External props toTask) = AsyncA $ \x -> do
- chash <- liftIO $ case (ep_impure props) of
+ let purity | Just _ <- confIdent = ep_impure props
+ | otherwise = alwaysRecompile
+ chash <- liftIO $ case purity of
EpPure -> contentHash (toTask x)
EpImpure fn -> do
salt <- fn
@@ -149,7 +152,9 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
submitTask po td
wait chash td
wait chash td = do
- KnownTask _ <- awaitTask po chash
+ awaitTask po chash >>= \case
+ KnownTask _ -> pure ()
+ _ -> error "[Control.Funflow.Exec.Simple.runFlowEx] Expected KnownTask."
CS.waitUntilComplete store chash >>= \case
Just item -> return item
Nothing -> do
@@ -159,13 +164,15 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
throwM $ ExternalTaskFailed td ti mbStdout mbStderr
runFlow' _ (PutInStore f) = AsyncA $ \x -> katipAddNamespace "putInStore" $ do
chash <- liftIO $ contentHash x
- CS.constructOrWait store chash >>= \case
+ CS.constructOrWait store cacher chash >>= \case
CS.Pending void -> absurd void
CS.Complete item -> return item
CS.Missing fp ->
do
liftIO $ f fp x
- CS.markComplete store chash
+ finalItem <- CS.markComplete store chash
+ _ <- Remote.push cacher (CS.itemHash finalItem) (Just chash) (CS.itemPath store finalItem)
+ pure finalItem
`onException`
(do $(logTM) WarningS . ls $ "Exception in construction: removing " <> show chash
CS.removeFailed store chash
@@ -178,40 +185,42 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
$ runWrapped w
-- | Run a flow in a logging context.
-runFlowLog :: forall m c eff ex a b.
+runFlowLog :: forall m c eff ex a b remoteCache.
(Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m
- ,MonadCatch m, MonadMask m, KatipContext m)
+ ,MonadCatch m, MonadMask m, KatipContext m, Remote.Cacher m remoteCache)
=> c
-> Config c
-> CS.ContentStore
+ -> remoteCache
-> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects
- -> Int -- ^ Flow configuration identity. This forms part of the caching
+ -> Maybe Int -- ^ Flow configuration identity. This forms part of the caching
-- system and is used to disambiguate the same flow run in
-- multiple configurations.
-> Flow eff ex a b
-> a
-> m (Either ex b)
-runFlowLog c cfg store runWrapped confIdent flow input =
- try $ runFlowEx c cfg store runWrapped confIdent flow input
+runFlowLog c cfg store cacher runWrapped confIdent flow input =
+ try $ runFlowEx c cfg store cacher runWrapped confIdent flow input
-- | Run a flow, discarding all logging.
-runFlow :: forall m c eff ex a b.
+runFlow :: forall m c eff ex a b remoteCache.
(Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m
- ,MonadCatch m, MonadMask m)
+ ,MonadCatch m, MonadMask m, Remote.Cacher (KatipContextT m) remoteCache)
=> c
-> Config c
-> CS.ContentStore
+ -> remoteCache
-> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects
- -> Int -- ^ Flow configuration identity. This forms part of the caching
+ -> Maybe Int -- ^ Flow configuration identity. This forms part of the caching
-- system and is used to disambiguate the same flow run in
-- multiple configurations.
-> Flow eff ex a b
-> a
-> m (Either ex b)
-runFlow c cfg store runWrapped confIdent flow input = do
+runFlow c cfg store cacher runWrapped confIdent flow input = do
le <- liftIO $ initLogEnv "funflow" "production"
runKatipContextT le () "runFlow"
- $ runFlowLog c cfg store (liftAsyncA . runWrapped) confIdent flow input
+ $ runFlowLog c cfg store cacher (liftAsyncA . runWrapped) confIdent flow input
-- | Run a simple flow. Logging will be sent to stderr
runSimpleFlow :: forall m c a b.
@@ -224,7 +233,7 @@ runSimpleFlow :: forall m c a b.
-> a
-> m (Either SomeException b)
runSimpleFlow c ccfg store flow input = do
- handleScribe <- liftIO $ mkHandleScribe ColorIfTerminal stderr InfoS V2
+ handleScribe <- liftIO $ mkHandleScribe ColorIfTerminal stderr (permitItem InfoS) V2
let mkLogEnv = liftIO $
registerScribe "stderr" handleScribe defaultScribeSettings =<< initLogEnv "funflow" "production"
bracket mkLogEnv (liftIO . closeScribes) $ \le -> do
@@ -232,7 +241,7 @@ runSimpleFlow c ccfg store flow input = do
initialNamespace = "executeLoop"
runKatipContextT le initialContext initialNamespace
- $ runFlowLog c ccfg store runNoEffect 12345 flow input
+ $ runFlowLog c ccfg store Remote.NoCache runNoEffect (Just 12345) flow input
-- | Create a full pipeline runner locally. This includes an executor for
-- executing external tasks.
diff --git a/src/Control/Funflow/External/Executor.hs b/src/Control/Funflow/External/Executor.hs
index c7c9a5c..92c2c07 100644
--- a/src/Control/Funflow/External/Executor.hs
+++ b/src/Control/Funflow/External/Executor.hs
@@ -19,6 +19,7 @@ import Control.Exception.Safe
import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External
import Control.Funflow.External.Coordinator
+import qualified Control.Funflow.RemoteCache as Remote
import Control.Lens
import Control.Monad (forever, mzero, unless)
import Control.Monad.Trans (lift)
@@ -29,6 +30,7 @@ import Data.Foldable (for_)
import Data.Maybe (isJust, isNothing)
import Data.Monoid ((<>))
import qualified Data.Text as T
+import GHC.IO.Handle (hClose)
import Katip as K
import Network.HostName
import Path
@@ -40,7 +42,6 @@ import System.IO (Handle, IOMode (..),
import System.Posix.Env (getEnv)
import System.Posix.User
import System.Process
-import GHC.IO.Handle (hClose)
data ExecutionResult =
-- | The result already exists in the store and there is no need
@@ -63,7 +64,8 @@ data ExecutionResult =
-- | Execute an individual task.
execute :: CS.ContentStore -> TaskDescription -> KatipContextT IO ExecutionResult
execute store td = logError $ do
- status <- CS.withConstructIfMissing store (td ^. tdOutput) $ \fp -> do
+ -- We should pass a Remote cacher down to it:
+ status <- CS.withConstructIfMissing store Remote.NoCache (td ^. tdOutput) $ \fp -> do
(fpOut, hOut) <- lift $
CS.createMetadataFile store (td ^. tdOutput) [relfile|stdout|]
(fpErr, hErr) <- lift $
@@ -162,7 +164,7 @@ executeLoop :: forall c. Coordinator c
-> IO ()
executeLoop coord cfg store =
executeLoopWithScribe coord cfg store =<<
- mkHandleScribe ColorIfTerminal stdout InfoS V2
+ mkHandleScribe ColorIfTerminal stdout (permitItem InfoS) V2
-- | Same as 'executeLoop', but allows specifying a custom 'Scribe' for logging
executeLoopWithScribe :: forall c. Coordinator c
diff --git a/src/Control/Funflow/Lock.hs b/src/Control/Funflow/Lock.hs
index ea6f0fa..ca24d26 100644
--- a/src/Control/Funflow/Lock.hs
+++ b/src/Control/Funflow/Lock.hs
@@ -1,4 +1,5 @@
-{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | Thread and process write lock.
@@ -17,8 +18,9 @@ module Control.Funflow.Lock
import Control.Concurrent
import Control.Exception.Safe
-import Control.Monad (unless)
-import Network.HostName (getHostName)
+import Control.Monad (unless)
+import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp_)
+import Network.HostName (getHostName)
import Path
import Path.IO
import System.Posix.Files
@@ -58,8 +60,8 @@ closeLock lock = do
takeMVar (lockMVar lock)
-- | Acquire the lock for the duration of the given action and release after.
-withLock :: Lock -> IO a -> IO a
-withLock lock action =
+withLock :: MonadBaseControl IO m => Lock -> m a -> m a
+withLock lock = liftBaseOp_ $ \action ->
withMVar (lockMVar lock) $ \() ->
bracket_ (acquireDirLock $ lockDir lock) (releaseDirLock $ lockDir lock) $
action
diff --git a/src/Control/Funflow/Pretty.hs b/src/Control/Funflow/Pretty.hs
index 2f7dea1..ebeb860 100644
--- a/src/Control/Funflow/Pretty.hs
+++ b/src/Control/Funflow/Pretty.hs
@@ -18,7 +18,7 @@ ppFlow = ppDiagram . toDiagram where
ppDiagram (Seq f g) = parens $ ppDiagram f <+> text ">>>" <+> ppDiagram g
ppDiagram (Par f g) = parens $ ppDiagram f <+> text "***" <+> ppDiagram g
ppDiagram (Fanin f g) = parens $ ppDiagram f <+> text "|||" <+> ppDiagram g
- ppDiagram (Catch f g) = parens $ ppDiagram f <+> text "catch" <+> ppDiagram g
+ ppDiagram (Try f) = parens $ text "try" <+> ppDiagram f
showFlow :: Flow eff ex a b -> String
showFlow = render . ppFlow
diff --git a/src/Control/Funflow/RemoteCache.hs b/src/Control/Funflow/RemoteCache.hs
new file mode 100644
index 0000000..803c466
--- /dev/null
+++ b/src/Control/Funflow/RemoteCache.hs
@@ -0,0 +1,134 @@
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+
+-- |
+-- This module defines the remote caching mechanism of funflow which is used to
+-- keep several funflow stores (possibly on different machines) in sync.
+module Control.Funflow.RemoteCache
+ ( Cacher(..)
+ , PullResult(..), PushResult(..), AliasResult(..)
+ , NoCache(..), memoryCache
+ , pullAsArchive, pushAsArchive
+ ) where
+
+import qualified Codec.Archive.Tar as Tar
+import Control.Concurrent.MVar
+import Control.Funflow.ContentHashable
+import Control.Monad.IO.Class (MonadIO, liftIO)
+import Data.ByteString.Lazy (ByteString)
+import Data.Map.Strict (Map)
+import qualified Data.Map.Strict as Map
+import Path
+
+
+-- |
+-- The result of a tentative pull from the remote cache
+data PullResult a
+ = PullOK a
+ | NotInCache
+ | PullError String
+ deriving (Eq, Ord, Show)
+
+-- |
+-- The result of a tentative push to the remote cache
+data PushResult
+ = PushOK
+ | PushError String
+ deriving (Eq, Ord, Show)
+
+data AliasResult
+ = AliasOK
+ | TargetNotInCache
+ | AliasError String
+
+-- |
+-- A simple mechanism for remote-caching.
+--
+-- Provides a way to push a path to the cache and pull it back.
+--
+-- No assumption is made on the availability of a store path. In particular,
+-- pushing a path to the cache doesn't mean that we can pull it back.
+class Monad m => Cacher m a where
+ push ::
+ a
+ -> ContentHash -- ^ "Primary" key: hash of the content
+ -> Maybe ContentHash -- ^ "Secondary" key: hash of the dependencies
+ -> Path Abs Dir -- ^ Path to the content
+ -> m PushResult
+ pull :: a -> ContentHash -> Path Abs Dir -> m (PullResult ())
+
+-- |
+-- Push the path as an archive to the remote cache
+pushAsArchive ::
+ MonadIO m
+ => (ContentHash -> ContentHash -> m (Either String ())) -- ^ How to create the aliases
+ -> (ContentHash -> ByteString -> m PushResult) -- ^ How to push the content
+ -> ContentHash -- ^ Primary key
+ -> Maybe ContentHash -- ^ Secondary key
+ -> Path Abs Dir
+ -> m PushResult
+pushAsArchive alias pushArchive primaryKey mSecondaryKey path = do
+ archive <- liftIO $ Tar.write <$> Tar.pack (toFilePath path) ["."]
+ pushArchive primaryKey archive >>= \case
+ PushError e -> pure $ PushError e
+ res ->
+ case mSecondaryKey of
+ Just secondaryKey ->
+ alias primaryKey secondaryKey >>= \case
+ Left err -> pure $ PushError err
+ Right () -> pure res
+ Nothing -> pure res
+
+pullAsArchive ::
+ MonadIO m
+ => (ContentHash -> m (PullResult ByteString))
+ -> ContentHash
+ -> Path Abs Dir
+ -> m (PullResult ())
+pullAsArchive pullArchive hash path =
+ pullArchive hash >>= \case
+ PullOK archive -> do
+ liftIO $ Tar.unpack (toFilePath path) $ Tar.read archive
+ pure $ PullOK ()
+ NotInCache -> pure NotInCache
+ PullError e -> pure $ PullError e
+
+-- |
+-- A dummy remote cache implementation which does nothing
+data NoCache = NoCache
+
+instance Monad m => Cacher m NoCache where
+ pull _ _ _ = pure NotInCache
+ push _ _ _ _ = pure PushOK
+
+-- |
+-- An in-memory cache, for testing purposes
+data MemoryCache = MemoryCache (MVar (Map ContentHash ByteString))
+instance MonadIO m => Cacher m MemoryCache where
+ pull (MemoryCache cacheVar) = pullAsArchive $ \hash -> do
+ cacheMap <- liftIO $ readMVar cacheVar
+ case Map.lookup hash cacheMap of
+ Nothing -> pure NotInCache
+ Just x -> pure (PullOK x)
+ push (MemoryCache cacheVar) = pushAsArchive alias $ \hash content -> do
+ liftIO $ modifyMVar_
+ cacheVar
+ (\cacheMap -> pure $ Map.insert hash content cacheMap)
+ pure PushOK
+ where
+ alias from to = liftIO $ Right <$> modifyMVar_ cacheVar
+ (\cacheMap -> pure $ Map.insert to (cacheMap Map.! from) cacheMap)
+
+memoryCache :: MonadIO m => m MemoryCache
+memoryCache = liftIO $ MemoryCache <$> newMVar mempty
+
+-- |
+-- If 'a' is a 'Cacher' then 'Maybe a' is a cacher such that 'Just x' behavies
+-- like 'x' and 'Nothing' doesn't cache anything
+instance Cacher m a => Cacher m (Maybe a) where
+ pull (Just x) = pull x
+ pull Nothing = pull NoCache
+
+ push (Just x) = push x
+ push Nothing = push NoCache
diff --git a/src/Control/Funflow/Steps.hs b/src/Control/Funflow/Steps.hs
index 1789574..0281efa 100644
--- a/src/Control/Funflow/Steps.hs
+++ b/src/Control/Funflow/Steps.hs
@@ -36,6 +36,7 @@ module Control.Funflow.Steps
, promptFor
, printS
, failStep
+ , cachedFailStep
, worstBernoulli
, pauseWith
, melancholicLazarus
@@ -45,7 +46,8 @@ where
import Control.Arrow
import Control.Arrow.Free (catch)
import Control.Exception.Safe (Exception, throwM)
-import Control.Funflow.Base (SimpleFlow)
+import Control.Funflow.Base (SimpleFlow, cache,
+ defaultCacherWithIdent)
import Control.Funflow.Class
import Control.Funflow.ContentHashable (ContentHashable,
DirectoryContent (..),
@@ -53,6 +55,7 @@ import Control.Funflow.ContentHashable (ContentHashable,
import Control.Funflow.ContentStore (Content ((:</>)))
import qualified Control.Funflow.ContentStore as CS
import qualified Control.Funflow.External.Docker as Docker
+import Data.Default (def)
import Data.Foldable (for_)
import Data.Store
import Data.Traversable (for)
@@ -78,6 +81,11 @@ printS = stepIO $ \s-> print s
failStep :: ArrowFlow eff ex arr => arr () ()
failStep = stepIO $ \_ -> fail "failStep"
+cachedFailStep :: ArrowFlow eff ex arr => arr () ()
+cachedFailStep = stepIO'
+ (def { cache = defaultCacherWithIdent 1235314531893843918})
+ (\_ -> fail "cachedFailStep")
+
worstBernoulli :: (Exception ex, ArrowFlow eff ex arr) => (String -> ex) -> arr Double Double
worstBernoulli errorC = stepIO $ \p -> do
r <- randomRIO (0,1)
@@ -107,7 +115,7 @@ melancholicLazarus = stepIO $ \s -> do
-- | `retry n s f` reruns `f` on failure at most n times with a delay of `s`
-- seconds between retries
-retry :: forall arr eff ex a b. (Exception ex, Store a, ArrowFlow eff ex arr)
+retry :: forall arr eff ex a b. (Exception ex, Store a, ArrowFlow eff ex arr, ArrowChoice arr)
=> Int -> Int -> arr a b -> arr a b
retry 0 _ f = f
retry n secs f = catch f $ proc (x, _ :: ex) -> do
@@ -140,7 +148,8 @@ copyFileToStore = putInStoreAt $ \p (FileContent inFP) -> copyFile inFP p
--
-- | @copyDirToStore (dIn, Just dOut)@ copies the contents of @dIn@ into the store
-- under relative path @dOut@ within the subtree
-copyDirToStore :: ArrowFlow eff ex arr => arr (DirectoryContent, Maybe (Path Rel Dir)) (CS.Content Dir)
+copyDirToStore :: (ArrowChoice arr, ArrowFlow eff ex arr)
+ => arr (DirectoryContent, Maybe (Path Rel Dir)) (CS.Content Dir)
copyDirToStore = proc (inDir, mbOutDir) ->
case mbOutDir of
Nothing -> do
diff --git a/test/Funflow/ContentStore.hs b/test/Funflow/ContentStore.hs
index 64b284e..bdd8f9e 100644
--- a/test/Funflow/ContentStore.hs
+++ b/test/Funflow/ContentStore.hs
@@ -13,6 +13,7 @@ import Control.Exception.Safe (tryAny)
import Control.Funflow.ContentHashable (contentHash)
import Control.Funflow.ContentStore (ContentStore)
import qualified Control.Funflow.ContentStore as ContentStore
+import qualified Control.Funflow.RemoteCache as Remote
import Control.Monad (void)
import Data.Maybe (catMaybes)
import qualified Data.Set as Set
@@ -91,10 +92,11 @@ tests = testGroup "Content Store"
content @?= expectedContent
, testCase "await construction" $
+ Remote.memoryCache >>= \myCache ->
withEmptyStore $ \store -> do
hash <- contentHash ("test" :: String)
- ContentStore.constructOrAsync store hash >>= \case
+ ContentStore.constructOrAsync store myCache hash >>= \case
ContentStore.Pending _ ->
assertFailure "missing already under construction"
ContentStore.Complete _ ->
@@ -102,7 +104,7 @@ tests = testGroup "Content Store"
ContentStore.Missing _ ->
return ()
- a <- ContentStore.constructOrAsync store hash >>= \case
+ a <- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
ContentStore.Missing _ -> do
assertFailure "under construction still missing"
undefined
@@ -130,7 +132,7 @@ tests = testGroup "Content Store"
item'' <- wait b
item'' @?= ContentStore.Completed item
- ContentStore.constructOrAsync store hash >>= \case
+ ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
ContentStore.Missing _ -> do
assertFailure "complete still missing"
ContentStore.Pending _ -> do
@@ -142,7 +144,7 @@ tests = testGroup "Content Store"
withEmptyStore $ \store -> do
hash <- contentHash ("test" :: String)
- ContentStore.constructOrAsync store hash >>= \case
+ ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
ContentStore.Pending _ ->
assertFailure "missing already under construction"
ContentStore.Complete _ ->
@@ -150,7 +152,7 @@ tests = testGroup "Content Store"
ContentStore.Missing _ ->
return ()
- a <- ContentStore.constructOrAsync store hash >>= \case
+ a <- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
ContentStore.Missing _ -> do
assertFailure "under construction still missing"
undefined
@@ -178,7 +180,7 @@ tests = testGroup "Content Store"
item'' <- wait b
item'' @?= ContentStore.Failed
- ContentStore.constructOrAsync store hash >>= \case
+ ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
ContentStore.Pending _ -> do
assertFailure "failed still under construction"
ContentStore.Complete _ -> do
@@ -192,7 +194,7 @@ tests = testGroup "Content Store"
let file = [relfile|file|]
expectedContent = "Hello World"
- ContentStore.constructIfMissing store hash >>= \case
+ ContentStore.constructIfMissing store Remote.NoCache hash >>= \case
ContentStore.Pending () ->
assertFailure "missing already under construction"
ContentStore.Complete _ ->
@@ -202,7 +204,7 @@ tests = testGroup "Content Store"
@? "under construction not writable"
writeFile (fromAbsFile $ subtree </> file) expectedContent
- ContentStore.constructIfMissing store hash >>= \case
+ ContentStore.constructIfMissing store Remote.NoCache hash >>= \case
ContentStore.Missing _ ->
assertFailure "under construction still missing"
ContentStore.Complete _ ->
@@ -210,7 +212,7 @@ tests = testGroup "Content Store"
ContentStore.Pending () ->
void $ ContentStore.markComplete store hash
- ContentStore.constructIfMissing store hash >>= \case
+ ContentStore.constructIfMissing store Remote.NoCache hash >>= \case
ContentStore.Missing _ ->
assertFailure "complete still missing"
ContentStore.Pending () ->
@@ -384,6 +386,62 @@ tests = testGroup "Content Store"
r <- ContentStore.lookupAlias store aliasB
r @?= Nothing
+ , testCase "Remote caching (constructOrAsync)" $ do
+ cacher <- Remote.memoryCache
+ hash <- contentHash ("test" :: String)
+ let file = [relfile|file|]
+ expectedContent = "Hello World"
+
+ -- Populate the remote cache
+ withEmptyStore $ \store -> do
+ ContentStore.constructOrAsync store cacher hash >>= \case
+ ContentStore.Pending _ ->
+ assertFailure "missing already under construction"
+ ContentStore.Complete _ ->
+ assertFailure "missing already complete"
+ ContentStore.Missing subtree -> do
+ isWritable subtree @? "under construction not writable"
+ writeFile (fromAbsFile $ subtree </> file) expectedContent
+ Remote.push cacher hash Nothing subtree
+
+ -- Expects having the item in cache
+ withEmptyStore $ \store -> do
+ ContentStore.constructOrAsync store cacher hash >>= \case
+ ContentStore.Pending _ ->
+ assertFailure "missing already under construction"
+ ContentStore.Complete _ -> pure ()
+ ContentStore.Missing _ -> assertFailure "Not found in the cache"
+
+ , testCase "Remote caching (withConstructIfMissing)" $ do
+ cacher <- Remote.memoryCache
+ hash <- contentHash ("test" :: String)
+ let file = [relfile|file|]
+ expectedContent = "Hello World"
+ doWrite subtree = do
+ isWritable subtree @? "under construction not writable"
+ writeFile (fromAbsFile $ subtree </> file) expectedContent
+ return $ Right ()
+
+ -- Populates the remote cache
+ withEmptyStore $ \store -> do
+ ContentStore.withConstructIfMissing store cacher hash doWrite >>= \case
+ ContentStore.Missing _ -> assertFailure "not found in the cache"
+ ContentStore.Pending _ ->
+ assertFailure "missing already under construction"
+ -- ContentStore.waitUntilComplete store hash >>= \case
+ -- Just item -> return ()
+ -- Nothing -> assertFailure "item construction failed"
+ ContentStore.Complete _ -> pure ()
+
+ -- Expects having the item in the remote cache
+ withEmptyStore $ \store -> do
+ ContentStore.withConstructIfMissing store cacher hash
+ (const $ assertFailure "should not try to write the file a second time") >>= \case
+ ContentStore.Missing _ -> assertFailure "Not found in the cache"
+ ContentStore.Pending _ ->
+ assertFailure "missing already under construction"
+ ContentStore.Complete _ -> pure ()
+
]
shouldFail :: IO a -> String -> IO ()