summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornclarke <>2020-03-09 14:05:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2020-03-09 14:05:00 (GMT)
commit306e9a481ffcb94b92d059ce75c12663d23eb900 (patch)
tree2f2ab075c98d729f34cf1d7f38b847494a50593e
parent31eaf2c2a3b07a6553aef61572aa685426d9f5c2 (diff)
version 1.6.0HEAD1.6.0master
-rw-r--r--TestFunflow.hs3
-rw-r--r--app/FFExecutorD.hs2
-rwxr-xr-x[-rw-r--r--]changelog.md0
-rw-r--r--funflow.cabal30
-rw-r--r--src/Control/Arrow/AppArrow.hs39
-rw-r--r--src/Control/Arrow/Async.hs19
-rw-r--r--src/Control/Arrow/Free.hs84
-rw-r--r--src/Control/Funflow.hs40
-rw-r--r--src/Control/Funflow/Base.hs30
-rw-r--r--src/Control/Funflow/Cache/TH.hs2
-rw-r--r--src/Control/Funflow/Class.hs27
-rw-r--r--src/Control/Funflow/ContentHashable.hs549
-rw-r--r--src/Control/Funflow/ContentStore.hs1032
-rw-r--r--src/Control/Funflow/ContentStore/Notify.hs28
-rw-r--r--src/Control/Funflow/ContentStore/Notify/BSD.hs78
-rw-r--r--src/Control/Funflow/ContentStore/Notify/Linux.hs60
-rw-r--r--src/Control/Funflow/Diagram.hs12
-rw-r--r--src/Control/Funflow/Exec/Simple.hs73
-rw-r--r--src/Control/Funflow/External.hs7
-rw-r--r--src/Control/Funflow/External/Coordinator.hs3
-rw-r--r--src/Control/Funflow/External/Coordinator/Memory.hs2
-rw-r--r--src/Control/Funflow/External/Coordinator/Redis.hs2
-rw-r--r--src/Control/Funflow/External/Coordinator/SQLite.hs4
-rw-r--r--src/Control/Funflow/External/Docker.hs2
-rw-r--r--src/Control/Funflow/External/Executor.hs4
-rw-r--r--src/Control/Funflow/Lock.hs120
-rw-r--r--src/Control/Funflow/Orphans.hs28
-rw-r--r--src/Control/Funflow/Pretty.hs2
-rw-r--r--src/Control/Funflow/RemoteCache.hs134
-rw-r--r--src/Control/Funflow/Steps.hs12
-rw-r--r--test/Funflow/ContentStore.hs472
-rw-r--r--test/Funflow/SQLiteCoordinator.hs2
-rw-r--r--test/Funflow/TestFlows.hs4
-rw-r--r--test/Test.hs4
34 files changed, 187 insertions, 2723 deletions
diff --git a/TestFunflow.hs b/TestFunflow.hs
index 09a8373..86445a6 100644
--- a/TestFunflow.hs
+++ b/TestFunflow.hs
@@ -7,10 +7,9 @@ import Control.Arrow
import Control.Arrow.Free
import Control.Exception.Safe
import Control.Funflow
-import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External.Coordinator.Memory
import Control.Funflow.Pretty
-import Data.Default
+import qualified Data.CAS.ContentStore as CS
import Data.Monoid ((<>))
import Path
import Path.IO
diff --git a/app/FFExecutorD.hs b/app/FFExecutorD.hs
index 70ae7cd..881a40d 100644
--- a/app/FFExecutorD.hs
+++ b/app/FFExecutorD.hs
@@ -11,12 +11,12 @@ import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar
import Control.Exception.Base (AsyncException (UserInterrupt))
import Control.Exception.Safe
-import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External.Coordinator
import Control.Funflow.External.Coordinator.Redis
import Control.Funflow.External.Coordinator.SQLite
import Control.Funflow.External.Executor
import Control.Monad (void)
+import qualified Data.CAS.ContentStore as CS
import Data.Monoid ((<>))
import qualified Database.Redis as R
import Network.Socket (PortNumber)
diff --git a/changelog.md b/changelog.md
index e69de29..e69de29 100644..100755
--- a/changelog.md
+++ b/changelog.md
diff --git a/funflow.cabal b/funflow.cabal
index 31d5013..2f60be3 100644
--- a/funflow.cabal
+++ b/funflow.cabal
@@ -1,5 +1,5 @@
Name: funflow
-Version: 1.5.0
+Version: 1.6.0
Synopsis: Workflows with arrows
Description:
An arrow with resumable computations and logging
@@ -24,13 +24,10 @@ Library
default-language: Haskell2010
Exposed-modules:
- Control.Arrow.AppArrow
- , Control.Arrow.Async
+ Control.Arrow.Async
, Control.Arrow.Free
, Control.Funflow
, Control.Funflow.Cache.TH
- , Control.Funflow.ContentHashable
- , Control.Funflow.ContentStore
, Control.Funflow.Diagram
, Control.Funflow.External
, Control.Funflow.External.Docker
@@ -39,22 +36,20 @@ Library
, Control.Funflow.External.Coordinator.Memory
, Control.Funflow.External.Coordinator.Redis
, Control.Funflow.External.Coordinator.SQLite
- , Control.Funflow.Lock
- , Control.Funflow.Orphans
- , Control.Funflow.RemoteCache
, Control.Funflow.Steps
, Control.Funflow.Pretty
, Control.Funflow.Exec.Simple
Other-modules:
Control.Funflow.Base
, Control.Funflow.Class
- , Control.Funflow.ContentStore.Notify
Build-depends:
base >= 4.6 && <5
, Glob
, aeson >= 1.2.3.0
, async
, bytestring
+ , cas-hashable >= 1.0.1 && <2
+ , cas-store >= 1.0.1 && <2
, clock
, constraints
, containers
@@ -79,6 +74,7 @@ Library
, path-io
, pretty
, process
+ , profunctors
, random
, safe-exceptions
, scientific
@@ -94,21 +90,13 @@ Library
, unordered-containers
, vector
, yaml
- if os(linux)
- CPP-options: -DOS_Linux
- Other-modules: Control.Funflow.ContentStore.Notify.Linux
- Build-depends: hinotify >= 0.3.9
- else
- if os(darwin) || os(freebsd)
- CPP-options: -DOS_BSD
- Other-modules: Control.Funflow.ContentStore.Notify.BSD
- Build-depends: kqueue
Executable ffexecutord
default-language: Haskell2010
main-is: app/FFExecutorD.hs
build-depends: base >=4.6 && <5
, bytestring
+ , cas-store
, clock
, funflow
, hedis >= 0.12.5
@@ -126,6 +114,7 @@ Test-suite test-funflow
main-is: TestFunflow.hs
ghc-options: -Wall -threaded
build-depends: base >=4.6 && <5
+ , cas-store
, data-default
, funflow
, filepath
@@ -141,14 +130,13 @@ Test-suite unit-tests
default-language: Haskell2010
hs-source-dirs: test
main-is: Test.hs
- other-modules: Funflow.ContentStore
- Funflow.SQLiteCoordinator
+ other-modules: Funflow.SQLiteCoordinator
Funflow.TestFlows
Control.Arrow.Async.Tests
ghc-options: -Wall -threaded
build-depends: base
, async
- , containers
+ , cas-store
, data-default >= 0.7
, directory
, filepath
diff --git a/src/Control/Arrow/AppArrow.hs b/src/Control/Arrow/AppArrow.hs
deleted file mode 100644
index fe88236..0000000
--- a/src/Control/Arrow/AppArrow.hs
+++ /dev/null
@@ -1,39 +0,0 @@
--- | This modules defines the composition of an applicative functor and an
--- arrow, which is always an arrow.
-
-module Control.Arrow.AppArrow
- ( AppArrow(..)
- , appArrow
- ) where
-
-import Control.Category
-import Control.Arrow
-import Prelude hiding (id, (.))
-
-newtype AppArrow app arr a b = AppArrow { unAppArrow :: app (arr a b) }
-
-instance (Applicative app, Category cat) => Category (AppArrow app cat) where
- id = appArrow id
- AppArrow a1 . AppArrow a2 = AppArrow $ (.) <$> a1 <*> a2
-
-instance (Applicative app, Arrow arr) => Arrow (AppArrow app arr) where
- arr = appArrow . arr
- first (AppArrow a) = AppArrow $ first <$> a
- second (AppArrow a) = AppArrow $ second <$> a
- AppArrow a1 *** AppArrow a2 = AppArrow $ (***) <$> a1 <*> a2
-
-instance (Applicative app, ArrowChoice arr) => ArrowChoice (AppArrow app arr) where
- left (AppArrow a) = AppArrow $ left <$> a
- right (AppArrow a) = AppArrow $ right <$> a
- AppArrow a1 +++ AppArrow a2 = AppArrow $ (+++) <$> a1 <*> a2
- AppArrow a1 ||| AppArrow a2 = AppArrow $ (|||) <$> a1 <*> a2
-
-instance (Applicative f, Arrow arr) => Functor (AppArrow f arr t) where
- fmap f = (>>> arr f)
-
-instance (Applicative app, Arrow arr) => Applicative (AppArrow app arr t) where
- pure = arr . const
- a <*> b = a &&& b >>> arr (uncurry ($))
-
-appArrow :: (Applicative app) => arr a b -> AppArrow app arr a b
-appArrow = AppArrow . pure
diff --git a/src/Control/Arrow/Async.hs b/src/Control/Arrow/Async.hs
index 4a27473..5cbfb7e 100644
--- a/src/Control/Arrow/Async.hs
+++ b/src/Control/Arrow/Async.hs
@@ -2,6 +2,9 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE UndecidableInstances #-}
+{-# LANGUAGE DerivingVia #-}
+{-# LANGUAGE StandaloneDeriving #-}
+
-- | Asynchronous arrows over monads with MonadBaseControl IO, using
-- lifted-async.
module Control.Arrow.Async where
@@ -14,10 +17,26 @@ import Control.Exception.Safe (Exception, MonadCatch)
import qualified Control.Exception.Safe
import Control.Monad.Trans.Class (MonadTrans, lift)
import Control.Monad.Trans.Control (MonadBaseControl)
+import qualified Data.Profunctor as P
+import qualified Data.Profunctor.Mapping as P
+import qualified Data.Profunctor.Traversing as P
import Prelude hiding (id, (.))
newtype AsyncA m a b = AsyncA { runAsyncA :: a -> m b }
+deriving via P.WrappedArrow (AsyncA m)
+ instance (MonadBaseControl IO m) => P.Profunctor (AsyncA m)
+deriving via P.WrappedArrow (AsyncA m)
+ instance (MonadBaseControl IO m) => P.Strong (AsyncA m)
+deriving via P.WrappedArrow (AsyncA m)
+ instance (MonadBaseControl IO m) => P.Choice (AsyncA m)
+instance (MonadBaseControl IO m) => P.Traversing (AsyncA m) where
+ traverse' (AsyncA f) = AsyncA $ mapConcurrently f
+instance (MonadBaseControl IO m) => P.Closed (AsyncA m) where
+ closed = P.closedMapping
+instance (MonadBaseControl IO m) => P.Mapping (AsyncA m) where
+ map' = P.traverseMapping
+
instance Monad m => Category (AsyncA m) where
id = AsyncA return
(AsyncA f) . (AsyncA g) = AsyncA (\b -> g b >>= f)
diff --git a/src/Control/Arrow/Free.hs b/src/Control/Arrow/Free.hs
index 149ab13..23094ec 100644
--- a/src/Control/Arrow/Free.hs
+++ b/src/Control/Arrow/Free.hs
@@ -12,6 +12,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE DerivingVia #-}
-- | Various varieties of free arrow constructions.
--
@@ -37,26 +38,27 @@ module Control.Arrow.Free
, mapA
, mapSeqA
, filterA
+ , hoistErrorChoiceEff
+ , expandErrorChoiceEff
, type (~>)
) where
+import Prelude (Functor (..))
+
import Control.Arrow
-import Control.Arrow.AppArrow
import Control.Category
import Control.Exception.Safe (Exception, MonadCatch)
import qualified Control.Exception.Safe
-import Control.Monad.Trans.Reader
-import Control.Monad.Trans.Writer
-import qualified Control.Monad.Trans.Writer.Strict as SW
-import Data.Bool (Bool)
-import Data.Constraint (Constraint, Dict (..), mapDict, weaken1,
- weaken2)
-import Data.Either (Either (..))
-import Data.Function (const, flip, ($))
-import Data.List (uncons)
-import Data.Maybe (maybe)
-import Data.Monoid (Monoid)
-import Data.Tuple (uncurry)
+import Data.Bool (Bool)
+import Data.Constraint (Constraint, Dict (..))
+import Data.Either (Either (..))
+import Data.Function (const, flip, ($))
+import Data.List (uncons)
+import Data.Maybe (maybe)
+import qualified Data.Profunctor as P
+import qualified Data.Profunctor.Cayley as P
+import qualified Data.Profunctor.Traversing as P
+import Data.Tuple (uncurry)
-- | A natural transformation on type constructors of two arguments.
type x ~> y = forall a b. x a b -> y a b
@@ -77,10 +79,10 @@ class FreeArrowLike fal where
-- | Annoying hackery to let us tuple constraints and still use 'effect'
-- and 'eval'
-class Join ( a :: k -> Constraint) (b :: k -> Constraint) (x :: k) where
- ctx :: Dict (a x, b x)
-instance (a x, b x) => Join a b x where
- ctx = Dict
+class Join (a :: k -> Constraint) (b :: k -> Constraint) (c :: k -> Constraint) (x :: k) where
+ ctx :: (Dict (a x), Dict (b x), Dict (c x))
+instance (a x, b x, c x) => Join a b c x where
+ ctx = (Dict, Dict, Dict)
--------------------------------------------------------------------------------
-- Arrow
@@ -167,17 +169,8 @@ instance FreeArrowLike Choice where
class ArrowError ex a where
try :: a e c -> a e (Either ex c)
-instance (ArrowError ex arr) => ArrowError ex (AppArrow (Reader r) arr) where
- try (AppArrow act) = AppArrow $ reader $ \r ->
- try $ runReader act r
-
-instance (ArrowError ex arr, Monoid w) => ArrowError ex (AppArrow (Writer w) arr) where
- try (AppArrow act) = AppArrow $ writer (try a, w)
- where (a, w) = runWriter act
-
-instance (ArrowError ex arr, Monoid w) => ArrowError ex (AppArrow (SW.Writer w) arr) where
- try (AppArrow act) = AppArrow $ SW.writer (try a, w)
- where (a, w) = SW.runWriter act
+instance (ArrowError ex arr, Functor f) => ArrowError ex (P.Cayley f arr) where
+ try (P.Cayley f) = P.Cayley $ fmap try f
catch :: (ArrowError ex a, ArrowChoice a) => a e c -> a (e, ex) c -> a e c
catch a onExc = proc e -> do
@@ -192,11 +185,27 @@ instance (Arrow (Kleisli m), Exception ex, MonadCatch m)
=> ArrowError ex (Kleisli m) where
try (Kleisli a) = Kleisli $ Control.Exception.Safe.try . a
--- | Freely generated arrows with both choice and error handling.
+-- | Freely generated arrows with both choice, error handling and ability to
+-- traverse structures with Traversals.
newtype ErrorChoice ex eff a b = ErrorChoice {
- runErrorChoice :: forall ac. (ArrowChoice ac, ArrowError ex ac)
+ runErrorChoice :: forall ac. (ArrowChoice ac, ArrowError ex ac, P.Traversing ac)
=> (eff ~> ac) -> ac a b
}
+ deriving (P.Profunctor, P.Strong, P.Choice) via P.WrappedArrow (ErrorChoice ex eff)
+
+hoistErrorChoiceEff
+ :: (eff ~> eff')
+ -> ErrorChoice ex eff a b
+ -> ErrorChoice ex eff' a b
+hoistErrorChoiceEff f (ErrorChoice ec) = ErrorChoice $ \interp ->
+ ec (interp . f)
+
+expandErrorChoiceEff
+ :: (forall x y. eff x y -> ErrorChoice ex eff' x y)
+ -> ErrorChoice ex eff a b
+ -> ErrorChoice ex eff' a b
+expandErrorChoiceEff f (ErrorChoice ec) = ErrorChoice $ \interp ->
+ ec (\eff -> runErrorChoice (f eff) interp)
instance Category (ErrorChoice ex eff) where
id = ErrorChoice $ const id
@@ -216,19 +225,22 @@ instance ArrowChoice (ErrorChoice ex eff) where
instance ArrowError ex (ErrorChoice ex eff) where
try (ErrorChoice a) = ErrorChoice $ \f -> try $ a f
+instance P.Traversing (ErrorChoice ex eff) where
+ traverse' (ErrorChoice a) = ErrorChoice $ \f -> P.traverse' (a f)
+ wander trav (ErrorChoice a) = ErrorChoice $ \f -> P.wander trav (a f)
+
instance FreeArrowLike (ErrorChoice ex) where
- type Ctx (ErrorChoice ex) = Join ArrowChoice (ArrowError ex)
+ type Ctx (ErrorChoice ex) = Join ArrowChoice (ArrowError ex) P.Traversing
effect :: eff a b -> ErrorChoice ex eff a b
effect a = ErrorChoice $ \f -> f a
- eval :: forall eff arr a0 b0. (Join ArrowChoice (ArrowError ex) arr)
+ eval :: forall eff arr a0 b0. (Join ArrowChoice (ArrowError ex) P.Traversing arr)
=> (eff ~> arr)
-> ErrorChoice ex eff a0 b0
-> arr a0 b0
- eval f a = case (\x -> (mapDict weaken1 x, mapDict weaken2 x)) ctx of
- ( Dict :: Dict (ArrowChoice arr)
- , Dict :: Dict (ArrowError ex arr)
- ) -> runErrorChoice a f
+ eval f a = case ctx of
+ ( Dict :: Dict (ArrowChoice arr), Dict :: Dict (ArrowError ex arr), Dict :: Dict (P.Traversing arr) )
+ -> runErrorChoice a f
--------------------------------------------------------------------------------
diff --git a/src/Control/Funflow.hs b/src/Control/Funflow.hs
index c2d90ed..b255cc7 100644
--- a/src/Control/Funflow.hs
+++ b/src/Control/Funflow.hs
@@ -1,4 +1,6 @@
{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE GADTs #-}
-- | Central Funflow module.
--
@@ -9,13 +11,14 @@ module Control.Funflow
, type (Base.==>)
, Base.NoEffect
, Base.Flow'(..)
- , Base.Cacher(..)
, Base.ExternalProperties(..)
, Base.MDWriter
, Base.Properties(..)
, Base.EpPurity(..)
, Base.alwaysRecompile
- , Base.defaultCacherWithIdent
+ , CS.CacherM(..)
+ , CS.Cacher
+ , CS.defaultCacherWithIdent
, Cache.defaultCacher
, Cache.defaultCacherLoc
-- * Defines our primitive flow functions
@@ -24,16 +27,45 @@ module Control.Funflow
, Class.stepIO
, Class.wrap
, CS.withStore
+ , hoistFlowEff
+ , expandFlowEff
, module Control.Funflow.Steps
, module Control.Funflow.Exec.Simple
, module Control.Funflow.External
)
where
-import qualified Control.Funflow.Base as Base
+import Control.Arrow.Free
+import Control.Funflow.Base as Base
import qualified Control.Funflow.Cache.TH as Cache
import qualified Control.Funflow.Class as Class
-import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.Exec.Simple
import Control.Funflow.External
import Control.Funflow.Steps
+import qualified Data.CAS.ContentStore as CS
+
+-- | Change the effect type present inside the 'Flow'
+hoistFlowEff :: (forall x y. eff x y -> eff' x y) -> Flow eff ex a b -> Flow eff' ex a b
+hoistFlowEff f = hoistErrorChoiceEff $ \e -> case e of
+ Step p s -> Step p s
+ StepIO p s -> StepIO p s
+ External p s -> External p s
+ PutInStore s -> PutInStore s
+ GetFromStore s -> GetFromStore s
+ InternalManipulateStore s -> InternalManipulateStore s
+ Wrapped p s -> Wrapped p (f s)
+
+-- | Turns each effect in the 'Flow' as a 'Flow' of some other effect
+expandFlowEff :: (forall x y. Properties x y -> eff x y -> Flow eff' ex x y)
+ -> Flow eff ex a b
+ -> Flow eff' ex a b
+expandFlowEff f = expandErrorChoiceEff $ \e -> case e of
+ Step p s -> Class.step' p s
+ StepIO p s -> Class.stepIO' p s
+ External p s -> Class.external' p s
+ PutInStore s -> Class.putInStore s
+ GetFromStore s -> Class.getFromStore s
+ InternalManipulateStore s -> Class.internalManipulateStore s
+ Wrapped p s -> f p s
+-- hoistFlowEff and expandFlowEff are here and not in Control.Funflow.Base to
+-- avoid a dependency cycle
diff --git a/src/Control/Funflow/Base.hs b/src/Control/Funflow/Base.hs
index 5402784..b26f269 100644
--- a/src/Control/Funflow/Base.hs
+++ b/src/Control/Funflow/Base.hs
@@ -16,16 +16,14 @@ module Control.Funflow.Base where
import Control.Arrow (Kleisli (..))
import Control.Arrow.Free
import Control.Exception.Safe (SomeException)
-import Control.Funflow.ContentHashable
-import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.Diagram
import Control.Funflow.External
import Data.ByteString (ByteString)
+import Data.CAS.ContentHashable
+import Data.CAS.ContentStore as CS
import Data.Default
-import Data.Functor.Identity
import Data.Int (Int64)
import Data.Proxy (Proxy (..))
-import qualified Data.Store as Store
import qualified Data.Text as T
import Path
import Prelude hiding (id, (.))
@@ -34,30 +32,6 @@ import System.Random (randomIO)
-- | Metadata writer
type MDWriter i o = Maybe (i -> o -> [(T.Text, ByteString)])
--- | A cacher is responsible for controlling how steps are cached.
-data Cacher i o =
- NoCache -- ^ This step cannot be cached (default).
- | Cache
- { -- | Function to encode the input into a content
- -- hash.
- -- This function additionally takes an
- -- 'identities' which gets incorporated into
- -- the cacher.
- cacherKey :: Int -> i -> ContentHash
- , cacherStoreValue :: o -> ByteString
- -- | Attempt to read the cache value back. May throw exceptions.
- , cacherReadValue :: ByteString -> o
- }
-
-defaultCacherWithIdent :: (Store.Store o, ContentHashable Identity i)
- => Int -- ^ Seed for the cacher
- -> Cacher i o
-defaultCacherWithIdent ident = Cache
- { cacherKey = \i ident' -> runIdentity $ contentHash (ident', ident, i)
- , cacherStoreValue = Store.encode
- , cacherReadValue = Store.decodeEx
- }
-
data Properties i o = Properties
{ -- | Name of this step. Used when describing the step in diagrams
-- or other reporting.
diff --git a/src/Control/Funflow/Cache/TH.hs b/src/Control/Funflow/Cache/TH.hs
index 6850995..72bd82c 100644
--- a/src/Control/Funflow/Cache/TH.hs
+++ b/src/Control/Funflow/Cache/TH.hs
@@ -8,7 +8,7 @@
-- | Template Haskell splices for the funflow cache.
module Control.Funflow.Cache.TH where
-import Control.Funflow.Base
+import Data.CAS.ContentStore
import Data.Hashable
import Language.Haskell.TH.Syntax
import System.Random
diff --git a/src/Control/Funflow/Class.hs b/src/Control/Funflow/Class.hs
index ec121ba..8d501c6 100644
--- a/src/Control/Funflow/Class.hs
+++ b/src/Control/Funflow/Class.hs
@@ -16,12 +16,12 @@
module Control.Funflow.Class where
import Control.Arrow
-import Control.Arrow.AppArrow
import Control.Arrow.Free
import qualified Control.Funflow.Base as Base
-import Control.Funflow.ContentHashable
-import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External
+import Data.CAS.ContentHashable
+import qualified Data.CAS.ContentStore as CS
+import qualified Data.Profunctor.Cayley as P
import Data.Default (def)
import Path
@@ -67,14 +67,15 @@ wrap :: ArrowFlow eff ex arr => eff a b -> arr a b
wrap = wrap' def
instance ( Applicative app
- , ArrowError ex (AppArrow app (arr eff ex))
+ , ArrowError ex (arr eff ex)
, ArrowFlow eff ex (arr eff ex) )
- => ArrowFlow eff ex (AppArrow app (arr eff ex)) where
- step' props f = appArrow $ step' props f
- stepIO' props f = appArrow $ stepIO' props f
- external f = appArrow $ external f
- external' props f = appArrow $ external' props f
- wrap' props eff = appArrow $ wrap' props eff
- putInStore f = appArrow $ putInStore f
- getFromStore f = appArrow $ getFromStore f
- internalManipulateStore f = appArrow $ internalManipulateStore f
+ => ArrowFlow eff ex (P.Cayley app (arr eff ex)) where
+ step' props f = P.Cayley . pure $ step' props f
+ stepIO' props f = P.Cayley . pure $ stepIO' props f
+ external f = P.Cayley . pure $ external f
+ external' props f = P.Cayley . pure $ external' props f
+ wrap' props eff = P.Cayley . pure $ wrap' props eff
+ putInStore f = P.Cayley . pure $ putInStore f
+ getFromStore f = P.Cayley . pure $ getFromStore f
+ internalManipulateStore f = P.Cayley . pure $ internalManipulateStore f
+
diff --git a/src/Control/Funflow/ContentHashable.hs b/src/Control/Funflow/ContentHashable.hs
deleted file mode 100644
index 53a6aca..0000000
--- a/src/Control/Funflow/ContentHashable.hs
+++ /dev/null
@@ -1,549 +0,0 @@
-{-# LANGUAGE DefaultSignatures #-}
-{-# LANGUAGE DeriveGeneric #-}
-{-# LANGUAGE FlexibleContexts #-}
-{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE MagicHash #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE TypeApplications #-}
-{-# LANGUAGE TypeOperators #-}
-{-# LANGUAGE TypeSynonymInstances #-}
-{-# LANGUAGE UnboxedTuples #-}
-
--- | 'ContentHashable' provides a hashing function suitable for use in the
--- Funflow content store.
---
--- This behaves as does a normal hashing function on Haskell types. However,
--- on path types, this instead calculates a hash based on the contents of the
--- file or directory referenced.
---
--- We also export the 'ExternallyAssuredFile' and 'ExternallyAssuredDirectory'
--- types. These instead use the path, file size and modification time to control
--- the hash.
-module Control.Funflow.ContentHashable
- ( ContentHash
- , toBytes
- , fromBytes
- , ContentHashable (..)
- , contentHashUpdate_binaryFile
- , contentHashUpdate_byteArray#
- , contentHashUpdate_fingerprint
- , contentHashUpdate_primitive
- , contentHashUpdate_storable
-
- , FileContent (..)
- , DirectoryContent (..)
-
- , ExternallyAssuredFile(..)
- , ExternallyAssuredDirectory(..)
-
- , encodeHash
- , decodeHash
- , hashToPath
- , pathToHash
-
- , SHA256
- , Context
- , Digest
- ) where
-
-
-import Control.Exception.Safe (catchJust)
-import Control.Funflow.Orphans ()
-import Control.Monad (foldM, mzero, (>=>))
-import Control.Monad.IO.Class (MonadIO, liftIO)
-import Crypto.Hash (Context, Digest, SHA256,
- digestFromByteString,
- hashFinalize, hashInit,
- hashUpdate)
-import qualified Data.Aeson as Aeson
-import qualified Data.Aeson.Types as Aeson
-import Data.Bits (shiftL)
-import Data.ByteArray (Bytes, MemView (MemView),
- allocAndFreeze, convert)
-import Data.ByteArray.Encoding (Base (Base16),
- convertFromBase,
- convertToBase)
-import qualified Data.ByteString as BS
-import Data.ByteString.Builder.Extra (defaultChunkSize)
-import qualified Data.ByteString.Char8 as C8
-import qualified Data.ByteString.Lazy as BSL
-import Data.Foldable (foldlM)
-import Data.Functor.Contravariant
-import qualified Data.Hashable
-import qualified Data.HashMap.Lazy as HashMap
-import qualified Data.HashSet as HashSet
-import Data.Int
-import Data.List (sort)
-import Data.List.NonEmpty (NonEmpty)
-import Data.Map (Map)
-import qualified Data.Map as Map
-import Data.Ratio
-import Data.Scientific
-import Data.Store (Store (..), peekException)
-import qualified Data.Text as T
-import qualified Data.Text.Array as TA
-import qualified Data.Text.Encoding as TE
-import qualified Data.Text.Internal as T
-import qualified Data.Text.Lazy as TL
-import Data.Time.Clock (UTCTime)
-import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
-import Data.Typeable
-import qualified Data.Vector as V
-import Data.Word
-import qualified Database.SQLite.Simple.FromField as SQL
-import qualified Database.SQLite.Simple.ToField as SQL
-import Foreign.Marshal.Utils (with)
-import Foreign.Ptr (castPtr)
-import Foreign.Storable (Storable, sizeOf)
-import GHC.Fingerprint
-import GHC.Generics
-import GHC.Integer.GMP.Internals (BigNat (..), Integer (..))
-import GHC.Natural (Natural (..))
-import GHC.Prim (ByteArray#,
- copyByteArrayToAddr#,
- sizeofByteArray#)
-import GHC.Ptr (Ptr (Ptr))
-import GHC.Types (IO (IO), Int (I#), Word (W#))
-import qualified Path
-import qualified Path.Internal
-import qualified Path.IO
-import System.IO (IOMode (ReadMode),
- withBinaryFile)
-import System.IO.Error (isPermissionError)
-import System.IO.Unsafe (unsafePerformIO)
-import System.Posix.Files (fileSize, getFileStatus)
-
-
-newtype ContentHash = ContentHash { unContentHash :: Digest SHA256 }
- deriving (Eq, Ord, Generic)
-
-instance Aeson.FromJSON ContentHash where
- parseJSON (Aeson.String s)
- | Just h <- decodeHash (TE.encodeUtf8 s) = pure h
- | otherwise = fail "Invalid hash encoding"
- parseJSON invalid
- = Aeson.typeMismatch "ContentHash" invalid
-instance Aeson.ToJSON ContentHash where
- toJSON = Aeson.String . TE.decodeUtf8 . encodeHash
-
-instance Data.Hashable.Hashable ContentHash where
- hashWithSalt s = Data.Hashable.hashWithSalt s . encodeHash
-
-instance Show ContentHash where
- showsPrec d h = showParen (d > app_prec)
- $ showString "ContentHash \""
- . (showString $ C8.unpack $ encodeHash h)
- . showString "\""
- where app_prec = 10
-
-instance Store ContentHash where
- size = contramap toBytes size
- peek = fromBytes <$> peek >>= \case
- Nothing -> peekException "Store ContentHash: Illegal digest"
- Just x -> return x
- poke = poke . toBytes
-
-instance SQL.FromField ContentHash where
- fromField f = do
- bs <- SQL.fromField f
- case decodeHash bs of
- Just h -> pure h
- Nothing -> mzero
-
-instance SQL.ToField ContentHash where
- toField = SQL.toField . encodeHash
-
-toBytes :: ContentHash -> BS.ByteString
-toBytes = convert . unContentHash
-
-fromBytes :: BS.ByteString -> Maybe ContentHash
-fromBytes bs = ContentHash <$> digestFromByteString bs
-
-hashEncoding :: Base
-hashEncoding = Base16
-
--- | File path appropriate encoding of a hash
-encodeHash :: ContentHash -> BS.ByteString
-encodeHash = convertToBase hashEncoding . toBytes
-
--- | Inverse of 'encodeHash' if given a valid input.
---
--- prop> decodeHash (encodeHash x) = Just x
-decodeHash :: BS.ByteString -> Maybe ContentHash
-decodeHash bs = case convertFromBase hashEncoding bs of
- Left _ -> Nothing
- Right x -> fromBytes x
-
--- | File path appropriate encoding of a hash
-hashToPath :: ContentHash -> Path.Path Path.Rel Path.Dir
-hashToPath h =
- case Path.parseRelDir $ C8.unpack $ encodeHash h of
- Nothing -> error
- "[ContentHashable.hashToPath] \
- \Failed to convert hash to directory name"
- Just dir -> dir
-
-
--- | Inverse of 'hashToPath' if given a valid input.
---
--- prop> pathToHash (hashToPath x) = Just x
-pathToHash :: FilePath -> Maybe ContentHash
-pathToHash = decodeHash . C8.pack
-
-
-class Monad m => ContentHashable m a where
-
- -- | Update a hash context based on the given value.
- --
- -- See 'Crypto.Hash.hashUpdate'.
- --
- -- XXX: Consider swapping the arguments.
- contentHashUpdate :: Context SHA256 -> a -> m (Context SHA256)
-
- default contentHashUpdate :: (Generic a, GContentHashable m (Rep a))
- => Context SHA256 -> a -> m (Context SHA256)
- contentHashUpdate ctx a = gContentHashUpdate ctx (from a)
-
- -- | Generate hash of the given value.
- --
- -- See 'Crypto.Hash.hash'.
- contentHash :: a -> m ContentHash
- contentHash x = ContentHash . hashFinalize <$> contentHashUpdate hashInit x
-
-
--- | Update hash context based on binary in memory representation due to 'Foreign.Storable.Storable'.
---
--- XXX: Do we need to worry about endianness?
-contentHashUpdate_storable :: (Monad m, Storable a) => Context SHA256 -> a -> m (Context SHA256)
-contentHashUpdate_storable ctx a =
- return . unsafePerformIO $ with a (\p -> pure $! hashUpdate ctx (MemView (castPtr p) (sizeOf a)))
-
--- | Update hash context based on a type's 'GHC.Fingerprint.Type.Fingerprint'.
---
--- The fingerprint is constructed from the library-name, module-name, and name of the type itself.
-contentHashUpdate_fingerprint :: (Monad m, Typeable a) => Context SHA256 -> a -> m (Context SHA256)
-contentHashUpdate_fingerprint ctx = contentHashUpdate ctx . typeRepFingerprint . typeOf
-
--- | Update hash context by combining 'contentHashUpdate_fingerprint' and 'contentHashUpdate_storable'.
--- Intended for primitive types like 'Int'.
-contentHashUpdate_primitive :: (Monad m, Typeable a, Storable a) => Context SHA256 -> a -> m (Context SHA256)
-contentHashUpdate_primitive ctx a =
- flip contentHashUpdate_fingerprint a >=> flip contentHashUpdate_storable a $ ctx
-
--- | Update hash context based on binary contents of the given file.
-contentHashUpdate_binaryFile :: Context SHA256 -> FilePath -> IO (Context SHA256)
-contentHashUpdate_binaryFile ctx0 fp = withBinaryFile fp ReadMode $ \h ->
- let go ctx = do
- chunk <- BS.hGetSome h defaultChunkSize
- if BS.null chunk then
- pure ctx
- else
- go $! hashUpdate ctx chunk
- in go ctx0
-
--- | Update hash context based on 'GHC.Prim.ByteArray#'
--- by copying into a newly allocated 'Data.ByteArray.Bytes'
--- and updating the hash context from there.
---
--- XXX: @'GHC.Prim.byteArrayContents#' :: 'GHC.Prim.ByteArray#' -> 'GHC.Prim.Addr#'@
--- could be used together with 'Data.ByteArray.MemView' instead.
--- However, 'GHC.Prim.byteArrayContents#' explicitly says, that it is only safe to use
--- on a pinned 'GHC.Prim.ByteArray#'.
-contentHashUpdate_byteArray# :: ByteArray# -> Int -> Int -> Context SHA256 -> Context SHA256
-contentHashUpdate_byteArray# ba (I# off) (I# len) ctx = hashUpdate ctx $
- allocAndFreeze @Bytes (I# len) $ \(Ptr addr) -> IO $ \s ->
- (# copyByteArrayToAddr# ba off addr len s, () #)
-
--- | Update hash context based on the contents of a strict 'Data.Text.Text'.
-contentHashUpdate_text :: Context SHA256 -> T.Text -> Context SHA256
-contentHashUpdate_text ctx (T.Text arr off_ len_) =
- contentHashUpdate_byteArray# (TA.aBA arr) off len ctx
- where
- off = off_ `shiftL` 1 -- convert from 'Word16' to 'Word8'
- len = len_ `shiftL` 1 -- convert from 'Word16' to 'Word8'
-
-instance Monad m => ContentHashable m Fingerprint where
- contentHashUpdate ctx (Fingerprint a b) = flip contentHashUpdate_storable a >=> flip contentHashUpdate_storable b $ ctx
-
-instance Monad m => ContentHashable m Bool where contentHashUpdate = contentHashUpdate_primitive
-
-instance Monad m => ContentHashable m Char where contentHashUpdate = contentHashUpdate_primitive
-
-instance Monad m => ContentHashable m Int where contentHashUpdate = contentHashUpdate_primitive
-instance Monad m => ContentHashable m Int8 where contentHashUpdate = contentHashUpdate_primitive
-instance Monad m => ContentHashable m Int16 where contentHashUpdate = contentHashUpdate_primitive
-instance Monad m => ContentHashable m Int32 where contentHashUpdate = contentHashUpdate_primitive
-instance Monad m => ContentHashable m Int64 where contentHashUpdate = contentHashUpdate_primitive
-
-instance Monad m => ContentHashable m Word where contentHashUpdate = contentHashUpdate_primitive
-instance Monad m => ContentHashable m Word8 where contentHashUpdate = contentHashUpdate_primitive
-instance Monad m => ContentHashable m Word16 where contentHashUpdate = contentHashUpdate_primitive
-instance Monad m => ContentHashable m Word32 where contentHashUpdate = contentHashUpdate_primitive
-instance Monad m => ContentHashable m Word64 where contentHashUpdate = contentHashUpdate_primitive
-
-instance Monad m => ContentHashable m Float where contentHashUpdate = contentHashUpdate_primitive
-instance Monad m => ContentHashable m Double where contentHashUpdate = contentHashUpdate_primitive
-
-instance (ContentHashable m n, Typeable n) => ContentHashable m (Ratio n) where
- contentHashUpdate ctx x =
- flip contentHashUpdate_fingerprint x
- >=> flip contentHashUpdate (numerator x)
- >=> flip contentHashUpdate (denominator x)
- $ ctx
-
-instance Monad m => ContentHashable m Scientific where
- contentHashUpdate ctx x =
- flip contentHashUpdate_fingerprint x
- >=> flip contentHashUpdate (toRational x)
- $ ctx
-
-instance Monad m => ContentHashable m Integer where
- contentHashUpdate ctx n = ($ ctx) $
- flip contentHashUpdate_fingerprint n >=> case n of
- S# i ->
- pure . flip hashUpdate (C8.pack "S") -- tag constructur
- >=> flip contentHashUpdate_storable (I# i) -- hash field
- Jp# (BN# ba) ->
- pure . flip hashUpdate (C8.pack "L") -- tag constructur
- >=> pure . contentHashUpdate_byteArray# ba 0 (I# (sizeofByteArray# ba)) -- hash field
- Jn# (BN# ba) ->
- pure . flip hashUpdate (C8.pack "N") -- tag constructur
- >=> pure . contentHashUpdate_byteArray# ba 0 (I# (sizeofByteArray# ba)) -- hash field
-
-instance Monad m => ContentHashable m Natural where
- contentHashUpdate ctx n = ($ ctx) $
- flip contentHashUpdate_fingerprint n >=> case n of
- NatS# w ->
- pure . flip hashUpdate (C8.pack "S") -- tag constructur
- >=> flip contentHashUpdate_storable (W# w) -- hash field
- NatJ# (BN# ba) ->
- pure . flip hashUpdate (C8.pack "L") -- tag constructur
- >=> pure . contentHashUpdate_byteArray# ba 0 (I# (sizeofByteArray# ba)) -- hash field
-
-instance Monad m => ContentHashable m BS.ByteString where
- contentHashUpdate ctx s =
- flip contentHashUpdate_fingerprint s
- >=> pure . flip hashUpdate s $ ctx
-
-instance Monad m => ContentHashable m BSL.ByteString where
- contentHashUpdate ctx s =
- flip contentHashUpdate_fingerprint s
- >=> pure . flip (BSL.foldlChunks hashUpdate) s $ ctx
-
-instance Monad m => ContentHashable m T.Text where
- contentHashUpdate ctx s =
- flip contentHashUpdate_fingerprint s
- >=> pure . flip contentHashUpdate_text s $ ctx
-
-instance Monad m => ContentHashable m TL.Text where
- contentHashUpdate ctx s =
- flip contentHashUpdate_fingerprint s
- >=> pure . flip (TL.foldlChunks contentHashUpdate_text) s $ ctx
-
-instance (Typeable k, Typeable v, ContentHashable m k, ContentHashable m v)
- => ContentHashable m (Map k v) where
- contentHashUpdate ctx m =
- flip contentHashUpdate_fingerprint m
- >=> flip contentHashUpdate (Map.toList m) $ ctx
-
-instance (Typeable k, Typeable v, ContentHashable m k, ContentHashable m v)
- => ContentHashable m (HashMap.HashMap k v) where
- contentHashUpdate ctx m =
- flip contentHashUpdate_fingerprint m
- -- XXX: The order of the list is unspecified.
- >=> flip contentHashUpdate (HashMap.toList m) $ ctx
-
-instance (Typeable v, ContentHashable m v)
- => ContentHashable m (HashSet.HashSet v) where
- contentHashUpdate ctx s =
- flip contentHashUpdate_fingerprint s
- -- XXX: The order of the list is unspecified.
- >=> flip contentHashUpdate (HashSet.toList s) $ ctx
-
-instance (Typeable a, ContentHashable m a)
- => ContentHashable m [a] where
- contentHashUpdate ctx l =
- flip contentHashUpdate_fingerprint l
- >=> flip (foldM contentHashUpdate) l $ ctx
-
-instance (Typeable a, ContentHashable m a)
- => ContentHashable m (NonEmpty a) where
- contentHashUpdate ctx l =
- flip contentHashUpdate_fingerprint l
- >=> flip (foldlM contentHashUpdate) l $ ctx
-
-instance (Typeable a, ContentHashable m a)
- => ContentHashable m (V.Vector a) where
- contentHashUpdate ctx v =
- flip contentHashUpdate_fingerprint v
- >=> flip (V.foldM' contentHashUpdate) v $ ctx
-
-instance Monad m => ContentHashable m ()
-instance (ContentHashable m a, ContentHashable m b) => ContentHashable m (a, b)
-instance (ContentHashable m a, ContentHashable m b, ContentHashable m c) => ContentHashable m (a, b, c)
-instance (ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d) => ContentHashable m (a, b, c, d)
-instance (ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d, ContentHashable m e) => ContentHashable m (a, b, c, d, e)
-instance (Monad m, ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d, ContentHashable m e, ContentHashable m f) => ContentHashable m (a, b, c, d, e, f)
-instance (Monad m, ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d, ContentHashable m e, ContentHashable m f, ContentHashable m g) => ContentHashable m (a, b, c, d, e, f, g)
-
-instance ContentHashable m a => ContentHashable m (Maybe a)
-
-instance (ContentHashable m a, ContentHashable m b) => ContentHashable m (Either a b)
-
-instance Monad m => ContentHashable m Aeson.Value
-
-
-class Monad m => GContentHashable m f where
- gContentHashUpdate :: Context SHA256 -> f a -> m (Context SHA256)
-
-instance Monad m => GContentHashable m V1 where
- gContentHashUpdate ctx _ = pure ctx
-
-instance Monad m => GContentHashable m U1 where
- gContentHashUpdate ctx U1 = pure ctx
-
-instance ContentHashable m c => GContentHashable m (K1 i c) where
- gContentHashUpdate ctx x = contentHashUpdate ctx (unK1 x)
-
-instance (Constructor c, GContentHashable m f) => GContentHashable m (C1 c f) where
- gContentHashUpdate ctx0 x = gContentHashUpdate nameCtx (unM1 x)
- where nameCtx = hashUpdate ctx0 $ C8.pack (conName x)
-
-instance (Datatype d, GContentHashable m f) => GContentHashable m (D1 d f) where
- gContentHashUpdate ctx0 x = gContentHashUpdate packageCtx (unM1 x)
- where
- datatypeCtx = hashUpdate ctx0 $ C8.pack (datatypeName x)
- moduleCtx = hashUpdate datatypeCtx $ C8.pack (datatypeName x)
- packageCtx = hashUpdate moduleCtx $ C8.pack (datatypeName x)
-
-instance GContentHashable m f => GContentHashable m (S1 s f) where
- gContentHashUpdate ctx x = gContentHashUpdate ctx (unM1 x)
-
-instance (GContentHashable m a, GContentHashable m b) => GContentHashable m (a :*: b) where
- gContentHashUpdate ctx (x :*: y) = gContentHashUpdate ctx x >>= flip gContentHashUpdate y
-
-instance (GContentHashable m a, GContentHashable m b) => GContentHashable m (a :+: b) where
- gContentHashUpdate ctx (L1 x) = gContentHashUpdate ctx x
- gContentHashUpdate ctx (R1 x) = gContentHashUpdate ctx x
-
--- XXX: Do we need this?
--- instance GContentHashable (a :.: b) where
--- gContentHashUpdate ctx x = _ (unComp1 x)
-
-
-instance (Monad m, Typeable b, Typeable t) => ContentHashable m (Path.Path b t) where
- contentHashUpdate ctx p@(Path.Internal.Path fp) =
- flip contentHashUpdate_fingerprint p
- >=> flip contentHashUpdate fp
- $ ctx
-
-
--- | Path to a regular file
---
--- Only the file's content and its executable permission is taken into account
--- when generating the content hash. The path itself is ignored.
-newtype FileContent = FileContent (Path.Path Path.Abs Path.File)
-
-instance ContentHashable IO FileContent where
-
- contentHashUpdate ctx (FileContent fp) = do
- exec <- Path.IO.executable <$> Path.IO.getPermissions fp
- ctx' <- if exec then contentHashUpdate ctx () else pure ctx
- contentHashUpdate_binaryFile ctx' (Path.fromAbsFile fp)
-
--- | Path to a directory
---
--- Only the contents of the directory and their path relative to the directory
--- are taken into account when generating the content hash.
--- The path to the directory is ignored.
-newtype DirectoryContent = DirectoryContent (Path.Path Path.Abs Path.Dir)
-
-instance MonadIO m => ContentHashable m DirectoryContent where
-
- contentHashUpdate ctx0 (DirectoryContent dir0) = liftIO $ do
- (dirs, files) <- Path.IO.listDir dir0
- ctx' <- foldM hashFile ctx0 (sort files)
- foldM hashDir ctx' (sort dirs)
- where
- hashFile ctx fp =
- -- XXX: Do we need to treat symbolic links specially?
- flip contentHashUpdate (Path.filename fp)
- >=> flip contentHashUpdate (FileContent fp)
- $ ctx
- hashDir ctx dir =
- flip contentHashUpdate (Path.dirname dir)
- >=> flip contentHashUpdate (DirectoryContent dir)
- $ ctx
-
-instance Monad m => ContentHashable m UTCTime where
- contentHashUpdate ctx utcTime = let
- secondsSinceEpoch = fromEnum . utcTimeToPOSIXSeconds $ utcTime
- in flip contentHashUpdate_fingerprint utcTime
- >=> flip contentHashUpdate secondsSinceEpoch
- $ ctx
-
--- | Path to a file to be treated as _externally assured_.
---
--- An externally assured file is handled in a somewhat 'cheating' way by
--- funflow. The 'ContentHashable' instance for such assumes that some external
--- agent guarantees the integrity of the file being referenced. Thus, rather
--- than hashing the file contents, we only consider its (absolute) path, size and
--- modification time, which can be rapidly looked up from filesystem metadata.
---
--- For a similar approach, see the instance for 'ObjectInBucket' in
--- Control.Funflow.AWS.S3, where we exploit the fact that S3 is already
--- content hashed to avoid performing any hashing.
-newtype ExternallyAssuredFile = ExternallyAssuredFile (Path.Path Path.Abs Path.File)
- deriving (Generic, Show)
-
-instance Aeson.FromJSON ExternallyAssuredFile
-instance Aeson.ToJSON ExternallyAssuredFile
-instance Store ExternallyAssuredFile
-
-instance ContentHashable IO ExternallyAssuredFile where
- contentHashUpdate ctx (ExternallyAssuredFile fp) = do
- modTime <- Path.IO.getModificationTime fp
- fSize <- fileSize <$> getFileStatus (Path.toFilePath fp)
- flip contentHashUpdate fp
- >=> flip contentHashUpdate modTime
- >=> flip contentHashUpdate_storable fSize
- $ ctx
-
-
--- | Path to a directory to be treated as _externally assured_.
---
--- For an externally assured directory, we _do_ traverse its contents and verify
--- those as we would externally assured files, rather than just relying on the
--- directory path. Doing this traversal is pretty cheap, and it's quite likely
--- for directory contents to be modified without modifying the contents.
---
--- If an item in the directory cannot be read due to lacking permissions,
--- then it will be ignored and not included in the hash. If the flow does not
--- have permissions to access the contents of a subdirectory, then these
--- contents cannot influence the outcome of a task and it is okay to exclude
--- them from the hash. In that case we only hash the name, as that could
--- influence the outcome of a task.
-newtype ExternallyAssuredDirectory = ExternallyAssuredDirectory (Path.Path Path.Abs Path.Dir)
- deriving (Generic, Show)
-
-instance Aeson.FromJSON ExternallyAssuredDirectory
-instance Aeson.ToJSON ExternallyAssuredDirectory
-instance Store ExternallyAssuredDirectory
-
-instance ContentHashable IO ExternallyAssuredDirectory where
- contentHashUpdate ctx0 (ExternallyAssuredDirectory dir0) = do
- -- Note that we don't bother looking at the relative directory paths and
- -- including these in the hash. This is because the absolute hash gets
- -- included every time we hash a file.
- (dirs, files) <- Path.IO.listDir dir0
- ctx' <- foldM hashFile ctx0 (sort files)
- foldM hashDir ctx' (sort dirs)
- where
- hashFile ctx fp = contentHashUpdate ctx (ExternallyAssuredFile fp)
- `catchPermissionError` \_ -> contentHashUpdate ctx fp
- hashDir ctx dir = contentHashUpdate ctx (ExternallyAssuredDirectory dir)
- `catchPermissionError` \_ -> contentHashUpdate ctx dir
- catchPermissionError = catchJust $ \e ->
- if isPermissionError e then Just e else Nothing
diff --git a/src/Control/Funflow/ContentStore.hs b/src/Control/Funflow/ContentStore.hs
deleted file mode 100644
index b784f29..0000000
--- a/src/Control/Funflow/ContentStore.hs
+++ /dev/null
@@ -1,1032 +0,0 @@
-{-# LANGUAGE DeriveGeneric #-}
-{-# LANGUAGE FlexibleContexts #-}
-{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE GADTs #-}
-{-# LANGUAGE GeneralizedNewtypeDeriving #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE NamedFieldPuns #-}
-{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE PatternSynonyms #-}
-{-# LANGUAGE QuasiQuotes #-}
-{-# LANGUAGE RecordWildCards #-}
-{-# LANGUAGE ScopedTypeVariables #-}
-{-# LANGUAGE StandaloneDeriving #-}
-
--- | Hash addressed store in file system.
---
--- Associates a key ('Control.Funflow.ContentHashable.ContentHash')
--- with an item in the store. An item can either be
--- 'Control.Funflow.ContentStore.Missing',
--- 'Control.Funflow.ContentStore.Pending', or
--- 'Control.Funflow.ContentStore.Complete'.
--- The state is persisted in the file system.
---
--- Items are stored under a path derived from their hash. Therefore,
--- there can be no two copies of the same item in the store.
--- If two keys are associated with the same item, then there will be
--- only one copy of that item in the store.
---
--- The store is thread-safe and multi-process safe.
---
--- It is assumed that the user that the process is running under is the owner
--- of the store root, or has permission to create it if missing.
---
--- It is assumed that the store root and its immediate contents are not modified
--- externally. The contents of pending items may be modified externally.
---
--- __Implementation notes:__
---
--- The hash of an item can only be determined once it is completed.
--- If that hash already exists in the store, then the new item is discarded.
---
--- Store state is persisted in the file-system:
---
--- * Pending items are stored writable under the path @pending-\<key>@.
--- * Complete items are stored read-only under the path @item-\<hash>@,
--- with a link under @complete-\<key>@ pointing to that directory.
-module Control.Funflow.ContentStore
- (
- -- * Open/Close
- withStore
- , open
- , close
-
- -- * List Contents
- , listAll
- , listPending
- , listComplete
- , listItems
-
- -- * Query/Lookup
- , query
- , isMissing
- , isPending
- , isComplete
- , lookup
- , lookupOrWait
- , waitUntilComplete
-
- -- * Construct Items
- , constructOrAsync
- , constructOrWait
- , constructIfMissing
- , withConstructIfMissing
- , markPending
- , markComplete
-
- -- * Remove Contents
- , removeFailed
- , removeForcibly
- , removeItemForcibly
-
- -- * Aliases
- , assignAlias
- , lookupAlias
- , removeAlias
- , listAliases
-
- -- * Metadata
- , getBackReferences
- , setInputs
- , getInputs
- , setMetadata
- , getMetadata
- , createMetadataFile
- , getMetadataFile
-
- -- * Accessors
- , itemHash
- , itemPath
- , itemRelPath
- , contentPath
- , contentItem
- , contentFilename
- , root
-
- -- * Types
- , ContentStore
- , Item
- , Content (..)
- , (^</>)
- , Alias (..)
- , Status (..)
- , Status_
- , Update (..)
- , StoreError (..)
- ) where
-
-
-import Prelude hiding (lookup)
-
-import Control.Arrow (second)
-import Control.Concurrent (threadDelay)
-import Control.Concurrent.Async
-import Control.Concurrent.MVar
-import Control.Exception.Safe (Exception, MonadMask,
- bracket, bracketOnError,
- bracket_,
- displayException, throwIO)
-import Control.Funflow.ContentStore.Notify
-import Control.Funflow.Orphans ()
-import Control.Lens
-import Control.Monad (forM_, forever, unless,
- void, when, (<=<), (>=>))
-import Control.Monad.IO.Class (MonadIO, liftIO)
-import Control.Monad.Trans.Control (MonadBaseControl)
-import Crypto.Hash (hashUpdate)
-import Data.Aeson (FromJSON, ToJSON)
-import Data.Bits (complement)
-import qualified Data.ByteString.Char8 as C8
-import Data.Foldable (asum)
-import qualified Data.Hashable
-import Data.List (foldl', stripPrefix)
-import Data.Maybe (fromMaybe, listToMaybe)
-import Data.Monoid ((<>))
-import qualified Data.Store
-import Data.String (IsString (..))
-import qualified Data.Text as T
-import Data.Typeable (Typeable)
-import Data.Void
-import qualified Database.SQLite.Simple as SQL
-import qualified Database.SQLite.Simple.FromField as SQL
-import qualified Database.SQLite.Simple.ToField as SQL
-import GHC.Generics (Generic)
-import Path
-import Path.IO
-import System.Directory (removePathForcibly)
-import System.FilePath (dropTrailingPathSeparator)
-import System.IO (Handle, IOMode (..),
- openFile)
-import System.Posix.Files
-import System.Posix.Types
-
-import Control.Funflow.ContentHashable (ContentHash,
- ContentHashable (..),
- DirectoryContent (..),
- contentHashUpdate_fingerprint,
- encodeHash, pathToHash,
- toBytes)
-import Control.Funflow.Lock
-import qualified Control.Funflow.RemoteCache as Remote
-
-
--- | Status of an item in the store.
-data Status missing pending complete
- = Missing missing
- -- ^ The item does not exist, yet.
- | Pending pending
- -- ^ The item is under construction and not ready for consumption.
- | Complete complete
- -- ^ The item is complete and ready for consumption.
- deriving (Eq, Show)
-
-type Status_ = Status () () ()
-
--- | Update about the status of a pending item.
-data Update
- = Completed Item
- -- ^ The item is now completed and ready for consumption.
- | Failed
- -- ^ Constructing the item failed.
- deriving (Eq, Show)
-
--- | Errors that can occur when interacting with the store.
-data StoreError
- = NotPending ContentHash
- -- ^ An item is not under construction when it should be.
- | AlreadyPending ContentHash
- -- ^ An item is already under construction when it should be missing.
- | AlreadyComplete ContentHash
- -- ^ An item is already complete when it shouldn't be.
- | CorruptedLink ContentHash FilePath
- -- ^ The link under the given hash points to an invalid path.
- | FailedToConstruct ContentHash
- -- ^ A failure occurred while waiting for the item to be constructed.
- | IncompatibleStoreVersion (Path Abs Dir) Int Int
- -- ^ @IncompatibleStoreVersion storeDir actual expected@
- -- The given store has a version number that is incompatible.
- | MalformedMetadataEntry ContentHash SQL.SQLData
- -- ^ @MalformedMetadataEntry hash key@
- -- The metadata entry for the give @hash@, @key@ pair is malformed.
- deriving (Show, Typeable)
-instance Exception StoreError where
- displayException = \case
- NotPending hash ->
- "The following input hash is not pending '"
- ++ C8.unpack (encodeHash hash)
- ++ "'."
- AlreadyPending hash ->
- "The following input hash is already pending '"
- ++ C8.unpack (encodeHash hash)
- ++ "'."
- AlreadyComplete hash ->
- "The following input hash is already completed '"
- ++ C8.unpack (encodeHash hash)
- ++ "'."
- CorruptedLink hash fp ->
- "The completed input hash '"
- ++ C8.unpack (encodeHash hash)
- ++ "' points to an invalid store item '"
- ++ fp
- ++ "'."
- FailedToConstruct hash ->
- "Failed to construct the input hash '"
- ++ C8.unpack (encodeHash hash)
- ++ "'."
- IncompatibleStoreVersion storeDir actual expected ->
- "The store in '"
- ++ fromAbsDir storeDir
- ++ "' has version "
- ++ show actual
- ++ ". This software expects version "
- ++ show expected
- ++ ". No automatic migration is available, \
- \please use a fresh store location."
- MalformedMetadataEntry hash key ->
- "The metadtaa entry for hash '"
- ++ C8.unpack (encodeHash hash)
- ++ "' under key '"
- ++ show key
- ++ "' is malformed."
-
--- | A hash addressed store on the file system.
-data ContentStore = ContentStore
- { storeRoot :: Path Abs Dir
- -- ^ Root directory of the content store.
- -- The process must be able to create this directory if missing,
- -- change permissions, and create files and directories within.
- , storeLock :: Lock
- -- ^ Write lock on store metadata to ensure multi thread and process safety.
- -- The lock is taken when item state is changed or queried.
- , storeNotifier :: Notifier
- -- ^ Used to watch for updates on store items.
- , storeDb :: SQL.Connection
- -- ^ Connection to the metadata SQLite database.
- }
-
--- | A completed item in the 'ContentStore'.
-data Item = Item { itemHash :: ContentHash }
- deriving (Eq, Ord, Show, Generic)
-
-instance Monad m => ContentHashable m Item where
- contentHashUpdate ctx item =
- flip contentHashUpdate_fingerprint item
- >=> pure . flip hashUpdate (toBytes $ itemHash item)
- $ ctx
-
-instance FromJSON Item
-instance ToJSON Item
-instance Data.Hashable.Hashable Item
-instance Data.Store.Store Item
-
--- | File or directory within a content store 'Item'.
-data Content t where
- All :: Item -> Content Dir
- (:</>) :: Item -> Path Rel t -> Content t
-infixr 5 :</>
-deriving instance Eq (Content t)
-deriving instance Show (Content t)
-instance Monad m => ContentHashable m (Content Dir) where
- contentHashUpdate ctx x = case x of
- All i ->
- flip contentHashUpdate_fingerprint x
- >=> flip contentHashUpdate i
- $ ctx
- i :</> p ->
- flip contentHashUpdate_fingerprint x
- >=> flip contentHashUpdate i
- >=> flip contentHashUpdate p
- $ ctx
-instance Monad m => ContentHashable m (Content File) where
- contentHashUpdate ctx x = case x of
- i :</> p ->
- flip contentHashUpdate_fingerprint x
- >=> flip contentHashUpdate i
- >=> flip contentHashUpdate p
- $ ctx
-
--- | Append to the path within a store item.
-(^</>) :: Content Dir -> Path Rel t -> Content t
-All item ^</> path = item :</> path
-(item :</> dir) ^</> path = item :</> dir </> path
-infixl 4 ^</>
-
-newtype Alias = Alias { unAlias :: T.Text }
- deriving (ContentHashable IO, Eq, Ord, Show, SQL.FromField, SQL.ToField, Data.Store.Store)
-
--- | The root directory of the store.
-root :: ContentStore -> Path Abs Dir
-root = storeRoot
-
--- | The scoped path to a content item within the store.
-itemRelPath :: Item -> Path Rel Dir
-itemRelPath (Item x) = prefixHashPath itemPrefix x
-
--- | The store path of a completed item.
-itemPath :: ContentStore -> Item -> Path Abs Dir
-itemPath store = mkItemPath store . itemHash
-
--- | Store item containing the given content.
-contentItem :: Content t -> Item
-contentItem (All i) = i
-contentItem (i :</> _) = i
-
-contentFilename :: Content File -> Path Rel File
-contentFilename (_ :</> relPath) = filename relPath
-
--- | The absolute path to content within the store.
-contentPath :: ContentStore -> Content t -> Path Abs t
-contentPath store (All item) = itemPath store item
-contentPath store (item :</> dir) = itemPath store item </> dir
-
--- | @open root@ opens a store under the given root directory.
---
--- The root directory is created if necessary.
---
--- It is not safe to have multiple store objects
--- refer to the same root directory.
-open :: Path Abs Dir -> IO ContentStore
-open storeRoot = do
- createDirIfMissing True storeRoot
- storeLock <- openLock (lockPath storeRoot)
- withLock storeLock $ withWritableStoreRoot storeRoot $ do
- storeDb <- SQL.open (fromAbsFile $ dbPath storeRoot)
- initDb storeRoot storeDb
- createDirIfMissing True (metadataPath storeRoot)
- storeNotifier <- initNotifier
- return ContentStore {..}
-
--- | Free the resources associated with the given store object.
---
--- The store object may not be used afterwards.
-close :: ContentStore -> IO ()
-close store = do
- closeLock (storeLock store)
- killNotifier (storeNotifier store)
- SQL.close (storeDb store)
-
--- | Open the store under the given root and perform the given action.
--- Closes the store once the action is complete
---
--- See also: 'Control.Funflow.ContentStore.open'
-withStore :: (MonadIO m, MonadMask m)
- => Path Abs Dir -> (ContentStore -> m a) -> m a
-withStore root' = bracket (liftIO $ open root') (liftIO . close)
-
--- | List all elements in the store
--- @(pending keys, completed keys, completed items)@.
-listAll :: MonadIO m => ContentStore -> m ([ContentHash], [ContentHash], [Item])
-listAll ContentStore {storeRoot} = liftIO $
- foldr go ([], [], []) . fst <$> listDir storeRoot
- where
- go d prev@(builds, outs, items) = fromMaybe prev $ asum
- [ parsePending d >>= \x -> Just (x:builds, outs, items)
- , parseComplete d >>= \x -> Just (builds, x:outs, items)
- , parseItem d >>= \x -> Just (builds, outs, x:items)
- ]
- parsePending :: Path Abs Dir -> Maybe ContentHash
- parsePending = pathToHash <=< stripPrefix pendingPrefix . extractDir
- parseComplete :: Path Abs Dir -> Maybe ContentHash
- parseComplete = pathToHash <=< stripPrefix completePrefix . extractDir
- parseItem :: Path Abs Dir -> Maybe Item
- parseItem = fmap Item . pathToHash <=< stripPrefix itemPrefix . extractDir
- extractDir :: Path Abs Dir -> FilePath
- extractDir = dropTrailingPathSeparator . fromRelDir . dirname
-
--- | List all pending keys in the store.
-listPending :: MonadIO m => ContentStore -> m [ContentHash]
-listPending = fmap (^._1) . listAll
-
--- | List all completed keys in the store.
-listComplete :: MonadIO m => ContentStore -> m [ContentHash]
-listComplete = fmap (^._2) . listAll
-
--- | List all completed items in the store.
-listItems :: MonadIO m => ContentStore -> m [Item]
-listItems = fmap (^._3) . listAll
-
--- | Query the state of the item under the given key.
-query :: MonadIO m => ContentStore -> ContentHash -> m (Status () () ())
-query store hash = liftIO . withStoreLock store $
- internalQuery store hash >>= pure . \case
- Missing _ -> Missing ()
- Pending _ -> Pending ()
- Complete _ -> Complete ()
-
--- | Check if there is no complete or pending item under the given key.
-isMissing :: MonadIO m => ContentStore -> ContentHash -> m Bool
-isMissing store hash = (== Missing ()) <$> query store hash
-
--- | Check if there is a pending item under the given key.
-isPending :: MonadIO m => ContentStore -> ContentHash -> m Bool
-isPending store hash = (== Pending ()) <$> query store hash
-
--- | Check if there is a completed item under the given key.
-isComplete :: MonadIO m => ContentStore -> ContentHash -> m Bool
-isComplete store hash = (== Complete ()) <$> query store hash
-
--- | Query the state under the given key and return the item if completed.
--- Doesn't block if the item is pending.
-lookup :: MonadIO m => ContentStore -> ContentHash -> m (Status () () Item)
-lookup store hash = liftIO . withStoreLock store $
- internalQuery store hash >>= \case
- Missing () -> return $ Missing ()
- Pending _ -> return $ Pending ()
- Complete item -> return $ Complete item
-
--- | Query the state under the given key and return the item if completed.
--- Return an 'Control.Concurrent.Async' to await an update, if pending.
-lookupOrWait
- :: MonadIO m
- => ContentStore
- -> ContentHash
- -> m (Status () (Async Update) Item)
-lookupOrWait store hash = liftIO . withStoreLock store $
- internalQuery store hash >>= \case
- Complete item -> return $ Complete item
- Missing () -> return $ Missing ()
- Pending _ -> Pending <$> internalWatchPending store hash
-
--- | Query the state under the given key and return the item once completed.
--- Blocks if the item is pending.
--- Returns 'Nothing' if the item is missing, or failed to be completed.
-waitUntilComplete :: MonadIO m => ContentStore -> ContentHash -> m (Maybe Item)
-waitUntilComplete store hash = lookupOrWait store hash >>= \case
- Complete item -> return $ Just item
- Missing () -> return Nothing
- Pending a -> liftIO (wait a) >>= \case
- Completed item -> return $ Just item
- Failed -> return Nothing
-
--- | Atomically query the state under the given key and mark pending if missing.
---
--- Returns @'Complete' item@ if the item is complete.
--- Returns @'Pending' async@ if the item is pending, where @async@ is an
--- 'Control.Concurrent.Async' to await updates on.
--- Returns @'Missing' buildDir@ if the item was missing, and is now pending.
--- It should be constructed in the given @buildDir@,
--- and then marked as complete using 'markComplete'.
-constructOrAsync
- :: forall m remoteCache.
- (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache)
- => ContentStore
- -> remoteCache
- -> ContentHash
- -> m (Status (Path Abs Dir) (Async Update) Item)
-constructOrAsync store cacher hash =
- constructIfMissing store cacher hash >>= \case
- Complete item -> return $ Complete item
- Missing path -> return $ Missing path
- Pending _ -> Pending <$> liftIO (internalWatchPending store hash)
-
--- | Atomically query the state under the given key and mark pending if missing.
--- Wait for the item to be completed, if already pending.
--- Throws a 'FailedToConstruct' error if construction fails.
---
--- Returns @'Complete' item@ if the item is complete.
--- Returns @'Missing' buildDir@ if the item was missing, and is now pending.
--- It should be constructed in the given @buildDir@,
--- and then marked as complete using 'markComplete'.
-constructOrWait
- :: (MonadIO m, MonadMask m, MonadBaseControl IO m, Remote.Cacher m remoteCache)
- => ContentStore
- -> remoteCache
- -> ContentHash
- -> m (Status (Path Abs Dir) Void Item)
-constructOrWait store cacher hash = constructOrAsync store cacher hash >>= \case
- Pending a -> liftIO (wait a) >>= \case
- Completed item -> return $ Complete item
- -- XXX: Consider extending 'Status' with a 'Failed' constructor.
- -- If the store contains metadata as well, it could keep track of the
- -- number of failed attempts and further details about the failure.
- -- If an external task is responsible for the failure, the client could
- -- choose to resubmit a certain number of times.
- Failed -> liftIO . throwIO $ FailedToConstruct hash
- Complete item -> return $ Complete item
- Missing dir -> return $ Missing dir
-
--- | Atomically query the state under the given key and mark pending if missing.
-constructIfMissing
- :: (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache)
- => ContentStore
- -> remoteCache
- -> ContentHash
- -> m (Status (Path Abs Dir) () Item)
-constructIfMissing store cacher hash = withStoreLock store $
- internalQuery store hash >>= \case
- Complete item -> return $ Complete item
- Missing () -> withWritableStore store $ do
- let destDir :: Path Abs Dir = mkItemPath store hash
- Remote.pull cacher hash destDir >>= \case
- Remote.PullOK () -> return $ Complete (Item hash)
- Remote.NotInCache ->
- Missing <$> liftIO (internalMarkPending store hash)
- Remote.PullError _ ->
- Missing <$> liftIO (internalMarkPending store hash)
- Pending _ -> return $ Pending ()
-
--- | Atomically query the state under the given key and mark pending if missing.
--- Execute the given function to construct the item, mark as complete on success
--- and remove on failure. Forcibly removes if an uncaught exception occurs
--- during item construction.
-withConstructIfMissing
- :: (MonadIO m, MonadBaseControl IO m, MonadMask m, Remote.Cacher m remoteCache)
- => ContentStore
- -> remoteCache
- -> ContentHash
- -> (Path Abs Dir -> m (Either e a))
- -> m (Status e () (Maybe a, Item))
-withConstructIfMissing store cacher hash f =
- bracketOnError
- (constructIfMissing store cacher hash)
- (\case
- Missing _ -> removeForcibly store hash
- _ -> return ())
- (\case
- Pending () -> return (Pending ())
- Complete item -> return (Complete (Nothing, item))
- Missing fp -> f fp >>= \case
- Left e -> do
- removeFailed store hash
- return (Missing e)
- Right x -> do
- item <- markComplete store hash
- _ <- Remote.push cacher (itemHash item) (Just hash) (itemPath store item)
- return (Complete (Just x, item)))
-
--- | Mark a non-existent item as pending.
---
--- Creates the build directory and returns its path.
---
--- See also: 'Control.Funflow.ContentStore.constructIfMissing'.
-markPending :: MonadIO m => ContentStore -> ContentHash -> m (Path Abs Dir)
-markPending store hash = liftIO . withStoreLock store $
- internalQuery store hash >>= \case
- Complete _ -> throwIO (AlreadyComplete hash)
- Pending _ -> throwIO (AlreadyPending hash)
- Missing () -> withWritableStore store $
- internalMarkPending store hash
-
--- | Mark a pending item as complete.
-markComplete :: MonadIO m => ContentStore -> ContentHash -> m Item
-markComplete store inHash = liftIO . withStoreLock store $
- internalQuery store inHash >>= \case
- Missing () -> throwIO (NotPending inHash)
- Complete _ -> throwIO (AlreadyComplete inHash)
- Pending build -> withWritableStore store $ liftIO $ do
- do
- let metadataDir = mkMetadataDirPath store inHash
- exists <- doesDirExist metadataDir
- when exists $
- unsetWritableRecursively metadataDir
- -- XXX: Hashing large data can take some time,
- -- could we avoid locking the store for all that time?
- outHash <- contentHash (DirectoryContent build)
- let out = mkItemPath store outHash
- link' = mkCompletePath store inHash
- doesDirExist out >>= \case
- True -> removePathForcibly (fromAbsDir build)
- False -> do
- renameDir build out
- unsetWritableRecursively out
- rel <- makeRelative (parent link') out
- let from' = dropTrailingPathSeparator $ fromAbsDir link'
- to' = dropTrailingPathSeparator $ fromRelDir rel
- createSymbolicLink to' from'
- addBackReference store inHash (Item outHash)
- pure $! Item outHash
-
--- | Remove a pending item.
---
--- It is the callers responsibility to ensure that no other threads or processes
--- will attempt to access the item's contents afterwards.
-removeFailed :: MonadIO m => ContentStore -> ContentHash -> m ()
-removeFailed store hash = liftIO . withStoreLock store $
- internalQuery store hash >>= \case
- Missing () -> throwIO (NotPending hash)
- Complete _ -> throwIO (AlreadyComplete hash)
- Pending build -> withWritableStore store $
- removePathForcibly (fromAbsDir build)
-
--- | Remove a key association independent of the corresponding item state.
--- Do nothing if no item exists under the given key.
---
--- It is the callers responsibility to ensure that no other threads or processes
--- will attempt to access the contents afterwards.
---
--- Note, this will leave an orphan item behind if no other keys point to it.
--- There is no garbage collection mechanism in place at the moment.
-removeForcibly :: MonadIO m => ContentStore -> ContentHash -> m ()
-removeForcibly store hash = liftIO . withStoreLock store $ withWritableStore store $
- internalQuery store hash >>= \case
- Missing () -> pure ()
- Pending build -> liftIO $ removePathForcibly (fromAbsDir build)
- Complete _out -> liftIO $
- removePathForcibly $
- dropTrailingPathSeparator $ fromAbsDir $ mkCompletePath store hash
- -- XXX: This will leave orphan store items behind.
- -- Add GC in some form.
-
--- | Remove a completed item in the store.
--- Do nothing if not completed.
---
--- It is the callers responsibility to ensure that no other threads or processes
--- will attempt to access the contents afterwards.
---
--- Note, this will leave keys pointing to that item dangling.
--- There is no garbage collection mechanism in place at the moment.
-removeItemForcibly :: MonadIO m => ContentStore -> Item -> m ()
-removeItemForcibly store item = liftIO . withStoreLock store $ withWritableStore store $
- removePathForcibly (fromAbsDir $ itemPath store item)
- -- XXX: Remove dangling links.
- -- Add back-references in some form.
-
--- | Link the given alias to the given item.
--- If the alias existed before it is overwritten.
-assignAlias :: MonadIO m => ContentStore -> Alias -> Item -> m ()
-assignAlias store alias item =
- liftIO . withStoreLock store $ withWritableStore store $ do
- hash <- contentHash alias
- SQL.executeNamed (storeDb store)
- "INSERT OR REPLACE INTO\
- \ aliases\
- \ VALUES\
- \ (:hash, :dest, :name)"
- [ ":hash" SQL.:= hash
- , ":dest" SQL.:= itemHash item
- , ":name" SQL.:= alias
- ]
-
--- | Lookup an item under the given alias.
--- Returns 'Nothing' if the alias does not exist.
-lookupAlias :: MonadIO m => ContentStore -> Alias -> m (Maybe Item)
-lookupAlias store alias =
- liftIO . withStoreLock store $ do
- hash <- contentHash alias
- r <- SQL.queryNamed (storeDb store)
- "SELECT dest FROM aliases\
- \ WHERE\
- \ hash = :hash"
- [ ":hash" SQL.:= hash ]
- pure $! listToMaybe $ Item . SQL.fromOnly <$> r
-
--- | Remove the given alias.
-removeAlias :: MonadIO m => ContentStore -> Alias -> m ()
-removeAlias store alias =
- liftIO . withStoreLock store $ withWritableStore store $ do
- hash <- contentHash alias
- SQL.executeNamed (storeDb store)
- "DELETE FROM aliases\
- \ WHERE\
- \ hash = :hash"
- [ ":hash" SQL.:= hash ]
-
--- | List all aliases and the respective items.
-listAliases :: MonadIO m => ContentStore -> m [(Alias, Item)]
-listAliases store = liftIO . withStoreLock store $
- fmap (map (second Item)) $
- SQL.query_ (storeDb store)
- "SELECT name, dest FROM aliases"
-
--- | Get all hashes that resulted in the given item.
-getBackReferences :: MonadIO m => ContentStore -> Item -> m [ContentHash]
-getBackReferences store (Item outHash) = liftIO . withStoreLock store $
- map SQL.fromOnly <$> SQL.queryNamed (storeDb store)
- "SELECT hash FROM backrefs\
- \ WHERE\
- \ dest = :out"
- [ ":out" SQL.:= outHash ]
-
--- | Define the input items to a subtree.
-setInputs :: MonadIO m => ContentStore -> ContentHash -> [Item] -> m ()
-setInputs store hash items = liftIO $
- withStoreLock store $
- withWritableStore store $
- internalQuery store hash >>= \case
- Pending _ -> forM_ items $ \(Item input) ->
- SQL.executeNamed (storeDb store)
- "INSERT OR REPLACE INTO\
- \ inputs (hash, input)\
- \ VALUES\
- \ (:hash, :input)"
- [ ":hash" SQL.:= hash
- , ":input" SQL.:= input
- ]
- _ -> throwIO $ NotPending hash
-
--- | Get the input items to a subtree if any were defined.
-getInputs :: MonadIO m => ContentStore -> ContentHash -> m [Item]
-getInputs store hash = liftIO . withStoreLock store $
- map (Item . SQL.fromOnly) <$> SQL.queryNamed (storeDb store)
- "SELECT input FROM inputs\
- \ WHERE\
- \ hash = :hash"
- [ ":hash" SQL.:= hash ]
-
--- | Set a metadata entry on an item.
-setMetadata :: (SQL.ToField k, SQL.ToField v, MonadIO m )
- => ContentStore -> ContentHash -> k -> v -> m ()
-setMetadata store hash k v = liftIO $
- withStoreLock store $
- withWritableStore store $
- SQL.executeNamed (storeDb store)
- "INSERT OR REPLACE INTO\
- \ metadata (hash, key, value)\
- \ VALUES\
- \ (:hash, :key, :value)"
- [ ":hash" SQL.:= hash
- , ":key" SQL.:= k
- , ":value" SQL.:= v
- ]
-
--- | Retrieve a metadata entry on an item, or 'Nothing' if missing.
-getMetadata :: (SQL.ToField k, SQL.FromField v, MonadIO m)
- => ContentStore -> ContentHash -> k -> m (Maybe v)
-getMetadata store hash k = liftIO . withStoreLock store $ do
- r <- SQL.queryNamed (storeDb store)
- "SELECT value FROM metadata\
- \ WHERE\
- \ (hash = :hash AND key = :key)"
- [ ":hash" SQL.:= hash
- , ":key" SQL.:= k
- ]
- case r of
- [] -> pure Nothing
- [[v]] -> pure $ Just v
- _ -> throwIO $ MalformedMetadataEntry hash (SQL.toField k)
-
--- | Create and open a new metadata file on a pending item in write mode.
-createMetadataFile
- :: MonadIO m
- => ContentStore -> ContentHash -> Path Rel File -> m (Path Abs File, Handle)
-createMetadataFile store hash file = liftIO . withStoreLock store $
- internalQuery store hash >>= \case
- Pending _ -> do
- let path = mkMetadataFilePath store hash file
- createDirIfMissing True (parent path)
- handle <- openFile (fromAbsFile path) WriteMode
- pure (path, handle)
- _ -> throwIO $ NotPending hash
-
--- | Return the path to a metadata file if it exists.
-getMetadataFile
- :: MonadIO m
- => ContentStore -> ContentHash -> Path Rel File -> m (Maybe (Path Abs File))
-getMetadataFile store hash file = liftIO . withStoreLock store $ do
- let path = mkMetadataFilePath store hash file
- exists <- doesFileExist path
- if exists then
- pure $ Just path
- else
- pure Nothing
-
-----------------------------------------------------------------------
--- Internals
-
-lockPath :: Path Abs Dir -> Path Abs Dir
-lockPath = (</> [reldir|lock|])
-
-dbPath :: Path Abs Dir -> Path Abs File
-dbPath = (</> [relfile|metadata.db|])
-
-metadataPath :: Path Abs Dir -> Path Abs Dir
-metadataPath = (</> [reldir|metadata|])
-
--- | Holds a lock on the global 'MVar' and on the global lock file
--- for the duration of the given action.
-withStoreLock :: MonadBaseControl IO m => ContentStore -> m a -> m a
-withStoreLock store = withLock (storeLock store)
-
-prefixHashPath :: C8.ByteString -> ContentHash -> Path Rel Dir
-prefixHashPath pref hash
- | Just dir <- Path.parseRelDir $ C8.unpack $ pref <> encodeHash hash
- = dir
- | otherwise = error
- "[Control.Funflow.ContentStore.prefixHashPath] \
- \Failed to construct hash path."
-
-pendingPrefix, completePrefix, hashPrefix, itemPrefix :: IsString s => s
-pendingPrefix = "pending-"
-completePrefix = "complete-"
-hashPrefix = "hash-"
-itemPrefix = "item-"
-
--- | Return the full build path for the given input hash.
-mkPendingPath :: ContentStore -> ContentHash -> Path Abs Dir
-mkPendingPath ContentStore {storeRoot} hash =
- storeRoot </> prefixHashPath pendingPrefix hash
-
--- | Return the full link path for the given input hash.
-mkCompletePath :: ContentStore -> ContentHash -> Path Abs Dir
-mkCompletePath ContentStore {storeRoot} hash =
- storeRoot </> prefixHashPath completePrefix hash
-
--- | Return the full store path to the given output hash.
-mkItemPath :: ContentStore -> ContentHash -> Path Abs Dir
-mkItemPath ContentStore {storeRoot} hash =
- storeRoot </> prefixHashPath itemPrefix hash
-
--- | Return the full store path to the given metadata directory.
-mkMetadataDirPath :: ContentStore -> ContentHash -> Path Abs Dir
-mkMetadataDirPath ContentStore {storeRoot} hash =
- metadataPath storeRoot </> prefixHashPath hashPrefix hash
-
--- | Return the full store path to the given metadata file.
-mkMetadataFilePath
- :: ContentStore -> ContentHash -> Path Rel File -> Path Abs File
-mkMetadataFilePath store hash file =
- mkMetadataDirPath store hash </> file
-
--- | Query the state under the given key without taking a lock.
-internalQuery
- :: MonadIO m
- => ContentStore
- -> ContentHash
- -> m (Status () (Path Abs Dir) Item)
-internalQuery store inHash = liftIO $ do
- let build = mkPendingPath store inHash
- link' = mkCompletePath store inHash
- buildExists <- doesDirExist build
- if buildExists then
- pure $! Pending build
- else do
- linkExists <- doesDirExist link'
- if linkExists then do
- out <- readSymbolicLink
- (dropTrailingPathSeparator $ fromAbsDir link')
- case pathToHash =<< stripPrefix itemPrefix out of
- Nothing -> throwIO $ CorruptedLink inHash out
- Just outHash -> return $ Complete (Item outHash)
- else
- pure $! Missing ()
-
--- | Create the build directory for the given input hash
--- and make the metadata directory writable if it exists.
-internalMarkPending :: ContentStore -> ContentHash -> IO (Path Abs Dir)
-internalMarkPending store hash = do
- let dir = mkPendingPath store hash
- createDir dir
- setDirWritable dir
- let metadataDir = mkMetadataDirPath store hash
- metadirExists <- doesDirExist metadataDir
- when metadirExists $
- setWritableRecursively metadataDir
- return dir
-
--- | Watch the build directory of the pending item under the given key.
--- The returned 'Async' completes after the item is completed or failed.
-internalWatchPending
- :: ContentStore
- -> ContentHash
- -> IO (Async Update)
-internalWatchPending store hash = do
- let build = mkPendingPath store hash
- -- Add an inotify/kqueue watch and give a signal on relevant events.
- let notifier = storeNotifier store
- signal <- newEmptyMVar
- -- Signal the listener. If the 'MVar' is full,
- -- the listener didn't handle earlier signals, yet.
- let giveSignal = void $ tryPutMVar signal ()
- watch <- addDirWatch notifier (fromAbsDir build) giveSignal
- -- Additionally, poll on regular intervals.
- -- Inotify/Kqueue don't cover all cases, e.g. network filesystems.
- ticker <- async $ forever $ threadDelay 3007000 >> giveSignal
- let stopWatching = do
- cancel ticker
- removeDirWatch watch
- -- Listen to the signal asynchronously,
- -- and query the status when it fires.
- -- If the status changed, fill in the update.
- update <- newEmptyMVar
- let query' = liftIO . withStoreLock store $ internalQuery store hash
- loop = takeMVar signal >> query' >>= \case
- Pending _ -> loop
- Complete item -> tryPutMVar update $ Completed item
- Missing () -> tryPutMVar update Failed
- void $ async loop
- -- Wait for the update asynchronously.
- -- Stop watching when it arrives.
- async $ takeMVar update <* stopWatching
-
-setRootDirWritable :: MonadIO m => Path Abs Dir -> m ()
-setRootDirWritable storeRoot = liftIO $
- setFileMode (fromAbsDir storeRoot) writableRootDirMode
-
-writableRootDirMode :: FileMode
-writableRootDirMode = writableDirMode
-
-setRootDirReadOnly :: MonadIO m => Path Abs Dir -> m ()
-setRootDirReadOnly storeRoot = liftIO $
- setFileMode (fromAbsDir storeRoot) readOnlyRootDirMode
-
-readOnlyRootDirMode :: FileMode
-readOnlyRootDirMode = writableDirMode `intersectFileModes` allButWritableMode
-
-withWritableStoreRoot :: (MonadMask m, MonadIO m) => Path Abs Dir -> m a -> m a
-withWritableStoreRoot storeRoot =
- bracket_ (setRootDirWritable storeRoot) (setRootDirReadOnly storeRoot)
-
-withWritableStore :: (MonadMask m, MonadIO m) => ContentStore -> m a -> m a
-withWritableStore ContentStore {storeRoot} =
- withWritableStoreRoot storeRoot
-
-setDirWritable :: Path Abs Dir -> IO ()
-setDirWritable fp = setFileMode (fromAbsDir fp) writableDirMode
-
-writableDirMode :: FileMode
-writableDirMode = foldl' unionFileModes nullFileMode
- [ directoryMode, ownerModes
- , groupReadMode, groupExecuteMode
- , otherReadMode, otherExecuteMode
- ]
-
--- | Set write permissions on the given path.
-setWritable :: Path Abs t -> IO ()
-setWritable fp = do
- mode <- fileMode <$> getFileStatus (toFilePath fp)
- setFileMode (toFilePath fp) $ mode `unionFileModes` ownerWriteMode
-
--- | Unset write permissions on the given path.
-unsetWritable :: Path Abs t -> IO ()
-unsetWritable fp = do
- mode <- fileMode <$> getFileStatus (toFilePath fp)
- setFileMode (toFilePath fp) $ mode `intersectFileModes` allButWritableMode
-
-allButWritableMode :: FileMode
-allButWritableMode = complement $ foldl' unionFileModes nullFileMode
- [ownerWriteMode, groupWriteMode, otherWriteMode]
-
--- | Set write permissions on all items in a directory tree recursively.
-setWritableRecursively :: Path Abs Dir -> IO ()
-setWritableRecursively = walkDir $ \dir _ files -> do
- mapM_ setWritable files
- setWritable dir
- return $ WalkExclude []
-
--- | Unset write permissions on all items in a directory tree recursively.
-unsetWritableRecursively :: Path Abs Dir -> IO ()
-unsetWritableRecursively = walkDir $ \dir _ files -> do
- mapM_ unsetWritable files
- unsetWritable dir
- return $ WalkExclude []
-
-storeVersion :: Int
-storeVersion = 1
-
--- | Initialize the database.
-initDb :: Path Abs Dir -> SQL.Connection -> IO ()
-initDb storeDir db = do
- [[version]] <- SQL.query_ db "PRAGMA user_version"
- if version == 0 then
- SQL.execute_ db $
- "PRAGMA user_version = " <> fromString (show storeVersion)
- else
- unless (version == storeVersion) $
- throwIO $ IncompatibleStoreVersion storeDir version storeVersion
- -- Aliases to items.
- SQL.execute_ db
- "CREATE TABLE IF NOT EXISTS\
- \ aliases\
- \ ( hash TEXT PRIMARY KEY\
- \ , dest TEXT NOT NULL\
- \ , name TEXT NOT NULL\
- \ )"
- -- Back-references from items @dest@ to hashes @hash@.
- SQL.execute_ db
- "CREATE TABLE IF NOT EXISTS\
- \ backrefs\
- \ ( hash TEXT PRIMARY KEY\
- \ , dest TEXT NOT NULL\
- \ )"
- -- Inputs @input@ to hashes @hash@.
- SQL.execute_ db
- "CREATE TABLE IF NOT EXISTS\
- \ inputs\
- \ ( hash TEXT NOT NULL\
- \ , input TEXT NOT NULL\
- \ , UNIQUE (hash, input)\
- \ )"
- -- Arbitrary metadata on hashes.
- SQL.execute_ db
- "CREATE TABLE IF NOT EXISTS\
- \ metadata\
- \ ( hash TEXT NOT NULL\
- \ , key TEXT NOT NULL\
- \ , value TEXT\
- \ , PRIMARY KEY(hash, key)\
- \ )"
-
--- | Adds a link between input hash and the output hash.
---
--- Assumes that the store is locked and writable.
-addBackReference :: ContentStore -> ContentHash -> Item -> IO ()
-addBackReference store inHash (Item outHash) =
- SQL.executeNamed (storeDb store)
- "INSERT OR REPLACE INTO\
- \ backrefs (hash, dest)\
- \ VALUES\
- \ (:in, :out)"
- [ ":in" SQL.:= inHash
- , ":out" SQL.:= outHash
- ]
diff --git a/src/Control/Funflow/ContentStore/Notify.hs b/src/Control/Funflow/ContentStore/Notify.hs
deleted file mode 100644
index 53e1676..0000000
--- a/src/Control/Funflow/ContentStore/Notify.hs
+++ /dev/null
@@ -1,28 +0,0 @@
-{-# LANGUAGE CPP #-}
-
--- | Generic file change notifier library for unix-based systems.
---
--- This library abstracts over specific implementations for BSD and linux
--- systems.
---
--- It provides facilities to watch specific directories for the following changes:
--- - File moves
--- - File deletion
--- - Attribute changes.
-module Control.Funflow.ContentStore.Notify
- ( Notifier
- , initNotifier
- , killNotifier
-
- , Watch
- , addDirWatch
- , removeDirWatch
- ) where
-
-#ifdef OS_Linux
-import Control.Funflow.ContentStore.Notify.Linux
-#else
-# ifdef OS_BSD
-import Control.Funflow.ContentStore.Notify.BSD
-# endif
-#endif
diff --git a/src/Control/Funflow/ContentStore/Notify/BSD.hs b/src/Control/Funflow/ContentStore/Notify/BSD.hs
deleted file mode 100644
index 61aaf7d..0000000
--- a/src/Control/Funflow/ContentStore/Notify/BSD.hs
+++ /dev/null
@@ -1,78 +0,0 @@
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE ScopedTypeVariables #-}
-
--- | Implementation of filesystem watch for BSD-based systems using KQueue.
-module Control.Funflow.ContentStore.Notify.BSD
- ( Notifier
- , initNotifier
- , killNotifier
-
- , Watch
- , addDirWatch
- , removeDirWatch
- ) where
-
-import Control.Concurrent
-import Control.Concurrent.Async
-import Control.Exception.Safe
-import Control.Monad
-import Data.Typeable
-import Foreign.Ptr
-import System.KQueue
-import System.Posix.IO
-import System.Posix.Types
-
-type Notifier = KQueue
-
-data NotifierException
- = RequiresThreadedRuntime
- deriving (Show, Typeable)
-instance Exception NotifierException where
- displayException = \case
- -- XXX: Compile time check?
- RequiresThreadedRuntime ->
- "Threaded runtime required! Please rebuild with -threaded flag."
-
-initNotifier :: IO Notifier
-initNotifier = do
- unless rtsSupportsBoundThreads $ throwIO RequiresThreadedRuntime
- kqueue
-
-killNotifier :: Notifier -> IO ()
-killNotifier _ = return ()
-
-data Watch = Watch KQueue Fd (Async ())
-
-addDirWatch :: Notifier -> FilePath -> IO () -> IO Watch
-addDirWatch kq dir f = do
- fd <- openFd dir ReadOnly Nothing defaultFileFlags
- a <- async $ do
- let event = KEvent
- { ident = fromIntegral fd
- , evfilter = EvfiltVnode
- , flags = [EvAdd]
- , fflags = [NoteDelete, NoteAttrib, NoteRename, NoteRevoke]
- , data_ = 0
- , udata = nullPtr
- }
- loop = do
- chgs <- kevent kq [event] 1 Nothing
- unless (null chgs) f
- loop
- loop `finally` closeFd fd
- return $! Watch kq fd a
-
-removeDirWatch :: Watch -> IO ()
-removeDirWatch (Watch kq fd a) = do
- closeFd fd
- let event = KEvent
- { ident = fromIntegral fd
- , evfilter = EvfiltVnode
- , flags = [EvDelete]
- , fflags = []
- , data_ = 0
- , udata = nullPtr
- }
- void (kevent kq [event] 0 Nothing)
- `catch` \(_ :: KQueueException) -> return ()
- cancel a
diff --git a/src/Control/Funflow/ContentStore/Notify/Linux.hs b/src/Control/Funflow/ContentStore/Notify/Linux.hs
deleted file mode 100644
index 1d0bdf0..0000000
--- a/src/Control/Funflow/ContentStore/Notify/Linux.hs
+++ /dev/null
@@ -1,60 +0,0 @@
-{-# LANGUAGE CPP #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE ScopedTypeVariables #-}
-
--- | Implementation of filesystem watching functionality for linux based on
--- inotify.
-module Control.Funflow.ContentStore.Notify.Linux
- ( Notifier
- , initNotifier
- , killNotifier
-
- , Watch
- , addDirWatch
- , removeDirWatch
- ) where
-
-import Control.Exception.Safe (catch)
-#if MIN_VERSION_hinotify(0,3,10)
-import qualified Data.ByteString.Char8 as BS
-#endif
-import System.INotify
-
-type Notifier = INotify
-
-initNotifier :: IO Notifier
-initNotifier = initINotify
-
-killNotifier :: Notifier -> IO ()
-killNotifier = killINotify
-
-type Watch = WatchDescriptor
-
-addDirWatch :: Notifier -> FilePath -> IO () -> IO Watch
-addDirWatch inotify dir f = addWatch inotify mask dir' $ \case
- Attributes True Nothing -> f
- MovedSelf True -> f
- DeletedSelf -> f
- _ -> return ()
- where
- mask = [Attrib, MoveSelf, DeleteSelf, OnlyDir]
-#if MIN_VERSION_hinotify(0,3,10)
- dir' = BS.pack dir
-#else
- dir' = dir
-#endif
-
-removeDirWatch :: Watch -> IO ()
-removeDirWatch w =
- -- When calling `addWatch` on a path that is already being watched,
- -- inotify will not create a new watch, but amend the existing watch
- -- and return the same watch descriptor.
- -- Therefore, the watch might already have been removed at this point,
- -- which will cause an 'IOError'.
- -- Fortunately, all event handlers to a file are called at once.
- -- So, that removing the watch here will not cause another handler
- -- to miss out on the event.
- -- Note, that this may change when adding different event handlers,
- -- that remove the watch under different conditions.
- removeWatch w
- `catch` \(_::IOError) -> return ()
diff --git a/src/Control/Funflow/Diagram.hs b/src/Control/Funflow/Diagram.hs
index 7305c9b..32a47c3 100644
--- a/src/Control/Funflow/Diagram.hs
+++ b/src/Control/Funflow/Diagram.hs
@@ -3,6 +3,9 @@
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE DerivingVia #-}
+{-# LANGUAGE RankNTypes #-}
+
-- | A Diagram is a representation of an arrow with labels.
-- No computation is actually performed by a diagram, it just carries
-- around a description.
@@ -13,6 +16,8 @@ module Control.Funflow.Diagram where
import Control.Arrow
import Control.Arrow.Free (ArrowError (..))
import Control.Category
+import qualified Data.Profunctor as P
+import qualified Data.Profunctor.Traversing as P
import Data.Proxy (Proxy (..))
import qualified Data.Text as T
import Prelude hiding (id, (.))
@@ -34,6 +39,9 @@ data Diagram ex a b where
Par :: Diagram ex a b -> Diagram ex c d -> Diagram ex (a,c) (b,d)
Fanin :: Diagram ex a c -> Diagram ex b c -> Diagram ex (Either a b) c
Try :: Diagram ex a b -> Diagram ex a (Either ex b)
+ Traverse :: Diagram ex a b -> Diagram ex (f a) (f b)
+ Wander :: (forall f. (Applicative f) => (a -> f b) -> s -> f t) -> Diagram ex a b -> Diagram ex s t
+ deriving (P.Profunctor, P.Strong, P.Choice) via P.WrappedArrow (Diagram ex)
instance Category (Diagram ex) where
id = Node emptyNodeProperties Proxy Proxy
@@ -53,6 +61,10 @@ instance ArrowChoice (Diagram ex) where
instance ArrowError ex (Diagram ex) where
try = Try
+instance P.Traversing (Diagram ex) where
+ traverse' = Traverse
+ wander = Wander
+
-- | Construct a labelled node
node :: forall arr a b ex. Arrow arr => arr a b -> [T.Text] -> (Diagram ex) a b
node _ lbls = Node props (Proxy :: Proxy a) (Proxy :: Proxy b)
diff --git a/src/Control/Funflow/Exec/Simple.hs b/src/Control/Funflow/Exec/Simple.hs
index 1d810f7..50a9b71 100644
--- a/src/Control/Funflow/Exec/Simple.hs
+++ b/src/Control/Funflow/Exec/Simple.hs
@@ -27,25 +27,22 @@ import Control.Concurrent.Async (withAsync)
import Control.Exception.Safe (Exception,
SomeException,
bracket,
- onException,
throwM, try)
import Control.Funflow.Base
-import Control.Funflow.ContentHashable
-import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External
import Control.Funflow.External.Coordinator
import Control.Funflow.External.Coordinator.Memory
import Control.Funflow.External.Executor (executeLoop)
-import qualified Control.Funflow.RemoteCache as Remote
+import Control.Lens (Identity (..))
import Control.Monad.Catch (MonadCatch,
MonadMask)
import Control.Monad.IO.Class
import Control.Monad.Trans.Control (MonadBaseControl)
-import qualified Data.ByteString as BS
+import Data.CAS.ContentHashable
+import qualified Data.CAS.ContentStore as CS
+import qualified Data.CAS.RemoteCache as Remote
import Data.Foldable (traverse_)
-import Data.Maybe
import Data.Monoid ((<>))
-import Data.Void
import Katip
import Path
import System.IO (stderr)
@@ -68,35 +65,17 @@ runFlowEx :: forall m c eff ex a b remoteCache.
-> Flow eff ex a b
-> a
-> m b
-runFlowEx _ cfg store cacher runWrapped confIdent flow input = do
+runFlowEx _ cfg store remoteCacher runWrapped confIdent flow input = do
hook <- initialise cfg
runAsyncA (eval (runFlow' hook) flow) input
where
- simpleOutPath item = toFilePath
- $ CS.itemPath store item </> [relfile|out|]
- withStoreCache :: forall i o. Cacher i o
+ withStoreCache :: forall i o. CS.Cacher i o
-> AsyncA m i o -> AsyncA m i o
- withStoreCache c@Cache{} (AsyncA f)
- | Just confIdent' <- confIdent = AsyncA $ \i -> do
- let chash = cacherKey c confIdent' i
- computeAndStore fp = do
- res <- f i -- Do the actual computation
- liftIO $ BS.writeFile (toFilePath $ fp </> [relfile|out|])
- . cacherStoreValue c $ res
- return $ Right res
- readItem item = do
- bs <- liftIO . BS.readFile $ simpleOutPath item
- return . cacherReadValue c $ bs
- CS.withConstructIfMissing store cacher chash computeAndStore >>= \case
- CS.Missing e -> absurd e
- CS.Pending _ ->
- liftIO (CS.waitUntilComplete store chash) >>= \case
- Just item -> readItem item
- Nothing -> throwM $ CS.FailedToConstruct chash
- CS.Complete (Just a, _) -> return a
- CS.Complete (_, item) -> readItem item
- withStoreCache _ f = f
-
+ withStoreCache c (AsyncA f) = AsyncA $
+ let c' = c{CS.cacherKey = \ident i -> return $ runIdentity $
+ CS.cacherKey c ident i}
+ in CS.cacheKleisliIO confIdent c' store remoteCacher f
+
writeMd :: forall i o. ContentHash
-> i
-> o
@@ -107,14 +86,13 @@ runFlowEx _ cfg store cacher runWrapped confIdent flow input = do
let kvs = writer i o
in traverse_ (uncurry $ CS.setMetadata store chash) kvs
-
runFlow' :: Hook c -> Flow' eff a1 b1 -> AsyncA m a1 b1
runFlow' _ (Step props f) = withStoreCache (cache props)
. AsyncA $ \x -> do
let out = f x
case cache props of
- Cache key _ _ | Just confIdent' <- confIdent ->
- writeMd (key confIdent' x) x out $ mdpolicy props
+ CS.Cache key _ _ | Just confIdent' <- confIdent ->
+ writeMd (runIdentity $ key confIdent' x) x out $ mdpolicy props
_ -> return ()
return out
runFlow' _ (StepIO props f) = withStoreCache (cache props)
@@ -162,24 +140,13 @@ runFlowEx _ cfg store cacher runWrapped confIdent flow input = do
mbStdout <- CS.getMetadataFile store chash [relfile|stdout|]
mbStderr <- CS.getMetadataFile store chash [relfile|stderr|]
throwM $ ExternalTaskFailed td ti mbStdout mbStderr
- runFlow' _ (PutInStore f) = AsyncA $ \x -> katipAddNamespace "putInStore" $ do
- chash <- liftIO $ contentHash x
- CS.constructOrWait store cacher chash >>= \case
- CS.Pending void -> absurd void
- CS.Complete item -> return item
- CS.Missing fp ->
- do
- liftIO $ f fp x
- finalItem <- CS.markComplete store chash
- _ <- Remote.push cacher (CS.itemHash finalItem) (Just chash) (CS.itemPath store finalItem)
- pure finalItem
- `onException`
- (do $(logTM) WarningS . ls $ "Exception in construction: removing " <> show chash
- CS.removeFailed store chash
- )
- runFlow' _ (GetFromStore f) = AsyncA $ \case
- CS.All item -> liftIO . f $ CS.itemPath store item
- item CS.:</> path -> liftIO . f $ CS.itemPath store item </> path
+ runFlow' _ (PutInStore f) = AsyncA $
+ katipAddNamespace "putInStore"
+ . CS.putInStore store remoteCacher
+ (\chash -> $(logTM) WarningS . ls $ "Exception in construction: removing " <> show chash)
+ (\p -> liftIO . f p)
+ runFlow' _ (GetFromStore f) = AsyncA $
+ liftIO . f . CS.contentPath store
runFlow' _ (InternalManipulateStore f) = AsyncA $ \i -> liftIO $ f store i
runFlow' _ (Wrapped props w) = withStoreCache (cache props)
$ runWrapped w
diff --git a/src/Control/Funflow/External.hs b/src/Control/Funflow/External.hs
index 4a21297..2b3fbbf 100644
--- a/src/Control/Funflow/External.hs
+++ b/src/Control/Funflow/External.hs
@@ -9,14 +9,15 @@
-- | Definition of external tasks
module Control.Funflow.External where
-import Control.Funflow.ContentHashable (ContentHash, ContentHashable, ExternallyAssuredDirectory (..),
- ExternallyAssuredFile (..))
-import qualified Control.Funflow.ContentStore as CS
import Control.Lens.TH
import Data.Aeson (FromJSON, ToJSON)
#if __GLASGOW_HASKELL__ < 804
import Data.Semigroup
#endif
+import Data.CAS.ContentHashable (ContentHash, ContentHashable, ExternallyAssuredDirectory (..),
+ ExternallyAssuredFile (..))
+import qualified Data.CAS.ContentStore as CS
+
import Data.Store (Store)
import Data.String (IsString (..))
import qualified Data.Text as T
diff --git a/src/Control/Funflow/External/Coordinator.hs b/src/Control/Funflow/External/Coordinator.hs
index 2771f94..b8c0943 100644
--- a/src/Control/Funflow/External/Coordinator.hs
+++ b/src/Control/Funflow/External/Coordinator.hs
@@ -16,11 +16,10 @@
module Control.Funflow.External.Coordinator where
import Control.Exception.Safe
-import Control.Funflow.ContentHashable (ContentHash)
import Control.Funflow.External
import Control.Lens
import Control.Monad.IO.Class (MonadIO, liftIO)
-
+import Data.CAS.ContentHashable (ContentHash)
import Data.Monoid ((<>))
import Data.Store (Store)
import Data.Store.TH (makeStore)
diff --git a/src/Control/Funflow/External/Coordinator/Memory.hs b/src/Control/Funflow/External/Coordinator/Memory.hs
index 8b364cd..451ff5e 100644
--- a/src/Control/Funflow/External/Coordinator/Memory.hs
+++ b/src/Control/Funflow/External/Coordinator/Memory.hs
@@ -7,12 +7,12 @@ module Control.Funflow.External.Coordinator.Memory where
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM.TVar
-import Control.Funflow.ContentHashable (ContentHash)
import Control.Funflow.External
import Control.Funflow.External.Coordinator
import Control.Lens
import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM
+import Data.CAS.ContentHashable (ContentHash)
import Data.List (find)
import qualified Data.Map.Strict as M
import System.Clock (fromNanoSecs)
diff --git a/src/Control/Funflow/External/Coordinator/Redis.hs b/src/Control/Funflow/External/Coordinator/Redis.hs
index af4de27..3fabf47 100644
--- a/src/Control/Funflow/External/Coordinator/Redis.hs
+++ b/src/Control/Funflow/External/Coordinator/Redis.hs
@@ -14,12 +14,12 @@ module Control.Funflow.External.Coordinator.Redis
, RedisPreconnected (..)
) where
-import qualified Control.Funflow.ContentHashable as CHash
import Control.Funflow.External
import Control.Funflow.External.Coordinator
import Control.Lens
import Control.Monad.Except
import Control.Monad.Fix (fix)
+import qualified Data.CAS.ContentHashable as CHash
import Data.Store
import qualified Database.Redis as R
import GHC.Conc
diff --git a/src/Control/Funflow/External/Coordinator/SQLite.hs b/src/Control/Funflow/External/Coordinator/SQLite.hs
index ef311dd..3c2e8e6 100644
--- a/src/Control/Funflow/External/Coordinator/SQLite.hs
+++ b/src/Control/Funflow/External/Coordinator/SQLite.hs
@@ -15,14 +15,14 @@ module Control.Funflow.External.Coordinator.SQLite
import Control.Concurrent (threadDelay)
import Control.Exception.Safe
-import Control.Funflow.ContentHashable
import Control.Funflow.External
import Control.Funflow.External.Coordinator
-import Control.Funflow.Lock
import Control.Lens
import Control.Monad.IO.Class
import qualified Data.Aeson as Json
import qualified Data.ByteString.Char8 as C8
+import Data.CAS.ContentHashable
+import Data.CAS.Lock
import qualified Data.Text as T
import Data.Typeable (Typeable)
import qualified Database.SQLite.Simple as SQL
diff --git a/src/Control/Funflow/External/Docker.hs b/src/Control/Funflow/External/Docker.hs
index ecf0edd..810e782 100644
--- a/src/Control/Funflow/External/Docker.hs
+++ b/src/Control/Funflow/External/Docker.hs
@@ -16,9 +16,9 @@ module Control.Funflow.External.Docker
) where
import Control.Arrow (Kleisli (..), second)
-import Control.Funflow.ContentHashable
import Control.Funflow.External
import Control.Monad.Trans.State.Strict
+import Data.CAS.ContentHashable
import Data.Semigroup (Semigroup, (<>))
import qualified Data.Text as T
import GHC.Generics (Generic)
diff --git a/src/Control/Funflow/External/Executor.hs b/src/Control/Funflow/External/Executor.hs
index 92c2c07..6117d84 100644
--- a/src/Control/Funflow/External/Executor.hs
+++ b/src/Control/Funflow/External/Executor.hs
@@ -16,16 +16,16 @@ import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Exception.Safe
-import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External
import Control.Funflow.External.Coordinator
-import qualified Control.Funflow.RemoteCache as Remote
import Control.Lens
import Control.Monad (forever, mzero, unless)
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Maybe
import qualified Data.Aeson as Json
import qualified Data.ByteString as BS
+import qualified Data.CAS.ContentStore as CS
+import qualified Data.CAS.RemoteCache as Remote
import Data.Foldable (for_)
import Data.Maybe (isJust, isNothing)
import Data.Monoid ((<>))
diff --git a/src/Control/Funflow/Lock.hs b/src/Control/Funflow/Lock.hs
deleted file mode 100644
index ca24d26..0000000
--- a/src/Control/Funflow/Lock.hs
+++ /dev/null
@@ -1,120 +0,0 @@
-{-# LANGUAGE FlexibleContexts #-}
-{-# LANGUAGE QuasiQuotes #-}
-{-# LANGUAGE ScopedTypeVariables #-}
-
--- | Thread and process write lock.
---
--- Allows synchronisation between threads and processes.
--- Uses an 'MVar' for synchronisation between threads
--- and fcntl write locks for synchronisation between processes.
---
--- Only ever have one 'Lock' object per lock file per process!
-module Control.Funflow.Lock
- ( Lock
- , openLock
- , closeLock
- , withLock
- ) where
-
-import Control.Concurrent
-import Control.Exception.Safe
-import Control.Monad (unless)
-import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp_)
-import Network.HostName (getHostName)
-import Path
-import Path.IO
-import System.Posix.Files
-import System.Posix.IO
-import System.Posix.Process
-import System.Random
-
--- | Thread and process write lock.
---
--- Only ever have one 'Lock' object per lock file per process!
-data Lock = Lock
- { lockMVar :: MVar ()
- , lockDir :: Path Abs Dir
- }
-
--- | Open the lock file and create a lock object.
---
--- This does not acquire the lock.
---
--- Only ever have one 'Lock' object per lock file per process!
-openLock :: Path Abs Dir -> IO Lock
-openLock dir = do
- mvar <- newMVar ()
- createDirIfMissing True dir
- return $! Lock
- { lockMVar = mvar
- , lockDir = dir
- }
-
--- | Close the lock file.
---
--- Does not release the lock.
---
--- Blocks if the lock is taken.
-closeLock :: Lock -> IO ()
-closeLock lock = do
- takeMVar (lockMVar lock)
-
--- | Acquire the lock for the duration of the given action and release after.
-withLock :: MonadBaseControl IO m => Lock -> m a -> m a
-withLock lock = liftBaseOp_ $ \action ->
- withMVar (lockMVar lock) $ \() ->
- bracket_ (acquireDirLock $ lockDir lock) (releaseDirLock $ lockDir lock) $
- action
-
-----------------------------------------------------------------------
--- Internals
-
--- | Generate unique (per process) filename.
---
--- Combines the host name and process ID.
-getUniqueFileName :: IO (Path Rel File)
-getUniqueFileName = do
- hostName <- getHostName
- pid <- getProcessID
- parseRelFile $ hostName ++ show pid
-
-lockFileName :: Path Rel File
-lockFileName = [relfile|lock|]
-
--- | Acquire the lock.
---
--- Uses an algorithm that is described in the man-page of open (2) in the
--- last paragraph to @O_EXCL@ in release 4.14 of the Linux man-pages project.
---
--- Creates a file under a unique (per process) filename.
--- Attempts to hard-link that file to a common lock path.
--- If the operation succeeds, then the lock was acquired.
--- If not, but if the link count of the file under the unique filename
--- increased to two, then the lock was acquired.
--- Otherwise, another process holds the lock and this process waits
--- and retries.
-acquireDirLock :: Path Abs Dir -> IO ()
-acquireDirLock dir = do
- file <- getUniqueFileName
- let path = dir </> file
- fd <- createFile (fromAbsFile path) ownerWriteMode
- closeFd fd
- r <- try $ createLink (fromAbsFile path) (fromAbsFile $ dir </> lockFileName)
- case r of
- Right () -> return ()
- Left (_::IOError) -> do
- count <- linkCount <$> getFileStatus (fromAbsFile path)
- unless (count == 2) $ do
- delay <- randomRIO (50000, 100000)
- threadDelay delay
- acquireDirLock dir
-
--- | Release the lock.
---
--- Unlinks the file under the unique file name and the common lock file.
-releaseDirLock :: Path Abs Dir -> IO ()
-releaseDirLock dir = do
- file <- getUniqueFileName
- let path = dir </> file
- removeLink (fromAbsFile $ dir </> lockFileName)
- removeLink (fromAbsFile path)
diff --git a/src/Control/Funflow/Orphans.hs b/src/Control/Funflow/Orphans.hs
deleted file mode 100644
index 36b7db5..0000000
--- a/src/Control/Funflow/Orphans.hs
+++ /dev/null
@@ -1,28 +0,0 @@
-{-# OPTIONS_GHC -fno-warn-orphans #-}
-{-# LANGUAGE FlexibleInstances #-}
-
--- | Dedicated module for orphan instances.
-module Control.Funflow.Orphans where
-
-import Data.Functor.Contravariant
-import Data.Store (Store)
-import qualified Data.Store as Store
-import qualified Path as Path
-import qualified Path.Internal
-
-instance Store (Path.Path Path.Abs Path.File) where
- size = contramap (\(Path.Internal.Path fp) -> fp) Store.size
- peek = Path.Internal.Path <$> Store.peek
- poke = Store.poke . (\(Path.Internal.Path fp) -> fp)
-instance Store (Path.Path Path.Abs Path.Dir) where
- size = contramap (\(Path.Internal.Path fp) -> fp) Store.size
- peek = Path.Internal.Path <$> Store.peek
- poke = Store.poke . (\(Path.Internal.Path fp) -> fp)
-instance Store (Path.Path Path.Rel Path.File) where
- size = contramap (\(Path.Internal.Path fp) -> fp) Store.size
- peek = Path.Internal.Path <$> Store.peek
- poke = Store.poke . (\(Path.Internal.Path fp) -> fp)
-instance Store (Path.Path Path.Rel Path.Dir) where
- size = contramap (\(Path.Internal.Path fp) -> fp) Store.size
- peek = Path.Internal.Path <$> Store.peek
- poke = Store.poke . (\(Path.Internal.Path fp) -> fp)
diff --git a/src/Control/Funflow/Pretty.hs b/src/Control/Funflow/Pretty.hs
index ebeb860..0187453 100644
--- a/src/Control/Funflow/Pretty.hs
+++ b/src/Control/Funflow/Pretty.hs
@@ -19,6 +19,8 @@ ppFlow = ppDiagram . toDiagram where
ppDiagram (Par f g) = parens $ ppDiagram f <+> text "***" <+> ppDiagram g
ppDiagram (Fanin f g) = parens $ ppDiagram f <+> text "|||" <+> ppDiagram g
ppDiagram (Try f) = parens $ text "try" <+> ppDiagram f
+ ppDiagram (Traverse f) = parens $ text "traverse" <+> ppDiagram f
+ ppDiagram (Wander _trav f) = parens $ text "wander" <+> ppDiagram f
showFlow :: Flow eff ex a b -> String
showFlow = render . ppFlow
diff --git a/src/Control/Funflow/RemoteCache.hs b/src/Control/Funflow/RemoteCache.hs
deleted file mode 100644
index 803c466..0000000
--- a/src/Control/Funflow/RemoteCache.hs
+++ /dev/null
@@ -1,134 +0,0 @@
-{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-
--- |
--- This module defines the remote caching mechanism of funflow which is used to
--- keep several funflow stores (possibly on different machines) in sync.
-module Control.Funflow.RemoteCache
- ( Cacher(..)
- , PullResult(..), PushResult(..), AliasResult(..)
- , NoCache(..), memoryCache
- , pullAsArchive, pushAsArchive
- ) where
-
-import qualified Codec.Archive.Tar as Tar
-import Control.Concurrent.MVar
-import Control.Funflow.ContentHashable
-import Control.Monad.IO.Class (MonadIO, liftIO)
-import Data.ByteString.Lazy (ByteString)
-import Data.Map.Strict (Map)
-import qualified Data.Map.Strict as Map
-import Path
-
-
--- |
--- The result of a tentative pull from the remote cache
-data PullResult a
- = PullOK a
- | NotInCache
- | PullError String
- deriving (Eq, Ord, Show)
-
--- |
--- The result of a tentative push to the remote cache
-data PushResult
- = PushOK
- | PushError String
- deriving (Eq, Ord, Show)
-
-data AliasResult
- = AliasOK
- | TargetNotInCache
- | AliasError String
-
--- |
--- A simple mechanism for remote-caching.
---
--- Provides a way to push a path to the cache and pull it back.
---
--- No assumption is made on the availability of a store path. In particular,
--- pushing a path to the cache doesn't mean that we can pull it back.
-class Monad m => Cacher m a where
- push ::
- a
- -> ContentHash -- ^ "Primary" key: hash of the content
- -> Maybe ContentHash -- ^ "Secondary" key: hash of the dependencies
- -> Path Abs Dir -- ^ Path to the content
- -> m PushResult
- pull :: a -> ContentHash -> Path Abs Dir -> m (PullResult ())
-
--- |
--- Push the path as an archive to the remote cache
-pushAsArchive ::
- MonadIO m
- => (ContentHash -> ContentHash -> m (Either String ())) -- ^ How to create the aliases
- -> (ContentHash -> ByteString -> m PushResult) -- ^ How to push the content
- -> ContentHash -- ^ Primary key
- -> Maybe ContentHash -- ^ Secondary key
- -> Path Abs Dir
- -> m PushResult
-pushAsArchive alias pushArchive primaryKey mSecondaryKey path = do
- archive <- liftIO $ Tar.write <$> Tar.pack (toFilePath path) ["."]
- pushArchive primaryKey archive >>= \case
- PushError e -> pure $ PushError e
- res ->
- case mSecondaryKey of
- Just secondaryKey ->
- alias primaryKey secondaryKey >>= \case
- Left err -> pure $ PushError err
- Right () -> pure res
- Nothing -> pure res
-
-pullAsArchive ::
- MonadIO m
- => (ContentHash -> m (PullResult ByteString))
- -> ContentHash
- -> Path Abs Dir
- -> m (PullResult ())
-pullAsArchive pullArchive hash path =
- pullArchive hash >>= \case
- PullOK archive -> do
- liftIO $ Tar.unpack (toFilePath path) $ Tar.read archive
- pure $ PullOK ()
- NotInCache -> pure NotInCache
- PullError e -> pure $ PullError e
-
--- |
--- A dummy remote cache implementation which does nothing
-data NoCache = NoCache
-
-instance Monad m => Cacher m NoCache where
- pull _ _ _ = pure NotInCache
- push _ _ _ _ = pure PushOK
-
--- |
--- An in-memory cache, for testing purposes
-data MemoryCache = MemoryCache (MVar (Map ContentHash ByteString))
-instance MonadIO m => Cacher m MemoryCache where
- pull (MemoryCache cacheVar) = pullAsArchive $ \hash -> do
- cacheMap <- liftIO $ readMVar cacheVar
- case Map.lookup hash cacheMap of
- Nothing -> pure NotInCache
- Just x -> pure (PullOK x)
- push (MemoryCache cacheVar) = pushAsArchive alias $ \hash content -> do
- liftIO $ modifyMVar_
- cacheVar
- (\cacheMap -> pure $ Map.insert hash content cacheMap)
- pure PushOK
- where
- alias from to = liftIO $ Right <$> modifyMVar_ cacheVar
- (\cacheMap -> pure $ Map.insert to (cacheMap Map.! from) cacheMap)
-
-memoryCache :: MonadIO m => m MemoryCache
-memoryCache = liftIO $ MemoryCache <$> newMVar mempty
-
--- |
--- If 'a' is a 'Cacher' then 'Maybe a' is a cacher such that 'Just x' behavies
--- like 'x' and 'Nothing' doesn't cache anything
-instance Cacher m a => Cacher m (Maybe a) where
- pull (Just x) = pull x
- pull Nothing = pull NoCache
-
- push (Just x) = push x
- push Nothing = push NoCache
diff --git a/src/Control/Funflow/Steps.hs b/src/Control/Funflow/Steps.hs
index 0281efa..26fec4b 100644
--- a/src/Control/Funflow/Steps.hs
+++ b/src/Control/Funflow/Steps.hs
@@ -46,15 +46,15 @@ where
import Control.Arrow
import Control.Arrow.Free (catch)
import Control.Exception.Safe (Exception, throwM)
-import Control.Funflow.Base (SimpleFlow, cache,
- defaultCacherWithIdent)
+import Control.Funflow.Base (SimpleFlow, cache)
import Control.Funflow.Class
-import Control.Funflow.ContentHashable (ContentHashable,
+import qualified Control.Funflow.External.Docker as Docker
+import Data.CAS.ContentHashable (ContentHashable,
DirectoryContent (..),
FileContent (..))
-import Control.Funflow.ContentStore (Content ((:</>)))
-import qualified Control.Funflow.ContentStore as CS
-import qualified Control.Funflow.External.Docker as Docker
+import Data.CAS.ContentStore (Content ((:</>)),
+ defaultCacherWithIdent)
+import qualified Data.CAS.ContentStore as CS
import Data.Default (def)
import Data.Foldable (for_)
import Data.Store
diff --git a/test/Funflow/ContentStore.hs b/test/Funflow/ContentStore.hs
deleted file mode 100644
index bdd8f9e..0000000
--- a/test/Funflow/ContentStore.hs
+++ /dev/null
@@ -1,472 +0,0 @@
-{-# OPTIONS_GHC -fno-warn-unused-do-bind #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE QuasiQuotes #-}
-{-# LANGUAGE TypeApplications #-}
-
-module Funflow.ContentStore
- ( tests
- ) where
-
-import Control.Concurrent.Async
-import Control.Exception.Safe (tryAny)
-import Control.Funflow.ContentHashable (contentHash)
-import Control.Funflow.ContentStore (ContentStore)
-import qualified Control.Funflow.ContentStore as ContentStore
-import qualified Control.Funflow.RemoteCache as Remote
-import Control.Monad (void)
-import Data.Maybe (catMaybes)
-import qualified Data.Set as Set
-import Path
-import Path.IO
-import System.Posix.Files
-import Test.Tasty
-import Test.Tasty.HUnit
-
-tests :: TestTree
-tests = testGroup "Content Store"
-
- [ testCase "initialise fresh store" $
- withTmpDir $ \dir -> do
- let root = dir </> [reldir|store|]
- ContentStore.withStore root $ \_ ->
- doesDirExist @IO root
- @? "store root exists"
-
- , testCase "initialise existing store" $
- withTmpDir $ \dir -> do
- let root = dir </> [reldir|store|]
- createDir root
- ContentStore.withStore root $ \_ ->
- doesDirExist @IO root
- @? "store root exists"
-
- , testCase "store is not writable" $
- withEmptyStore $ \store -> do
- let root = ContentStore.root store
- isNotWritable root
- @? "store not writable"
-
- , testCase "subtree stages" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
-
- missing <- ContentStore.query store hash
- missing @?= ContentStore.Missing ()
- missing' <- ContentStore.lookup store hash
- missing' @?= ContentStore.Missing ()
-
- subtree <- ContentStore.markPending store hash
- let dir = subtree </> [reldir|dir|]
- file = dir </> [relfile|file|]
- expectedContent = "Hello World"
- pending <- ContentStore.query store hash
- pending @?= ContentStore.Pending ()
- pending' <- ContentStore.lookup store hash
- pending' @?= ContentStore.Pending ()
- doesDirExist @IO subtree
- @? "pending subtree exists"
- isWritable subtree
- @? "pending subtree is writable"
- createDir dir
- writeFile (fromAbsFile file) expectedContent
- do
- content <- readFile (fromAbsFile file)
- content @?= expectedContent
-
- item <- ContentStore.markComplete store hash
- let itemDir = ContentStore.itemPath store item
- file' = itemDir </> [relfile|dir/file|]
- complete <- ContentStore.query store hash
- complete @?= ContentStore.Complete ()
- complete' <- ContentStore.lookup store hash
- complete' @?= ContentStore.Complete item
- doesDirExist @IO itemDir
- @? "complete subtree exists"
- isNotWritable itemDir
- @? "complete subtree is not writable"
- isNotWritable file'
- @? "complete file is not writable"
- do
- content <- readFile (fromAbsFile file')
- content @?= expectedContent
-
- , testCase "await construction" $
- Remote.memoryCache >>= \myCache ->
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
-
- ContentStore.constructOrAsync store myCache hash >>= \case
- ContentStore.Pending _ ->
- assertFailure "missing already under construction"
- ContentStore.Complete _ ->
- assertFailure "missing already complete"
- ContentStore.Missing _ ->
- return ()
-
- a <- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
- ContentStore.Missing _ -> do
- assertFailure "under construction still missing"
- undefined
- ContentStore.Complete _ -> do
- assertFailure "under construction already complete"
- undefined
- ContentStore.Pending a ->
- return a
-
- b <- ContentStore.lookupOrWait store hash >>= \case
- ContentStore.Missing _ -> do
- assertFailure "under construction still missing"
- undefined
- ContentStore.Complete _ -> do
- assertFailure "under construction already complete"
- undefined
- ContentStore.Pending b ->
- return b
-
- item <- ContentStore.markComplete store hash
-
- item' <- wait a
- item' @?= ContentStore.Completed item
-
- item'' <- wait b
- item'' @?= ContentStore.Completed item
-
- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
- ContentStore.Missing _ -> do
- assertFailure "complete still missing"
- ContentStore.Pending _ -> do
- assertFailure "complete still under construction"
- ContentStore.Complete _ -> do
- return ()
-
- , testCase "await failure" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
-
- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
- ContentStore.Pending _ ->
- assertFailure "missing already under construction"
- ContentStore.Complete _ ->
- assertFailure "missing already complete"
- ContentStore.Missing _ ->
- return ()
-
- a <- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
- ContentStore.Missing _ -> do
- assertFailure "under construction still missing"
- undefined
- ContentStore.Complete _ -> do
- assertFailure "under construction already complete"
- undefined
- ContentStore.Pending a ->
- return a
-
- b <- ContentStore.lookupOrWait store hash >>= \case
- ContentStore.Missing _ -> do
- assertFailure "under construction still missing"
- undefined
- ContentStore.Complete _ -> do
- assertFailure "under construction already complete"
- undefined
- ContentStore.Pending b ->
- return b
-
- ContentStore.removeFailed store hash
-
- item' <- wait a
- item' @?= ContentStore.Failed
-
- item'' <- wait b
- item'' @?= ContentStore.Failed
-
- ContentStore.constructOrAsync store Remote.NoCache hash >>= \case
- ContentStore.Pending _ -> do
- assertFailure "failed still under construction"
- ContentStore.Complete _ -> do
- assertFailure "failed already complete"
- ContentStore.Missing _ -> do
- return ()
-
- , testCase "construct if missing" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
- let file = [relfile|file|]
- expectedContent = "Hello World"
-
- ContentStore.constructIfMissing store Remote.NoCache hash >>= \case
- ContentStore.Pending () ->
- assertFailure "missing already under construction"
- ContentStore.Complete _ ->
- assertFailure "missing already complete"
- ContentStore.Missing subtree -> do
- isWritable subtree
- @? "under construction not writable"
- writeFile (fromAbsFile $ subtree </> file) expectedContent
-
- ContentStore.constructIfMissing store Remote.NoCache hash >>= \case
- ContentStore.Missing _ ->
- assertFailure "under construction still missing"
- ContentStore.Complete _ ->
- assertFailure "under construction already complete"
- ContentStore.Pending () ->
- void $ ContentStore.markComplete store hash
-
- ContentStore.constructIfMissing store Remote.NoCache hash >>= \case
- ContentStore.Missing _ ->
- assertFailure "complete still missing"
- ContentStore.Pending () ->
- assertFailure "complete still under construction"
- ContentStore.Complete item -> do
- let subtree = ContentStore.itemPath store item
- isNotWritable (subtree </> file)
- @? "complete still writable"
- content <- readFile (fromAbsFile $ subtree </> file)
- content @?= expectedContent
-
- , testCase "remove failed" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
- subtree <- ContentStore.markPending store hash
- ContentStore.removeFailed store hash
- not <$> doesDirExist @IO subtree
- @? "subtree was removed"
-
- , testCase "forcibly remove" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
- subtree <- ContentStore.markPending store hash
-
- ContentStore.removeForcibly store hash
- not <$> doesDirExist @IO subtree
- @? "remove under construction"
-
- ContentStore.removeForcibly store hash
- not <$> doesDirExist @IO subtree
- @? "remove missing"
-
- subtree' <- ContentStore.markPending store hash
- void $ ContentStore.markComplete store hash
- ContentStore.removeForcibly store hash
- not <$> doesDirExist @IO subtree'
- @? "remove complete"
-
- , testCase "subtree state is persisted" $
- withTmpDir $ \dir -> do
- let root = dir </> [reldir|store|]
- hash <- contentHash ("test" :: String)
-
- do
- ContentStore.withStore root $ \store ->
- void $ ContentStore.markPending store hash
-
- -- Imagine the process terminates and the store is closed
-
- do
- ContentStore.withStore root $ \store -> do
- underConstruction <- ContentStore.query store hash
- underConstruction @?= ContentStore.Pending ()
- void $ ContentStore.markComplete store hash
-
- -- Imagine the process terminates and the store is closed
-
- do
- ContentStore.withStore root $ \store -> do
- complete <- ContentStore.query store hash
- complete @?= ContentStore.Complete ()
-
- , testCase "mark complete before under construction fails" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
- ContentStore.markComplete store hash
- `shouldFail` "complete before under construction"
-
- , testCase "mark complete after complete fails" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
- void $ ContentStore.markPending store hash
- void $ ContentStore.markComplete store hash
- ContentStore.markComplete store hash
- `shouldFail` "complete after complete"
-
- , testCase "mark under construction after under construction fails" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
- void $ ContentStore.markPending store hash
- void $ ContentStore.markPending store hash
- `shouldFail` "under construction after under construction"
-
- , testCase "mark under construction after complete fails" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
- void $ ContentStore.markPending store hash
- void $ ContentStore.markComplete store hash
- void $ ContentStore.markPending store hash
- `shouldFail` "under construction after complete"
-
- , testCase "remove missing fails" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
- ContentStore.removeFailed store hash
- `shouldFail` "remove non existent"
-
- , testCase "remove complete fails" $
- withEmptyStore $ \store -> do
- hash <- contentHash ("test" :: String)
- void $ ContentStore.markPending store hash
- void $ ContentStore.markComplete store hash
- ContentStore.removeFailed store hash
- `shouldFail` "remove complete"
-
- , testCase "list store contents" $
- withEmptyStore $ \store -> do
- [a, b, c, d] <- mapM contentHash ["a", "b", "c", "d" :: String]
- void $ mapM (ContentStore.markPending store) [a, b, c, d]
- mapM_ (ContentStore.markComplete store) [a, b]
-
- (pendings, completes, items) <- ContentStore.listAll store
- let all' = pendings ++ completes
- Set.fromList all' @?= Set.fromList [a, b, c, d]
- Set.size (Set.fromList all') @?= 4
-
- underContsruction <- ContentStore.listPending store
- Set.fromList underContsruction @?= Set.fromList [c, d]
-
- complete <- ContentStore.listComplete store
- Set.fromList complete @?= Set.fromList [a, b]
-
- items' <- catMaybes <$> mapM (ContentStore.waitUntilComplete store) [a, b]
- Set.fromList items @?= Set.fromList items'
-
- , testCase "store aliases" $
- withEmptyStore $ \store -> do
- let fp = [relfile|test|]
- contentA = "itemA"
- contentB = "itemB"
- itemA <- do
- hash <- contentHash ("a" :: String)
- dir <- ContentStore.markPending store hash
- writeFile (fromAbsFile $ dir </> fp) contentA
- ContentStore.markComplete store hash
- itemB <- do
- hash <- contentHash ("b" :: String)
- dir <- ContentStore.markPending store hash
- writeFile (fromAbsFile $ dir </> fp) contentB
- ContentStore.markComplete store hash
- let aliasA = ContentStore.Alias "aliasA"
- aliasB = ContentStore.Alias "aliasB"
-
- do
- r <- ContentStore.lookupAlias store aliasA
- r @?= Nothing
-
- ContentStore.assignAlias store aliasA itemA
- ContentStore.assignAlias store aliasB itemB
- do
- r <- ContentStore.lookupAlias store aliasA
- r @?= Just itemA
- do
- r <- ContentStore.lookupAlias store aliasB
- r @?= Just itemB
-
- ContentStore.assignAlias store aliasB itemA
- do
- r <- ContentStore.lookupAlias store aliasA
- r @?= Just itemA
- do
- r <- ContentStore.lookupAlias store aliasB
- r @?= Just itemA
-
- ContentStore.removeAlias store aliasA
- ContentStore.removeAlias store aliasB
- do
- r <- ContentStore.lookupAlias store aliasA
- r @?= Nothing
- do
- r <- ContentStore.lookupAlias store aliasB
- r @?= Nothing
-
- , testCase "Remote caching (constructOrAsync)" $ do
- cacher <- Remote.memoryCache
- hash <- contentHash ("test" :: String)
- let file = [relfile|file|]
- expectedContent = "Hello World"
-
- -- Populate the remote cache
- withEmptyStore $ \store -> do
- ContentStore.constructOrAsync store cacher hash >>= \case
- ContentStore.Pending _ ->
- assertFailure "missing already under construction"
- ContentStore.Complete _ ->
- assertFailure "missing already complete"
- ContentStore.Missing subtree -> do
- isWritable subtree @? "under construction not writable"
- writeFile (fromAbsFile $ subtree </> file) expectedContent
- Remote.push cacher hash Nothing subtree
-
- -- Expects having the item in cache
- withEmptyStore $ \store -> do
- ContentStore.constructOrAsync store cacher hash >>= \case
- ContentStore.Pending _ ->
- assertFailure "missing already under construction"
- ContentStore.Complete _ -> pure ()
- ContentStore.Missing _ -> assertFailure "Not found in the cache"
-
- , testCase "Remote caching (withConstructIfMissing)" $ do
- cacher <- Remote.memoryCache
- hash <- contentHash ("test" :: String)
- let file = [relfile|file|]
- expectedContent = "Hello World"
- doWrite subtree = do
- isWritable subtree @? "under construction not writable"
- writeFile (fromAbsFile $ subtree </> file) expectedContent
- return $ Right ()
-
- -- Populates the remote cache
- withEmptyStore $ \store -> do
- ContentStore.withConstructIfMissing store cacher hash doWrite >>= \case
- ContentStore.Missing _ -> assertFailure "not found in the cache"
- ContentStore.Pending _ ->
- assertFailure "missing already under construction"
- -- ContentStore.waitUntilComplete store hash >>= \case
- -- Just item -> return ()
- -- Nothing -> assertFailure "item construction failed"
- ContentStore.Complete _ -> pure ()
-
- -- Expects having the item in the remote cache
- withEmptyStore $ \store -> do
- ContentStore.withConstructIfMissing store cacher hash
- (const $ assertFailure "should not try to write the file a second time") >>= \case
- ContentStore.Missing _ -> assertFailure "Not found in the cache"
- ContentStore.Pending _ ->
- assertFailure "missing already under construction"
- ContentStore.Complete _ -> pure ()
-
- ]
-
-shouldFail :: IO a -> String -> IO ()
-shouldFail m msg = tryAny m >>= \case
- Left _ -> return ()
- Right _ -> assertFailure msg
-
-withTmpDir :: (Path Abs Dir -> IO a) -> IO a
-withTmpDir = withSystemTempDir "funflow-test"
-
-withEmptyStore :: (ContentStore -> IO a) -> IO a
-withEmptyStore k = withTmpDir $ \dir ->
- ContentStore.withStore (dir </> [reldir|store|]) k
-
-isNotWritable :: Path Abs t -> IO Bool
-isNotWritable path = do
- mode <- fileMode <$> getFileStatus (toFilePath path)
- return $! nullFileMode == (mode `intersectFileModes` writeModes)
- where
- writeModes =
- ownerWriteMode
- `unionFileModes`
- groupWriteMode
- `unionFileModes`
- otherWriteMode
-
-isWritable :: Path Abs t -> IO Bool
-isWritable = fmap not . isNotWritable
diff --git a/test/Funflow/SQLiteCoordinator.hs b/test/Funflow/SQLiteCoordinator.hs
index 442bca3..a830cf0 100644
--- a/test/Funflow/SQLiteCoordinator.hs
+++ b/test/Funflow/SQLiteCoordinator.hs
@@ -12,9 +12,9 @@ import Control.Concurrent.Async (wait, withAsync)
import Control.Concurrent.MVar
import Control.Exception.Safe
import Control.Funflow
-import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External.Coordinator.SQLite
import Control.Monad
+import qualified Data.CAS.ContentStore as CS
import Data.Semigroup ((<>))
import Data.String (fromString)
import Path
diff --git a/test/Funflow/TestFlows.hs b/test/Funflow/TestFlows.hs
index 5c3161e..f84c143 100644
--- a/test/Funflow/TestFlows.hs
+++ b/test/Funflow/TestFlows.hs
@@ -12,11 +12,11 @@ import Control.Arrow.Free
import Control.Concurrent.Async (withAsync)
import Control.Exception.Safe hiding (catch)
import Control.Funflow
-import Control.Funflow.ContentStore (Content ((:</>)))
-import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External.Coordinator.Memory
import Control.Funflow.External.Executor (executeLoop)
import Control.Monad (when)
+import Data.CAS.ContentStore (Content ((:</>)))
+import qualified Data.CAS.ContentStore as CS
import Data.Default (def)
import Data.List (sort)
import Path
diff --git a/test/Test.hs b/test/Test.hs
index de3d732..5a09bef 100644
--- a/test/Test.hs
+++ b/test/Test.hs
@@ -1,5 +1,4 @@
import qualified Control.Arrow.Async.Tests
-import qualified Funflow.ContentStore
import qualified Funflow.SQLiteCoordinator
import qualified Funflow.TestFlows
import Test.Tasty
@@ -9,8 +8,7 @@ main = defaultMain tests
tests :: TestTree
tests = testGroup "Unit Tests"
- [ Funflow.ContentStore.tests
- , Control.Arrow.Async.Tests.tests
+ [ Control.Arrow.Async.Tests.tests
, Funflow.TestFlows.tests
, Funflow.SQLiteCoordinator.tests
]