diff options
author | nclarke <> | 2020-03-09 14:05:00 (GMT) |
---|---|---|
committer | hdiff <hdiff@hdiff.luite.com> | 2020-03-09 14:05:00 (GMT) |
commit | 306e9a481ffcb94b92d059ce75c12663d23eb900 (patch) | |
tree | 2f2ab075c98d729f34cf1d7f38b847494a50593e | |
parent | 31eaf2c2a3b07a6553aef61572aa685426d9f5c2 (diff) |
34 files changed, 187 insertions, 2723 deletions
diff --git a/TestFunflow.hs b/TestFunflow.hs index 09a8373..86445a6 100644 --- a/TestFunflow.hs +++ b/TestFunflow.hs @@ -7,10 +7,9 @@ import Control.Arrow import Control.Arrow.Free import Control.Exception.Safe import Control.Funflow -import qualified Control.Funflow.ContentStore as CS import Control.Funflow.External.Coordinator.Memory import Control.Funflow.Pretty -import Data.Default +import qualified Data.CAS.ContentStore as CS import Data.Monoid ((<>)) import Path import Path.IO diff --git a/app/FFExecutorD.hs b/app/FFExecutorD.hs index 70ae7cd..881a40d 100644 --- a/app/FFExecutorD.hs +++ b/app/FFExecutorD.hs @@ -11,12 +11,12 @@ import Control.Concurrent (myThreadId) import Control.Concurrent.MVar import Control.Exception.Base (AsyncException (UserInterrupt)) import Control.Exception.Safe -import qualified Control.Funflow.ContentStore as CS import Control.Funflow.External.Coordinator import Control.Funflow.External.Coordinator.Redis import Control.Funflow.External.Coordinator.SQLite import Control.Funflow.External.Executor import Control.Monad (void) +import qualified Data.CAS.ContentStore as CS import Data.Monoid ((<>)) import qualified Database.Redis as R import Network.Socket (PortNumber) diff --git a/changelog.md b/changelog.md index e69de29..e69de29 100644..100755 --- a/changelog.md +++ b/changelog.md diff --git a/funflow.cabal b/funflow.cabal index 31d5013..2f60be3 100644 --- a/funflow.cabal +++ b/funflow.cabal @@ -1,5 +1,5 @@ Name: funflow -Version: 1.5.0 +Version: 1.6.0 Synopsis: Workflows with arrows Description: An arrow with resumable computations and logging @@ -24,13 +24,10 @@ Library default-language: Haskell2010 Exposed-modules: - Control.Arrow.AppArrow - , Control.Arrow.Async + Control.Arrow.Async , Control.Arrow.Free , Control.Funflow , Control.Funflow.Cache.TH - , Control.Funflow.ContentHashable - , Control.Funflow.ContentStore , Control.Funflow.Diagram , Control.Funflow.External , Control.Funflow.External.Docker @@ -39,22 +36,20 @@ Library , Control.Funflow.External.Coordinator.Memory , Control.Funflow.External.Coordinator.Redis , Control.Funflow.External.Coordinator.SQLite - , Control.Funflow.Lock - , Control.Funflow.Orphans - , Control.Funflow.RemoteCache , Control.Funflow.Steps , Control.Funflow.Pretty , Control.Funflow.Exec.Simple Other-modules: Control.Funflow.Base , Control.Funflow.Class - , Control.Funflow.ContentStore.Notify Build-depends: base >= 4.6 && <5 , Glob , aeson >= 1.2.3.0 , async , bytestring + , cas-hashable >= 1.0.1 && <2 + , cas-store >= 1.0.1 && <2 , clock , constraints , containers @@ -79,6 +74,7 @@ Library , path-io , pretty , process + , profunctors , random , safe-exceptions , scientific @@ -94,21 +90,13 @@ Library , unordered-containers , vector , yaml - if os(linux) - CPP-options: -DOS_Linux - Other-modules: Control.Funflow.ContentStore.Notify.Linux - Build-depends: hinotify >= 0.3.9 - else - if os(darwin) || os(freebsd) - CPP-options: -DOS_BSD - Other-modules: Control.Funflow.ContentStore.Notify.BSD - Build-depends: kqueue Executable ffexecutord default-language: Haskell2010 main-is: app/FFExecutorD.hs build-depends: base >=4.6 && <5 , bytestring + , cas-store , clock , funflow , hedis >= 0.12.5 @@ -126,6 +114,7 @@ Test-suite test-funflow main-is: TestFunflow.hs ghc-options: -Wall -threaded build-depends: base >=4.6 && <5 + , cas-store , data-default , funflow , filepath @@ -141,14 +130,13 @@ Test-suite unit-tests default-language: Haskell2010 hs-source-dirs: test main-is: Test.hs - other-modules: Funflow.ContentStore - Funflow.SQLiteCoordinator + other-modules: Funflow.SQLiteCoordinator Funflow.TestFlows Control.Arrow.Async.Tests ghc-options: -Wall -threaded build-depends: base , async - , containers + , cas-store , data-default >= 0.7 , directory , filepath diff --git a/src/Control/Arrow/AppArrow.hs b/src/Control/Arrow/AppArrow.hs deleted file mode 100644 index fe88236..0000000 --- a/src/Control/Arrow/AppArrow.hs +++ /dev/null @@ -1,39 +0,0 @@ --- | This modules defines the composition of an applicative functor and an --- arrow, which is always an arrow. - -module Control.Arrow.AppArrow - ( AppArrow(..) - , appArrow - ) where - -import Control.Category -import Control.Arrow -import Prelude hiding (id, (.)) - -newtype AppArrow app arr a b = AppArrow { unAppArrow :: app (arr a b) } - -instance (Applicative app, Category cat) => Category (AppArrow app cat) where - id = appArrow id - AppArrow a1 . AppArrow a2 = AppArrow $ (.) <$> a1 <*> a2 - -instance (Applicative app, Arrow arr) => Arrow (AppArrow app arr) where - arr = appArrow . arr - first (AppArrow a) = AppArrow $ first <$> a - second (AppArrow a) = AppArrow $ second <$> a - AppArrow a1 *** AppArrow a2 = AppArrow $ (***) <$> a1 <*> a2 - -instance (Applicative app, ArrowChoice arr) => ArrowChoice (AppArrow app arr) where - left (AppArrow a) = AppArrow $ left <$> a - right (AppArrow a) = AppArrow $ right <$> a - AppArrow a1 +++ AppArrow a2 = AppArrow $ (+++) <$> a1 <*> a2 - AppArrow a1 ||| AppArrow a2 = AppArrow $ (|||) <$> a1 <*> a2 - -instance (Applicative f, Arrow arr) => Functor (AppArrow f arr t) where - fmap f = (>>> arr f) - -instance (Applicative app, Arrow arr) => Applicative (AppArrow app arr t) where - pure = arr . const - a <*> b = a &&& b >>> arr (uncurry ($)) - -appArrow :: (Applicative app) => arr a b -> AppArrow app arr a b -appArrow = AppArrow . pure diff --git a/src/Control/Arrow/Async.hs b/src/Control/Arrow/Async.hs index 4a27473..5cbfb7e 100644 --- a/src/Control/Arrow/Async.hs +++ b/src/Control/Arrow/Async.hs @@ -2,6 +2,9 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE StandaloneDeriving #-} + -- | Asynchronous arrows over monads with MonadBaseControl IO, using -- lifted-async. module Control.Arrow.Async where @@ -14,10 +17,26 @@ import Control.Exception.Safe (Exception, MonadCatch) import qualified Control.Exception.Safe import Control.Monad.Trans.Class (MonadTrans, lift) import Control.Monad.Trans.Control (MonadBaseControl) +import qualified Data.Profunctor as P +import qualified Data.Profunctor.Mapping as P +import qualified Data.Profunctor.Traversing as P import Prelude hiding (id, (.)) newtype AsyncA m a b = AsyncA { runAsyncA :: a -> m b } +deriving via P.WrappedArrow (AsyncA m) + instance (MonadBaseControl IO m) => P.Profunctor (AsyncA m) +deriving via P.WrappedArrow (AsyncA m) + instance (MonadBaseControl IO m) => P.Strong (AsyncA m) +deriving via P.WrappedArrow (AsyncA m) + instance (MonadBaseControl IO m) => P.Choice (AsyncA m) +instance (MonadBaseControl IO m) => P.Traversing (AsyncA m) where + traverse' (AsyncA f) = AsyncA $ mapConcurrently f +instance (MonadBaseControl IO m) => P.Closed (AsyncA m) where + closed = P.closedMapping +instance (MonadBaseControl IO m) => P.Mapping (AsyncA m) where + map' = P.traverseMapping + instance Monad m => Category (AsyncA m) where id = AsyncA return (AsyncA f) . (AsyncA g) = AsyncA (\b -> g b >>= f) diff --git a/src/Control/Arrow/Free.hs b/src/Control/Arrow/Free.hs index 149ab13..23094ec 100644 --- a/src/Control/Arrow/Free.hs +++ b/src/Control/Arrow/Free.hs @@ -12,6 +12,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeOperators #-} +{-# LANGUAGE DerivingVia #-} -- | Various varieties of free arrow constructions. -- @@ -37,26 +38,27 @@ module Control.Arrow.Free , mapA , mapSeqA , filterA + , hoistErrorChoiceEff + , expandErrorChoiceEff , type (~>) ) where +import Prelude (Functor (..)) + import Control.Arrow -import Control.Arrow.AppArrow import Control.Category import Control.Exception.Safe (Exception, MonadCatch) import qualified Control.Exception.Safe -import Control.Monad.Trans.Reader -import Control.Monad.Trans.Writer -import qualified Control.Monad.Trans.Writer.Strict as SW -import Data.Bool (Bool) -import Data.Constraint (Constraint, Dict (..), mapDict, weaken1, - weaken2) -import Data.Either (Either (..)) -import Data.Function (const, flip, ($)) -import Data.List (uncons) -import Data.Maybe (maybe) -import Data.Monoid (Monoid) -import Data.Tuple (uncurry) +import Data.Bool (Bool) +import Data.Constraint (Constraint, Dict (..)) +import Data.Either (Either (..)) +import Data.Function (const, flip, ($)) +import Data.List (uncons) +import Data.Maybe (maybe) +import qualified Data.Profunctor as P +import qualified Data.Profunctor.Cayley as P +import qualified Data.Profunctor.Traversing as P +import Data.Tuple (uncurry) -- | A natural transformation on type constructors of two arguments. type x ~> y = forall a b. x a b -> y a b @@ -77,10 +79,10 @@ class FreeArrowLike fal where -- | Annoying hackery to let us tuple constraints and still use 'effect' -- and 'eval' -class Join ( a :: k -> Constraint) (b :: k -> Constraint) (x :: k) where - ctx :: Dict (a x, b x) -instance (a x, b x) => Join a b x where - ctx = Dict +class Join (a :: k -> Constraint) (b :: k -> Constraint) (c :: k -> Constraint) (x :: k) where + ctx :: (Dict (a x), Dict (b x), Dict (c x)) +instance (a x, b x, c x) => Join a b c x where + ctx = (Dict, Dict, Dict) -------------------------------------------------------------------------------- -- Arrow @@ -167,17 +169,8 @@ instance FreeArrowLike Choice where class ArrowError ex a where try :: a e c -> a e (Either ex c) -instance (ArrowError ex arr) => ArrowError ex (AppArrow (Reader r) arr) where - try (AppArrow act) = AppArrow $ reader $ \r -> - try $ runReader act r - -instance (ArrowError ex arr, Monoid w) => ArrowError ex (AppArrow (Writer w) arr) where - try (AppArrow act) = AppArrow $ writer (try a, w) - where (a, w) = runWriter act - -instance (ArrowError ex arr, Monoid w) => ArrowError ex (AppArrow (SW.Writer w) arr) where - try (AppArrow act) = AppArrow $ SW.writer (try a, w) - where (a, w) = SW.runWriter act +instance (ArrowError ex arr, Functor f) => ArrowError ex (P.Cayley f arr) where + try (P.Cayley f) = P.Cayley $ fmap try f catch :: (ArrowError ex a, ArrowChoice a) => a e c -> a (e, ex) c -> a e c catch a onExc = proc e -> do @@ -192,11 +185,27 @@ instance (Arrow (Kleisli m), Exception ex, MonadCatch m) => ArrowError ex (Kleisli m) where try (Kleisli a) = Kleisli $ Control.Exception.Safe.try . a --- | Freely generated arrows with both choice and error handling. +-- | Freely generated arrows with both choice, error handling and ability to +-- traverse structures with Traversals. newtype ErrorChoice ex eff a b = ErrorChoice { - runErrorChoice :: forall ac. (ArrowChoice ac, ArrowError ex ac) + runErrorChoice :: forall ac. (ArrowChoice ac, ArrowError ex ac, P.Traversing ac) => (eff ~> ac) -> ac a b } + deriving (P.Profunctor, P.Strong, P.Choice) via P.WrappedArrow (ErrorChoice ex eff) + +hoistErrorChoiceEff + :: (eff ~> eff') + -> ErrorChoice ex eff a b + -> ErrorChoice ex eff' a b +hoistErrorChoiceEff f (ErrorChoice ec) = ErrorChoice $ \interp -> + ec (interp . f) + +expandErrorChoiceEff + :: (forall x y. eff x y -> ErrorChoice ex eff' x y) + -> ErrorChoice ex eff a b + -> ErrorChoice ex eff' a b +expandErrorChoiceEff f (ErrorChoice ec) = ErrorChoice $ \interp -> + ec (\eff -> runErrorChoice (f eff) interp) instance Category (ErrorChoice ex eff) where id = ErrorChoice $ const id @@ -216,19 +225,22 @@ instance ArrowChoice (ErrorChoice ex eff) where instance ArrowError ex (ErrorChoice ex eff) where try (ErrorChoice a) = ErrorChoice $ \f -> try $ a f +instance P.Traversing (ErrorChoice ex eff) where + traverse' (ErrorChoice a) = ErrorChoice $ \f -> P.traverse' (a f) + wander trav (ErrorChoice a) = ErrorChoice $ \f -> P.wander trav (a f) + instance FreeArrowLike (ErrorChoice ex) where - type Ctx (ErrorChoice ex) = Join ArrowChoice (ArrowError ex) + type Ctx (ErrorChoice ex) = Join ArrowChoice (ArrowError ex) P.Traversing effect :: eff a b -> ErrorChoice ex eff a b effect a = ErrorChoice $ \f -> f a - eval :: forall eff arr a0 b0. (Join ArrowChoice (ArrowError ex) arr) + eval :: forall eff arr a0 b0. (Join ArrowChoice (ArrowError ex) P.Traversing arr) => (eff ~> arr) -> ErrorChoice ex eff a0 b0 -> arr a0 b0 - eval f a = case (\x -> (mapDict weaken1 x, mapDict weaken2 x)) ctx of - ( Dict :: Dict (ArrowChoice arr) - , Dict :: Dict (ArrowError ex arr) - ) -> runErrorChoice a f + eval f a = case ctx of + ( Dict :: Dict (ArrowChoice arr), Dict :: Dict (ArrowError ex arr), Dict :: Dict (P.Traversing arr) ) + -> runErrorChoice a f -------------------------------------------------------------------------------- diff --git a/src/Control/Funflow.hs b/src/Control/Funflow.hs index c2d90ed..b255cc7 100644 --- a/src/Control/Funflow.hs +++ b/src/Control/Funflow.hs @@ -1,4 +1,6 @@ {-# LANGUAGE TypeOperators #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE GADTs #-} -- | Central Funflow module. -- @@ -9,13 +11,14 @@ module Control.Funflow , type (Base.==>) , Base.NoEffect , Base.Flow'(..) - , Base.Cacher(..) , Base.ExternalProperties(..) , Base.MDWriter , Base.Properties(..) , Base.EpPurity(..) , Base.alwaysRecompile - , Base.defaultCacherWithIdent + , CS.CacherM(..) + , CS.Cacher + , CS.defaultCacherWithIdent , Cache.defaultCacher , Cache.defaultCacherLoc -- * Defines our primitive flow functions @@ -24,16 +27,45 @@ module Control.Funflow , Class.stepIO , Class.wrap , CS.withStore + , hoistFlowEff + , expandFlowEff , module Control.Funflow.Steps , module Control.Funflow.Exec.Simple , module Control.Funflow.External ) where -import qualified Control.Funflow.Base as Base +import Control.Arrow.Free +import Control.Funflow.Base as Base import qualified Control.Funflow.Cache.TH as Cache import qualified Control.Funflow.Class as Class -import qualified Control.Funflow.ContentStore as CS import Control.Funflow.Exec.Simple import Control.Funflow.External import Control.Funflow.Steps +import qualified Data.CAS.ContentStore as CS + +-- | Change the effect type present inside the 'Flow' +hoistFlowEff :: (forall x y. eff x y -> eff' x y) -> Flow eff ex a b -> Flow eff' ex a b +hoistFlowEff f = hoistErrorChoiceEff $ \e -> case e of + Step p s -> Step p s + StepIO p s -> StepIO p s + External p s -> External p s + PutInStore s -> PutInStore s + GetFromStore s -> GetFromStore s + InternalManipulateStore s -> InternalManipulateStore s + Wrapped p s -> Wrapped p (f s) + +-- | Turns each effect in the 'Flow' as a 'Flow' of some other effect +expandFlowEff :: (forall x y. Properties x y -> eff x y -> Flow eff' ex x y) + -> Flow eff ex a b + -> Flow eff' ex a b +expandFlowEff f = expandErrorChoiceEff $ \e -> case e of + Step p s -> Class.step' p s + StepIO p s -> Class.stepIO' p s + External p s -> Class.external' p s + PutInStore s -> Class.putInStore s + GetFromStore s -> Class.getFromStore s + InternalManipulateStore s -> Class.internalManipulateStore s + Wrapped p s -> f p s +-- hoistFlowEff and expandFlowEff are here and not in Control.Funflow.Base to +-- avoid a dependency cycle diff --git a/src/Control/Funflow/Base.hs b/src/Control/Funflow/Base.hs index 5402784..b26f269 100644 --- a/src/Control/Funflow/Base.hs +++ b/src/Control/Funflow/Base.hs @@ -16,16 +16,14 @@ module Control.Funflow.Base where import Control.Arrow (Kleisli (..)) import Control.Arrow.Free import Control.Exception.Safe (SomeException) -import Control.Funflow.ContentHashable -import qualified Control.Funflow.ContentStore as CS import Control.Funflow.Diagram import Control.Funflow.External import Data.ByteString (ByteString) +import Data.CAS.ContentHashable +import Data.CAS.ContentStore as CS import Data.Default -import Data.Functor.Identity import Data.Int (Int64) import Data.Proxy (Proxy (..)) -import qualified Data.Store as Store import qualified Data.Text as T import Path import Prelude hiding (id, (.)) @@ -34,30 +32,6 @@ import System.Random (randomIO) -- | Metadata writer type MDWriter i o = Maybe (i -> o -> [(T.Text, ByteString)]) --- | A cacher is responsible for controlling how steps are cached. -data Cacher i o = - NoCache -- ^ This step cannot be cached (default). - | Cache - { -- | Function to encode the input into a content - -- hash. - -- This function additionally takes an - -- 'identities' which gets incorporated into - -- the cacher. - cacherKey :: Int -> i -> ContentHash - , cacherStoreValue :: o -> ByteString - -- | Attempt to read the cache value back. May throw exceptions. - , cacherReadValue :: ByteString -> o - } - -defaultCacherWithIdent :: (Store.Store o, ContentHashable Identity i) - => Int -- ^ Seed for the cacher - -> Cacher i o -defaultCacherWithIdent ident = Cache - { cacherKey = \i ident' -> runIdentity $ contentHash (ident', ident, i) - , cacherStoreValue = Store.encode - , cacherReadValue = Store.decodeEx - } - data Properties i o = Properties { -- | Name of this step. Used when describing the step in diagrams -- or other reporting. diff --git a/src/Control/Funflow/Cache/TH.hs b/src/Control/Funflow/Cache/TH.hs index 6850995..72bd82c 100644 --- a/src/Control/Funflow/Cache/TH.hs +++ b/src/Control/Funflow/Cache/TH.hs @@ -8,7 +8,7 @@ -- | Template Haskell splices for the funflow cache. module Control.Funflow.Cache.TH where -import Control.Funflow.Base +import Data.CAS.ContentStore import Data.Hashable import Language.Haskell.TH.Syntax import System.Random diff --git a/src/Control/Funflow/Class.hs b/src/Control/Funflow/Class.hs index ec121ba..8d501c6 100644 --- a/src/Control/Funflow/Class.hs +++ b/src/Control/Funflow/Class.hs @@ -16,12 +16,12 @@ module Control.Funflow.Class where import Control.Arrow -import Control.Arrow.AppArrow import Control.Arrow.Free import qualified Control.Funflow.Base as Base -import Control.Funflow.ContentHashable -import qualified Control.Funflow.ContentStore as CS import Control.Funflow.External +import Data.CAS.ContentHashable +import qualified Data.CAS.ContentStore as CS +import qualified Data.Profunctor.Cayley as P import Data.Default (def) import Path @@ -67,14 +67,15 @@ wrap :: ArrowFlow eff ex arr => eff a b -> arr a b wrap = wrap' def instance ( Applicative app - , ArrowError ex (AppArrow app (arr eff ex)) + , ArrowError ex (arr eff ex) , ArrowFlow eff ex (arr eff ex) ) - => ArrowFlow eff ex (AppArrow app (arr eff ex)) where - step' props f = appArrow $ step' props f - stepIO' props f = appArrow $ stepIO' props f - external f = appArrow $ external f - external' props f = appArrow $ external' props f - wrap' props eff = appArrow $ wrap' props eff - putInStore f = appArrow $ putInStore f - getFromStore f = appArrow $ getFromStore f - internalManipulateStore f = appArrow $ internalManipulateStore f + => ArrowFlow eff ex (P.Cayley app (arr eff ex)) where + step' props f = P.Cayley . pure $ step' props f + stepIO' props f = P.Cayley . pure $ stepIO' props f + external f = P.Cayley . pure $ external f + external' props f = P.Cayley . pure $ external' props f + wrap' props eff = P.Cayley . pure $ wrap' props eff + putInStore f = P.Cayley . pure $ putInStore f + getFromStore f = P.Cayley . pure $ getFromStore f + internalManipulateStore f = P.Cayley . pure $ internalManipulateStore f + diff --git a/src/Control/Funflow/ContentHashable.hs b/src/Control/Funflow/ContentHashable.hs deleted file mode 100644 index 53a6aca..0000000 --- a/src/Control/Funflow/ContentHashable.hs +++ /dev/null @@ -1,549 +0,0 @@ -{-# LANGUAGE DefaultSignatures #-} -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE MagicHash #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE TypeApplications #-} -{-# LANGUAGE TypeOperators #-} -{-# LANGUAGE TypeSynonymInstances #-} -{-# LANGUAGE UnboxedTuples #-} - --- | 'ContentHashable' provides a hashing function suitable for use in the --- Funflow content store. --- --- This behaves as does a normal hashing function on Haskell types. However, --- on path types, this instead calculates a hash based on the contents of the --- file or directory referenced. --- --- We also export the 'ExternallyAssuredFile' and 'ExternallyAssuredDirectory' --- types. These instead use the path, file size and modification time to control --- the hash. -module Control.Funflow.ContentHashable - ( ContentHash - , toBytes - , fromBytes - , ContentHashable (..) - , contentHashUpdate_binaryFile - , contentHashUpdate_byteArray# - , contentHashUpdate_fingerprint - , contentHashUpdate_primitive - , contentHashUpdate_storable - - , FileContent (..) - , DirectoryContent (..) - - , ExternallyAssuredFile(..) - , ExternallyAssuredDirectory(..) - - , encodeHash - , decodeHash - , hashToPath - , pathToHash - - , SHA256 - , Context - , Digest - ) where - - -import Control.Exception.Safe (catchJust) -import Control.Funflow.Orphans () -import Control.Monad (foldM, mzero, (>=>)) -import Control.Monad.IO.Class (MonadIO, liftIO) -import Crypto.Hash (Context, Digest, SHA256, - digestFromByteString, - hashFinalize, hashInit, - hashUpdate) -import qualified Data.Aeson as Aeson -import qualified Data.Aeson.Types as Aeson -import Data.Bits (shiftL) -import Data.ByteArray (Bytes, MemView (MemView), - allocAndFreeze, convert) -import Data.ByteArray.Encoding (Base (Base16), - convertFromBase, - convertToBase) -import qualified Data.ByteString as BS -import Data.ByteString.Builder.Extra (defaultChunkSize) -import qualified Data.ByteString.Char8 as C8 -import qualified Data.ByteString.Lazy as BSL -import Data.Foldable (foldlM) -import Data.Functor.Contravariant -import qualified Data.Hashable -import qualified Data.HashMap.Lazy as HashMap -import qualified Data.HashSet as HashSet -import Data.Int -import Data.List (sort) -import Data.List.NonEmpty (NonEmpty) -import Data.Map (Map) -import qualified Data.Map as Map -import Data.Ratio -import Data.Scientific -import Data.Store (Store (..), peekException) -import qualified Data.Text as T -import qualified Data.Text.Array as TA -import qualified Data.Text.Encoding as TE -import qualified Data.Text.Internal as T -import qualified Data.Text.Lazy as TL -import Data.Time.Clock (UTCTime) -import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) -import Data.Typeable -import qualified Data.Vector as V -import Data.Word -import qualified Database.SQLite.Simple.FromField as SQL -import qualified Database.SQLite.Simple.ToField as SQL -import Foreign.Marshal.Utils (with) -import Foreign.Ptr (castPtr) -import Foreign.Storable (Storable, sizeOf) -import GHC.Fingerprint -import GHC.Generics -import GHC.Integer.GMP.Internals (BigNat (..), Integer (..)) -import GHC.Natural (Natural (..)) -import GHC.Prim (ByteArray#, - copyByteArrayToAddr#, - sizeofByteArray#) -import GHC.Ptr (Ptr (Ptr)) -import GHC.Types (IO (IO), Int (I#), Word (W#)) -import qualified Path -import qualified Path.Internal -import qualified Path.IO -import System.IO (IOMode (ReadMode), - withBinaryFile) -import System.IO.Error (isPermissionError) -import System.IO.Unsafe (unsafePerformIO) -import System.Posix.Files (fileSize, getFileStatus) - - -newtype ContentHash = ContentHash { unContentHash :: Digest SHA256 } - deriving (Eq, Ord, Generic) - -instance Aeson.FromJSON ContentHash where - parseJSON (Aeson.String s) - | Just h <- decodeHash (TE.encodeUtf8 s) = pure h - | otherwise = fail "Invalid hash encoding" - parseJSON invalid - = Aeson.typeMismatch "ContentHash" invalid -instance Aeson.ToJSON ContentHash where - toJSON = Aeson.String . TE.decodeUtf8 . encodeHash - -instance Data.Hashable.Hashable ContentHash where - hashWithSalt s = Data.Hashable.hashWithSalt s . encodeHash - -instance Show ContentHash where - showsPrec d h = showParen (d > app_prec) - $ showString "ContentHash \"" - . (showString $ C8.unpack $ encodeHash h) - . showString "\"" - where app_prec = 10 - -instance Store ContentHash where - size = contramap toBytes size - peek = fromBytes <$> peek >>= \case - Nothing -> peekException "Store ContentHash: Illegal digest" - Just x -> return x - poke = poke . toBytes - -instance SQL.FromField ContentHash where - fromField f = do - bs <- SQL.fromField f - case decodeHash bs of - Just h -> pure h - Nothing -> mzero - -instance SQL.ToField ContentHash where - toField = SQL.toField . encodeHash - -toBytes :: ContentHash -> BS.ByteString -toBytes = convert . unContentHash - -fromBytes :: BS.ByteString -> Maybe ContentHash -fromBytes bs = ContentHash <$> digestFromByteString bs - -hashEncoding :: Base -hashEncoding = Base16 - --- | File path appropriate encoding of a hash -encodeHash :: ContentHash -> BS.ByteString -encodeHash = convertToBase hashEncoding . toBytes - --- | Inverse of 'encodeHash' if given a valid input. --- --- prop> decodeHash (encodeHash x) = Just x -decodeHash :: BS.ByteString -> Maybe ContentHash -decodeHash bs = case convertFromBase hashEncoding bs of - Left _ -> Nothing - Right x -> fromBytes x - --- | File path appropriate encoding of a hash -hashToPath :: ContentHash -> Path.Path Path.Rel Path.Dir -hashToPath h = - case Path.parseRelDir $ C8.unpack $ encodeHash h of - Nothing -> error - "[ContentHashable.hashToPath] \ - \Failed to convert hash to directory name" - Just dir -> dir - - --- | Inverse of 'hashToPath' if given a valid input. --- --- prop> pathToHash (hashToPath x) = Just x -pathToHash :: FilePath -> Maybe ContentHash -pathToHash = decodeHash . C8.pack - - -class Monad m => ContentHashable m a where - - -- | Update a hash context based on the given value. - -- - -- See 'Crypto.Hash.hashUpdate'. - -- - -- XXX: Consider swapping the arguments. - contentHashUpdate :: Context SHA256 -> a -> m (Context SHA256) - - default contentHashUpdate :: (Generic a, GContentHashable m (Rep a)) - => Context SHA256 -> a -> m (Context SHA256) - contentHashUpdate ctx a = gContentHashUpdate ctx (from a) - - -- | Generate hash of the given value. - -- - -- See 'Crypto.Hash.hash'. - contentHash :: a -> m ContentHash - contentHash x = ContentHash . hashFinalize <$> contentHashUpdate hashInit x - - --- | Update hash context based on binary in memory representation due to 'Foreign.Storable.Storable'. --- --- XXX: Do we need to worry about endianness? -contentHashUpdate_storable :: (Monad m, Storable a) => Context SHA256 -> a -> m (Context SHA256) -contentHashUpdate_storable ctx a = - return . unsafePerformIO $ with a (\p -> pure $! hashUpdate ctx (MemView (castPtr p) (sizeOf a))) - --- | Update hash context based on a type's 'GHC.Fingerprint.Type.Fingerprint'. --- --- The fingerprint is constructed from the library-name, module-name, and name of the type itself. -contentHashUpdate_fingerprint :: (Monad m, Typeable a) => Context SHA256 -> a -> m (Context SHA256) -contentHashUpdate_fingerprint ctx = contentHashUpdate ctx . typeRepFingerprint . typeOf - --- | Update hash context by combining 'contentHashUpdate_fingerprint' and 'contentHashUpdate_storable'. --- Intended for primitive types like 'Int'. -contentHashUpdate_primitive :: (Monad m, Typeable a, Storable a) => Context SHA256 -> a -> m (Context SHA256) -contentHashUpdate_primitive ctx a = - flip contentHashUpdate_fingerprint a >=> flip contentHashUpdate_storable a $ ctx - --- | Update hash context based on binary contents of the given file. -contentHashUpdate_binaryFile :: Context SHA256 -> FilePath -> IO (Context SHA256) -contentHashUpdate_binaryFile ctx0 fp = withBinaryFile fp ReadMode $ \h -> - let go ctx = do - chunk <- BS.hGetSome h defaultChunkSize - if BS.null chunk then - pure ctx - else - go $! hashUpdate ctx chunk - in go ctx0 - --- | Update hash context based on 'GHC.Prim.ByteArray#' --- by copying into a newly allocated 'Data.ByteArray.Bytes' --- and updating the hash context from there. --- --- XXX: @'GHC.Prim.byteArrayContents#' :: 'GHC.Prim.ByteArray#' -> 'GHC.Prim.Addr#'@ --- could be used together with 'Data.ByteArray.MemView' instead. --- However, 'GHC.Prim.byteArrayContents#' explicitly says, that it is only safe to use --- on a pinned 'GHC.Prim.ByteArray#'. -contentHashUpdate_byteArray# :: ByteArray# -> Int -> Int -> Context SHA256 -> Context SHA256 -contentHashUpdate_byteArray# ba (I# off) (I# len) ctx = hashUpdate ctx $ - allocAndFreeze @Bytes (I# len) $ \(Ptr addr) -> IO $ \s -> - (# copyByteArrayToAddr# ba off addr len s, () #) - --- | Update hash context based on the contents of a strict 'Data.Text.Text'. -contentHashUpdate_text :: Context SHA256 -> T.Text -> Context SHA256 -contentHashUpdate_text ctx (T.Text arr off_ len_) = - contentHashUpdate_byteArray# (TA.aBA arr) off len ctx - where - off = off_ `shiftL` 1 -- convert from 'Word16' to 'Word8' - len = len_ `shiftL` 1 -- convert from 'Word16' to 'Word8' - -instance Monad m => ContentHashable m Fingerprint where - contentHashUpdate ctx (Fingerprint a b) = flip contentHashUpdate_storable a >=> flip contentHashUpdate_storable b $ ctx - -instance Monad m => ContentHashable m Bool where contentHashUpdate = contentHashUpdate_primitive - -instance Monad m => ContentHashable m Char where contentHashUpdate = contentHashUpdate_primitive - -instance Monad m => ContentHashable m Int where contentHashUpdate = contentHashUpdate_primitive -instance Monad m => ContentHashable m Int8 where contentHashUpdate = contentHashUpdate_primitive -instance Monad m => ContentHashable m Int16 where contentHashUpdate = contentHashUpdate_primitive -instance Monad m => ContentHashable m Int32 where contentHashUpdate = contentHashUpdate_primitive -instance Monad m => ContentHashable m Int64 where contentHashUpdate = contentHashUpdate_primitive - -instance Monad m => ContentHashable m Word where contentHashUpdate = contentHashUpdate_primitive -instance Monad m => ContentHashable m Word8 where contentHashUpdate = contentHashUpdate_primitive -instance Monad m => ContentHashable m Word16 where contentHashUpdate = contentHashUpdate_primitive -instance Monad m => ContentHashable m Word32 where contentHashUpdate = contentHashUpdate_primitive -instance Monad m => ContentHashable m Word64 where contentHashUpdate = contentHashUpdate_primitive - -instance Monad m => ContentHashable m Float where contentHashUpdate = contentHashUpdate_primitive -instance Monad m => ContentHashable m Double where contentHashUpdate = contentHashUpdate_primitive - -instance (ContentHashable m n, Typeable n) => ContentHashable m (Ratio n) where - contentHashUpdate ctx x = - flip contentHashUpdate_fingerprint x - >=> flip contentHashUpdate (numerator x) - >=> flip contentHashUpdate (denominator x) - $ ctx - -instance Monad m => ContentHashable m Scientific where - contentHashUpdate ctx x = - flip contentHashUpdate_fingerprint x - >=> flip contentHashUpdate (toRational x) - $ ctx - -instance Monad m => ContentHashable m Integer where - contentHashUpdate ctx n = ($ ctx) $ - flip contentHashUpdate_fingerprint n >=> case n of - S# i -> - pure . flip hashUpdate (C8.pack "S") -- tag constructur - >=> flip contentHashUpdate_storable (I# i) -- hash field - Jp# (BN# ba) -> - pure . flip hashUpdate (C8.pack "L") -- tag constructur - >=> pure . contentHashUpdate_byteArray# ba 0 (I# (sizeofByteArray# ba)) -- hash field - Jn# (BN# ba) -> - pure . flip hashUpdate (C8.pack "N") -- tag constructur - >=> pure . contentHashUpdate_byteArray# ba 0 (I# (sizeofByteArray# ba)) -- hash field - -instance Monad m => ContentHashable m Natural where - contentHashUpdate ctx n = ($ ctx) $ - flip contentHashUpdate_fingerprint n >=> case n of - NatS# w -> - pure . flip hashUpdate (C8.pack "S") -- tag constructur - >=> flip contentHashUpdate_storable (W# w) -- hash field - NatJ# (BN# ba) -> - pure . flip hashUpdate (C8.pack "L") -- tag constructur - >=> pure . contentHashUpdate_byteArray# ba 0 (I# (sizeofByteArray# ba)) -- hash field - -instance Monad m => ContentHashable m BS.ByteString where - contentHashUpdate ctx s = - flip contentHashUpdate_fingerprint s - >=> pure . flip hashUpdate s $ ctx - -instance Monad m => ContentHashable m BSL.ByteString where - contentHashUpdate ctx s = - flip contentHashUpdate_fingerprint s - >=> pure . flip (BSL.foldlChunks hashUpdate) s $ ctx - -instance Monad m => ContentHashable m T.Text where - contentHashUpdate ctx s = - flip contentHashUpdate_fingerprint s - >=> pure . flip contentHashUpdate_text s $ ctx - -instance Monad m => ContentHashable m TL.Text where - contentHashUpdate ctx s = - flip contentHashUpdate_fingerprint s - >=> pure . flip (TL.foldlChunks contentHashUpdate_text) s $ ctx - -instance (Typeable k, Typeable v, ContentHashable m k, ContentHashable m v) - => ContentHashable m (Map k v) where - contentHashUpdate ctx m = - flip contentHashUpdate_fingerprint m - >=> flip contentHashUpdate (Map.toList m) $ ctx - -instance (Typeable k, Typeable v, ContentHashable m k, ContentHashable m v) - => ContentHashable m (HashMap.HashMap k v) where - contentHashUpdate ctx m = - flip contentHashUpdate_fingerprint m - -- XXX: The order of the list is unspecified. - >=> flip contentHashUpdate (HashMap.toList m) $ ctx - -instance (Typeable v, ContentHashable m v) - => ContentHashable m (HashSet.HashSet v) where - contentHashUpdate ctx s = - flip contentHashUpdate_fingerprint s - -- XXX: The order of the list is unspecified. - >=> flip contentHashUpdate (HashSet.toList s) $ ctx - -instance (Typeable a, ContentHashable m a) - => ContentHashable m [a] where - contentHashUpdate ctx l = - flip contentHashUpdate_fingerprint l - >=> flip (foldM contentHashUpdate) l $ ctx - -instance (Typeable a, ContentHashable m a) - => ContentHashable m (NonEmpty a) where - contentHashUpdate ctx l = - flip contentHashUpdate_fingerprint l - >=> flip (foldlM contentHashUpdate) l $ ctx - -instance (Typeable a, ContentHashable m a) - => ContentHashable m (V.Vector a) where - contentHashUpdate ctx v = - flip contentHashUpdate_fingerprint v - >=> flip (V.foldM' contentHashUpdate) v $ ctx - -instance Monad m => ContentHashable m () -instance (ContentHashable m a, ContentHashable m b) => ContentHashable m (a, b) -instance (ContentHashable m a, ContentHashable m b, ContentHashable m c) => ContentHashable m (a, b, c) -instance (ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d) => ContentHashable m (a, b, c, d) -instance (ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d, ContentHashable m e) => ContentHashable m (a, b, c, d, e) -instance (Monad m, ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d, ContentHashable m e, ContentHashable m f) => ContentHashable m (a, b, c, d, e, f) -instance (Monad m, ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d, ContentHashable m e, ContentHashable m f, ContentHashable m g) => ContentHashable m (a, b, c, d, e, f, g) - -instance ContentHashable m a => ContentHashable m (Maybe a) - -instance (ContentHashable m a, ContentHashable m b) => ContentHashable m (Either a b) - -instance Monad m => ContentHashable m Aeson.Value - - -class Monad m => GContentHashable m f where - gContentHashUpdate :: Context SHA256 -> f a -> m (Context SHA256) - -instance Monad m => GContentHashable m V1 where - gContentHashUpdate ctx _ = pure ctx - -instance Monad m => GContentHashable m U1 where - gContentHashUpdate ctx U1 = pure ctx - -instance ContentHashable m c => GContentHashable m (K1 i c) where - gContentHashUpdate ctx x = contentHashUpdate ctx (unK1 x) - -instance (Constructor c, GContentHashable m f) => GContentHashable m (C1 c f) where - gContentHashUpdate ctx0 x = gContentHashUpdate nameCtx (unM1 x) - where nameCtx = hashUpdate ctx0 $ C8.pack (conName x) - -instance (Datatype d, GContentHashable m f) => GContentHashable m (D1 d f) where - gContentHashUpdate ctx0 x = gContentHashUpdate packageCtx (unM1 x) - where - datatypeCtx = hashUpdate ctx0 $ C8.pack (datatypeName x) - moduleCtx = hashUpdate datatypeCtx $ C8.pack (datatypeName x) - packageCtx = hashUpdate moduleCtx $ C8.pack (datatypeName x) - -instance GContentHashable m f => GContentHashable m (S1 s f) where - gContentHashUpdate ctx x = gContentHashUpdate ctx (unM1 x) - -instance (GContentHashable m a, GContentHashable m b) => GContentHashable m (a :*: b) where - gContentHashUpdate ctx (x :*: y) = gContentHashUpdate ctx x >>= flip gContentHashUpdate y - -instance (GContentHashable m a, GContentHashable m b) => GContentHashable m (a :+: b) where - gContentHashUpdate ctx (L1 x) = gContentHashUpdate ctx x - gContentHashUpdate ctx (R1 x) = gContentHashUpdate ctx x - --- XXX: Do we need this? --- instance GContentHashable (a :.: b) where --- gContentHashUpdate ctx x = _ (unComp1 x) - - -instance (Monad m, Typeable b, Typeable t) => ContentHashable m (Path.Path b t) where - contentHashUpdate ctx p@(Path.Internal.Path fp) = - flip contentHashUpdate_fingerprint p - >=> flip contentHashUpdate fp - $ ctx - - --- | Path to a regular file --- --- Only the file's content and its executable permission is taken into account --- when generating the content hash. The path itself is ignored. -newtype FileContent = FileContent (Path.Path Path.Abs Path.File) - -instance ContentHashable IO FileContent where - - contentHashUpdate ctx (FileContent fp) = do - exec <- Path.IO.executable <$> Path.IO.getPermissions fp - ctx' <- if exec then contentHashUpdate ctx () else pure ctx - contentHashUpdate_binaryFile ctx' (Path.fromAbsFile fp) - --- | Path to a directory --- --- Only the contents of the directory and their path relative to the directory --- are taken into account when generating the content hash. --- The path to the directory is ignored. -newtype DirectoryContent = DirectoryContent (Path.Path Path.Abs Path.Dir) - -instance MonadIO m => ContentHashable m DirectoryContent where - - contentHashUpdate ctx0 (DirectoryContent dir0) = liftIO $ do - (dirs, files) <- Path.IO.listDir dir0 - ctx' <- foldM hashFile ctx0 (sort files) - foldM hashDir ctx' (sort dirs) - where - hashFile ctx fp = - -- XXX: Do we need to treat symbolic links specially? - flip contentHashUpdate (Path.filename fp) - >=> flip contentHashUpdate (FileContent fp) - $ ctx - hashDir ctx dir = - flip contentHashUpdate (Path.dirname dir) - >=> flip contentHashUpdate (DirectoryContent dir) - $ ctx - -instance Monad m => ContentHashable m UTCTime where - contentHashUpdate ctx utcTime = let - secondsSinceEpoch = fromEnum . utcTimeToPOSIXSeconds $ utcTime - in flip contentHashUpdate_fingerprint utcTime - >=> flip contentHashUpdate secondsSinceEpoch - $ ctx - --- | Path to a file to be treated as _externally assured_. --- --- An externally assured file is handled in a somewhat 'cheating' way by --- funflow. The 'ContentHashable' instance for such assumes that some external --- agent guarantees the integrity of the file being referenced. Thus, rather --- than hashing the file contents, we only consider its (absolute) path, size and --- modification time, which can be rapidly looked up from filesystem metadata. --- --- For a similar approach, see the instance for 'ObjectInBucket' in --- Control.Funflow.AWS.S3, where we exploit the fact that S3 is already --- content hashed to avoid performing any hashing. -newtype ExternallyAssuredFile = ExternallyAssuredFile (Path.Path Path.Abs Path.File) - deriving (Generic, Show) - -instance Aeson.FromJSON ExternallyAssuredFile -instance Aeson.ToJSON ExternallyAssuredFile -instance Store ExternallyAssuredFile - -instance ContentHashable IO ExternallyAssuredFile where - contentHashUpdate ctx (ExternallyAssuredFile fp) = do - modTime <- Path.IO.getModificationTime fp - fSize <- fileSize <$> getFileStatus (Path.toFilePath fp) - flip contentHashUpdate fp - >=> flip contentHashUpdate modTime - >=> flip contentHashUpdate_storable fSize - $ ctx - - --- | Path to a directory to be treated as _externally assured_. --- --- For an externally assured directory, we _do_ traverse its contents and verify --- those as we would externally assured files, rather than just relying on the --- directory path. Doing this traversal is pretty cheap, and it's quite likely --- for directory contents to be modified without modifying the contents. --- --- If an item in the directory cannot be read due to lacking permissions, --- then it will be ignored and not included in the hash. If the flow does not --- have permissions to access the contents of a subdirectory, then these --- contents cannot influence the outcome of a task and it is okay to exclude --- them from the hash. In that case we only hash the name, as that could --- influence the outcome of a task. -newtype ExternallyAssuredDirectory = ExternallyAssuredDirectory (Path.Path Path.Abs Path.Dir) - deriving (Generic, Show) - -instance Aeson.FromJSON ExternallyAssuredDirectory -instance Aeson.ToJSON ExternallyAssuredDirectory -instance Store ExternallyAssuredDirectory - -instance ContentHashable IO ExternallyAssuredDirectory where - contentHashUpdate ctx0 (ExternallyAssuredDirectory dir0) = do - -- Note that we don't bother looking at the relative directory paths and - -- including these in the hash. This is because the absolute hash gets - -- included every time we hash a file. - (dirs, files) <- Path.IO.listDir dir0 - ctx' <- foldM hashFile ctx0 (sort files) - foldM hashDir ctx' (sort dirs) - where - hashFile ctx fp = contentHashUpdate ctx (ExternallyAssuredFile fp) - `catchPermissionError` \_ -> contentHashUpdate ctx fp - hashDir ctx dir = contentHashUpdate ctx (ExternallyAssuredDirectory dir) - `catchPermissionError` \_ -> contentHashUpdate ctx dir - catchPermissionError = catchJust $ \e -> - if isPermissionError e then Just e else Nothing diff --git a/src/Control/Funflow/ContentStore.hs b/src/Control/Funflow/ContentStore.hs deleted file mode 100644 index b784f29..0000000 --- a/src/Control/Funflow/ContentStore.hs +++ /dev/null @@ -1,1032 +0,0 @@ -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE PatternSynonyms #-} -{-# LANGUAGE QuasiQuotes #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE StandaloneDeriving #-} - --- | Hash addressed store in file system. --- --- Associates a key ('Control.Funflow.ContentHashable.ContentHash') --- with an item in the store. An item can either be --- 'Control.Funflow.ContentStore.Missing', --- 'Control.Funflow.ContentStore.Pending', or --- 'Control.Funflow.ContentStore.Complete'. --- The state is persisted in the file system. --- --- Items are stored under a path derived from their hash. Therefore, --- there can be no two copies of the same item in the store. --- If two keys are associated with the same item, then there will be --- only one copy of that item in the store. --- --- The store is thread-safe and multi-process safe. --- --- It is assumed that the user that the process is running under is the owner --- of the store root, or has permission to create it if missing. --- --- It is assumed that the store root and its immediate contents are not modified --- externally. The contents of pending items may be modified externally. --- --- __Implementation notes:__ --- --- The hash of an item can only be determined once it is completed. --- If that hash already exists in the store, then the new item is discarded. --- --- Store state is persisted in the file-system: --- --- * Pending items are stored writable under the path @pending-\<key>@. --- * Complete items are stored read-only under the path @item-\<hash>@, --- with a link under @complete-\<key>@ pointing to that directory. -module Control.Funflow.ContentStore - ( - -- * Open/Close - withStore - , open - , close - - -- * List Contents - , listAll - , listPending - , listComplete - , listItems - - -- * Query/Lookup - , query - , isMissing - , isPending - , isComplete - , lookup - , lookupOrWait - , waitUntilComplete - - -- * Construct Items - , constructOrAsync - , constructOrWait - , constructIfMissing - , withConstructIfMissing - , markPending - , markComplete - - -- * Remove Contents - , removeFailed - , removeForcibly - , removeItemForcibly - - -- * Aliases - , assignAlias - , lookupAlias - , removeAlias - , listAliases - - -- * Metadata - , getBackReferences - , setInputs - , getInputs - , setMetadata - , getMetadata - , createMetadataFile - , getMetadataFile - - -- * Accessors - , itemHash - , itemPath - , itemRelPath - , contentPath - , contentItem - , contentFilename - , root - - -- * Types - , ContentStore - , Item - , Content (..) - , (^</>) - , Alias (..) - , Status (..) - , Status_ - , Update (..) - , StoreError (..) - ) where - - -import Prelude hiding (lookup) - -import Control.Arrow (second) -import Control.Concurrent (threadDelay) -import Control.Concurrent.Async -import Control.Concurrent.MVar -import Control.Exception.Safe (Exception, MonadMask, - bracket, bracketOnError, - bracket_, - displayException, throwIO) -import Control.Funflow.ContentStore.Notify -import Control.Funflow.Orphans () -import Control.Lens -import Control.Monad (forM_, forever, unless, - void, when, (<=<), (>=>)) -import Control.Monad.IO.Class (MonadIO, liftIO) -import Control.Monad.Trans.Control (MonadBaseControl) -import Crypto.Hash (hashUpdate) -import Data.Aeson (FromJSON, ToJSON) -import Data.Bits (complement) -import qualified Data.ByteString.Char8 as C8 -import Data.Foldable (asum) -import qualified Data.Hashable -import Data.List (foldl', stripPrefix) -import Data.Maybe (fromMaybe, listToMaybe) -import Data.Monoid ((<>)) -import qualified Data.Store -import Data.String (IsString (..)) -import qualified Data.Text as T -import Data.Typeable (Typeable) -import Data.Void -import qualified Database.SQLite.Simple as SQL -import qualified Database.SQLite.Simple.FromField as SQL -import qualified Database.SQLite.Simple.ToField as SQL -import GHC.Generics (Generic) -import Path -import Path.IO -import System.Directory (removePathForcibly) -import System.FilePath (dropTrailingPathSeparator) -import System.IO (Handle, IOMode (..), - openFile) -import System.Posix.Files -import System.Posix.Types - -import Control.Funflow.ContentHashable (ContentHash, - ContentHashable (..), - DirectoryContent (..), - contentHashUpdate_fingerprint, - encodeHash, pathToHash, - toBytes) -import Control.Funflow.Lock -import qualified Control.Funflow.RemoteCache as Remote - - --- | Status of an item in the store. -data Status missing pending complete - = Missing missing - -- ^ The item does not exist, yet. - | Pending pending - -- ^ The item is under construction and not ready for consumption. - | Complete complete - -- ^ The item is complete and ready for consumption. - deriving (Eq, Show) - -type Status_ = Status () () () - --- | Update about the status of a pending item. -data Update - = Completed Item - -- ^ The item is now completed and ready for consumption. - | Failed - -- ^ Constructing the item failed. - deriving (Eq, Show) - --- | Errors that can occur when interacting with the store. -data StoreError - = NotPending ContentHash - -- ^ An item is not under construction when it should be. - | AlreadyPending ContentHash - -- ^ An item is already under construction when it should be missing. - | AlreadyComplete ContentHash - -- ^ An item is already complete when it shouldn't be. - | CorruptedLink ContentHash FilePath - -- ^ The link under the given hash points to an invalid path. - | FailedToConstruct ContentHash - -- ^ A failure occurred while waiting for the item to be constructed. - | IncompatibleStoreVersion (Path Abs Dir) Int Int - -- ^ @IncompatibleStoreVersion storeDir actual expected@ - -- The given store has a version number that is incompatible. - | MalformedMetadataEntry ContentHash SQL.SQLData - -- ^ @MalformedMetadataEntry hash key@ - -- The metadata entry for the give @hash@, @key@ pair is malformed. - deriving (Show, Typeable) -instance Exception StoreError where - displayException = \case - NotPending hash -> - "The following input hash is not pending '" - ++ C8.unpack (encodeHash hash) - ++ "'." - AlreadyPending hash -> - "The following input hash is already pending '" - ++ C8.unpack (encodeHash hash) - ++ "'." - AlreadyComplete hash -> - "The following input hash is already completed '" - ++ C8.unpack (encodeHash hash) - ++ "'." - CorruptedLink hash fp -> - "The completed input hash '" - ++ C8.unpack (encodeHash hash) - ++ "' points to an invalid store item '" - ++ fp - ++ "'." - FailedToConstruct hash -> - "Failed to construct the input hash '" - ++ C8.unpack (encodeHash hash) - ++ "'." - IncompatibleStoreVersion storeDir actual expected -> - "The store in '" - ++ fromAbsDir storeDir - ++ "' has version " - ++ show actual - ++ ". This software expects version " - ++ show expected - ++ ". No automatic migration is available, \ - \please use a fresh store location." - MalformedMetadataEntry hash key -> - "The metadtaa entry for hash '" - ++ C8.unpack (encodeHash hash) - ++ "' under key '" - ++ show key - ++ "' is malformed." - --- | A hash addressed store on the file system. -data ContentStore = ContentStore - { storeRoot :: Path Abs Dir - -- ^ Root directory of the content store. - -- The process must be able to create this directory if missing, - -- change permissions, and create files and directories within. - , storeLock :: Lock - -- ^ Write lock on store metadata to ensure multi thread and process safety. - -- The lock is taken when item state is changed or queried. - , storeNotifier :: Notifier - -- ^ Used to watch for updates on store items. - , storeDb :: SQL.Connection - -- ^ Connection to the metadata SQLite database. - } - --- | A completed item in the 'ContentStore'. -data Item = Item { itemHash :: ContentHash } - deriving (Eq, Ord, Show, Generic) - -instance Monad m => ContentHashable m Item where - contentHashUpdate ctx item = - flip contentHashUpdate_fingerprint item - >=> pure . flip hashUpdate (toBytes $ itemHash item) - $ ctx - -instance FromJSON Item -instance ToJSON Item -instance Data.Hashable.Hashable Item -instance Data.Store.Store Item - --- | File or directory within a content store 'Item'. -data Content t where - All :: Item -> Content Dir - (:</>) :: Item -> Path Rel t -> Content t -infixr 5 :</> -deriving instance Eq (Content t) -deriving instance Show (Content t) -instance Monad m => ContentHashable m (Content Dir) where - contentHashUpdate ctx x = case x of - All i -> - flip contentHashUpdate_fingerprint x - >=> flip contentHashUpdate i - $ ctx - i :</> p -> - flip contentHashUpdate_fingerprint x - >=> flip contentHashUpdate i - >=> flip contentHashUpdate p - $ ctx -instance Monad m => ContentHashable m (Content File) where - contentHashUpdate ctx x = case x of - i :</> p -> - flip contentHashUpdate_fingerprint x - >=> flip contentHashUpdate i - >=> flip contentHashUpdate p - $ ctx - --- | Append to the path within a store item. -(^</>) :: Content Dir -> Path Rel t -> Content t -All item ^</> path = item :</> path -(item :</> dir) ^</> path = item :</> dir </> path -infixl 4 ^</> - -newtype Alias = Alias { unAlias :: T.Text } - deriving (ContentHashable IO, Eq, Ord, Show, SQL.FromField, SQL.ToField, Data.Store.Store) - --- | The root directory of the store. -root :: ContentStore -> Path Abs Dir -root = storeRoot - --- | The scoped path to a content item within the store. -itemRelPath :: Item -> Path Rel Dir -itemRelPath (Item x) = prefixHashPath itemPrefix x - --- | The store path of a completed item. -itemPath :: ContentStore -> Item -> Path Abs Dir -itemPath store = mkItemPath store . itemHash - --- | Store item containing the given content. -contentItem :: Content t -> Item -contentItem (All i) = i -contentItem (i :</> _) = i - -contentFilename :: Content File -> Path Rel File -contentFilename (_ :</> relPath) = filename relPath - --- | The absolute path to content within the store. -contentPath :: ContentStore -> Content t -> Path Abs t -contentPath store (All item) = itemPath store item -contentPath store (item :</> dir) = itemPath store item </> dir - --- | @open root@ opens a store under the given root directory. --- --- The root directory is created if necessary. --- --- It is not safe to have multiple store objects --- refer to the same root directory. -open :: Path Abs Dir -> IO ContentStore -open storeRoot = do - createDirIfMissing True storeRoot - storeLock <- openLock (lockPath storeRoot) - withLock storeLock $ withWritableStoreRoot storeRoot $ do - storeDb <- SQL.open (fromAbsFile $ dbPath storeRoot) - initDb storeRoot storeDb - createDirIfMissing True (metadataPath storeRoot) - storeNotifier <- initNotifier - return ContentStore {..} - --- | Free the resources associated with the given store object. --- --- The store object may not be used afterwards. -close :: ContentStore -> IO () -close store = do - closeLock (storeLock store) - killNotifier (storeNotifier store) - SQL.close (storeDb store) - --- | Open the store under the given root and perform the given action. --- Closes the store once the action is complete --- --- See also: 'Control.Funflow.ContentStore.open' -withStore :: (MonadIO m, MonadMask m) - => Path Abs Dir -> (ContentStore -> m a) -> m a -withStore root' = bracket (liftIO $ open root') (liftIO . close) - --- | List all elements in the store --- @(pending keys, completed keys, completed items)@. -listAll :: MonadIO m => ContentStore -> m ([ContentHash], [ContentHash], [Item]) -listAll ContentStore {storeRoot} = liftIO $ - foldr go ([], [], []) . fst <$> listDir storeRoot - where - go d prev@(builds, outs, items) = fromMaybe prev $ asum - [ parsePending d >>= \x -> Just (x:builds, outs, items) - , parseComplete d >>= \x -> Just (builds, x:outs, items) - , parseItem d >>= \x -> Just (builds, outs, x:items) - ] - parsePending :: Path Abs Dir -> Maybe ContentHash - parsePending = pathToHash <=< stripPrefix pendingPrefix . extractDir - parseComplete :: Path Abs Dir -> Maybe ContentHash - parseComplete = pathToHash <=< stripPrefix completePrefix . extractDir - parseItem :: Path Abs Dir -> Maybe Item - parseItem = fmap Item . pathToHash <=< stripPrefix itemPrefix . extractDir - extractDir :: Path Abs Dir -> FilePath - extractDir = dropTrailingPathSeparator . fromRelDir . dirname - --- | List all pending keys in the store. -listPending :: MonadIO m => ContentStore -> m [ContentHash] -listPending = fmap (^._1) . listAll - --- | List all completed keys in the store. -listComplete :: MonadIO m => ContentStore -> m [ContentHash] -listComplete = fmap (^._2) . listAll - --- | List all completed items in the store. -listItems :: MonadIO m => ContentStore -> m [Item] -listItems = fmap (^._3) . listAll - --- | Query the state of the item under the given key. -query :: MonadIO m => ContentStore -> ContentHash -> m (Status () () ()) -query store hash = liftIO . withStoreLock store $ - internalQuery store hash >>= pure . \case - Missing _ -> Missing () - Pending _ -> Pending () - Complete _ -> Complete () - --- | Check if there is no complete or pending item under the given key. -isMissing :: MonadIO m => ContentStore -> ContentHash -> m Bool -isMissing store hash = (== Missing ()) <$> query store hash - --- | Check if there is a pending item under the given key. -isPending :: MonadIO m => ContentStore -> ContentHash -> m Bool -isPending store hash = (== Pending ()) <$> query store hash - --- | Check if there is a completed item under the given key. -isComplete :: MonadIO m => ContentStore -> ContentHash -> m Bool -isComplete store hash = (== Complete ()) <$> query store hash - --- | Query the state under the given key and return the item if completed. --- Doesn't block if the item is pending. -lookup :: MonadIO m => ContentStore -> ContentHash -> m (Status () () Item) -lookup store hash = liftIO . withStoreLock store $ - internalQuery store hash >>= \case - Missing () -> return $ Missing () - Pending _ -> return $ Pending () - Complete item -> return $ Complete item - --- | Query the state under the given key and return the item if completed. --- Return an 'Control.Concurrent.Async' to await an update, if pending. -lookupOrWait - :: MonadIO m - => ContentStore - -> ContentHash - -> m (Status () (Async Update) Item) -lookupOrWait store hash = liftIO . withStoreLock store $ - internalQuery store hash >>= \case - Complete item -> return $ Complete item - Missing () -> return $ Missing () - Pending _ -> Pending <$> internalWatchPending store hash - --- | Query the state under the given key and return the item once completed. --- Blocks if the item is pending. --- Returns 'Nothing' if the item is missing, or failed to be completed. -waitUntilComplete :: MonadIO m => ContentStore -> ContentHash -> m (Maybe Item) -waitUntilComplete store hash = lookupOrWait store hash >>= \case - Complete item -> return $ Just item - Missing () -> return Nothing - Pending a -> liftIO (wait a) >>= \case - Completed item -> return $ Just item - Failed -> return Nothing - --- | Atomically query the state under the given key and mark pending if missing. --- --- Returns @'Complete' item@ if the item is complete. --- Returns @'Pending' async@ if the item is pending, where @async@ is an --- 'Control.Concurrent.Async' to await updates on. --- Returns @'Missing' buildDir@ if the item was missing, and is now pending. --- It should be constructed in the given @buildDir@, --- and then marked as complete using 'markComplete'. -constructOrAsync - :: forall m remoteCache. - (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache) - => ContentStore - -> remoteCache - -> ContentHash - -> m (Status (Path Abs Dir) (Async Update) Item) -constructOrAsync store cacher hash = - constructIfMissing store cacher hash >>= \case - Complete item -> return $ Complete item - Missing path -> return $ Missing path - Pending _ -> Pending <$> liftIO (internalWatchPending store hash) - --- | Atomically query the state under the given key and mark pending if missing. --- Wait for the item to be completed, if already pending. --- Throws a 'FailedToConstruct' error if construction fails. --- --- Returns @'Complete' item@ if the item is complete. --- Returns @'Missing' buildDir@ if the item was missing, and is now pending. --- It should be constructed in the given @buildDir@, --- and then marked as complete using 'markComplete'. -constructOrWait - :: (MonadIO m, MonadMask m, MonadBaseControl IO m, Remote.Cacher m remoteCache) - => ContentStore - -> remoteCache - -> ContentHash - -> m (Status (Path Abs Dir) Void Item) -constructOrWait store cacher hash = constructOrAsync store cacher hash >>= \case - Pending a -> liftIO (wait a) >>= \case - Completed item -> return $ Complete item - -- XXX: Consider extending 'Status' with a 'Failed' constructor. - -- If the store contains metadata as well, it could keep track of the - -- number of failed attempts and further details about the failure. - -- If an external task is responsible for the failure, the client could - -- choose to resubmit a certain number of times. - Failed -> liftIO . throwIO $ FailedToConstruct hash - Complete item -> return $ Complete item - Missing dir -> return $ Missing dir - --- | Atomically query the state under the given key and mark pending if missing. -constructIfMissing - :: (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache) - => ContentStore - -> remoteCache - -> ContentHash - -> m (Status (Path Abs Dir) () Item) -constructIfMissing store cacher hash = withStoreLock store $ - internalQuery store hash >>= \case - Complete item -> return $ Complete item - Missing () -> withWritableStore store $ do - let destDir :: Path Abs Dir = mkItemPath store hash - Remote.pull cacher hash destDir >>= \case - Remote.PullOK () -> return $ Complete (Item hash) - Remote.NotInCache -> - Missing <$> liftIO (internalMarkPending store hash) - Remote.PullError _ -> - Missing <$> liftIO (internalMarkPending store hash) - Pending _ -> return $ Pending () - --- | Atomically query the state under the given key and mark pending if missing. --- Execute the given function to construct the item, mark as complete on success --- and remove on failure. Forcibly removes if an uncaught exception occurs --- during item construction. -withConstructIfMissing - :: (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache) - => ContentStore - -> remoteCache - -> ContentHash - -> (Path Abs Dir -> m (Either e a)) - -> m (Status e () (Maybe a, Item)) -withConstructIfMissing store cacher hash f = - bracketOnError - (constructIfMissing store cacher hash) - (\case - Missing _ -> removeForcibly store hash - _ -> return ()) - (\case - Pending () -> return (Pending ()) - Complete item -> return (Complete (Nothing, item)) - Missing fp -> f fp >>= \case - Left e -> do - removeFailed store hash - return (Missing e) - Right x -> do - item <- markComplete store hash - _ <- Remote.push cacher (itemHash item) (Just hash) (itemPath store item) - return (Complete (Just x, item))) - --- | Mark a non-existent item as pending. --- --- Creates the build directory and returns its path. --- --- See also: 'Control.Funflow.ContentStore.constructIfMissing'. -markPending :: MonadIO m => ContentStore -> ContentHash -> m (Path Abs Dir) -markPending store hash = liftIO . withStoreLock store $ - internalQuery store hash >>= \case - Complete _ -> throwIO (AlreadyComplete hash) - Pending _ -> throwIO (AlreadyPending hash) - Missing () -> withWritableStore store $ - internalMarkPending store hash - --- | Mark a pending item as complete. -markComplete :: MonadIO m => ContentStore -> ContentHash -> m Item -markComplete store inHash = liftIO . withStoreLock store $ - internalQuery store inHash >>= \case - Missing () -> throwIO (NotPending inHash) - Complete _ -> throwIO (AlreadyComplete inHash) - Pending build -> withWritableStore store $ liftIO $ do - do - let metadataDir = mkMetadataDirPath store inHash - exists <- doesDirExist metadataDir - when exists $ - unsetWritableRecursively metadataDir - -- XXX: Hashing large data can take some time, - -- could we avoid locking the store for all that time? - outHash <- contentHash (DirectoryContent build) - let out = mkItemPath store outHash - link' = mkCompletePath store inHash - doesDirExist out >>= \case - True -> removePathForcibly (fromAbsDir build) - False -> do - renameDir build out - unsetWritableRecursively out - rel <- makeRelative (parent link') out - let from' = dropTrailingPathSeparator $ fromAbsDir link' - to' = dropTrailingPathSeparator $ fromRelDir rel - createSymbolicLink to' from' - addBackReference store inHash (Item outHash) - pure $! Item outHash - --- | Remove a pending item. --- --- It is the callers responsibility to ensure that no other threads or processes --- will attempt to access the item's contents afterwards. -removeFailed :: MonadIO m => ContentStore -> ContentHash -> m () -removeFailed store hash = liftIO . withStoreLock store $ - internalQuery store hash >>= \case - Missing () -> throwIO (NotPending hash) - Complete _ -> throwIO (AlreadyComplete hash) - Pending build -> withWritableStore store $ - removePathForcibly (fromAbsDir build) - --- | Remove a key association independent of the corresponding item state. --- Do nothing if no item exists under the given key. --- --- It is the callers responsibility to ensure that no other threads or processes --- will attempt to access the contents afterwards. --- --- Note, this will leave an orphan item behind if no other keys point to it. --- There is no garbage collection mechanism in place at the moment. -removeForcibly :: MonadIO m => ContentStore -> ContentHash -> m () -removeForcibly store hash = liftIO . withStoreLock store $ withWritableStore store $ - internalQuery store hash >>= \case - Missing () -> pure () - Pending build -> liftIO $ removePathForcibly (fromAbsDir build) - Complete _out -> liftIO $ - removePathForcibly $ - dropTrailingPathSeparator $ fromAbsDir $ mkCompletePath store hash - -- XXX: This will leave orphan store items behind. - -- Add GC in some form. - --- | Remove a completed item in the store. --- Do nothing if not completed. --- --- It is the callers responsibility to ensure that no other threads or processes --- will attempt to access the contents afterwards. --- --- Note, this will leave keys pointing to that item dangling. --- There is no garbage collection mechanism in place at the moment. -removeItemForcibly :: MonadIO m => ContentStore -> Item -> m () -removeItemForcibly store item = liftIO . withStoreLock store $ withWritableStore store $ - removePathForcibly (fromAbsDir $ itemPath store item) - -- XXX: Remove dangling links. - -- Add back-references in some form. - --- | Link the given alias to the given item. --- If the alias existed before it is overwritten. -assignAlias :: MonadIO m => ContentStore -> Alias -> Item -> m () -assignAlias store alias item = - liftIO . withStoreLock store $ withWritableStore store $ do - hash <- contentHash alias - SQL.executeNamed (storeDb store) - "INSERT OR REPLACE INTO\ - \ aliases\ - \ VALUES\ - \ (:hash, :dest, :name)" - [ ":hash" SQL.:= hash - , ":dest" SQL.:= itemHash item - , ":name" SQL.:= alias - ] - --- | Lookup an item under the given alias. --- Returns 'Nothing' if the alias does not exist. -lookupAlias :: MonadIO m => ContentStore -> Alias -> m (Maybe Item) -lookupAlias store alias = - liftIO . withStoreLock store $ do - hash <- contentHash alias - r <- SQL.queryNamed (storeDb store) - "SELECT dest FROM aliases\ - \ WHERE\ - \ hash = :hash" - [ ":hash" SQL.:= hash ] - pure $! listToMaybe $ Item . SQL.fromOnly <$> r - --- | Remove the given alias. -removeAlias :: MonadIO m => ContentStore -> Alias -> m () -removeAlias store alias = - liftIO . withStoreLock store $ withWritableStore store $ do - hash <- contentHash alias - SQL.executeNamed (storeDb store) - "DELETE FROM aliases\ - \ WHERE\ - \ hash = :hash" - [ ":hash" SQL.:= hash ] - --- | List all aliases and the respective items. -listAliases :: MonadIO m => ContentStore -> m [(Alias, Item)] -listAliases store = liftIO . withStoreLock store $ - fmap (map (second Item)) $ - SQL.query_ (storeDb store) - "SELECT name, dest FROM aliases" - --- | Get all hashes that resulted in the given item. -getBackReferences :: MonadIO m => ContentStore -> Item -> m [ContentHash] -getBackReferences store (Item outHash) = liftIO . withStoreLock store $ - map SQL.fromOnly <$> SQL.queryNamed (storeDb store) - "SELECT hash FROM backrefs\ - \ WHERE\ - \ dest = :out" - [ ":out" SQL.:= outHash ] - --- | Define the input items to a subtree. -setInputs :: MonadIO m => ContentStore -> ContentHash -> [Item] -> m () -setInputs store hash items = liftIO $ - withStoreLock store $ - withWritableStore store $ - internalQuery store hash >>= \case - Pending _ -> forM_ items $ \(Item input) -> - SQL.executeNamed (storeDb store) - "INSERT OR REPLACE INTO\ - \ inputs (hash, input)\ - \ VALUES\ - \ (:hash, :input)" - [ ":hash" SQL.:= hash - , ":input" SQL.:= input - ] - _ -> throwIO $ NotPending hash - --- | Get the input items to a subtree if any were defined. -getInputs :: MonadIO m => ContentStore -> ContentHash -> m [Item] -getInputs store hash = liftIO . withStoreLock store $ - map (Item . SQL.fromOnly) <$> SQL.queryNamed (storeDb store) - "SELECT input FROM inputs\ - \ WHERE\ - \ hash = :hash" - [ ":hash" SQL.:= hash ] - --- | Set a metadata entry on an item. -setMetadata :: (SQL.ToField k, SQL.ToField v, MonadIO m ) - => ContentStore -> ContentHash -> k -> v -> m () -setMetadata store hash k v = liftIO $ - withStoreLock store $ - withWritableStore store $ - SQL.executeNamed (storeDb store) - "INSERT OR REPLACE INTO\ - \ metadata (hash, key, value)\ - \ VALUES\ - \ (:hash, :key, :value)" - [ ":hash" SQL.:= hash - , ":key" SQL.:= k - , ":value" SQL.:= v - ] - --- | Retrieve a metadata entry on an item, or 'Nothing' if missing. -getMetadata :: (SQL.ToField k, SQL.FromField v, MonadIO m) - => ContentStore -> ContentHash -> k -> m (Maybe v) -getMetadata store hash k = liftIO . withStoreLock store $ do - r <- SQL.queryNamed (storeDb store) - "SELECT value FROM metadata\ - \ WHERE\ - \ (hash = :hash AND key = :key)" - [ ":hash" SQL.:= hash - , ":key" SQL.:= k - ] - case r of - [] -> pure Nothing - [[v]] -> pure $ Just v - _ -> throwIO $ MalformedMetadataEntry hash (SQL.toField k) - --- | Create and open a new metadata file on a pending item in write mode. -createMetadataFile - :: MonadIO m - => ContentStore -> ContentHash -> Path Rel File -> m (Path Abs File, Handle) -createMetadataFile store hash file = liftIO . withStoreLock store $ - internalQuery store hash >>= \case - Pending _ -> do - let path = mkMetadataFilePath store hash file - createDirIfMissing True (parent path) - handle <- openFile (fromAbsFile path) WriteMode - pure (path, handle) - _ -> throwIO $ NotPending hash - --- | Return the path to a metadata file if it exists. -getMetadataFile - :: MonadIO m - => ContentStore -> ContentHash -> Path Rel File -> m (Maybe (Path Abs File)) -getMetadataFile store hash file = liftIO . withStoreLock store $ do - let path = mkMetadataFilePath store hash file - exists <- doesFileExist path - if exists then - pure $ Just path - else - pure Nothing - ----------------------------------------------------------------------- --- Internals - -lockPath :: Path Abs Dir -> Path Abs Dir -lockPath = (</> [reldir|lock|]) - -dbPath :: Path Abs Dir -> Path Abs File -dbPath = (</> [relfile|metadata.db|]) - -metadataPath :: Path Abs Dir -> Path Abs Dir -metadataPath = (</> [reldir|metadata|]) - --- | Holds a lock on the global 'MVar' and on the global lock file --- for the duration of the given action. -withStoreLock :: MonadBaseControl IO m => ContentStore -> m a -> m a -withStoreLock store = withLock (storeLock store) - -prefixHashPath :: C8.ByteString -> ContentHash -> Path Rel Dir -prefixHashPath pref hash - | Just dir <- Path.parseRelDir $ C8.unpack $ pref <> encodeHash hash - = dir - | otherwise = error - "[Control.Funflow.ContentStore.prefixHashPath] \ - \Failed to construct hash path." - -pendingPrefix, completePrefix, hashPrefix, itemPrefix :: IsString s => s -pendingPrefix = "pending-" -completePrefix = "complete-" -hashPrefix = "hash-" -itemPrefix = "item-" - --- | Return the full build path for the given input hash. -mkPendingPath :: ContentStore -> ContentHash -> Path Abs Dir -mkPendingPath ContentStore {storeRoot} hash = - storeRoot </> prefixHashPath pendingPrefix hash - --- | Return the full link path for the given input hash. -mkCompletePath :: ContentStore -> ContentHash -> Path Abs Dir -mkCompletePath ContentStore {storeRoot} hash = - storeRoot </> prefixHashPath completePrefix hash - --- | Return the full store path to the given output hash. -mkItemPath :: ContentStore -> ContentHash -> Path Abs Dir -mkItemPath ContentStore {storeRoot} hash = - storeRoot </> prefixHashPath itemPrefix hash - --- | Return the full store path to the given metadata directory. -mkMetadataDirPath :: ContentStore -> ContentHash -> Path Abs Dir -mkMetadataDirPath ContentStore {storeRoot} hash = - metadataPath storeRoot </> prefixHashPath hashPrefix hash - --- | Return the full store path to the given metadata file. -mkMetadataFilePath - :: ContentStore -> ContentHash -> Path Rel File -> Path Abs File -mkMetadataFilePath store hash file = - mkMetadataDirPath store hash </> file - --- | Query the state under the given key without taking a lock. -internalQuery - :: MonadIO m - => ContentStore - -> ContentHash - -> m (Status () (Path Abs Dir) Item) -internalQuery store inHash = liftIO $ do - let build = mkPendingPath store inHash - link' = mkCompletePath store inHash - buildExists <- doesDirExist build - if buildExists then - pure $! Pending build - else do - linkExists <- doesDirExist link' - if linkExists then do - out <- readSymbolicLink - (dropTrailingPathSeparator $ fromAbsDir link') - case pathToHash =<< stripPrefix itemPrefix out of - Nothing -> throwIO $ CorruptedLink inHash out - Just outHash -> return $ Complete (Item outHash) - else - pure $! Missing () - --- | Create the build directory for the given input hash --- and make the metadata directory writable if it exists. -internalMarkPending :: ContentStore -> ContentHash -> IO (Path Abs Dir) -internalMarkPending store hash = do - let dir = mkPendingPath store hash - createDir dir - setDirWritable dir - let metadataDir = mkMetadataDirPath store hash - metadirExists <- doesDirExist metadataDir - when metadirExists $ - setWritableRecursively metadataDir - return dir - --- | Watch the build directory of the pending item under the given key. --- The returned 'Async' completes after the item is completed or failed. -internalWatchPending - :: ContentStore - -> ContentHash - -> IO (Async Update) -internalWatchPending store hash = do - let build = mkPendingPath store hash - -- Add an inotify/kqueue watch and give a signal on relevant events. - let notifier = storeNotifier store - signal <- newEmptyMVar - -- Signal the listener. If the 'MVar' is full, - -- the listener didn't handle earlier signals, yet. - let giveSignal = void $ tryPutMVar signal () - watch <- addDirWatch notifier (fromAbsDir build) giveSignal - -- Additionally, poll on regular intervals. - -- Inotify/Kqueue don't cover all cases, e.g. network filesystems. - ticker <- async $ forever $ threadDelay 3007000 >> giveSignal - let stopWatching = do - cancel ticker - removeDirWatch watch - -- Listen to the signal asynchronously, - -- and query the status when it fires. - -- If the status changed, fill in the update. - update <- newEmptyMVar - let query' = liftIO . withStoreLock store $ internalQuery store hash - loop = takeMVar signal >> query' >>= \case - Pending _ -> loop - Complete item -> tryPutMVar update $ Completed item - Missing () -> tryPutMVar update Failed - void $ async loop - -- Wait for the update asynchronously. - -- Stop watching when it arrives. - async $ takeMVar update <* stopWatching - -setRootDirWritable :: MonadIO m => Path Abs Dir -> m () -setRootDirWritable storeRoot = liftIO $ - setFileMode (fromAbsDir storeRoot) writableRootDirMode - -writableRootDirMode :: FileMode -writableRootDirMode = writableDirMode - -setRootDirReadOnly :: MonadIO m => Path Abs Dir -> m () -setRootDirReadOnly storeRoot = liftIO $ - setFileMode (fromAbsDir storeRoot) readOnlyRootDirMode - -readOnlyRootDirMode :: FileMode -readOnlyRootDirMode = writableDirMode `intersectFileModes` allButWritableMode - -withWritableStoreRoot :: (MonadMask m, MonadIO m) => Path Abs Dir -> m a -> m a -withWritableStoreRoot storeRoot = - bracket_ (setRootDirWritable storeRoot) (setRootDirReadOnly storeRoot) - -withWritableStore :: (MonadMask m, MonadIO m) => ContentStore -> m a -> m a -withWritableStore ContentStore {storeRoot} = - withWritableStoreRoot storeRoot - -setDirWritable :: Path Abs Dir -> IO () -setDirWritable fp = setFileMode (fromAbsDir fp) writableDirMode - -writableDirMode :: FileMode -writableDirMode = foldl' unionFileModes nullFileMode - [ directoryMode, ownerModes - , groupReadMode, groupExecuteMode - , otherReadMode, otherExecuteMode - ] - --- | Set write permissions on the given path. -setWritable :: Path Abs t -> IO () -setWritable fp = do - mode <- fileMode <$> getFileStatus (toFilePath fp) - setFileMode (toFilePath fp) $ mode `unionFileModes` ownerWriteMode - --- | Unset write permissions on the given path. -unsetWritable :: Path Abs t -> IO () -unsetWritable fp = do - mode <- fileMode <$> getFileStatus (toFilePath fp) - setFileMode (toFilePath fp) $ mode `intersectFileModes` allButWritableMode - -allButWritableMode :: FileMode -allButWritableMode = complement $ foldl' unionFileModes nullFileMode - [ownerWriteMode, groupWriteMode, otherWriteMode] - --- | Set write permissions on all items in a directory tree recursively. -setWritableRecursively :: Path Abs Dir -> IO () -setWritableRecursively = walkDir $ \dir _ files -> do - mapM_ setWritable files - setWritable dir - return $ WalkExclude [] - --- | Unset write permissions on all items in a directory tree recursively. -unsetWritableRecursively :: Path Abs Dir -> IO () -unsetWritableRecursively = walkDir $ \dir _ files -> do - mapM_ unsetWritable files - unsetWritable dir - return $ WalkExclude [] - -storeVersion :: Int -storeVersion = 1 - --- | Initialize the database. -initDb :: Path Abs Dir -> SQL.Connection -> IO () -initDb storeDir db = do - [[version]] <- SQL.query_ db "PRAGMA user_version" - if version == 0 then - SQL.execute_ db $ - "PRAGMA user_version = " <> fromString (show storeVersion) - else - unless (version == storeVersion) $ - throwIO $ IncompatibleStoreVersion storeDir version storeVersion - -- Aliases to items. - SQL.execute_ db - "CREATE TABLE IF NOT EXISTS\ - \ aliases\ - \ ( hash TEXT PRIMARY KEY\ - \ , dest TEXT NOT NULL\ - \ , name TEXT NOT NULL\ - \ )" - -- Back-references from items @dest@ to hashes @hash@. - SQL.execute_ db - "CREATE TABLE IF NOT EXISTS\ - \ backrefs\ - \ ( hash TEXT PRIMARY KEY\ - \ , dest TEXT NOT NULL\ - \ )" - -- Inputs @input@ to hashes @hash@. - SQL.execute_ db - "CREATE TABLE IF NOT EXISTS\ - \ inputs\ - \ ( hash TEXT NOT NULL\ - \ , input TEXT NOT NULL\ - \ , UNIQUE (hash, input)\ - \ )" - -- Arbitrary metadata on hashes. - SQL.execute_ db - "CREATE TABLE IF NOT EXISTS\ - \ metadata\ - \ ( hash TEXT NOT NULL\ - \ , key TEXT NOT NULL\ - \ , value TEXT\ - \ , PRIMARY KEY(hash, key)\ - \ )" - --- | Adds a link between input hash and the output hash. --- --- Assumes that the store is locked and writable. -addBackReference :: ContentStore -> ContentHash -> Item -> IO () -addBackReference store inHash (Item outHash) = - SQL.executeNamed (storeDb store) - "INSERT OR REPLACE INTO\ - \ backrefs (hash, dest)\ - \ VALUES\ - \ (:in, :out)" - [ ":in" SQL.:= inHash - , ":out" SQL.:= outHash - ] diff --git a/src/Control/Funflow/ContentStore/Notify.hs b/src/Control/Funflow/ContentStore/Notify.hs deleted file mode 100644 index 53e1676..0000000 --- a/src/Control/Funflow/ContentStore/Notify.hs +++ /dev/null @@ -1,28 +0,0 @@ -{-# LANGUAGE CPP #-} - --- | Generic file change notifier library for unix-based systems. --- --- This library abstracts over specific implementations for BSD and linux --- systems. --- --- It provides facilities to watch specific directories for the following changes: --- - File moves --- - File deletion --- - Attribute changes. -module Control.Funflow.ContentStore.Notify - ( Notifier - , initNotifier - , killNotifier - - , Watch - , addDirWatch - , removeDirWatch - ) where - -#ifdef OS_Linux -import Control.Funflow.ContentStore.Notify.Linux -#else -# ifdef OS_BSD -import Control.Funflow.ContentStore.Notify.BSD -# endif -#endif diff --git a/src/Control/Funflow/ContentStore/Notify/BSD.hs b/src/Control/Funflow/ContentStore/Notify/BSD.hs deleted file mode 100644 index 61aaf7d..0000000 --- a/src/Control/Funflow/ContentStore/Notify/BSD.hs +++ /dev/null @@ -1,78 +0,0 @@ -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE ScopedTypeVariables #-} - --- | Implementation of filesystem watch for BSD-based systems using KQueue. -module Control.Funflow.ContentStore.Notify.BSD - ( Notifier - , initNotifier - , killNotifier - - , Watch - , addDirWatch - , removeDirWatch - ) where - -import Control.Concurrent -import Control.Concurrent.Async -import Control.Exception.Safe -import Control.Monad -import Data.Typeable -import Foreign.Ptr -import System.KQueue -import System.Posix.IO -import System.Posix.Types - -type Notifier = KQueue - -data NotifierException - = RequiresThreadedRuntime - deriving (Show, Typeable) -instance Exception NotifierException where - displayException = \case - -- XXX: Compile time check? - RequiresThreadedRuntime -> - "Threaded runtime required! Please rebuild with -threaded flag." - -initNotifier :: IO Notifier -initNotifier = do - unless rtsSupportsBoundThreads $ throwIO RequiresThreadedRuntime - kqueue - -killNotifier :: Notifier -> IO () -killNotifier _ = return () - -data Watch = Watch KQueue Fd (Async ()) - -addDirWatch :: Notifier -> FilePath -> IO () -> IO Watch -addDirWatch kq dir f = do - fd <- openFd dir ReadOnly Nothing defaultFileFlags - a <- async $ do - let event = KEvent - { ident = fromIntegral fd - , evfilter = EvfiltVnode - , flags = [EvAdd] - , fflags = [NoteDelete, NoteAttrib, NoteRename, NoteRevoke] - , data_ = 0 - , udata = nullPtr - } - loop = do - chgs <- kevent kq [event] 1 Nothing - unless (null chgs) f - loop - loop `finally` closeFd fd - return $! Watch kq fd a - -removeDirWatch :: Watch -> IO () -removeDirWatch (Watch kq fd a) = do - closeFd fd - let event = KEvent - { ident = fromIntegral fd - , evfilter = EvfiltVnode - , flags = [EvDelete] - , fflags = [] - , data_ = 0 - , udata = nullPtr - } - void (kevent kq [event] 0 Nothing) - `catch` \(_ :: KQueueException) -> return () - cancel a diff --git a/src/Control/Funflow/ContentStore/Notify/Linux.hs b/src/Control/Funflow/ContentStore/Notify/Linux.hs deleted file mode 100644 index 1d0bdf0..0000000 --- a/src/Control/Funflow/ContentStore/Notify/Linux.hs +++ /dev/null @@ -1,60 +0,0 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE ScopedTypeVariables #-} - --- | Implementation of filesystem watching functionality for linux based on --- inotify. -module Control.Funflow.ContentStore.Notify.Linux - ( Notifier - , initNotifier - , killNotifier - - , Watch - , addDirWatch - , removeDirWatch - ) where - -import Control.Exception.Safe (catch) -#if MIN_VERSION_hinotify(0,3,10) -import qualified Data.ByteString.Char8 as BS -#endif -import System.INotify - -type Notifier = INotify - -initNotifier :: IO Notifier -initNotifier = initINotify - -killNotifier :: Notifier -> IO () -killNotifier = killINotify - -type Watch = WatchDescriptor - -addDirWatch :: Notifier -> FilePath -> IO () -> IO Watch -addDirWatch inotify dir f = addWatch inotify mask dir' $ \case - Attributes True Nothing -> f - MovedSelf True -> f - DeletedSelf -> f - _ -> return () - where - mask = [Attrib, MoveSelf, DeleteSelf, OnlyDir] -#if MIN_VERSION_hinotify(0,3,10) - dir' = BS.pack dir -#else - dir' = dir -#endif - -removeDirWatch :: Watch -> IO () -removeDirWatch w = - -- When calling `addWatch` on a path that is already being watched, - -- inotify will not create a new watch, but amend the existing watch - -- and return the same watch descriptor. - -- Therefore, the watch might already have been removed at this point, - -- which will cause an 'IOError'. - -- Fortunately, all event handlers to a file are called at once. - -- So, that removing the watch here will not cause another handler - -- to miss out on the event. - -- Note, that this may change when adding different event handlers, - -- that remove the watch under different conditions. - removeWatch w - `catch` \(_::IOError) -> return () diff --git a/src/Control/Funflow/Diagram.hs b/src/Control/Funflow/Diagram.hs index 7305c9b..32a47c3 100644 --- a/src/Control/Funflow/Diagram.hs +++ b/src/Control/Funflow/Diagram.hs @@ -3,6 +3,9 @@ {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE RankNTypes #-} + -- | A Diagram is a representation of an arrow with labels. -- No computation is actually performed by a diagram, it just carries -- around a description. @@ -13,6 +16,8 @@ module Control.Funflow.Diagram where import Control.Arrow import Control.Arrow.Free (ArrowError (..)) import Control.Category +import qualified Data.Profunctor as P +import qualified Data.Profunctor.Traversing as P import Data.Proxy (Proxy (..)) import qualified Data.Text as T import Prelude hiding (id, (.)) @@ -34,6 +39,9 @@ data Diagram ex a b where Par :: Diagram ex a b -> Diagram ex c d -> Diagram ex (a,c) (b,d) Fanin :: Diagram ex a c -> Diagram ex b c -> Diagram ex (Either a b) c Try :: Diagram ex a b -> Diagram ex a (Either ex b) + Traverse :: Diagram ex a b -> Diagram ex (f a) (f b) + Wander :: (forall f. (Applicative f) => (a -> f b) -> s -> f t) -> Diagram ex a b -> Diagram ex s t + deriving (P.Profunctor, P.Strong, P.Choice) via P.WrappedArrow (Diagram ex) instance Category (Diagram ex) where id = Node emptyNodeProperties Proxy Proxy @@ -53,6 +61,10 @@ instance ArrowChoice (Diagram ex) where instance ArrowError ex (Diagram ex) where try = Try +instance P.Traversing (Diagram ex) where + traverse' = Traverse + wander = Wander + -- | Construct a labelled node node :: forall arr a b ex. Arrow arr => arr a b -> [T.Text] -> (Diagram ex) a b node _ lbls = Node props (Proxy :: Proxy a) (Proxy :: Proxy b) diff --git a/src/Control/Funflow/Exec/Simple.hs b/src/Control/Funflow/Exec/Simple.hs index 1d810f7..50a9b71 100644 --- a/src/Control/Funflow/Exec/Simple.hs +++ b/src/Control/Funflow/Exec/Simple.hs @@ -27,25 +27,22 @@ import Control.Concurrent.Async (withAsync) import Control.Exception.Safe (Exception, SomeException, bracket, - onException, throwM, try) import Control.Funflow.Base -import Control.Funflow.ContentHashable -import qualified Control.Funflow.ContentStore as CS import Control.Funflow.External import Control.Funflow.External.Coordinator import Control.Funflow.External.Coordinator.Memory import Control.Funflow.External.Executor (executeLoop) -import qualified Control.Funflow.RemoteCache as Remote +import Control.Lens (Identity (..)) import Control.Monad.Catch (MonadCatch, MonadMask) import Control.Monad.IO.Class import Control.Monad.Trans.Control (MonadBaseControl) -import qualified Data.ByteString as BS +import Data.CAS.ContentHashable +import qualified Data.CAS.ContentStore as CS +import qualified Data.CAS.RemoteCache as Remote import Data.Foldable (traverse_) -import Data.Maybe import Data.Monoid ((<>)) -import Data.Void import Katip import Path import System.IO (stderr) @@ -68,35 +65,17 @@ runFlowEx :: forall m c eff ex a b remoteCache. -> Flow eff ex a b -> a -> m b -runFlowEx _ cfg store cacher runWrapped confIdent flow input = do +runFlowEx _ cfg store remoteCacher runWrapped confIdent flow input = do hook <- initialise cfg runAsyncA (eval (runFlow' hook) flow) input where - simpleOutPath item = toFilePath - $ CS.itemPath store item </> [relfile|out|] - withStoreCache :: forall i o. Cacher i o + withStoreCache :: forall i o. CS.Cacher i o -> AsyncA m i o -> AsyncA m i o - withStoreCache c@Cache{} (AsyncA f) - | Just confIdent' <- confIdent = AsyncA $ \i -> do - let chash = cacherKey c confIdent' i - computeAndStore fp = do - res <- f i -- Do the actual computation - liftIO $ BS.writeFile (toFilePath $ fp </> [relfile|out|]) - . cacherStoreValue c $ res - return $ Right res - readItem item = do - bs <- liftIO . BS.readFile $ simpleOutPath item - return . cacherReadValue c $ bs - CS.withConstructIfMissing store cacher chash computeAndStore >>= \case - CS.Missing e -> absurd e - CS.Pending _ -> - liftIO (CS.waitUntilComplete store chash) >>= \case - Just item -> readItem item - Nothing -> throwM $ CS.FailedToConstruct chash - CS.Complete (Just a, _) -> return a - CS.Complete (_, item) -> readItem item - withStoreCache _ f = f - + withStoreCache c (AsyncA f) = AsyncA $ + let c' = c{CS.cacherKey = \ident i -> return $ runIdentity $ + CS.cacherKey c ident i} + in CS.cacheKleisliIO confIdent c' store remoteCacher f + writeMd :: forall i o. ContentHash -> i -> o @@ -107,14 +86,13 @@ runFlowEx _ cfg store cacher runWrapped confIdent flow input = do let kvs = writer i o in traverse_ (uncurry $ CS.setMetadata store chash) kvs - runFlow' :: Hook c -> Flow' eff a1 b1 -> AsyncA m a1 b1 runFlow' _ (Step props f) = withStoreCache (cache props) . AsyncA $ \x -> do let out = f x case cache props of - Cache key _ _ | Just confIdent' <- confIdent -> - writeMd (key confIdent' x) x out $ mdpolicy props + CS.Cache key _ _ | Just confIdent' <- confIdent -> + writeMd (runIdentity $ key confIdent' x) x out $ mdpolicy props _ -> return () return out runFlow' _ (StepIO props f) = withStoreCache (cache props) @@ -162,24 +140,13 @@ runFlowEx _ cfg store cacher runWrapped confIdent flow input = do mbStdout <- CS.getMetadataFile store chash [relfile|stdout|] mbStderr <- CS.getMetadataFile store chash [relfile|stderr|] throwM $ ExternalTaskFailed td ti mbStdout mbStderr - runFlow' _ (PutInStore f) = AsyncA $ \x -> katipAddNamespace "putInStore" $ do - chash <- liftIO $ contentHash x - CS.constructOrWait store cacher chash >>= \case - CS.Pending void -> absurd void - CS.Complete item -> return item - CS.Missing fp -> - do - liftIO $ f fp x - finalItem <- CS.markComplete store chash - _ <- Remote.push cacher (CS.itemHash finalItem) (Just chash) (CS.itemPath store finalItem) - pure finalItem - `onException` - (do $(logTM) WarningS . ls $ "Exception in construction: removing " <> show chash - CS.removeFailed store chash - ) - runFlow' _ (GetFromStore f) = AsyncA $ \case - CS.All item -> liftIO . f $ CS.itemPath store item - item CS.:</> path -> liftIO . f $ CS.itemPath store item </> path + runFlow' _ (PutInStore f) = AsyncA $ + katipAddNamespace "putInStore" + . CS.putInStore store remoteCacher + (\chash -> $(logTM) WarningS . ls $ "Exception in construction: removing " <> show chash) + (\p -> liftIO . f p) + runFlow' _ (GetFromStore f) = AsyncA $ + liftIO . f . CS.contentPath store runFlow' _ (InternalManipulateStore f) = AsyncA $ \i -> liftIO $ f store i runFlow' _ (Wrapped props w) = withStoreCache (cache props) $ runWrapped w diff --git a/src/Control/Funflow/External.hs b/src/Control/Funflow/External.hs index 4a21297..2b3fbbf 100644 --- a/src/Control/Funflow/External.hs +++ b/src/Control/Funflow/External.hs @@ -9,14 +9,15 @@ -- | Definition of external tasks module Control.Funflow.External where -import Control.Funflow.ContentHashable (ContentHash, ContentHashable, ExternallyAssuredDirectory (..), - ExternallyAssuredFile (..)) -import qualified Control.Funflow.ContentStore as CS import Control.Lens.TH import Data.Aeson (FromJSON, ToJSON) #if __GLASGOW_HASKELL__ < 804 import Data.Semigroup #endif +import Data.CAS.ContentHashable (ContentHash, ContentHashable, ExternallyAssuredDirectory (..), + ExternallyAssuredFile (..)) +import qualified Data.CAS.ContentStore as CS + import Data.Store (Store) import Data.String (IsString (..)) import qualified Data.Text as T diff --git a/src/Control/Funflow/External/Coordinator.hs b/src/Control/Funflow/External/Coordinator.hs index 2771f94..b8c0943 100644 --- a/src/Control/Funflow/External/Coordinator.hs +++ b/src/Control/Funflow/External/Coordinator.hs @@ -16,11 +16,10 @@ module Control.Funflow.External.Coordinator where import Control.Exception.Safe -import Control.Funflow.ContentHashable (ContentHash) import Control.Funflow.External import Control.Lens import Control.Monad.IO.Class (MonadIO, liftIO) - +import Data.CAS.ContentHashable (ContentHash) import Data.Monoid ((<>)) import Data.Store (Store) import Data.Store.TH (makeStore) diff --git a/src/Control/Funflow/External/Coordinator/Memory.hs b/src/Control/Funflow/External/Coordinator/Memory.hs index 8b364cd..451ff5e 100644 --- a/src/Control/Funflow/External/Coordinator/Memory.hs +++ b/src/Control/Funflow/External/Coordinator/Memory.hs @@ -7,12 +7,12 @@ module Control.Funflow.External.Coordinator.Memory where import Control.Concurrent (threadDelay) import Control.Concurrent.STM.TVar -import Control.Funflow.ContentHashable (ContentHash) import Control.Funflow.External import Control.Funflow.External.Coordinator import Control.Lens import Control.Monad.IO.Class (liftIO) import Control.Monad.STM +import Data.CAS.ContentHashable (ContentHash) import Data.List (find) import qualified Data.Map.Strict as M import System.Clock (fromNanoSecs) diff --git a/src/Control/Funflow/External/Coordinator/Redis.hs b/src/Control/Funflow/External/Coordinator/Redis.hs index af4de27..3fabf47 100644 --- a/src/Control/Funflow/External/Coordinator/Redis.hs +++ b/src/Control/Funflow/External/Coordinator/Redis.hs @@ -14,12 +14,12 @@ module Control.Funflow.External.Coordinator.Redis , RedisPreconnected (..) ) where -import qualified Control.Funflow.ContentHashable as CHash import Control.Funflow.External import Control.Funflow.External.Coordinator import Control.Lens import Control.Monad.Except import Control.Monad.Fix (fix) +import qualified Data.CAS.ContentHashable as CHash import Data.Store import qualified Database.Redis as R import GHC.Conc diff --git a/src/Control/Funflow/External/Coordinator/SQLite.hs b/src/Control/Funflow/External/Coordinator/SQLite.hs index ef311dd..3c2e8e6 100644 --- a/src/Control/Funflow/External/Coordinator/SQLite.hs +++ b/src/Control/Funflow/External/Coordinator/SQLite.hs @@ -15,14 +15,14 @@ module Control.Funflow.External.Coordinator.SQLite import Control.Concurrent (threadDelay) import Control.Exception.Safe -import Control.Funflow.ContentHashable import Control.Funflow.External import Control.Funflow.External.Coordinator -import Control.Funflow.Lock import Control.Lens import Control.Monad.IO.Class import qualified Data.Aeson as Json import qualified Data.ByteString.Char8 as C8 +import Data.CAS.ContentHashable +import Data.CAS.Lock import qualified Data.Text as T import Data.Typeable (Typeable) import qualified Database.SQLite.Simple as SQL diff --git a/src/Control/Funflow/External/Docker.hs b/src/Control/Funflow/External/Docker.hs index ecf0edd..810e782 100644 --- a/src/Control/Funflow/External/Docker.hs +++ b/src/Control/Funflow/External/Docker.hs @@ -16,9 +16,9 @@ module Control.Funflow.External.Docker ) where import Control.Arrow (Kleisli (..), second) -import Control.Funflow.ContentHashable import Control.Funflow.External import Control.Monad.Trans.State.Strict +import Data.CAS.ContentHashable import Data.Semigroup (Semigroup, (<>)) import qualified Data.Text as T import GHC.Generics (Generic) diff --git a/src/Control/Funflow/External/Executor.hs b/src/Control/Funflow/External/Executor.hs index 92c2c07..6117d84 100644 --- a/src/Control/Funflow/External/Executor.hs +++ b/src/Control/Funflow/External/Executor.hs @@ -16,16 +16,16 @@ import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.MVar import Control.Exception.Safe -import qualified Control.Funflow.ContentStore as CS import Control.Funflow.External import Control.Funflow.External.Coordinator -import qualified Control.Funflow.RemoteCache as Remote import Control.Lens import Control.Monad (forever, mzero, unless) import Control.Monad.Trans (lift) import Control.Monad.Trans.Maybe import qualified Data.Aeson as Json import qualified Data.ByteString as BS +import qualified Data.CAS.ContentStore as CS +import qualified Data.CAS.RemoteCache as Remote import Data.Foldable (for_) import Data.Maybe (isJust, isNothing) import Data.Monoid ((<>)) diff --git a/src/Control/Funflow/Lock.hs b/src/Control/Funflow/Lock.hs deleted file mode 100644 index ca24d26..0000000 --- a/src/Control/Funflow/Lock.hs +++ /dev/null @@ -1,120 +0,0 @@ -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE QuasiQuotes #-} -{-# LANGUAGE ScopedTypeVariables #-} - --- | Thread and process write lock. --- --- Allows synchronisation between threads and processes. --- Uses an 'MVar' for synchronisation between threads --- and fcntl write locks for synchronisation between processes. --- --- Only ever have one 'Lock' object per lock file per process! -module Control.Funflow.Lock - ( Lock - , openLock - , closeLock - , withLock - ) where - -import Control.Concurrent -import Control.Exception.Safe -import Control.Monad (unless) -import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp_) -import Network.HostName (getHostName) -import Path -import Path.IO -import System.Posix.Files -import System.Posix.IO -import System.Posix.Process -import System.Random - --- | Thread and process write lock. --- --- Only ever have one 'Lock' object per lock file per process! -data Lock = Lock - { lockMVar :: MVar () - , lockDir :: Path Abs Dir - } - --- | Open the lock file and create a lock object. --- --- This does not acquire the lock. --- --- Only ever have one 'Lock' object per lock file per process! -openLock :: Path Abs Dir -> IO Lock -openLock dir = do - mvar <- newMVar () - createDirIfMissing True dir - return $! Lock - { lockMVar = mvar - , lockDir = dir - } - --- | Close the lock file. --- --- Does not release the lock. --- --- Blocks if the lock is taken. -closeLock :: Lock -> IO () -closeLock lock = do - takeMVar (lockMVar lock) - --- | Acquire the lock for the duration of the given action and release after. -withLock :: MonadBaseControl IO m => Lock -> m a -> m a -withLock lock = liftBaseOp_ $ \action -> - withMVar (lockMVar lock) $ \() -> - bracket_ (acquireDirLock $ lockDir lock) (releaseDirLock $ lockDir lock) $ - action - ----------------------------------------------------------------------- --- Internals - --- | Generate unique (per process) filename. --- --- Combines the host name and process ID. -getUniqueFileName :: IO (Path Rel File) -getUniqueFileName = do - hostName <- getHostName - pid <- getProcessID - parseRelFile $ hostName ++ show pid - -lockFileName :: Path Rel File -lockFileName = [relfile|lock|] - --- | Acquire the lock. --- --- Uses an algorithm that is described in the man-page of open (2) in the --- last paragraph to @O_EXCL@ in release 4.14 of the Linux man-pages project. --- --- Creates a file under a unique (per process) filename. --- Attempts to hard-link that file to a common lock path. --- If the operation succeeds, then the lock was acquired. --- If not, but if the link count of the file under the unique filename --- increased to two, then the lock was acquired. --- Otherwise, another process holds the lock and this process waits --- and retries. -acquireDirLock :: Path Abs Dir -> IO () -acquireDirLock dir = do - file <- getUniqueFileName - let path = dir </> file - fd <- createFile (fromAbsFile path) ownerWriteMode - closeFd fd - r <- try $ createLink (fromAbsFile path) (fromAbsFile $ dir </> lockFileName) - case r of - Right () -> return () - Left (_::IOError) -> do - count <- linkCount <$> getFileStatus (fromAbsFile path) - unless (count == 2) $ do - delay <- randomRIO (50000, 100000) - threadDelay delay - acquireDirLock dir - --- | Release the lock. --- --- Unlinks the file under the unique file name and the common lock file. -releaseDirLock :: Path Abs Dir -> IO () -releaseDirLock dir = do - file <- getUniqueFileName - let path = dir </> file - removeLink (fromAbsFile $ dir </> lockFileName) - removeLink (fromAbsFile path) diff --git a/src/Control/Funflow/Orphans.hs b/src/Control/Funflow/Orphans.hs deleted file mode 100644 index 36b7db5..0000000 --- a/src/Control/Funflow/Orphans.hs +++ /dev/null @@ -1,28 +0,0 @@ -{-# OPTIONS_GHC -fno-warn-orphans #-} -{-# LANGUAGE FlexibleInstances #-} - --- | Dedicated module for orphan instances. -module Control.Funflow.Orphans where - -import Data.Functor.Contravariant -import Data.Store (Store) -import qualified Data.Store as Store -import qualified Path as Path -import qualified Path.Internal - -instance Store (Path.Path Path.Abs Path.File) where - size = contramap (\(Path.Internal.Path fp) -> fp) Store.size - peek = Path.Internal.Path <$> Store.peek - poke = Store.poke . (\(Path.Internal.Path fp) -> fp) -instance Store (Path.Path Path.Abs Path.Dir) where - size = contramap (\(Path.Internal.Path fp) -> fp) Store.size - peek = Path.Internal.Path <$> Store.peek - poke = Store.poke . (\(Path.Internal.Path fp) -> fp) -instance Store (Path.Path Path.Rel Path.File) where - size = contramap (\(Path.Internal.Path fp) -> fp) Store.size - peek = Path.Internal.Path <$> Store.peek - poke = Store.poke . (\(Path.Internal.Path fp) -> fp) -instance Store (Path.Path Path.Rel Path.Dir) where - size = contramap (\(Path.Internal.Path fp) -> fp) Store.size - peek = Path.Internal.Path <$> Store.peek - poke = Store.poke . (\(Path.Internal.Path fp) -> fp) diff --git a/src/Control/Funflow/Pretty.hs b/src/Control/Funflow/Pretty.hs index ebeb860..0187453 100644 --- a/src/Control/Funflow/Pretty.hs +++ b/src/Control/Funflow/Pretty.hs @@ -19,6 +19,8 @@ ppFlow = ppDiagram . toDiagram where ppDiagram (Par f g) = parens $ ppDiagram f <+> text "***" <+> ppDiagram g ppDiagram (Fanin f g) = parens $ ppDiagram f <+> text "|||" <+> ppDiagram g ppDiagram (Try f) = parens $ text "try" <+> ppDiagram f + ppDiagram (Traverse f) = parens $ text "traverse" <+> ppDiagram f + ppDiagram (Wander _trav f) = parens $ text "wander" <+> ppDiagram f showFlow :: Flow eff ex a b -> String showFlow = render . ppFlow diff --git a/src/Control/Funflow/RemoteCache.hs b/src/Control/Funflow/RemoteCache.hs deleted file mode 100644 index 803c466..0000000 --- a/src/Control/Funflow/RemoteCache.hs +++ /dev/null @@ -1,134 +0,0 @@ -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE MultiParamTypeClasses #-} - --- | --- This module defines the remote caching mechanism of funflow which is used to --- keep several funflow stores (possibly on different machines) in sync. -module Control.Funflow.RemoteCache - ( Cacher(..) - , PullResult(..), PushResult(..), AliasResult(..) - , NoCache(..), memoryCache - , pullAsArchive, pushAsArchive - ) where - -import qualified Codec.Archive.Tar as Tar -import Control.Concurrent.MVar -import Control.Funflow.ContentHashable -import Control.Monad.IO.Class (MonadIO, liftIO) -import Data.ByteString.Lazy (ByteString) -import Data.Map.Strict (Map) -import qualified Data.Map.Strict as Map -import Path - - --- | --- The result of a tentative pull from the remote cache -data PullResult a - = PullOK a - | NotInCache - | PullError String - deriving (Eq, Ord, Show) - --- | --- The result of a tentative push to the remote cache -data PushResult - = PushOK - | PushError String - deriving (Eq, Ord, Show) - -data AliasResult - = AliasOK - | TargetNotInCache - | AliasError String - --- | --- A simple mechanism for remote-caching. --- --- Provides a way to push a path to the cache and pull it back. --- --- No assumption is made on the availability of a store path. In particular, --- pushing a path to the cache doesn't mean that we can pull it back. -class Monad m => Cacher m a where - push :: - a - -> ContentHash -- ^ "Primary" key: hash of the content - -> Maybe ContentHash -- ^ "Secondary" key: hash of the dependencies - -> Path Abs Dir -- ^ Path to the content - -> m PushResult - pull :: a -> ContentHash -> Path Abs Dir -> m (PullResult ()) - --- | --- Push the path as an archive to the remote cache -pushAsArchive :: - MonadIO m - => (ContentHash -> ContentHash -> m (Either String ())) -- ^ How to create the aliases - -> (ContentHash -> ByteString -> m PushResult) -- ^ How to push the content - -> ContentHash -- ^ Primary key - -> Maybe ContentHash -- ^ Secondary key - -> Path Abs Dir - -> m PushResult -pushAsArchive alias pushArchive primaryKey mSecondaryKey path = do - archive <- liftIO $ Tar.write <$> Tar.pack (toFilePath path) ["."] - pushArchive primaryKey archive >>= \case - PushError e -> pure $ PushError e - res -> - case mSecondaryKey of - Just secondaryKey -> - alias primaryKey secondaryKey >>= \case - Left err -> pure $ PushError err - Right () -> pure res - Nothing -> pure res - -pullAsArchive :: - MonadIO m - => (ContentHash -> m (PullResult ByteString)) - -> ContentHash - -> Path Abs Dir - -> m (PullResult ()) -pullAsArchive pullArchive hash path = - pullArchive hash >>= \case - PullOK archive -> do - liftIO $ Tar.unpack (toFilePath path) $ Tar.read archive - pure $ PullOK () - NotInCache -> pure NotInCache - PullError e -> pure $ PullError e - --- | --- A dummy remote cache implementation which does nothing -data NoCache = NoCache - -instance Monad m => Cacher m NoCache where - pull _ _ _ = pure NotInCache - push _ _ _ _ = pure PushOK - --- | --- An in-memory cache, for testing purposes -data MemoryCache = MemoryCache (MVar (Map ContentHash ByteString)) -instance MonadIO m => Cacher m MemoryCache where - pull (MemoryCache cacheVar) = pullAsArchive $ \hash -> do - cacheMap <- liftIO $ readMVar cacheVar - case Map.lookup hash cacheMap of - Nothing -> pure NotInCache - Just x -> pure (PullOK x) - push (MemoryCache cacheVar) = pushAsArchive alias $ \hash content -> do - liftIO $ modifyMVar_ - cacheVar - (\cacheMap -> pure $ Map.insert hash content cacheMap) - pure PushOK - where - alias from to = liftIO $ Right <$> modifyMVar_ cacheVar - (\cacheMap -> pure $ Map.insert to (cacheMap Map.! from) cacheMap) - -memoryCache :: MonadIO m => m MemoryCache -memoryCache = liftIO $ MemoryCache <$> newMVar mempty - --- | --- If 'a' is a 'Cacher' then 'Maybe a' is a cacher such that 'Just x' behavies --- like 'x' and 'Nothing' doesn't cache anything -instance Cacher m a => Cacher m (Maybe a) where - pull (Just x) = pull x - pull Nothing = pull NoCache - - push (Just x) = push x - push Nothing = push NoCache diff --git a/src/Control/Funflow/Steps.hs b/src/Control/Funflow/Steps.hs index 0281efa..26fec4b 100644 --- a/src/Control/Funflow/Steps.hs +++ b/src/Control/Funflow/Steps.hs @@ -46,15 +46,15 @@ where import Control.Arrow import Control.Arrow.Free (catch) import Control.Exception.Safe (Exception, throwM) -import Control.Funflow.Base (SimpleFlow, cache, - defaultCacherWithIdent) +import Control.Funflow.Base (SimpleFlow, cache) import Control.Funflow.Class -import Control.Funflow.ContentHashable (ContentHashable, +import qualified Control.Funflow.External.Docker as Docker +import Data.CAS.ContentHashable (ContentHashable, DirectoryContent (..), FileContent (..)) -import Control.Funflow.ContentStore (Content ((:</>))) -import qualified Control.Funflow.ContentStore as CS -import qualified Control.Funflow.External.Docker as Docker +import Data.CAS.ContentStore (Content ((:</>)), + defaultCacherWithIdent) +import qualified Data.CAS.ContentStore as CS import Data.Default (def) import Data.Foldable (for_) import Data.Store diff --git a/test/Funflow/ContentStore.hs b/test/Funflow/ContentStore.hs deleted file mode 100644 index bdd8f9e..0000000 --- a/test/Funflow/ContentStore.hs +++ /dev/null @@ -1,472 +0,0 @@ -{-# OPTIONS_GHC -fno-warn-unused-do-bind #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE QuasiQuotes #-} -{-# LANGUAGE TypeApplications #-} - -module Funflow.ContentStore - ( tests - ) where - -import Control.Concurrent.Async -import Control.Exception.Safe (tryAny) -import Control.Funflow.ContentHashable (contentHash) -import Control.Funflow.ContentStore (ContentStore) -import qualified Control.Funflow.ContentStore as ContentStore -import qualified Control.Funflow.RemoteCache as Remote -import Control.Monad (void) -import Data.Maybe (catMaybes) -import qualified Data.Set as Set -import Path -import Path.IO -import System.Posix.Files -import Test.Tasty -import Test.Tasty.HUnit - -tests :: TestTree -tests = testGroup "Content Store" - - [ testCase "initialise fresh store" $ - withTmpDir $ \dir -> do - let root = dir </> [reldir|store|] - ContentStore.withStore root $ \_ -> - doesDirExist @IO root - @? "store root exists" - - , testCase "initialise existing store" $ - withTmpDir $ \dir -> do - let root = dir </> [reldir|store|] - createDir root - ContentStore.withStore root $ \_ -> - doesDirExist @IO root - @? "store root exists" - - , testCase "store is not writable" $ - withEmptyStore $ \store -> do - let root = ContentStore.root store - isNotWritable root - @? "store not writable" - - , testCase "subtree stages" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - - missing <- ContentStore.query store hash - missing @?= ContentStore.Missing () - missing' <- ContentStore.lookup store hash - missing' @?= ContentStore.Missing () - - subtree <- ContentStore.markPending store hash - let dir = subtree </> [reldir|dir|] - file = dir </> [relfile|file|] - expectedContent = "Hello World" - pending <- ContentStore.query store hash - pending @?= ContentStore.Pending () - pending' <- ContentStore.lookup store hash - pending' @?= ContentStore.Pending () - doesDirExist @IO subtree - @? "pending subtree exists" - isWritable subtree - @? "pending subtree is writable" - createDir dir - writeFile (fromAbsFile file) expectedContent - do - content <- readFile (fromAbsFile file) - content @?= expectedContent - - item <- ContentStore.markComplete store hash - let itemDir = ContentStore.itemPath store item - file' = itemDir </> [relfile|dir/file|] - complete <- ContentStore.query store hash - complete @?= ContentStore.Complete () - complete' <- ContentStore.lookup store hash - complete' @?= ContentStore.Complete item - doesDirExist @IO itemDir - @? "complete subtree exists" - isNotWritable itemDir - @? "complete subtree is not writable" - isNotWritable file' - @? "complete file is not writable" - do - content <- readFile (fromAbsFile file') - content @?= expectedContent - - , testCase "await construction" $ - Remote.memoryCache >>= \myCache -> - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - - ContentStore.constructOrAsync store myCache hash >>= \case - ContentStore.Pending _ -> - assertFailure "missing already under construction" - ContentStore.Complete _ -> - assertFailure "missing already complete" - ContentStore.Missing _ -> - return () - - a <- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case - ContentStore.Missing _ -> do - assertFailure "under construction still missing" - undefined - ContentStore.Complete _ -> do - assertFailure "under construction already complete" - undefined - ContentStore.Pending a -> - return a - - b <- ContentStore.lookupOrWait store hash >>= \case - ContentStore.Missing _ -> do - assertFailure "under construction still missing" - undefined - ContentStore.Complete _ -> do - assertFailure "under construction already complete" - undefined - ContentStore.Pending b -> - return b - - item <- ContentStore.markComplete store hash - - item' <- wait a - item' @?= ContentStore.Completed item - - item'' <- wait b - item'' @?= ContentStore.Completed item - - ContentStore.constructOrAsync store Remote.NoCache hash >>= \case - ContentStore.Missing _ -> do - assertFailure "complete still missing" - ContentStore.Pending _ -> do - assertFailure "complete still under construction" - ContentStore.Complete _ -> do - return () - - , testCase "await failure" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - - ContentStore.constructOrAsync store Remote.NoCache hash >>= \case - ContentStore.Pending _ -> - assertFailure "missing already under construction" - ContentStore.Complete _ -> - assertFailure "missing already complete" - ContentStore.Missing _ -> - return () - - a <- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case - ContentStore.Missing _ -> do - assertFailure "under construction still missing" - undefined - ContentStore.Complete _ -> do - assertFailure "under construction already complete" - undefined - ContentStore.Pending a -> - return a - - b <- ContentStore.lookupOrWait store hash >>= \case - ContentStore.Missing _ -> do - assertFailure "under construction still missing" - undefined - ContentStore.Complete _ -> do - assertFailure "under construction already complete" - undefined - ContentStore.Pending b -> - return b - - ContentStore.removeFailed store hash - - item' <- wait a - item' @?= ContentStore.Failed - - item'' <- wait b - item'' @?= ContentStore.Failed - - ContentStore.constructOrAsync store Remote.NoCache hash >>= \case - ContentStore.Pending _ -> do - assertFailure "failed still under construction" - ContentStore.Complete _ -> do - assertFailure "failed already complete" - ContentStore.Missing _ -> do - return () - - , testCase "construct if missing" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - let file = [relfile|file|] - expectedContent = "Hello World" - - ContentStore.constructIfMissing store Remote.NoCache hash >>= \case - ContentStore.Pending () -> - assertFailure "missing already under construction" - ContentStore.Complete _ -> - assertFailure "missing already complete" - ContentStore.Missing subtree -> do - isWritable subtree - @? "under construction not writable" - writeFile (fromAbsFile $ subtree </> file) expectedContent - - ContentStore.constructIfMissing store Remote.NoCache hash >>= \case - ContentStore.Missing _ -> - assertFailure "under construction still missing" - ContentStore.Complete _ -> - assertFailure "under construction already complete" - ContentStore.Pending () -> - void $ ContentStore.markComplete store hash - - ContentStore.constructIfMissing store Remote.NoCache hash >>= \case - ContentStore.Missing _ -> - assertFailure "complete still missing" - ContentStore.Pending () -> - assertFailure "complete still under construction" - ContentStore.Complete item -> do - let subtree = ContentStore.itemPath store item - isNotWritable (subtree </> file) - @? "complete still writable" - content <- readFile (fromAbsFile $ subtree </> file) - content @?= expectedContent - - , testCase "remove failed" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - subtree <- ContentStore.markPending store hash - ContentStore.removeFailed store hash - not <$> doesDirExist @IO subtree - @? "subtree was removed" - - , testCase "forcibly remove" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - subtree <- ContentStore.markPending store hash - - ContentStore.removeForcibly store hash - not <$> doesDirExist @IO subtree - @? "remove under construction" - - ContentStore.removeForcibly store hash - not <$> doesDirExist @IO subtree - @? "remove missing" - - subtree' <- ContentStore.markPending store hash - void $ ContentStore.markComplete store hash - ContentStore.removeForcibly store hash - not <$> doesDirExist @IO subtree' - @? "remove complete" - - , testCase "subtree state is persisted" $ - withTmpDir $ \dir -> do - let root = dir </> [reldir|store|] - hash <- contentHash ("test" :: String) - - do - ContentStore.withStore root $ \store -> - void $ ContentStore.markPending store hash - - -- Imagine the process terminates and the store is closed - - do - ContentStore.withStore root $ \store -> do - underConstruction <- ContentStore.query store hash - underConstruction @?= ContentStore.Pending () - void $ ContentStore.markComplete store hash - - -- Imagine the process terminates and the store is closed - - do - ContentStore.withStore root $ \store -> do - complete <- ContentStore.query store hash - complete @?= ContentStore.Complete () - - , testCase "mark complete before under construction fails" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - ContentStore.markComplete store hash - `shouldFail` "complete before under construction" - - , testCase "mark complete after complete fails" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - void $ ContentStore.markPending store hash - void $ ContentStore.markComplete store hash - ContentStore.markComplete store hash - `shouldFail` "complete after complete" - - , testCase "mark under construction after under construction fails" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - void $ ContentStore.markPending store hash - void $ ContentStore.markPending store hash - `shouldFail` "under construction after under construction" - - , testCase "mark under construction after complete fails" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - void $ ContentStore.markPending store hash - void $ ContentStore.markComplete store hash - void $ ContentStore.markPending store hash - `shouldFail` "under construction after complete" - - , testCase "remove missing fails" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - ContentStore.removeFailed store hash - `shouldFail` "remove non existent" - - , testCase "remove complete fails" $ - withEmptyStore $ \store -> do - hash <- contentHash ("test" :: String) - void $ ContentStore.markPending store hash - void $ ContentStore.markComplete store hash - ContentStore.removeFailed store hash - `shouldFail` "remove complete" - - , testCase "list store contents" $ - withEmptyStore $ \store -> do - [a, b, c, d] <- mapM contentHash ["a", "b", "c", "d" :: String] - void $ mapM (ContentStore.markPending store) [a, b, c, d] - mapM_ (ContentStore.markComplete store) [a, b] - - (pendings, completes, items) <- ContentStore.listAll store - let all' = pendings ++ completes - Set.fromList all' @?= Set.fromList [a, b, c, d] - Set.size (Set.fromList all') @?= 4 - - underContsruction <- ContentStore.listPending store - Set.fromList underContsruction @?= Set.fromList [c, d] - - complete <- ContentStore.listComplete store - Set.fromList complete @?= Set.fromList [a, b] - - items' <- catMaybes <$> mapM (ContentStore.waitUntilComplete store) [a, b] - Set.fromList items @?= Set.fromList items' - - , testCase "store aliases" $ - withEmptyStore $ \store -> do - let fp = [relfile|test|] - contentA = "itemA" - contentB = "itemB" - itemA <- do - hash <- contentHash ("a" :: String) - dir <- ContentStore.markPending store hash - writeFile (fromAbsFile $ dir </> fp) contentA - ContentStore.markComplete store hash - itemB <- do - hash <- contentHash ("b" :: String) - dir <- ContentStore.markPending store hash - writeFile (fromAbsFile $ dir </> fp) contentB - ContentStore.markComplete store hash - let aliasA = ContentStore.Alias "aliasA" - aliasB = ContentStore.Alias "aliasB" - - do - r <- ContentStore.lookupAlias store aliasA - r @?= Nothing - - ContentStore.assignAlias store aliasA itemA - ContentStore.assignAlias store aliasB itemB - do - r <- ContentStore.lookupAlias store aliasA - r @?= Just itemA - do - r <- ContentStore.lookupAlias store aliasB - r @?= Just itemB - - ContentStore.assignAlias store aliasB itemA - do - r <- ContentStore.lookupAlias store aliasA - r @?= Just itemA - do - r <- ContentStore.lookupAlias store aliasB - r @?= Just itemA - - ContentStore.removeAlias store aliasA - ContentStore.removeAlias store aliasB - do - r <- ContentStore.lookupAlias store aliasA - r @?= Nothing - do - r <- ContentStore.lookupAlias store aliasB - r @?= Nothing - - , testCase "Remote caching (constructOrAsync)" $ do - cacher <- Remote.memoryCache - hash <- contentHash ("test" :: String) - let file = [relfile|file|] - expectedContent = "Hello World" - - -- Populate the remote cache - withEmptyStore $ \store -> do - ContentStore.constructOrAsync store cacher hash >>= \case - ContentStore.Pending _ -> - assertFailure "missing already under construction" - ContentStore.Complete _ -> - assertFailure "missing already complete" - ContentStore.Missing subtree -> do - isWritable subtree @? "under construction not writable" - writeFile (fromAbsFile $ subtree </> file) expectedContent - Remote.push cacher hash Nothing subtree - - -- Expects having the item in cache - withEmptyStore $ \store -> do - ContentStore.constructOrAsync store cacher hash >>= \case - ContentStore.Pending _ -> - assertFailure "missing already under construction" - ContentStore.Complete _ -> pure () - ContentStore.Missing _ -> assertFailure "Not found in the cache" - - , testCase "Remote caching (withConstructIfMissing)" $ do - cacher <- Remote.memoryCache - hash <- contentHash ("test" :: String) - let file = [relfile|file|] - expectedContent = "Hello World" - doWrite subtree = do - isWritable subtree @? "under construction not writable" - writeFile (fromAbsFile $ subtree </> file) expectedContent - return $ Right () - - -- Populates the remote cache - withEmptyStore $ \store -> do - ContentStore.withConstructIfMissing store cacher hash doWrite >>= \case - ContentStore.Missing _ -> assertFailure "not found in the cache" - ContentStore.Pending _ -> - assertFailure "missing already under construction" - -- ContentStore.waitUntilComplete store hash >>= \case - -- Just item -> return () - -- Nothing -> assertFailure "item construction failed" - ContentStore.Complete _ -> pure () - - -- Expects having the item in the remote cache - withEmptyStore $ \store -> do - ContentStore.withConstructIfMissing store cacher hash - (const $ assertFailure "should not try to write the file a second time") >>= \case - ContentStore.Missing _ -> assertFailure "Not found in the cache" - ContentStore.Pending _ -> - assertFailure "missing already under construction" - ContentStore.Complete _ -> pure () - - ] - -shouldFail :: IO a -> String -> IO () -shouldFail m msg = tryAny m >>= \case - Left _ -> return () - Right _ -> assertFailure msg - -withTmpDir :: (Path Abs Dir -> IO a) -> IO a -withTmpDir = withSystemTempDir "funflow-test" - -withEmptyStore :: (ContentStore -> IO a) -> IO a -withEmptyStore k = withTmpDir $ \dir -> - ContentStore.withStore (dir </> [reldir|store|]) k - -isNotWritable :: Path Abs t -> IO Bool -isNotWritable path = do - mode <- fileMode <$> getFileStatus (toFilePath path) - return $! nullFileMode == (mode `intersectFileModes` writeModes) - where - writeModes = - ownerWriteMode - `unionFileModes` - groupWriteMode - `unionFileModes` - otherWriteMode - -isWritable :: Path Abs t -> IO Bool -isWritable = fmap not . isNotWritable diff --git a/test/Funflow/SQLiteCoordinator.hs b/test/Funflow/SQLiteCoordinator.hs index 442bca3..a830cf0 100644 --- a/test/Funflow/SQLiteCoordinator.hs +++ b/test/Funflow/SQLiteCoordinator.hs @@ -12,9 +12,9 @@ import Control.Concurrent.Async (wait, withAsync) import Control.Concurrent.MVar import Control.Exception.Safe import Control.Funflow -import qualified Control.Funflow.ContentStore as CS import Control.Funflow.External.Coordinator.SQLite import Control.Monad +import qualified Data.CAS.ContentStore as CS import Data.Semigroup ((<>)) import Data.String (fromString) import Path diff --git a/test/Funflow/TestFlows.hs b/test/Funflow/TestFlows.hs index 5c3161e..f84c143 100644 --- a/test/Funflow/TestFlows.hs +++ b/test/Funflow/TestFlows.hs @@ -12,11 +12,11 @@ import Control.Arrow.Free import Control.Concurrent.Async (withAsync) import Control.Exception.Safe hiding (catch) import Control.Funflow -import Control.Funflow.ContentStore (Content ((:</>))) -import qualified Control.Funflow.ContentStore as CS import Control.Funflow.External.Coordinator.Memory import Control.Funflow.External.Executor (executeLoop) import Control.Monad (when) +import Data.CAS.ContentStore (Content ((:</>))) +import qualified Data.CAS.ContentStore as CS import Data.Default (def) import Data.List (sort) import Path diff --git a/test/Test.hs b/test/Test.hs index de3d732..5a09bef 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -1,5 +1,4 @@ import qualified Control.Arrow.Async.Tests -import qualified Funflow.ContentStore import qualified Funflow.SQLiteCoordinator import qualified Funflow.TestFlows import Test.Tasty @@ -9,8 +8,7 @@ main = defaultMain tests tests :: TestTree tests = testGroup "Unit Tests" - [ Funflow.ContentStore.tests - , Control.Arrow.Async.Tests.tests + [ Control.Arrow.Async.Tests.tests , Funflow.TestFlows.tests , Funflow.SQLiteCoordinator.tests ] |