summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornclarke <>2018-07-27 11:46:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-07-27 11:46:00 (GMT)
commit5103dd099c28df0f75cb547fe147721bba991a43 (patch)
tree0e1a2091cf281faf26a2de4343b74b6138f0cbec
parent64ea52e9497cd0ea7c37ebb6112d24af83ab90ba (diff)
version 1.3.01.3.0
-rw-r--r--TestFunflow.hs2
-rw-r--r--funflow.cabal3
-rw-r--r--src/Control/Funflow.hs3
-rw-r--r--src/Control/Funflow/Base.hs1
-rw-r--r--src/Control/Funflow/ContentStore.hs5
-rw-r--r--src/Control/Funflow/External.hs2
-rw-r--r--src/Control/Funflow/External/Docker.hs124
-rw-r--r--src/Control/Funflow/External/Executor.hs8
-rw-r--r--src/Control/Funflow/Steps.hs23
-rw-r--r--test/Funflow/SQLiteCoordinator.hs2
-rw-r--r--test/Funflow/TestFlows.hs14
11 files changed, 156 insertions, 31 deletions
diff --git a/TestFunflow.hs b/TestFunflow.hs
index 63e7adf..a65bf6f 100644
--- a/TestFunflow.hs
+++ b/TestFunflow.hs
@@ -56,6 +56,7 @@ externalTest = let
{ _etCommand = "/run/current-system/sw/bin/echo"
, _etParams = [textParam t]
, _etWriteToStdOut = StdOutCapture
+ , _etEnv = []
}
flow = exFlow >>> readString_
in withSystemTempDir "test_output_external_" $ \storeDir -> do
@@ -73,6 +74,7 @@ storeTest = let
{ _etCommand = "/run/current-system/sw/bin/cat"
, _etParams = [contentParam a, contentParam b]
, _etWriteToStdOut = StdOutCapture
+ , _etEnv = []
}
flow = proc (s1, s2) -> do
f1 <- writeString_ -< s1
diff --git a/funflow.cabal b/funflow.cabal
index 9e02b66..d929124 100644
--- a/funflow.cabal
+++ b/funflow.cabal
@@ -1,5 +1,5 @@
Name: funflow
-Version: 1.1.0
+Version: 1.3.0
Synopsis: Workflows with arrows
Description:
An arrow with resumable computations and logging
@@ -49,6 +49,7 @@ Library
, Control.Funflow.ContentStore.Notify
Build-depends:
base >= 4.6 && <5
+ , Glob
, aeson >= 1.2.3.0
, async
, bytestring
diff --git a/src/Control/Funflow.hs b/src/Control/Funflow.hs
index 756eb25..c7540d7 100644
--- a/src/Control/Funflow.hs
+++ b/src/Control/Funflow.hs
@@ -1,9 +1,12 @@
+{-# LANGUAGE TypeOperators #-}
+
-- | Central Funflow module.
--
-- This module just re-exports various other modules for convenience.
module Control.Funflow
( Base.Flow
, Base.SimpleFlow
+ , type (Base.==>)
, Base.NoEffect
, Base.Flow'(..)
, Base.Cacher(..)
diff --git a/src/Control/Funflow/Base.hs b/src/Control/Funflow/Base.hs
index e434e1b..582917c 100644
--- a/src/Control/Funflow/Base.hs
+++ b/src/Control/Funflow/Base.hs
@@ -115,6 +115,7 @@ runNoEffect :: forall arr. NoEffect ~> arr
runNoEffect = error "Impossible!"
type SimpleFlow = Flow NoEffect SomeException
+type (==>) = SimpleFlow
-- | Convert a flow to a diagram, for inspection/pretty printing
toDiagram :: Flow eff ex a b -> Diagram ex a b
diff --git a/src/Control/Funflow/ContentStore.hs b/src/Control/Funflow/ContentStore.hs
index b75f7f6..86d774e 100644
--- a/src/Control/Funflow/ContentStore.hs
+++ b/src/Control/Funflow/ContentStore.hs
@@ -97,6 +97,7 @@ module Control.Funflow.ContentStore
-- * Accessors
, itemHash
, itemPath
+ , itemRelPath
, contentPath
, contentItem
, contentFilename
@@ -315,6 +316,10 @@ newtype Alias = Alias { unAlias :: T.Text }
root :: ContentStore -> Path Abs Dir
root = storeRoot
+-- | The scoped path to a content item within the store.
+itemRelPath :: Item -> Path Rel Dir
+itemRelPath (Item x) = prefixHashPath itemPrefix x
+
-- | The store path of a completed item.
itemPath :: ContentStore -> Item -> Path Abs Dir
itemPath store = mkItemPath store . itemHash
diff --git a/src/Control/Funflow/External.hs b/src/Control/Funflow/External.hs
index 2a5eb12..f9834af 100644
--- a/src/Control/Funflow/External.hs
+++ b/src/Control/Funflow/External.hs
@@ -177,6 +177,8 @@ instance Store OutputCapture
data ExternalTask = ExternalTask {
_etCommand :: T.Text
, _etParams :: [Param]
+ -- ^ Environment variables to set for the scope of the execution.
+ , _etEnv :: [(T.Text, Param)]
, _etWriteToStdOut :: OutputCapture
} deriving (Generic, Show)
diff --git a/src/Control/Funflow/External/Docker.hs b/src/Control/Funflow/External/Docker.hs
index c5398ae..e20a5a6 100644
--- a/src/Control/Funflow/External/Docker.hs
+++ b/src/Control/Funflow/External/Docker.hs
@@ -2,6 +2,7 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ViewPatterns #-}
-- | Module supporting the use of docker containers as external tasks.
--
@@ -9,39 +10,106 @@
-- easier to specify certain tasks which will run inside docker containers. It
-- handles constructing the call to docker, mounting input and output
-- directories, and specifying the docker image and version to use.
-module Control.Funflow.External.Docker where
+module Control.Funflow.External.Docker
+ ( Config (..)
+ , toExternal
+ ) where
+import Control.Arrow (Kleisli (..), second)
import Control.Funflow.ContentHashable
import Control.Funflow.External
-import Data.Map.Strict (Map)
-import qualified Data.Map.Strict as Map
-import Data.Semigroup ((<>))
-import qualified Data.Text as T
-import GHC.Generics (Generic)
-import System.FilePath
+import Control.Monad.Trans.State.Strict
+import Data.Semigroup (Semigroup, (<>))
+import qualified Data.Text as T
+import GHC.Generics (Generic)
data Bind
- -- | Single input, will get mounted to @/input@ on the image.
- = SingleInput InputPath
- -- | Multiple inputs, each gets mouted into a subdirectory under
- -- @/input@ as described by the given mapping.
- | MultiInput (Map FilePath InputPath)
+ -- | No inputs
+ = NoInput
+ -- | Single input, and the path to its mountpoint within the system.
+ | SingleInput (InputPath, FilePath)
+ -- | Multiple inputs.
+ | MultiInput [(InputPath, FilePath)]
deriving Generic
instance ContentHashable IO Bind
+instance Semigroup Bind where
+ NoInput <> x = x
+ x <> NoInput = x
+ (SingleInput x) <> (SingleInput y) =
+ MultiInput [x, y]
+ (SingleInput x) <> (MultiInput m) =
+ MultiInput $ x : m
+ (MultiInput m) <> (SingleInput x) =
+ MultiInput $ x : m
+ (MultiInput m) <> (MultiInput m') =
+ MultiInput $ m <> m'
+
+instance Monoid Bind where
+ mempty = NoInput
+ mappend = (<>)
+
data Config = Config
{ image :: T.Text
, optImageID :: Maybe T.Text
- , input :: Bind
- , command :: FilePath
- , args :: [T.Text]
+ , command :: Param
+ , args :: [Param]
+ , env :: [(T.Text, Param)]
+ , stdout :: OutputCapture
+ } deriving Generic
+
+data Docker = Docker
+ { dImage :: T.Text
+ , dOptImageID :: Maybe T.Text
+ , dInput :: Bind
+ , dCommand :: Param
+ , dArgs :: [Param]
+ , dEnv :: [(T.Text, Param)]
+ , dStdout :: OutputCapture
} deriving Generic
-instance ContentHashable IO Config
+instance ContentHashable IO Docker
+
+-- | Convertion state for mapping from 'Config' to 'Docker'
+data ConvertState = ConvertState
+ { _csInput :: Bind
+ -- | Fresh name generator
+ , _csFresh :: Int
+ }
+
+toDocker :: Config -> Docker
+toDocker cfg = Docker
+ { dImage = image cfg
+ , dOptImageID = optImageID cfg
+ , dInput = input'
+ , dCommand = command'
+ , dArgs = args'
+ , dEnv = env'
+ , dStdout = stdout cfg
+ }
+ where
+ initState = ConvertState NoInput 0
+ ((command', args', env'), ConvertState input' _) = flip runState initState $ do
+ command'' <- transformParam (command cfg)
+ args'' <- mapM transformParam (args cfg)
+ env'' <- mapM (runKleisli $ second $ Kleisli transformParam) (env cfg)
+ return (command'', args'', env'')
+ transformParam :: Param -> State ConvertState Param
+ transformParam (Param pfs) = Param <$> mapM transformParamField pfs
+ transformParamField :: ParamField -> State ConvertState ParamField
+ transformParamField (ParamPath ip) = do
+ ConvertState input fresh <- get
+ put $ ConvertState
+ (input <> SingleInput (ip, mkInputPath fresh))
+ (fresh + 1)
+ return $ ParamText (T.pack $ mkInputPath fresh)
+ transformParamField po = return po
+ mkInputPath :: Int -> String
+ mkInputPath x = "/input/" <> show x <> "/"
toExternal :: Config -> ExternalTask
-toExternal cfg = ExternalTask
+toExternal (toDocker -> cfg) = ExternalTask
-- XXX: Allow to configure the path to the docker executable.
{ _etCommand = "docker"
, _etParams =
@@ -49,21 +117,23 @@ toExternal cfg = ExternalTask
, "--user=" <> uidParam
] ++ mounts ++
[ imageArg
- , stringParam (command cfg)
- ] ++ map textParam (args cfg)
- , _etWriteToStdOut = NoOutputCapture
+ , dCommand cfg
+ ] ++ dArgs cfg
+ , _etEnv = dEnv cfg
+ , _etWriteToStdOut = dStdout cfg
}
where
mounts = outputMount : inputMounts
mount src dst =
"--volume=" <> pathParam src <> ":" <> stringParam dst
outputMount = "--volume=" <> outParam <> ":/output"
- inputMounts = case input cfg of
- SingleInput chash -> [ mount chash "/input" ]
+ inputMounts = case dInput cfg of
+ NoInput -> []
+ SingleInput (chash, dest) -> [ mount chash dest ]
MultiInput cmap ->
- [ mount chash ("/input" </> dest)
- | (dest, chash) <- Map.toList cmap
+ [ mount chash dest
+ | (chash, dest) <- cmap
]
- imageArg = textParam $ case optImageID cfg of
- Nothing -> image cfg
- Just id' -> image cfg <> ":" <> id'
+ imageArg = textParam $ case dOptImageID cfg of
+ Nothing -> dImage cfg
+ Just id' -> dImage cfg <> ":" <> id'
diff --git a/src/Control/Funflow/External/Executor.hs b/src/Control/Funflow/External/Executor.hs
index 1503767..a300507 100644
--- a/src/Control/Funflow/External/Executor.hs
+++ b/src/Control/Funflow/External/Executor.hs
@@ -11,6 +11,7 @@
-- You probably want to start with 'executeLoop'.
module Control.Funflow.External.Executor where
+import Control.Arrow (second)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.MVar
@@ -75,11 +76,12 @@ execute store td = logError $ do
= withFollowFile fpErr stderr
. withFollowFile fpOut stdout
cmd = T.unpack $ td ^. tdTask . etCommand
- procSpec params = (proc cmd $ T.unpack <$> params)
+ procSpec params textEnv = (proc cmd $ T.unpack <$> params)
{ cwd = Just (fromAbsDir fp)
, close_fds = True
, std_err = UseHandle hErr
, std_out = UseHandle hOut
+ , env = map (bimap T.unpack T.unpack) <$> textEnv
}
convParam = ConvParam
{ convPath = pure . CS.itemPath store
@@ -90,6 +92,8 @@ execute store td = logError $ do
}
mbParams <- lift $ runMaybeT $
traverse (paramToText convParam) (td ^. tdTask . etParams)
+ mbEnv <- lift $ runMaybeT $
+ traverse (sequence . second (paramToText convParam)) (td ^. tdTask . etEnv)
params <- case mbParams of
Nothing -> fail "A parameter was not ready"
Just params -> return params
@@ -110,7 +114,7 @@ execute store td = logError $ do
(Json.encode (td ^. tdTask))
start <- lift $ getTime Monotonic
- let theProc = procSpec params
+ let theProc = procSpec params mbEnv
katipAddNamespace "process" . katipAddContext (sl "processId" $ show theProc) $ do
$(logTM) InfoS "Executing"
res <- lift $ tryIO $ withCreateProcess theProc $ \_ _ _ ph ->
diff --git a/src/Control/Funflow/Steps.hs b/src/Control/Funflow/Steps.hs
index 9cde1f2..1789574 100644
--- a/src/Control/Funflow/Steps.hs
+++ b/src/Control/Funflow/Steps.hs
@@ -16,6 +16,8 @@ module Control.Funflow.Steps
, copyDirToStore
, copyFileToStore
, listDirContents
+ , globDir
+ , globDirPattern
, lookupAliasInStore
, mergeDirs
, mergeFiles
@@ -59,6 +61,7 @@ import qualified Data.Yaml as Yaml
import GHC.Conc (threadDelay)
import Path
import Path.IO
+import qualified System.FilePath.Glob as Glob
import System.Posix.Files (accessModes, createLink,
setFileMode)
import System.Random
@@ -166,6 +169,25 @@ listDirContents = internalManipulateStore
)
)
+-- | Search for files in the directory matching the given text string, as a glob
+-- pattern.
+globDir :: ArrowFlow eff ex arr
+ => arr (CS.Content Dir, String) [CS.Content File]
+globDir = globDirPattern <<< second (arr $ Glob.simplify . Glob.compile)
+
+-- | Search for files in the directory matching the given pattern.
+globDirPattern :: ArrowFlow eff ex arr
+ => arr (CS.Content Dir, Glob.Pattern) [CS.Content File]
+globDirPattern = internalManipulateStore
+ ( \store (dir, patt) -> let
+ item = CS.contentItem dir
+ itemRoot = CS.itemPath store item
+ in do
+ files <- mapM parseAbsFile =<< Glob.globDir1 patt (toFilePath itemRoot)
+ relFiles <- for files (stripProperPrefix itemRoot)
+ return ( (item :</>) <$> relFiles )
+ )
+
-- | Merge a number of store directories together into a single output directory.
-- This uses hardlinks to avoid duplicating the data on disk.
mergeDirs :: ArrowFlow eff ex arr => arr [CS.Content Dir] (CS.Content Dir)
@@ -193,7 +215,6 @@ mergeFiles = proc inFiles -> do
createLink (toFilePath inFile) (toFilePath $ d </> filename inFile)
) -< absFiles
-
-- | Read the contents of the given file in the store.
readString :: ArrowFlow eff ex arr => arr (CS.Content File) String
readString = getFromStore $ readFile . fromAbsFile
diff --git a/test/Funflow/SQLiteCoordinator.hs b/test/Funflow/SQLiteCoordinator.hs
index faaa734..5a7a4a5 100644
--- a/test/Funflow/SQLiteCoordinator.hs
+++ b/test/Funflow/SQLiteCoordinator.hs
@@ -71,6 +71,7 @@ echo = external $ \msg -> ExternalTask
{ _etCommand = "echo"
, _etWriteToStdOut = StdOutCapture
, _etParams = ["-n", fromString msg]
+ , _etEnv = []
}
sleepEcho :: SimpleFlow (Double, String) CS.Item
@@ -82,6 +83,7 @@ sleepEcho = external $ \(time, msg) -> ExternalTask
, "sleep " <> fromString (show time) <> ";"
<> "echo -n " <> fromString msg
]
+ , _etEnv = []
}
flow :: SimpleFlow () String
diff --git a/test/Funflow/TestFlows.hs b/test/Funflow/TestFlows.hs
index c0e578f..f46fa10 100644
--- a/test/Funflow/TestFlows.hs
+++ b/test/Funflow/TestFlows.hs
@@ -89,6 +89,7 @@ flowMissingExecutable = proc () -> do
{ _etCommand = "non-existent-executable-39fd1e85a0a05113938e0"
, _etParams = []
, _etWriteToStdOut = StdOutCapture
+ , _etEnv = []
}))
`catch` arr (Left @SomeException . snd)
-< ()
@@ -96,6 +97,18 @@ flowMissingExecutable = proc () -> do
Left _ -> Left ()
Right _ -> Right ()
+-- | Test that we can provide an environment variable to an external step.
+externalEnvVar :: SimpleFlow () (Either String ())
+externalEnvVar = proc () -> do
+ r <- readString_ <<< external (\() -> ExternalTask
+ { _etCommand = "bash"
+ , _etParams = [textParam "-c", textParam "echo -n $FOO"]
+ , _etWriteToStdOut = StdOutCapture
+ , _etEnv = [("FOO", textParam "testing")]
+ }) -< ()
+ returnA -< case r of
+ "testing" -> Right ()
+ x -> Left x
flowAssertions :: [FlowAssertion]
flowAssertions =
@@ -108,6 +121,7 @@ flowAssertions =
, FlowAssertion "cachingFlow" () flowCached (Just True) (return ())
, FlowAssertion "mergingStoreItems" () flowMerge (Just True) (return ())
, FlowAssertion "missingExecutable" () flowMissingExecutable (Just (Left ())) (return ())
+ , FlowAssertion "externalEnvVar" () externalEnvVar (Just (Right ())) (return ())
]
setup :: IO ()