summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author     nclarke <>                          2018-05-08 13:34:00 (GMT)
committer  hdiff <hdiff@hdiff.luite.com>       2018-05-08 13:34:00 (GMT)
commit     4aade684e6d16c8fcb22b0364de3d79cf9731f45 (patch)
tree       260ecfebcca36a7a1bcf106651c2072a9d334aae
version    1.0.0
-rw-r--r--  LICENSE                                             21
-rw-r--r--  TestFunflow.hs                                      90
-rw-r--r--  app/FFExecutorD.hs                                 139
-rw-r--r--  changelog.md                                         0
-rw-r--r--  funflow.cabal                                      164
-rw-r--r--  src/Control/Arrow/Async.hs                          50
-rw-r--r--  src/Control/Arrow/Free.hs                          234
-rw-r--r--  src/Control/Funflow.hs                              34
-rw-r--r--  src/Control/Funflow/Base.hs                        125
-rw-r--r--  src/Control/Funflow/Cache/TH.hs                     35
-rw-r--r--  src/Control/Funflow/Class.hs                        66
-rw-r--r--  src/Control/Funflow/ContentHashable.hs             535
-rw-r--r--  src/Control/Funflow/ContentStore.hs               1013
-rw-r--r--  src/Control/Funflow/ContentStore/Notify.hs          28
-rw-r--r--  src/Control/Funflow/ContentStore/Notify/BSD.hs      78
-rw-r--r--  src/Control/Funflow/ContentStore/Notify/Linux.hs    51
-rw-r--r--  src/Control/Funflow/Diagram.hs                      59
-rw-r--r--  src/Control/Funflow/Exec/Simple.hs                 241
-rw-r--r--  src/Control/Funflow/External.hs                    175
-rw-r--r--  src/Control/Funflow/External/Coordinator.hs        178
-rw-r--r--  src/Control/Funflow/External/Coordinator/Memory.hs  90
-rw-r--r--  src/Control/Funflow/External/Coordinator/Redis.hs  112
-rw-r--r--  src/Control/Funflow/External/Coordinator/SQLite.hs 318
-rw-r--r--  src/Control/Funflow/External/Docker.hs              69
-rw-r--r--  src/Control/Funflow/External/Executor.hs           239
-rw-r--r--  src/Control/Funflow/Lock.hs                        118
-rw-r--r--  src/Control/Funflow/Orphans.hs                      28
-rw-r--r--  src/Control/Funflow/Pretty.hs                       24
-rw-r--r--  src/Control/Funflow/Steps.hs                       234
-rw-r--r--  test/Control/Arrow/Async/Tests.hs                   30
-rw-r--r--  test/Funflow/ContentStore.hs                       413
-rw-r--r--  test/Funflow/SQLiteCoordinator.hs                  142
-rw-r--r--  test/Funflow/TestFlows.hs                          137
-rw-r--r--  test/Test.hs                                        16
34 files changed, 5286 insertions, 0 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..2e6833d
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 Tweag I/O
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/TestFunflow.hs b/TestFunflow.hs
new file mode 100644
index 0000000..667d33d
--- /dev/null
+++ b/TestFunflow.hs
@@ -0,0 +1,90 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+import Control.Arrow
+import Control.Arrow.Free
+import Control.Exception.Safe
+import Control.Funflow
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.External.Coordinator.Memory
+import Control.Funflow.Pretty
+import Data.Monoid ((<>))
+import Path.IO
+
+mkError :: String -> SomeException
+mkError = toException . userError
+
+myFlow :: SimpleFlow () Bool
+myFlow = proc () -> do
+ age <- promptFor -< "How old are you"
+ returnA -< age > (65::Int)
+
+flow2 :: SimpleFlow () (Double,Double)
+flow2 = proc () -> do
+ r1 <- worstBernoulli mkError -< 0.1
+ r2 <- worstBernoulli mkError -< 0.2
+ returnA -< (r1,r2)
+
+flow2caught :: SimpleFlow () (Double,Double)
+flow2caught = retry 100 0 flow2
+
+flow3 :: SimpleFlow [Int] [Int]
+flow3 = mapA (arr (+1))
+
+main :: IO ()
+main =
+ withSystemTempDir "test_output" $ \storeDir ->
+ CS.withStore storeDir $ \store -> do
+ memHook <- createMemoryCoordinator
+ res <- runSimpleFlow MemoryCoordinator memHook store flow2 ()
+ print res
+ res' <- runSimpleFlow MemoryCoordinator memHook store flow2caught ()
+ print res'
+ putStrLn $ showFlow myFlow
+ putStrLn $ showFlow flow2
+ res1 <- runSimpleFlow MemoryCoordinator memHook store flow3 [1..10]
+ print res1
+-- main = redisTest
+ externalTest
+ storeTest
+
+externalTest :: IO ()
+externalTest = let
+ someString = "External test"
+ exFlow = external $ \t -> ExternalTask
+ { _etCommand = "/run/current-system/sw/bin/echo"
+ , _etParams = [textParam t]
+ , _etWriteToStdOut = True
+ }
+ flow = exFlow >>> readString_
+ in withSystemTempDir "test_output_external_" $ \storeDir -> do
+ withSimpleLocalRunner storeDir $ \run -> do
+ out <- run flow someString
+ case out of
+ Left err -> print err
+ Right outStr -> putStrLn outStr
+
+storeTest :: IO ()
+storeTest = let
+ string1 = "First line\n"
+ string2 = "Second line\n"
+ exFlow = external $ \(a, b) -> ExternalTask
+ { _etCommand = "/run/current-system/sw/bin/cat"
+ , _etParams = [contentParam a, contentParam b]
+ , _etWriteToStdOut = True
+ }
+ flow = proc (s1, s2) -> do
+ f1 <- writeString_ -< s1
+ s1' <- readString -< f1
+ f2 <- writeString_ -< s2
+ s2' <- readString -< f2
+ f12 <- exFlow -< (f1, f2)
+ s12 <- readString_ -< f12
+ returnA -< s12 == s1' <> s2'
+ in withSystemTempDir "test_output_store_" $ \storeDir -> do
+ withSimpleLocalRunner storeDir $ \run -> do
+ out <- run flow (string1, string2)
+ case out of
+ Left err -> print err
+ Right b -> print b
diff --git a/app/FFExecutorD.hs b/app/FFExecutorD.hs
new file mode 100644
index 0000000..cf445fb
--- /dev/null
+++ b/app/FFExecutorD.hs
@@ -0,0 +1,139 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE StandaloneDeriving #-}
+{-# LANGUAGE TypeOperators #-}
+
+import Control.Applicative
+import Control.Concurrent (myThreadId)
+import Control.Concurrent.MVar
+import Control.Exception.Base (AsyncException (UserInterrupt))
+import Control.Exception.Safe
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.External.Coordinator
+import Control.Funflow.External.Coordinator.Redis
+import Control.Funflow.External.Coordinator.SQLite
+import Control.Funflow.External.Executor
+import Control.Monad (void)
+import Data.Monoid ((<>))
+import qualified Database.Redis as R
+import qualified Options.Applicative as Opt
+import Path
+import System.Clock
+import System.IO
+import qualified System.Posix.Signals as Signals
+
+
+data UseCoord
+ = UseRedis (Config Redis)
+ | UseSQLite FilePath
+
+data Args = Args
+ { storePath :: FilePath
+ , coordinator :: UseCoord
+ }
+
+argsParser :: Opt.Parser Args
+argsParser = Opt.subparser
+ $ Opt.command "redis"
+ (Opt.info
+ (redisParser Opt.<**> Opt.helper)
+ (Opt.progDesc "Use Redis coordinator."))
+ <> Opt.command "sqlite"
+ (Opt.info
+ (sqliteParser Opt.<**> Opt.helper)
+ (Opt.progDesc "Use SQLite coordinator."))
+ <> Opt.metavar "COORDINATOR"
+ <> Opt.commandGroup "Available coordinators:"
+ where
+ storeParser = Opt.strArgument
+ $ Opt.metavar "STOREDIR"
+ <> Opt.help "Path to the root of the content store."
+ redisParser = useRedis
+ <$> storeParser
+ <*> Opt.strArgument
+ ( Opt.metavar "HOSTNAME"
+ <> Opt.help "Host for the Redis instance." )
+ <*> Opt.argument Opt.auto
+ ( Opt.metavar "PORT"
+ <> Opt.help "Port for the Redis instance." )
+ <*> optional (Opt.argument Opt.auto
+ ( Opt.metavar "PASSWORD"
+ <> Opt.help "Password for the Redis instance, if needed." ))
+ useRedis store host port pw = Args store $ UseRedis R.defaultConnectInfo
+ { R.connectHost = host
+ , R.connectPort = R.PortNumber port
+ , R.connectAuth = pw
+ }
+ sqliteParser = useSQLite
+ <$> storeParser
+ <*> Opt.strArgument
+ ( Opt.metavar "SQLFILE"
+ <> Opt.help "Path to SQLite database file." )
+ useSQLite store sqlite = Args store $ UseSQLite sqlite
+
+parseArgs :: IO Args
+parseArgs = Opt.execParser $ Opt.info (argsParser Opt.<**> Opt.helper)
+ $ Opt.fullDesc
+ <> Opt.progDesc
+ "Await and execute funflow external tasks on the given coordinator."
+ <> Opt.header "ffexecutord - Funflow task executor"
+
+data HandlerState
+ = Initial
+ | Terminating TimeSpec TimeSpec
+
+data HandlerInstruction
+ = Terminate
+ | Ignore
+ | Message
+
+main :: IO ()
+main = do
+ config <- parseArgs
+
+ -- Configure custom interrupt handler.
+ -- GHC's default interrupt handler works only one time. On the second
+ -- interrupt the process will terminate immediately without any cleanup.
+ -- This could leave the funflow store and coordinator in an invalid state.
+ -- This installs a handler that works on multiple interrupts.
+ -- On the first interrupt it will send an async `UserInterrupt` exception to
+ -- the main-thread. On further interrupts it will print a message clarifying
+ -- that clean-up is ongoing.
+ mainThreadId <- myThreadId
+ mvState <- newMVar Initial
+ let handler name = Signals.Catch $ do
+ let msgTime = fromNanoSecs 1000000000
+ instr <- modifyMVar mvState $ \case
+ Initial -> do
+ t <- getTime Monotonic
+ return (Terminating t t, Terminate)
+ Terminating tsig tmsg -> do
+ t' <- getTime Monotonic
+ return $
+ if t' `diffTimeSpec` tmsg > msgTime then
+ (Terminating tsig 0, Message)
+ else
+ (Terminating tsig tmsg, Ignore)
+ case instr of
+ Terminate -> throwTo mainThreadId UserInterrupt
+ Ignore -> return ()
+ Message -> void $ tryAny $ hPutStrLn stderr $
+ "Handling " ++ name ++ " signal. Awaiting cleanup."
+ installHandler signal name = void $
+ Signals.installHandler signal (handler name) Nothing
+ installHandler Signals.sigINT "interrupt"
+ installHandler Signals.sigQUIT "quit"
+ installHandler Signals.sigTERM "terminate"
+
+ -- XXX: Improve handling of invalid paths.
+ storePath' <- parseAbsDir (storePath config)
+ case coordinator config of
+ UseRedis redisConfig -> CS.withStore storePath' $
+ executeLoop Redis redisConfig
+ UseSQLite sqlitePath -> do
+ sqlitePath' <- parseAbsDir sqlitePath
+ CS.withStore storePath' $
+ executeLoop SQLite sqlitePath'
diff --git a/changelog.md b/changelog.md
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/changelog.md
diff --git a/funflow.cabal b/funflow.cabal
new file mode 100644
index 0000000..8091276
--- /dev/null
+++ b/funflow.cabal
@@ -0,0 +1,164 @@
+cabal-version: >=1.10
+name: funflow
+version: 1.0.0
+license: MIT
+license-file: LICENSE
+maintainer: nicholas.clarke@tweag.io
+author: Tom Nielsen, Nicholas Clarke, Andreas Herrmann
+tested-with: ghc ==7.8.4 ghc ==7.10.2 ghc ==7.10.3 ghc ==8.0.1
+homepage: https://github.com/tweag/funflow
+bug-reports: https://github.com/tweag/funflow
+synopsis: Workflows with arrows
+description:
+ An arrow with resumable computations and logging
+category: Control
+build-type: Simple
+extra-source-files:
+ changelog.md
+
+library
+ exposed-modules:
+ Control.Arrow.Async
+ Control.Arrow.Free
+ Control.Funflow
+ Control.Funflow.Cache.TH
+ Control.Funflow.ContentHashable
+ Control.Funflow.ContentStore
+ Control.Funflow.Diagram
+ Control.Funflow.External
+ Control.Funflow.External.Docker
+ Control.Funflow.External.Executor
+ Control.Funflow.External.Coordinator
+ Control.Funflow.External.Coordinator.Memory
+ Control.Funflow.External.Coordinator.Redis
+ Control.Funflow.External.Coordinator.SQLite
+ Control.Funflow.Lock
+ Control.Funflow.Orphans
+ Control.Funflow.Steps
+ Control.Funflow.Pretty
+ Control.Funflow.Exec.Simple
+ hs-source-dirs: src
+ other-modules:
+ Control.Funflow.Base
+ Control.Funflow.Class
+ Control.Funflow.ContentStore.Notify
+ default-language: Haskell2010
+ ghc-options: -Wall -fno-warn-type-defaults
+ build-depends:
+ base >=4.6 && <5,
+ aeson >=1.2.3.0,
+ async >=2.1.1.1,
+ bytestring >=0.10.8.2,
+ clock >=0.7.2,
+ constraints >=0.9.1,
+ containers >=0.5.10.2,
+ contravariant >=1.4,
+ cryptonite >=0.24,
+ data-default >=0.7.1.1,
+ directory >=1.3.0.2,
+ exceptions >=0.8.3,
+ filepath >=1.4.1.2,
+ ghc-prim >=0.5.1.1,
+ hashable >=1.2.6.1,
+ hedis >=0.9.12,
+ hostname >=1.0,
+ integer-gmp >=1.0.1.0,
+ katip >=0.5.0.1,
+ lens >=4.15.4,
+ lifted-async >=0.9.3.2,
+ memory >=0.14.11,
+ monad-control >=1.0.2.2,
+ mtl >=2.2.1,
+ path >0.6.0,
+ path-io >=1.3.3,
+ pretty >=1.1.3.3,
+ process >=1.6.1.0,
+ random >=1.1,
+ safe-exceptions >=0.1.6.0,
+ scientific >=0.3.5.2,
+ sqlite-simple >=0.4.14.0,
+ stm >=2.4.4.1,
+ store >=0.4.3.2,
+ template-haskell >=2.11,
+ text >=1.2.2.2,
+ time >=1.8.0.2,
+ transformers >=0.5.2.0,
+ unix >=2.7.2.2,
+ unordered-containers >=0.2.8.0,
+ vector >=0.12.0.1,
+ yaml >=0.8.25.1
+
+ if os(linux)
+ cpp-options: -DOS_Linux
+ other-modules:
+ Control.Funflow.ContentStore.Notify.Linux
+ build-depends:
+ hinotify >=0.3.9
+ else
+
+ if (os(osx) || os(freebsd))
+ cpp-options: -DOS_BSD
+ other-modules:
+ Control.Funflow.ContentStore.Notify.BSD
+ build-depends:
+ kqueue -any
+
+executable ffexecutord
+ main-is: app/FFExecutorD.hs
+ default-language: Haskell2010
+ build-depends:
+ base >=4.6 && <5,
+ bytestring >=0.10.8.2,
+ clock >=0.7.2,
+ funflow -any,
+ hedis >=0.9.12,
+ path >=0.6.1,
+ text >=1.2.2.2,
+ unix >=2.7.2.2,
+ safe-exceptions >=0.1.6.0,
+ optparse-applicative >=0.14.0.0
+
+test-suite test-funflow
+ type: exitcode-stdio-1.0
+ main-is: TestFunflow.hs
+ default-language: Haskell2010
+ ghc-options: -Wall -threaded
+ build-depends:
+ base >=4.6 && <5,
+ funflow -any,
+ filepath >=1.4.1.2,
+ hedis >=0.9.12,
+ path >=0.6.1,
+ path-io >=1.3.3,
+ text >=1.2.2.2,
+ safe-exceptions >=0.1.6.0,
+ unix >=2.7.2.2
+
+test-suite unit-tests
+ type: exitcode-stdio-1.0
+ main-is: Test.hs
+ hs-source-dirs: test
+ other-modules:
+ Funflow.ContentStore
+ Funflow.SQLiteCoordinator
+ Funflow.TestFlows
+ Control.Arrow.Async.Tests
+ default-language: Haskell2010
+ ghc-options: -Wall -threaded
+ build-depends:
+ base >=4.10.1.0,
+ async >=2.1.1.1,
+ containers >=0.5.10.2,
+ data-default >=0.7,
+ directory >=1.3.0.2,
+ filepath >=1.4.1.2,
+ funflow -any,
+ path >=0.6.1,
+ path-io >=1.3.3,
+ process >=1.6.1.0,
+ random >=1.1,
+ safe-exceptions >=0.1.6.0,
+ tasty >=0.11.3,
+ tasty-hunit >=0.9.2,
+ temporary >=1.2.1.1,
+ unix >=2.7.2.2
diff --git a/src/Control/Arrow/Async.hs b/src/Control/Arrow/Async.hs
new file mode 100644
index 0000000..360668b
--- /dev/null
+++ b/src/Control/Arrow/Async.hs
@@ -0,0 +1,50 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE UndecidableInstances #-}
+-- | Asynchronous arrows over monads with MonadBaseControl IO, using
+-- lifted-async.
+module Control.Arrow.Async where
+
+import Control.Arrow
+import Control.Arrow.Free (ArrowError (..))
+import Control.Category
+import Control.Concurrent.Async.Lifted
+import Control.Exception.Safe (Exception, MonadCatch)
+import qualified Control.Exception.Safe
+import Control.Monad.Trans.Class (MonadTrans, lift)
+import Control.Monad.Trans.Control (MonadBaseControl)
+import Prelude hiding (id, (.))
+
+newtype AsyncA m a b = AsyncA { runAsyncA :: a -> m b }
+
+instance Monad m => Category (AsyncA m) where
+ id = AsyncA return
+ (AsyncA f) . (AsyncA g) = AsyncA (\b -> g b >>= f)
+
+-- | @since 2.01
+instance MonadBaseControl IO m => Arrow (AsyncA m) where
+ arr f = AsyncA (return . f)
+ first (AsyncA f) = AsyncA (\ ~(b,d) -> f b >>= \c -> return (c,d))
+ second (AsyncA f) = AsyncA (\ ~(d,b) -> f b >>= \c -> return (d,c))
+ (AsyncA f) *** (AsyncA g) = AsyncA $ \ ~(a,b) ->
+ withAsync (f a) $ \c ->
+ withAsync (g b) $ \d ->
+ waitBoth c d
+
+instance MonadBaseControl IO m => ArrowChoice (AsyncA m) where
+ left f = f +++ arr id
+ right f = arr id +++ f
+ f +++ g = (f >>> arr Left) ||| (g >>> arr Right)
+ AsyncA f ||| AsyncA g = AsyncA (either f g)
+
+instance (Exception ex, MonadBaseControl IO m, MonadCatch m)
+ => ArrowError ex (AsyncA m) where
+ AsyncA arr1 `catch` AsyncA arr2 = AsyncA $ \x ->
+ arr1 x `Control.Exception.Safe.catch` curry arr2 x
+
+-- | Lift an AsyncA through a monad transformer of the underlying monad.
+liftAsyncA :: (MonadTrans t, Monad m)
+ => AsyncA m i o
+ -> AsyncA (t m) i o
+liftAsyncA (AsyncA f) = AsyncA $ \i -> lift (f i)
diff --git a/src/Control/Arrow/Free.hs b/src/Control/Arrow/Free.hs
new file mode 100644
index 0000000..7bae9f2
--- /dev/null
+++ b/src/Control/Arrow/Free.hs
@@ -0,0 +1,234 @@
+-- {-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE InstanceSigs #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+{-# LANGUAGE PolyKinds #-}
+{-# LANGUAGE Rank2Types #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
+
+-- | Various varieties of free arrow constructions.
+--
+-- For all of these constructions, there are only two important functions:
+-- - 'eval' evaluates the free arrow in the context of another arrow.
+-- - 'effect' lifts the underlying effect into the arrow.
+--
+-- The class 'FreeArrowLike', which is not exported from this module, exists
+-- to allow these to be defined generally.
+--
+-- This module also defines some arrow combinators which are not exposed by
+-- the standard arrow library.
+module Control.Arrow.Free
+ ( Free
+ , Choice
+ , ErrorChoice
+ , effect
+ , eval
+ -- * ArrowError
+ , ArrowError(..)
+ -- * Arrow functions
+ , mapA
+ , mapSeqA
+ , filterA
+ , type (~>)
+ ) where
+
+import Control.Arrow
+import Control.Category
+import Control.Exception.Safe (Exception, MonadCatch)
+import qualified Control.Exception.Safe
+import Data.Bool (Bool)
+import Data.Constraint (Constraint, Dict (..), mapDict, weaken1,
+ weaken2)
+import Data.Either (Either (..))
+import Data.Function (const, flip, ($))
+import Data.List (uncons)
+import Data.Maybe (maybe)
+import Data.Tuple (curry, uncurry)
+
+-- | A natural transformation on type constructors of two arguments.
+type x ~> y = forall a b. x a b -> y a b
+
+--------------------------------------------------------------------------------
+-- FreeArrowLike
+--------------------------------------------------------------------------------
+
+-- | Small class letting us define `eval` and `effect` generally over
+-- multiple free structures
+class FreeArrowLike fal where
+ type Ctx fal :: (k -> k -> *) -> Constraint
+ effect :: eff a b -> fal eff a b
+ eval :: forall eff arr a b. ((Ctx fal) arr)
+ => (eff ~> arr)
+ -> fal eff a b
+ -> arr a b
+
+-- | Annoying hackery to let us tuple constraints and still use 'effect'
+-- and 'eval'
+class Join ( a :: k -> Constraint) (b :: k -> Constraint) (x :: k) where
+ ctx :: Dict (a x, b x)
+instance (a x, b x) => Join a b x where
+ ctx = Dict
+
+--------------------------------------------------------------------------------
+-- Arrow
+--------------------------------------------------------------------------------
+
+-- | Freely generated arrows over an effect.
+data Free eff a b where
+ Pure :: (a -> b) -> Free eff a b
+ Effect :: eff a b -> Free eff a b
+ Seq :: Free eff a b -> Free eff b c -> Free eff a c
+ Par :: Free eff a1 b1 -> Free eff a2 b2 -> Free eff (a1, a2) (b1, b2)
+
+instance Category (Free eff) where
+ id = Pure id
+ (.) = flip Seq
+
+instance Arrow (Free eff) where
+ arr = Pure
+ first f = Par f id
+ second f = Par id f
+ (***) = Par
+
+instance FreeArrowLike Free where
+ type Ctx Free = Arrow
+ -- | Lift an effect into an arrow.
+ effect :: eff a b -> Free eff a b
+ effect = Effect
+
+ -- | Evaluate given an implicit arrow
+ eval :: forall eff arr a0 b0. (Arrow arr)
+ => (eff ~> arr)
+ -> Free eff a0 b0
+ -> arr a0 b0
+ eval exec = go
+ where
+ go :: forall a b. Free eff a b -> arr a b
+ go freeA = case freeA of
+ Pure f -> arr f
+ Seq f1 f2 -> go f2 . go f1
+ Par f1 f2 -> go f1 *** go f2
+ Effect eff -> exec eff
+
+--------------------------------------------------------------------------------
+-- ArrowChoice
+--------------------------------------------------------------------------------
+
+-- | Freely generated `ArrowChoice` over an effect.
+newtype Choice eff a b = Choice {
+ runChoice :: forall ac. ArrowChoice ac => (eff ~> ac) -> ac a b
+}
+
+instance Category (Choice eff) where
+ id = Choice $ const id
+ Choice f . Choice g = Choice $ \x -> f x . g x
+
+instance Arrow (Choice eff) where
+ arr a = Choice $ const $ arr a
+ first (Choice a) = Choice $ \f -> first (a f)
+ second (Choice a) = Choice $ \f -> second (a f)
+ (Choice a) *** (Choice b) = Choice $ \f -> a f *** b f
+
+instance ArrowChoice (Choice eff) where
+ left (Choice a) = Choice $ \f -> left (a f)
+ right (Choice a) = Choice $ \f -> right (a f)
+ (Choice a) ||| (Choice b) = Choice $ \f -> a f ||| b f
+
+instance FreeArrowLike Choice where
+ type Ctx Choice = ArrowChoice
+ effect :: eff a b -> Choice eff a b
+ effect a = Choice $ \f -> f a
+
+ eval :: forall eff arr a0 b0. (ArrowChoice arr)
+ => (eff ~> arr)
+ -> Choice eff a0 b0
+ -> arr a0 b0
+ eval f a = runChoice a f
+
+--------------------------------------------------------------------------------
+-- ErrorChoice
+--------------------------------------------------------------------------------
+
+-- | ArrowError represents those arrows which can catch exceptions within the
+-- processing of the flow.
+class ArrowError ex a where
+ catch :: a e c -> a (e, ex) c -> a e c
+
+instance (Arrow (Kleisli m), Exception ex, MonadCatch m)
+ => ArrowError ex (Kleisli m) where
+ Kleisli arr1 `catch` Kleisli arr2 = Kleisli $ \x ->
+ arr1 x `Control.Exception.Safe.catch` curry arr2 x
+
+-- | Freely generated arrows with both choice and error handling.
+newtype ErrorChoice ex eff a b = ErrorChoice {
+ runErrorChoice :: forall ac. (ArrowChoice ac, ArrowError ex ac)
+ => (eff ~> ac) -> ac a b
+}
+
+instance Category (ErrorChoice ex eff) where
+ id = ErrorChoice $ const id
+ ErrorChoice f . ErrorChoice g = ErrorChoice $ \x -> f x . g x
+
+instance Arrow (ErrorChoice ex eff) where
+ arr a = ErrorChoice $ const $ arr a
+ first (ErrorChoice a) = ErrorChoice $ \f -> first (a f)
+ second (ErrorChoice a) = ErrorChoice $ \f -> second (a f)
+ (ErrorChoice a) *** (ErrorChoice b) = ErrorChoice $ \f -> a f *** b f
+
+instance ArrowChoice (ErrorChoice ex eff) where
+ left (ErrorChoice a) = ErrorChoice $ \f -> left (a f)
+ right (ErrorChoice a) = ErrorChoice $ \f -> right (a f)
+ (ErrorChoice a) ||| (ErrorChoice b) = ErrorChoice $ \f -> a f ||| b f
+
+instance ArrowError ex (ErrorChoice ex eff) where
+ (ErrorChoice a) `catch` (ErrorChoice h) = ErrorChoice $ \f -> a f `catch` h f
+
+instance FreeArrowLike (ErrorChoice ex) where
+ type Ctx (ErrorChoice ex) = Join ArrowChoice (ArrowError ex)
+ effect :: eff a b -> ErrorChoice ex eff a b
+ effect a = ErrorChoice $ \f -> f a
+
+ eval :: forall eff arr a0 b0. (Join ArrowChoice (ArrowError ex) arr)
+ => (eff ~> arr)
+ -> ErrorChoice ex eff a0 b0
+ -> arr a0 b0
+ eval f a = case (\x -> (mapDict weaken1 x, mapDict weaken2 x)) ctx of
+ ( Dict :: Dict (ArrowChoice arr)
+ , Dict :: Dict (ArrowError ex arr)
+ ) -> runErrorChoice a f
+
+
+--------------------------------------------------------------------------------
+-- Functions
+--------------------------------------------------------------------------------
+
+-- | Map an arrow over a list.
+mapA :: ArrowChoice a => a b c -> a [b] [c]
+mapA f = arr (maybe (Left ()) Right . uncons)
+ >>> (arr (const []) ||| ((f *** mapA f) >>> arr (uncurry (:))))
+
+-- | Map an arrow over a list, forcing sequencing between each element.
+mapSeqA :: ArrowChoice a => a b c -> a [b] [c]
+mapSeqA f = arr (maybe (Left ()) Right . uncons)
+ >>> (arr (const []) ||| ((first f >>> second (mapSeqA f)) >>> arr (uncurry (:))))
+
+-- | Filter a list given an arrow filter
+filterA :: ArrowChoice a => a b Bool -> a [b] [b]
+filterA f = proc xs ->
+ case xs of
+ [] -> returnA -< []
+ (y:ys) -> do
+ b <- f -< y
+ if b then
+ (second (filterA f) >>> arr (uncurry (:))) -< (y,ys)
+ else
+ filterA f -< ys
+
+
diff --git a/src/Control/Funflow.hs b/src/Control/Funflow.hs
new file mode 100644
index 0000000..756eb25
--- /dev/null
+++ b/src/Control/Funflow.hs
@@ -0,0 +1,34 @@
+-- | Central Funflow module.
+--
+-- This module just re-exports various other modules for convenience.
+module Control.Funflow
+ ( Base.Flow
+ , Base.SimpleFlow
+ , Base.NoEffect
+ , Base.Flow'(..)
+ , Base.Cacher(..)
+ , Base.ExternalProperties(..)
+ , Base.MDWriter
+ , Base.Properties(..)
+ , Base.defaultCacherWithIdent
+ , Cache.defaultCacher
+ , Cache.defaultCacherLoc
+ -- * Defines our primitive flow functions
+ , Class.ArrowFlow(..)
+ , Class.step
+ , Class.stepIO
+ , Class.wrap
+ , CS.withStore
+ , module Control.Funflow.Steps
+ , module Control.Funflow.Exec.Simple
+ , module Control.Funflow.External
+ )
+ where
+
+import qualified Control.Funflow.Base as Base
+import qualified Control.Funflow.Cache.TH as Cache
+import qualified Control.Funflow.Class as Class
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.Exec.Simple
+import Control.Funflow.External
+import Control.Funflow.Steps
diff --git a/src/Control/Funflow/Base.hs b/src/Control/Funflow/Base.hs
new file mode 100644
index 0000000..e434e1b
--- /dev/null
+++ b/src/Control/Funflow/Base.hs
@@ -0,0 +1,125 @@
+{-# LANGUAGE EmptyDataDecls #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE ViewPatterns #-}
+
+-- | Core Funflow types and functions.
+--
+-- In particular, you will probably care about the 'Flow' type, which is the
+-- type of all Funflow workflows.
+module Control.Funflow.Base where
+
+import Control.Arrow (Kleisli (..))
+import Control.Arrow.Free
+import Control.Exception.Safe (SomeException)
+import Control.Funflow.ContentHashable
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.Diagram
+import Control.Funflow.External
+import Data.ByteString (ByteString)
+import Data.Default
+import Data.Functor.Identity
+import Data.Proxy (Proxy (..))
+import qualified Data.Store as Store
+import qualified Data.Text as T
+import Path
+import Prelude hiding (id, (.))
+
+-- | Metadata writer
+type MDWriter i o = Maybe (i -> o -> [(T.Text, ByteString)])
+
+-- | A cacher is responsible for controlling how steps are cached.
+data Cacher i o =
+ NoCache -- ^ This step cannot be cached (default).
+ | Cache
+ { -- | Function to encode the input into a content
+ -- hash.
+ -- This function additionally takes an
+ -- 'identities' which gets incorporated into
+ -- the cacher.
+ cacherKey :: Int -> i -> ContentHash
+ , cacherStoreValue :: o -> ByteString
+ -- | Attempt to read the cache value back. May throw exceptions.
+ , cacherReadValue :: ByteString -> o
+ }
+
+defaultCacherWithIdent :: (Store.Store o, ContentHashable Identity i)
+ => Int -- ^ Seed for the cacher
+ -> Cacher i o
+defaultCacherWithIdent ident = Cache
+ { cacherKey = \i ident' -> runIdentity $ contentHash (ident', ident, i)
+ , cacherStoreValue = Store.encode
+ , cacherReadValue = Store.decodeEx
+ }
+
+data Properties i o = Properties
+ { -- | Name of this step. Used when describing the step in diagrams
+ -- or other reporting.
+ name :: Maybe T.Text
+ -- | Specify whether this step can be cached or not and, if so,
+ -- how to do so.
+ , cache :: Cacher i o
+ -- | Write additional metadata to the content store.
+ , mdpolicy :: MDWriter i o
+ }
+
+instance Default (Properties i o) where
+ def = Properties
+ { name = Nothing
+ , cache = NoCache
+ , mdpolicy = Nothing
+ }
+
+-- | Additional properties associated with external tasks.
+data ExternalProperties a = ExternalProperties
+ { -- | Write additional metadata to the content store.
+ ep_mdpolicy :: MDWriter a ()
+ -- | Specify that this external step is impure, and as such should not be
+ -- cached.
+ , ep_impure :: Bool
+ }
+
+instance Default (ExternalProperties a) where
+ def = ExternalProperties
+ { ep_mdpolicy = Nothing
+ , ep_impure = False
+ }
+
+data Flow' eff a b where
+ Step :: Properties a b -> (a -> b) -> Flow' eff a b
+ StepIO :: Properties a b -> (a -> IO b) -> Flow' eff a b
+ External :: ExternalProperties a
+ -> (a -> ExternalTask)
+ -> Flow' eff a CS.Item
+ -- XXX: Constrain allowed user actions.
+ PutInStore :: ContentHashable IO a => (Path Abs Dir -> a -> IO ()) -> Flow' eff a CS.Item
+ -- XXX: Constrain allowed user actions.
+ GetFromStore :: (Path Abs t -> IO a) -> Flow' eff (CS.Content t) a
+ -- Internally manipulate the store. This should not be used by
+ -- client libraries.
+ InternalManipulateStore :: (CS.ContentStore -> a -> IO b)
+ -> Flow' eff a b
+ Wrapped :: Properties a b -> eff a b -> Flow' eff a b
+
+type Flow eff ex = ErrorChoice ex (Flow' eff)
+
+data NoEffect a b
+
+-- | Since there are no constructors for 'NoEffect', this code can never be
+-- reached and so is fine.
+runNoEffect :: forall arr. NoEffect ~> arr
+runNoEffect = error "Impossible!"
+
+type SimpleFlow = Flow NoEffect SomeException
+
+-- | Convert a flow to a diagram, for inspection/pretty printing
+toDiagram :: Flow eff ex a b -> Diagram ex a b
+toDiagram = eval toDiagram' where
+ toDiagram' (Step (name -> Just n) f) = node f [n]
+ toDiagram' (StepIO (name -> Just n) f) = node (Kleisli f) [n]
+ toDiagram' _
+ = Node emptyNodeProperties (Proxy :: Proxy a1) (Proxy :: Proxy b1)
diff --git a/src/Control/Funflow/Cache/TH.hs b/src/Control/Funflow/Cache/TH.hs
new file mode 100644
index 0000000..6850995
--- /dev/null
+++ b/src/Control/Funflow/Cache/TH.hs
@@ -0,0 +1,35 @@
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TemplateHaskell #-}
+
+-- | Template Haskell splices for the funflow cache.
+module Control.Funflow.Cache.TH where
+
+import Control.Funflow.Base
+import Data.Hashable
+import Language.Haskell.TH.Syntax
+import System.Random
+
+-- | Create a default cacher with a random identity.
+--
+-- Note that this cacher is deliberately conservative - e.g.
+-- if the application is recompiled, the cache will not be
+-- reused.
+defaultCacher :: Q Exp
+defaultCacher = do
+ (seed :: Int) <- runIO randomIO
+ [e| defaultCacherWithIdent seed |]
+
+instance Hashable Loc
+
+-- | Create a default cacher based on the location of this splice.
+-- Note that this may lead to invalid cacheing if the code is changed
+-- without the version being updated.
+defaultCacherLoc :: Int -- ^ Version
+ -> Q Exp
+defaultCacherLoc ver = do
+ loc <- location
+ [e| defaultCacherWithIdent (hash (loc :: Loc, ver :: Int)) |]
diff --git a/src/Control/Funflow/Class.hs b/src/Control/Funflow/Class.hs
new file mode 100644
index 0000000..833afc1
--- /dev/null
+++ b/src/Control/Funflow/Class.hs
@@ -0,0 +1,66 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE FunctionalDependencies #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE TypeSynonymInstances #-}
+
+-- | Defines a class for types which can be used as workflows.
+--
+-- In general, you should not need this functionality unless you are defining
+-- your own flow types (perhaps to use a state transformer or similar), in
+-- which case this allows you to work using the standard combinators in
+-- Funflow and then define the mapping to 'Base.Flow'
+--
+-- For an example of use, see the module @Control.Funflow.Checkpoints@, which
+-- defines a checkpointed flow.
+module Control.Funflow.Class where
+
+import Control.Arrow
+import Control.Arrow.Free
+import qualified Control.Funflow.Base as Base
+import Control.Funflow.ContentHashable
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.External
+import Data.Default (def)
+import Path
+
+-- | Arrows which can embed the full set of Funflow primitives.
+--
+-- The functional dependency @arr -> eff ex@ means that the flow type
+-- determines both its user-effect type and its exception type.
+class (ArrowChoice arr, ArrowError ex arr) => ArrowFlow eff ex arr | arr -> eff ex where
+  -- | Create a flow from a pure function.
+  step' :: Base.Properties a b -> (a -> b) -> arr a b
+  -- | Create a flow from an IO action.
+  stepIO' :: Base.Properties a b -> (a -> IO b) -> arr a b
+  -- | Create an external task in the flow.
+  external :: (a -> ExternalTask) -> arr a CS.Item
+  -- | Create an external task with additional properties
+  external' :: Base.ExternalProperties a -> (a -> ExternalTask) -> arr a CS.Item
+  -- | Create a flow from a user-defined effect.
+  wrap' :: Base.Properties a b -> eff a b -> arr a b
+  -- | Create a flow which will write its incoming data to the store.
+  putInStore :: ContentHashable IO a => (Path Abs Dir -> a -> IO ()) -> arr a CS.Item
+  -- | Create a flow which will read data from the given store item.
+  getFromStore :: (Path Abs t -> IO a) -> arr (CS.Content t) a
+  -- | Perform some internal manipulation of the content store.
+  internalManipulateStore :: (CS.ContentStore -> a -> IO b) -> arr a b
+
+-- | The canonical instance: every primitive is embedded directly as the
+-- corresponding 'Base.Flow' effect constructor.
+instance ArrowFlow eff ex (Base.Flow eff ex) where
+  step' props f = effect (Base.Step props f)
+  stepIO' props f = effect (Base.StepIO props f)
+  external f = effect (Base.External def f)
+  external' props f = effect (Base.External props f)
+  wrap' props e = effect (Base.Wrapped props e)
+  putInStore f = effect (Base.PutInStore f)
+  getFromStore f = effect (Base.GetFromStore f)
+  internalManipulateStore f = effect (Base.InternalManipulateStore f)
+
+-- | Create a flow from a pure function.
+-- This is a variant on 'step'' which uses the default properties.
+step :: ArrowFlow eff ex arr => (a -> b) -> arr a b
+step = step' def
+
+-- | Create a flow from an IO action.
+-- This is a variant on 'stepIO'' which uses the default properties.
+stepIO :: ArrowFlow eff ex arr => (a -> IO b) -> arr a b
+stepIO = stepIO' def
+
+-- | Create a flow from a user-defined effect.
+-- This is a variant on 'wrap'' which uses the default properties.
+wrap :: ArrowFlow eff ex arr => eff a b -> arr a b
+wrap = wrap' def
diff --git a/src/Control/Funflow/ContentHashable.hs b/src/Control/Funflow/ContentHashable.hs
new file mode 100644
index 0000000..44df30a
--- /dev/null
+++ b/src/Control/Funflow/ContentHashable.hs
@@ -0,0 +1,535 @@
+{-# LANGUAGE DefaultSignatures #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MagicHash #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeApplications #-}
+{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE TypeSynonymInstances #-}
+{-# LANGUAGE UnboxedTuples #-}
+
+-- | 'ContentHashable' provides a hashing function suitable for use in the
+-- Funflow content store.
+--
+-- This behaves as does a normal hashing function on Haskell types. However,
+-- on path types, this instead calculates a hash based on the contents of the
+-- file or directory referenced.
+--
+-- We also export the 'ExternallyAssuredFile' and 'ExternallyAssuredDirectory'
+-- types. These instead use the path, file size and modification time to control
+-- the hash.
+module Control.Funflow.ContentHashable
+ ( ContentHash
+ , toBytes
+ , fromBytes
+ , ContentHashable (..)
+ , contentHashUpdate_binaryFile
+ , contentHashUpdate_byteArray#
+ , contentHashUpdate_fingerprint
+ , contentHashUpdate_primitive
+ , contentHashUpdate_storable
+
+ , FileContent (..)
+ , DirectoryContent (..)
+
+ , ExternallyAssuredFile(..)
+ , ExternallyAssuredDirectory(..)
+
+ , encodeHash
+ , decodeHash
+ , hashToPath
+ , pathToHash
+
+ , SHA256
+ , Context
+ , Digest
+ ) where
+
+
+import Control.Funflow.Orphans ()
+import Control.Monad (foldM, mzero, (>=>))
+import Crypto.Hash (Context, Digest, SHA256,
+ digestFromByteString,
+ hashFinalize, hashInit,
+ hashUpdate)
+import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.Types as Aeson
+import Data.Bits (shiftL)
+import Data.ByteArray (Bytes, MemView (MemView),
+ allocAndFreeze, convert)
+import Data.ByteArray.Encoding (Base (Base16),
+ convertFromBase,
+ convertToBase)
+import qualified Data.ByteString as BS
+import Data.ByteString.Builder.Extra (defaultChunkSize)
+import qualified Data.ByteString.Char8 as C8
+import qualified Data.ByteString.Lazy as BSL
+import Data.Foldable (foldlM)
+import Data.Functor.Contravariant
+import qualified Data.Hashable
+import qualified Data.HashMap.Lazy as HashMap
+import qualified Data.HashSet as HashSet
+import Data.Int
+import Data.List (sort)
+import Data.List.NonEmpty (NonEmpty)
+import Data.Map (Map)
+import qualified Data.Map as Map
+import Data.Ratio
+import Data.Scientific
+import Data.Store (Store (..), peekException)
+import qualified Data.Text as T
+import qualified Data.Text.Array as TA
+import qualified Data.Text.Encoding as TE
+import qualified Data.Text.Internal as T
+import qualified Data.Text.Lazy as TL
+import Data.Time.Clock (UTCTime)
+import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
+import Data.Typeable
+import qualified Data.Vector as V
+import Data.Word
+import qualified Database.SQLite.Simple.FromField as SQL
+import qualified Database.SQLite.Simple.ToField as SQL
+import Foreign.Marshal.Utils (with)
+import Foreign.Ptr (castPtr)
+import Foreign.Storable (Storable, sizeOf)
+import GHC.Fingerprint
+import GHC.Generics
+import GHC.Integer.GMP.Internals (BigNat (..), Integer (..))
+import GHC.Natural (Natural (..))
+import GHC.Prim (ByteArray#,
+ copyByteArrayToAddr#,
+ sizeofByteArray#)
+import GHC.Ptr (Ptr (Ptr))
+import GHC.Types (IO (IO), Int (I#), Word (W#))
+import qualified Path
+import qualified Path.Internal
+import qualified Path.IO
+import System.IO (IOMode (ReadMode),
+ withBinaryFile)
+import System.IO.Unsafe (unsafePerformIO)
+import System.Posix.Files (fileSize, getFileStatus)
+
+
+-- | A SHA-256 digest identifying a piece of content in the store.
+newtype ContentHash = ContentHash { unContentHash :: Digest SHA256 }
+  deriving (Eq, Ord, Generic)
+
+-- JSON round-trips through the hex encoding produced by 'encodeHash'.
+instance Aeson.FromJSON ContentHash where
+  parseJSON (Aeson.String s)
+    | Just h <- decodeHash (TE.encodeUtf8 s) = pure h
+    | otherwise = fail "Invalid hash encoding"
+  parseJSON invalid
+    = Aeson.typeMismatch "ContentHash" invalid
+instance Aeson.ToJSON ContentHash where
+  toJSON = Aeson.String . TE.decodeUtf8 . encodeHash
+
+instance Data.Hashable.Hashable ContentHash where
+  hashWithSalt s = Data.Hashable.hashWithSalt s . encodeHash
+
+-- Renders as @ContentHash "<hex digest>"@.
+instance Show ContentHash where
+  showsPrec d h = showParen (d > app_prec)
+    $ showString "ContentHash \""
+    . (showString $ C8.unpack $ encodeHash h)
+    . showString "\""
+    where app_prec = 10
+
+-- Binary serialisation stores the raw digest bytes; 'peek' rejects byte
+-- strings that are not a valid SHA-256 digest.
+instance Store ContentHash where
+  size = contramap toBytes size
+  peek = fromBytes <$> peek >>= \case
+    Nothing -> peekException "Store ContentHash: Illegal digest"
+    Just x -> return x
+  poke = poke . toBytes
+
+-- SQLite round-trips through the hex encoding; invalid fields fail the
+-- parse via 'mzero'.
+instance SQL.FromField ContentHash where
+  fromField f = do
+    bs <- SQL.fromField f
+    case decodeHash bs of
+      Just h -> pure h
+      Nothing -> mzero
+
+instance SQL.ToField ContentHash where
+  toField = SQL.toField . encodeHash
+
+-- | The raw digest bytes of a hash.
+toBytes :: ContentHash -> BS.ByteString
+toBytes = convert . unContentHash
+
+-- | Reconstruct a hash from raw digest bytes, if they have the right length.
+fromBytes :: BS.ByteString -> Maybe ContentHash
+fromBytes bs = ContentHash <$> digestFromByteString bs
+
+-- | Base encoding used for the textual/file-name representation of hashes.
+hashEncoding :: Base
+hashEncoding = Base16
+
+-- | File path appropriate encoding of a hash
+encodeHash :: ContentHash -> BS.ByteString
+encodeHash = convertToBase hashEncoding . toBytes
+
+-- | Inverse of 'encodeHash' if given a valid input.
+--
+-- prop> decodeHash (encodeHash x) = Just x
+decodeHash :: BS.ByteString -> Maybe ContentHash
+decodeHash bs = case convertFromBase hashEncoding bs of
+  Left _ -> Nothing
+  Right x -> fromBytes x
+
+-- | File path appropriate encoding of a hash
+hashToPath :: ContentHash -> Path.Path Path.Rel Path.Dir
+hashToPath h =
+  case Path.parseRelDir $ C8.unpack $ encodeHash h of
+    -- The error branch should be unreachable: a hex-encoded digest is
+    -- expected to always be a valid relative directory name.
+    Nothing -> error
+      "[ContentHashable.hashToPath] \
+      \Failed to convert hash to directory name"
+    Just dir -> dir
+
+
+-- | Inverse of 'hashToPath' if given a valid input.
+--
+-- prop> pathToHash (hashToPath x) = Just x
+pathToHash :: FilePath -> Maybe ContentHash
+pathToHash = decodeHash . C8.pack
+
+
+-- | Types whose values can be folded into a SHA-256 hash context.
+--
+-- The monad parameter @m@ allows instances to perform effects while
+-- hashing (e.g. 'IO' for instances that read files).
+class Monad m => ContentHashable m a where
+
+  -- | Update a hash context based on the given value.
+  --
+  -- See 'Crypto.Hash.hashUpdate'.
+  --
+  -- XXX: Consider swapping the arguments.
+  contentHashUpdate :: Context SHA256 -> a -> m (Context SHA256)
+
+  default contentHashUpdate :: (Generic a, GContentHashable m (Rep a))
+    => Context SHA256 -> a -> m (Context SHA256)
+  contentHashUpdate ctx a = gContentHashUpdate ctx (from a)
+
+  -- | Generate hash of the given value.
+  --
+  -- See 'Crypto.Hash.hash'.
+  --
+  -- Defined as finalising the result of 'contentHashUpdate' on a fresh
+  -- context.
+  contentHash :: a -> m ContentHash
+  contentHash x = ContentHash . hashFinalize <$> contentHashUpdate hashInit x
+
+
+-- | Update hash context based on binary in memory representation due to 'Foreign.Storable.Storable'.
+--
+-- XXX: Do we need to worry about endianness?
+contentHashUpdate_storable :: (Monad m, Storable a) => Context SHA256 -> a -> m (Context SHA256)
+contentHashUpdate_storable ctx a =
+  -- The ($!) forces the updated context before 'with' frees its temporary
+  -- buffer, so no reference to freed memory escapes the unsafePerformIO.
+  return . unsafePerformIO $ with a (\p -> pure $! hashUpdate ctx (MemView (castPtr p) (sizeOf a)))
+
+-- | Update hash context based on a type's 'GHC.Fingerprint.Type.Fingerprint'.
+--
+-- The fingerprint is constructed from the library-name, module-name, and name of the type itself.
+contentHashUpdate_fingerprint :: (Monad m, Typeable a) => Context SHA256 -> a -> m (Context SHA256)
+contentHashUpdate_fingerprint ctx = contentHashUpdate ctx . typeRepFingerprint . typeOf
+
+-- | Update hash context by combining 'contentHashUpdate_fingerprint' and 'contentHashUpdate_storable'.
+-- Intended for primitive types like 'Int'.
+contentHashUpdate_primitive :: (Monad m, Typeable a, Storable a) => Context SHA256 -> a -> m (Context SHA256)
+contentHashUpdate_primitive ctx a =
+  flip contentHashUpdate_fingerprint a >=> flip contentHashUpdate_storable a $ ctx
+
+-- | Fold the binary contents of the file at the given path into the hash
+-- context, reading the file chunk by chunk.
+contentHashUpdate_binaryFile :: Context SHA256 -> FilePath -> IO (Context SHA256)
+contentHashUpdate_binaryFile ctx0 fp =
+    withBinaryFile fp ReadMode (loop ctx0)
+  where
+    -- Read until EOF ('hGetSome' returns an empty chunk), forcing the
+    -- updated context on every iteration to avoid thunk build-up.
+    loop ctx h = do
+      chunk <- BS.hGetSome h defaultChunkSize
+      if BS.null chunk
+        then pure ctx
+        else let ctx' = hashUpdate ctx chunk
+             in ctx' `seq` loop ctx' h
+
+-- | Update hash context based on 'GHC.Prim.ByteArray#'
+-- by copying into a newly allocated 'Data.ByteArray.Bytes'
+-- and updating the hash context from there.
+--
+-- The 'Int' arguments are the offset and length, in bytes, of the region
+-- of the byte array to hash.
+--
+-- XXX: @'GHC.Prim.byteArrayContents#' :: 'GHC.Prim.ByteArray#' -> 'GHC.Prim.Addr#'@
+-- could be used together with 'Data.ByteArray.MemView' instead.
+-- However, 'GHC.Prim.byteArrayContents#' explicitly says, that it is only safe to use
+-- on a pinned 'GHC.Prim.ByteArray#'.
+contentHashUpdate_byteArray# :: ByteArray# -> Int -> Int -> Context SHA256 -> Context SHA256
+contentHashUpdate_byteArray# ba (I# off) (I# len) ctx = hashUpdate ctx $
+  allocAndFreeze @Bytes (I# len) $ \(Ptr addr) -> IO $ \s ->
+    (# copyByteArrayToAddr# ba off addr len s, () #)
+
+-- | Update hash context based on the contents of a strict 'Data.Text.Text'.
+--
+-- Hashes the text's internal buffer directly; the shifts below assume the
+-- 'Word16'-based internal representation of @text@.
+contentHashUpdate_text :: Context SHA256 -> T.Text -> Context SHA256
+contentHashUpdate_text ctx (T.Text arr off_ len_) =
+  contentHashUpdate_byteArray# (TA.aBA arr) off len ctx
+  where
+    off = off_ `shiftL` 1 -- convert from 'Word16' to 'Word8'
+    len = len_ `shiftL` 1 -- convert from 'Word16' to 'Word8'
+
+-- A 'Fingerprint' is hashed as its two component words.
+instance Monad m => ContentHashable m Fingerprint where
+  contentHashUpdate ctx (Fingerprint a b) = flip contentHashUpdate_storable a >=> flip contentHashUpdate_storable b $ ctx
+
+-- Primitive types: hash the type fingerprint followed by the raw in-memory
+-- representation (see 'contentHashUpdate_primitive').
+instance Monad m => ContentHashable m Bool where contentHashUpdate = contentHashUpdate_primitive
+
+instance Monad m => ContentHashable m Char where contentHashUpdate = contentHashUpdate_primitive
+
+instance Monad m => ContentHashable m Int where contentHashUpdate = contentHashUpdate_primitive
+instance Monad m => ContentHashable m Int8 where contentHashUpdate = contentHashUpdate_primitive
+instance Monad m => ContentHashable m Int16 where contentHashUpdate = contentHashUpdate_primitive
+instance Monad m => ContentHashable m Int32 where contentHashUpdate = contentHashUpdate_primitive
+instance Monad m => ContentHashable m Int64 where contentHashUpdate = contentHashUpdate_primitive
+
+instance Monad m => ContentHashable m Word where contentHashUpdate = contentHashUpdate_primitive
+instance Monad m => ContentHashable m Word8 where contentHashUpdate = contentHashUpdate_primitive
+instance Monad m => ContentHashable m Word16 where contentHashUpdate = contentHashUpdate_primitive
+instance Monad m => ContentHashable m Word32 where contentHashUpdate = contentHashUpdate_primitive
+instance Monad m => ContentHashable m Word64 where contentHashUpdate = contentHashUpdate_primitive
+
+instance Monad m => ContentHashable m Float where contentHashUpdate = contentHashUpdate_primitive
+instance Monad m => ContentHashable m Double where contentHashUpdate = contentHashUpdate_primitive
+
+-- A ratio is hashed as its type fingerprint, numerator and denominator.
+instance (ContentHashable m n, Typeable n) => ContentHashable m (Ratio n) where
+  contentHashUpdate ctx x =
+    flip contentHashUpdate_fingerprint x
+    >=> flip contentHashUpdate (numerator x)
+    >=> flip contentHashUpdate (denominator x)
+    $ ctx
+
+-- A 'Scientific' is hashed via its exact 'Rational' representation.
+instance Monad m => ContentHashable m Scientific where
+  contentHashUpdate ctx x =
+    flip contentHashUpdate_fingerprint x
+    >=> flip contentHashUpdate (toRational x)
+    $ ctx
+
+-- 'Integer' is hashed on its internal representation: a one-byte
+-- constructor tag ("S"mall / "L"arge positive / "N"egative) followed by the
+-- payload (machine word or limb array).
+instance Monad m => ContentHashable m Integer where
+  contentHashUpdate ctx n = ($ ctx) $
+    flip contentHashUpdate_fingerprint n >=> case n of
+      S# i ->
+        pure . flip hashUpdate (C8.pack "S") -- tag constructor
+        >=> flip contentHashUpdate_storable (I# i) -- hash field
+      Jp# (BN# ba) ->
+        pure . flip hashUpdate (C8.pack "L") -- tag constructor
+        >=> pure . contentHashUpdate_byteArray# ba 0 (I# (sizeofByteArray# ba)) -- hash field
+      Jn# (BN# ba) ->
+        pure . flip hashUpdate (C8.pack "N") -- tag constructor
+        >=> pure . contentHashUpdate_byteArray# ba 0 (I# (sizeofByteArray# ba)) -- hash field
+
+-- 'Natural' follows the same scheme as 'Integer' (no negative case).
+instance Monad m => ContentHashable m Natural where
+  contentHashUpdate ctx n = ($ ctx) $
+    flip contentHashUpdate_fingerprint n >=> case n of
+      NatS# w ->
+        pure . flip hashUpdate (C8.pack "S") -- tag constructor
+        >=> flip contentHashUpdate_storable (W# w) -- hash field
+      NatJ# (BN# ba) ->
+        pure . flip hashUpdate (C8.pack "L") -- tag constructor
+        >=> pure . contentHashUpdate_byteArray# ba 0 (I# (sizeofByteArray# ba)) -- hash field
+
+-- String-like types: hash the type fingerprint, then the raw bytes.
+-- Lazy variants fold over their chunks without forcing the whole value
+-- into memory at once.
+instance Monad m => ContentHashable m BS.ByteString where
+  contentHashUpdate ctx s =
+    flip contentHashUpdate_fingerprint s
+    >=> pure . flip hashUpdate s $ ctx
+
+instance Monad m => ContentHashable m BSL.ByteString where
+  contentHashUpdate ctx s =
+    flip contentHashUpdate_fingerprint s
+    >=> pure . flip (BSL.foldlChunks hashUpdate) s $ ctx
+
+instance Monad m => ContentHashable m T.Text where
+  contentHashUpdate ctx s =
+    flip contentHashUpdate_fingerprint s
+    >=> pure . flip contentHashUpdate_text s $ ctx
+
+instance Monad m => ContentHashable m TL.Text where
+  contentHashUpdate ctx s =
+    flip contentHashUpdate_fingerprint s
+    >=> pure . flip (TL.foldlChunks contentHashUpdate_text) s $ ctx
+
+-- 'Map.toList' yields entries in ascending key order, so this hash is
+-- deterministic for a given map.
+instance (Typeable k, Typeable v, ContentHashable m k, ContentHashable m v)
+  => ContentHashable m (Map k v) where
+  contentHashUpdate ctx m =
+    flip contentHashUpdate_fingerprint m
+    >=> flip contentHashUpdate (Map.toList m) $ ctx
+
+instance (Typeable k, Typeable v, ContentHashable m k, ContentHashable m v)
+  => ContentHashable m (HashMap.HashMap k v) where
+  contentHashUpdate ctx m =
+    flip contentHashUpdate_fingerprint m
+    -- XXX: The order of the list is unspecified.
+    >=> flip contentHashUpdate (HashMap.toList m) $ ctx
+
+instance (Typeable v, ContentHashable m v)
+  => ContentHashable m (HashSet.HashSet v) where
+  contentHashUpdate ctx s =
+    flip contentHashUpdate_fingerprint s
+    -- XXX: The order of the list is unspecified.
+    >=> flip contentHashUpdate (HashSet.toList s) $ ctx
+
+-- Sequence types: hash the fingerprint, then fold over the elements in
+-- order.
+instance (Typeable a, ContentHashable m a)
+  => ContentHashable m [a] where
+  contentHashUpdate ctx l =
+    flip contentHashUpdate_fingerprint l
+    >=> flip (foldM contentHashUpdate) l $ ctx
+
+instance (Typeable a, ContentHashable m a)
+  => ContentHashable m (NonEmpty a) where
+  contentHashUpdate ctx l =
+    flip contentHashUpdate_fingerprint l
+    >=> flip (foldlM contentHashUpdate) l $ ctx
+
+instance (Typeable a, ContentHashable m a)
+  => ContentHashable m (V.Vector a) where
+  contentHashUpdate ctx v =
+    flip contentHashUpdate_fingerprint v
+    >=> flip (V.foldM' contentHashUpdate) v $ ctx
+
+-- Generic-derived instances for tuples and standard sum types.
+--
+-- The redundant 'Monad m' constraints were dropped from the 6- and 7-tuple
+-- instances for consistency with the smaller tuples: 'Monad m' is already a
+-- superclass of every 'ContentHashable m a' constraint.
+instance Monad m => ContentHashable m ()
+instance (ContentHashable m a, ContentHashable m b) => ContentHashable m (a, b)
+instance (ContentHashable m a, ContentHashable m b, ContentHashable m c) => ContentHashable m (a, b, c)
+instance (ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d) => ContentHashable m (a, b, c, d)
+instance (ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d, ContentHashable m e) => ContentHashable m (a, b, c, d, e)
+instance (ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d, ContentHashable m e, ContentHashable m f) => ContentHashable m (a, b, c, d, e, f)
+instance (ContentHashable m a, ContentHashable m b, ContentHashable m c, ContentHashable m d, ContentHashable m e, ContentHashable m f, ContentHashable m g) => ContentHashable m (a, b, c, d, e, f, g)
+
+instance ContentHashable m a => ContentHashable m (Maybe a)
+
+instance (ContentHashable m a, ContentHashable m b) => ContentHashable m (Either a b)
+
+instance Monad m => ContentHashable m Aeson.Value
+
+
+-- | 'GHC.Generics' counterpart of 'ContentHashable', used to provide the
+-- default implementation of 'contentHashUpdate'.
+class Monad m => GContentHashable m f where
+  gContentHashUpdate :: Context SHA256 -> f a -> m (Context SHA256)
+
+-- Void types contribute nothing to the hash.
+instance Monad m => GContentHashable m V1 where
+  gContentHashUpdate ctx _ = pure ctx
+
+-- Unit constructors contribute nothing beyond their constructor name
+-- (mixed in by the C1 instance below).
+instance Monad m => GContentHashable m U1 where
+  gContentHashUpdate ctx U1 = pure ctx
+
+-- Fields defer to the field type's own 'ContentHashable' instance.
+instance ContentHashable m c => GContentHashable m (K1 i c) where
+  gContentHashUpdate ctx x = contentHashUpdate ctx (unK1 x)
+
+-- Mix the constructor name into the hash before hashing its fields.
+instance (Constructor c, GContentHashable m f) => GContentHashable m (C1 c f) where
+  gContentHashUpdate ctx0 x = gContentHashUpdate nameCtx (unM1 x)
+    where nameCtx = hashUpdate ctx0 $ C8.pack (conName x)
+
+-- | Mix the datatype's name, defining module, and package into the hash
+-- before hashing its contents.
+--
+-- BUG FIX: 'moduleCtx' and 'packageCtx' previously re-hashed
+-- 'datatypeName', so two distinct types with the same name defined in
+-- different modules or packages contributed identical hash input. They now
+-- use 'moduleName' and 'packageName' as intended. Note that this changes
+-- hash values produced for generically-hashed types, invalidating caches
+-- built with the old scheme.
+instance (Datatype d, GContentHashable m f) => GContentHashable m (D1 d f) where
+  gContentHashUpdate ctx0 x = gContentHashUpdate packageCtx (unM1 x)
+    where
+      datatypeCtx = hashUpdate ctx0 $ C8.pack (datatypeName x)
+      moduleCtx = hashUpdate datatypeCtx $ C8.pack (moduleName x)
+      packageCtx = hashUpdate moduleCtx $ C8.pack (packageName x)
+
+-- Record selectors add nothing themselves; hash the wrapped field.
+instance GContentHashable m f => GContentHashable m (S1 s f) where
+  gContentHashUpdate ctx x = gContentHashUpdate ctx (unM1 x)
+
+-- Products hash their components left to right.
+instance (GContentHashable m a, GContentHashable m b) => GContentHashable m (a :*: b) where
+  gContentHashUpdate ctx (x :*: y) = gContentHashUpdate ctx x >>= flip gContentHashUpdate y
+
+-- Sums hash whichever branch is present; the branches are distinguished by
+-- the constructor name mixed in by the C1 instance above.
+instance (GContentHashable m a, GContentHashable m b) => GContentHashable m (a :+: b) where
+  gContentHashUpdate ctx (L1 x) = gContentHashUpdate ctx x
+  gContentHashUpdate ctx (R1 x) = gContentHashUpdate ctx x
+
+-- XXX: Do we need this?
+-- instance GContentHashable (a :.: b) where
+--   gContentHashUpdate ctx x = _ (unComp1 x)
+
+
+-- A 'Path' is hashed as its type fingerprint plus its textual 'FilePath'
+-- representation; the referenced file system object is NOT read (see
+-- 'FileContent' / 'DirectoryContent' for content-based hashing).
+instance (Monad m, Typeable b, Typeable t) => ContentHashable m (Path.Path b t) where
+  contentHashUpdate ctx p@(Path.Internal.Path fp) =
+    flip contentHashUpdate_fingerprint p
+    >=> flip contentHashUpdate fp
+    $ ctx
+
+
+-- | Path to a regular file
+--
+-- Only the file's content and its executable permission is taken into account
+-- when generating the content hash. The path itself is ignored.
+newtype FileContent = FileContent (Path.Path Path.Abs Path.File)
+
+instance ContentHashable IO FileContent where
+
+  contentHashUpdate ctx (FileContent fp) = do
+    -- If the file is executable, mix an extra marker (the hash of '()')
+    -- into the context before hashing the file's bytes.
+    exec <- Path.IO.executable <$> Path.IO.getPermissions fp
+    ctx' <- if exec then contentHashUpdate ctx () else pure ctx
+    contentHashUpdate_binaryFile ctx' (Path.fromAbsFile fp)
+
+-- | Path to a directory
+--
+-- Only the contents of the directory and their path relative to the directory
+-- are taken into account when generating the content hash.
+-- The path to the directory is ignored.
+newtype DirectoryContent = DirectoryContent (Path.Path Path.Abs Path.Dir)
+
+instance ContentHashable IO DirectoryContent where
+
+  -- Hash all files first, then recurse into subdirectories, each in sorted
+  -- order so the result is independent of listing order.
+  contentHashUpdate ctx0 (DirectoryContent dir0) = do
+    (dirs, files) <- Path.IO.listDir dir0
+    ctx' <- foldM hashFile ctx0 (sort files)
+    foldM hashDir ctx' (sort dirs)
+    where
+      -- Each entry contributes its name relative to the directory, then its
+      -- content.
+      hashFile ctx fp =
+        -- XXX: Do we need to treat symbolic links specially?
+        flip contentHashUpdate (Path.filename fp)
+        >=> flip contentHashUpdate (FileContent fp)
+        $ ctx
+      hashDir ctx dir =
+        flip contentHashUpdate (Path.dirname dir)
+        >=> flip contentHashUpdate (DirectoryContent dir)
+        $ ctx
+
+-- Timestamps are hashed on their POSIX epoch offset.
+-- NOTE(review): 'fromEnum' on 'NominalDiffTime' — confirm the truncation
+-- semantics; the binding name suggests whole seconds are intended and
+-- sub-second precision is presumably discarded.
+instance Monad m => ContentHashable m UTCTime where
+  contentHashUpdate ctx utcTime = let
+    secondsSinceEpoch = fromEnum . utcTimeToPOSIXSeconds $ utcTime
+    in flip contentHashUpdate_fingerprint utcTime
+    >=> flip contentHashUpdate secondsSinceEpoch
+    $ ctx
+
+-- | Path to a file to be treated as _externally assured_.
+--
+-- An externally assured file is handled in a somewhat 'cheating' way by
+-- funflow. The 'ContentHashable' instance for such assumes that some external
+-- agent guarantees the integrity of the file being referenced. Thus, rather
+-- than hashing the file contents, we only consider its (absolute) path, size and
+-- modification time, which can be rapidly looked up from filesystem metadata.
+--
+-- For a similar approach, see the instance for 'ObjectInBucket' in
+-- Control.Funflow.AWS.S3, where we exploit the fact that S3 is already
+-- content hashed to avoid performing any hashing.
+newtype ExternallyAssuredFile = ExternallyAssuredFile (Path.Path Path.Abs Path.File)
+  deriving (Generic, Show)
+
+instance Aeson.FromJSON ExternallyAssuredFile
+instance Aeson.ToJSON ExternallyAssuredFile
+instance Store ExternallyAssuredFile
+
+-- Hashes path + modification time + size; the file contents are never read.
+instance ContentHashable IO ExternallyAssuredFile where
+  contentHashUpdate ctx (ExternallyAssuredFile fp) = do
+    modTime <- Path.IO.getModificationTime fp
+    fSize <- fileSize <$> getFileStatus (Path.toFilePath fp)
+    flip contentHashUpdate fp
+      >=> flip contentHashUpdate modTime
+      >=> flip contentHashUpdate_storable fSize
+      $ ctx
+
+
+-- | Path to a directory to be treated as _externally assured_.
+--
+-- For an externally assured directory, we _do_ traverse its contents and verify
+-- those as we would externally assured files, rather than just relying on the
+-- directory path. Doing this traversal is pretty cheap, and it's quite likely
+-- for directory contents to be modified without the directory path itself
+-- changing.
+newtype ExternallyAssuredDirectory = ExternallyAssuredDirectory (Path.Path Path.Abs Path.Dir)
+  deriving (Generic, Show)
+
+instance Aeson.FromJSON ExternallyAssuredDirectory
+instance Aeson.ToJSON ExternallyAssuredDirectory
+instance Store ExternallyAssuredDirectory
+
+instance ContentHashable IO ExternallyAssuredDirectory where
+  contentHashUpdate ctx0 (ExternallyAssuredDirectory dir0) = do
+    -- Note that we don't bother looking at the relative directory paths and
+    -- including these in the hash. This is because the absolute hash gets
+    -- included every time we hash a file.
+    (dirs, files) <- Path.IO.listDir dir0
+    ctx' <- foldM hashFile ctx0 (sort files)
+    foldM hashDir ctx' (sort dirs)
+    where
+      -- Entries are hashed in sorted order for determinism; each entry uses
+      -- the corresponding externally-assured (metadata-based) hash.
+      hashFile ctx fp = contentHashUpdate ctx (ExternallyAssuredFile fp)
+      hashDir ctx dir = contentHashUpdate ctx (ExternallyAssuredDirectory dir)
diff --git a/src/Control/Funflow/ContentStore.hs b/src/Control/Funflow/ContentStore.hs
new file mode 100644
index 0000000..b75f7f6
--- /dev/null
+++ b/src/Control/Funflow/ContentStore.hs
@@ -0,0 +1,1013 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE StandaloneDeriving #-}
+
+-- | Hash addressed store in file system.
+--
+-- Associates a key ('Control.Funflow.ContentHashable.ContentHash')
+-- with an item in the store. An item can either be
+-- 'Control.Funflow.ContentStore.Missing',
+-- 'Control.Funflow.ContentStore.Pending', or
+-- 'Control.Funflow.ContentStore.Complete'.
+-- The state is persisted in the file system.
+--
+-- Items are stored under a path derived from their hash. Therefore,
+-- there can be no two copies of the same item in the store.
+-- If two keys are associated with the same item, then there will be
+-- only one copy of that item in the store.
+--
+-- The store is thread-safe and multi-process safe.
+--
+-- It is assumed that the user that the process is running under is the owner
+-- of the store root, or has permission to create it if missing.
+--
+-- It is assumed that the store root and its immediate contents are not modified
+-- externally. The contents of pending items may be modified externally.
+--
+-- __Implementation notes:__
+--
+-- The hash of an item can only be determined once it is completed.
+-- If that hash already exists in the store, then the new item is discarded.
+--
+-- Store state is persisted in the file-system:
+--
+-- * Pending items are stored writable under the path @pending-\<key>@.
+-- * Complete items are stored read-only under the path @item-\<hash>@,
+-- with a link under @complete-\<key>@ pointing to that directory.
+module Control.Funflow.ContentStore
+ (
+ -- * Open/Close
+ withStore
+ , open
+ , close
+
+ -- * List Contents
+ , listAll
+ , listPending
+ , listComplete
+ , listItems
+
+ -- * Query/Lookup
+ , query
+ , isMissing
+ , isPending
+ , isComplete
+ , lookup
+ , lookupOrWait
+ , waitUntilComplete
+
+ -- * Construct Items
+ , constructOrAsync
+ , constructOrWait
+ , constructIfMissing
+ , withConstructIfMissing
+ , markPending
+ , markComplete
+
+ -- * Remove Contents
+ , removeFailed
+ , removeForcibly
+ , removeItemForcibly
+
+ -- * Aliases
+ , assignAlias
+ , lookupAlias
+ , removeAlias
+ , listAliases
+
+ -- * Metadata
+ , getBackReferences
+ , setInputs
+ , getInputs
+ , setMetadata
+ , getMetadata
+ , createMetadataFile
+ , getMetadataFile
+
+ -- * Accessors
+ , itemHash
+ , itemPath
+ , contentPath
+ , contentItem
+ , contentFilename
+ , root
+
+ -- * Types
+ , ContentStore
+ , Item
+ , Content (..)
+ , (^</>)
+ , Alias (..)
+ , Status (..)
+ , Status_
+ , Update (..)
+ , StoreError (..)
+ ) where
+
+
+import Prelude hiding (lookup)
+
+import Control.Arrow (second)
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.Async
+import Control.Concurrent.MVar
+import Control.Exception.Safe (Exception, MonadMask,
+ bracket, bracket_,
+ bracketOnError,
+ displayException, throwIO)
+import Control.Funflow.ContentStore.Notify
+import Control.Funflow.Orphans ()
+import Control.Lens
+import Control.Monad (forever, forM_, unless,
+ void, when, (<=<), (>=>))
+import Control.Monad.IO.Class (MonadIO, liftIO)
+import Crypto.Hash (hashUpdate)
+import Data.Aeson (FromJSON, ToJSON)
+import Data.Bits (complement)
+import qualified Data.ByteString.Char8 as C8
+import Data.Foldable (asum)
+import qualified Data.Hashable
+import Data.List (foldl', stripPrefix)
+import Data.Maybe (fromMaybe, listToMaybe)
+import Data.Monoid ((<>))
+import qualified Data.Store
+import Data.String (IsString (..))
+import qualified Data.Text as T
+import Data.Typeable (Typeable)
+import Data.Void
+import qualified Database.SQLite.Simple as SQL
+import qualified Database.SQLite.Simple.FromField as SQL
+import qualified Database.SQLite.Simple.ToField as SQL
+import GHC.Generics (Generic)
+import Path
+import Path.IO
+import System.Directory (removePathForcibly)
+import System.FilePath (dropTrailingPathSeparator)
+import System.IO (Handle, IOMode (..),
+ openFile)
+import System.Posix.Files
+import System.Posix.Types
+
+import Control.Funflow.ContentHashable (ContentHash,
+ ContentHashable (..),
+ DirectoryContent (..),
+ contentHashUpdate_fingerprint,
+ encodeHash, pathToHash,
+ toBytes)
+import Control.Funflow.Lock
+
+
+-- | Status of an item in the store.
+--
+-- The three type parameters carry whatever information is available in
+-- each state.
+data Status missing pending complete
+  = Missing missing
+  -- ^ The item does not exist, yet.
+  | Pending pending
+  -- ^ The item is under construction and not ready for consumption.
+  | Complete complete
+  -- ^ The item is complete and ready for consumption.
+  deriving (Eq, Show)
+
+-- | A bare status with no information attached to any state.
+type Status_ = Status () () ()
+
+-- | Update about the status of a pending item.
+data Update
+  = Completed Item
+  -- ^ The item is now completed and ready for consumption.
+  | Failed
+  -- ^ Constructing the item failed.
+  deriving (Eq, Show)
+
+-- | Errors that can occur when interacting with the store.
+data StoreError
+  = NotPending ContentHash
+  -- ^ An item is not under construction when it should be.
+  | AlreadyPending ContentHash
+  -- ^ An item is already under construction when it should be missing.
+  | AlreadyComplete ContentHash
+  -- ^ An item is already complete when it shouldn't be.
+  | CorruptedLink ContentHash FilePath
+  -- ^ The link under the given hash points to an invalid path.
+  | FailedToConstruct ContentHash
+  -- ^ A failure occurred while waiting for the item to be constructed.
+  | IncompatibleStoreVersion (Path Abs Dir) Int Int
+  -- ^ @IncompatibleStoreVersion storeDir actual expected@
+  -- The given store has a version number that is incompatible.
+  | MalformedMetadataEntry ContentHash SQL.SQLData
+  -- ^ @MalformedMetadataEntry hash key@
+  -- The metadata entry for the given @hash@, @key@ pair is malformed.
+  deriving (Show, Typeable)
+-- | Human-readable rendering of store errors.
+instance Exception StoreError where
+  displayException = \case
+    NotPending hash ->
+      "The following input hash is not pending '"
+      ++ C8.unpack (encodeHash hash)
+      ++ "'."
+    AlreadyPending hash ->
+      "The following input hash is already pending '"
+      ++ C8.unpack (encodeHash hash)
+      ++ "'."
+    AlreadyComplete hash ->
+      "The following input hash is already completed '"
+      ++ C8.unpack (encodeHash hash)
+      ++ "'."
+    CorruptedLink hash fp ->
+      "The completed input hash '"
+      ++ C8.unpack (encodeHash hash)
+      ++ "' points to an invalid store item '"
+      ++ fp
+      ++ "'."
+    FailedToConstruct hash ->
+      "Failed to construct the input hash '"
+      ++ C8.unpack (encodeHash hash)
+      ++ "'."
+    IncompatibleStoreVersion storeDir actual expected ->
+      "The store in '"
+      ++ fromAbsDir storeDir
+      ++ "' has version "
+      ++ show actual
+      ++ ". This software expects version "
+      ++ show expected
+      ++ ". No automatic migration is available, \
+      \please use a fresh store location."
+    MalformedMetadataEntry hash key ->
+      -- Fixed typo in the user-facing message: "metadtaa" -> "metadata".
+      "The metadata entry for hash '"
+      ++ C8.unpack (encodeHash hash)
+      ++ "' under key '"
+      ++ show key
+      ++ "' is malformed."
+
+-- | A hash addressed store on the file system.
+--
+-- The constructor and fields are not exported (see the module export
+-- list); stores are created via 'open' or 'withStore'.
+data ContentStore = ContentStore
+  { storeRoot :: Path Abs Dir
+  -- ^ Root directory of the content store.
+  -- The process must be able to create this directory if missing,
+  -- change permissions, and create files and directories within.
+  , storeLock :: Lock
+  -- ^ Write lock on store metadata to ensure multi thread and process safety.
+  -- The lock is taken when item state is changed or queried.
+  , storeNotifier :: Notifier
+  -- ^ Used to watch for updates on store items.
+  , storeDb :: SQL.Connection
+  -- ^ Connection to the metadata SQLite database.
+  }
+
+-- | A completed item in the 'ContentStore'.
+-- Identified solely by the content hash of its output.
+data Item = Item { itemHash :: ContentHash }
+  deriving (Eq, Ord, Show, Generic)
+
+-- | Hash an 'Item' by mixing in its type fingerprint
+-- followed by the bytes of its content hash.
+instance Monad m => ContentHashable m Item where
+  contentHashUpdate ctx item =
+    flip contentHashUpdate_fingerprint item
+    >=> pure . flip hashUpdate (toBytes $ itemHash item)
+    $ ctx
+
+-- Serialisation and hashing instances derived via 'Generic'.
+instance FromJSON Item
+instance ToJSON Item
+instance Data.Hashable.Hashable Item
+instance Data.Store.Store Item
+
+-- | File or directory within a content store 'Item'.
+--
+-- 'All' denotes the whole item directory; ':</>' a path inside it.
+-- Note that 'Content File' can only be built with ':</>',
+-- since 'All' is restricted to 'Dir'.
+data Content t where
+  All :: Item -> Content Dir
+  (:</>) :: Item -> Path Rel t -> Content t
+infixr 5 :</>
+deriving instance Eq (Content t)
+deriving instance Show (Content t)
+-- | Hash directory content by fingerprint, item, and (if any) relative path.
+instance Monad m => ContentHashable m (Content Dir) where
+  contentHashUpdate ctx x = case x of
+    All i ->
+      flip contentHashUpdate_fingerprint x
+      >=> flip contentHashUpdate i
+      $ ctx
+    i :</> p ->
+      flip contentHashUpdate_fingerprint x
+      >=> flip contentHashUpdate i
+      >=> flip contentHashUpdate p
+      $ ctx
+-- | Hash file content by fingerprint, item, and relative path.
+instance Monad m => ContentHashable m (Content File) where
+  contentHashUpdate ctx x = case x of
+    i :</> p ->
+      flip contentHashUpdate_fingerprint x
+      >=> flip contentHashUpdate i
+      >=> flip contentHashUpdate p
+      $ ctx
+
+-- | Append to the path within a store item.
+(^</>) :: Content Dir -> Path Rel t -> Content t
+content ^</> path = case content of
+  All item      -> item :</> path
+  item :</> dir -> item :</> dir </> path
+infixl 4 ^</>
+
+-- | A name that can be associated with a store 'Item'.
+-- See 'assignAlias', 'lookupAlias', 'removeAlias' and 'listAliases'.
+newtype Alias = Alias { unAlias :: T.Text }
+  deriving (ContentHashable IO, Eq, Ord, Show, SQL.FromField, SQL.ToField, Data.Store.Store)
+
+-- | The root directory of the store.
+root :: ContentStore -> Path Abs Dir
+root store = storeRoot store
+
+-- | The store path of a completed item.
+itemPath :: ContentStore -> Item -> Path Abs Dir
+itemPath store item = mkItemPath store (itemHash item)
+
+-- | Store item containing the given content.
+contentItem :: Content t -> Item
+contentItem content = case content of
+  All i    -> i
+  i :</> _ -> i
+
+-- | The filename component of a file within a store item.
+-- Total despite the single pattern: 'Content File' can only
+-- be constructed with ':</>' ('All' yields 'Content Dir').
+contentFilename :: Content File -> Path Rel File
+contentFilename (_ :</> relPath) = filename relPath
+
+-- | The absolute path to content within the store.
+contentPath :: ContentStore -> Content t -> Path Abs t
+contentPath store content = case content of
+  All item      -> itemPath store item
+  item :</> dir -> itemPath store item </> dir
+
+-- | @open root@ opens a store under the given root directory.
+--
+-- The root directory is created if necessary.
+--
+-- It is not safe to have multiple store objects
+-- refer to the same root directory.
+open :: Path Abs Dir -> IO ContentStore
+open storeRoot = do
+  createDirIfMissing True storeRoot
+  storeLock <- openLock (lockPath storeRoot)
+  -- The root is writable only during store operations; temporarily lift
+  -- the restriction to set up the database and metadata directory.
+  withLock storeLock $ withWritableStoreRoot storeRoot $ do
+    storeDb <- SQL.open (fromAbsFile $ dbPath storeRoot)
+    initDb storeRoot storeDb
+    createDirIfMissing True (metadataPath storeRoot)
+    storeNotifier <- initNotifier
+    -- All record fields are bound by name above (RecordWildCards).
+    return ContentStore {..}
+
+-- | Free the resources associated with the given store object.
+--
+-- The store object may not be used afterwards.
+close :: ContentStore -> IO ()
+close store = do
+  -- Tear down in order: lock, notifier, database connection.
+  closeLock (storeLock store)
+  killNotifier (storeNotifier store)
+  SQL.close (storeDb store)
+
+-- | Open the store under the given root and perform the given action.
+-- Closes the store once the action is complete.
+-- 'bracket' guarantees 'close' runs even if the action throws.
+--
+-- See also: 'Control.Funflow.ContentStore.open'
+withStore :: (MonadIO m, MonadMask m)
+  => Path Abs Dir -> (ContentStore -> m a) -> m a
+withStore root' = bracket (liftIO $ open root') (liftIO . close)
+
+-- | List all elements in the store
+-- @(pending keys, completed keys, completed items)@.
+--
+-- Classifies each direct subdirectory of the store root by its
+-- name prefix; directories matching none of the prefixes are skipped.
+listAll :: MonadIO m => ContentStore -> m ([ContentHash], [ContentHash], [Item])
+listAll ContentStore {storeRoot} = liftIO $
+  foldr go ([], [], []) . fst <$> listDir storeRoot
+  where
+    -- Try each parser in turn; leave the accumulator unchanged on no match.
+    go d prev@(builds, outs, items) = fromMaybe prev $ asum
+      [ parsePending d >>= \x -> Just (x:builds, outs, items)
+      , parseComplete d >>= \x -> Just (builds, x:outs, items)
+      , parseItem d >>= \x -> Just (builds, outs, x:items)
+      ]
+    parsePending :: Path Abs Dir -> Maybe ContentHash
+    parsePending = pathToHash <=< stripPrefix pendingPrefix . extractDir
+    parseComplete :: Path Abs Dir -> Maybe ContentHash
+    parseComplete = pathToHash <=< stripPrefix completePrefix . extractDir
+    parseItem :: Path Abs Dir -> Maybe Item
+    parseItem = fmap Item . pathToHash <=< stripPrefix itemPrefix . extractDir
+    -- Last path component without its trailing separator.
+    extractDir :: Path Abs Dir -> FilePath
+    extractDir = dropTrailingPathSeparator . fromRelDir . dirname
+
+-- | List all pending keys in the store.
+listPending :: MonadIO m => ContentStore -> m [ContentHash]
+listPending store = do
+  (pending, _, _) <- listAll store
+  return pending
+
+-- | List all completed keys in the store.
+listComplete :: MonadIO m => ContentStore -> m [ContentHash]
+listComplete store = do
+  (_, complete, _) <- listAll store
+  return complete
+
+-- | List all completed items in the store.
+listItems :: MonadIO m => ContentStore -> m [Item]
+listItems store = do
+  (_, _, items) <- listAll store
+  return items
+
+-- | Query the state of the item under the given key.
+-- Strips the payload from each status, returning only the tag.
+query :: MonadIO m => ContentStore -> ContentHash -> m (Status () () ())
+query store hash = liftIO . withStoreLock store $
+  internalQuery store hash >>= pure . \case
+    Missing _ -> Missing ()
+    Pending _ -> Pending ()
+    Complete _ -> Complete ()
+
+-- | Check if there is no complete or pending item under the given key.
+isMissing :: MonadIO m => ContentStore -> ContentHash -> m Bool
+isMissing store hash = fmap (== Missing ()) (query store hash)
+
+-- | Check if there is a pending item under the given key.
+isPending :: MonadIO m => ContentStore -> ContentHash -> m Bool
+isPending store hash = fmap (== Pending ()) (query store hash)
+
+-- | Check if there is a completed item under the given key.
+isComplete :: MonadIO m => ContentStore -> ContentHash -> m Bool
+isComplete store hash = fmap (== Complete ()) (query store hash)
+
+-- | Query the state under the given key and return the item if completed.
+-- Doesn't block if the item is pending.
+lookup :: MonadIO m => ContentStore -> ContentHash -> m (Status () () Item)
+lookup store hash = liftIO . withStoreLock store $ do
+  status <- internalQuery store hash
+  return $ case status of
+    Missing ()    -> Missing ()
+    Pending _     -> Pending ()
+    Complete item -> Complete item
+
+-- | Query the state under the given key and return the item if completed.
+-- Return an 'Control.Concurrent.Async' to await an update, if pending.
+-- The watch is set up while the store lock is still held, so no
+-- transition can be missed between the query and the watch.
+lookupOrWait
+  :: MonadIO m
+  => ContentStore
+  -> ContentHash
+  -> m (Status () (Async Update) Item)
+lookupOrWait store hash = liftIO . withStoreLock store $
+  internalQuery store hash >>= \case
+    Complete item -> return $ Complete item
+    Missing () -> return $ Missing ()
+    Pending _ -> Pending <$> internalWatchPending store hash
+
+-- | Query the state under the given key and return the item once completed.
+-- Blocks if the item is pending.
+-- Returns 'Nothing' if the item is missing, or failed to be completed.
+waitUntilComplete :: MonadIO m => ContentStore -> ContentHash -> m (Maybe Item)
+waitUntilComplete store hash = lookupOrWait store hash >>= \case
+  Complete item -> return $ Just item
+  Missing () -> return Nothing
+  -- Block on the async until the pending item either completes or fails.
+  Pending a -> liftIO (wait a) >>= \case
+    Completed item -> return $ Just item
+    Failed -> return Nothing
+
+-- | Atomically query the state under the given key and mark pending if missing.
+--
+-- Returns @'Complete' item@ if the item is complete.
+-- Returns @'Pending' async@ if the item is pending, where @async@ is an
+-- 'Control.Concurrent.Async' to await updates on.
+-- Returns @'Missing' buildDir@ if the item was missing, and is now pending.
+-- It should be constructed in the given @buildDir@,
+-- and then marked as complete using 'markComplete'.
+constructOrAsync
+  :: MonadIO m
+  => ContentStore
+  -> ContentHash
+  -> m (Status (Path Abs Dir) (Async Update) Item)
+constructOrAsync store hash = liftIO . withStoreLock store $
+  internalQuery store hash >>= \case
+    Complete item -> return $ Complete item
+    -- Claim the key: the store must be writable to create the build dir.
+    Missing () -> withWritableStore store $
+      Missing <$> internalMarkPending store hash
+    Pending _ -> Pending <$> internalWatchPending store hash
+
+-- | Atomically query the state under the given key and mark pending if missing.
+-- Wait for the item to be completed, if already pending.
+-- Throws a 'FailedToConstruct' error if construction fails.
+--
+-- Returns @'Complete' item@ if the item is complete.
+-- Returns @'Missing' buildDir@ if the item was missing, and is now pending.
+-- It should be constructed in the given @buildDir@,
+-- and then marked as complete using 'markComplete'.
+constructOrWait
+  :: MonadIO m
+  => ContentStore
+  -> ContentHash
+  -> m (Status (Path Abs Dir) Void Item)
+constructOrWait store hash = constructOrAsync store hash >>= \case
+  -- Another producer is building this key: block until it resolves.
+  Pending a -> liftIO (wait a) >>= \case
+    Completed item -> return $ Complete item
+    -- XXX: Consider extending 'Status' with a 'Failed' constructor.
+    -- If the store contains metadata as well, it could keep track of the
+    -- number of failed attempts and further details about the failure.
+    -- If an external task is responsible for the failure, the client could
+    -- choose to resubmit a certain number of times.
+    Failed -> liftIO . throwIO $ FailedToConstruct hash
+  Complete item -> return $ Complete item
+  Missing dir -> return $ Missing dir
+
+-- | Atomically query the state under the given key and mark pending if missing.
+-- Unlike 'constructOrAsync', does not watch a pending item: the pending
+-- payload is simply @()@.
+constructIfMissing
+  :: MonadIO m
+  => ContentStore
+  -> ContentHash
+  -> m (Status (Path Abs Dir) () Item)
+constructIfMissing store hash = liftIO . withStoreLock store $
+  internalQuery store hash >>= \case
+    Complete item -> return $ Complete item
+    Pending _ -> return $ Pending ()
+    Missing () -> withWritableStore store $
+      Missing <$> internalMarkPending store hash
+
+-- | Atomically query the state under the given key and mark pending if missing.
+-- Execute the given function to construct the item, mark as complete on success
+-- and remove on failure. Forcibly removes if an uncaught exception occurs
+-- during item construction.
+withConstructIfMissing
+  :: (MonadIO m, MonadMask m)
+  => ContentStore
+  -> ContentHash
+  -> (Path Abs Dir -> m (Either e a))
+  -> m (Status e () (Maybe a, Item))
+withConstructIfMissing store hash f =
+  bracketOnError
+    (constructIfMissing store hash)
+    -- Exception cleanup: only undo state we created ourselves,
+    -- i.e. the pending marker from the 'Missing' branch.
+    (\case
+      Missing _ -> removeForcibly store hash
+      _ -> return ())
+    (\case
+      Pending () -> return (Pending ())
+      -- Item already existed; no result of @f@ is available ('Nothing').
+      Complete item -> return (Complete (Nothing, item))
+      Missing fp -> f fp >>= \case
+        Left e -> do
+          removeFailed store hash
+          return (Missing e)
+        Right x -> do
+          item <- markComplete store hash
+          return (Complete (Just x, item)))
+
+-- | Mark a non-existent item as pending.
+--
+-- Creates the build directory and returns its path.
+--
+-- Throws 'AlreadyComplete' or 'AlreadyPending' if the key is taken.
+--
+-- See also: 'Control.Funflow.ContentStore.constructIfMissing'.
+markPending :: MonadIO m => ContentStore -> ContentHash -> m (Path Abs Dir)
+markPending store hash = liftIO . withStoreLock store $
+  internalQuery store hash >>= \case
+    Complete _ -> throwIO (AlreadyComplete hash)
+    Pending _ -> throwIO (AlreadyPending hash)
+    Missing () -> withWritableStore store $
+      internalMarkPending store hash
+
+-- | Mark a pending item as complete.
+--
+-- Hashes the build directory's content, moves it to its item path
+-- (or discards it if an identical item already exists), freezes
+-- permissions, and links the input hash to the resulting item.
+markComplete :: MonadIO m => ContentStore -> ContentHash -> m Item
+markComplete store inHash = liftIO . withStoreLock store $
+  internalQuery store inHash >>= \case
+    Missing () -> throwIO (NotPending inHash)
+    Complete _ -> throwIO (AlreadyComplete inHash)
+    Pending build -> withWritableStore store $ liftIO $ do
+      do
+        -- Freeze the item's metadata directory alongside the item itself.
+        let metadataDir = mkMetadataDirPath store inHash
+        exists <- doesDirExist metadataDir
+        when exists $
+          unsetWritableRecursively metadataDir
+      -- XXX: Hashing large data can take some time,
+      -- could we avoid locking the store for all that time?
+      outHash <- contentHash (DirectoryContent build)
+      let out = mkItemPath store outHash
+          link' = mkCompletePath store inHash
+      doesDirExist out >>= \case
+        -- Content-identical item already stored: drop the duplicate build.
+        True -> removePathForcibly (fromAbsDir build)
+        False -> do
+          renameDir build out
+          unsetWritableRecursively out
+      -- Create a relative symlink complete-<in> -> item-<out>.
+      rel <- makeRelative (parent link') out
+      let from' = dropTrailingPathSeparator $ fromAbsDir link'
+          to' = dropTrailingPathSeparator $ fromRelDir rel
+      createSymbolicLink to' from'
+      addBackReference store inHash (Item outHash)
+      pure $! Item outHash
+
+-- | Remove a pending item.
+--
+-- It is the caller's responsibility to ensure that no other threads or
+-- processes will attempt to access the item's contents afterwards.
+removeFailed :: MonadIO m => ContentStore -> ContentHash -> m ()
+removeFailed store hash = liftIO . withStoreLock store $
+  internalQuery store hash >>= \case
+    Missing () -> throwIO (NotPending hash)
+    Complete _ -> throwIO (AlreadyComplete hash)
+    Pending build -> withWritableStore store $
+      removePathForcibly (fromAbsDir build)
+
+-- | Remove a key association independent of the corresponding item state.
+-- Do nothing if no item exists under the given key.
+--
+-- It is the caller's responsibility to ensure that no other threads or
+-- processes will attempt to access the contents afterwards.
+--
+-- Note, this will leave an orphan item behind if no other keys point to it.
+-- There is no garbage collection mechanism in place at the moment.
+removeForcibly :: MonadIO m => ContentStore -> ContentHash -> m ()
+removeForcibly store hash = liftIO . withStoreLock store $ withWritableStore store $
+  internalQuery store hash >>= \case
+    Missing () -> pure ()
+    Pending build -> liftIO $ removePathForcibly (fromAbsDir build)
+    -- For a complete key only the symlink is removed, not the item it
+    -- points to.
+    Complete _out -> liftIO $
+      removePathForcibly $
+        dropTrailingPathSeparator $ fromAbsDir $ mkCompletePath store hash
+      -- XXX: This will leave orphan store items behind.
+      --      Add GC in some form.
+
+-- | Remove a completed item in the store.
+-- Do nothing if not completed.
+--
+-- It is the caller's responsibility to ensure that no other threads or
+-- processes will attempt to access the contents afterwards.
+--
+-- Note, this will leave keys pointing to that item dangling.
+-- There is no garbage collection mechanism in place at the moment.
+removeItemForcibly :: MonadIO m => ContentStore -> Item -> m ()
+removeItemForcibly store item = liftIO . withStoreLock store $ withWritableStore store $
+  removePathForcibly (fromAbsDir $ itemPath store item)
+  -- XXX: Remove dangling links.
+  --      Add back-references in some form.
+
+-- | Link the given alias to the given item.
+-- If the alias existed before it is overwritten.
+-- Keyed by the content hash of the alias, which serves as the
+-- primary key in the @aliases@ table.
+assignAlias :: MonadIO m => ContentStore -> Alias -> Item -> m ()
+assignAlias store alias item =
+  liftIO . withStoreLock store $ withWritableStore store $ do
+    hash <- contentHash alias
+    SQL.executeNamed (storeDb store)
+      "INSERT OR REPLACE INTO\
+      \ aliases\
+      \ VALUES\
+      \ (:hash, :dest, :name)"
+      [ ":hash" SQL.:= hash
+      , ":dest" SQL.:= itemHash item
+      , ":name" SQL.:= alias
+      ]
+
+-- | Lookup an item under the given alias.
+-- Returns 'Nothing' if the alias does not exist.
+lookupAlias :: MonadIO m => ContentStore -> Alias -> m (Maybe Item)
+lookupAlias store alias =
+  liftIO . withStoreLock store $ do
+    -- The alias is keyed by its content hash, matching 'assignAlias'.
+    hash <- contentHash alias
+    r <- SQL.queryNamed (storeDb store)
+      "SELECT dest FROM aliases\
+      \ WHERE\
+      \ hash = :hash"
+      [ ":hash" SQL.:= hash ]
+    pure $! listToMaybe $ Item . SQL.fromOnly <$> r
+
+-- | Remove the given alias.
+-- Does nothing if the alias does not exist.
+removeAlias :: MonadIO m => ContentStore -> Alias -> m ()
+removeAlias store alias =
+  liftIO . withStoreLock store $ withWritableStore store $ do
+    hash <- contentHash alias
+    SQL.executeNamed (storeDb store)
+      "DELETE FROM aliases\
+      \ WHERE\
+      \ hash = :hash"
+      [ ":hash" SQL.:= hash ]
+
+-- | List all aliases and the respective items.
+-- Each row is @(name, dest)@; @dest@ is wrapped into an 'Item'.
+listAliases :: MonadIO m => ContentStore -> m [(Alias, Item)]
+listAliases store = liftIO . withStoreLock store $
+  fmap (map (second Item)) $
+  SQL.query_ (storeDb store)
+    "SELECT name, dest FROM aliases"
+
+-- | Get all hashes that resulted in the given item.
+-- Reads the @backrefs@ table populated by 'addBackReference'.
+getBackReferences :: MonadIO m => ContentStore -> Item -> m [ContentHash]
+getBackReferences store (Item outHash) = liftIO . withStoreLock store $
+  map SQL.fromOnly <$> SQL.queryNamed (storeDb store)
+    "SELECT hash FROM backrefs\
+    \ WHERE\
+    \ dest = :out"
+    [ ":out" SQL.:= outHash ]
+
+-- | Define the input items to a subtree.
+-- Only permitted while the key is pending;
+-- throws 'NotPending' otherwise.
+setInputs :: MonadIO m => ContentStore -> ContentHash -> [Item] -> m ()
+setInputs store hash items = liftIO $
+  withStoreLock store $
+  withWritableStore store $
+  internalQuery store hash >>= \case
+    Pending _ -> forM_ items $ \(Item input) ->
+      SQL.executeNamed (storeDb store)
+        "INSERT OR REPLACE INTO\
+        \ inputs (hash, input)\
+        \ VALUES\
+        \ (:hash, :input)"
+        [ ":hash" SQL.:= hash
+        , ":input" SQL.:= input
+        ]
+    _ -> throwIO $ NotPending hash
+
+-- | Get the input items to a subtree if any were defined.
+-- Returns the empty list when none were recorded via 'setInputs'.
+getInputs :: MonadIO m => ContentStore -> ContentHash -> m [Item]
+getInputs store hash = liftIO . withStoreLock store $
+  map (Item . SQL.fromOnly) <$> SQL.queryNamed (storeDb store)
+    "SELECT input FROM inputs\
+    \ WHERE\
+    \ hash = :hash"
+    [ ":hash" SQL.:= hash ]
+
+-- | Set a metadata entry on an item.
+-- Overwrites an existing entry for the same @(hash, key)@ pair.
+setMetadata :: (SQL.ToField k, SQL.ToField v, MonadIO m )
+  => ContentStore -> ContentHash -> k -> v -> m ()
+setMetadata store hash k v = liftIO $
+  withStoreLock store $
+  withWritableStore store $
+  SQL.executeNamed (storeDb store)
+    "INSERT OR REPLACE INTO\
+    \ metadata (hash, key, value)\
+    \ VALUES\
+    \ (:hash, :key, :value)"
+    [ ":hash" SQL.:= hash
+    , ":key" SQL.:= k
+    , ":value" SQL.:= v
+    ]
+
+-- | Retrieve a metadata entry on an item, or 'Nothing' if missing.
+-- Throws 'MalformedMetadataEntry' if the query returns anything
+-- other than zero rows or one single-column row.
+getMetadata :: (SQL.ToField k, SQL.FromField v, MonadIO m)
+  => ContentStore -> ContentHash -> k -> m (Maybe v)
+getMetadata store hash k = liftIO . withStoreLock store $ do
+  r <- SQL.queryNamed (storeDb store)
+    "SELECT value FROM metadata\
+    \ WHERE\
+    \ (hash = :hash AND key = :key)"
+    [ ":hash" SQL.:= hash
+    , ":key" SQL.:= k
+    ]
+  case r of
+    [] -> pure Nothing
+    [[v]] -> pure $ Just v
+    _ -> throwIO $ MalformedMetadataEntry hash (SQL.toField k)
+
+-- | Create and open a new metadata file on a pending item in write mode.
+-- Returns the absolute path and an open 'Handle'; closing the handle is
+-- the caller's responsibility. Throws 'NotPending' if the key is not
+-- pending.
+createMetadataFile
+  :: MonadIO m
+  => ContentStore -> ContentHash -> Path Rel File -> m (Path Abs File, Handle)
+createMetadataFile store hash file = liftIO . withStoreLock store $
+  internalQuery store hash >>= \case
+    Pending _ -> do
+      let path = mkMetadataFilePath store hash file
+      createDirIfMissing True (parent path)
+      handle <- openFile (fromAbsFile path) WriteMode
+      pure (path, handle)
+    _ -> throwIO $ NotPending hash
+
+-- | Return the path to a metadata file if it exists.
+getMetadataFile
+  :: MonadIO m
+  => ContentStore -> ContentHash -> Path Rel File -> m (Maybe (Path Abs File))
+getMetadataFile store hash file = liftIO . withStoreLock store $ do
+  let path = mkMetadataFilePath store hash file
+  exists <- doesFileExist path
+  return $ if exists then Just path else Nothing
+
+----------------------------------------------------------------------
+-- Internals
+
+-- | Path of the store's lock directory under the given root.
+lockPath :: Path Abs Dir -> Path Abs Dir
+lockPath root' = root' </> [reldir|lock|]
+
+-- | Path of the store's SQLite metadata database under the given root.
+dbPath :: Path Abs Dir -> Path Abs File
+dbPath root' = root' </> [relfile|metadata.db|]
+
+-- | Path of the store's metadata directory under the given root.
+metadataPath :: Path Abs Dir -> Path Abs Dir
+metadataPath root' = root' </> [reldir|metadata|]
+
+-- | Holds a lock on the global 'MVar' and on the global lock file
+-- for the duration of the given action.
+-- All state-changing and querying operations in this module go through it.
+withStoreLock :: ContentStore -> IO a -> IO a
+withStoreLock store = withLock (storeLock store)
+
+-- | Build the relative directory name @<prefix><encoded-hash>@.
+-- The 'error' branch should be unreachable for valid prefixes, since
+-- encoded hashes are expected to be valid path components.
+prefixHashPath :: C8.ByteString -> ContentHash -> Path Rel Dir
+prefixHashPath pref hash
+  | Just dir <- Path.parseRelDir $ C8.unpack $ pref <> encodeHash hash
+  = dir
+  | otherwise = error
+      "[Control.Funflow.ContentStore.prefixHashPath] \
+      \Failed to construct hash path."
+
+-- | Directory-name prefixes distinguishing entry kinds in the store root.
+-- Polymorphic over 'IsString' so they can be used both as 'FilePath'
+-- fragments and as 'C8.ByteString' prefixes.
+pendingPrefix, completePrefix, hashPrefix, itemPrefix :: IsString s => s
+pendingPrefix = "pending-"
+completePrefix = "complete-"
+hashPrefix = "hash-"
+itemPrefix = "item-"
+
+-- | Return the full build path for the given input hash.
+mkPendingPath :: ContentStore -> ContentHash -> Path Abs Dir
+mkPendingPath ContentStore {storeRoot} hash =
+  storeRoot </> prefixHashPath pendingPrefix hash
+
+-- | Return the full link path for the given input hash.
+-- This is the symlink created by 'markComplete'.
+mkCompletePath :: ContentStore -> ContentHash -> Path Abs Dir
+mkCompletePath ContentStore {storeRoot} hash =
+  storeRoot </> prefixHashPath completePrefix hash
+
+-- | Return the full store path to the given output hash.
+mkItemPath :: ContentStore -> ContentHash -> Path Abs Dir
+mkItemPath ContentStore {storeRoot} hash =
+  storeRoot </> prefixHashPath itemPrefix hash
+
+-- | Return the full store path to the given metadata directory.
+-- Lives under 'metadataPath', not directly under the store root.
+mkMetadataDirPath :: ContentStore -> ContentHash -> Path Abs Dir
+mkMetadataDirPath ContentStore {storeRoot} hash =
+  metadataPath storeRoot </> prefixHashPath hashPrefix hash
+
+-- | Return the full store path to the given metadata file.
+mkMetadataFilePath
+  :: ContentStore -> ContentHash -> Path Rel File -> Path Abs File
+mkMetadataFilePath store hash file =
+  mkMetadataDirPath store hash </> file
+
+-- | Query the state under the given key without taking a lock.
+--
+-- Determines the state purely from the file system:
+-- a build directory means pending; a complete-symlink means complete
+-- (the link target encodes the item hash); neither means missing.
+-- Throws 'CorruptedLink' if the symlink target cannot be parsed.
+internalQuery
+  :: MonadIO m
+  => ContentStore
+  -> ContentHash
+  -> m (Status () (Path Abs Dir) Item)
+internalQuery store inHash = liftIO $ do
+  let build = mkPendingPath store inHash
+      link' = mkCompletePath store inHash
+  buildExists <- doesDirExist build
+  if buildExists then
+    pure $! Pending build
+  else do
+    linkExists <- doesDirExist link'
+    if linkExists then do
+      out <- readSymbolicLink
+        (dropTrailingPathSeparator $ fromAbsDir link')
+      case pathToHash =<< stripPrefix itemPrefix out of
+        Nothing -> throwIO $ CorruptedLink inHash out
+        Just outHash -> return $ Complete (Item outHash)
+    else
+      pure $! Missing ()
+
+-- | Create the build directory for the given input hash
+-- and make the metadata directory writable if it exists.
+-- Assumes the store lock is held and the store root is writable.
+internalMarkPending :: ContentStore -> ContentHash -> IO (Path Abs Dir)
+internalMarkPending store hash = do
+  let dir = mkPendingPath store hash
+  createDir dir
+  setDirWritable dir
+  -- Metadata may survive from an earlier attempt; unfreeze it for reuse.
+  let metadataDir = mkMetadataDirPath store hash
+  metadirExists <- doesDirExist metadataDir
+  when metadirExists $
+    setWritableRecursively metadataDir
+  return dir
+
+-- | Watch the build directory of the pending item under the given key.
+-- The returned 'Async' completes after the item is completed or failed.
+--
+-- Combines a filesystem watch with periodic polling, funnelled through
+-- a signal 'MVar' into a single status-checking loop.
+internalWatchPending
+  :: ContentStore
+  -> ContentHash
+  -> IO (Async Update)
+internalWatchPending store hash = do
+  let build = mkPendingPath store hash
+  -- Add an inotify/kqueue watch and give a signal on relevant events.
+  let notifier = storeNotifier store
+  signal <- newEmptyMVar
+  -- Signal the listener. If the 'MVar' is full,
+  -- the listener didn't handle earlier signals, yet.
+  let giveSignal = void $ tryPutMVar signal ()
+  watch <- addDirWatch notifier (fromAbsDir build) giveSignal
+  -- Additionally, poll on regular intervals.
+  -- Inotify/Kqueue don't cover all cases, e.g. network filesystems.
+  ticker <- async $ forever $ threadDelay 3007000 >> giveSignal
+  let stopWatching = do
+        cancel ticker
+        removeDirWatch watch
+  -- Listen to the signal asynchronously,
+  -- and query the status when it fires.
+  -- If the status changed, fill in the update.
+  update <- newEmptyMVar
+  let query' = liftIO . withStoreLock store $ internalQuery store hash
+      loop = takeMVar signal >> query' >>= \case
+        -- Still pending: wait for the next signal.
+        Pending _ -> loop
+        Complete item -> tryPutMVar update $ Completed item
+        -- The build directory vanished without completing: a failure.
+        Missing () -> tryPutMVar update Failed
+  void $ async loop
+  -- Wait for the update asynchronously.
+  -- Stop watching when it arrives.
+  async $ takeMVar update <* stopWatching
+
+-- | Make the store root directory writable.
+setRootDirWritable :: Path Abs Dir -> IO ()
+setRootDirWritable storeRoot =
+  setFileMode (fromAbsDir storeRoot) writableRootDirMode
+
+writableRootDirMode :: FileMode
+writableRootDirMode = writableDirMode
+
+-- | Revoke write permission on the store root directory.
+setRootDirReadOnly :: Path Abs Dir -> IO ()
+setRootDirReadOnly storeRoot =
+  setFileMode (fromAbsDir storeRoot) readOnlyRootDirMode
+
+readOnlyRootDirMode :: FileMode
+readOnlyRootDirMode = writableDirMode `intersectFileModes` allButWritableMode
+
+-- | Run an action with the store root writable; restore read-only after,
+-- even if the action throws (via 'bracket_').
+withWritableStoreRoot :: Path Abs Dir -> IO a -> IO a
+withWritableStoreRoot storeRoot =
+  bracket_ (setRootDirWritable storeRoot) (setRootDirReadOnly storeRoot)
+
+-- | 'withWritableStoreRoot' applied to a store object's root.
+withWritableStore :: ContentStore -> IO a -> IO a
+withWritableStore ContentStore {storeRoot} =
+  withWritableStoreRoot storeRoot
+
+-- | Set the standard writable directory mode on the given directory.
+setDirWritable :: Path Abs Dir -> IO ()
+setDirWritable fp = setFileMode (fromAbsDir fp) writableDirMode
+
+-- | Directory mode: owner full access; group and others read/execute.
+writableDirMode :: FileMode
+writableDirMode = foldl' unionFileModes nullFileMode
+  [ directoryMode, ownerModes
+  , groupReadMode, groupExecuteMode
+  , otherReadMode, otherExecuteMode
+  ]
+
+-- | Set write permissions on the given path.
+-- Adds owner-write to the path's current mode.
+setWritable :: Path Abs t -> IO ()
+setWritable fp = do
+  mode <- fileMode <$> getFileStatus (toFilePath fp)
+  setFileMode (toFilePath fp) $ mode `unionFileModes` ownerWriteMode
+
+-- | Unset write permissions on the given path.
+-- Clears owner-, group- and other-write from the path's current mode.
+unsetWritable :: Path Abs t -> IO ()
+unsetWritable fp = do
+  mode <- fileMode <$> getFileStatus (toFilePath fp)
+  setFileMode (toFilePath fp) $ mode `intersectFileModes` allButWritableMode
+
+-- | Mask keeping every mode bit except the three write bits.
+allButWritableMode :: FileMode
+allButWritableMode = complement $ foldl' unionFileModes nullFileMode
+  [ownerWriteMode, groupWriteMode, otherWriteMode]
+
+-- | Set write permissions on all items in a directory tree recursively.
+-- Files first, then the directory itself; no subtree is excluded.
+setWritableRecursively :: Path Abs Dir -> IO ()
+setWritableRecursively = walkDir $ \dir _ files -> do
+  mapM_ setWritable files
+  setWritable dir
+  return $ WalkExclude []
+
+-- | Unset write permissions on all items in a directory tree recursively.
+-- Files first, then the directory itself; no subtree is excluded.
+unsetWritableRecursively :: Path Abs Dir -> IO ()
+unsetWritableRecursively = walkDir $ \dir _ files -> do
+  mapM_ unsetWritable files
+  unsetWritable dir
+  return $ WalkExclude []
+
+-- | Current version of the store's on-disk layout and database schema.
+-- Checked against SQLite's @user_version@ pragma in 'initDb'.
+storeVersion :: Int
+storeVersion = 1
+
+-- | Initialize the database.
+--
+-- Stamps a fresh database with 'storeVersion' and throws
+-- 'IncompatibleStoreVersion' on a mismatch. Table creation uses
+-- @IF NOT EXISTS@, so re-running on an existing store is harmless.
+initDb :: Path Abs Dir -> SQL.Connection -> IO ()
+initDb storeDir db = do
+  [[version]] <- SQL.query_ db "PRAGMA user_version"
+  if version == 0 then
+    -- A fresh database reports user_version 0; stamp it.
+    SQL.execute_ db $
+      "PRAGMA user_version = " <> fromString (show storeVersion)
+  else
+    unless (version == storeVersion) $
+      throwIO $ IncompatibleStoreVersion storeDir version storeVersion
+  -- Aliases to items.
+  SQL.execute_ db
+    "CREATE TABLE IF NOT EXISTS\
+    \ aliases\
+    \ ( hash TEXT PRIMARY KEY\
+    \ , dest TEXT NOT NULL\
+    \ , name TEXT NOT NULL\
+    \ )"
+  -- Back-references from items @dest@ to hashes @hash@.
+  SQL.execute_ db
+    "CREATE TABLE IF NOT EXISTS\
+    \ backrefs\
+    \ ( hash TEXT PRIMARY KEY\
+    \ , dest TEXT NOT NULL\
+    \ )"
+  -- Inputs @input@ to hashes @hash@.
+  SQL.execute_ db
+    "CREATE TABLE IF NOT EXISTS\
+    \ inputs\
+    \ ( hash TEXT NOT NULL\
+    \ , input TEXT NOT NULL\
+    \ , UNIQUE (hash, input)\
+    \ )"
+  -- Arbitrary metadata on hashes.
+  SQL.execute_ db
+    "CREATE TABLE IF NOT EXISTS\
+    \ metadata\
+    \ ( hash TEXT NOT NULL\
+    \ , key TEXT NOT NULL\
+    \ , value TEXT\
+    \ , PRIMARY KEY(hash, key)\
+    \ )"
+
+-- | Adds a link between input hash and the output hash.
+--
+-- Assumes that the store is locked and writable.
+-- Called from 'markComplete'; read back by 'getBackReferences'.
+addBackReference :: ContentStore -> ContentHash -> Item -> IO ()
+addBackReference store inHash (Item outHash) =
+  SQL.executeNamed (storeDb store)
+    "INSERT OR REPLACE INTO\
+    \ backrefs (hash, dest)\
+    \ VALUES\
+    \ (:in, :out)"
+    [ ":in" SQL.:= inHash
+    , ":out" SQL.:= outHash
+    ]
diff --git a/src/Control/Funflow/ContentStore/Notify.hs b/src/Control/Funflow/ContentStore/Notify.hs
new file mode 100644
index 0000000..53e1676
--- /dev/null
+++ b/src/Control/Funflow/ContentStore/Notify.hs
@@ -0,0 +1,28 @@
+{-# LANGUAGE CPP #-}
+
+-- | Generic file change notifier library for unix-based systems.
+--
+-- This library abstracts over specific implementations for BSD and linux
+-- systems.
+--
+-- It provides facilities to watch specific directories for the following changes:
+-- - File moves
+-- - File deletion
+-- - Attribute changes.
+module Control.Funflow.ContentStore.Notify
+ ( Notifier
+ , initNotifier
+ , killNotifier
+
+ , Watch
+ , addDirWatch
+ , removeDirWatch
+ ) where
+
+#ifdef OS_Linux
+import Control.Funflow.ContentStore.Notify.Linux
+#else
+# ifdef OS_BSD
+import Control.Funflow.ContentStore.Notify.BSD
+# endif
+#endif
diff --git a/src/Control/Funflow/ContentStore/Notify/BSD.hs b/src/Control/Funflow/ContentStore/Notify/BSD.hs
new file mode 100644
index 0000000..61aaf7d
--- /dev/null
+++ b/src/Control/Funflow/ContentStore/Notify/BSD.hs
@@ -0,0 +1,78 @@
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- | Implementation of filesystem watch for BSD-based systems using KQueue.
+module Control.Funflow.ContentStore.Notify.BSD
+ ( Notifier
+ , initNotifier
+ , killNotifier
+
+ , Watch
+ , addDirWatch
+ , removeDirWatch
+ ) where
+
+import Control.Concurrent
+import Control.Concurrent.Async
+import Control.Exception.Safe
+import Control.Monad
+import Data.Typeable
+import Foreign.Ptr
+import System.KQueue
+import System.Posix.IO
+import System.Posix.Types
+
+-- | On BSD systems the notifier is a kqueue handle.
+type Notifier = KQueue
+
+-- | Errors raised during notifier initialisation.
+data NotifierException
+  = RequiresThreadedRuntime
+  deriving (Show, Typeable)
+instance Exception NotifierException where
+  displayException = \case
+    -- XXX: Compile time check?
+    RequiresThreadedRuntime ->
+      "Threaded runtime required! Please rebuild with -threaded flag."
+
+-- | Create a new kqueue-based notifier.
+-- Throws 'RequiresThreadedRuntime' on a non-threaded runtime,
+-- since the watch loop blocks in a foreign call.
+initNotifier :: IO Notifier
+initNotifier = do
+  unless rtsSupportsBoundThreads $ throwIO RequiresThreadedRuntime
+  kqueue
+
+-- | No global teardown is needed for kqueue;
+-- per-watch resources are released in 'removeDirWatch'.
+killNotifier :: Notifier -> IO ()
+killNotifier _ = return ()
+
+-- | A watch bundles the kqueue, the watched directory's file descriptor,
+-- and the async running the event loop.
+data Watch = Watch KQueue Fd (Async ())
+
+-- | Watch @dir@ for deletes, attribute changes, renames, and revokes,
+-- running @f@ on every batch of events. The event loop runs in its own
+-- async until the descriptor is closed by 'removeDirWatch'.
+addDirWatch :: Notifier -> FilePath -> IO () -> IO Watch
+addDirWatch kq dir f = do
+  fd <- openFd dir ReadOnly Nothing defaultFileFlags
+  a <- async $ do
+    let event = KEvent
+          { ident = fromIntegral fd
+          , evfilter = EvfiltVnode
+          , flags = [EvAdd]
+          , fflags = [NoteDelete, NoteAttrib, NoteRename, NoteRevoke]
+          , data_ = 0
+          , udata = nullPtr
+          }
+        loop = do
+          -- Block for at most one event at a time, then re-arm.
+          chgs <- kevent kq [event] 1 Nothing
+          unless (null chgs) f
+          loop
+    -- Ensure the descriptor is closed even if the loop is cancelled.
+    loop `finally` closeFd fd
+  return $! Watch kq fd a
+
+-- | Stop watching a directory: close its descriptor, deregister the
+-- kqueue event, and cancel the event-loop async.
+-- NOTE(review): the EvDelete is issued after 'closeFd', so kqueue may
+-- reject it for the stale descriptor — hence the swallowed
+-- 'KQueueException'.
+removeDirWatch :: Watch -> IO ()
+removeDirWatch (Watch kq fd a) = do
+  closeFd fd
+  let event = KEvent
+        { ident = fromIntegral fd
+        , evfilter = EvfiltVnode
+        , flags = [EvDelete]
+        , fflags = []
+        , data_ = 0
+        , udata = nullPtr
+        }
+  void (kevent kq [event] 0 Nothing)
+    `catch` \(_ :: KQueueException) -> return ()
+  cancel a
diff --git a/src/Control/Funflow/ContentStore/Notify/Linux.hs b/src/Control/Funflow/ContentStore/Notify/Linux.hs
new file mode 100644
index 0000000..6411236
--- /dev/null
+++ b/src/Control/Funflow/ContentStore/Notify/Linux.hs
@@ -0,0 +1,51 @@
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- | Implementation of filesystem watching functionality for linux based on
+-- inotify.
+module Control.Funflow.ContentStore.Notify.Linux
+ ( Notifier
+ , initNotifier
+ , killNotifier
+
+ , Watch
+ , addDirWatch
+ , removeDirWatch
+ ) where
+
+import Control.Exception.Safe (catch)
+import System.INotify
+
+-- | On Linux the notifier is an inotify handle.
+type Notifier = INotify
+
+-- | Create a new inotify-based notifier.
+initNotifier :: IO Notifier
+initNotifier = initINotify
+
+-- | Shut down the inotify handle and all its watches.
+killNotifier :: Notifier -> IO ()
+killNotifier = killINotify
+
+type Watch = WatchDescriptor
+
+-- | Watch the given directory (and only a directory, per 'OnlyDir')
+-- for attribute changes, moves of the directory itself, and deletion,
+-- invoking @f@ on each matching event.
+addDirWatch :: Notifier -> FilePath -> IO () -> IO Watch
+addDirWatch inotify dir f = addWatch inotify mask dir $ \case
+  Attributes True Nothing -> f
+  MovedSelf True -> f
+  DeletedSelf -> f
+  _ -> return ()
+  where
+    mask = [Attrib, MoveSelf, DeleteSelf, OnlyDir]
+
+-- | Remove an inotify watch, tolerating the watch already being gone.
+removeDirWatch :: Watch -> IO ()
+removeDirWatch w =
+  -- When calling `addWatch` on a path that is already being watched,
+  -- inotify will not create a new watch, but amend the existing watch
+  -- and return the same watch descriptor.
+  -- Therefore, the watch might already have been removed at this point,
+  -- which will cause an 'IOError'.
+  -- Fortunately, all event handlers to a file are called at once.
+  -- So, that removing the watch here will not cause another handler
+  -- to miss out on the event.
+  -- Note, that this may change when adding different event handlers,
+  -- that remove the watch under different conditions.
+  removeWatch w
+    `catch` \(_::IOError) -> return ()
diff --git a/src/Control/Funflow/Diagram.hs b/src/Control/Funflow/Diagram.hs
new file mode 100644
index 0000000..9581911
--- /dev/null
+++ b/src/Control/Funflow/Diagram.hs
@@ -0,0 +1,59 @@
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE InstanceSigs #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+-- | A Diagram is a representation of an arrow with labels.
+-- No computation is actually performed by a diagram, it just carries
+-- around a description.
+-- We exploit the fact that in 7.10,
+-- everything is typeable to label the incoming and outgoing node types.
+module Control.Funflow.Diagram where
+
+import Control.Arrow
+import Control.Arrow.Free (ArrowError (..))
+import Control.Category
+import Data.Proxy (Proxy (..))
+import qualified Data.Text as T
+import Prelude hiding (id, (.))
+
+
+newtype NodeProperties = NodeProperties {
+ labels :: [T.Text]
+}
+
+emptyNodeProperties :: NodeProperties
+emptyNodeProperties = NodeProperties []
+
+data Diagram ex a b where
+ Node :: NodeProperties
+ -> Proxy a
+ -> Proxy b
+ -> Diagram ex a b
+ Seq :: Diagram ex a b -> Diagram ex b c -> Diagram ex a c
+ Par :: Diagram ex a b -> Diagram ex c d -> Diagram ex (a,c) (b,d)
+ Fanin :: Diagram ex a c -> Diagram ex b c -> Diagram ex (Either a b) c
+ Catch :: Diagram ex a b -> Diagram ex (a,ex) b -> Diagram ex a b
+
+instance Category (Diagram ex) where
+ id = Node emptyNodeProperties Proxy Proxy
+ (.) = flip Seq
+
+instance Arrow (Diagram ex) where
+ arr :: forall a b. (a -> b) -> Diagram ex a b
+ arr = const $ Node emptyNodeProperties (Proxy :: Proxy a) (Proxy :: Proxy b)
+ first f = Par f id
+ second f = Par id f
+ (***) = Par
+
+instance ArrowChoice (Diagram ex) where
+ f +++ g = (f >>> arr Left) ||| (g >>> arr Right)
+ f ||| g = Fanin f g
+
+instance ArrowError ex (Diagram ex) where
+ f `catch` g = Catch f g
+
+-- | Construct a labelled node
+node :: forall arr a b ex. Arrow arr => arr a b -> [T.Text] -> (Diagram ex) a b
+node _ lbls = Node props (Proxy :: Proxy a) (Proxy :: Proxy b)
+ where props = NodeProperties lbls
diff --git a/src/Control/Funflow/Exec/Simple.hs b/src/Control/Funflow/Exec/Simple.hs
new file mode 100644
index 0000000..e0b4ad0
--- /dev/null
+++ b/src/Control/Funflow/Exec/Simple.hs
@@ -0,0 +1,241 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE Rank2Types #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeOperators #-}
+
+-- | This module contains the means to execute a pipeline.
+--
+-- You should probably start with 'withSimpleLocalRunner' and 'runSimpleFlow'.
+-- The other functions in this module provide more flexible versions of
+-- 'runSimpleFlow'.
+module Control.Funflow.Exec.Simple
+ ( runFlow
+ , runFlowEx
+ , runSimpleFlow
+ , withSimpleLocalRunner
+ ) where
+
+import Control.Arrow (returnA)
+import Control.Arrow.Async
+import Control.Arrow.Free (type (~>), eval)
+import Control.Concurrent.Async (withAsync)
+import Control.Exception.Safe (Exception,
+ SomeException,
+ bracket,
+ onException,
+ throwM, try)
+import Control.Funflow.Base
+import Control.Funflow.ContentHashable
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.External
+import Control.Funflow.External.Coordinator
+import Control.Funflow.External.Coordinator.Memory
+import Control.Funflow.External.Executor (executeLoop)
+import Control.Monad.IO.Class (liftIO)
+import Control.Monad.Trans.Class (lift)
+import qualified Data.ByteString as BS
+import Data.Foldable (traverse_)
+import Data.Int (Int64)
+import Data.Monoid ((<>))
+import Data.Void
+import Katip
+import Path
+import System.IO (stderr)
+import System.Random (randomIO)
+
+-- | Simple evaluation of a flow
+runFlowEx :: forall c eff ex a b. (Coordinator c, Exception ex)
+ => c
+ -> Config c
+ -> CS.ContentStore
+ -> (eff ~> AsyncA (KatipContextT IO)) -- ^ Natural transformation from wrapped effects
+ -> Int -- ^ Flow configuration identity. This forms part of the caching
+ -- system and is used to disambiguate the same flow run in
+ -- multiple configurations.
+ -> Flow eff ex a b
+ -> a
+ -> KatipContextT IO b
+runFlowEx _ cfg store runWrapped confIdent flow input = do
+ hook <- initialise cfg
+ runAsyncA (eval (runFlow' hook) flow) input
+ where
+ simpleOutPath item = toFilePath
+ $ CS.itemPath store item </> [relfile|out|]
+ withStoreCache :: forall i o. Cacher i o
+ -> AsyncA (KatipContextT IO) i o -> AsyncA (KatipContextT IO) i o
+ withStoreCache NoCache f = f
+ withStoreCache c f = let
+ chashOf i = cacherKey c confIdent i
+ checkStore = AsyncA $ \chash ->
+ CS.constructOrWait store chash >>= \case
+ CS.Pending void -> absurd void
+ CS.Complete item -> do
+ bs <- liftIO . BS.readFile $ simpleOutPath item
+ return . Right . cacherReadValue c $ bs
+ CS.Missing fp -> return $ Left fp
+ writeStore = AsyncA $ \(chash, fp, res) ->
+ do
+ liftIO $ BS.writeFile (toFilePath $ fp </> [relfile|out|])
+ . cacherStoreValue c $ res
+ _ <- CS.markComplete store chash
+ return res
+ `onException`
+ CS.removeFailed store chash
+ in proc i -> do
+ let chash = chashOf i
+ mcontents <- checkStore -< chash
+ case mcontents of
+ Right contents -> returnA -< contents
+ Left fp -> do
+ res <- f -< i
+ writeStore -< (chash, fp, res)
+ writeMd :: forall i o. ContentHash
+ -> i
+ -> o
+ -> MDWriter i o
+ -> KatipContextT IO ()
+ writeMd _ _ _ Nothing = return ()
+ writeMd chash i o (Just writer) =
+ let kvs = writer i o
+ in traverse_ (uncurry $ CS.setMetadata store chash) kvs
+
+
+ runFlow' :: Hook c -> Flow' eff a1 b1 -> AsyncA (KatipContextT IO) a1 b1
+ runFlow' _ (Step props f) = withStoreCache (cache props)
+ . AsyncA $ \x -> do
+ let out = f x
+ case cache props of
+ NoCache -> return ()
+ Cache key _ _ -> writeMd (key confIdent x) x out $ mdpolicy props
+ return out
+ runFlow' _ (StepIO props f) = withStoreCache (cache props)
+ . liftAsyncA $ AsyncA f
+ runFlow' po (External props toTask) = AsyncA $ \x -> do
+ chash <- liftIO $ if (ep_impure props)
+ then do
+ salt <- randomIO :: IO Int64
+ contentHash (toTask x, salt)
+ else contentHash (toTask x)
+ CS.lookup store chash >>= \case
+ -- The item in question is already in the store. No need to submit a task.
+ CS.Complete item -> return item
+ -- The item is pending in the store. In this case, we should check whether
+ -- the coordinator knows about it
+ CS.Pending _ -> taskInfo po chash >>= \case
+ -- Something has gone wrong here. A task is marked as pending but the
+ -- coordinator does not know about it. Attempt to remove the pending
+ -- path and submit as normal.
+ UnknownTask -> do
+ CS.removeFailed store chash
+ writeMd chash x () $ ep_mdpolicy props
+ submitAndWait chash (TaskDescription chash (toTask x))
+ -- Task is already known to the coordinator. Most likely something is
+ -- running this task. Just wait for it.
+ KnownTask _ -> wait chash (TaskDescription chash (toTask x))
+ -- Nothing in the store. Submit and run.
+ CS.Missing _ -> do
+ writeMd chash x () $ ep_mdpolicy props
+ submitAndWait chash (TaskDescription chash (toTask x))
+ where
+ submitAndWait chash td = do
+ submitTask po td
+ wait chash td
+ wait chash td = do
+ KnownTask _ <- awaitTask po chash
+ CS.waitUntilComplete store chash >>= \case
+ Just item -> return item
+ Nothing -> do
+ ti <- taskInfo po chash
+ mbStdout <- CS.getMetadataFile store chash [relfile|stdout|]
+ mbStderr <- CS.getMetadataFile store chash [relfile|stderr|]
+ throwM $ ExternalTaskFailed td ti mbStdout mbStderr
+ runFlow' _ (PutInStore f) = AsyncA $ \x -> katipAddNamespace "putInStore" $ do
+ chash <- liftIO $ contentHash x
+ CS.constructOrWait store chash >>= \case
+ CS.Pending void -> absurd void
+ CS.Complete item -> return item
+ CS.Missing fp ->
+ do
+ liftIO $ f fp x
+ CS.markComplete store chash
+ `onException`
+ (do $(logTM) WarningS . ls $ "Exception in construction: removing " <> show chash
+ CS.removeFailed store chash
+ )
+ runFlow' _ (GetFromStore f) = AsyncA $ \case
+ CS.All item -> lift . f $ CS.itemPath store item
+ item CS.:</> path -> lift . f $ CS.itemPath store item </> path
+ runFlow' _ (InternalManipulateStore f) = AsyncA $ \i ->lift $ f store i
+ runFlow' _ (Wrapped props w) = withStoreCache (cache props)
+ $ runWrapped w
+
+-- | Run a flow in a logging context.
+runFlowLog :: forall c eff ex a b. (Coordinator c, Exception ex)
+ => c
+ -> Config c
+ -> CS.ContentStore
+ -> (eff ~> AsyncA (KatipContextT IO)) -- ^ Natural transformation from wrapped effects
+ -> Int -- ^ Flow configuration identity. This forms part of the caching
+ -- system and is used to disambiguate the same flow run in
+ -- multiple configurations.
+ -> Flow eff ex a b
+ -> a
+ -> KatipContextT IO (Either ex b)
+runFlowLog c cfg store runWrapped confIdent flow input =
+ try $ runFlowEx c cfg store runWrapped confIdent flow input
+
+-- | Run a flow, discarding all logging.
+runFlow :: forall c eff ex a b. (Coordinator c, Exception ex)
+ => c
+ -> Config c
+ -> CS.ContentStore
+ -> (eff ~> AsyncA IO) -- ^ Natural transformation from wrapped effects
+ -> Int -- ^ Flow configuration identity. This forms part of the caching
+ -- system and is used to disambiguate the same flow run in
+ -- multiple configurations.
+ -> Flow eff ex a b
+ -> a
+ -> IO (Either ex b)
+runFlow c cfg store runWrapped confIdent flow input = do
+ le <- initLogEnv "funflow" "production"
+ runKatipContextT le () "runFlow"
+ $ runFlowLog c cfg store (liftAsyncA . runWrapped) confIdent flow input
+
+-- | Run a simple flow. Logging will be sent to stderr
+runSimpleFlow :: forall c a b. (Coordinator c)
+ => c
+ -> Config c
+ -> CS.ContentStore
+ -> SimpleFlow a b
+ -> a
+ -> IO (Either SomeException b)
+runSimpleFlow c ccfg store flow input = do
+ handleScribe <- mkHandleScribe ColorIfTerminal stderr InfoS V2
+ let mkLogEnv = registerScribe "stderr" handleScribe defaultScribeSettings =<< initLogEnv "funflow" "production"
+ bracket mkLogEnv closeScribes $ \le -> do
+ let initialContext = ()
+ initialNamespace = "executeLoop"
+
+ runKatipContextT le initialContext initialNamespace
+ $ runFlowLog c ccfg store runNoEffect 12345 flow input
+
+-- | Create a full pipeline runner locally. This includes an executor for
+-- executing external tasks.
+-- This function is specialised to `SimpleFlow` since in cases where
+-- a custom term algebra is in use, we assume that probably a centralised
+-- coordinator and external runners may be desired as well.
+withSimpleLocalRunner :: Path Abs Dir -- ^ Path to content store
+ -> ((SimpleFlow a b -> a -> IO (Either SomeException b))
+ -> IO c)
+ -> IO c
+withSimpleLocalRunner storePath action =
+ CS.withStore storePath $ \store -> do
+ memHook <- createMemoryCoordinator
+ withAsync (executeLoop MemoryCoordinator memHook store) $ \_ ->
+ action $ runSimpleFlow MemoryCoordinator memHook store
diff --git a/src/Control/Funflow/External.hs b/src/Control/Funflow/External.hs
new file mode 100644
index 0000000..318713c
--- /dev/null
+++ b/src/Control/Funflow/External.hs
@@ -0,0 +1,175 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE TemplateHaskell #-}
+-- | Definition of external tasks
+module Control.Funflow.External where
+
+import Control.Funflow.ContentHashable (ContentHash, ContentHashable, ExternallyAssuredDirectory (..),
+ ExternallyAssuredFile (..))
+import qualified Control.Funflow.ContentStore as CS
+import Control.Lens.TH
+import Data.Aeson (FromJSON, ToJSON)
+import Data.Semigroup
+import Data.Store (Store)
+import Data.String (IsString (..))
+import qualified Data.Text as T
+import GHC.Generics (Generic)
+import Path
+import System.Posix.Types (CGid, CUid)
+
+-- | Set of items which may be treated as an input path to an external task.
+data InputPath
+ -- | An item in the content store.
+ = IPItem CS.Item
+ -- | An external file whose contents are considered assured by the external
+ -- system.
+ | IPExternalFile ExternallyAssuredFile
+ -- | An external directory whose contents are considered assured by the
+ -- external system.
+ | IPExternalDir ExternallyAssuredDirectory
+ deriving (Generic, Show)
+
+instance ContentHashable IO InputPath
+instance FromJSON InputPath
+instance ToJSON InputPath
+instance Store InputPath
+
+-- | Component of a parameter
+data ParamField
+ = ParamText !T.Text
+ -- ^ Text component.
+ | ParamPath !InputPath
+ -- ^ Reference to a path to a content store item.
+ | ParamEnv !T.Text
+ -- ^ Reference to an environment variable.
+ | ParamUid
+ -- ^ Reference to the effective user ID of the executor.
+ | ParamGid
+ -- ^ Reference to the effective group ID of the executor.
+ | ParamOut
+ -- ^ Reference to the output path in the content store.
+ deriving (Generic, Show)
+
+instance ContentHashable IO ParamField
+instance FromJSON ParamField
+instance ToJSON ParamField
+instance Store ParamField
+
+-- | A parameter to an external task
+--
+-- The runtime values to external references, e.g. environment variables,
+-- should not significantly influence the result of the external task.
+-- In particular, the content hash will not depend on these runtime values.
+newtype Param = Param [ParamField]
+ deriving (Generic, Monoid, Semigroup, Show)
+
+instance IsString Param where
+ fromString s = Param [ParamText (fromString s)]
+
+instance ContentHashable IO Param
+instance FromJSON Param
+instance ToJSON Param
+instance Store Param
+
+-- | Converter of path components.
+data ConvParam f = ConvParam
+ { convPath :: CS.Item -> f (Path Abs Dir)
+ -- ^ Resolve a reference to a content store item.
+ , convEnv :: T.Text -> f T.Text
+ -- ^ Resolve an environment variable.
+ , convUid :: f CUid
+ -- ^ Resolve the effective user ID.
+ , convGid :: f CGid
+ -- ^ Resolve the effective group ID.
+ , convOut :: f (Path Abs Dir)
+ -- ^ Resolve the output path in the content store.
+ }
+
+paramFieldToText :: Applicative f
+ => ConvParam f -> ParamField -> f T.Text
+paramFieldToText _ (ParamText txt) = pure txt
+paramFieldToText c (ParamPath (IPItem item)) = T.pack . fromAbsDir <$> convPath c item
+paramFieldToText _ (ParamPath (IPExternalFile (ExternallyAssuredFile item)))
+ = pure . T.pack . fromAbsFile $ item
+paramFieldToText _ (ParamPath (IPExternalDir (ExternallyAssuredDirectory item)))
+ = pure . T.pack . fromAbsDir $ item
+paramFieldToText c (ParamEnv env) = convEnv c env
+paramFieldToText c ParamUid = T.pack . show <$> convUid c
+paramFieldToText c ParamGid = T.pack . show <$> convGid c
+paramFieldToText c ParamOut = T.pack . fromAbsDir <$> convOut c
+
+-- | Transform a parameter to text using the given converter.
+paramToText :: Applicative f
+ => ConvParam f -> Param -> f T.Text
+paramToText c (Param ps) = mconcat <$> traverse (paramFieldToText c) ps
+
+stringParam :: String -> Param
+stringParam str = Param [ParamText (T.pack str)]
+
+textParam :: T.Text -> Param
+textParam txt = Param [ParamText txt]
+
+-- | Reference to a path to either:
+-- - a content store item, or
+-- - an externally assured file/directory.
+pathParam :: InputPath -> Param
+pathParam item = Param [ParamPath item]
+
+-- | Reference to a path to a file or directory within a store item.
+contentParam :: CS.Content t -> Param
+contentParam (CS.All item) = pathParam $ IPItem item
+contentParam (item CS.:</> path) =
+ pathParam (IPItem item) <> stringParam (toFilePath path)
+
+-- | Reference an externally assured file
+externalFileParam :: ExternallyAssuredFile -> Param
+externalFileParam = pathParam . IPExternalFile
+
+-- | Reference an externally assured file
+externalDirectoryParam :: ExternallyAssuredDirectory -> Param
+externalDirectoryParam = pathParam . IPExternalDir
+
+-- | Reference to an environment variable.
+envParam :: T.Text -> Param
+envParam env = Param [ParamEnv env]
+
+-- | Reference to the effective user ID of the executor.
+uidParam :: Param
+uidParam = Param [ParamUid]
+
+-- | Reference to the effective group ID of the executor.
+gidParam :: Param
+gidParam = Param [ParamGid]
+
+-- | Reference to the output path in the content store.
+outParam :: Param
+outParam = Param [ParamOut]
+
+-- | A monomorphic description of an external task. This is basically just
+-- a command which can be run.
+data ExternalTask = ExternalTask {
+ _etCommand :: T.Text
+ , _etParams :: [Param]
+ -- | If this is set, then the process outputs on its stdout stream
+ -- rather than writing to a file. In this case, output will be
+ -- redirected into a file called 'out' in the output directory.
+ -- Otherwise, the task is assumed to write itself to files in its
+ -- working directory.
+ , _etWriteToStdOut :: Bool
+} deriving (Generic, Show)
+
+instance ContentHashable IO ExternalTask
+instance FromJSON ExternalTask
+instance ToJSON ExternalTask
+instance Store ExternalTask
+
+data TaskDescription = TaskDescription {
+ _tdOutput :: ContentHash
+ , _tdTask :: ExternalTask
+ } deriving (Generic, Show)
+
+makeLenses ''ExternalTask
+makeLenses ''TaskDescription
diff --git a/src/Control/Funflow/External/Coordinator.hs b/src/Control/Funflow/External/Coordinator.hs
new file mode 100644
index 0000000..5086e94
--- /dev/null
+++ b/src/Control/Funflow/External/Coordinator.hs
@@ -0,0 +1,178 @@
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE StrictData #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeFamilyDependencies #-}
+
+-- | A Funflow coordinator is used to distribute tasks amongst multiple
+-- executors. It provides functionality to submit tasks, to fetch them for
+-- execution, and to check on their status.
+--
+-- There are multiple possible instantiations of the 'Coordinator' class.
+module Control.Funflow.External.Coordinator where
+
+import Control.Exception.Safe
+import Control.Funflow.ContentHashable (ContentHash)
+import Control.Funflow.External
+import Control.Lens
+import Control.Monad.IO.Class (MonadIO, liftIO)
+
+import Data.Monoid ((<>))
+import Data.Store (Store)
+import Data.Store.TH (makeStore)
+import Data.Typeable (Typeable)
+import Katip
+import Network.HostName
+import Path
+import System.Clock (TimeSpec)
+
+instance Store TimeSpec
+
+-- | Information about an executor capable of running tasks. Currently this
+-- is just a newtype wrapper around hostname.
+newtype Executor = Executor HostName
+ deriving (Show, Store)
+
+data TaskStatus =
+ -- | Task is in the queue and has not begun executing
+ Pending
+ | Running ExecutionInfo
+ | Completed ExecutionInfo
+ -- | Task has failed with failure count
+ | Failed ExecutionInfo Int
+ deriving Show
+
+data TaskInfo =
+ KnownTask TaskStatus
+ | UnknownTask
+ deriving Show
+
+data ExecutionInfo = ExecutionInfo {
+ _eiExecutor :: Executor
+ , _eiElapsed :: TimeSpec
+ } deriving Show
+
+data TaskError
+ = ExternalTaskFailed
+ TaskDescription
+ TaskInfo
+ (Maybe (Path Abs File))
+ (Maybe (Path Abs File))
+ deriving (Show, Typeable)
+instance Exception TaskError where
+ displayException (ExternalTaskFailed td ti mbStdout mbStderr) =
+ "External task failed to construct item '"
+ ++ show (_tdOutput td)
+ ++ "'. Task info: "
+ ++ show ti
+ ++ " stdout: "
+ ++ show mbStdout
+ ++ " stderr: "
+ ++ show mbStderr
+ ++ " Task: "
+ ++ show (_tdTask td)
+
+class Coordinator c where
+ type Config c
+ type Hook c = h | h -> c
+
+ -- | Perform any necessary initialisation to connect to the coordinator.
+ initialise :: MonadIO m => Config c -> m (Hook c)
+
+ -- | Submit a task to the task queue.
+ -- It is allowed to overwrite a known task.
+ submitTask :: MonadIO m => Hook c -> TaskDescription -> m ()
+
+ -- | View the size of the current task queue
+ queueSize :: MonadIO m => Hook c -> m Int
+
+ -- | Fetch information on the current task
+ taskInfo :: MonadIO m => Hook c -> ContentHash -> m TaskInfo
+
+ -- | Pop a task off of the queue for execution. The popped task should be
+ -- added to the execution queue
+ popTask :: MonadIO m => Hook c -> Executor
+ -> m (Maybe TaskDescription)
+
+ -- | Await task completion.
+ --
+ -- If the task is complete, this will return 'KnownTask Completed'.
+ -- If the task is failed, this will return 'KnownTask Failed'.
+ -- If the task is not known to the system, this will return 'UnknownTask'.
+ -- Otherwise (if the task is pending or running), this will block until
+ -- the task either completes or fails.
+ awaitTask :: MonadIO m => Hook c -> ContentHash -> m TaskInfo
+
+ -- | Update execution status for a running task.
+ -- This should error for a task which is not running.
+ updateTaskStatus :: MonadIO m => Hook c -> ContentHash -> TaskStatus -> m ()
+
+ -- | Remove all pending tasks from the queue.
+ dropTasks :: MonadIO m => Hook c -> m ()
+
+-- TH Splices
+
+makeLenses ''ExecutionInfo
+makeStore ''TaskStatus
+makeStore ''ExecutionInfo
+makeStore ''TaskInfo
+
+-- Derived functionality
+
+startTask :: (Coordinator c, MonadIO m)
+ => Hook c
+ -> m (Maybe TaskDescription)
+startTask h = liftIO $ do
+ executorInfo <- Executor <$> getHostName
+ popTask h executorInfo
+
+-- | Check if a task is currently 'in progress' - e.g.
+-- pending or running.
+isInProgress :: (Coordinator c, MonadIO m)
+ => Hook c
+ -> ContentHash
+ -> m Bool
+isInProgress h ch = do
+ ti <- taskInfo h ch
+ return $ case ti of
+ KnownTask Pending -> True
+ KnownTask (Running _) -> True
+ _ -> False
+
+-- | Pop a task off of the queue for execution. Passes the popped task to the
+-- given function for execution. If the function returns success ('Right'),
+-- then the task will be marked as completed in the given time. If the
+-- function returns failure ('Left'), then the task will be marked as
+-- failed. If the function raises an exception or is interrupted by an
+-- asynchronous exception, then the task will be placed back on the task
+-- queue and the exception propagated. Returns 'Nothing' if no task is
+-- available and @'Just' ()@ on task completion or regular failure.
+withPopTask :: (Coordinator c, MonadIO m, MonadMask m, KatipContext m)
+ => Hook c -> Executor
+ -> (TaskDescription -> m (TimeSpec, Either Int ()))
+ -> m (Maybe ())
+withPopTask hook executor f =
+ bracketOnError
+ (popTask hook executor)
+ (\case
+ Nothing -> return ()
+ Just td ->
+ update td Pending
+ `withException`
+ \e -> $(logTM) ErrorS $
+ "Failed to place task "
+ <> showLS (td ^. tdOutput)
+ <> " back on queue: "
+ <> ls (displayException (e :: SomeException))
+ )
+ (\case
+ Nothing -> return Nothing
+ Just td -> f td >>= \case
+ (t, Left ec) -> Just <$> update td (Failed (execInfo t) ec)
+ (t, Right ()) -> Just <$> update td (Completed (execInfo t)))
+ where
+ update td = updateTaskStatus hook (td ^. tdOutput)
+ execInfo = ExecutionInfo executor
diff --git a/src/Control/Funflow/External/Coordinator/Memory.hs b/src/Control/Funflow/External/Coordinator/Memory.hs
new file mode 100644
index 0000000..8b364cd
--- /dev/null
+++ b/src/Control/Funflow/External/Coordinator/Memory.hs
@@ -0,0 +1,90 @@
+{-# LANGUAGE EmptyDataDecls #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeFamilies #-}
+-- | In-memory co-ordinator for funflow. This module is not greatly useful
+-- except for testing purposes.
+module Control.Funflow.External.Coordinator.Memory where
+
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.STM.TVar
+import Control.Funflow.ContentHashable (ContentHash)
+import Control.Funflow.External
+import Control.Funflow.External.Coordinator
+import Control.Lens
+import Control.Monad.IO.Class (liftIO)
+import Control.Monad.STM
+import Data.List (find)
+import qualified Data.Map.Strict as M
+import System.Clock (fromNanoSecs)
+
+data MemoryCoordinator = MemoryCoordinator
+
+data MemHook = MemHook {
+ _mhTaskQueue :: TVar [TaskDescription]
+ , _mhExecutionQueue :: TVar (M.Map ContentHash TaskStatus)
+ }
+
+makeLenses ''MemHook
+
+createMemoryCoordinator :: IO MemHook
+createMemoryCoordinator = liftIO . atomically $ do
+ taskQueue <- newTVar mempty
+ executionQueue <- newTVar mempty
+ return $ MemHook taskQueue executionQueue
+
+instance Coordinator MemoryCoordinator where
+ type Config MemoryCoordinator = MemHook
+ type Hook MemoryCoordinator = MemHook
+
+ initialise = return
+
+ submitTask mh td = liftIO $
+ atomically $
+ modifyTVar (mh ^. mhTaskQueue) (td : )
+
+ queueSize mh = liftIO $ do
+ queue <- atomically . readTVar $ mh ^. mhTaskQueue
+ return $ length queue
+
+ taskInfo mh tid = liftIO $ do
+ (eq, tq) <- atomically $ do
+ eq <- readTVar (mh ^. mhExecutionQueue)
+ tq <- readTVar (mh ^. mhTaskQueue)
+ return (eq, tq)
+ return $ case M.lookup tid eq of
+ Just ti -> KnownTask ti
+ Nothing -> case find ((==tid) . (^. tdOutput)) tq of
+ Just _ -> KnownTask Pending
+ Nothing -> UnknownTask
+
+ awaitTask mh tid = liftIO $ do
+ ti <- taskInfo mh tid
+ case ti of
+ UnknownTask -> return UnknownTask
+ info@(KnownTask (Completed _)) -> return info
+ info@(KnownTask (Failed _ _)) -> return info
+ _ -> do
+ threadDelay 1000000
+ awaitTask mh tid
+
+ popTask mh executor = let
+ executionInfo = ExecutionInfo executor (fromNanoSecs 0)
+ taskStatus = Running executionInfo
+ in liftIO . atomically $ do
+ tq <- readTVar (mh ^. mhTaskQueue)
+ case reverse tq of
+ [] -> return Nothing
+ (td:xs) -> do
+ writeTVar (mh ^. mhTaskQueue) xs
+ modifyTVar (mh ^. mhExecutionQueue) $ \eq ->
+ M.insert (td ^. tdOutput) taskStatus eq
+ return $ Just td
+
+ updateTaskStatus mh tid stat = liftIO . atomically $
+ modifyTVar (mh ^. mhExecutionQueue) $ \eq ->
+ if M.member tid eq
+ then M.insert tid stat eq
+ else error "Cannot update task status: task not executing."
+
+ dropTasks mh = liftIO . atomically $
+ modifyTVar (mh ^. mhTaskQueue) $ const []
diff --git a/src/Control/Funflow/External/Coordinator/Redis.hs b/src/Control/Funflow/External/Coordinator/Redis.hs
new file mode 100644
index 0000000..af4de27
--- /dev/null
+++ b/src/Control/Funflow/External/Coordinator/Redis.hs
@@ -0,0 +1,112 @@
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeFamilies #-}
+
+-- | Redis-based co-ordinator for Funflow.
+--
+-- There are two co-ordinators defined in this module. They differ in whether
+-- they open a new connection to Redis or re-use an existing one. Other than
+-- that they behave identically.
+module Control.Funflow.External.Coordinator.Redis
+ ( Redis (..)
+ , RedisPreconnected (..)
+ ) where
+
+import qualified Control.Funflow.ContentHashable as CHash
+import Control.Funflow.External
+import Control.Funflow.External.Coordinator
+import Control.Lens
+import Control.Monad.Except
+import Control.Monad.Fix (fix)
+import Data.Store
+import qualified Database.Redis as R
+import GHC.Conc
+import System.Clock (fromNanoSecs)
+
+data Redis = Redis
+
+instance Coordinator Redis where
+ type Config Redis = R.ConnectInfo
+ type Hook Redis = R.Connection
+
+ -- | Create a redis connection
+ initialise = liftIO . R.connect
+
+ submitTask conn td =
+ liftIO $
+ R.runRedis conn $ do
+ void $ R.rpush "jobs_queue" [encode (jid, td ^. tdTask)]
+ void $ R.set jid (encode Pending)
+ where
+ jid = CHash.toBytes $ td ^. tdOutput
+
+ queueSize conn = liftIO $ R.runRedis conn $
+ fromIntegral . either (const 0) id <$> R.llen "jobs_queue"
+
+ taskInfo conn chash = liftIO $
+ R.runRedis conn $ do
+ eoutput <- R.get $ CHash.toBytes chash
+ case eoutput of
+ Left r -> fail $ "Redis fail: " ++ show r
+ Right Nothing -> return UnknownTask
+ Right (Just bs) -> case decode bs of
+ Left r -> fail $ "Decode fail: " ++ show r
+ Right ti -> return $ KnownTask ti
+
+ awaitTask conn chash = liftIO . R.runRedis conn $
+ fix $ \waitGet -> do
+ ti <- taskInfo conn chash
+ case ti of
+ UnknownTask -> return UnknownTask
+ info@(KnownTask (Completed _)) -> return info
+ info@(KnownTask (Failed _ _)) -> return info
+ _ -> do
+ liftIO $ threadDelay 500000
+ waitGet
+
+ updateTaskStatus conn chash status = liftIO $
+ R.runRedis conn
+ $ void $ R.set (CHash.toBytes chash) (encode status)
+
+ popTask conn executor = liftIO . R.runRedis conn $ do
+ job <- R.brpoplpush "jobs_queue" "job_running" 1
+ case job of
+ Left r -> fail $ "redis fail " ++ show r
+ Right Nothing -> return Nothing
+ Right (Just bs) -> case decode bs of
+ Left r -> fail $ "Decode fail: " ++ show r
+ Right (chashbytes, task) ->
+ case CHash.fromBytes chashbytes of
+ Just chash -> do
+ let status = Running $ ExecutionInfo executor (fromNanoSecs 0)
+ _ <- R.set chashbytes (encode status)
+ return . Just $ TaskDescription chash task
+ Nothing -> fail "Cannot decode content hash."
+
+ dropTasks conn = liftIO . R.runRedis conn $ do
+ job <- R.del ["jobs_queue"]
+ case job of
+ Left r -> fail $ "redis fail " ++ show r
+ Right _ -> return ()
+
+
+data RedisPreconnected = RedisPreconnected
+
+newtype Preconnected = Preconnected R.Connection
+
+-- | Allow a pre-established Redis connection to be used.
+instance Coordinator RedisPreconnected where
+ type Config RedisPreconnected = R.Connection
+ type Hook RedisPreconnected = Preconnected
+
+ initialise = return . Preconnected
+
+ submitTask (Preconnected conn) = submitTask conn
+ queueSize (Preconnected conn) = queueSize conn
+ taskInfo (Preconnected conn) = taskInfo conn
+ awaitTask (Preconnected conn) = awaitTask conn
+ updateTaskStatus (Preconnected conn) = updateTaskStatus conn
+ popTask (Preconnected conn) = popTask conn
+ dropTasks (Preconnected conn) = dropTasks conn
diff --git a/src/Control/Funflow/External/Coordinator/SQLite.hs b/src/Control/Funflow/External/Coordinator/SQLite.hs
new file mode 100644
index 0000000..ef311dd
--- /dev/null
+++ b/src/Control/Funflow/External/Coordinator/SQLite.hs
@@ -0,0 +1,318 @@
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE QuasiQuotes #-}
+
+-- | SQLLite co-ordinator for Funflow.
+--
+-- This co-ordinator effectively uses the shared filesystem as a tool for task
+-- distribution and sequencing. This means that it can control a distributed
+-- funflow task without needing any additional processes running.
+module Control.Funflow.External.Coordinator.SQLite
+ ( SQLite (..)
+ ) where
+
+import Control.Concurrent (threadDelay)
+import Control.Exception.Safe
+import Control.Funflow.ContentHashable
+import Control.Funflow.External
+import Control.Funflow.External.Coordinator
+import Control.Funflow.Lock
+import Control.Lens
+import Control.Monad.IO.Class
+import qualified Data.Aeson as Json
+import qualified Data.ByteString.Char8 as C8
+import qualified Data.Text as T
+import Data.Typeable (Typeable)
+import qualified Database.SQLite.Simple as SQL
+import qualified Database.SQLite.Simple.FromField as SQL
+import qualified Database.SQLite.Simple.Ok as SQL
+import qualified Database.SQLite.Simple.ToField as SQL
+import Path
+import Path.IO
+import System.Clock
+
+-- | SQLite coordinator tag.
+data SQLite = SQLite
+
+-- | SQLite coordinator hook.
+data SQLiteHook = SQLiteHook
+ { _sqlConn :: SQL.Connection
+ , _sqlLock :: Lock
+ }
+makeLenses ''SQLiteHook
+
+-- | Take the lock and run the given action on the SQLite connection.
+withSQLite :: SQLiteHook -> (SQL.Connection -> IO a) -> IO a
+withSQLite hook action = withLock (hook^.sqlLock) $ action (hook^.sqlConn)
+
-- | Enumeration of possible 'TaskStatus' cases for SQLite status column.
--
-- Persisted as an integer via 'fromEnum' / 'toEnum'.
data SqlTaskStatus
  = SqlPending
  | SqlRunning
  | SqlCompleted
  | SqlFailed
  deriving (Bounded, Enum)
instance SQL.FromField SqlTaskStatus where
  fromField field = do
    n <- SQL.fromField field
    -- 'toEnum' is partial: an out-of-range value from a corrupt or
    -- incompatible database would crash with an unhelpful error. Guard the
    -- range and report a field-level decoding failure instead, mirroring
    -- the error style of the 'SqlExternal' instance.
    if n >= fromEnum (minBound :: SqlTaskStatus)
         && n <= fromEnum (maxBound :: SqlTaskStatus)
      then pure $! toEnum n
      else SQL.Errors
        [toException $ DecodingError "status" ("invalid status code: " ++ show n)]
instance SQL.ToField SqlTaskStatus where
  toField = SQL.toField . fromEnum
+
-- | Wrapper around 'Executor' for SQLite serialization.
-- The executor is stored as its host name text.
newtype SqlExecutor = SqlExecutor { getSqlExecutor :: Executor }
instance SQL.FromField SqlExecutor where
  fromField = fmap (SqlExecutor . Executor) . SQL.fromField
instance SQL.ToField SqlExecutor where
  toField (SqlExecutor (Executor host)) = SQL.toField host
+
-- | SQLite task info query result.
--
-- Mirrors one row of the @tasks@ table; executor/elapsed/exit_code are
-- @NULL@able and only populated for the corresponding statuses.
data SqlTaskInfo = SqlTaskInfo
  { _stiStatus :: SqlTaskStatus
  , _stiExecutor :: Maybe SqlExecutor
  , _stiElapsed :: Maybe Integer
  , _stiExitCode :: Maybe Int
  }
makeLenses '' SqlTaskInfo
instance SQL.FromRow SqlTaskInfo where
  -- Field order must match the SELECT column order in 'taskInfo''.
  fromRow = SqlTaskInfo
    <$> SQL.field
    <*> SQL.field
    <*> SQL.field
    <*> SQL.field
+
-- | Wrapper around 'ExternalTask' for SQLite serialization.
-- Tasks are stored as their JSON encoding.
newtype SqlExternal = SqlExternal { getSqlExternal :: ExternalTask }
instance SQL.FromField SqlExternal where
  fromField field = do
    raw <- SQL.fromField field
    either
      (\err -> SQL.Errors [toException $ DecodingError "task" err])
      (\task -> pure $! SqlExternal task)
      (Json.eitherDecode raw)
instance SQL.ToField SqlExternal where
  toField = SQL.toField . Json.encode . getSqlExternal
+
-- | Wrapper around 'TaskDescription' for SQLite serialization.
newtype SqlTask = SqlTask TaskDescription
instance SQL.FromRow SqlTask where
  -- Expects the columns (output, task), as selected in 'popTask'.
  fromRow = do
    outputHash <- SQL.field
    external <- SQL.field
    pure $! SqlTask $! TaskDescription
      { _tdOutput = outputHash
      , _tdTask = getSqlExternal external
      }
+
-- | Errors that can occur when interacting with the SQLite coordinator.
data SQLiteCoordinatorError
  -- | @MissingDBTaskEntry output field@
  -- The task database entry is missing a field.
  = MissingDBTaskEntry ContentHash T.Text
  -- | @DecodingError field error@
  -- Failed to decode the field.
  | DecodingError T.Text String
  -- | @NonRunningTask output@
  -- The task is not running.
  | NonRunningTask ContentHash
  -- | @IllegalStatusUpdate output status@
  -- Cannot update the status of the task.
  | IllegalStatusUpdate ContentHash TaskStatus
  deriving (Show, Typeable)
instance Exception SQLiteCoordinatorError where
  -- Human-readable messages; content hashes are rendered via 'encodeHash'.
  displayException (MissingDBTaskEntry output field) =
    "Missing field in SQLite task entry '"
    ++ T.unpack field
    ++ "' for output "
    ++ C8.unpack (encodeHash output)
  displayException (DecodingError field err) =
    "Failed to decode field '"
    ++ T.unpack field
    ++ "': "
    ++ err
  displayException (NonRunningTask output) =
    "Task was not running when expected: "
    ++ C8.unpack (encodeHash output)
  displayException (IllegalStatusUpdate output status) =
    "Illegal status update for "
    ++ C8.unpack (encodeHash output)
    ++ ": "
    ++ show status
+
-- | Helper for @NULL@ valued data-base fields.
--
-- Throws 'MissingDBTaskEntry' on 'Nothing', otherwise returns the value.
fromMaybeField :: MonadIO m => ContentHash -> T.Text -> Maybe a -> m a
fromMaybeField output fieldName =
  maybe (liftIO . throwIO $ MissingDBTaskEntry output fieldName) pure
+
-- | Unlifted version of 'taskInfo'.
--
-- Reads the task's row and reconstructs a 'TaskInfo', failing with
-- 'MissingDBTaskEntry' if a column required by the row's status is @NULL@.
taskInfo' :: SQLiteHook -> ContentHash -> IO TaskInfo
taskInfo' hook output = do
  -- Column order here must match the 'SQL.FromRow SqlTaskInfo' instance.
  r <- withSQLite hook $ \conn -> SQL.queryNamed conn
    "SELECT status, executor, elapsed, exit_code FROM tasks\
    \ WHERE\
    \ output = :output"
    [ ":output" SQL.:= output ]
  case r of
    [] -> pure UnknownTask
    (ti:_) -> case ti^.stiStatus of
      SqlPending -> pure $! KnownTask Pending
      SqlRunning -> do
        executor <- fromMaybeField output "executor" (ti^.stiExecutor)
        -- Elapsed time of a running task is not tracked; report 0.
        pure $! KnownTask $! Running ExecutionInfo
          { _eiExecutor = getSqlExecutor executor
          , _eiElapsed = fromNanoSecs 0
          }
      SqlCompleted -> do
        executor <- fromMaybeField output "executor" (ti^.stiExecutor)
        elapsed <- fromMaybeField output "elapsed" (ti^.stiElapsed)
        pure $! KnownTask $! Completed ExecutionInfo
          { _eiExecutor = getSqlExecutor executor
          , _eiElapsed = fromNanoSecs elapsed
          }
      SqlFailed -> do
        executor <- fromMaybeField output "executor" (ti^.stiExecutor)
        elapsed <- fromMaybeField output "elapsed" (ti^.stiElapsed)
        exitCode <- fromMaybeField output "exit_code" (ti^.stiExitCode)
        pure $! KnownTask $! Failed
          ExecutionInfo
            { _eiExecutor = getSqlExecutor executor
            , _eiElapsed = fromNanoSecs elapsed
            }
          exitCode
+
-- All database access goes through 'withSQLite', which serialises
-- processes via the on-disk 'Lock' and threads via its internal 'MVar'.
instance Coordinator SQLite where
  type Config SQLite = Path Abs Dir
  type Hook SQLite = SQLiteHook

  -- Create the coordinator directory, its lock, and the tasks schema
  -- (idempotent) and return the connected hook.
  initialise dir = liftIO $ do
    createDirIfMissing True dir
    lock <- openLock (dir </> [reldir|lock|])
    withLock lock $ do
      conn <- SQL.open $ fromAbsFile (dir </> [relfile|db.sqlite|])
      SQL.execute_ conn
        "CREATE TABLE IF NOT EXISTS\
        \ tasks\
        \ ( output TEXT PRIMARY KEY\
        \ , status INT NOT NULL\
        \ , executor TEXT\
        \ , elapsed INT\
        \ , exit_code INT\
        \ , task TEXT NOT NULL\
        \ )"
      return SQLiteHook
        { _sqlConn = conn
        , _sqlLock = lock
        }

  -- Insert or reset the task as pending. REPLACE means resubmitting an
  -- existing output hash discards any previous status/executor columns.
  submitTask hook td = liftIO $
    withSQLite hook $ \conn -> SQL.executeNamed conn
      "INSERT OR REPLACE INTO\
      \ tasks (output, status, task)\
      \ VALUES\
      \ (:output, :status, :task)"
      [ ":output" SQL.:= (td^.tdOutput)
      , ":status" SQL.:= SqlPending
      , ":task" SQL.:= SqlExternal (td^.tdTask)
      ]

  -- Count pending tasks. COUNT(*) always yields exactly one row, which
  -- justifies the irrefutable [[n]] pattern.
  queueSize hook = liftIO $ do
    [[n]] <- withSQLite hook $ \conn -> SQL.queryNamed conn
      "SELECT COUNT(*) FROM tasks WHERE status = :pending"
      [ ":pending" SQL.:= SqlPending ]
    return n

  taskInfo hook output = liftIO $
    taskInfo' hook output

  -- Claim one pending task for this executor. The SELECT and the UPDATE
  -- run inside a transaction so two executors cannot claim the same task.
  popTask hook executor = liftIO $
    withSQLite hook $ \conn -> SQL.withTransaction conn $ do
      r <- SQL.queryNamed conn
        "SELECT output, task FROM tasks\
        \ WHERE\
        \ status = :pending\
        \ LIMIT 1"
        [ ":pending" SQL.:= SqlPending ]
      case r of
        [] -> pure Nothing
        (SqlTask td:_) -> do
          SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \ status = :status,\
            \ executor = :executor\
            \ WHERE\
            \ output = :output"
            [ ":status" SQL.:= SqlRunning
            , ":executor" SQL.:= SqlExecutor executor
            , ":output" SQL.:= td^.tdOutput
            ]
          pure $! Just td

  -- Poll once a second until the task leaves the Pending/Running states.
  awaitTask hook output = liftIO $ loop
    where
      -- XXX: SQLite has callback mechanisms built-in (e.g. @sqlite3_commit_hook@).
      -- Unfortunately, @direct-sqlite@, which @sqlite-simple@ builds on top of,
      -- doesn't expose this functionality at the moment.
      loop = taskInfo' hook output >>= \case
        KnownTask Pending -> sleep >> loop
        KnownTask (Running _) -> sleep >> loop
        ti -> pure ti
      sleep = liftIO $ threadDelay oneSeconds
      oneSeconds = 1000000

  -- Transition a *running* task to its new status. Any other current
  -- state throws 'NonRunningTask'; transitioning to Running throws
  -- 'IllegalStatusUpdate' (only 'popTask' may do that).
  updateTaskStatus hook output ts = liftIO $
    withSQLite hook $ \conn -> SQL.withTransaction conn $ do
      r <- SQL.queryNamed conn
        "SELECT status FROM tasks\
        \ WHERE\
        \ output = :output"
        [ ":output" SQL.:= output ]
      case r of
        [SqlRunning]:_ -> case ts of
          Completed ei -> SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \ status = :completed,\
            \ elapsed = :elapsed\
            \ WHERE\
            \ output = :output"
            [ ":completed" SQL.:= SqlCompleted
            , ":elapsed" SQL.:= toNanoSecs (ei^.eiElapsed)
            , ":output" SQL.:= output
            ]
          Failed ei exitCode -> SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \ status = :failed,\
            \ elapsed = :elapsed,\
            \ exit_code = :exit_code\
            \ WHERE\
            \ output = :output"
            [ ":failed" SQL.:= SqlFailed
            , ":elapsed" SQL.:= toNanoSecs (ei^.eiElapsed)
            , ":exit_code" SQL.:= exitCode
            , ":output" SQL.:= output
            ]
          Pending -> SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \ status = :pending\
            \ WHERE\
            \ output = :output"
            [ ":pending" SQL.:= SqlPending
            , ":output" SQL.:= output
            ]
          Running _ -> throwIO $ IllegalStatusUpdate output ts
        _ -> throwIO $ NonRunningTask output

  -- Remove only the tasks nobody has started yet.
  dropTasks hook = liftIO $
    withSQLite hook $ \conn ->
      SQL.executeNamed conn
        "DELETE FROM tasks\
        \ WHERE\
        \ status = :pending"
        [ ":pending" SQL.:= SqlPending ]
diff --git a/src/Control/Funflow/External/Docker.hs b/src/Control/Funflow/External/Docker.hs
new file mode 100644
index 0000000..5631b76
--- /dev/null
+++ b/src/Control/Funflow/External/Docker.hs
@@ -0,0 +1,69 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+-- | Module supporting the use of docker containers as external tasks.
+--
+-- In general, an external task can be any command. This module just makes it
+-- easier to specify certain tasks which will run inside docker containers. It
+-- handles constructing the call to docker, mounting input and output
+-- directories, and specifying the docker image and version to use.
+module Control.Funflow.External.Docker where
+
+import Control.Funflow.ContentHashable
+import Control.Funflow.External
+import Data.Map.Strict (Map)
+import qualified Data.Map.Strict as Map
+import Data.Semigroup ((<>))
+import qualified Data.Text as T
+import GHC.Generics (Generic)
+import System.FilePath
+
-- | How task inputs are bound into the container's filesystem.
data Bind
  -- | Single input, will get mounted to @/input@ on the image.
  = SingleInput InputPath
  -- | Multiple inputs, each gets mounted into a subdirectory under
  -- @/input@ as described by the given mapping.
  | MultiInput (Map FilePath InputPath)
  deriving Generic

instance ContentHashable IO Bind

-- | Description of a docker task: image, inputs and the command to run.
data Config = Config
  { image :: T.Text -- ^ Docker image name.
  , optImageID :: Maybe T.Text -- ^ Optional image tag/ID, appended as @image:id@.
  , input :: Bind -- ^ Input mounts.
  , command :: FilePath -- ^ Executable to run inside the container.
  , args :: [T.Text] -- ^ Arguments passed to the command.
  } deriving Generic

instance ContentHashable IO Config
+
-- | Translate a docker 'Config' into a generic 'ExternalTask' that invokes
-- the @docker@ executable with the appropriate run/mount arguments.
toExternal :: Config -> ExternalTask
toExternal cfg = ExternalTask
  -- XXX: Allow to configure the path to the docker executable.
  { _etCommand = "docker"
  , _etParams =
      [ "run"
      -- NOTE(review): 'uidParam' is imported from Control.Funflow.External;
      -- presumably it splices the calling user's uid — confirm.
      , "--user=" <> uidParam
      ] ++ mounts ++
      [ imageArg
      , stringParam (command cfg)
      ] ++ map textParam (args cfg)
  , _etWriteToStdOut = False
  }
  where
    mounts = outputMount : inputMounts
    mount src dst =
      "--volume=" <> pathParam src <> ":" <> stringParam dst
    -- 'outParam' (from Control.Funflow.External) stands for the task's
    -- output directory, mounted at /output in the container.
    outputMount = "--volume=" <> outParam <> ":/output"
    inputMounts = case input cfg of
      SingleInput chash -> [ mount chash "/input" ]
      MultiInput cmap ->
        [ mount chash ("/input" </> dest)
        | (dest, chash) <- Map.toList cmap
        ]
    imageArg = textParam $ case optImageID cfg of
      Nothing -> image cfg
      Just id' -> image cfg <> ":" <> id'
diff --git a/src/Control/Funflow/External/Executor.hs b/src/Control/Funflow/External/Executor.hs
new file mode 100644
index 0000000..a74d6ad
--- /dev/null
+++ b/src/Control/Funflow/External/Executor.hs
@@ -0,0 +1,239 @@
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TemplateHaskell #-}
+-- | Executor for external tasks.
+--
+-- An executor will poll for tasks on the co-ordinator, mark them as in
+-- progress and then execute them.
+--
+-- You probably want to start with 'executeLoop'.
+module Control.Funflow.External.Executor where
+
import           Control.Concurrent                   (threadDelay)
import           Control.Concurrent.Async
import           Control.Concurrent.MVar
import           Control.Exception.Safe
import qualified Control.Funflow.ContentStore         as CS
import           Control.Funflow.External
import           Control.Funflow.External.Coordinator
import           Control.Lens
import           Control.Monad                        (forever, mzero, unless,
                                                       when)
import           Control.Monad.Trans                  (lift)
import           Control.Monad.Trans.Maybe
import qualified Data.Aeson                           as Json
import qualified Data.ByteString                      as BS
import           Data.Maybe                           (isJust)
import           Data.Monoid                          ((<>))
import qualified Data.Text                            as T
import           Katip                                as K
import           Network.HostName
import           Path
import           Path.IO
import           System.Clock
import           System.Exit                          (ExitCode (..))
import           System.IO                            (Handle, IOMode (..),
                                                       hClose, openFile,
                                                       stderr, stdout)
import           System.Posix.Env                     (getEnv)
import           System.Posix.User
import           System.Process
+
-- | Outcome of attempting to execute a single external task.
data ExecutionResult =
  -- | The result already exists in the store and there is no need
  -- to execute. This is also returned if the job is already running
  -- elsewhere.
  Cached
  -- | The computation is already running elsewhere. This is probably
  -- indicative of a bug, because the coordinator should only allow one
  -- instance of a task to be running at any time.
  | AlreadyRunning
  -- | Execution completed successfully after a certain amount of time.
  | Success TimeSpec
  -- | Execution failed with the following exit code.
  -- TODO where should logs go?
  | Failure TimeSpec Int
  -- | The executor itself failed to execute the external task.
  -- E.g. because the executable was not found.
  | ExecutorFailure IOException
+
-- | Execute an individual task.
--
-- Constructs the task's store item if missing: runs the external command
-- with cwd set to the item's build directory, captures stdout/stderr into
-- metadata files, and records inputs and the task JSON as metadata.
-- Any exception is logged before being rethrown.
execute :: CS.ContentStore -> TaskDescription -> KatipContextT IO ExecutionResult
execute store td = logError $ do
  status <- CS.withConstructIfMissing store (td ^. tdOutput) $ \fp -> do
    (fpOut, hOut) <- lift $
      CS.createMetadataFile store (td ^. tdOutput) [relfile|stdout|]
    (fpErr, hErr) <- lift $
      CS.createMetadataFile store (td ^. tdOutput) [relfile|stderr|]
    let
      -- When the task's stdout *is* its output, it is captured to the
      -- store rather than echoed; only stderr is followed in that case.
      withFollowOutput
        | td ^. tdTask . etWriteToStdOut
        = withFollowFile fpErr stderr
        | otherwise
        = withFollowFile fpErr stderr
        . withFollowFile fpOut stdout
      cmd = T.unpack $ td ^. tdTask . etCommand
      procSpec params = (proc cmd $ T.unpack <$> params)
        { cwd = Just (fromAbsDir fp)
        , close_fds = True
        , std_err = UseHandle hErr
        , std_out = UseHandle hOut
        }
      -- How abstract parameters are rendered: store items to paths,
      -- env vars from the environment, uid/gid from the current user,
      -- and the output placeholder to the build directory.
      convParam = ConvParam
        { convPath = pure . CS.itemPath store
        , convEnv = \e -> T.pack <$> MaybeT (getEnv $ T.unpack e)
        , convUid = lift getEffectiveUserID
        , convGid = lift getEffectiveGroupID
        , convOut = pure fp
        }
    mbParams <- lift $ runMaybeT $
      traverse (paramToText convParam) (td ^. tdTask . etParams)
    params <- case mbParams of
      Nothing -> fail "A parameter was not ready"
      Just params -> return params

    let
      -- All store items referenced by the task's parameters; recorded
      -- as provenance metadata below.
      inputItems :: [CS.Item]
      inputItems = do
        Param fields <- td ^. tdTask . etParams
        ParamPath inputPath <- fields
        case inputPath of
          IPItem item -> pure item
          -- XXX: Store these references as well.
          IPExternalFile _ -> mzero
          IPExternalDir _ -> mzero
    CS.setInputs store (td ^. tdOutput) inputItems
    CS.setMetadata store (td ^. tdOutput)
      ("external-task"::T.Text)
      (Json.encode (td ^. tdTask))

    start <- lift $ getTime Monotonic
    let theProc = procSpec params
    katipAddNamespace "process" . katipAddContext (sl "processId" $ show theProc) $ do
      $(logTM) InfoS "Executing"
      res <- lift $ tryIO $ withCreateProcess theProc $ \_ _ _ ph ->
        -- Error output should be displayed on our stderr stream
        withFollowOutput $ do
          exitCode <- waitForProcess ph
          end <- getTime Monotonic
          case exitCode of
            ExitSuccess -> do
              when (td ^. tdTask . etWriteToStdOut) $
                copyFile fpOut (fp </> [relfile|out|])
              return $ Right (diffTimeSpec start end)
            ExitFailure i ->
              return $ Left (diffTimeSpec start end, i)
      case res of
        -- execution was successful
        Right (Right r) -> return $ Right r
        -- execution failed
        Right (Left e) -> return $ Left (Right e)
        -- executor itself failed
        Left e -> return $ Left (Left e)
  -- Translate the store's construction status into an 'ExecutionResult'.
  case status of
    CS.Missing (Left e) -> return (ExecutorFailure e)
    CS.Missing (Right (t, ec)) -> return (Failure t ec)
    CS.Pending () -> return AlreadyRunning
    CS.Complete (Nothing, _) -> return Cached
    CS.Complete (Just t, _) -> return (Success t)
  where
    logError = flip withException $ \(e::SomeException) ->
      $(logTM) ErrorS . ls $ displayException e
+
-- | Execute tasks forever, logging to stdout with a colour-aware scribe.
executeLoop :: forall c. Coordinator c
            => c
            -> Config c
            -> CS.ContentStore
            -> IO ()
executeLoop coord cfg store = do
  scribe <- mkHandleScribe ColorIfTerminal stdout InfoS V2
  executeLoopWithScribe coord cfg store scribe
+
-- | Same as 'executeLoop', but allows specifying a custom 'Scribe' for logging
--
-- Connects to the coordinator, then repeatedly pops a task, executes it and
-- reports its status back. When no task is available, sleeps for a second.
executeLoopWithScribe :: forall c. Coordinator c
                      => c
                      -> Config c
                      -> CS.ContentStore
                      -> Scribe
                      -> IO ()
-- The coordinator value itself is only needed for type selection; the
-- first argument is therefore ignored.
executeLoopWithScribe _ cfg store handleScribe = do
  let mkLogEnv = registerScribe "stdout" handleScribe defaultScribeSettings =<< initLogEnv "FFExecutorD" "production"
  bracket mkLogEnv closeScribes $ \le -> do
    let initialContext = ()
        initialNamespace = "executeLoop"

    runKatipContextT le initialContext initialNamespace $ do
      $(logTM) InfoS "Initialising connection to coordinator."
      hook :: Hook c <- lift $ initialise cfg
      executor <- lift $ Executor <$> getHostName

      let -- Known failures that do not affect the executors ability
          -- to execute further tasks will be logged and ignored.
          handleFailures = handle $ \(e::CS.StoreError) ->
            -- Certain store errors can occur if an item is forcibly removed
            -- while the executor is constructing it or picked up a
            -- corresponding outdated task from the queue.
            -- XXX: The store should distinguish between recoverable
            -- and unrecoverable errors.
            $(logTM) WarningS . ls $ displayException e

      forever $ handleFailures $ do
        $(logTM) DebugS "Awaiting task from coordinator."
        -- withPopTask reports (elapsed, exit status) back to the
        -- coordinator once the inner action finishes.
        mb <- withPopTask hook executor $ \task ->
          katipAddContext (sl "task" $ task ^. tdOutput) $ do
            $(logTM) DebugS "Checking task"
            res <- execute store task
            case res of
              Cached -> do
                $(logTM) InfoS "Task was cached"
                return (0, Right ())
              Success t -> do
                $(logTM) InfoS "Task completed successfully"
                return (t, Right ())
              Failure t i -> do
                $(logTM) WarningS "Task failed"
                return (t, Left i)
              ExecutorFailure e -> do
                $(logTM) ErrorS $ "Executor failed: " <> ls (displayException e)
                return (0, Left 2)
              AlreadyRunning -> do
                -- XXX:
                -- This should not happen and indicates a programming error
                -- or invalid state.
                -- We do not want to just put the task back on the queue,
                -- as it would cause a loop.
                -- We do not want to just mark the task done, as a potential
                -- later completion of the already running external task
                -- would to mark it as done then.
                -- We cannot mark it as ongoing, as we don't have information
                -- on the executor where the task is already running.
                $(logTM) ErrorS $
                  "Received an already running task from the coordinator "
                  <> showLS (task ^. tdOutput)
                error $
                  "Received an already running task from the coordinator "
                  ++ show (task ^. tdOutput)
        -- No task was available: back off before polling again.
        case mb of
          Nothing -> lift $ threadDelay 1000000
          Just () -> return ()
+
-- | @withFollowFile in out@ follows the file @in@
-- and prints contents to @out@ as they appear while @action@ runs.
-- The file must exist. Doesn't handle file truncation.
--
-- Returns the result of @action@. The reader's file handle is closed via
-- 'bracket' when following finishes (the previous version leaked it).
withFollowFile :: Path Abs File -> Handle -> IO a -> IO a
withFollowFile infile outhandle action = do
  -- Filled exactly once, when 'action' completes; the follower polls it to
  -- know when to stop after draining remaining output.
  mv <- newEmptyMVar
  let follow inhandle = do
        some <- BS.hGetSome inhandle 4096
        if BS.null some
          then do
            done <- isJust <$> tryTakeMVar mv
            unless done $ do
              threadDelay 10000
              follow inhandle
          else do
            BS.hPut outhandle some
            follow inhandle
      -- Follower errors are swallowed (as before, via tryIO); bracket
      -- guarantees the handle is closed even if the follower throws.
      reader = tryIO $
        bracket (openFile (fromAbsFile infile) ReadMode) hClose follow
  snd <$> concurrently reader (action <* putMVar mv ())
diff --git a/src/Control/Funflow/Lock.hs b/src/Control/Funflow/Lock.hs
new file mode 100644
index 0000000..ea6f0fa
--- /dev/null
+++ b/src/Control/Funflow/Lock.hs
@@ -0,0 +1,118 @@
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- | Thread and process write lock.
+--
+-- Allows synchronisation between threads and processes.
+-- Uses an 'MVar' for synchronisation between threads
+-- and fcntl write locks for synchronisation between processes.
+--
+-- Only ever have one 'Lock' object per lock file per process!
+module Control.Funflow.Lock
+ ( Lock
+ , openLock
+ , closeLock
+ , withLock
+ ) where
+
+import Control.Concurrent
+import Control.Exception.Safe
+import Control.Monad (unless)
+import Network.HostName (getHostName)
+import Path
+import Path.IO
+import System.Posix.Files
+import System.Posix.IO
+import System.Posix.Process
+import System.Random
+
-- | Thread and process write lock.
--
-- Only ever have one 'Lock' object per lock file per process!
data Lock = Lock
  { lockMVar :: MVar () -- ^ Serialises lock use between threads of this process.
  , lockDir :: Path Abs Dir -- ^ Directory holding the on-disk link-based lock files.
  }
+
-- | Open the lock file and create a lock object.
--
-- This does not acquire the lock; the lock directory is created if absent.
--
-- Only ever have one 'Lock' object per lock file per process!
openLock :: Path Abs Dir -> IO Lock
openLock dir = do
  createDirIfMissing True dir
  mvar <- newMVar ()
  return Lock
    { lockMVar = mvar
    , lockDir = dir
    }
+
-- | Close the lock file.
--
-- Does not release the lock.
--
-- Blocks while the lock is held by any thread of this process.
closeLock :: Lock -> IO ()
closeLock = takeMVar . lockMVar
+
-- | Acquire the lock for the duration of the given action and release after.
--
-- Takes the thread-level 'MVar' first, then the inter-process directory
-- lock, so both other threads and other processes are excluded.
withLock :: Lock -> IO a -> IO a
withLock (Lock mvar dir) action =
  withMVar mvar $ \() ->
    bracket_ (acquireDirLock dir) (releaseDirLock dir) action
+
+----------------------------------------------------------------------
+-- Internals
+
-- | Generate unique (per process) filename.
--
-- Combines the host name and process ID.
getUniqueFileName :: IO (Path Rel File)
getUniqueFileName = do
  host <- getHostName
  pid <- getProcessID
  parseRelFile (host ++ show pid)
+
-- | Name of the shared on-disk lock file that every process links against.
lockFileName :: Path Rel File
lockFileName = [relfile|lock|]
+
-- | Acquire the lock.
--
-- Uses an algorithm that is described in the man-page of open (2) in the
-- last paragraph to @O_EXCL@ in release 4.14 of the Linux man-pages project.
--
-- Creates a file under a unique (per process) filename.
-- Attempts to hard-link that file to a common lock path.
-- If the operation succeeds, then the lock was acquired.
-- If not, but if the link count of the file under the unique filename
-- increased to two, then the lock was acquired.
-- Otherwise, another process holds the lock and this process waits
-- and retries.
acquireDirLock :: Path Abs Dir -> IO ()
acquireDirLock dir = do
  file <- getUniqueFileName
  let path = dir </> file
  fd <- createFile (fromAbsFile path) ownerWriteMode
  -- Only the file's existence matters; the descriptor itself is not needed.
  closeFd fd
  r <- try $ createLink (fromAbsFile path) (fromAbsFile $ dir </> lockFileName)
  case r of
    Right () -> return ()
    Left (_::IOError) -> do
      -- On NFS the link may succeed even though an error was reported;
      -- a link count of two on our unique file proves we own the lock.
      count <- linkCount <$> getFileStatus (fromAbsFile path)
      unless (count == 2) $ do
        -- Randomised backoff (50-100ms) before retrying, to reduce
        -- contention between competing processes.
        delay <- randomRIO (50000, 100000)
        threadDelay delay
        acquireDirLock dir
+
-- | Release the lock.
--
-- Unlinks the common lock file first, then the per-process unique file.
releaseDirLock :: Path Abs Dir -> IO ()
releaseDirLock dir = do
  unique <- getUniqueFileName
  removeLink (fromAbsFile (dir </> lockFileName))
  removeLink (fromAbsFile (dir </> unique))
diff --git a/src/Control/Funflow/Orphans.hs b/src/Control/Funflow/Orphans.hs
new file mode 100644
index 0000000..36b7db5
--- /dev/null
+++ b/src/Control/Funflow/Orphans.hs
@@ -0,0 +1,28 @@
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+{-# LANGUAGE FlexibleInstances #-}
+
+-- | Dedicated module for orphan instances.
+module Control.Funflow.Orphans where
+
+import Data.Functor.Contravariant
+import Data.Store (Store)
+import qualified Data.Store as Store
+import qualified Path as Path
+import qualified Path.Internal
+
-- Orphan 'Store' instances for 'Path.Path', serialising via the raw
-- 'FilePath' wrapped by the internal 'Path.Internal.Path' constructor.
instance Store (Path.Path Path.Abs Path.File) where
  size = contramap (\(Path.Internal.Path fp) -> fp) Store.size
  peek = Path.Internal.Path <$> Store.peek
  poke = Store.poke . (\(Path.Internal.Path fp) -> fp)
instance Store (Path.Path Path.Abs Path.Dir) where
  size = contramap (\(Path.Internal.Path fp) -> fp) Store.size
  peek = Path.Internal.Path <$> Store.peek
  poke = Store.poke . (\(Path.Internal.Path fp) -> fp)
instance Store (Path.Path Path.Rel Path.File) where
  size = contramap (\(Path.Internal.Path fp) -> fp) Store.size
  peek = Path.Internal.Path <$> Store.peek
  poke = Store.poke . (\(Path.Internal.Path fp) -> fp)
instance Store (Path.Path Path.Rel Path.Dir) where
  size = contramap (\(Path.Internal.Path fp) -> fp) Store.size
  peek = Path.Internal.Path <$> Store.peek
  poke = Store.poke . (\(Path.Internal.Path fp) -> fp)
diff --git a/src/Control/Funflow/Pretty.hs b/src/Control/Funflow/Pretty.hs
new file mode 100644
index 0000000..2f7dea1
--- /dev/null
+++ b/src/Control/Funflow/Pretty.hs
@@ -0,0 +1,24 @@
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- | Pretty-printer for Funflow diagrams.
+module Control.Funflow.Pretty where
+
+import Control.Funflow.Base
+import Control.Funflow.Diagram
+import qualified Data.Text as T
+import Text.PrettyPrint
+
-- | Render a flow's diagram as a document. Nodes show their first label
-- when present; combinators render with their arrow operator names.
ppFlow :: Flow eff ex a b -> Doc
ppFlow = pp . toDiagram where
  pp :: forall ex a b. Diagram ex a b -> Doc
  pp (Node (NodeProperties (lbl:_)) _ _) = text (T.unpack lbl)
  pp (Node _ _ _) = text "unlabeled step"
  pp (Seq f g) = parens $ pp f <+> text ">>>" <+> pp g
  pp (Par f g) = parens $ pp f <+> text "***" <+> pp g
  pp (Fanin f g) = parens $ pp f <+> text "|||" <+> pp g
  pp (Catch f g) = parens $ pp f <+> text "catch" <+> pp g
+
-- | Render a flow's diagram as a plain string.
showFlow :: Flow eff ex a b -> String
showFlow fl = render (ppFlow fl)
diff --git a/src/Control/Funflow/Steps.hs b/src/Control/Funflow/Steps.hs
new file mode 100644
index 0000000..9cde1f2
--- /dev/null
+++ b/src/Control/Funflow/Steps.hs
@@ -0,0 +1,234 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TupleSections #-}
+
+-- | Miscallaneous steps to form part of Funflow computations.
+module Control.Funflow.Steps
+ ( -- * Error handling
+ retry
+ -- * Store manipulation
+ , assignAliasInStore
+ , copyDirToStore
+ , copyFileToStore
+ , listDirContents
+ , lookupAliasInStore
+ , mergeDirs
+ , mergeFiles
+ , putInStoreAt
+ , readString
+ , readString_
+ , readYaml
+ , writeExecutableString
+ , writeString
+ , writeString_
+ , writeYaml
+ , writeYaml_
+ -- * Docker
+ , docker
+ -- * Testing and debugging
+ , promptFor
+ , printS
+ , failStep
+ , worstBernoulli
+ , pauseWith
+ , melancholicLazarus
+ )
+where
+
+import Control.Arrow
+import Control.Arrow.Free (catch)
+import Control.Exception.Safe (Exception, throwM)
+import Control.Funflow.Base (SimpleFlow)
+import Control.Funflow.Class
+import Control.Funflow.ContentHashable (ContentHashable,
+ DirectoryContent (..),
+ FileContent (..))
+import Control.Funflow.ContentStore (Content ((:</>)))
+import qualified Control.Funflow.ContentStore as CS
+import qualified Control.Funflow.External.Docker as Docker
+import Data.Foldable (for_)
+import Data.Store
+import Data.Traversable (for)
+import Data.Typeable (Typeable)
+import qualified Data.Yaml as Yaml
+import GHC.Conc (threadDelay)
+import Path
+import Path.IO
+import System.Posix.Files (accessModes, createLink,
+ setFileMode)
+import System.Random
+
-- | Prompt on stdout with the given string and 'read' a value from stdin.
--
-- NOTE(review): uses the partial 'read'; malformed input makes the step
-- throw at evaluation time. Intended for interactive testing only.
promptFor :: (Read a, ArrowFlow eff ex arr) => arr String a
promptFor = proc s -> do
  () <- stepIO putStr -< (s++"> ")
  s' <- stepIO (const getLine) -< ()
  returnA -< read s'
+
-- | Debugging step: print the incoming value to stdout.
printS :: (Show a, ArrowFlow eff ex arr) => arr a ()
printS = stepIO print
+
-- | A step that always fails (via 'fail' in IO).
failStep :: ArrowFlow eff ex arr => arr () ()
failStep = stepIO $ const (fail "failStep")
+
-- | Sample uniformly from [0,1]; fail (with the given error constructor)
-- when the sample is at or above the input probability @p@, otherwise
-- return the sample.
worstBernoulli :: (Exception ex, ArrowFlow eff ex arr) => (String -> ex) -> arr Double Double
worstBernoulli errorC = stepIO $ \p -> do
  sample <- randomRIO (0, 1)
  if sample < p
    then return sample
    else throwM . errorC $
      "worstBernoulli fail with " ++ show sample ++ " > " ++ show p
+
-- | Pause for a given number of seconds. Threads a value through so the
-- delay is sequenced with, rather than parallel to, other processing.
pauseWith :: (Store a, ArrowFlow eff ex arr) => arr (Int, a) a
pauseWith = stepIO $ \(seconds, val) -> do
  threadDelay (seconds * 1000000)
  pure val
+
-- | On first invocation: die, leaving the input as a suicide note at
-- @/tmp/lazarus_note@. On second invocation: be resurrected, destroy the
-- note and return its contents.
--
-- NOTE(review): the fixed path means concurrent uses share one note.
melancholicLazarus :: ArrowFlow eff ex arr => arr String String
melancholicLazarus = stepIO $ \s -> do
  let fnm = [absfile|/tmp/lazarus_note|]
  ex <- doesFileExist fnm
  if ex
    then do s1 <- readFile (fromAbsFile fnm)
            removeFile fnm
            return s1
    else do writeFile (fromAbsFile fnm) s
            fail "lazarus fail"
+
-- | @retry n s f@ reruns @f@ on failure at most @n@ times with a delay of
-- @s@ seconds between retries.
--
-- The input is threaded through 'pauseWith' before each rerun (hence the
-- 'Store' constraint), so delays are sequenced into the flow.
retry :: forall arr eff ex a b. (Exception ex, Store a, ArrowFlow eff ex arr)
  => Int -> Int -> arr a b -> arr a b
retry 0 _ f = f
retry n secs f = catch f $ proc (x, _ :: ex) -> do
  x1 <- pauseWith -< (secs,x)
  x2 <- retry (n-1) secs f -< x1
  returnA -< x2
+
-- | Resolve an alias to the store item it names, if any.
lookupAliasInStore :: ArrowFlow eff ex arr => arr CS.Alias (Maybe CS.Item)
lookupAliasInStore = internalManipulateStore CS.lookupAlias

-- | Point an alias at the given store item.
assignAliasInStore :: ArrowFlow eff ex arr => arr (CS.Alias, CS.Item) ()
assignAliasInStore = internalManipulateStore $ \store ->
  uncurry (CS.assignAlias store)
+
-- | @putInStoreAt f@ builds a store item by running the writer action @f@
-- against the target path inside a fresh store subtree, then returns a
-- typed 'CS.Content' pointer at that relative path within the item.
putInStoreAt :: (ContentHashable IO a, Typeable t, ArrowFlow eff ex arr)
  => (Path Abs t -> a -> IO ()) -> arr (a, Path Rel t) (CS.Content t)
putInStoreAt f = proc (a, p) -> do
  item <- putInStore (\d (a, p) -> do
    -- Make sure the parent directory exists before writing at d </> p.
    createDirIfMissing True (parent $ d </> p)
    f (d </> p) a
    ) -< (a, p)
  returnA -< item :</> p
+
-- | @copyFileToStore (fIn, fOut)@ copies the contents of @fIn@ into the store
-- under the relative path @fOut@ within the subtree.
copyFileToStore :: ArrowFlow eff ex arr => arr (FileContent, Path Rel File) (CS.Content File)
copyFileToStore = putInStoreAt $ \dst (FileContent src) -> copyFile src dst
+
-- | @copyDirToStore (dIn, Nothing)@ copies the contents of @dIn@ into the store
-- right under the subtree.
--
-- | @copyDirToStore (dIn, Just dOut)@ copies the contents of @dIn@ into the store
-- under relative path @dOut@ within the subtree
copyDirToStore :: ArrowFlow eff ex arr => arr (DirectoryContent, Maybe (Path Rel Dir)) (CS.Content Dir)
copyDirToStore = proc (inDir, mbOutDir) ->
  case mbOutDir of
    -- No target subdirectory: the whole item is the copied directory.
    Nothing -> do
      item <- putInStore (\d (DirectoryContent inDir) ->
        copyDirRecur inDir d
        ) -< inDir
      returnA -< CS.All item
    -- Copy into the named subdirectory within the item.
    Just outDir ->
      putInStoreAt (\p (DirectoryContent inDir) ->
        copyDirRecur inDir p
        ) -< (inDir, outDir)
+
-- | List the contents of a directory within the store, returning
-- store-typed pointers (relative to the containing item) for the
-- immediate subdirectories and files.
listDirContents :: ArrowFlow eff ex arr => arr (CS.Content Dir)
                   ([CS.Content Dir], [CS.Content File])
listDirContents = internalManipulateStore
  ( \store dir -> let
      item = CS.contentItem dir
      itemRoot = CS.itemPath store item
    in do
      (dirs, files) <- listDir $ CS.contentPath store dir
      -- Re-relativise absolute listings against the item root so the
      -- results can be expressed as item :</> relative-path contents.
      relDirs <- for dirs (stripProperPrefix itemRoot)
      relFiles <- for files (stripProperPrefix itemRoot)
      return ( (item :</>) <$> relDirs
             , (item :</>) <$> relFiles
             )
  )
+
-- | Merge a number of store directories together into a single output directory.
-- This uses hardlinks to avoid duplicating the data on disk.
--
-- NOTE(review): if two inputs contain the same relative file path,
-- 'createLink' will fail on the duplicate — confirm inputs are disjoint.
mergeDirs :: ArrowFlow eff ex arr => arr [CS.Content Dir] (CS.Content Dir)
mergeDirs = proc inDirs -> do
  -- Resolve the input contents to absolute paths first.
  paths <- internalManipulateStore
    ( \store items -> return $ CS.contentPath store <$> items) -< inDirs
  arr CS.All <<< putInStore
    ( \d inDirs -> for_ inDirs $ \inDir -> do
       (subDirs, files) <- listDirRecur inDir
       -- Recreate the directory skeleton, then hardlink every file in.
       for_ subDirs $ \absSubDir -> do
         relSubDir <- stripProperPrefix inDir absSubDir
         createDirIfMissing True (d </> relSubDir)
       for_ files $ \absFile -> do
         relFile <- stripProperPrefix inDir absFile
         createLink (toFilePath absFile) (toFilePath $ d </> relFile)
    ) -< paths
+
-- | Merge a number of files into a single output directory.
-- Files are hardlinked under their base names; duplicates of the same
-- base name would make 'createLink' fail.
mergeFiles :: ArrowFlow eff ex arr => arr [CS.Content File] (CS.Content Dir)
mergeFiles = proc inFiles -> do
  absFiles <- internalManipulateStore
    ( \store items -> return $ CS.contentPath store <$> items) -< inFiles
  arr CS.All <<< putInStore
    (\d inFiles -> for_ inFiles $ \inFile ->
      createLink (toFilePath inFile) (toFilePath $ d </> filename inFile)
    ) -< absFiles
+
+
-- | Read the contents of the given file in the store.
readString :: ArrowFlow eff ex arr => arr (CS.Content File) String
readString = getFromStore $ readFile . fromAbsFile

-- | Read the contents of a file named @out@ within the given item.
readString_ :: ArrowFlow eff ex arr => arr CS.Item String
readString_ = arr (:</> [relfile|out|]) >>> readString

-- | Create and write into a file under the given path in the store.
writeString :: ArrowFlow eff ex arr => arr (String, Path Rel File) (CS.Content File)
writeString = putInStoreAt $ writeFile . fromAbsFile

-- | Like 'writeString', but additionally marks the written file executable.
writeExecutableString :: ArrowFlow eff ex arr => arr (String, Path Rel File) (CS.Content File)
writeExecutableString = putInStoreAt $ \p i -> do
  writeFile (fromAbsFile p) i
  setFileMode (fromAbsFile p) accessModes

-- | Create and write into a file named @out@ within the given item.
writeString_ :: ArrowFlow eff ex arr => arr String (CS.Content File)
writeString_ = Control.Funflow.Steps.writeString <<< arr (, [relfile|out|])

-- | Read a YAML file from the given file in the store.
readYaml :: Yaml.FromJSON a
         => SimpleFlow (CS.Content File) (Either Yaml.ParseException a)
readYaml = getFromStore (Yaml.decodeFileEither . fromAbsFile)

-- | Write a YAML file under the given name to the store.
writeYaml :: (ContentHashable IO a, Yaml.ToJSON a)
          => SimpleFlow (a, Path Rel File) (CS.Content File)
writeYaml = putInStoreAt $ Yaml.encodeFile . fromAbsFile

-- | Write a YAML file named @out.yaml@ to the store.
writeYaml_ :: (ContentHashable IO a, Yaml.ToJSON a)
           => SimpleFlow a (CS.Content File)
writeYaml_ = writeYaml <<< arr (, [relfile|out.yaml|])

-- | Run an external task in a docker container described by the given
-- configuration function.
docker :: (ContentHashable IO a, ArrowFlow eff ex arr) => (a -> Docker.Config) -> arr a CS.Item
docker f = external $ Docker.toExternal . f
diff --git a/test/Control/Arrow/Async/Tests.hs b/test/Control/Arrow/Async/Tests.hs
new file mode 100644
index 0000000..d7de734
--- /dev/null
+++ b/test/Control/Arrow/Async/Tests.hs
@@ -0,0 +1,30 @@
+{-# LANGUAGE GADTs #-}
+module Control.Arrow.Async.Tests (tests) where
+import Control.Arrow
+import Control.Arrow.Async
+import Control.Arrow.Free
+import Control.Category
+import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
+import Prelude hiding (id, (.))
+import Test.Tasty
+import Test.Tasty.HUnit
+
+-- | A single effectful computation from @a@ to @b@, wrapped so it can
+-- serve as the effect type of a free arrow.
+data Step a b where
+  Step :: (a -> IO b) -> Step a b
+
+-- | Interpret a 'Step' as an asynchronous arrow by unwrapping the
+-- underlying 'IO' action.
+runStep :: Step a b -> AsyncA IO a b
+runStep (Step f) = AsyncA f
+
+type Flow = Free Step
+
+-- | Tests for the 'AsyncA' arrow.
+tests :: TestTree
+tests = testGroup "AsyncA"
+  [ testCase "parallel arrows executed in parallel" $ do
+      -- flow1 blocks on an MVar that only flow2 fills; the test can
+      -- therefore only finish if both branches of (***) run concurrently.
+      sem <- newEmptyMVar
+      let flow1 = effect . Step $ \() -> takeMVar sem
+          flow2 = effect . Step $ \i -> putMVar sem i
+          flow :: Flow ((), Int) (Int, ())
+          flow = flow1 *** flow2
+      (out1, out2) <- runAsyncA (eval runStep flow) ((), 3)
+      assertEqual "Should finish" ((), 3) (out2, out1)
+  ]
diff --git a/test/Funflow/ContentStore.hs b/test/Funflow/ContentStore.hs
new file mode 100644
index 0000000..5bfa098
--- /dev/null
+++ b/test/Funflow/ContentStore.hs
@@ -0,0 +1,413 @@
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE TypeApplications #-}
+
+module Funflow.ContentStore
+ ( tests
+ ) where
+
+import Control.Concurrent.Async
+import Control.Exception.Safe (tryAny)
+import Control.Funflow.ContentHashable (contentHash)
+import Control.Funflow.ContentStore (ContentStore)
+import qualified Control.Funflow.ContentStore as ContentStore
+import Control.Monad (void)
+import Data.Maybe (catMaybes)
+import qualified Data.Set as Set
+import Path
+import Path.IO
+import System.Posix.Files
+import Test.Tasty
+import Test.Tasty.HUnit
+
+-- | Unit tests for the content store: initialisation, the
+-- Missing/Pending/Complete life cycle, awaiting construction, removal,
+-- persistence across store restarts, listing and aliases.
+tests :: TestTree
+tests = testGroup "Content Store"
+
+  [ testCase "initialise fresh store" $
+    withTmpDir $ \dir -> do
+      let root = dir </> [reldir|store|]
+      ContentStore.withStore root $ \_ ->
+        doesDirExist @IO root
+          @? "store root exists"
+
+  , testCase "initialise existing store" $
+    withTmpDir $ \dir -> do
+      let root = dir </> [reldir|store|]
+      createDir root
+      ContentStore.withStore root $ \_ ->
+        doesDirExist @IO root
+          @? "store root exists"
+
+  , testCase "store is not writable" $
+    withEmptyStore $ \store -> do
+      let root = ContentStore.root store
+      isNotWritable root
+        @? "store not writable"
+
+  , testCase "subtree stages" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+
+      -- A fresh hash is missing from both query and lookup.
+      missing <- ContentStore.query store hash
+      missing @?= ContentStore.Missing ()
+      missing' <- ContentStore.lookup store hash
+      missing' @?= ContentStore.Missing ()
+
+      subtree <- ContentStore.markPending store hash
+      let dir = subtree </> [reldir|dir|]
+          file = dir </> [relfile|file|]
+          expectedContent = "Hello World"
+      pending <- ContentStore.query store hash
+      pending @?= ContentStore.Pending ()
+      pending' <- ContentStore.lookup store hash
+      pending' @?= ContentStore.Pending ()
+      doesDirExist @IO subtree
+        @? "pending subtree exists"
+      isWritable subtree
+        @? "pending subtree is writable"
+      createDir dir
+      writeFile (fromAbsFile file) expectedContent
+      do
+        content <- readFile (fromAbsFile file)
+        content @?= expectedContent
+
+      -- After completion the item becomes read-only but keeps content.
+      item <- ContentStore.markComplete store hash
+      let itemDir = ContentStore.itemPath store item
+          file' = itemDir </> [relfile|dir/file|]
+      complete <- ContentStore.query store hash
+      complete @?= ContentStore.Complete ()
+      complete' <- ContentStore.lookup store hash
+      complete' @?= ContentStore.Complete item
+      doesDirExist @IO itemDir
+        @? "complete subtree exists"
+      isNotWritable itemDir
+        @? "complete subtree is not writable"
+      isNotWritable file'
+        @? "complete file is not writable"
+      do
+        content <- readFile (fromAbsFile file')
+        content @?= expectedContent
+
+  , testCase "await construction" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+
+      ContentStore.constructOrAsync store hash >>= \case
+        ContentStore.Pending _ ->
+          assertFailure "missing already under construction"
+        ContentStore.Complete _ ->
+          assertFailure "missing already complete"
+        ContentStore.Missing _ ->
+          return ()
+
+      a <- ContentStore.constructOrAsync store hash >>= \case
+        ContentStore.Missing _ -> do
+          assertFailure "under construction still missing"
+          undefined
+        ContentStore.Complete _ -> do
+          assertFailure "under construction already complete"
+          undefined
+        ContentStore.Pending a ->
+          return a
+
+      b <- ContentStore.lookupOrWait store hash >>= \case
+        ContentStore.Missing _ -> do
+          assertFailure "under construction still missing"
+          undefined
+        ContentStore.Complete _ -> do
+          assertFailure "under construction already complete"
+          undefined
+        ContentStore.Pending b ->
+          return b
+
+      item <- ContentStore.markComplete store hash
+
+      -- Both waiters must observe the completed item.
+      item' <- wait a
+      item' @?= ContentStore.Completed item
+
+      item'' <- wait b
+      item'' @?= ContentStore.Completed item
+
+      ContentStore.constructOrAsync store hash >>= \case
+        ContentStore.Missing _ -> do
+          assertFailure "complete still missing"
+        ContentStore.Pending _ -> do
+          assertFailure "complete still under construction"
+        ContentStore.Complete _ -> do
+          return ()
+
+  , testCase "await failure" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+
+      ContentStore.constructOrAsync store hash >>= \case
+        ContentStore.Pending _ ->
+          assertFailure "missing already under construction"
+        ContentStore.Complete _ ->
+          assertFailure "missing already complete"
+        ContentStore.Missing _ ->
+          return ()
+
+      a <- ContentStore.constructOrAsync store hash >>= \case
+        ContentStore.Missing _ -> do
+          assertFailure "under construction still missing"
+          undefined
+        ContentStore.Complete _ -> do
+          assertFailure "under construction already complete"
+          undefined
+        ContentStore.Pending a ->
+          return a
+
+      b <- ContentStore.lookupOrWait store hash >>= \case
+        ContentStore.Missing _ -> do
+          assertFailure "under construction still missing"
+          undefined
+        ContentStore.Complete _ -> do
+          assertFailure "under construction already complete"
+          undefined
+        ContentStore.Pending b ->
+          return b
+
+      ContentStore.removeFailed store hash
+
+      -- Both waiters must observe the failure.
+      item' <- wait a
+      item' @?= ContentStore.Failed
+
+      item'' <- wait b
+      item'' @?= ContentStore.Failed
+
+      ContentStore.constructOrAsync store hash >>= \case
+        ContentStore.Pending _ -> do
+          assertFailure "failed still under construction"
+        ContentStore.Complete _ -> do
+          assertFailure "failed already complete"
+        ContentStore.Missing _ -> do
+          return ()
+
+  , testCase "construct if missing" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+      let file = [relfile|file|]
+          expectedContent = "Hello World"
+
+      ContentStore.constructIfMissing store hash >>= \case
+        ContentStore.Pending () ->
+          assertFailure "missing already under construction"
+        ContentStore.Complete _ ->
+          assertFailure "missing already complete"
+        ContentStore.Missing subtree -> do
+          isWritable subtree
+            @? "under construction not writable"
+          writeFile (fromAbsFile $ subtree </> file) expectedContent
+
+      ContentStore.constructIfMissing store hash >>= \case
+        ContentStore.Missing _ ->
+          assertFailure "under construction still missing"
+        ContentStore.Complete _ ->
+          assertFailure "under construction already complete"
+        ContentStore.Pending () ->
+          void $ ContentStore.markComplete store hash
+
+      ContentStore.constructIfMissing store hash >>= \case
+        ContentStore.Missing _ ->
+          assertFailure "complete still missing"
+        ContentStore.Pending () ->
+          assertFailure "complete still under construction"
+        ContentStore.Complete item -> do
+          let subtree = ContentStore.itemPath store item
+          isNotWritable (subtree </> file)
+            @? "complete still writable"
+          content <- readFile (fromAbsFile $ subtree </> file)
+          content @?= expectedContent
+
+  , testCase "remove failed" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+      subtree <- ContentStore.markPending store hash
+      ContentStore.removeFailed store hash
+      not <$> doesDirExist @IO subtree
+        @? "subtree was removed"
+
+  , testCase "forcibly remove" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+      subtree <- ContentStore.markPending store hash
+
+      ContentStore.removeForcibly store hash
+      not <$> doesDirExist @IO subtree
+        @? "remove under construction"
+
+      ContentStore.removeForcibly store hash
+      not <$> doesDirExist @IO subtree
+        @? "remove missing"
+
+      subtree' <- ContentStore.markPending store hash
+      void $ ContentStore.markComplete store hash
+      ContentStore.removeForcibly store hash
+      not <$> doesDirExist @IO subtree'
+        @? "remove complete"
+
+  , testCase "subtree state is persisted" $
+    withTmpDir $ \dir -> do
+      let root = dir </> [reldir|store|]
+      hash <- contentHash ("test" :: String)
+
+      do
+        ContentStore.withStore root $ \store ->
+          void $ ContentStore.markPending store hash
+
+      -- Imagine the process terminates and the store is closed
+
+      do
+        ContentStore.withStore root $ \store -> do
+          underConstruction <- ContentStore.query store hash
+          underConstruction @?= ContentStore.Pending ()
+          void $ ContentStore.markComplete store hash
+
+      -- Imagine the process terminates and the store is closed
+
+      do
+        ContentStore.withStore root $ \store -> do
+          complete <- ContentStore.query store hash
+          complete @?= ContentStore.Complete ()
+
+  , testCase "mark complete before under construction fails" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+      ContentStore.markComplete store hash
+        `shouldFail` "complete before under construction"
+
+  , testCase "mark complete after complete fails" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+      void $ ContentStore.markPending store hash
+      void $ ContentStore.markComplete store hash
+      ContentStore.markComplete store hash
+        `shouldFail` "complete after complete"
+
+  , testCase "mark under construction after under construction fails" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+      void $ ContentStore.markPending store hash
+      void $ ContentStore.markPending store hash
+        `shouldFail` "under construction after under construction"
+
+  , testCase "mark under construction after complete fails" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+      void $ ContentStore.markPending store hash
+      void $ ContentStore.markComplete store hash
+      void $ ContentStore.markPending store hash
+        `shouldFail` "under construction after complete"
+
+  , testCase "remove missing fails" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+      ContentStore.removeFailed store hash
+        `shouldFail` "remove non existent"
+
+  , testCase "remove complete fails" $
+    withEmptyStore $ \store -> do
+      hash <- contentHash ("test" :: String)
+      void $ ContentStore.markPending store hash
+      void $ ContentStore.markComplete store hash
+      ContentStore.removeFailed store hash
+        `shouldFail` "remove complete"
+
+  , testCase "list store contents" $
+    withEmptyStore $ \store -> do
+      [a, b, c, d] <- mapM contentHash ["a", "b", "c", "d" :: String]
+      mapM_ (ContentStore.markPending store) [a, b, c, d]
+      mapM_ (ContentStore.markComplete store) [a, b]
+
+      (pendings, completes, items) <- ContentStore.listAll store
+      let all' = pendings ++ completes
+      Set.fromList all' @?= Set.fromList [a, b, c, d]
+      Set.size (Set.fromList all') @?= 4
+
+      underConstruction <- ContentStore.listPending store
+      Set.fromList underConstruction @?= Set.fromList [c, d]
+
+      complete <- ContentStore.listComplete store
+      Set.fromList complete @?= Set.fromList [a, b]
+
+      items' <- catMaybes <$> mapM (ContentStore.waitUntilComplete store) [a, b]
+      Set.fromList items @?= Set.fromList items'
+
+  , testCase "store aliases" $
+    withEmptyStore $ \store -> do
+      let fp = [relfile|test|]
+          contentA = "itemA"
+          contentB = "itemB"
+      itemA <- do
+        hash <- contentHash ("a" :: String)
+        dir <- ContentStore.markPending store hash
+        writeFile (fromAbsFile $ dir </> fp) contentA
+        ContentStore.markComplete store hash
+      itemB <- do
+        hash <- contentHash ("b" :: String)
+        dir <- ContentStore.markPending store hash
+        writeFile (fromAbsFile $ dir </> fp) contentB
+        ContentStore.markComplete store hash
+      let aliasA = ContentStore.Alias "aliasA"
+          aliasB = ContentStore.Alias "aliasB"
+
+      do
+        r <- ContentStore.lookupAlias store aliasA
+        r @?= Nothing
+
+      ContentStore.assignAlias store aliasA itemA
+      ContentStore.assignAlias store aliasB itemB
+      do
+        r <- ContentStore.lookupAlias store aliasA
+        r @?= Just itemA
+      do
+        r <- ContentStore.lookupAlias store aliasB
+        r @?= Just itemB
+
+      -- Reassignment overwrites the previous target.
+      ContentStore.assignAlias store aliasB itemA
+      do
+        r <- ContentStore.lookupAlias store aliasA
+        r @?= Just itemA
+      do
+        r <- ContentStore.lookupAlias store aliasB
+        r @?= Just itemA
+
+      ContentStore.removeAlias store aliasA
+      ContentStore.removeAlias store aliasB
+      do
+        r <- ContentStore.lookupAlias store aliasA
+        r @?= Nothing
+      do
+        r <- ContentStore.lookupAlias store aliasB
+        r @?= Nothing
+
+  ]
+
+-- | Assert that the given action throws; report @msg@ if it succeeds
+-- instead.
+shouldFail :: IO a -> String -> IO ()
+shouldFail action msg = do
+  result <- tryAny action
+  case result of
+    Left _ -> return ()
+    Right _ -> assertFailure msg
+
+-- | Run an action in a fresh system temporary directory named
+-- @funflow-test@; the directory is removed afterwards.
+withTmpDir :: (Path Abs Dir -> IO a) -> IO a
+withTmpDir = withSystemTempDir "funflow-test"
+
+-- | Run an action against a brand-new content store rooted in a
+-- temporary directory.
+withEmptyStore :: (ContentStore -> IO a) -> IO a
+withEmptyStore k = withTmpDir $ \dir ->
+  ContentStore.withStore (dir </> [reldir|store|]) k
+
+-- | True when neither owner, group nor other hold a write-permission
+-- bit on the given path.
+isNotWritable :: Path Abs t -> IO Bool
+isNotWritable path = do
+  status <- getFileStatus (toFilePath path)
+  let writeBits = fileMode status `intersectFileModes` anyWriteMode
+  return $! writeBits == nullFileMode
+  where
+    anyWriteMode =
+      ownerWriteMode
+      `unionFileModes`
+      groupWriteMode
+      `unionFileModes`
+      otherWriteMode
+
+-- | Negation of 'isNotWritable'.
+isWritable :: Path Abs t -> IO Bool
+isWritable path = not <$> isNotWritable path
diff --git a/test/Funflow/SQLiteCoordinator.hs b/test/Funflow/SQLiteCoordinator.hs
new file mode 100644
index 0000000..4851e9c
--- /dev/null
+++ b/test/Funflow/SQLiteCoordinator.hs
@@ -0,0 +1,142 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE QuasiQuotes #-}
+
+module Funflow.SQLiteCoordinator where
+
+import Control.Arrow
+import Control.Arrow.Free
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.Async (wait, withAsync)
+import Control.Concurrent.MVar
+import Control.Exception.Safe
+import Control.Funflow
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.External.Coordinator.SQLite
+import Control.Monad
+import Data.Semigroup ((<>))
+import Data.String (fromString)
+import Path
+import Path.IO
+import qualified System.Posix.Signals as Signals
+import System.Posix.Types (ProcessID)
+import System.Process
+import qualified System.Process.Internals as Process
+import System.Timeout (timeout)
+import Test.Tasty
+import Test.Tasty.HUnit
+
+-- | Name of the content-store directory inside the test working
+-- directory.
+storeDir :: Path Rel Dir
+storeDir = [reldir|store|]
+
+-- | Name of the SQLite coordinator directory inside the test working
+-- directory.
+dbDir :: Path Rel Dir
+dbDir = [reldir|coord|]
+
+-- XXX: Reduce noise by piping stdout/stderr to a logfile.
+-- | Spawn one @ffexecutord@ worker process pointed at the store and
+-- coordinator directories under the given working directory.
+spawnExecutor :: Path Abs Dir -> IO ProcessHandle
+spawnExecutor wd = spawnProcess "ffexecutord"
+  ["sqlite", fromAbsDir $ wd </> storeDir, fromAbsDir $ wd </> dbDir]
+
+-- | Spawn @n@ executor processes in the given working directory.
+spawnExecutors :: Path Abs Dir -> Int -> IO [ProcessHandle]
+spawnExecutors wd n = replicateM n (spawnExecutor wd)
+
+-- | Terminate all of the given executor processes.
+killExecutors :: [ProcessHandle] -> IO ()
+killExecutors = mapM_ terminateProcess
+
+-- | Run an action with @n@ executor processes, terminating them
+-- afterwards even if the action throws ('bracket').
+withExecutors :: Path Abs Dir -> Int -> ([ProcessHandle] -> IO a) -> IO a
+withExecutors wd n = bracket (spawnExecutors wd n) killExecutors
+
+-- | Like 'withExecutors', ignoring the process handles.
+withExecutors_ :: Path Abs Dir -> Int -> IO a -> IO a
+withExecutors_ wd n = withExecutors wd n . const
+
+-- | Extract the OS process id from a 'ProcessHandle', or 'Nothing' if
+-- the handle is already closed.
+getProcessID :: ProcessHandle -> IO (Maybe ProcessID)
+getProcessID ph = Process.withProcessHandle ph $ \case
+  Process.OpenHandle h -> return (Just h)
+  Process.OpenExtHandle h _ _ -> return (Just h)
+  Process.ClosedHandle {} -> return Nothing
+
+-- | Send the given signal to the process behind the handle; a no-op if
+-- the handle is already closed.
+signalProcess :: Signals.Signal -> ProcessHandle -> IO ()
+signalProcess sig = getProcessID >=> \case
+  Nothing -> return ()
+  Just pid -> Signals.signalProcess sig pid
+
+-- | Run a flow against the SQLite coordinator and the content store
+-- under the given working directory.
+runTestFlow :: Path Abs Dir -> SimpleFlow a b -> a -> IO (Either SomeException b)
+runTestFlow wd flow' input =
+  CS.withStore (wd </> storeDir) $ \store ->
+    runSimpleFlow SQLite (wd </> dbDir) store flow' input
+
+-- | External task that echoes its input message (without a trailing
+-- newline, @-n@) and captures stdout in the store item.
+echo :: SimpleFlow String CS.Item
+echo = external $ \msg -> ExternalTask
+  { _etCommand = "echo"
+  , _etWriteToStdOut = True
+  , _etParams = ["-n", fromString msg]
+  }
+
+-- | External task that sleeps for the given number of seconds and then
+-- echoes the message, capturing stdout in the store item.
+sleepEcho :: SimpleFlow (Double, String) CS.Item
+sleepEcho = external $ \(time, msg) -> ExternalTask
+  { _etCommand = "sh"
+  , _etWriteToStdOut = True
+  , _etParams =
+      [ "-c"
+      , "sleep " <> fromString (show time) <> ";"
+        <> "echo -n " <> fromString msg
+      ]
+  }
+
+-- | Run eight echo tasks in two parallel groups of four and
+-- concatenate their outputs in order, yielding @"abcdefgh"@.
+flow :: SimpleFlow () String
+flow = proc () -> do
+  (a, (b, (c, d)))
+    <- echo *** echo *** echo *** echo
+    -< ("a", ("b", ("c", "d")))
+  (e, (f, (g, h)))
+    <- echo *** echo *** echo *** echo
+    -< ("e", ("f", ("g", "h")))
+  arr concat <<< mapA readString_ -< [a, b, c, d, e, f, g, h]
+
+-- | End-to-end tests for the SQLite coordinator: a plain distributed
+-- echo flow, and recovery after an executor is interrupted mid-task.
+tests :: TestTree
+tests = testGroup "SQLite Coordinator"
+  [ testCase "echo flow" $
+    withSystemTempDir "funflow_sqlite_" $ \wd ->
+      withExecutors_ wd 4 $ do
+        r <- runTestFlow wd flow ()
+        case r of
+          Left err -> assertFailure $ displayException err
+          Right x -> x @?= "abcdefgh"
+  , testCase "interrupt worker" $
+    withSystemTempDir "funflow_sqlite_" $ \wd -> do
+      -- Overall timeout so a wedged flow fails the test instead of
+      -- hanging the suite.
+      r <- timeout 10000000 $
+        -- Spawn one initial executor.
+        withExecutors wd 1 $ \[executorHandle] -> do
+          -- The MVar records whether the post-sleep step ever ran.
+          mvar <- newMVar False
+          let trigger :: SimpleFlow () ()
+              trigger = stepIO (\_ -> modifyMVar_ mvar $ \_ -> pure True)
+              sleepFlow :: SimpleFlow () String
+              sleepFlow = proc () -> do
+                r <- sleepEcho -< (1, "test")
+                trigger -< const () r
+                readString_ -< r
+          -- Run the flow in parallel.
+          withAsync (runTestFlow wd sleepFlow ()) $ \flowAsync -> do
+            threadDelay 500000
+            -- Interrupt the executor while the external task is running.
+            signalProcess Signals.sigINT executorHandle
+            -- Send a second interrupt shortly after. Users can be impatient.
+            -- GHC's default interrupt handler is one-time, after the first
+            -- interrupt was called a second will terminate the process
+            -- immediately, leaving no time for clean-up.
+            threadDelay 50000
+            signalProcess Signals.sigINT executorHandle
+            threadDelay 2000000
+            -- Check that the executor did not complete the task.
+            progress <- readMVar mvar
+            when progress $
+              assertFailure "Executor should not have completed the task"
+            -- Spawn a new executor to finish the flow.
+            withExecutors wd 1 $ \_ ->
+              wait flowAsync
+      case r of
+        Nothing -> assertFailure "Timed out"
+        Just (Left err) -> assertFailure $ displayException err
+        Just (Right x) -> x @?= "test"
+  ]
diff --git a/test/Funflow/TestFlows.hs b/test/Funflow/TestFlows.hs
new file mode 100644
index 0000000..3732c43
--- /dev/null
+++ b/test/Funflow/TestFlows.hs
@@ -0,0 +1,137 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeApplications #-}
+
+module Funflow.TestFlows where
+
+import Control.Arrow
+import Control.Arrow.Free
+import Control.Concurrent.Async (withAsync)
+import Control.Exception.Safe hiding (catch)
+import Control.Funflow
+import Control.Funflow.ContentStore (Content ((:</>)))
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.External.Coordinator.Memory
+import Control.Funflow.External.Executor (executeLoop)
+import Control.Monad (when)
+import Data.Default (def)
+import Data.List (sort)
+import Path
+import Path.IO
+import System.Random
+import Test.Tasty
+import Test.Tasty.HUnit
+
+-- | A single flow test case: a named flow, its input, its expected
+-- outcome, and a setup action to run beforehand.
+data FlowAssertion where
+  FlowAssertion :: (Eq b, Show b)
+                => String -- ^ test name
+                -> a -- ^ input
+                -> SimpleFlow a b -- ^ the flow to test
+                -> Maybe b -- ^ expected output; 'Nothing' for expected failure
+                -> IO () -- ^ test setup action
+                -> FlowAssertion
+
+-- | Wrap a message as a user-error 'SomeException'.
+mkError :: String -> SomeException
+mkError = toException . userError
+
+-- | Chain two probabilistically-failing steps; used to exercise retry
+-- behaviour.
+flow2 :: SimpleFlow () (Double,Double)
+flow2 = proc () -> do
+  r1 <- worstBernoulli mkError -< 0.2
+  r2 <- worstBernoulli mkError -< 0.3
+  returnA -< (r1,r2)
+
+-- | 'flow2' with up to 100 retries (no delay between attempts).
+flow2caught :: SimpleFlow () (Double,Double)
+flow2caught = retry 100 0 flow2
+
+-- | Exercise store aliases from within a flow: the first lookup misses
+-- and assigns the alias, so the expected result is
+-- @(Nothing, Just "test")@ — the second lookup sees the assignment.
+aliasFlow :: SimpleFlow () (Maybe String, Maybe String)
+aliasFlow = proc () -> do
+  let alias = CS.Alias "alias"
+  mb1 <- lookupAliasInStore -< alias
+  r1 <- case mb1 of
+    Nothing -> do
+      item :</> _path <- writeString_ -< "test"
+      assignAliasInStore -< (alias, item)
+      returnA -< Nothing
+    Just item ->
+      arr Just <<< readString_ -< item
+  mb2 <- lookupAliasInStore -< alias
+  r2 <- case mb2 of
+    Nothing ->
+      returnA -< Nothing
+    Just item ->
+      arr Just <<< readString_ -< item
+  returnA -< (r1, r2)
+
+flowCached :: SimpleFlow () Bool
+flowCached = let
+ randomStep = stepIO' (def { cache = $(defaultCacherLoc (0 :: Int))}) $ const (randomIO :: IO Int)
+ in proc () -> do
+ t1 <- randomStep -< ()
+ t2 <- randomStep -< ()
+ returnA -< (t1 == t2)
+
+-- | Test that we can merge directories within the content store.
+--
+-- Writes two files, merges them into one item, and checks that the
+-- merged item lists exactly the two file names.
+flowMerge :: SimpleFlow () Bool
+flowMerge = proc () -> do
+  f1 <- writeString -< ("Hello World",[relfile|a|] )
+  f2 <- writeString -< ("Goodbye World", [relfile|b|])
+  comb <- mergeFiles -< [f1, f2]
+  files <- arr (fmap CS.contentFilename) <<< arr snd <<< listDirContents -< comb
+  returnA -< (sort files == sort [[relfile|a|], [relfile|b|]])
+
+-- | Test that a missing executable in an external causes a catchable error.
+--
+-- The error is caught inside the flow and collapsed to @Left ()@.
+flowMissingExecutable :: SimpleFlow () (Either () ())
+flowMissingExecutable = proc () -> do
+  r <- (arr Right <<< external (\() -> ExternalTask
+        { _etCommand = "non-existent-executable-39fd1e85a0a05113938e0"
+        , _etParams = []
+        , _etWriteToStdOut = True
+        }))
+       `catch` arr (Left @SomeException . snd)
+    -< ()
+  returnA -< case r of
+    Left _ -> Left ()
+    Right _ -> Right ()
+
+
+-- | The table of flow test cases run by 'tests'.
+flowAssertions :: [FlowAssertion]
+flowAssertions =
+  [ FlowAssertion "death" "foo" melancholicLazarus Nothing setup
+  , FlowAssertion "resurrection" "bar" (retry 1 0 melancholicLazarus) (Just "bar") setup
+  , FlowAssertion "bernoulli_once" 0.2 (retry 20 0 $ worstBernoulli mkError >>^ (<2.0)) (Just True) (return ())
+  , FlowAssertion "bernoulli_twice" () (flow2caught >>^ snd >>^ (<2.0)) (Just True) (return ())
+  , FlowAssertion "failStep" () failStep Nothing (return ())
+  , FlowAssertion "aliasFlow" () aliasFlow (Just (Nothing, Just "test")) (return ())
+  , FlowAssertion "cachingFlow" () flowCached (Just True) (return ())
+  , FlowAssertion "mergingStoreItems" () flowMerge (Just True) (return ())
+  , FlowAssertion "missingExecutable" () flowMissingExecutable (Just (Left ())) (return ())
+  ]
+
+-- | Delete the temporary note file at @/tmp/lazarus_note@ if present,
+-- so the "death"/"resurrection" cases start from a clean slate.
+setup :: IO ()
+setup = do
+  let note = [absfile|/tmp/lazarus_note|]
+  exists <- doesFileExist note
+  when exists (removeFile note)
+
+-- | Turn a 'FlowAssertion' into a test case: run the flow against a
+-- fresh store with an in-memory coordinator, with an executor loop
+-- running concurrently to service external tasks.
+testFlowAssertion :: FlowAssertion -> TestTree
+testFlowAssertion (FlowAssertion nm x flw expect before) =
+  testCase nm $
+    withSystemTempDir "test_output_" $ \storeDir ->
+    CS.withStore storeDir $ \store -> do
+      hook <- createMemoryCoordinator
+      before
+      res <- withAsync (executeLoop MemoryCoordinator hook store) $ \_ ->
+        runSimpleFlow MemoryCoordinator hook store flw x
+      assertFlowResult expect res
+
+-- | Compare an expected flow outcome against the actual result.
+--
+-- 'Nothing' means the flow is expected to fail; @Just v@ means it is
+-- expected to succeed with @v@.
+assertFlowResult :: (Eq a, Show ex, Show a) => Maybe a -> Either ex a -> Assertion
+assertFlowResult expect res =
+  case (expect, res) of
+    (Nothing, Left _) -> return ()
+    (Just xr, Right y) -> assertEqual "flow results" xr y
+    -- Add separating punctuation so the shown value does not run into
+    -- the message text.
+    (Nothing, Right y) -> assertFailure $ "expected flow failure, got success: " ++ show y
+    (Just xr, Left err) -> assertFailure $ "expected success " ++ show xr ++ ", got error: " ++ show err
+
+-- | One test case per entry in 'flowAssertions'.
+tests :: TestTree
+tests = testGroup "Flow Assertions" $ map testFlowAssertion flowAssertions
diff --git a/test/Test.hs b/test/Test.hs
new file mode 100644
index 0000000..de3d732
--- /dev/null
+++ b/test/Test.hs
@@ -0,0 +1,16 @@
+import qualified Control.Arrow.Async.Tests
+import qualified Funflow.ContentStore
+import qualified Funflow.SQLiteCoordinator
+import qualified Funflow.TestFlows
+import Test.Tasty
+
+-- | Entry point: run the full funflow test suite.
+main :: IO ()
+main = defaultMain tests
+
+-- | All test groups of the suite.
+tests :: TestTree
+tests = testGroup "Unit Tests"
+  [ Funflow.ContentStore.tests
+  , Control.Arrow.Async.Tests.tests
+  , Funflow.TestFlows.tests
+  , Funflow.SQLiteCoordinator.tests
+  ]