diff options
-rw-r--r-- | TestFunflow.hs | 48 | ||||
-rw-r--r-- | app/FFExecutorD.hs | 3 | ||||
-rw-r--r-- | funflow.cabal | 15 | ||||
-rw-r--r-- | src/Control/Arrow/AppArrow.hs | 39 | ||||
-rw-r--r-- | src/Control/Arrow/Async.hs | 3 | ||||
-rw-r--r-- | src/Control/Arrow/Free.hs | 36 | ||||
-rw-r--r-- | src/Control/Funflow/Class.hs | 16 | ||||
-rw-r--r-- | src/Control/Funflow/ContentHashable.hs | 5 | ||||
-rw-r--r-- | src/Control/Funflow/ContentStore.hs | 64 | ||||
-rw-r--r-- | src/Control/Funflow/Diagram.hs | 4 | ||||
-rw-r--r-- | src/Control/Funflow/Exec/Simple.hs | 115 | ||||
-rw-r--r-- | src/Control/Funflow/External/Executor.hs | 8 | ||||
-rw-r--r-- | src/Control/Funflow/Lock.hs | 12 | ||||
-rw-r--r-- | src/Control/Funflow/Pretty.hs | 2 | ||||
-rw-r--r-- | src/Control/Funflow/RemoteCache.hs | 134 | ||||
-rw-r--r-- | src/Control/Funflow/Steps.hs | 15 | ||||
-rw-r--r-- | test/Funflow/ContentStore.hs | 76 |
17 files changed, 461 insertions, 134 deletions
diff --git a/TestFunflow.hs b/TestFunflow.hs index 93828dc..09a8373 100644 --- a/TestFunflow.hs +++ b/TestFunflow.hs @@ -1,6 +1,7 @@ -{-# LANGUAGE Arrows #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE Arrows #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} import Control.Arrow import Control.Arrow.Free @@ -9,7 +10,9 @@ import Control.Funflow import qualified Control.Funflow.ContentStore as CS import Control.Funflow.External.Coordinator.Memory import Control.Funflow.Pretty +import Data.Default import Data.Monoid ((<>)) +import Path import Path.IO mkError :: String -> SomeException @@ -32,22 +35,33 @@ flow2caught = retry 100 0 flow2 flow3 :: SimpleFlow [Int] [Int] flow3 = mapA (arr (+1)) +runFailingFlow :: Path Abs Dir -> IO () +runFailingFlow storeDir = withSimpleLocalRunner storeDir $ \runner -> do + r <- runner cachedFailStep () + print r + +testFailingFlow :: IO () +testFailingFlow = + withSystemTempDir "test_output" $ \storeDir -> + runFailingFlow storeDir >> runFailingFlow storeDir + main :: IO () -main = +main = do + testFailingFlow withSystemTempDir "test_output" $ \storeDir -> - CS.withStore storeDir $ \store -> do - memHook <- createMemoryCoordinator - res <- runSimpleFlow MemoryCoordinator memHook store flow2 () - print res - res' <- runSimpleFlow MemoryCoordinator memHook store flow2caught () - print res' - putStrLn $ showFlow myFlow - putStrLn $ showFlow flow2 - res1 <- runSimpleFlow MemoryCoordinator memHook store flow3 [1..10] - print res1 --- main = redisTest - externalTest - storeTest + CS.withStore storeDir $ \store -> do + memHook <- createMemoryCoordinator + res <- runSimpleFlow MemoryCoordinator memHook store flow2 () + print res + res' <- runSimpleFlow MemoryCoordinator memHook store flow2caught () + print res' + putStrLn $ showFlow myFlow + putStrLn $ showFlow flow2 + res1 <- runSimpleFlow MemoryCoordinator memHook store flow3 [1..10] + print res1 + -- main = redisTest + externalTest + storeTest externalTest :: IO () externalTest = let diff --git a/app/FFExecutorD.hs b/app/FFExecutorD.hs index cf445fb..70ae7cd 100644 --- a/app/FFExecutorD.hs +++ b/app/FFExecutorD.hs @@ -19,6 +19,7 @@ import Control.Funflow.External.Executor import Control.Monad (void) import Data.Monoid ((<>)) import qualified Database.Redis as R +import Network.Socket (PortNumber) import qualified Options.Applicative as Opt import Path import System.Clock @@ -64,7 +65,7 @@ argsParser = Opt.subparser <> Opt.help "Password for the Redis instance, if needed." )) useRedis store host port pw = Args store $ UseRedis R.defaultConnectInfo { R.connectHost = host - , R.connectPort = R.PortNumber port + , R.connectPort = R.PortNumber (port :: PortNumber) , R.connectAuth = pw } sqliteParser = useSQLite diff --git a/funflow.cabal b/funflow.cabal index a1bf17f..31d5013 100644 --- a/funflow.cabal +++ b/funflow.cabal @@ -1,5 +1,5 @@ Name: funflow -Version: 1.4.0 +Version: 1.5.0 Synopsis: Workflows with arrows Description: An arrow with resumable computations and logging @@ -24,7 +24,8 @@ Library default-language: Haskell2010 Exposed-modules: - Control.Arrow.Async + Control.Arrow.AppArrow + , Control.Arrow.Async , Control.Arrow.Free , Control.Funflow , Control.Funflow.Cache.TH @@ -40,6 +41,7 @@ Library , Control.Funflow.External.Coordinator.SQLite , Control.Funflow.Lock , Control.Funflow.Orphans + , Control.Funflow.RemoteCache , Control.Funflow.Steps , Control.Funflow.Pretty , Control.Funflow.Exec.Simple @@ -67,7 +69,7 @@ Library , hedis , hostname , integer-gmp - , katip >= 0.5.0.1 + , katip >= 0.8.0.0 , lens , lifted-async , memory @@ -83,6 +85,7 @@ Library , sqlite-simple , stm , store + , tar , template-haskell >= 2.11 , text , time @@ -108,7 +111,8 @@ Executable ffexecutord , bytestring , clock , funflow - , hedis + , hedis >= 0.12.5 + , network , path , text , unix @@ -122,9 +126,10 @@ Test-suite test-funflow main-is: TestFunflow.hs ghc-options: -Wall -threaded build-depends: base >=4.6 && <5 + , data-default , funflow , filepath - , hedis + , hedis >= 0.12.5 , path , path-io , text diff --git a/src/Control/Arrow/AppArrow.hs b/src/Control/Arrow/AppArrow.hs new file mode 100644 index 0000000..fe88236 --- /dev/null +++ b/src/Control/Arrow/AppArrow.hs @@ -0,0 +1,39 @@ +-- | This modules defines the composition of an applicative functor and an +-- arrow, which is always an arrow. + +module Control.Arrow.AppArrow + ( AppArrow(..) + , appArrow + ) where + +import Control.Category +import Control.Arrow +import Prelude hiding (id, (.)) + +newtype AppArrow app arr a b = AppArrow { unAppArrow :: app (arr a b) } + +instance (Applicative app, Category cat) => Category (AppArrow app cat) where + id = appArrow id + AppArrow a1 . AppArrow a2 = AppArrow $ (.) <$> a1 <*> a2 + +instance (Applicative app, Arrow arr) => Arrow (AppArrow app arr) where + arr = appArrow . arr + first (AppArrow a) = AppArrow $ first <$> a + second (AppArrow a) = AppArrow $ second <$> a + AppArrow a1 *** AppArrow a2 = AppArrow $ (***) <$> a1 <*> a2 + +instance (Applicative app, ArrowChoice arr) => ArrowChoice (AppArrow app arr) where + left (AppArrow a) = AppArrow $ left <$> a + right (AppArrow a) = AppArrow $ right <$> a + AppArrow a1 +++ AppArrow a2 = AppArrow $ (+++) <$> a1 <*> a2 + AppArrow a1 ||| AppArrow a2 = AppArrow $ (|||) <$> a1 <*> a2 + +instance (Applicative f, Arrow arr) => Functor (AppArrow f arr t) where + fmap f = (>>> arr f) + +instance (Applicative app, Arrow arr) => Applicative (AppArrow app arr t) where + pure = arr . const + a <*> b = a &&& b >>> arr (uncurry ($)) + +appArrow :: (Applicative app) => arr a b -> AppArrow app arr a b +appArrow = AppArrow . pure diff --git a/src/Control/Arrow/Async.hs b/src/Control/Arrow/Async.hs index 360668b..4a27473 100644 --- a/src/Control/Arrow/Async.hs +++ b/src/Control/Arrow/Async.hs @@ -40,8 +40,7 @@ instance MonadBaseControl IO m => ArrowChoice (AsyncA m) where instance (Exception ex, MonadBaseControl IO m, MonadCatch m) => ArrowError ex (AsyncA m) where - AsyncA arr1 `catch` AsyncA arr2 = AsyncA $ \x -> - arr1 x `Control.Exception.Safe.catch` curry arr2 x + try (AsyncA a) = AsyncA $ Control.Exception.Safe.try . a -- | Lift an AsyncA through a monad transformer of the underlying monad. liftAsyncA :: (MonadTrans t, Monad m) diff --git a/src/Control/Arrow/Free.hs b/src/Control/Arrow/Free.hs index 7bae9f2..149ab13 100644 --- a/src/Control/Arrow/Free.hs +++ b/src/Control/Arrow/Free.hs @@ -32,6 +32,7 @@ module Control.Arrow.Free , eval -- * ArrowError , ArrowError(..) + , catch -- * Arrow functions , mapA , mapSeqA @@ -40,9 +41,13 @@ module Control.Arrow.Free ) where import Control.Arrow +import Control.Arrow.AppArrow import Control.Category import Control.Exception.Safe (Exception, MonadCatch) import qualified Control.Exception.Safe +import Control.Monad.Trans.Reader +import Control.Monad.Trans.Writer +import qualified Control.Monad.Trans.Writer.Strict as SW import Data.Bool (Bool) import Data.Constraint (Constraint, Dict (..), mapDict, weaken1, weaken2) @@ -50,7 +55,8 @@ import Data.Either (Either (..)) import Data.Function (const, flip, ($)) import Data.List (uncons) import Data.Maybe (maybe) -import Data.Tuple (curry, uncurry) +import Data.Monoid (Monoid) +import Data.Tuple (uncurry) -- | A natural transformation on type constructors of two arguments. type x ~> y = forall a b. x a b -> y a b @@ -159,12 +165,32 @@ instance FreeArrowLike Choice where -- | ArrowError represents those arrows which can catch exceptions within the -- processing of the flow. class ArrowError ex a where - catch :: a e c -> a (e, ex) c -> a e c + try :: a e c -> a e (Either ex c) + +instance (ArrowError ex arr) => ArrowError ex (AppArrow (Reader r) arr) where + try (AppArrow act) = AppArrow $ reader $ \r -> + try $ runReader act r + +instance (ArrowError ex arr, Monoid w) => ArrowError ex (AppArrow (Writer w) arr) where + try (AppArrow act) = AppArrow $ writer (try a, w) + where (a, w) = runWriter act + +instance (ArrowError ex arr, Monoid w) => ArrowError ex (AppArrow (SW.Writer w) arr) where + try (AppArrow act) = AppArrow $ SW.writer (try a, w) + where (a, w) = SW.runWriter act + +catch :: (ArrowError ex a, ArrowChoice a) => a e c -> a (e, ex) c -> a e c +catch a onExc = proc e -> do + res <- try a -< e + case res of + Left ex -> + onExc -< (e, ex) + Right r -> + returnA -< r instance (Arrow (Kleisli m), Exception ex, MonadCatch m) => ArrowError ex (Kleisli m) where - Kleisli arr1 `catch` Kleisli arr2 = Kleisli $ \x -> - arr1 x `Control.Exception.Safe.catch` curry arr2 x + try (Kleisli a) = Kleisli $ Control.Exception.Safe.try . a -- | Freely generated arrows with both choice and error handling. newtype ErrorChoice ex eff a b = ErrorChoice { @@ -188,7 +214,7 @@ instance ArrowChoice (ErrorChoice ex eff) where (ErrorChoice a) ||| (ErrorChoice b) = ErrorChoice $ \f -> a f ||| b f instance ArrowError ex (ErrorChoice ex eff) where - (ErrorChoice a) `catch` (ErrorChoice h) = ErrorChoice $ \f -> a f `catch` h f + try (ErrorChoice a) = ErrorChoice $ \f -> try $ a f instance FreeArrowLike (ErrorChoice ex) where type Ctx (ErrorChoice ex) = Join ArrowChoice (ArrowError ex) diff --git a/src/Control/Funflow/Class.hs b/src/Control/Funflow/Class.hs index 833afc1..ec121ba 100644 --- a/src/Control/Funflow/Class.hs +++ b/src/Control/Funflow/Class.hs @@ -16,6 +16,7 @@ module Control.Funflow.Class where import Control.Arrow +import Control.Arrow.AppArrow import Control.Arrow.Free import qualified Control.Funflow.Base as Base import Control.Funflow.ContentHashable @@ -24,7 +25,7 @@ import Control.Funflow.External import Data.Default (def) import Path -class (ArrowChoice arr, ArrowError ex arr) => ArrowFlow eff ex arr | arr -> eff ex where +class (Arrow arr, ArrowError ex arr) => ArrowFlow eff ex arr | arr -> eff ex where -- | Create a flow from a pure function. step' :: Base.Properties a b -> (a -> b) -> arr a b -- | Create a flow from an IO action. @@ -64,3 +65,16 @@ stepIO = stepIO' def wrap :: ArrowFlow eff ex arr => eff a b -> arr a b wrap = wrap' def + +instance ( Applicative app + , ArrowError ex (AppArrow app (arr eff ex)) + , ArrowFlow eff ex (arr eff ex) ) + => ArrowFlow eff ex (AppArrow app (arr eff ex)) where + step' props f = appArrow $ step' props f + stepIO' props f = appArrow $ stepIO' props f + external f = appArrow $ external f + external' props f = appArrow $ external' props f + wrap' props eff = appArrow $ wrap' props eff + putInStore f = appArrow $ putInStore f + getFromStore f = appArrow $ getFromStore f + internalManipulateStore f = appArrow $ internalManipulateStore f diff --git a/src/Control/Funflow/ContentHashable.hs b/src/Control/Funflow/ContentHashable.hs index 6fb41fa..53a6aca 100644 --- a/src/Control/Funflow/ContentHashable.hs +++ b/src/Control/Funflow/ContentHashable.hs @@ -52,6 +52,7 @@ module Control.Funflow.ContentHashable import Control.Exception.Safe (catchJust) import Control.Funflow.Orphans () import Control.Monad (foldM, mzero, (>=>)) +import Control.Monad.IO.Class (MonadIO, liftIO) import Crypto.Hash (Context, Digest, SHA256, digestFromByteString, hashFinalize, hashInit, @@ -459,9 +460,9 @@ instance ContentHashable IO FileContent where -- The path to the directory is ignored. newtype DirectoryContent = DirectoryContent (Path.Path Path.Abs Path.Dir) -instance ContentHashable IO DirectoryContent where +instance MonadIO m => ContentHashable m DirectoryContent where - contentHashUpdate ctx0 (DirectoryContent dir0) = do + contentHashUpdate ctx0 (DirectoryContent dir0) = liftIO $ do (dirs, files) <- Path.IO.listDir dir0 ctx' <- foldM hashFile ctx0 (sort files) foldM hashDir ctx' (sort dirs) diff --git a/src/Control/Funflow/ContentStore.hs b/src/Control/Funflow/ContentStore.hs index 86d774e..b784f29 100644 --- a/src/Control/Funflow/ContentStore.hs +++ b/src/Control/Funflow/ContentStore.hs @@ -1,4 +1,5 @@ {-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} @@ -123,15 +124,16 @@ import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.MVar import Control.Exception.Safe (Exception, MonadMask, - bracket, bracket_, - bracketOnError, + bracket, bracketOnError, + bracket_, displayException, throwIO) import Control.Funflow.ContentStore.Notify import Control.Funflow.Orphans () import Control.Lens -import Control.Monad (forever, forM_, unless, +import Control.Monad (forM_, forever, unless, void, when, (<=<), (>=>)) import Control.Monad.IO.Class (MonadIO, liftIO) +import Control.Monad.Trans.Control (MonadBaseControl) import Crypto.Hash (hashUpdate) import Data.Aeson (FromJSON, ToJSON) import Data.Bits (complement) @@ -166,6 +168,7 @@ import Control.Funflow.ContentHashable (ContentHash, encodeHash, pathToHash, toBytes) import Control.Funflow.Lock +import qualified Control.Funflow.RemoteCache as Remote -- | Status of an item in the store. @@ -465,16 +468,17 @@ waitUntilComplete store hash = lookupOrWait store hash >>= \case -- It should be constructed in the given @buildDir@, -- and then marked as complete using 'markComplete'. constructOrAsync - :: MonadIO m + :: forall m remoteCache. + (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache) => ContentStore + -> remoteCache -> ContentHash -> m (Status (Path Abs Dir) (Async Update) Item) -constructOrAsync store hash = liftIO . withStoreLock store $ - internalQuery store hash >>= \case +constructOrAsync store cacher hash = + constructIfMissing store cacher hash >>= \case Complete item -> return $ Complete item - Missing () -> withWritableStore store $ - Missing <$> internalMarkPending store hash - Pending _ -> Pending <$> internalWatchPending store hash + Missing path -> return $ Missing path + Pending _ -> Pending <$> liftIO (internalWatchPending store hash) -- | Atomically query the state under the given key and mark pending if missing. -- Wait for the item to be completed, if already pending. @@ -485,11 +489,12 @@ constructOrAsync store hash = liftIO . withStoreLock store $ -- It should be constructed in the given @buildDir@, -- and then marked as complete using 'markComplete'. constructOrWait - :: MonadIO m + :: (MonadIO m, MonadMask m, MonadBaseControl IO m, Remote.Cacher m remoteCache) => ContentStore + -> remoteCache -> ContentHash -> m (Status (Path Abs Dir) Void Item) -constructOrWait store hash = constructOrAsync store hash >>= \case +constructOrWait store cacher hash = constructOrAsync store cacher hash >>= \case Pending a -> liftIO (wait a) >>= \case Completed item -> return $ Complete item -- XXX: Consider extending 'Status' with a 'Failed' constructor. @@ -503,30 +508,38 @@ constructOrWait store hash = constructOrAsync store hash >>= \case -- | Atomically query the state under the given key and mark pending if missing. constructIfMissing - :: MonadIO m + :: (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache) => ContentStore + -> remoteCache -> ContentHash -> m (Status (Path Abs Dir) () Item) -constructIfMissing store hash = liftIO . withStoreLock store $ +constructIfMissing store cacher hash = withStoreLock store $ internalQuery store hash >>= \case Complete item -> return $ Complete item + Missing () -> withWritableStore store $ do + let destDir :: Path Abs Dir = mkItemPath store hash + Remote.pull cacher hash destDir >>= \case + Remote.PullOK () -> return $ Complete (Item hash) + Remote.NotInCache -> + Missing <$> liftIO (internalMarkPending store hash) + Remote.PullError _ -> + Missing <$> liftIO (internalMarkPending store hash) Pending _ -> return $ Pending () - Missing () -> withWritableStore store $ - Missing <$> internalMarkPending store hash -- | Atomically query the state under the given key and mark pending if missing. -- Execute the given function to construct the item, mark as complete on success -- and remove on failure. Forcibly removes if an uncaught exception occurs -- during item construction. withConstructIfMissing - :: (MonadIO m, MonadMask m) + :: (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache) => ContentStore + -> remoteCache -> ContentHash -> (Path Abs Dir -> m (Either e a)) -> m (Status e () (Maybe a, Item)) -withConstructIfMissing store hash f = +withConstructIfMissing store cacher hash f = bracketOnError - (constructIfMissing store hash) + (constructIfMissing store cacher hash) (\case Missing _ -> removeForcibly store hash _ -> return ()) @@ -539,6 +552,7 @@ withConstructIfMissing store hash f = return (Missing e) Right x -> do item <- markComplete store hash + _ <- Remote.push cacher (itemHash item) (Just hash) (itemPath store item) return (Complete (Just x, item))) -- | Mark a non-existent item as pending. @@ -781,7 +795,7 @@ metadataPath = (</> [reldir|metadata|]) -- | Holds a lock on the global 'MVar' and on the global lock file -- for the duration of the given action. -withStoreLock :: ContentStore -> IO a -> IO a +withStoreLock :: MonadBaseControl IO m => ContentStore -> m a -> m a withStoreLock store = withLock (storeLock store) prefixHashPath :: C8.ByteString -> ContentHash -> Path Rel Dir @@ -895,25 +909,25 @@ internalWatchPending store hash = do -- Stop watching when it arrives. async $ takeMVar update <* stopWatching -setRootDirWritable :: Path Abs Dir -> IO () -setRootDirWritable storeRoot = +setRootDirWritable :: MonadIO m => Path Abs Dir -> m () +setRootDirWritable storeRoot = liftIO $ setFileMode (fromAbsDir storeRoot) writableRootDirMode writableRootDirMode :: FileMode writableRootDirMode = writableDirMode -setRootDirReadOnly :: Path Abs Dir -> IO () -setRootDirReadOnly storeRoot = +setRootDirReadOnly :: MonadIO m => Path Abs Dir -> m () +setRootDirReadOnly storeRoot = liftIO $ setFileMode (fromAbsDir storeRoot) readOnlyRootDirMode readOnlyRootDirMode :: FileMode readOnlyRootDirMode = writableDirMode `intersectFileModes` allButWritableMode -withWritableStoreRoot :: Path Abs Dir -> IO a -> IO a +withWritableStoreRoot :: (MonadMask m, MonadIO m) => Path Abs Dir -> m a -> m a withWritableStoreRoot storeRoot = bracket_ (setRootDirWritable storeRoot) (setRootDirReadOnly storeRoot) -withWritableStore :: ContentStore -> IO a -> IO a +withWritableStore :: (MonadMask m, MonadIO m) => ContentStore -> m a -> m a withWritableStore ContentStore {storeRoot} = withWritableStoreRoot storeRoot diff --git a/src/Control/Funflow/Diagram.hs b/src/Control/Funflow/Diagram.hs index 9581911..7305c9b 100644 --- a/src/Control/Funflow/Diagram.hs +++ b/src/Control/Funflow/Diagram.hs @@ -33,7 +33,7 @@ data Diagram ex a b where Seq :: Diagram ex a b -> Diagram ex b c -> Diagram ex a c Par :: Diagram ex a b -> Diagram ex c d -> Diagram ex (a,c) (b,d) Fanin :: Diagram ex a c -> Diagram ex b c -> Diagram ex (Either a b) c - Catch :: Diagram ex a b -> Diagram ex (a,ex) b -> Diagram ex a b + Try :: Diagram ex a b -> Diagram ex a (Either ex b) instance Category (Diagram ex) where id = Node emptyNodeProperties Proxy Proxy @@ -51,7 +51,7 @@ instance ArrowChoice (Diagram ex) where f ||| g = Fanin f g instance ArrowError ex (Diagram ex) where - f `catch` g = Catch f g + try = Try -- | Construct a labelled node node :: forall arr a b ex. Arrow arr => arr a b -> [T.Text] -> (Diagram ex) a b diff --git a/src/Control/Funflow/Exec/Simple.hs b/src/Control/Funflow/Exec/Simple.hs index 9e60966..1d810f7 100644 --- a/src/Control/Funflow/Exec/Simple.hs +++ b/src/Control/Funflow/Exec/Simple.hs @@ -21,7 +21,6 @@ module Control.Funflow.Exec.Simple , withSimpleLocalRunner ) where -import Control.Arrow (returnA) import Control.Arrow.Async import Control.Arrow.Free (type (~>), eval) import Control.Concurrent.Async (withAsync) @@ -37,12 +36,14 @@ import Control.Funflow.External import Control.Funflow.External.Coordinator import Control.Funflow.External.Coordinator.Memory import Control.Funflow.External.Executor (executeLoop) +import qualified Control.Funflow.RemoteCache as Remote +import Control.Monad.Catch (MonadCatch, + MonadMask) import Control.Monad.IO.Class import Control.Monad.Trans.Control (MonadBaseControl) -import Control.Monad.Catch (MonadCatch - ,MonadMask) import qualified Data.ByteString as BS import Data.Foldable (traverse_) +import Data.Maybe import Data.Monoid ((<>)) import Data.Void import Katip @@ -50,20 +51,24 @@ import Path import System.IO (stderr) -- | Simple evaulation of a flow -runFlowEx :: forall m c eff ex a b. +runFlowEx :: forall m c eff ex a b remoteCache. (Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m - ,MonadCatch m, MonadMask m, KatipContext m) + ,MonadCatch m, MonadMask m, KatipContext m, Remote.Cacher m remoteCache) => c -> Config c -> CS.ContentStore + -> remoteCache -> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects - -> Int -- ^ Flow configuration identity. This forms part of the caching - -- system and is used to disambiguate the same flow run in - -- multiple configurations. + -> Maybe Int -- ^ Flow configuration identity. This forms part of the + -- caching system and is used to disambiguate the same + -- flow run in multiple configurations. If Nothing, + -- then it means this flow has no identity, this + -- implies that steps will be executed without cache, + -- and external tasks will all be considered impure. -> Flow eff ex a b -> a -> m b -runFlowEx _ cfg store runWrapped confIdent flow input = do +runFlowEx _ cfg store cacher runWrapped confIdent flow input = do hook <- initialise cfg runAsyncA (eval (runFlow' hook) flow) input where @@ -71,32 +76,27 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do $ CS.itemPath store item </> [relfile|out|] withStoreCache :: forall i o. Cacher i o -> AsyncA m i o -> AsyncA m i o - withStoreCache NoCache f = f - withStoreCache c f = let - chashOf i = cacherKey c confIdent i - checkStore = AsyncA $ \chash -> - CS.constructOrWait store chash >>= \case - CS.Pending void -> absurd void - CS.Complete item -> do - bs <- liftIO . BS.readFile $ simpleOutPath item - return . Right . cacherReadValue c $ bs - CS.Missing fp -> return $ Left fp - writeStore = AsyncA $ \(chash, fp, res) -> - do - liftIO $ BS.writeFile (toFilePath $ fp </> [relfile|out|]) - . cacherStoreValue c $ res - _ <- CS.markComplete store chash - return res - `onException` - CS.removeFailed store chash - in proc i -> do - let chash = chashOf i - mcontents <- checkStore -< chash - case mcontents of - Right contents -> returnA -< contents - Left fp -> do - res <- f -< i - writeStore -< (chash, fp, res) + withStoreCache c@Cache{} (AsyncA f) + | Just confIdent' <- confIdent = AsyncA $ \i -> do + let chash = cacherKey c confIdent' i + computeAndStore fp = do + res <- f i -- Do the actual computation + liftIO $ BS.writeFile (toFilePath $ fp </> [relfile|out|]) + . cacherStoreValue c $ res + return $ Right res + readItem item = do + bs <- liftIO . BS.readFile $ simpleOutPath item + return . cacherReadValue c $ bs + CS.withConstructIfMissing store cacher chash computeAndStore >>= \case + CS.Missing e -> absurd e + CS.Pending _ -> + liftIO (CS.waitUntilComplete store chash) >>= \case + Just item -> readItem item + Nothing -> throwM $ CS.FailedToConstruct chash + CS.Complete (Just a, _) -> return a + CS.Complete (_, item) -> readItem item + withStoreCache _ f = f + writeMd :: forall i o. ContentHash -> i -> o @@ -113,13 +113,16 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do . AsyncA $ \x -> do let out = f x case cache props of - NoCache -> return () - Cache key _ _ -> writeMd (key confIdent x) x out $ mdpolicy props + Cache key _ _ | Just confIdent' <- confIdent -> + writeMd (key confIdent' x) x out $ mdpolicy props + _ -> return () return out runFlow' _ (StepIO props f) = withStoreCache (cache props) . AsyncA $ liftIO . f runFlow' po (External props toTask) = AsyncA $ \x -> do - chash <- liftIO $ case (ep_impure props) of + let purity | Just _ <- confIdent = ep_impure props + | otherwise = alwaysRecompile + chash <- liftIO $ case purity of EpPure -> contentHash (toTask x) EpImpure fn -> do salt <- fn @@ -149,7 +152,9 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do submitTask po td wait chash td wait chash td = do - KnownTask _ <- awaitTask po chash + awaitTask po chash >>= \case + KnownTask _ -> pure () + _ -> error "[Control.Funflow.Exec.Simple.runFlowEx] Expected KnownTask." CS.waitUntilComplete store chash >>= \case Just item -> return item Nothing -> do @@ -159,13 +164,15 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do throwM $ ExternalTaskFailed td ti mbStdout mbStderr runFlow' _ (PutInStore f) = AsyncA $ \x -> katipAddNamespace "putInStore" $ do chash <- liftIO $ contentHash x - CS.constructOrWait store chash >>= \case + CS.constructOrWait store cacher chash >>= \case CS.Pending void -> absurd void CS.Complete item -> return item CS.Missing fp -> do liftIO $ f fp x - CS.markComplete store chash + finalItem <- CS.markComplete store chash + _ <- Remote.push cacher (CS.itemHash finalItem) (Just chash) (CS.itemPath store finalItem) + pure finalItem `onException` (do $(logTM) WarningS . ls $ "Exception in construction: removing " <> show chash CS.removeFailed store chash @@ -178,40 +185,42 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do $ runWrapped w -- | Run a flow in a logging context. -runFlowLog :: forall m c eff ex a b. +runFlowLog :: forall m c eff ex a b remoteCache. (Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m - ,MonadCatch m, MonadMask m, KatipContext m) + ,MonadCatch m, MonadMask m, KatipContext m, Remote.Cacher m remoteCache) => c -> Config c -> CS.ContentStore + -> remoteCache -> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects - -> Int -- ^ Flow configuration identity. This forms part of the caching + -> Maybe Int -- ^ Flow configuration identity. This forms part of the caching -- system and is used to disambiguate the same flow run in -- multiple configurations. -> Flow eff ex a b -> a -> m (Either ex b) -runFlowLog c cfg store runWrapped confIdent flow input = - try $ runFlowEx c cfg store runWrapped confIdent flow input +runFlowLog c cfg store cacher runWrapped confIdent flow input = + try $ runFlowEx c cfg store cacher runWrapped confIdent flow input -- | Run a flow, discarding all logging. -runFlow :: forall m c eff ex a b. +runFlow :: forall m c eff ex a b remoteCache. (Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m - ,MonadCatch m, MonadMask m) + ,MonadCatch m, MonadMask m, Remote.Cacher (KatipContextT m) remoteCache) => c -> Config c -> CS.ContentStore + -> remoteCache -> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects - -> Int -- ^ Flow configuration identity. This forms part of the caching + -> Maybe Int -- ^ Flow configuration identity. This forms part of the caching -- system and is used to disambiguate the same flow run in -- multiple configurations. -> Flow eff ex a b -> a -> m (Either ex b) -runFlow c cfg store runWrapped confIdent flow input = do +runFlow c cfg store cacher runWrapped confIdent flow input = do le <- liftIO $ initLogEnv "funflow" "production" runKatipContextT le () "runFlow" - $ runFlowLog c cfg store (liftAsyncA . runWrapped) confIdent flow input + $ runFlowLog c cfg store cacher (liftAsyncA . runWrapped) confIdent flow input -- | Run a simple flow. Logging will be sent to stderr runSimpleFlow :: forall m c a b. @@ -224,7 +233,7 @@ runSimpleFlow :: forall m c a b. -> a -> m (Either SomeException b) runSimpleFlow c ccfg store flow input = do - handleScribe <- liftIO $ mkHandleScribe ColorIfTerminal stderr InfoS V2 + handleScribe <- liftIO $ mkHandleScribe ColorIfTerminal stderr (permitItem InfoS) V2 let mkLogEnv = liftIO $ registerScribe "stderr" handleScribe defaultScribeSettings =<< initLogEnv "funflow" "production" bracket mkLogEnv (liftIO . closeScribes) $ \le -> do @@ -232,7 +241,7 @@ runSimpleFlow c ccfg store flow input = do initialNamespace = "executeLoop" runKatipContextT le initialContext initialNamespace - $ runFlowLog c ccfg store runNoEffect 12345 flow input + $ runFlowLog c ccfg store Remote.NoCache runNoEffect (Just 12345) flow input -- | Create a full pipeline runner locally. This includes an executor for -- executing external tasks. diff --git a/src/Control/Funflow/External/Executor.hs b/src/Control/Funflow/External/Executor.hs index c7c9a5c..92c2c07 100644 --- a/src/Control/Funflow/External/Executor.hs +++ b/src/Control/Funflow/External/Executor.hs @@ -19,6 +19,7 @@ import Control.Exception.Safe import qualified Control.Funflow.ContentStore as CS import Control.Funflow.External import Control.Funflow.External.Coordinator +import qualified Control.Funflow.RemoteCache as Remote import Control.Lens import Control.Monad (forever, mzero, unless) import Control.Monad.Trans (lift) @@ -29,6 +30,7 @@ import Data.Foldable (for_) import Data.Maybe (isJust, isNothing) import Data.Monoid ((<>)) import qualified Data.Text as T +import GHC.IO.Handle (hClose) import Katip as K import Network.HostName import Path @@ -40,7 +42,6 @@ import System.IO (Handle, IOMode (..), import System.Posix.Env (getEnv) import System.Posix.User import System.Process -import GHC.IO.Handle (hClose) data ExecutionResult = -- | The result already exists in the store and there is no need @@ -63,7 +64,8 @@ data ExecutionResult = -- | Execute an individual task. execute :: CS.ContentStore -> TaskDescription -> KatipContextT IO ExecutionResult execute store td = logError $ do - status <- CS.withConstructIfMissing store (td ^. tdOutput) $ \fp -> do + -- We should pass a Remote cacher down to it: + status <- CS.withConstructIfMissing store Remote.NoCache (td ^. tdOutput) $ \fp -> do (fpOut, hOut) <- lift $ CS.createMetadataFile store (td ^. tdOutput) [relfile|stdout|] (fpErr, hErr) <- lift $ @@ -162,7 +164,7 @@ executeLoop :: forall c. Coordinator c -> IO () executeLoop coord cfg store = executeLoopWithScribe coord cfg store =<< - mkHandleScribe ColorIfTerminal stdout InfoS V2 + mkHandleScribe ColorIfTerminal stdout (permitItem InfoS) V2 -- | Same as 'executeLoop', but allows specifying a custom 'Scribe' for logging executeLoopWithScribe :: forall c. Coordinator c diff --git a/src/Control/Funflow/Lock.hs b/src/Control/Funflow/Lock.hs index ea6f0fa..ca24d26 100644 --- a/src/Control/Funflow/Lock.hs +++ b/src/Control/Funflow/Lock.hs @@ -1,4 +1,5 @@ -{-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE ScopedTypeVariables #-} -- | Thread and process write lock. @@ -17,8 +18,9 @@ module Control.Funflow.Lock import Control.Concurrent import Control.Exception.Safe -import Control.Monad (unless) -import Network.HostName (getHostName) +import Control.Monad (unless) +import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp_) +import Network.HostName (getHostName) import Path import Path.IO import System.Posix.Files @@ -58,8 +60,8 @@ closeLock lock = do takeMVar (lockMVar lock) -- | Acquire the lock for the duration of the given action and release after. -withLock :: Lock -> IO a -> IO a -withLock lock action = +withLock :: MonadBaseControl IO m => Lock -> m a -> m a +withLock lock = liftBaseOp_ $ \action -> withMVar (lockMVar lock) $ \() -> bracket_ (acquireDirLock $ lockDir lock) (releaseDirLock $ lockDir lock) $ action diff --git a/src/Control/Funflow/Pretty.hs b/src/Control/Funflow/Pretty.hs index 2f7dea1..ebeb860 100644 --- a/src/Control/Funflow/Pretty.hs +++ b/src/Control/Funflow/Pretty.hs @@ -18,7 +18,7 @@ ppFlow = ppDiagram . toDiagram where ppDiagram (Seq f g) = parens $ ppDiagram f <+> text ">>>" <+> ppDiagram g ppDiagram (Par f g) = parens $ ppDiagram f <+> text "***" <+> ppDiagram g ppDiagram (Fanin f g) = parens $ ppDiagram f <+> text "|||" <+> ppDiagram g - ppDiagram (Catch f g) = parens $ ppDiagram f <+> text "catch" <+> ppDiagram g + ppDiagram (Try f) = parens $ text "try" <+> ppDiagram f showFlow :: Flow eff ex a b -> String showFlow = render . ppFlow diff --git a/src/Control/Funflow/RemoteCache.hs b/src/Control/Funflow/RemoteCache.hs new file mode 100644 index 0000000..803c466 --- /dev/null +++ b/src/Control/Funflow/RemoteCache.hs @@ -0,0 +1,134 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiParamTypeClasses #-} + +-- | +-- This module defines the remote caching mechanism of funflow which is used to +-- keep several funflow stores (possibly on different machines) in sync. +module Control.Funflow.RemoteCache + ( Cacher(..) + , PullResult(..), PushResult(..), AliasResult(..) + , NoCache(..), memoryCache + , pullAsArchive, pushAsArchive + ) where + +import qualified Codec.Archive.Tar as Tar +import Control.Concurrent.MVar +import Control.Funflow.ContentHashable +import Control.Monad.IO.Class (MonadIO, liftIO) +import Data.ByteString.Lazy (ByteString) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import Path + + +-- | +-- The result of a tentative pull from the remote cache +data PullResult a + = PullOK a + | NotInCache + | PullError String + deriving (Eq, Ord, Show) + +-- | +-- The result of a tentative push to the remote cache +data PushResult + = PushOK + | PushError String + deriving (Eq, Ord, Show) + +data AliasResult + = AliasOK + | TargetNotInCache + | AliasError String + +-- | +-- A simple mechanism for remote-caching. +-- +-- Provides a way to push a path to the cache and pull it back. +-- +-- No assumption is made on the availability of a store path. In particular, +-- pushing a path to the cache doesn't mean that we can pull it back. +class Monad m => Cacher m a where + push :: + a + -> ContentHash -- ^ "Primary" key: hash of the content + -> Maybe ContentHash -- ^ "Secondary" key: hash of the dependencies + -> Path Abs Dir -- ^ Path to the content + -> m PushResult + pull :: a -> ContentHash -> Path Abs Dir -> m (PullResult ()) + +-- | +-- Push the path as an archive to the remote cache +pushAsArchive :: + MonadIO m + => (ContentHash -> ContentHash -> m (Either String ())) -- ^ How to create the aliases + -> (ContentHash -> ByteString -> m PushResult) -- ^ How to push the content + -> ContentHash -- ^ Primary key + -> Maybe ContentHash -- ^ Secondary key + -> Path Abs Dir + -> m PushResult +pushAsArchive alias pushArchive primaryKey mSecondaryKey path = do + archive <- liftIO $ Tar.write <$> Tar.pack (toFilePath path) ["."] + pushArchive primaryKey archive >>= \case + PushError e -> pure $ PushError e + res -> + case mSecondaryKey of + Just secondaryKey -> + alias primaryKey secondaryKey >>= \case + Left err -> pure $ PushError err + Right () -> pure res + Nothing -> pure res + +pullAsArchive :: + MonadIO m + => (ContentHash -> m (PullResult ByteString)) + -> ContentHash + -> Path Abs Dir + -> m (PullResult ()) +pullAsArchive pullArchive hash path = + pullArchive hash >>= \case + PullOK archive -> do + liftIO $ Tar.unpack (toFilePath path) $ Tar.read archive + pure $ PullOK () + NotInCache -> pure NotInCache + PullError e -> pure $ PullError e + +-- | +-- A dummy remote cache implementation which does nothing +data NoCache = NoCache + +instance Monad m => Cacher m NoCache where + pull _ _ _ = pure NotInCache + push _ _ _ _ = pure PushOK + +-- | +-- An in-memory cache, for testing purposes +data MemoryCache = MemoryCache (MVar (Map ContentHash ByteString)) +instance MonadIO m => Cacher m MemoryCache where + pull (MemoryCache cacheVar) = pullAsArchive $ \hash -> do + cacheMap <- liftIO $ readMVar cacheVar + case Map.lookup hash cacheMap of + Nothing -> pure NotInCache + Just x -> pure (PullOK x) + push (MemoryCache cacheVar) = pushAsArchive alias $ \hash content -> do + liftIO $ modifyMVar_ + cacheVar + (\cacheMap -> pure $ Map.insert hash content cacheMap) + pure PushOK + where + alias from to = liftIO $ Right <$> modifyMVar_ cacheVar + (\cacheMap -> pure $ Map.insert to (cacheMap Map.! from) cacheMap) + +memoryCache :: MonadIO m => m MemoryCache +memoryCache = liftIO $ MemoryCache <$> newMVar mempty + +-- | +-- If 'a' is a 'Cacher' then 'Maybe a' is a cacher such that 'Just x' behavies +-- like 'x' and 'Nothing' doesn't cache anything +instance Cacher m a => Cacher m (Maybe a) where + pull (Just x) = pull x + pull Nothing = pull NoCache + + push (Just x) = push x + push Nothing = push NoCache diff --git a/src/Control/Funflow/Steps.hs b/src/Control/Funflow/Steps.hs index 1789574..0281efa 100644 --- a/src/Control/Funflow/Steps.hs +++ b/src/Control/Funflow/Steps.hs @@ -36,6 +36,7 @@ module Control.Funflow.Steps , promptFor , printS , failStep + , cachedFailStep , worstBernoulli , pauseWith , melancholicLazarus @@ -45,7 +46,8 @@ where import Control.Arrow import Control.Arrow.Free (catch) import Control.Exception.Safe (Exception, throwM) -import Control.Funflow.Base (SimpleFlow) +import Control.Funflow.Base (SimpleFlow, cache, + defaultCacherWithIdent) import Control.Funflow.Class import Control.Funflow.ContentHashable (ContentHashable, DirectoryContent (..), @@ -53,6 +55,7 @@ import Control.Funflow.ContentHashable (ContentHashable, import Control.Funflow.ContentStore (Content ((:</>))) import qualified Control.Funflow.ContentStore as CS import qualified Control.Funflow.External.Docker as Docker +import Data.Default (def) import Data.Foldable (for_) import Data.Store import Data.Traversable (for) @@ -78,6 +81,11 @@ printS = stepIO $ \s-> print s failStep :: ArrowFlow eff ex arr => arr () () failStep = stepIO $ \_ -> fail "failStep" +cachedFailStep :: ArrowFlow eff ex arr => arr () () +cachedFailStep = stepIO' + (def { cache = defaultCacherWithIdent 1235314531893843918}) + (\_ -> fail "cachedFailStep") + worstBernoulli :: (Exception ex, ArrowFlow eff ex arr) => (String -> ex) -> arr Double Double worstBernoulli errorC = stepIO $ \p -> do r <- randomRIO (0,1) @@ -107,7 +115,7 @@ melancholicLazarus = stepIO $ \s -> do -- | `retry n s f` reruns `f` on failure at most n times with a delay of `s` -- seconds between retries -retry :: forall arr eff ex a b. (Exception ex, Store a, ArrowFlow eff ex arr) +retry :: forall arr eff ex a b. (Exception ex, Store a, ArrowFlow eff ex arr, ArrowChoice arr) => Int -> Int -> arr a b -> arr a b retry 0 _ f = f retry n secs f = catch f $ proc (x, _ :: ex) -> do @@ -140,7 +148,8 @@ copyFileToStore = putInStoreAt $ \p (FileContent inFP) -> copyFile inFP p -- -- | @copyDirToStore (dIn, Just dOut)@ copies the contents of @dIn@ into the store -- under relative path @dOut@ within the subtree -copyDirToStore :: ArrowFlow eff ex arr => arr (DirectoryContent, Maybe (Path Rel Dir)) (CS.Content Dir) +copyDirToStore :: (ArrowChoice arr, ArrowFlow eff ex arr) + => arr (DirectoryContent, Maybe (Path Rel Dir)) (CS.Content Dir) copyDirToStore = proc (inDir, mbOutDir) -> case mbOutDir of Nothing -> do diff --git a/test/Funflow/ContentStore.hs b/test/Funflow/ContentStore.hs index 64b284e..bdd8f9e 100644 --- a/test/Funflow/ContentStore.hs +++ b/test/Funflow/ContentStore.hs @@ -13,6 +13,7 @@ import Control.Exception.Safe (tryAny) import Control.Funflow.ContentHashable (contentHash) import Control.Funflow.ContentStore (ContentStore) import qualified Control.Funflow.ContentStore as ContentStore +import qualified Control.Funflow.RemoteCache as Remote import Control.Monad (void) import Data.Maybe (catMaybes) import qualified Data.Set as Set @@ -91,10 +92,11 @@ tests = testGroup "Content Store" content @?= expectedContent , testCase "await construction" $ + Remote.memoryCache >>= \myCache -> withEmptyStore $ \store -> do hash <- contentHash ("test" :: String) - ContentStore.constructOrAsync store hash >>= \case + ContentStore.constructOrAsync store myCache hash >>= \case ContentStore.Pending _ -> assertFailure "missing already under construction" ContentStore.Complete _ -> @@ -102,7 +104,7 @@ tests = testGroup "Content Store" ContentStore.Missing _ -> return () - a <- ContentStore.constructOrAsync store hash >>= \case + a <- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case ContentStore.Missing _ -> do assertFailure "under construction still missing" undefined @@ -130,7 +132,7 @@ tests = testGroup "Content Store" item'' <- wait b item'' @?= ContentStore.Completed item - ContentStore.constructOrAsync store hash >>= \case + ContentStore.constructOrAsync store Remote.NoCache hash >>= \case ContentStore.Missing _ -> do assertFailure "complete still missing" ContentStore.Pending _ -> do @@ -142,7 +144,7 @@ tests = testGroup "Content Store" withEmptyStore $ \store -> do hash <- contentHash ("test" :: String) - ContentStore.constructOrAsync store hash >>= \case + ContentStore.constructOrAsync store Remote.NoCache hash >>= \case ContentStore.Pending _ -> assertFailure "missing already under construction" ContentStore.Complete _ -> @@ -150,7 +152,7 @@ tests = testGroup "Content Store" ContentStore.Missing _ -> return () - a <- ContentStore.constructOrAsync store hash >>= \case + a <- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case ContentStore.Missing _ -> do assertFailure "under construction still missing" undefined @@ -178,7 +180,7 @@ tests = testGroup "Content Store" item'' <- wait b item'' @?= ContentStore.Failed - ContentStore.constructOrAsync store hash >>= \case + ContentStore.constructOrAsync store Remote.NoCache hash >>= \case ContentStore.Pending _ -> do assertFailure "failed still under construction" ContentStore.Complete _ -> do @@ -192,7 +194,7 @@ tests = testGroup "Content Store" let file = [relfile|file|] expectedContent = "Hello World" - ContentStore.constructIfMissing store hash >>= \case + ContentStore.constructIfMissing store Remote.NoCache hash >>= \case ContentStore.Pending () -> assertFailure "missing already under construction" ContentStore.Complete _ -> @@ -202,7 +204,7 @@ tests = testGroup "Content Store" @? "under construction not writable" writeFile (fromAbsFile $ subtree </> file) expectedContent - ContentStore.constructIfMissing store hash >>= \case + ContentStore.constructIfMissing store Remote.NoCache hash >>= \case ContentStore.Missing _ -> assertFailure "under construction still missing" ContentStore.Complete _ -> @@ -210,7 +212,7 @@ tests = testGroup "Content Store" ContentStore.Pending () -> void $ ContentStore.markComplete store hash - ContentStore.constructIfMissing store hash >>= \case + ContentStore.constructIfMissing store Remote.NoCache hash >>= \case ContentStore.Missing _ -> assertFailure "complete still missing" ContentStore.Pending () -> @@ -384,6 +386,62 @@ tests = testGroup "Content Store" r <- ContentStore.lookupAlias store aliasB r @?= Nothing + , testCase "Remote caching (constructOrAsync)" $ do + cacher <- Remote.memoryCache + hash <- contentHash ("test" :: String) + let file = [relfile|file|] + expectedContent = "Hello World" + + -- Populate the remote cache + withEmptyStore $ \store -> do + ContentStore.constructOrAsync store cacher hash >>= \case + ContentStore.Pending _ -> + assertFailure "missing already under construction" + ContentStore.Complete _ -> + assertFailure "missing already complete" + ContentStore.Missing subtree -> do + isWritable subtree @? "under construction not writable" + writeFile (fromAbsFile $ subtree </> file) expectedContent + Remote.push cacher hash Nothing subtree + + -- Expects having the item in cache + withEmptyStore $ \store -> do + ContentStore.constructOrAsync store cacher hash >>= \case + ContentStore.Pending _ -> + assertFailure "missing already under construction" + ContentStore.Complete _ -> pure () + ContentStore.Missing _ -> assertFailure "Not found in the cache" + + , testCase "Remote caching (withConstructIfMissing)" $ do + cacher <- Remote.memoryCache + hash <- contentHash ("test" :: String) + let file = [relfile|file|] + expectedContent = "Hello World" + doWrite subtree = do + isWritable subtree @? "under construction not writable" + writeFile (fromAbsFile $ subtree </> file) expectedContent + return $ Right () + + -- Populates the remote cache + withEmptyStore $ \store -> do + ContentStore.withConstructIfMissing store cacher hash doWrite >>= \case + ContentStore.Missing _ -> assertFailure "not found in the cache" + ContentStore.Pending _ -> + assertFailure "missing already under construction" + -- ContentStore.waitUntilComplete store hash >>= \case + -- Just item -> return () + -- Nothing -> assertFailure "item construction failed" + ContentStore.Complete _ -> pure () + + -- Expects having the item in the remote cache + withEmptyStore $ \store -> do + ContentStore.withConstructIfMissing store cacher hash + (const $ assertFailure "should not try to write the file a second time") >>= \case + ContentStore.Missing _ -> assertFailure "Not found in the cache" + ContentStore.Pending _ -> + assertFailure "missing already under construction" + ContentStore.Complete _ -> pure () + ] shouldFail :: IO a -> String -> IO () |