diff options
author | nclarke <> | 2018-07-27 11:46:00 (GMT) |
---|---|---|
committer | hdiff <hdiff@hdiff.luite.com> | 2018-07-27 11:46:00 (GMT) |
commit | 5103dd099c28df0f75cb547fe147721bba991a43 (patch) | |
tree | 0e1a2091cf281faf26a2de4343b74b6138f0cbec | |
parent | 64ea52e9497cd0ea7c37ebb6112d24af83ab90ba (diff) |
version 1.3.01.3.0
-rw-r--r-- | TestFunflow.hs | 2 | ||||
-rw-r--r-- | funflow.cabal | 3 | ||||
-rw-r--r-- | src/Control/Funflow.hs | 3 | ||||
-rw-r--r-- | src/Control/Funflow/Base.hs | 1 | ||||
-rw-r--r-- | src/Control/Funflow/ContentStore.hs | 5 | ||||
-rw-r--r-- | src/Control/Funflow/External.hs | 2 | ||||
-rw-r--r-- | src/Control/Funflow/External/Docker.hs | 124 | ||||
-rw-r--r-- | src/Control/Funflow/External/Executor.hs | 8 | ||||
-rw-r--r-- | src/Control/Funflow/Steps.hs | 23 | ||||
-rw-r--r-- | test/Funflow/SQLiteCoordinator.hs | 2 | ||||
-rw-r--r-- | test/Funflow/TestFlows.hs | 14 |
11 files changed, 156 insertions, 31 deletions
diff --git a/TestFunflow.hs b/TestFunflow.hs index 63e7adf..a65bf6f 100644 --- a/TestFunflow.hs +++ b/TestFunflow.hs @@ -56,6 +56,7 @@ externalTest = let { _etCommand = "/run/current-system/sw/bin/echo" , _etParams = [textParam t] , _etWriteToStdOut = StdOutCapture + , _etEnv = [] } flow = exFlow >>> readString_ in withSystemTempDir "test_output_external_" $ \storeDir -> do @@ -73,6 +74,7 @@ storeTest = let { _etCommand = "/run/current-system/sw/bin/cat" , _etParams = [contentParam a, contentParam b] , _etWriteToStdOut = StdOutCapture + , _etEnv = [] } flow = proc (s1, s2) -> do f1 <- writeString_ -< s1 diff --git a/funflow.cabal b/funflow.cabal index 9e02b66..d929124 100644 --- a/funflow.cabal +++ b/funflow.cabal @@ -1,5 +1,5 @@ Name: funflow -Version: 1.1.0 +Version: 1.3.0 Synopsis: Workflows with arrows Description: An arrow with resumable computations and logging @@ -49,6 +49,7 @@ Library , Control.Funflow.ContentStore.Notify Build-depends: base >= 4.6 && <5 + , Glob , aeson >= 1.2.3.0 , async , bytestring diff --git a/src/Control/Funflow.hs b/src/Control/Funflow.hs index 756eb25..c7540d7 100644 --- a/src/Control/Funflow.hs +++ b/src/Control/Funflow.hs @@ -1,9 +1,12 @@ +{-# LANGUAGE TypeOperators #-} + -- | Central Funflow module. -- -- This module just re-exports various other modules for convenience. module Control.Funflow ( Base.Flow , Base.SimpleFlow + , type (Base.==>) , Base.NoEffect , Base.Flow'(..) , Base.Cacher(..) diff --git a/src/Control/Funflow/Base.hs b/src/Control/Funflow/Base.hs index e434e1b..582917c 100644 --- a/src/Control/Funflow/Base.hs +++ b/src/Control/Funflow/Base.hs @@ -115,6 +115,7 @@ runNoEffect :: forall arr. NoEffect ~> arr runNoEffect = error "Impossible!" type SimpleFlow = Flow NoEffect SomeException +type (==>) = SimpleFlow -- | Convert a flow to a diagram, for inspection/pretty printing toDiagram :: Flow eff ex a b -> Diagram ex a b diff --git a/src/Control/Funflow/ContentStore.hs b/src/Control/Funflow/ContentStore.hs index b75f7f6..86d774e 100644 --- a/src/Control/Funflow/ContentStore.hs +++ b/src/Control/Funflow/ContentStore.hs @@ -97,6 +97,7 @@ module Control.Funflow.ContentStore -- * Accessors , itemHash , itemPath + , itemRelPath , contentPath , contentItem , contentFilename @@ -315,6 +316,10 @@ newtype Alias = Alias { unAlias :: T.Text } root :: ContentStore -> Path Abs Dir root = storeRoot +-- | The scoped path to a content item within the store. +itemRelPath :: Item -> Path Rel Dir +itemRelPath (Item x) = prefixHashPath itemPrefix x + -- | The store path of a completed item. itemPath :: ContentStore -> Item -> Path Abs Dir itemPath store = mkItemPath store . itemHash diff --git a/src/Control/Funflow/External.hs b/src/Control/Funflow/External.hs index 2a5eb12..f9834af 100644 --- a/src/Control/Funflow/External.hs +++ b/src/Control/Funflow/External.hs @@ -177,6 +177,8 @@ instance Store OutputCapture data ExternalTask = ExternalTask { _etCommand :: T.Text , _etParams :: [Param] + -- ^ Environment variables to set for the scope of the execution. + , _etEnv :: [(T.Text, Param)] , _etWriteToStdOut :: OutputCapture } deriving (Generic, Show) diff --git a/src/Control/Funflow/External/Docker.hs b/src/Control/Funflow/External/Docker.hs index c5398ae..e20a5a6 100644 --- a/src/Control/Funflow/External/Docker.hs +++ b/src/Control/Funflow/External/Docker.hs @@ -2,6 +2,7 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ViewPatterns #-} -- | Module supporting the use of docker containers as external tasks. -- @@ -9,39 +10,106 @@ -- easier to specify certain tasks which will run inside docker containers. It -- handles constructing the call to docker, mounting input and output -- directories, and specifying the docker image and version to use. -module Control.Funflow.External.Docker where +module Control.Funflow.External.Docker + ( Config (..) + , toExternal + ) where +import Control.Arrow (Kleisli (..), second) import Control.Funflow.ContentHashable import Control.Funflow.External -import Data.Map.Strict (Map) -import qualified Data.Map.Strict as Map -import Data.Semigroup ((<>)) -import qualified Data.Text as T -import GHC.Generics (Generic) -import System.FilePath +import Control.Monad.Trans.State.Strict +import Data.Semigroup (Semigroup, (<>)) +import qualified Data.Text as T +import GHC.Generics (Generic) data Bind - -- | Single input, will get mounted to @/input@ on the image. - = SingleInput InputPath - -- | Multiple inputs, each gets mouted into a subdirectory under - -- @/input@ as described by the given mapping. - | MultiInput (Map FilePath InputPath) + -- | No inputs + = NoInput + -- | Single input, and the path to its mountpoint within the system. + | SingleInput (InputPath, FilePath) + -- | Multiple inputs. + | MultiInput [(InputPath, FilePath)] deriving Generic instance ContentHashable IO Bind +instance Semigroup Bind where + NoInput <> x = x + x <> NoInput = x + (SingleInput x) <> (SingleInput y) = + MultiInput [x, y] + (SingleInput x) <> (MultiInput m) = + MultiInput $ x : m + (MultiInput m) <> (SingleInput x) = + MultiInput $ x : m + (MultiInput m) <> (MultiInput m') = + MultiInput $ m <> m' + +instance Monoid Bind where + mempty = NoInput + mappend = (<>) + data Config = Config { image :: T.Text , optImageID :: Maybe T.Text - , input :: Bind - , command :: FilePath - , args :: [T.Text] + , command :: Param + , args :: [Param] + , env :: [(T.Text, Param)] + , stdout :: OutputCapture + } deriving Generic + +data Docker = Docker + { dImage :: T.Text + , dOptImageID :: Maybe T.Text + , dInput :: Bind + , dCommand :: Param + , dArgs :: [Param] + , dEnv :: [(T.Text, Param)] + , dStdout :: OutputCapture } deriving Generic -instance ContentHashable IO Config +instance ContentHashable IO Docker + +-- | Convertion state for mapping from 'Config' to 'Docker' +data ConvertState = ConvertState + { _csInput :: Bind + -- | Fresh name generator + , _csFresh :: Int + } + +toDocker :: Config -> Docker +toDocker cfg = Docker + { dImage = image cfg + , dOptImageID = optImageID cfg + , dInput = input' + , dCommand = command' + , dArgs = args' + , dEnv = env' + , dStdout = stdout cfg + } + where + initState = ConvertState NoInput 0 + ((command', args', env'), ConvertState input' _) = flip runState initState $ do + command'' <- transformParam (command cfg) + args'' <- mapM transformParam (args cfg) + env'' <- mapM (runKleisli $ second $ Kleisli transformParam) (env cfg) + return (command'', args'', env'') + transformParam :: Param -> State ConvertState Param + transformParam (Param pfs) = Param <$> mapM transformParamField pfs + transformParamField :: ParamField -> State ConvertState ParamField + transformParamField (ParamPath ip) = do + ConvertState input fresh <- get + put $ ConvertState + (input <> SingleInput (ip, mkInputPath fresh)) + (fresh + 1) + return $ ParamText (T.pack $ mkInputPath fresh) + transformParamField po = return po + mkInputPath :: Int -> String + mkInputPath x = "/input/" <> show x <> "/" toExternal :: Config -> ExternalTask -toExternal cfg = ExternalTask +toExternal (toDocker -> cfg) = ExternalTask -- XXX: Allow to configure the path to the docker executable. { _etCommand = "docker" , _etParams = @@ -49,21 +117,23 @@ toExternal cfg = ExternalTask , "--user=" <> uidParam ] ++ mounts ++ [ imageArg - , stringParam (command cfg) - ] ++ map textParam (args cfg) - , _etWriteToStdOut = NoOutputCapture + , dCommand cfg + ] ++ dArgs cfg + , _etEnv = dEnv cfg + , _etWriteToStdOut = dStdout cfg } where mounts = outputMount : inputMounts mount src dst = "--volume=" <> pathParam src <> ":" <> stringParam dst outputMount = "--volume=" <> outParam <> ":/output" - inputMounts = case input cfg of - SingleInput chash -> [ mount chash "/input" ] + inputMounts = case dInput cfg of + NoInput -> [] + SingleInput (chash, dest) -> [ mount chash dest ] MultiInput cmap -> - [ mount chash ("/input" </> dest) - | (dest, chash) <- Map.toList cmap + [ mount chash dest + | (chash, dest) <- cmap ] - imageArg = textParam $ case optImageID cfg of - Nothing -> image cfg - Just id' -> image cfg <> ":" <> id' + imageArg = textParam $ case dOptImageID cfg of + Nothing -> dImage cfg + Just id' -> dImage cfg <> ":" <> id' diff --git a/src/Control/Funflow/External/Executor.hs b/src/Control/Funflow/External/Executor.hs index 1503767..a300507 100644 --- a/src/Control/Funflow/External/Executor.hs +++ b/src/Control/Funflow/External/Executor.hs @@ -11,6 +11,7 @@ -- You probably want to start with 'executeLoop'. module Control.Funflow.External.Executor where +import Control.Arrow (second) import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.MVar @@ -75,11 +76,12 @@ execute store td = logError $ do = withFollowFile fpErr stderr . withFollowFile fpOut stdout cmd = T.unpack $ td ^. tdTask . etCommand - procSpec params = (proc cmd $ T.unpack <$> params) + procSpec params textEnv = (proc cmd $ T.unpack <$> params) { cwd = Just (fromAbsDir fp) , close_fds = True , std_err = UseHandle hErr , std_out = UseHandle hOut + , env = map (bimap T.unpack T.unpack) <$> textEnv } convParam = ConvParam { convPath = pure . CS.itemPath store @@ -90,6 +92,8 @@ execute store td = logError $ do } mbParams <- lift $ runMaybeT $ traverse (paramToText convParam) (td ^. tdTask . etParams) + mbEnv <- lift $ runMaybeT $ + traverse (sequence . second (paramToText convParam)) (td ^. tdTask . etEnv) params <- case mbParams of Nothing -> fail "A parameter was not ready" Just params -> return params @@ -110,7 +114,7 @@ execute store td = logError $ do (Json.encode (td ^. tdTask)) start <- lift $ getTime Monotonic - let theProc = procSpec params + let theProc = procSpec params mbEnv katipAddNamespace "process" . katipAddContext (sl "processId" $ show theProc) $ do $(logTM) InfoS "Executing" res <- lift $ tryIO $ withCreateProcess theProc $ \_ _ _ ph -> diff --git a/src/Control/Funflow/Steps.hs b/src/Control/Funflow/Steps.hs index 9cde1f2..1789574 100644 --- a/src/Control/Funflow/Steps.hs +++ b/src/Control/Funflow/Steps.hs @@ -16,6 +16,8 @@ module Control.Funflow.Steps , copyDirToStore , copyFileToStore , listDirContents + , globDir + , globDirPattern , lookupAliasInStore , mergeDirs , mergeFiles @@ -59,6 +61,7 @@ import qualified Data.Yaml as Yaml import GHC.Conc (threadDelay) import Path import Path.IO +import qualified System.FilePath.Glob as Glob import System.Posix.Files (accessModes, createLink, setFileMode) import System.Random @@ -166,6 +169,25 @@ listDirContents = internalManipulateStore ) ) +-- | Search for files in the directory matching the given text string, as a glob +-- pattern. +globDir :: ArrowFlow eff ex arr + => arr (CS.Content Dir, String) [CS.Content File] +globDir = globDirPattern <<< second (arr $ Glob.simplify . Glob.compile) + +-- | Search for files in the directory matching the given pattern. +globDirPattern :: ArrowFlow eff ex arr + => arr (CS.Content Dir, Glob.Pattern) [CS.Content File] +globDirPattern = internalManipulateStore + ( \store (dir, patt) -> let + item = CS.contentItem dir + itemRoot = CS.itemPath store item + in do + files <- mapM parseAbsFile =<< Glob.globDir1 patt (toFilePath itemRoot) + relFiles <- for files (stripProperPrefix itemRoot) + return ( (item :</>) <$> relFiles ) + ) + -- | Merge a number of store directories together into a single output directory. -- This uses hardlinks to avoid duplicating the data on disk. mergeDirs :: ArrowFlow eff ex arr => arr [CS.Content Dir] (CS.Content Dir) @@ -193,7 +215,6 @@ mergeFiles = proc inFiles -> do createLink (toFilePath inFile) (toFilePath $ d </> filename inFile) ) -< absFiles - -- | Read the contents of the given file in the store. readString :: ArrowFlow eff ex arr => arr (CS.Content File) String readString = getFromStore $ readFile . fromAbsFile diff --git a/test/Funflow/SQLiteCoordinator.hs b/test/Funflow/SQLiteCoordinator.hs index faaa734..5a7a4a5 100644 --- a/test/Funflow/SQLiteCoordinator.hs +++ b/test/Funflow/SQLiteCoordinator.hs @@ -71,6 +71,7 @@ echo = external $ \msg -> ExternalTask { _etCommand = "echo" , _etWriteToStdOut = StdOutCapture , _etParams = ["-n", fromString msg] + , _etEnv = [] } sleepEcho :: SimpleFlow (Double, String) CS.Item @@ -82,6 +83,7 @@ sleepEcho = external $ \(time, msg) -> ExternalTask , "sleep " <> fromString (show time) <> ";" <> "echo -n " <> fromString msg ] + , _etEnv = [] } flow :: SimpleFlow () String diff --git a/test/Funflow/TestFlows.hs b/test/Funflow/TestFlows.hs index c0e578f..f46fa10 100644 --- a/test/Funflow/TestFlows.hs +++ b/test/Funflow/TestFlows.hs @@ -89,6 +89,7 @@ flowMissingExecutable = proc () -> do { _etCommand = "non-existent-executable-39fd1e85a0a05113938e0" , _etParams = [] , _etWriteToStdOut = StdOutCapture + , _etEnv = [] })) `catch` arr (Left @SomeException . snd) -< () @@ -96,6 +97,18 @@ flowMissingExecutable = proc () -> do Left _ -> Left () Right _ -> Right () +-- | Test that we can provide an environment variable to an external step. +externalEnvVar :: SimpleFlow () (Either String ()) +externalEnvVar = proc () -> do + r <- readString_ <<< external (\() -> ExternalTask + { _etCommand = "bash" + , _etParams = [textParam "-c", textParam "echo -n $FOO"] + , _etWriteToStdOut = StdOutCapture + , _etEnv = [("FOO", textParam "testing")] + }) -< () + returnA -< case r of + "testing" -> Right () + x -> Left x flowAssertions :: [FlowAssertion] flowAssertions = @@ -108,6 +121,7 @@ flowAssertions = , FlowAssertion "cachingFlow" () flowCached (Just True) (return ()) , FlowAssertion "mergingStoreItems" () flowMerge (Just True) (return ()) , FlowAssertion "missingExecutable" () flowMissingExecutable (Just (Left ())) (return ()) + , FlowAssertion "externalEnvVar" () externalEnvVar (Just (Right ())) (return ()) ] setup :: IO () |