summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornclarke <>2018-11-13 12:05:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-11-13 12:05:00 (GMT)
commit3ceb65118bcf26cd628c9e0a0aba32cde5e9b4bf (patch)
tree8825b0366fab4fa7633b02c5f8bcef26c76d22a9
parent89828122a1266d6afd1a81ca44d2506ba9b5a15b (diff)
version 1.4.01.4.0
-rw-r--r--TestFunflow.hs4
-rw-r--r--funflow.cabal2
-rw-r--r--src/Control/Funflow.hs2
-rw-r--r--src/Control/Funflow/Base.hs14
-rw-r--r--src/Control/Funflow/Exec/Simple.hs71
-rw-r--r--src/Control/Funflow/External.hs22
-rw-r--r--src/Control/Funflow/External/Docker.hs8
-rw-r--r--src/Control/Funflow/External/Executor.hs16
-rw-r--r--test/Funflow/SQLiteCoordinator.hs4
-rw-r--r--test/Funflow/TestFlows.hs4
10 files changed, 100 insertions, 47 deletions
diff --git a/TestFunflow.hs b/TestFunflow.hs
index a65bf6f..93828dc 100644
--- a/TestFunflow.hs
+++ b/TestFunflow.hs
@@ -56,7 +56,7 @@ externalTest = let
{ _etCommand = "/run/current-system/sw/bin/echo"
, _etParams = [textParam t]
, _etWriteToStdOut = StdOutCapture
- , _etEnv = []
+ , _etEnv = EnvExplicit []
}
flow = exFlow >>> readString_
in withSystemTempDir "test_output_external_" $ \storeDir -> do
@@ -74,7 +74,7 @@ storeTest = let
{ _etCommand = "/run/current-system/sw/bin/cat"
, _etParams = [contentParam a, contentParam b]
, _etWriteToStdOut = StdOutCapture
- , _etEnv = []
+ , _etEnv = EnvExplicit []
}
flow = proc (s1, s2) -> do
f1 <- writeString_ -< s1
diff --git a/funflow.cabal b/funflow.cabal
index c4a9581..a1bf17f 100644
--- a/funflow.cabal
+++ b/funflow.cabal
@@ -1,5 +1,5 @@
Name: funflow
-Version: 1.3.2
+Version: 1.4.0
Synopsis: Workflows with arrows
Description:
An arrow with resumable computations and logging
diff --git a/src/Control/Funflow.hs b/src/Control/Funflow.hs
index c7540d7..c2d90ed 100644
--- a/src/Control/Funflow.hs
+++ b/src/Control/Funflow.hs
@@ -13,6 +13,8 @@ module Control.Funflow
, Base.ExternalProperties(..)
, Base.MDWriter
, Base.Properties(..)
+ , Base.EpPurity(..)
+ , Base.alwaysRecompile
, Base.defaultCacherWithIdent
, Cache.defaultCacher
, Cache.defaultCacherLoc
diff --git a/src/Control/Funflow/Base.hs b/src/Control/Funflow/Base.hs
index 582917c..5402784 100644
--- a/src/Control/Funflow/Base.hs
+++ b/src/Control/Funflow/Base.hs
@@ -23,11 +23,13 @@ import Control.Funflow.External
import Data.ByteString (ByteString)
import Data.Default
import Data.Functor.Identity
+import Data.Int (Int64)
import Data.Proxy (Proxy (..))
import qualified Data.Store as Store
import qualified Data.Text as T
import Path
import Prelude hiding (id, (.))
+import System.Random (randomIO)
-- | Metadata writer
type MDWriter i o = Maybe (i -> o -> [(T.Text, ByteString)])
@@ -74,19 +76,27 @@ instance Default (Properties i o) where
, mdpolicy = Nothing
}
+data EpPurity = EpPure | EpImpure (IO Int64)
+
+instance Default EpPurity where
+ def = EpPure
+
+alwaysRecompile :: EpPurity
+alwaysRecompile = EpImpure randomIO
+
-- | Additional properties associated with external tasks.
data ExternalProperties a = ExternalProperties
{ -- | Write additional metadata to the content store.
ep_mdpolicy :: MDWriter a ()
-- | Specify that this external step is impure, and as such should not be
-- cached.
- , ep_impure :: Bool
+ , ep_impure :: EpPurity
}
instance Default (ExternalProperties a) where
def = ExternalProperties
{ ep_mdpolicy = Nothing
- , ep_impure = False
+ , ep_impure = def
}
data Flow' eff a b where
diff --git a/src/Control/Funflow/Exec/Simple.hs b/src/Control/Funflow/Exec/Simple.hs
index e0b4ad0..9e60966 100644
--- a/src/Control/Funflow/Exec/Simple.hs
+++ b/src/Control/Funflow/Exec/Simple.hs
@@ -37,30 +37,32 @@ import Control.Funflow.External
import Control.Funflow.External.Coordinator
import Control.Funflow.External.Coordinator.Memory
import Control.Funflow.External.Executor (executeLoop)
-import Control.Monad.IO.Class (liftIO)
-import Control.Monad.Trans.Class (lift)
+import Control.Monad.IO.Class
+import Control.Monad.Trans.Control (MonadBaseControl)
+import Control.Monad.Catch (MonadCatch
+ ,MonadMask)
import qualified Data.ByteString as BS
import Data.Foldable (traverse_)
-import Data.Int (Int64)
import Data.Monoid ((<>))
import Data.Void
import Katip
import Path
import System.IO (stderr)
-import System.Random (randomIO)
-- | Simple evaulation of a flow
-runFlowEx :: forall c eff ex a b. (Coordinator c, Exception ex)
+runFlowEx :: forall m c eff ex a b.
+ (Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m
+ ,MonadCatch m, MonadMask m, KatipContext m)
=> c
-> Config c
-> CS.ContentStore
- -> (eff ~> AsyncA (KatipContextT IO)) -- ^ Natural transformation from wrapped effects
+ -> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects
-> Int -- ^ Flow configuration identity. This forms part of the caching
-- system and is used to disambiguate the same flow run in
-- multiple configurations.
-> Flow eff ex a b
-> a
- -> KatipContextT IO b
+ -> m b
runFlowEx _ cfg store runWrapped confIdent flow input = do
hook <- initialise cfg
runAsyncA (eval (runFlow' hook) flow) input
@@ -68,7 +70,7 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
simpleOutPath item = toFilePath
$ CS.itemPath store item </> [relfile|out|]
withStoreCache :: forall i o. Cacher i o
- -> AsyncA (KatipContextT IO) i o -> AsyncA (KatipContextT IO) i o
+ -> AsyncA m i o -> AsyncA m i o
withStoreCache NoCache f = f
withStoreCache c f = let
chashOf i = cacherKey c confIdent i
@@ -99,14 +101,14 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
-> i
-> o
-> MDWriter i o
- -> KatipContextT IO ()
+ -> m ()
writeMd _ _ _ Nothing = return ()
writeMd chash i o (Just writer) =
let kvs = writer i o
in traverse_ (uncurry $ CS.setMetadata store chash) kvs
- runFlow' :: Hook c -> Flow' eff a1 b1 -> AsyncA (KatipContextT IO) a1 b1
+ runFlow' :: Hook c -> Flow' eff a1 b1 -> AsyncA m a1 b1
runFlow' _ (Step props f) = withStoreCache (cache props)
. AsyncA $ \x -> do
let out = f x
@@ -115,13 +117,13 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
Cache key _ _ -> writeMd (key confIdent x) x out $ mdpolicy props
return out
runFlow' _ (StepIO props f) = withStoreCache (cache props)
- . liftAsyncA $ AsyncA f
+ . AsyncA $ liftIO . f
runFlow' po (External props toTask) = AsyncA $ \x -> do
- chash <- liftIO $ if (ep_impure props)
- then do
- salt <- randomIO :: IO Int64
- contentHash (toTask x, salt)
- else contentHash (toTask x)
+ chash <- liftIO $ case (ep_impure props) of
+ EpPure -> contentHash (toTask x)
+ EpImpure fn -> do
+ salt <- fn
+ contentHash (toTask x, salt)
CS.lookup store chash >>= \case
-- The item in question is already in the store. No need to submit a task.
CS.Complete item -> return item
@@ -169,56 +171,63 @@ runFlowEx _ cfg store runWrapped confIdent flow input = do
CS.removeFailed store chash
)
runFlow' _ (GetFromStore f) = AsyncA $ \case
- CS.All item -> lift . f $ CS.itemPath store item
- item CS.:</> path -> lift . f $ CS.itemPath store item </> path
- runFlow' _ (InternalManipulateStore f) = AsyncA $ \i ->lift $ f store i
+ CS.All item -> liftIO . f $ CS.itemPath store item
+ item CS.:</> path -> liftIO . f $ CS.itemPath store item </> path
+ runFlow' _ (InternalManipulateStore f) = AsyncA $ \i -> liftIO $ f store i
runFlow' _ (Wrapped props w) = withStoreCache (cache props)
$ runWrapped w
-- | Run a flow in a logging context.
-runFlowLog :: forall c eff ex a b. (Coordinator c, Exception ex)
+runFlowLog :: forall m c eff ex a b.
+ (Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m
+ ,MonadCatch m, MonadMask m, KatipContext m)
=> c
-> Config c
-> CS.ContentStore
- -> (eff ~> AsyncA (KatipContextT IO)) -- ^ Natural transformation from wrapped effects
+ -> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects
-> Int -- ^ Flow configuration identity. This forms part of the caching
-- system and is used to disambiguate the same flow run in
-- multiple configurations.
-> Flow eff ex a b
-> a
- -> KatipContextT IO (Either ex b)
+ -> m (Either ex b)
runFlowLog c cfg store runWrapped confIdent flow input =
try $ runFlowEx c cfg store runWrapped confIdent flow input
-- | Run a flow, discarding all logging.
-runFlow :: forall c eff ex a b. (Coordinator c, Exception ex)
+runFlow :: forall m c eff ex a b.
+ (Coordinator c, Exception ex, MonadIO m, MonadBaseControl IO m
+ ,MonadCatch m, MonadMask m)
=> c
-> Config c
-> CS.ContentStore
- -> (eff ~> AsyncA IO) -- ^ Natural transformation from wrapped effects
+ -> (eff ~> AsyncA m) -- ^ Natural transformation from wrapped effects
-> Int -- ^ Flow configuration identity. This forms part of the caching
-- system and is used to disambiguate the same flow run in
-- multiple configurations.
-> Flow eff ex a b
-> a
- -> IO (Either ex b)
+ -> m (Either ex b)
runFlow c cfg store runWrapped confIdent flow input = do
- le <- initLogEnv "funflow" "production"
+ le <- liftIO $ initLogEnv "funflow" "production"
runKatipContextT le () "runFlow"
$ runFlowLog c cfg store (liftAsyncA . runWrapped) confIdent flow input
-- | Run a simple flow. Logging will be sent to stderr
-runSimpleFlow :: forall c a b. (Coordinator c)
+runSimpleFlow :: forall m c a b.
+ (Coordinator c, MonadIO m, MonadBaseControl IO m
+ ,MonadCatch m, MonadMask m)
=> c
-> Config c
-> CS.ContentStore
-> SimpleFlow a b
-> a
- -> IO (Either SomeException b)
+ -> m (Either SomeException b)
runSimpleFlow c ccfg store flow input = do
- handleScribe <- mkHandleScribe ColorIfTerminal stderr InfoS V2
- let mkLogEnv = registerScribe "stderr" handleScribe defaultScribeSettings =<< initLogEnv "funflow" "production"
- bracket mkLogEnv closeScribes $ \le -> do
+ handleScribe <- liftIO $ mkHandleScribe ColorIfTerminal stderr InfoS V2
+ let mkLogEnv = liftIO $
+ registerScribe "stderr" handleScribe defaultScribeSettings =<< initLogEnv "funflow" "production"
+ bracket mkLogEnv (liftIO . closeScribes) $ \le -> do
let initialContext = ()
initialNamespace = "executeLoop"
diff --git a/src/Control/Funflow/External.hs b/src/Control/Funflow/External.hs
index d4e4d4e..4a21297 100644
--- a/src/Control/Funflow/External.hs
+++ b/src/Control/Funflow/External.hs
@@ -55,6 +55,9 @@ data ParamField
-- ^ Reference to the effective group ID of the executor.
| ParamOut
-- ^ Reference to the output path in the content store.
+ | ParamCmd Param
+ -- ^ A quoted command that we can pass to another program as an
+ -- argument.
deriving (Generic, Show)
instance ContentHashable IO ParamField
@@ -104,6 +107,7 @@ paramFieldToText c (ParamEnv env) = convEnv c env
paramFieldToText c ParamUid = T.pack . show <$> convUid c
paramFieldToText c ParamGid = T.pack . show <$> convGid c
paramFieldToText c ParamOut = T.pack . fromAbsDir <$> convOut c
+paramFieldToText c (ParamCmd cmd) = paramToText c cmd
-- | Transform a parameter to text using the given converter.
paramToText :: Applicative f
@@ -175,13 +179,29 @@ instance FromJSON OutputCapture
instance ToJSON OutputCapture
instance Store OutputCapture
+-- | Control the environment set for the external process. This can either
+-- inherit from the surrounding environment, or explicitly set things.
+data Env
+ -- | Inherit all environment variables from the surrounding shell. Note that
+ -- the values of these variables will not be taken into account in the
+ -- content hash, and so changes to them will not trigger a rerun of the
+ -- step.
+ = EnvInherit
+ | EnvExplicit [(T.Text, Param)]
+ deriving (Generic, Show)
+
+instance ContentHashable IO Env
+instance FromJSON Env
+instance ToJSON Env
+instance Store Env
+
-- | A monomorphic description of an external task. This is basically just
-- a command which can be run.
data ExternalTask = ExternalTask {
_etCommand :: T.Text
, _etParams :: [Param]
-- ^ Environment variables to set for the scope of the execution.
- , _etEnv :: [(T.Text, Param)]
+ , _etEnv :: Env
, _etWriteToStdOut :: OutputCapture
} deriving (Generic, Show)
diff --git a/src/Control/Funflow/External/Docker.hs b/src/Control/Funflow/External/Docker.hs
index 2d8edc6..ecf0edd 100644
--- a/src/Control/Funflow/External/Docker.hs
+++ b/src/Control/Funflow/External/Docker.hs
@@ -55,7 +55,7 @@ data Config = Config
, optImageID :: Maybe T.Text
, command :: Param
, args :: [Param]
- , env :: [(T.Text, Param)]
+ , env :: Env
, stdout :: OutputCapture
} deriving Generic
@@ -65,7 +65,7 @@ data Docker = Docker
, dInput :: Bind
, dCommand :: Param
, dArgs :: [Param]
- , dEnv :: [(T.Text, Param)]
+ , dEnv :: Env
, dStdout :: OutputCapture
} deriving Generic
@@ -93,7 +93,9 @@ toDocker cfg = Docker
((command', args', env'), ConvertState input' _) = flip runState initState $ do
command'' <- transformParam (command cfg)
args'' <- mapM transformParam (args cfg)
- env'' <- mapM (runKleisli $ second $ Kleisli transformParam) (env cfg)
+ env'' <- case env cfg of
+ EnvInherit -> pure EnvInherit
+ EnvExplicit x -> EnvExplicit <$> mapM (runKleisli $ second $ Kleisli transformParam) x
return (command'', args'', env'')
transformParam :: Param -> State ConvertState Param
transformParam (Param pfs) = Param <$> mapM transformParamField pfs
diff --git a/src/Control/Funflow/External/Executor.hs b/src/Control/Funflow/External/Executor.hs
index a300507..c7c9a5c 100644
--- a/src/Control/Funflow/External/Executor.hs
+++ b/src/Control/Funflow/External/Executor.hs
@@ -40,6 +40,7 @@ import System.IO (Handle, IOMode (..),
import System.Posix.Env (getEnv)
import System.Posix.User
import System.Process
+import GHC.IO.Handle (hClose)
data ExecutionResult =
-- | The result already exists in the store and there is no need
@@ -92,8 +93,13 @@ execute store td = logError $ do
}
mbParams <- lift $ runMaybeT $
traverse (paramToText convParam) (td ^. tdTask . etParams)
- mbEnv <- lift $ runMaybeT $
- traverse (sequence . second (paramToText convParam)) (td ^. tdTask . etEnv)
+ mbEnv <- case td ^. tdTask . etEnv of
+ EnvInherit -> pure Nothing
+ EnvExplicit x ->
+ (lift . runMaybeT $ traverse (sequence . second (paramToText convParam)) x)
+ >>= \case
+ Nothing -> fail "A parameter was not ready"
+ jp -> return jp
params <- case mbParams of
Nothing -> fail "A parameter was not ready"
Just params -> return params
@@ -121,6 +127,8 @@ execute store td = logError $ do
-- Error output should be displayed on our stderr stream
withFollowOutput $ do
exitCode <- waitForProcess ph
+ hClose hErr
+ hClose hOut
end <- getTime Monotonic
case exitCode of
ExitSuccess -> do
@@ -241,4 +249,6 @@ withFollowFile infile outhandle action = do
else do
BS.hPut outhandle some
loop
- snd <$> concurrently (tryIO loop) (action <* putMVar mv ())
+ res <- snd <$> concurrently (tryIO loop) (action <* putMVar mv ())
+ hClose inhandle
+ return res
diff --git a/test/Funflow/SQLiteCoordinator.hs b/test/Funflow/SQLiteCoordinator.hs
index 5a7a4a5..442bca3 100644
--- a/test/Funflow/SQLiteCoordinator.hs
+++ b/test/Funflow/SQLiteCoordinator.hs
@@ -71,7 +71,7 @@ echo = external $ \msg -> ExternalTask
{ _etCommand = "echo"
, _etWriteToStdOut = StdOutCapture
, _etParams = ["-n", fromString msg]
- , _etEnv = []
+ , _etEnv = EnvExplicit []
}
sleepEcho :: SimpleFlow (Double, String) CS.Item
@@ -83,7 +83,7 @@ sleepEcho = external $ \(time, msg) -> ExternalTask
, "sleep " <> fromString (show time) <> ";"
<> "echo -n " <> fromString msg
]
- , _etEnv = []
+ , _etEnv = EnvExplicit []
}
flow :: SimpleFlow () String
diff --git a/test/Funflow/TestFlows.hs b/test/Funflow/TestFlows.hs
index f46fa10..5c3161e 100644
--- a/test/Funflow/TestFlows.hs
+++ b/test/Funflow/TestFlows.hs
@@ -89,7 +89,7 @@ flowMissingExecutable = proc () -> do
{ _etCommand = "non-existent-executable-39fd1e85a0a05113938e0"
, _etParams = []
, _etWriteToStdOut = StdOutCapture
- , _etEnv = []
+ , _etEnv = EnvExplicit []
}))
`catch` arr (Left @SomeException . snd)
-< ()
@@ -104,7 +104,7 @@ externalEnvVar = proc () -> do
{ _etCommand = "bash"
, _etParams = [textParam "-c", textParam "echo -n $FOO"]
, _etWriteToStdOut = StdOutCapture
- , _etEnv = [("FOO", textParam "testing")]
+ , _etEnv = EnvExplicit [("FOO", textParam "testing")]
}) -< ()
returnA -< case r of
"testing" -> Right ()