author    YvesPares <>                   2019-10-09 10:16:00 (GMT)
committer hdiff <hdiff@hdiff.luite.com>  2019-10-09 10:16:00 (GMT)
commit    0a26644cffcc096b169b4f1ee3b790b562c01c51 (patch)
tree      0404a48f10a79540a26f315f0af2da316474e3a9 /src/System
version   0.1.0.0
Diffstat (limited to 'src/System')
-rw-r--r--  src/System/ClockHelpers.hs                         32
-rw-r--r--  src/System/TaskPipeline.hs                         15
-rw-r--r--  src/System/TaskPipeline/CLI.hs                    437
-rw-r--r--  src/System/TaskPipeline/Caching.hs                112
-rw-r--r--  src/System/TaskPipeline/ConfigurationReader.hs    150
-rw-r--r--  src/System/TaskPipeline/Logger.hs                 117
-rw-r--r--  src/System/TaskPipeline/Options.hs                 84
-rw-r--r--  src/System/TaskPipeline/PTask.hs                  218
-rw-r--r--  src/System/TaskPipeline/PTask/Internal.hs         286
-rw-r--r--  src/System/TaskPipeline/PorcupineTree.hs          689
-rw-r--r--  src/System/TaskPipeline/Repetition.hs             119
-rw-r--r--  src/System/TaskPipeline/Repetition/Foldl.hs       145
-rw-r--r--  src/System/TaskPipeline/Repetition/Internal.hs    119
-rw-r--r--  src/System/TaskPipeline/Repetition/Streaming.hs   127
-rw-r--r--  src/System/TaskPipeline/Run.hs                    269
-rw-r--r--  src/System/TaskPipeline/VirtualFileAccess.hs      405
16 files changed, 3324 insertions, 0 deletions
diff --git a/src/System/ClockHelpers.hs b/src/System/ClockHelpers.hs
new file mode 100644
index 0000000..244e221
--- /dev/null
+++ b/src/System/ClockHelpers.hs
@@ -0,0 +1,32 @@
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+
+module System.ClockHelpers
+ ( Clock(..)
+ , TimeSpec(..)
+ , diffTimeSpec
+ , getTime
+ , showTimeSpec
+ , clockM
+ ) where
+
+import Control.Monad.IO.Class
+import Data.Aeson
+import Katip
+import System.Clock
+
+
+clockM :: (MonadIO m) => m a -> m (a, TimeSpec)
+clockM act = f <$> time <*> act <*> time
+ where
+ time = liftIO $ getTime Realtime
+ f start res end = (res, end `diffTimeSpec` start)
+
+showTimeSpec :: TimeSpec -> String
+showTimeSpec ts = show (fromIntegral (toNanoSecs ts) / (10**9) :: Double) ++ "s"
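+
+-- For illustration, a hypothetical use in some 'KatipContext' monad (not part
+-- of this commit; the file name is made up):
+--
+-- > timedRead :: (KatipContext m) => m String
+-- > timedRead = do
+-- >   (res, spent) <- clockM (liftIO $ readFile "input.txt")
+-- >   logFM InfoS $ logStr $ "Read took " ++ showTimeSpec spent
+-- >   return res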
+
+-- To be able to use TimeSpec as part of katip contexts (katipAddContext)
+instance ToJSON TimeSpec
+instance ToObject TimeSpec
+instance LogItem TimeSpec where
+ payloadKeys v _ | v >= V1 = AllKeys
+ | otherwise = SomeKeys []
diff --git a/src/System/TaskPipeline.hs b/src/System/TaskPipeline.hs
new file mode 100644
index 0000000..ec49546
--- /dev/null
+++ b/src/System/TaskPipeline.hs
@@ -0,0 +1,15 @@
+module System.TaskPipeline
+ ( module System.TaskPipeline.PTask
+ , module System.TaskPipeline.PorcupineTree
+ , module System.TaskPipeline.VirtualFileAccess
+ , module System.TaskPipeline.Options
+ , module System.TaskPipeline.Repetition
+ , module Data.Locations.LogAndErrors
+ ) where
+
+import Data.Locations.LogAndErrors
+import System.TaskPipeline.Options
+import System.TaskPipeline.PTask
+import System.TaskPipeline.Repetition
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.VirtualFileAccess
diff --git a/src/System/TaskPipeline/CLI.hs b/src/System/TaskPipeline/CLI.hs
new file mode 100644
index 0000000..d5d71f4
--- /dev/null
+++ b/src/System/TaskPipeline/CLI.hs
@@ -0,0 +1,437 @@
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TupleSections #-}
+{-# OPTIONS_GHC -Wall #-}
+
+module System.TaskPipeline.CLI
+ ( module System.TaskPipeline.ConfigurationReader
+ , PipelineCommand(..)
+ , PipelineConfigMethod(..)
+ , BaseInputConfig(..)
+ , PostParsingAction(..)
+ , PreRun(..)
+ , ConfigFileSource(..)
+ , tryReadConfigFileSource
+ , cliYamlParser
+ , execCliParser
+ , withCliParser
+ , pipelineCliParser
+
+ , pipelineConfigMethodProgName
+ , pipelineConfigMethodDefRoot
+ , pipelineConfigMethodConfigFile
+ , pipelineConfigMethodDefReturnVal
+ , withConfigFileSourceFromCLI
+ ) where
+
+import Control.Lens hiding (argument)
+import Control.Monad.IO.Class
+import Data.Aeson as A
+import qualified Data.Aeson.Encode.Pretty as A
+import qualified Data.ByteString as BS
+import qualified Data.ByteString.Lazy as LBS
+import Data.Char (toLower)
+import qualified Data.HashMap.Lazy as HashMap
+import Data.Locations
+import Data.Maybe
+import Data.Representable
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as T
+import qualified Data.Yaml as Y
+import Katip
+import Options.Applicative
+import System.Directory
+import System.Environment (getArgs, withArgs)
+import System.IO (stdin)
+import System.TaskPipeline.ConfigurationReader
+import System.TaskPipeline.Logger
+import System.TaskPipeline.PorcupineTree (PhysicalFileNodeShowOpts(..))
+
+
+-- | The command to parse from the CLI
+data PipelineCommand
+ = RunPipeline
+ | ShowTree LocationTreePath PhysicalFileNodeShowOpts
+
+-- | Tells whether and how command-line options should be used. @r@ is the type
+-- of the result of the PTask that this PipelineConfigMethod will be used for
+-- (see @runPipelineTask@). _pipelineConfigMethodDefReturnVal will be used as a
+-- return value for the pipeline, e.g. when the user just wants to write the
+-- config template and not run the pipeline.
+data PipelineConfigMethod r
+ = NoConfig { _pipelineConfigMethodProgName :: String
+ , _pipelineConfigMethodDefRoot :: Loc }
+ -- ^ No CLI and no config file are used, in which case we require a program
+ -- name (for logs) and the root directory.
+ | ConfigFileOnly { _pipelineConfigMethodProgName :: String
+ , _pipelineConfigMethodConfigFile :: LocalFilePath
+ , _pipelineConfigMethodDefRoot :: Loc }
+ -- ^ Config file is read and loaded if it exists. No CLI is used.
+ | FullConfig { _pipelineConfigMethodProgName :: String
+ , _pipelineConfigMethodConfigFile :: LocalFilePath
+ , _pipelineConfigMethodDefRoot :: Loc
+ , _pipelineConfigMethodDefReturnVal :: r }
+ -- ^ Both config file and CLI will be used. We require a name (for the help
+ -- page in the CLI), the config file path, the default 'LocationTree' root
+ -- mapping, and a default value to return when the CLI is called with a
+ -- command other than 'run' (in which case the pipeline will not run).
+
+makeLenses ''PipelineConfigMethod
+
+-- | Runs the CLI parser and passes the parsed config and command to the given
+-- continuation, unless the user only asked for a side effect (e.g. writing the
+-- config template), in which case only the post-parsing actions are run.
+withCliParser
+ :: String
+ -> String
+ -> Parser (Maybe (a, cmd), LoggerScribeParams, [PostParsingAction])
+ -> r
+ -> (a -> cmd -> LoggerScribeParams -> PreRun -> IO r)
+ -> IO r
+withCliParser progName progDesc_ cliParser defRetVal f = do
+ (mbArgs, lsp, actions) <- execCliParser progName progDesc_ cliParser
+ case mbArgs of
+ Just (cfg, cmd) ->
+ f cfg cmd lsp $ PreRun $ mapM_ processAction actions
+ Nothing -> runLogger progName lsp $ do
+ mapM_ processAction actions
+ return defRetVal
+ where
+ processAction (PostParsingLog s l) = logFM s l
+ processAction (PostParsingWrite configFile cfg) = do
+ let rawFile = configFile ^. pathWithExtensionAsRawFilePath
+ case configFile of
+ PathWithExtension "-" _ ->
+ error "Config was read from stdin, cannot overwrite it"
+ PathWithExtension _ e | e `elem` ["yaml","yml"] ->
+ liftIO $ Y.encodeFile rawFile cfg
+ PathWithExtension _ "json" ->
+ liftIO $ LBS.writeFile rawFile $ A.encodePretty cfg
+        _ -> error "Config file has an unknown format"
+ logFM NoticeS $ logStr $ "Wrote file '" ++ rawFile ++ "'"
+
+data ConfigFileSource = YAMLStdin | ConfigFileURL Loc
+
+-- | Checks whether the first CLI argument is a config file source (a YAML or
+-- JSON filepath, or "-" for stdin), and if so passes it to the given callback,
+-- consuming that argument so the rest of the command line is parsed as usual.
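+--
+-- For illustration: @myprog conf.yaml run -v@ would consume @conf.yaml@ and
+-- pass @run -v@ on to the callback, whereas @myprog run -v@ (no recognized
+-- extension on the first argument) leaves the argument list untouched.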
+withConfigFileSourceFromCLI
+  :: (Maybe ConfigFileSource -> IO b) -- Receives the config file source, if one was given as first argument
+ -> IO b
+withConfigFileSourceFromCLI f = do
+ cliArgs <- liftIO getArgs
+ case cliArgs of
+ [] -> f Nothing
+ filename : rest -> do
+ case parseURL filename of
+ Left _ -> f Nothing
+ Right (LocalFile (PathWithExtension "-" ext)) ->
+ if ext == "" || allowedExt ext
+ then withArgs rest $ f $ Just YAMLStdin
+ else error $ filename ++ ": Only JSON or YAML config can be read from stdin"
+ Right loc | getLocType loc == "" -> f Nothing
+ -- No extension, therefore probably not a filepath
+ | allowedExt (getLocType loc) -> withArgs rest $ f $ Just $ ConfigFileURL loc
+               | otherwise -> error $ filename ++ ": Only JSON or YAML config files allowed"
+ where
+ allowedExt = (`elem` ["yaml","yml","json"]) . map toLower
+
+tryReadConfigFileSource :: (FromJSON cfg)
+ => ConfigFileSource -> (Loc -> IO cfg) -> IO (Maybe cfg)
+tryReadConfigFileSource configFileSource ifRemote =
+ case configFileSource of
+ YAMLStdin ->
+ Just <$> (BS.hGetContents stdin >>= Y.decodeThrow)
+ ConfigFileURL (LocalFile lfp) -> do
+ let p = lfp ^. pathWithExtensionAsRawFilePath
+ yamlFound <- doesFileExist p
+ if yamlFound
+ then Just <$> Y.decodeFileThrow p
+ else return Nothing
+ ConfigFileURL remoteLoc -> -- If config is remote, it must always be present
+ -- for now
+ Just <$> ifRemote remoteLoc
+
+data BaseInputConfig cfg = BaseInputConfig
+ { bicSourceFile :: Maybe LocalFilePath
+ , bicLoadedConfig :: Maybe Y.Value
+ , bicDefaultConfig :: cfg
+ }
+
+-- | Creates a command line parser that will return an action returning the
+-- configuration and the chosen subcommand, or Nothing if the user simply asked
+-- to save some overrides, meaning the program should stop. It _does not_ mean
+-- that an error has occurred, just that the program should not continue.
+cliYamlParser
+ :: (ToJSON cfg)
+ => String -- ^ The program name
+ -> BaseInputConfig cfg -- ^ Default configuration
+ -> ConfigurationReader cfg overrides
+ -> [(Parser cmd, String, String)] -- ^ [(Parser cmd, Command repr, Command help string)]
+ -> cmd -- ^ Default command
+ -> IO (Parser (Maybe (cfg, cmd), LoggerScribeParams, [PostParsingAction]))
+cliYamlParser progName baseInputCfg inputParsing cmds defCmd = do
+ return $ pureCliParser progName baseInputCfg inputParsing cmds defCmd
+
+-- | A shortcut to run a parser and define the program help strings
+execCliParser
+ :: String
+ -> String
+ -> Parser a
+ -> IO a
+execCliParser header_ progDesc_ parser_ = do
+ let opts = info (helper <*> parser_)
+ ( fullDesc
+ <> header header_
+ <> progDesc progDesc_ )
+ execParser opts
+
+pureCliParser
+ :: (ToJSON cfg)
+ => String -- ^ The program name
+ -> BaseInputConfig cfg -- ^ The base configuration we read
+ -> ConfigurationReader cfg overrides
+ -> [(Parser cmd, String, String)] -- ^ [(Parser cmd, Command repr, Command help string)]
+ -> cmd -- ^ Default command
+ -> Parser (Maybe (cfg, cmd), LoggerScribeParams, [PostParsingAction])
+     -- ^ (Config and command, logger scribe parameters, and actions to run
+     -- after parsing, e.g. to override the yaml file)
+pureCliParser progName baseInputConfig cfgCLIParsing cmds defCmd =
+ (case bicSourceFile baseInputConfig of
+ Nothing -> empty
+ Just f -> subparser $
+ command "write-config-template"
+ (info
+ (pure (Nothing, maxVerbosityLoggerScribeParams
+ ,[PostParsingWrite f (bicDefaultConfig baseInputConfig)]))
+ (progDesc $ "Write a default configuration file in " <> (f^.pathWithExtensionAsRawFilePath))))
+ <|>
+ handleOptions progName baseInputConfig cliOverriding
+ <$> ((subparser $
+ (case bicSourceFile baseInputConfig of
+ Nothing -> mempty
+ Just f ->
+ command "save" $
+ info (pure Nothing) $
+ progDesc $ "Just save the command line overrides in " <> (f^.pathWithExtensionAsRawFilePath))
+ <>
+ foldMap
+ (\(cmdParser, cmdShown, cmdInfo) ->
+ command cmdShown $
+ info (Just . (,cmdShown) <$> cmdParser) $
+ progDesc cmdInfo)
+ cmds)
+ <|>
+ pure (Just (defCmd, "")))
+ <*> (case bicSourceFile baseInputConfig of
+ Nothing -> pure False
+ Just f ->
+ switch ( long "save"
+ <> short 's'
+ <> help ("Save overrides in the " <> (f^.pathWithExtensionAsRawFilePath) <> " before running.") ))
+ <*> overridesParser cliOverriding
+ where
+ cliOverriding = addScribeParamsParsing cfgCLIParsing
+
+severityShortcuts :: Parser Severity
+severityShortcuts =
+ numToSeverity <$> liftA2 (-)
+ (length <$>
+ (many
+ (flag' ()
+ ( short 'q'
+ <> long "quiet"
+ <> help "Print only warning (-q) or error (-qq) messages. Cancels out with -v."))))
+ (length <$>
+ (many
+ (flag' ()
+ ( long "verbose"
+ <> short 'v'
+ <> help "Print info (-v) and debug (-vv) messages. Cancels out with -q."))))
+ where
+ numToSeverity (-1) = InfoS
+ numToSeverity 0 = NoticeS
+ numToSeverity 1 = WarningS
+ numToSeverity 2 = ErrorS
+ numToSeverity 3 = CriticalS
+ numToSeverity 4 = AlertS
+ numToSeverity x | x>0 = EmergencyS
+ | otherwise = DebugS
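+    -- For illustration: "-qq" selects ErrorS, "-v" selects InfoS, and
+    -- "-vvv" (or more) selects DebugS.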
+
+-- | Parses the CLI options that will be given to Katip's logger scribe
+parseScribeParams :: Parser LoggerScribeParams
+parseScribeParams = LoggerScribeParams
+ <$> ((option (eitherReader severityParser)
+ ( long "severity"
+ <> help "Control exactly which minimal severity level will be logged (used instead of -q or -v)"))
+ <|>
+ severityShortcuts)
+ <*> (numToVerbosity <$>
+ option auto
+ ( long "context-verb"
+ <> short 'c'
+ <> help "A number from 0 to 3 (default: 0). Controls the amount of context to show per log line"
+ <> value (0 :: Int)))
+ <*> (option (eitherReader loggerFormatParser)
+ ( long "log-format"
+ <> help "Selects a format for the log: 'pretty' (default, only for human consumption), 'compact' (pretty but more compact), 'json' or 'bracket'"
+ <> value PrettyLog))
+ where
+ severityParser = \case
+ "debug" -> Right DebugS
+ "info" -> Right InfoS
+ "notice" -> Right NoticeS
+ "warning" -> Right WarningS
+ "error" -> Right ErrorS
+ "critical" -> Right CriticalS
+ "alert" -> Right AlertS
+ "emergency" -> Right EmergencyS
+ s -> Left $ s ++ " isn't a valid severity level"
+ numToVerbosity 0 = V0
+ numToVerbosity 1 = V1
+ numToVerbosity 2 = V2
+ numToVerbosity _ = V3
+ loggerFormatParser "pretty" = Right PrettyLog
+ loggerFormatParser "compact" = Right CompactLog
+ loggerFormatParser "json" = Right JSONLog
+ loggerFormatParser "bracket" = Right BracketLog
+ loggerFormatParser s = Left $ s ++ " isn't a valid log format"
+
+-- | Modifies a CLI parser so it features verbosity and severity flags
+addScribeParamsParsing :: ConfigurationReader cfg ovs -> ConfigurationReader (LoggerScribeParams, cfg) (LoggerScribeParams, ovs)
+addScribeParamsParsing super = ConfigurationReader
+ { overridesParser = (,) <$> parseScribeParams <*> overridesParser super
+ , nullOverrides = \(_, ovs) -> nullOverrides super ovs
+ , overrideCfgFromYamlFile = \yaml (scribeParams, ovs) ->
+ let (warns, res) = overrideCfgFromYamlFile super yaml ovs
+ in (warns, (scribeParams,) <$> res)
+ }
+
+-- | Some action to be carried out after the parser is done. Writing the config
+-- file is done here, as is the logging of config.
+data PostParsingAction
+ = PostParsingLog Severity LogStr -- ^ Log a message
+ | forall a. (ToJSON a) => PostParsingWrite LocalFilePath a
+ -- ^ Write to a file and log a message about it
+
+-- | Wraps the actions to override the config file
+newtype PreRun = PreRun {unPreRun :: forall m. (KatipContext m, MonadIO m) => m ()}
+
+handleOptions
+ :: forall cfg cmd overrides.
+ (ToJSON cfg)
+ => String -- ^ Program name
+ -> BaseInputConfig cfg
+ -> ConfigurationReader (LoggerScribeParams, cfg) (LoggerScribeParams, overrides)
+ -> Maybe (cmd, String) -- ^ Command to run (and a name/description for it). If
+ -- Nothing, means we should just save the config
+ -> Bool -- ^ Whether to save the overrides
+ -> (LoggerScribeParams, overrides) -- ^ overrides
+ -> (Maybe (cfg, cmd), LoggerScribeParams, [PostParsingAction])
+     -- ^ (Config and command, logger scribe parameters, and actions to run
+     -- after parsing, e.g. to override the yaml file)
+handleOptions progName (BaseInputConfig _ Nothing _) _ Nothing _ _ = error $
+ "No config found and nothing to save. Please run `" ++ progName ++ " write-config-template' first."
+handleOptions progName (BaseInputConfig mbCfgFile mbCfg defCfg) cliOverriding mbCmd saveOverridesAlong overrides =
+ let defaultCfg = toJSON defCfg
+ (cfgWarnings, cfg) = case mbCfg of
+ Just c -> mergeWithDefault [] defaultCfg c
+      Nothing -> ([PostParsingLog DebugS $ logStr $
+                    "Config file" ++ configFile' ++ " was not found. Treated as empty."]
+ ,defaultCfg)
+ (overrideWarnings, mbScribeParamsAndCfgOverriden) =
+ overrideCfgFromYamlFile cliOverriding cfg overrides
+ allWarnings = cfgWarnings ++ map (PostParsingLog WarningS . logStr) overrideWarnings
+ in case mbScribeParamsAndCfgOverriden of
+ Right (lsp, cfgOverriden) ->
+ case mbCmd of
+ Nothing -> (Nothing, lsp, allWarnings ++
+ [PostParsingWrite (fromJust mbCfgFile) cfgOverriden])
+ Just (cmd, _cmdShown) ->
+ let actions =
+ allWarnings ++
+ (if saveOverridesAlong
+ then [PostParsingWrite (fromJust mbCfgFile) cfgOverriden]
+ else []) ++
+ [PostParsingLog DebugS $ logStr $ "Running `" <> T.pack progName
+ <> "' with the following config:\n"
+ <> T.decodeUtf8 (Y.encode cfgOverriden)]
+ in (Just (cfgOverriden, cmd), lsp, actions)
+ Left err -> dispErr err
+ where
+ configFile' = case mbCfgFile of Nothing -> ""
+ Just f -> " " ++ f ^. pathWithExtensionAsRawFilePath
+ dispErr err = error $
+ (if nullOverrides cliOverriding overrides
+ then "C"
+       else "Overridden c") ++ "onfig" <> shownFile <> " is not valid:\n  " ++ err
+ where
+ shownFile = case mbCfgFile of
+ Just f -> " from " <> show f
+ Nothing -> ""
+
+mergeWithDefault :: [T.Text] -> Y.Value -> Y.Value -> ([PostParsingAction], Y.Value)
+mergeWithDefault path (Object o1) (Object o2) =
+ let newKeys = HashMap.keys $ o2 `HashMap.difference` o1
+ warnings = map (\key -> PostParsingLog DebugS $ logStr $
+ "The key '" <> T.intercalate "." (reverse $ key:path) <>
+ "' isn't present in the default configuration. " <>
+ "Please make sure it isn't a typo.")
+ newKeys
+ (subWarnings, merged) =
+ sequenceA $ HashMap.unionWithKey
+ (\key (_,v1) (_,v2) -> mergeWithDefault (key:path) v1 v2)
+ (fmap pure o1)
+ (fmap pure o2)
+ in (warnings ++ subWarnings, Object merged)
+mergeWithDefault _ _ v = pure v
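+
+-- For illustration: merging default @{"a":1,"b":2}@ with loaded @{"b":3,"c":4}@
+-- yields @{"a":1,"b":3,"c":4}@, plus a DebugS message warning that key "c" is
+-- absent from the default configuration.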
+
+parseShowTree :: Parser PipelineCommand
+parseShowTree = ShowTree <$> parseRoot <*> parseShowOpts
+ where
+ parseRoot = argument (eitherReader (fromTextRepr . T.pack))
+ (help "Path from which to display the porcupine tree" <> value (LTP []))
+ parseShowOpts = PhysicalFileNodeShowOpts
+ <$> flag False True
+ (long "mappings"
+ <> short 'm'
+ <> help "Show mappings of virtual files")
+ <*> flag True False
+ (long "no-serials"
+ <> short 'S'
+ <> help "Don't show if the virtual file can be used as a source or a sink")
+ <*> flag True False
+ (long "no-fields"
+ <> short 'F'
+ <> help "Don't show the option fields and their docstrings")
+ <*> flag False True
+ (long "types"
+ <> short 't'
+ <> help "Show types written to virtual files")
+ <*> flag False True
+ (long "accesses"
+ <> short 'a'
+ <> help "Show how virtual files will be accessed")
+ <*> flag True False
+ (long "no-extensions"
+ <> short 'E'
+ <> help "Don't show the possible extensions for physical files")
+ <*> option auto
+ (long "num-chars"
+ <> short 'c'
+ <> help "The number of characters to show for the type (default: 60)"
+ <> value (60 :: Int))
+
+pipelineCliParser
+ :: (ToJSON cfg)
+ => (cfg -> ConfigurationReader cfg overrides)
+ -> String
+ -> BaseInputConfig cfg
+ -> IO (Parser (Maybe (cfg, PipelineCommand), LoggerScribeParams, [PostParsingAction]))
+pipelineCliParser getCliOverriding progName baseInputConfig =
+ cliYamlParser progName baseInputConfig (getCliOverriding $ bicDefaultConfig baseInputConfig)
+ [(pure RunPipeline, "run", "Run the pipeline")
+ ,(parseShowTree, "show-tree", "Show the porcupine tree of the pipeline")]
+ RunPipeline
diff --git a/src/System/TaskPipeline/Caching.hs b/src/System/TaskPipeline/Caching.hs
new file mode 100644
index 0000000..dbc62c1
--- /dev/null
+++ b/src/System/TaskPipeline/Caching.hs
@@ -0,0 +1,112 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE TupleSections #-}
+
+-- | The functions in this module are necessary only if you want fine control
+-- over the caching of some actions. When you want to perform several reads and
+-- writes from and to VirtualFiles as part of a /single/ cached task, the
+-- recommended way is to use:
+--
+-- - 'getDataReader'/'getDataWriter' to obtain the accessors
+-- - 'toTask'' to create the cached task, to which you give the accessors
+--
+-- Since the accessors are hashable, the files that are bound to them are
+-- incorporated into the hash, so binding them to new files will re-trigger the
+-- task.
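+--
+-- For illustration, a hypothetical cached task writing its result to a
+-- VirtualFile (assuming @reportVFile :: VirtualFile Report ()@ and
+-- @computeReport :: Input -> m Report@, both made up for this sketch):
+--
+-- > writeReport :: (LogCatch m) => PTask m Input ()
+-- > writeReport = toTaskAndWrite_ (def { cache = defaultCacherWithIdent 1 })
+-- >                               reportVFile computeReport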
+
+module System.TaskPipeline.Caching
+ ( toTaskAndWrite
+ , toTaskAndWrite_
+
+ -- * Re-exports
+
+ , module Data.Locations.LogAndErrors
+ , Properties(..)
+ , defaultCacherWithIdent
+ , Default(..)
+ ) where
+
+import qualified Control.Exception.Safe as SE
+import Control.Funflow
+import Data.Default (Default (..))
+import Data.Locations.LogAndErrors
+import Data.Locations.VirtualFile
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask
+import System.TaskPipeline.VirtualFileAccess
+
+import Prelude hiding (id, (.))
+
+
+-- | For when the result of the lifted function just needs to be written, not
+-- returned.
+toTaskAndWrite_
+ :: (LogCatch m, Typeable b, Typeable ignored)
+ => Properties (a, DataWriter m b) () -- ^ Location types aren't ContentHashable, but
+ -- all are convertible to JSON. We need that to
+ -- hash on locations so the task is repeated if
+ -- we bind to new locations.
+ -> VirtualFile b ignored -- ^ The VirtualFile to write
+ -> (a -> m b) -- ^ The function to lift. Won't be executed if
+ -- the file isn't mapped
+ -> PTask m a ()
+toTaskAndWrite_ props vf f =
+ toTaskAndWrite props id vf (fmap (,()) . f) (const $ return ())
+{-# INLINE toTaskAndWrite_ #-}
+
+
+-- | Similar to 'toTask'', but caches a write action of the result too. In this
+-- case we use the filepath bound to the VirtualFile to compute the hash. That
+-- means that if the VirtualFile is bound to something else, the step will be
+-- re-executed.
+toTaskAndWrite
+ :: (LogCatch m, Typeable b, Typeable ignored)
+ => Properties (a', DataWriter m b) c -- ^ Location types aren't ContentHashable, but
+ -- all are convertible to JSON. We need that to
+ -- hash on locations so the task is repeated if
+ -- we bind to new locations.
+  -> (a -> a')              -- ^ If the input cannot or should not be fully
+                            -- hashed, you can select a subset of it or
+                            -- transform it into a hashable intermediate
+                            -- representation (like an aeson Value). Otherwise
+                            -- just use 'id'
+ -> VirtualFile b ignored -- ^ The VirtualFile to write. If the file
+ -- isn't mapped, the action won't be performed,
+ -- and the task will return the default result.
+ -> (a -> m (b,c)) -- ^ The function to lift. First item of the
+ -- returned tuple will be written to the
+ -- VirtualFile. The second will be returned by
+ -- the task, so it must be loadable from the
+ -- store.
+ -> (a -> m c) -- ^ Called when the VirtualFile isn't mapped,
+ -- and therefore no @b@ needs to be computed
+ -> PTask m a c
+toTaskAndWrite props inputHashablePart vf action actionWhenNotMapped = proc input -> do
+ writer <- getDataWriter vf -< ()
+ throwTask <<< toTask' props' cached -< (input,writer)
+ where
+ cached (input,writer) | null (dwLocsAccessed writer)
+ = Right <$> actionWhenNotMapped input
+ | otherwise
+ = do
+ res <- SE.try $ action input
+ case res of
+ Right (outputForVFile, outputForStore) -> do
+ dwPerformWrite writer outputForVFile
+ return $ Right outputForStore
+ Left err -> return $ Left (err::SomeException)
+
+ props' = props { cache = cache'
+ , mdpolicy = updMdw <$> mdpolicy props }
+ getH (input,writer) = (inputHashablePart input,writer)
+ cache' = case cache props of
+ NoCache -> NoCache
+ Cache key sv rv ->
+ let key' salt = key salt . getH
+ sv' (Left e) = error $
+            "toTaskAndWrite: An exception occurred during the cached function: "
+ ++ displayException e
+ sv' (Right x) = sv x
+ rv' = Right . rv
+ in Cache key' sv' rv'
+ updMdw mdWriter i (Right o) = mdWriter (getH i) o
+ updMdw _ _ (Left _) = []
diff --git a/src/System/TaskPipeline/ConfigurationReader.hs b/src/System/TaskPipeline/ConfigurationReader.hs
new file mode 100644
index 0000000..005815f
--- /dev/null
+++ b/src/System/TaskPipeline/ConfigurationReader.hs
@@ -0,0 +1,150 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# OPTIONS_GHC -Wall #-}
+
+module System.TaskPipeline.ConfigurationReader
+ ( ConfigurationReader(..)
+ , docRecBasedConfigurationReader
+ , genericAesonBasedConfigurationReader
+ , parseJSONEither
+ ) where
+
+import Control.Applicative
+import Control.Lens
+import Control.Monad
+import qualified Data.Aeson as A
+import Data.DocRecord
+import Data.DocRecord.OptParse
+import qualified Data.HashMap.Lazy as HashMap
+import Data.Locations.Loc
+import Data.Locations.SerializationMethod (parseJSONEither)
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as T
+import qualified Data.Yaml as Y
+import Options.Applicative
+
+
+-- | How to override a YAML file config from the command-line to return some
+-- config type @cfg@
+data ConfigurationReader cfg overrides = ConfigurationReader
+ { overridesParser :: Parser overrides
+ -- ^ Generate a parser from default cfg
+ , nullOverrides :: overrides -> Bool
+ -- ^ True if no override has been provided on the CLI
+ , overrideCfgFromYamlFile
+ :: A.Value -> overrides -> ([String], Either String cfg)
+ -- ^ How to override the config read from YAML file. Returns: (Warnings,
+  -- Overridden config or an error).
+ }
+
+-- | The default config must be a 'DocRec' here. It is used to generate one
+-- option per field in the DocRec, along with its documentation.
+docRecBasedConfigurationReader
+ :: (RecordUsableWithCLI rs)
+ => DocRec rs -> ConfigurationReader (DocRec rs) (Rec SourcedDocField rs)
+docRecBasedConfigurationReader defCfg = ConfigurationReader{..}
+ where
+ overridesParser = parseRecFromCLI $ tagWithDefaultSource defCfg
+        -- The parser will set the source to CLI for each modified
+        -- field. Unmodified fields' sources will remain Default
+ nullOverrides :: Rec SourcedDocField rs -> Bool
+ nullOverrides RNil = True
+ nullOverrides _ = False
+ overrideCfgFromYamlFile aesonCfg cliOverrides = ([], result)
+ where
+ result = do
+ yamlCfg <- tagWithYamlSource <$> parseJSONEither aesonCfg
+ return $ rmTags $ rzipWith chooseHighestPriority yamlCfg cliOverrides
+ -- CLI overrides YAML and YAML overrides Default. This
+ -- way, options from the CLI that have not been changed
+ -- from their Default value do not erase the value
+ -- specified in the JSON file.
+
+-- | Generates a --override/-o CLI option that takes a "key=value" argument and
+-- can be repeated. Also has a -q flag that activates quiet mode. Doesn't make
+-- assumptions on the types of values, but doesn't do extra checks and doesn't
+-- display any extra documentation.
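+--
+-- For illustration: @-o foo.bar=42@ replaces the @bar@ field inside the
+-- @foo@ object of the YAML config with @42@ (the value itself is parsed as
+-- YAML).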
+genericAesonBasedConfigurationReader
+ :: (A.FromJSON cfg)
+ => LocalFilePath -> [(String, Char, String)] -> ConfigurationReader cfg [String]
+genericAesonBasedConfigurationReader configFile shortcuts =
+ ConfigurationReader genParser null
+ (\origCfg overrides ->
+ let (warnings, result) =
+ overrideConfigFromKeyValues origCfg overrides
+ in (warnings, parseJSONEither =<< result))
+ where
+ genParser = foldr1 (liftA2 (++)) overrideArgs
+ mkOption (l,s,h,f) = f <$>
+ many (strOption
+ ( long l <> short s <> metavar "yaml.path=YAML_VALUE" <> help h ))
+ mkShortcut (l,s,p) =
+ ( l,s,"A shortcut for `-o "<>p<>".yaml.path=YAML_VALUE'"
+ , map ((p++".")++) )
+ overrideArgs = map mkOption $
+ ("override", 'o', "Override a field value in the " <> configFile ^. pathWithExtensionAsRawFilePath <>
+ " configuration.", id)
+ : map mkShortcut shortcuts
+
+overrideConfigFromKeyValues :: A.Value -> [String] -> ([String], Either String A.Value)
+overrideConfigFromKeyValues origCfg overrides =
+ case foldM parseAndOverride ([], origCfg) $ map T.pack overrides of
+ Left s -> ([], Left s)
+ Right (w, c) -> (w, Right c)
+ where
+ badPath fullPath =
+ Left $ "Path `" ++ fullPath ++ "' malformed."
+ pathNotFound fullPath fields =
+ Left $ "Path `" ++ fullPath
+ ++ "' contains unknown nested field(s): " ++ show fields
+ parseAndOverride (w, cfg) override = case T.splitOn "=" override of
+ [path, val] -> case Y.decodeEither' $ T.encodeUtf8 val of
+ Right jsonVal -> do
+ (w', cfg') <- doOverride cfg (T.unpack path) (T.splitOn "." path) jsonVal
+ return (w'++w, cfg')
+      Left e -> Left $ "`" ++ T.unpack path ++ "': `"
+                       ++ T.unpack val ++ "' is not valid YAML: "
+                       ++ show e
+ _ -> badPath $ T.unpack override
+ doOverride _ _ [] v = Right ([],v)
+ doOverride (A.Object cfg) fullPath (k:ks) v =
+ case HashMap.lookup k cfg of
+ Just cfg' -> do
+ (w, cfg'') <- doOverride cfg' fullPath ks v
+ Right $ checkTypeAndInsert w fullPath cfg' k cfg'' cfg
+ Nothing -> case ks of
+ [] -> Right $
+ ( ["`" ++ fullPath ++
+ "': This field does not exist in config file, it will be added (but beware of typos!)"]
+ , A.Object $ HashMap.insert k v cfg)
+ k' -> pathNotFound fullPath k'
+ doOverride _ fullPath k _ = pathNotFound fullPath k
+
+jsonType :: A.Value -> String
+jsonType a = case a of
+ A.String _ -> "a string"
+ A.Object _ -> "an object"
+ A.Number _ -> "a number"
+ A.Array _ -> "an array"
+ A.Bool _ -> "a bool"
+ A.Null -> "a null"
+
+checkTypeAndInsert :: [String]
+ -> String
+ -> A.Value
+ -> T.Text
+ -> A.Value
+ -> HashMap.HashMap T.Text A.Value
+ -> ([String], A.Value)
+checkTypeAndInsert w fullPath v' k v m =
+ let i = A.Object $ HashMap.insert k v m
+ t = jsonType v
+ t' = jsonType v'
+ in if t == t'
+ then (w,i)
+ else
+ (["`" ++ fullPath ++ "': Overriding " ++ t'
+ ++ " with " ++ t] ++ w
+ , i)
diff --git a/src/System/TaskPipeline/Logger.hs b/src/System/TaskPipeline/Logger.hs
new file mode 100644
index 0000000..461dc3d
--- /dev/null
+++ b/src/System/TaskPipeline/Logger.hs
@@ -0,0 +1,117 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+
+module System.TaskPipeline.Logger
+ ( LoggerScribeParams(..)
+ , LoggerFormat(..)
+ , Severity(..)
+ , Verbosity(..)
+ , maxVerbosityLoggerScribeParams
+ , warningsAndErrorsLoggerScribeParams
+ , log
+ , runLogger
+ ) where
+
+import Control.Exception.Safe
+import Control.Monad.IO.Class (MonadIO, liftIO)
+import Data.Aeson
+import Data.Aeson.Encode.Pretty (encodePrettyToTextBuilder)
+import Data.Aeson.Text (encodeToTextBuilder)
+import qualified Data.HashMap.Strict as HM
+import Data.String
+import Data.Text (Text)
+import Data.Text.Lazy.Builder hiding (fromString)
+import Katip
+import Katip.Core
+import System.IO (stdout)
+
+
+-- | Switches between the different types of formatters for the log
+data LoggerFormat
+  = PrettyLog  -- ^ Just shows the log messages, colored, with namespace, and
+               -- pretty-prints the JSON context. For human consumption.
+ | CompactLog -- ^ Like pretty, but prints JSON context just on one line
+ | JSONLog -- ^ JSON-formatted log, from katip
+ | BracketLog -- ^ Regular bracket log, from katip
+ deriving (Eq, Show)
+
+-- | Scribe parameters for the logger: a severity threshold, a verbosity level and a log format.
+data LoggerScribeParams = LoggerScribeParams
+ { loggerSeverityThreshold :: Severity
+ , loggerVerbosity :: Verbosity
+ , loggerFormat :: LoggerFormat
+ }
+ deriving (Eq, Show)
+
+-- | Show log messages from the Debug level up, with V2 verbosity.
+maxVerbosityLoggerScribeParams :: LoggerScribeParams
+maxVerbosityLoggerScribeParams = LoggerScribeParams DebugS V2 PrettyLog
+
+-- | Show log messages from the Warning level up, with V2 verbosity and one-line logs.
+warningsAndErrorsLoggerScribeParams :: LoggerScribeParams
+warningsAndErrorsLoggerScribeParams = LoggerScribeParams WarningS V2 CompactLog
+
+-- | Starts a logger.
+runLogger
+ :: (MonadMask m, MonadIO m)
+ => String
+ -> LoggerScribeParams
+ -> KatipContextT m a
+ -> m a
+runLogger progName (LoggerScribeParams sev verb logFmt) x = do
+ let logFmt' :: LogItem t => ItemFormatter t
+ logFmt' = case logFmt of
+ PrettyLog -> prettyFormat True
+ CompactLog -> prettyFormat False
+ BracketLog -> bracketFormat
+ JSONLog -> jsonFormat
+ handleScribe <- liftIO $
+ mkHandleScribeWithFormatter logFmt' ColorIfTerminal stdout (permitItem sev) verb
+ let mkLogEnv = liftIO $
+ registerScribe "stdout" handleScribe defaultScribeSettings
+ =<< initLogEnv (fromString progName) "devel"
+ bracket mkLogEnv (liftIO . closeScribes) $ \le ->
+ runKatipContextT le () "main" $ x
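+
+-- For illustration: @runLogger "myprog" maxVerbosityLoggerScribeParams act@
+-- would run @act@ with a stdout scribe that logs everything from DebugS up,
+-- in the pretty format.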
+
+-- | Doesn't log time, host, file location etc. Colors the whole message and
+-- displays context AFTER the message.
+prettyFormat :: LogItem a => Bool -> ItemFormatter a
+prettyFormat usePrettyJSON withColor verb Item{..} =
+ colorize withColor "40" (mconcat $ map fromText $ intercalateNs _itemNamespace) <>
+ fromText " " <>
+ colorBySeverity' withColor _itemSeverity (mbSeverity <> unLogStr _itemMessage) <>
+ colorize withColor "2" ctx
+ where
+ ctx = case toJSON $ payloadObject verb _itemPayload of
+ Object hm | HM.null hm -> mempty
+ c -> if usePrettyJSON
+ then fromText "\n" <> encodePrettyToTextBuilder c
+ else fromText " " <> encodeToTextBuilder c
+ -- We display severity levels not distinguished by color
+ mbSeverity = case _itemSeverity of
+ CriticalS -> fromText "[CRITICAL] "
+ AlertS -> fromText "[ALERT] "
+ EmergencyS -> fromText "[EMERGENCY] "
+ _ -> mempty
+
+-- | Like 'colorBySeverity' from katip, but works on builders
+colorBySeverity' :: Bool -> Severity -> Builder -> Builder
+colorBySeverity' withColor severity msg = case severity of
+ EmergencyS -> red msg
+ AlertS -> red msg
+ CriticalS -> red msg
+ ErrorS -> red msg
+ WarningS -> yellow msg
+ NoticeS -> bold msg
+ DebugS -> grey msg
+ _ -> msg
+ where
+ bold = colorize withColor "1"
+ red = colorize withColor "31"
+ yellow = colorize withColor "33"
+ grey = colorize withColor "2"
+
+colorize :: Bool -> Text -> Builder -> Builder
+colorize withColor c s
+ | withColor = fromText "\ESC[" <> fromText c <> fromText "m" <> s <> fromText "\ESC[0m"
+ | otherwise = s
diff --git a/src/System/TaskPipeline/Options.hs b/src/System/TaskPipeline/Options.hs
new file mode 100644
index 0000000..4d02279
--- /dev/null
+++ b/src/System/TaskPipeline/Options.hs
@@ -0,0 +1,84 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE GADTs #-}
+{-# OPTIONS_GHC -Wall #-}
+{-# OPTIONS_GHC -Wno-unticked-promoted-constructors #-}
+
+module System.TaskPipeline.Options
+ ( -- * API
+ getOptions
+ , getOption
+ , optionsVirtualFile
+ , optionVirtualFile
+ -- * Re-exports from docrecords
+ , DocRec, Rec(..), (^^.), (^^?), (^^?!), (=:)
+ , PathWithType(..)
+ , docField
+ , pattern FV
+ ) where
+
+import Data.Aeson
+import Data.DocRecord
+import Data.DocRecord.OptParse
+import Data.Locations.SerializationMethod
+import Data.Locations.VirtualFile
+import Data.Typeable
+import GHC.TypeLits (KnownSymbol)
+import System.TaskPipeline.PTask
+import System.TaskPipeline.VirtualFileAccess
+
+import Prelude hiding (id, (.))
+
+
+-- | Field Value. Allows you to directly pattern match on the output of
+-- 'getOptions'/'loadData'
+pattern FV :: a -> DocField (s:|:a)
+pattern FV v <- DocField _ (Right (Field v))
+
+-- | Creates a 'VirtualFile' from a default set of options (as a DocRec). To be
+-- used with 'loadData'.
+optionsVirtualFile
+ :: forall rs. (Typeable rs, RecordUsableWithCLI rs)
+ => [LocationTreePathItem] -- ^ The path for the options in the LocationTree
+ -> DocRec rs -- ^ The DocRec containing the fields with their
+ -- docs and default values
+ -> BidirVirtualFile (DocRec rs)
+optionsVirtualFile path defOpts =
+ withEmbeddedValue defOpts $
+ bidirVirtualFile path $
+ someBidirSerial (OptionsSerial id id :: OptionsSerial (DocRec rs))
+ <> someBidirSerial YAMLSerial
+
+-- | Just like 'optionsVirtualFile', but for a single field
+optionVirtualFile
+ :: (KnownSymbol s, Typeable t, ToJSON t, FieldFromCLI ('[s] :|: t))
+ => [LocationTreePathItem] -- ^ The path for the option field in the LocationTree
+ -> DocField ('[s] :|: t) -- ^ The field, usually with default value (created
+ -- with 'docField')
+ -> BidirVirtualFile t
+optionVirtualFile path field =
+ dimap (field =:) (^^?! field) $
+ optionsVirtualFile path (field :& RNil)
+
+-- | Add a set of options (as a DocRec) to the 'LocationTree', in order to
+-- expose them to the user, and returns the final values of these options
+getOptions
+ :: (LogThrow m, Typeable rs, RecordUsableWithCLI rs)
+ => [LocationTreePathItem] -- ^ The path for the options in the LocationTree
+ -> DocRec rs -- ^ The DocRec containing the fields with their
+ -- docs and default values
+  -> PTask m () (DocRec rs)  -- ^ A PTask that returns the new option values,
+                             -- overridden by the user
+getOptions path = loadData . optionsVirtualFile path
+
+-- | Just like 'getOptions', but for a single field.
+getOption
+ :: (LogThrow m, KnownSymbol s, Typeable t, ToJSON t, FieldFromCLI ('[s] :|: t))
+ => [LocationTreePathItem] -- ^ The path for the option field in the LocationTree
+ -> DocField ('[s] :|: t) -- ^ The field (created with 'docField')
+ -> PTask m () t -- ^ A PTask that returns the new option,
+                    -- overridden by the user
+getOption path = loadData . optionVirtualFile path
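+
+-- For illustration, a hypothetical single-option task (assuming docrecords'
+-- docField, used with TypeApplications):
+--
+-- > getIterations :: (LogThrow m) => PTask m () Int
+-- > getIterations =
+-- >   getOption ["options"]
+-- >             (docField @"niterations" (10 :: Int) "Number of iterations")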
diff --git a/src/System/TaskPipeline/PTask.hs b/src/System/TaskPipeline/PTask.hs
new file mode 100644
index 0000000..c70e556
--- /dev/null
+++ b/src/System/TaskPipeline/PTask.hs
@@ -0,0 +1,218 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE InstanceSigs #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
+{-# OPTIONS_GHC -Wall #-}
+
+
+module System.TaskPipeline.PTask
+ ( module Control.Category
+ , module Control.Arrow
+ , module Data.Locations.LogAndErrors
+ , PTask
+ , Severity(..)
+ , CanRunPTask
+ , Properties
+ , tryTask, throwTask, clockTask, clockTask'
+ , catchAndLog, throwStringTask
+ , toTask, toTask'
+ , ioTask, stepIO, stepIO'
+ , taskUsedFiles
+ , taskRequirements
+ , taskRunnablePart
+ , taskDataAccessTree
+ , taskInSubtree
+ , voidTask
+ , addContextToTask
+ , addStaticContextToTask
+ , addNamespaceToTask
+ , nameTask
+ , logTask
+ , logDebug, logInfo, logNotice, logWarning, logError
+ ) where
+
+import Prelude hiding (id, (.))
+
+import Control.Arrow
+import qualified Control.Arrow.Free as AF
+import Control.Category
+import Control.DeepSeq (NFData (..), force)
+import Control.Exception (evaluate)
+import Control.Funflow (Properties, stepIO, stepIO')
+import Control.Lens
+import Data.Locations
+import Data.Locations.LogAndErrors
+import Data.String
+import Katip
+import System.ClockHelpers
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask.Internal
+
+
+-- | A task that discards its input and returns ()
+voidTask :: PTask m a ()
+voidTask = arr (const ())
+
+-- | Just a shortcut for running an IO action received as the task's input
+ioTask :: (KatipContext m) => PTask m (IO a) a
+ioTask = stepIO id
+
+-- | Catches an error happening in a task. Leaves the tree intact if an error
+-- occurred.
+tryTask
+ :: PTask m a b -> PTask m a (Either SomeException b)
+tryTask = AF.try
+
+-- | A version of 'tryTask' that just logs when an error happens
+catchAndLog :: (KatipContext m)
+ => Severity -> PTask m a b -> PTask m a (Maybe b)
+catchAndLog severity task =
+ tryTask task
+ >>> toTask (\i ->
+ case i of
+ Left e -> do
+ logFM severity $ logStr $ displayException (e::SomeException)
+ return Nothing
+ Right x -> return $ Just x)
+
+-- | Fails the whole pipeline if an exception occurred, or just continues as
+-- normal
+throwTask :: (Exception e, LogThrow m) => PTask m (Either e b) b
+throwTask = arr (over _Left displayException) >>> throwStringTask
+
+-- | Fails the whole pipeline if an exception occurred, or just continues as
+-- normal
+throwStringTask :: (LogThrow m) => PTask m (Either String b) b
+throwStringTask = toTask $ \i ->
+ case i of
+ Left e -> throwWithPrefix e
+ Right r -> return r
+
+-- | Turn an action into a PTask. BEWARE! The resulting 'PTask' will have NO
+-- requirements, so if the action uses files or resources, they won't appear in
+-- the LocationTree.
+toTask :: (KatipContext m)
+ => (a -> m b) -> PTask m a b
+toTask = makeTask mempty . const
+
+-- | A version of 'toTask' that can perform caching. It's analogous to
+-- funflow's wrap', except the action passed here is just a simple function (it
+-- will be wrapped later as a funflow effect).
+toTask' :: (KatipContext m)
+ => Properties a b -> (a -> m b) -> PTask m a b
+toTask' props = makeTask' props mempty . const
+
+-- | Measures the time taken by a 'PTask'.
+clockTask
+ :: (KatipContext m) => PTask m a b -> PTask m a (b, TimeSpec)
+clockTask task = proc input -> do
+ start <- time -< ()
+ output <- task -< input
+ end <- time -< ()
+ returnA -< (output, end `diffTimeSpec` start)
+ where
+ time = stepIO $ const $ getTime Realtime
+
+-- | Measures the time taken by a 'PTask' and the deep evaluation of its result.
+clockTask'
+ :: (NFData b, KatipContext m) => PTask m a b -> PTask m a (b, TimeSpec)
+clockTask' task = clockTask $
+ task >>> stepIO (evaluate . force)
+
+-- | Logs a message during the pipeline execution
+logTask :: (KatipContext m) => PTask m (Severity, String) ()
+logTask = toTask $ \(sev, s) -> logFM sev $ logStr s
+
+-- | Logs a message at a predefined severity level
+logDebug, logInfo, logNotice, logWarning, logError :: (KatipContext m) => PTask m String ()
+logDebug = arr (DebugS,) >>> logTask
+logInfo = arr (InfoS,) >>> logTask
+logNotice = arr (NoticeS,) >>> logTask
+logWarning = arr (WarningS,) >>> logTask
+logError = arr (ErrorS,) >>> logTask
+
+-- | To access and transform the requirements of the PTask before it runs
+taskRequirements :: Lens' (PTask m a b) VirtualTree
+taskRequirements = splitTask . _1
+
+-- | To access and transform all the 'VirtualFiles' used by this 'PTask'. The
+-- parameters of the VirtualFiles will remain hidden, but all the metadata is
+-- accessible. NOTE: The original path of the files isn't settable.
+taskUsedFiles :: Traversal' (PTask m a b) (VirtualFile NoWrite NoRead)
+taskUsedFiles = taskRequirements . traversed . vfnodeFileVoided
+
+-- | Gives access to the 'RunnableTask' inside the PTask. It is the PTask,
+-- devoid of its requirements. It is also an Arrow, and additionally an
+-- ArrowChoice, so by using 'over taskRunnablePart' you can access a structure
+-- in which you can use /case/ and /if/ statements.
+taskRunnablePart :: Lens (PTask m a b) (PTask m a' b')
+ (RunnableTask m a b) (RunnableTask m a' b')
+taskRunnablePart = splitTask . _2
+
+-- | To transform the state of the PTask when it will run
+taskReaderState :: Setter' (PTask m a b) (PTaskState m)
+taskReaderState = taskRunnablePart . runnableTaskReaderState
+
+-- | To transform the 'DataAccessTree' of the PTask when it will run
+taskDataAccessTree :: Setter' (PTask m a b) (DataAccessTree m)
+taskDataAccessTree = taskReaderState . ptrsDataAccessTree
+
+-- | Adds some context to a task, to be used by the logger. That bit of
+-- context is dynamic, which is why we wrap the task into a new one that
+-- expects the 'LogItem' as part of its input. See 'katipAddContext'. If your
+-- bit of context can be known statically (i.e. before the pipeline actually
+-- runs), prefer 'addStaticContextToTask'.
+addContextToTask :: (LogItem i, Monad m) => PTask m a b -> PTask m (i,a) b
+addContextToTask = over taskRunnablePart $ modifyingRuntimeState
+ (\(item,_) -> over ptrsKatipContext (<> liftPayload item))
+ snd
+
+-- | Adds to a task some context that is known _before_ the pipeline runs. The
+-- 'LogItem' to add is therefore static and can be given just as an argument.
+addStaticContextToTask :: (LogItem i) => i -> PTask m a b -> PTask m a b
+addStaticContextToTask item =
+ over (taskReaderState . ptrsKatipContext) (<> liftPayload item)
+
+-- | Adds a namespace to the task. See 'katipAddNamespace'. Like context in
+-- 'addStaticContextToTask', the namespace is meant to be static, that's why we
+-- give it as a parameter to 'addNamespaceToTask', instead of creating a PTask
+-- that expects the namespace as an input.
+--
+-- NOTE: Prefer the use of 'nameTask', which records the time spent within the
+-- task. Directly use 'addNamespaceToTask' only if that time tracking hurts
+-- performance.
+addNamespaceToTask :: String -> PTask m a b -> PTask m a b
+addNamespaceToTask ns =
+ over (taskReaderState . ptrsKatipNamespace) (<> fromString ns)
+
+-- | This gives the task a name, making porcupine aware that this task should be
+-- considered an entity by itself. This has a few effects: it changes the
+-- logging output by wrapping it in a namespace (as per 'addNamespaceToTask'),
+-- and measures and logs (at InfoS level) the time spent within that task.
+nameTask :: (KatipContext m) => String -> PTask m a b -> PTask m a b
+nameTask ns task =
+ addNamespaceToTask ns $
+ clockTask task
+ >>> toTask (\(output, time) -> do
+ katipAddContext time $
+ logFM InfoS $ logStr $ "Finished task '" ++ ns ++ "' in " ++ showTimeSpec time
+ return output)
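+
+-- For illustration: @nameTask "compute-stats" myTask@ wraps the logs of
+-- @myTask@ in a "compute-stats" namespace and logs, at InfoS, the time the
+-- task took once it finishes.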
+
+-- | Moves the 'VirtualTree' associated to the task deeper in the final
+-- tree. This can be used to solve conflicts between tasks that have identical
+-- 'VirtualTree's (for instance the input files for a model, when you want to
+-- solve several models: you'd add an extra level at the root of the tree with
+-- the model name).
+taskInSubtree :: [LocationTreePathItem] -> PTask m a b -> PTask m a b
+taskInSubtree path = over splitTask $ \(reqTree, runnable) ->
+ let reqTree' = foldr (\pathItem rest -> folderNode [pathItem :/ rest]) reqTree path
+ runnable' = runnable & over (runnableTaskReaderState . ptrsDataAccessTree)
+ (view $ atSubfolderRec path)
+ in (reqTree', runnable')
diff --git a/src/System/TaskPipeline/PTask/Internal.hs b/src/System/TaskPipeline/PTask/Internal.hs
new file mode 100644
index 0000000..b1826f8
--- /dev/null
+++ b/src/System/TaskPipeline/PTask/Internal.hs
@@ -0,0 +1,286 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeSynonymInstances #-}
+
+-- | This module exposes the 'PTask' arrow along with some low-level functions
+-- to create and run a 'PTask'.
+
+module System.TaskPipeline.PTask.Internal
+ ( PTask(..)
+ , PTaskState
+ , RunnableTask
+ , FunflowRunConfig(..)
+ , CanRunPTask
+ , FunflowOpts(..)
+ , ptrsKatipContext
+ , ptrsKatipNamespace
+ , ptrsFunflowRunConfig
+ , ptrsDataAccessTree
+ , splitTask
+ , runnableTaskReaderState
+ , makeTask
+ , makeTask'
+ , modifyingRuntimeState
+ , withRunnableState
+ , withRunnableState'
+ , execRunnableTask
+ , runnableWithoutReqs
+ , withTaskState
+ ) where
+
+import Prelude hiding (id, (.))
+
+import Control.Arrow
+import Control.Arrow.AppArrow
+import Control.Arrow.Async
+import Control.Arrow.Free (ArrowError)
+import Control.Category
+import Control.Funflow
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.External.Coordinator
+import Control.Funflow.External.Coordinator.SQLite
+import qualified Control.Funflow.RemoteCache as Remote
+import Control.Lens
+import Control.Monad.Trans
+import Control.Monad.Trans.Control
+import Control.Monad.Trans.Reader
+import Control.Monad.Trans.State
+import Control.Monad.Trans.Writer
+import Data.Default
+import Data.Locations.Accessors
+import Data.Locations.FunflowRemoteCache
+import Data.Locations.LocationTree
+import Data.Locations.LogAndErrors
+import Katip.Core (Namespace)
+import Katip.Monadic
+import Path
+import System.TaskPipeline.PorcupineTree
+
+
+-- | PTask functions like mappingOverStream make it necessary to recursively
+-- run some flows. Until we find a better solution than running flows in flows,
+-- this is how we do it. These are the arguments to
+-- Control.Funflow.Exec.Simple.runFlowEx
+data FunflowRunConfig m = forall c rc. (Coordinator c, Remote.Cacher m rc) => FunflowRunConfig
+ { _ffrcCoordinator :: !c
+ , _ffrcCoordinatorConfig :: !(Config c)
+ , _ffrcContentStore :: !CS.ContentStore
+ , _ffrcFlowIdentity :: !(Maybe Int)
+ , _ffrcRemoteCache :: !rc
+ }
+
+-- | This is the state that will be shared by the whole PTask pipeline once it
+-- starts running.
+data PTaskState m = PTaskState
+ { _ptrsKatipContext :: !LogContexts
+ , _ptrsKatipNamespace :: !Namespace
+ , _ptrsFunflowRunConfig :: !(FunflowRunConfig m)
+ , _ptrsDataAccessTree :: !(DataAccessTree m) }
+
+makeLenses ''PTaskState
+
+-- | The part of a 'PTask' that will be run once the whole pipeline is composed
+-- and the tree of requirements has been bound to physical locations. It is
+-- important to note that while both 'PTask' and 'RunnableTask' are Arrows,
+-- only 'RunnableTask' is an ArrowChoice.
+type RunnableTask m =
+ AppArrow
+ (Reader (PTaskState m)) -- The reader layer contains the mapped
+ -- tree. Will be used only as an applicative.
+ (Flow (InnerEffect m) SomeException)
+
+-- | The constraints that must be satisfied by the base monad m so that a @PTask
+-- m@ can be run
+type CanRunPTask m = (MonadBaseControl IO m, LogMask m)
+
+-- | Runs a 'RunnableTask' given its state
+execRunnableTask
+ :: (CanRunPTask m)
+ => RunnableTask m a b -> PTaskState m -> a -> m b
+execRunnableTask
+ (AppArrow act)
+ st@PTaskState{_ptrsFunflowRunConfig=FunflowRunConfig{..}}
+ input =
+ flip evalStateT [] $
+ runFlowEx _ffrcCoordinator _ffrcCoordinatorConfig
+ _ffrcContentStore (LiftCacher _ffrcRemoteCache) id _ffrcFlowIdentity
+ (runReader act st)
+ input
+
+-- | A task is an Arrow that turns @a@ into @b@. It runs in some monad @m@.
+-- Each 'PTask' will expose its requirements in terms of resource it wants to
+-- access in the form of a virtual tree (implemented as a 'LocationTree' of
+-- 'VirtualFile's). These trees of requirements are aggregated when the tasks
+-- are combined with each other, so once the full pipeline has been composed
+-- (through Arrow composition), its 'pTaskVirtualTree' will contain the
+-- complete requirements of the pipeline.
+newtype PTask m a b = PTask
+ (AppArrow
+ (Writer VirtualTree) -- The writer layer accumulates the requirements. It will
+ -- be used only as an applicative.
+ (RunnableTask m)
+ a b)
+ deriving (Category, Arrow, ArrowError SomeException, Functor, Applicative)
+  -- PTask doesn't instantiate ArrowChoice. That's intentional, even if an
+  -- instance could be automatically derived. The ArrowChoice implementation for
+  -- `AppArrow (Writer x) arr` isn't sane for PTasks, as the monoid state (the
+  -- PTask requirements) will be accumulated by (|||) in a way that's
+  -- indistinguishable from (>>>), that is to say in a way that doesn't
+  -- differentiate VirtualFiles that _will_ be used from those that _may_ be
+  -- used. Maybe in the future we will implement ArrowChoice/ArrowPlus/ArrowZero
+  -- in a saner way (it should even be necessary if we want to implement
+  -- serialization methods with PTasks themselves, and have serial method
+  -- selection based on file format or mapping metadata), but in that case it's
+  -- necessary that the pipeline configuration file reflects this "either-or"
+  -- nature of VirtualFiles.
+
+flowToPTask :: Flow (InnerEffect m) SomeException a b -> PTask m a b
+flowToPTask = PTask . appArrow . appArrow
+
+-- | The type of effects we can run. The reader layer is executed by 'wrap',
+-- this is why it doesn't appear in the Flow part of the 'RunnableTask' type.
+type OuterEffect m =
+ AsyncA (ReaderT (PTaskState m) m)
+
+-- | The effects run inside the flow have to handle some dynamic modifications
+-- of the state (for instance from task inputs) that have to be applied to each
+-- state passed to 'wrap'. We store these modifications as a stack of functions.
+type InnerEffect m =
+ AsyncA (StateT [PTaskState m -> PTaskState m] m)
+
+instance (KatipContext m)
+ => ArrowFlow (OuterEffect m) SomeException (PTask m) where
+ step' props f = flowToPTask $ step' props f
+ stepIO' props f = flowToPTask $ stepIO' props f
+ external f = flowToPTask $ external f
+ external' props f = flowToPTask $ external' props f
+ -- wrap' transmits the Reader state of the PTask down to the flow:
+ wrap' props (AsyncA rdrAct) = runnableWithoutReqs $
+ withRunnableState' props $ \outerState input ->
+ runReaderT (rdrAct input) outerState
+ putInStore f = flowToPTask $ putInStore f
+ getFromStore f = flowToPTask $ getFromStore f
+ internalManipulateStore f = flowToPTask $ internalManipulateStore f
+
+withOuterState
+ :: (ArrowFlow (AsyncA m) ex arr)
+ => Properties a b
+ -> (t -> a -> m b)
+ -> AppArrow (Reader t) arr a b
+withOuterState props f =
+ AppArrow $ reader $ \outerState ->
+ wrap' props $ AsyncA $ \input ->
+ f outerState input
+
+-- | The task will be executed with a new state modifier pushed on the modifiers
+-- stack.
+modifyingRuntimeState
+ :: (Monad m)
+ => (a -> PTaskState m -> PTaskState m)
+ -> (a -> a')
+ -> RunnableTask m a' b
+ -> RunnableTask m a b
+modifyingRuntimeState alterState alterInput ar = pushState >>> ar >>> popState
+ where
+ pushState =
+ withOuterState def $ \_ x -> do
+ modify (alterState x :)
+ return (alterInput x)
+ popState =
+ withOuterState def $ \_ x -> do
+ modify popMod
+ return x
+    popMod [] = error $
+      "modifyingRuntimeState: Modifiers list shouldn't be empty!"
+ popMod (_:ms) = ms
+
+-- | At the 'RunnableTask' level, access the reader state and run an action
+withRunnableState' :: (KatipContext m)
+ => Properties a b -> (PTaskState m -> a -> m b) -> RunnableTask m a b
+withRunnableState' props f = withOuterState props $ \outerState input -> do
+ mods <- get
+ let ptrs = foldr ($) outerState mods
+ lift $
+ localKatipContext (const $ _ptrsKatipContext ptrs) $
+ localKatipNamespace (const $ _ptrsKatipNamespace ptrs) $
+ f ptrs input
+
+-- | 'withRunnableState'' without caching.
+withRunnableState :: (KatipContext m)
+ => (PTaskState m -> a -> m b) -> RunnableTask m a b
+withRunnableState = withRunnableState' def
+
+-- | Wraps a 'RunnableTask' into a 'PTask' that declares no requirements
+runnableWithoutReqs :: RunnableTask m a b -> PTask m a b
+runnableWithoutReqs = PTask . appArrow
+
+-- | An Iso to the requirements and the runnable part of a 'PTask'
+splitTask :: Iso (PTask m a b) (PTask m a' b')
+ (VirtualTree, RunnableTask m a b)
+ (VirtualTree, RunnableTask m a' b')
+splitTask = iso to_ from_
+ where
+ to_ (PTask (AppArrow wrtrAct)) = swap $ runWriter wrtrAct
+ from_ = PTask . AppArrow . writer . swap
+ swap (a,b) = (b,a)
+
+-- | Permits applying a function to the Reader state of a 'RunnableTask' when
+-- it runs.
+runnableTaskReaderState :: Setter' (RunnableTask m a b) (PTaskState m)
+runnableTaskReaderState = lens unAppArrow (const AppArrow) . setting local
+
+-- | Makes a task from a tree of requirements and a function. The 'Properties'
+-- indicate whether we can cache this task.
+makeTask' :: (KatipContext m)
+ => Properties a b
+ -> LocationTree VirtualFileNode
+ -> (DataAccessTree m -> a -> m b)
+ -> PTask m a b
+makeTask' props tree f =
+ (tree, withRunnableState' props (f . _ptrsDataAccessTree)) ^. from splitTask
+
+-- | Makes a task from a tree of requirements and a function. This is the entry
+-- point to PTasks
+makeTask :: (KatipContext m)
+ => LocationTree VirtualFileNode
+ -> (DataAccessTree m -> a -> m b)
+ -> PTask m a b
+makeTask = makeTask' def
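+
+-- For illustration: @makeTask mempty (\_ input -> return input)@ is a task
+-- with no requirements that just passes its input through ('toTask' is built
+-- this way, as @makeTask mempty . const@).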
+
+data FunflowOpts m = FunflowOpts
+ { storePath :: FilePath
+ , coordPath :: FilePath
+ , flowIdentity :: Maybe Int
+ , remoteCacheLoc :: Maybe (SomeLoc m) }
+ deriving (Show)
+
+withFunflowRunConfig
+ :: (LogMask m)
+ => FunflowOpts m
+ -> (FunflowRunConfig m -> m r)
+ -> m r
+withFunflowRunConfig ffopts f = do
+ storePath' <- parseAbsDir $ storePath ffopts
+ coordPath' <- parseAbsDir $ coordPath ffopts
+ let cacher = locationCacher $ remoteCacheLoc ffopts
+ CS.withStore storePath' (\store ->
+ f $ FunflowRunConfig SQLite coordPath' store (flowIdentity ffopts) cacher)
+
+-- | Given a 'KatipContext' and a 'DataAccessTree', gets the initial state to
+-- give to 'execRunnableTask'
+withTaskState :: (LogMask m)
+ => FunflowOpts m
+ -> DataAccessTree m
+ -> (PTaskState m -> m r) -> m r
+withTaskState ffPaths tree f =
+ withFunflowRunConfig ffPaths $ \ffconfig -> do
+ ctx <- getKatipContext
+ ns <- getKatipNamespace
+ f $ PTaskState ctx ns ffconfig tree
diff --git a/src/System/TaskPipeline/PorcupineTree.hs b/src/System/TaskPipeline/PorcupineTree.hs
new file mode 100644
index 0000000..bf2cf94
--- /dev/null
+++ b/src/System/TaskPipeline/PorcupineTree.hs
@@ -0,0 +1,689 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE ViewPatterns #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# OPTIONS_GHC -fno-warn-missing-pattern-synonym-signatures #-}
+{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
+
+-- | This module defines all the states of the _porcupine tree_.
+--
+-- A porcupine tree is a "LocationTree" containing the resources for a pipeline.
+-- The porcupine trees of subtasks are aggregated into the porcupine tree of
+-- the whole pipeline.
+--
+-- The porcupine tree can be in three different states:
+--
+-- * "VirtualTree": It contains only VirtualFiles. It is the state of
+-- the porcupine tree that is used by the tasks to declare their requirements,
+-- and by the configuration manager to write the default tree and mappings
+-- and read back the configuration
+--
+-- * "PhysicalTree": It contains physical locations. Once the
+-- configuration and mappings to physical files have been read,
+-- each node is attached its corresponding physical locations. The locations for
+-- the node which have no explicit mappings in the configuration are derived from the
+-- nodes higher in the hierarchy (this implies that the only mandatory mapping is the
+-- root of the tree). Physical trees are used to check that each virtual file
+-- is compatible with the physical locations bound to it.
+--
+-- * "DataAccessTree": It contains the functions that allow to read or write
+-- to the resources. This tree is created after every physical location has
+-- been checked according to the following rules:
+--
+-- * if a physical location has an extension that is not recognized by the
+-- VirtualFile it is bound to, the process fails
+--
+-- * if a VirtualFile has at the same time physical locations bound AND
+-- embedded data, then the embedded data is considered to be the
+-- _rightmost_ layer (ie. the one overriding all the other ones), so that
+-- the data of this VirtualFile can be easily overridden by just changing
+-- the config file.
+--
+-- The VirtualFiles of a VirtualTree, when written to the configuration file,
+-- are divided into 2 sections: "data" and "locations". "data" is a tree of the
+-- json objects corresponding to the VirtualFiles that have default data which
+-- can be represented as 'Value's (keeping the structure of the virtual
+-- tree). "locations" contains a flat json object of all the other nodes, mapped
+-- in a "/virtual/path: physical_paths" fashion.
+--
+-- Indeed, each node can be mapped to _several_ physical paths, which we call
+-- _layers_. We require everything that is read from a VirtualFile to be a
+-- Monoid, so that we can combine the content of each layer into one value. This
+-- Monoid should be right-biased (ie. in the expression @a <> b@, @b@ overwrites
+-- the contents of @a@).
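+--
+-- For instance, if each layer is read as a @Last Int@ (a sketch):
+-- @Last (Just 1) <> Last (Just 2) == Last (Just 2)@, so the rightmost layer
+-- wins.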
+--
+-- TODO: The physical tree still has variables in the locations. When
+-- are these spliced? What are the uses cases for physical locations with
+-- variables, and how do they differ from virtual locations?
+--
+module System.TaskPipeline.PorcupineTree where
+
+import Control.Exception.Safe
+import Control.Funflow.ContentHashable
+import Control.Lens hiding ((:>))
+import Control.Monad
+import Data.Aeson
+import qualified Data.ByteString as Strict
+import qualified Data.ByteString.Lazy as Lazy
+import qualified Data.ByteString.Streaming as BSS
+import Data.DocRecord
+import Data.DocRecord.OptParse
+import qualified Data.HashMap.Strict as HM
+import Data.List (intersperse)
+import Data.Locations
+import Data.Locations.Accessors
+import Data.Maybe
+import Data.Monoid (First (..))
+import Data.Representable
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as TE
+import qualified Data.Text.Lazy as LT
+import qualified Data.Text.Lazy.Encoding as LTE
+import Data.Typeable
+import qualified Data.Yaml as Y
+import GHC.Generics (Generic)
+import Katip
+import Options.Applicative
+import Streaming
+import System.ClockHelpers
+import System.TaskPipeline.ConfigurationReader
+
+
+-- * API for manipulating porcupine tree _nodes_
+
+-- | The internal part of a 'VirtualFileNode', closing over the type params of
+-- the 'VirtualFile'
+data SomeVirtualFile where
+ SomeVirtualFile :: (Typeable a, Typeable b)
+ => VirtualFile a b
+ -> SomeVirtualFile
+
+instance Semigroup SomeVirtualFile where
+ SomeVirtualFile vf <> SomeVirtualFile vf' = case cast vf' of
+ Just vf'' -> SomeVirtualFile (vf <> vf'')
+ Nothing -> error $ "Two differently typed VirtualFiles share the same virtual path "
+ ++ (T.unpack $ toTextRepr (LTP $ _vfileOriginalPath vf))
+ ++ ":\n file 1 type: " ++ show (typeOf vf)
+ ++ "\n file 2 type: " ++ show (typeOf vf')
+
+-- | Packs together the two functions that will read and write a location, along
+-- with the actual locations that will be accessed. This interface makes it
+-- possible to separate the resolution of locations from their access. A
+-- @DataAccessor@ is usually meant to be used inside a 'toPTask'. A @DataAccessor@ is hashable
+-- (via the locations it accesses) so a cached task can take a @DataAccessor@ as
+-- an input, and this task will be re-triggered if the physical location(s)
+-- bound to this accessor change(s).
+data DataAccessor m a b = DataAccessor
+ { daPerformWrite :: a -> m ()
+ , daPerformRead :: m b
+ , daLocsAccessed :: Either String [SomeLoc m] }
+
+instance (Monad m) => ContentHashable m (DataAccessor m' a b) where
+ contentHashUpdate ctx = contentHashUpdate ctx . fmap toHashableLocs . daLocsAccessed
+
+-- | Like a 'DataAccessor' but only with the writer part
+data DataWriter m a = DataWriter
+ { dwPerformWrite :: a -> m ()
+ , dwLocsAccessed :: Either String [SomeLoc m] }
+
+instance (Monad m) => ContentHashable m (DataWriter m' a) where
+ contentHashUpdate ctx = contentHashUpdate ctx . fmap toHashableLocs . dwLocsAccessed
+
+-- | Like a 'DataAccessor' but only with the reader part
+data DataReader m a = DataReader
+ { drPerformRead :: m a
+ , drLocsAccessed :: Either String [SomeLoc m] }
+
+instance (Monad m) => ContentHashable m (DataReader m' a) where
+ contentHashUpdate ctx = contentHashUpdate ctx . fmap toHashableLocs . drLocsAccessed
+
+-- | The internal part of a 'DataAccessNode', closing over the type params of the
+-- access function.
+data SomeDataAccess m where
+ SomeDataAccess :: (Typeable a, Typeable b)
+ => (LocVariableMap -> DataAccessor m a b) -> SomeDataAccess m
+
+-- | Tells the type of accesses that some VirtualFile will undergo. They are
+-- accumulated through the whole pipeline.
+data VFNodeAccessType = ATWrite | ATRead
+ deriving (Eq, Show, Generic)
+
+instance ToJSON VFNodeAccessType
+instance FromJSON VFNodeAccessType
+
+-- | The nodes of a "VirtualTree"
+data VirtualFileNode = MbVirtualFileNode [VFNodeAccessType] (Maybe SomeVirtualFile)
+-- | A non-empty 'VirtualFileNode'
+pattern VirtualFileNode {vfnodeAccesses, vfnodeFile} =
+ MbVirtualFileNode vfnodeAccesses (Just (SomeVirtualFile vfnodeFile))
+
+-- | vfnodeFile is a @Traversal'@ into the VirtualFile contained in a
+-- VirtualFileNode, but hiding its real read/write types.
+vfnodeFileVoided :: Traversal' VirtualFileNode (VirtualFile NoWrite NoRead)
+vfnodeFileVoided f (VirtualFileNode at vf) =
+ VirtualFileNode at <$> vfileVoided f vf
+vfnodeFileVoided _ vfn = pure vfn
+
+-- | The nodes of a "PhysicalTree"
+data PhysicalFileNode m = MbPhysicalFileNode [SomeLocWithVars m] [VFNodeAccessType] (Maybe SomeVirtualFile)
+-- | A non-empty 'PhysicalFileNode'
+pattern PhysicalFileNode {pfnodeLayers, pfnodeAccesses, pfnodeFile} =
+ MbPhysicalFileNode pfnodeLayers pfnodeAccesses (Just (SomeVirtualFile pfnodeFile))
+
+-- | The nodes of a "DataAccessTree"
+data DataAccessNode m = MbDataAccessNode [SomeLocWithVars m] (First (SomeDataAccess m))
+ -- Data access function isn't a semigroup, hence the use of First here instead
+ -- of Maybe.
+-- | A non-empty 'DataAccessNode'
+pattern DataAccessNode l x = MbDataAccessNode l (First (Just (SomeDataAccess x)))
+
+
+instance Semigroup VirtualFileNode where
+ MbVirtualFileNode ats vf <> MbVirtualFileNode ats' vf' =
+ MbVirtualFileNode (ats <> ats') (vf <> vf')
+instance Monoid VirtualFileNode where
+ mempty = MbVirtualFileNode [] mempty
+-- TODO: It is dubious that composing DataAccessNodes is really needed in the
+-- end. Find a way to remove that.
+instance Semigroup (DataAccessNode m) where
+ MbDataAccessNode locs f <> MbDataAccessNode _ f' = MbDataAccessNode locs $ f <> f'
+instance Monoid (DataAccessNode m) where
+ mempty = MbDataAccessNode [] mempty
+
+toJSONTxt :: SomeLocWithVars m -> T.Text
+toJSONTxt (SomeGLoc a) = case toJSON a of
+ String s -> s
+ v -> LT.toStrict $ LTE.decodeUtf8 $ encode v
+
+-- | Options governing how to show a 'PhysicalFileNode'
+data PhysicalFileNodeShowOpts = PhysicalFileNodeShowOpts
+ { pfshowWithMappings :: Bool
+ , pfshowWithSerials :: Bool
+ , pfshowWithOptions :: Bool
+ , pfshowWithTypes :: Bool
+ , pfshowWithAccesses :: Bool
+ , pfshowWithExtensions :: Bool
+ , pfshowTypeNumChars :: Int }
+
+data PhysicalFileNodeWithShowOpts m =
+ PhysicalFileNodeWithShowOpts PhysicalFileNodeShowOpts (PhysicalFileNode m)
+
+instance Show (PhysicalFileNodeWithShowOpts m) where
+ show (PhysicalFileNodeWithShowOpts PhysicalFileNodeShowOpts{..}
+ PhysicalFileNode{..}) =
+ mconcat $ intersperse "\n " $
+ (showIf pfshowWithMappings $
+ (if null pfnodeLayers
+ then "<no mapping>"
+ else T.unpack (mconcat
+ (intersperse "\n + "
+ (map toJSONTxt pfnodeLayers)))))
+ ++ (showIf pfshowWithSerials $
+ describeVFileAsSourceSink pfnodeFile)
+ ++ (showIf pfshowWithTypes $
+ describeVFileTypes pfnodeFile pfshowTypeNumChars)
+ ++ (showIf pfshowWithExtensions $
+ describeVFileExtensions pfnodeFile)
+ ++ (showIf pfshowWithAccesses $
+ "Accessed with " ++ show pfnodeAccesses)
+ ++ (showIf pfshowWithOptions $
+ describeVFileAsRecOfOptions pfnodeFile pfshowTypeNumChars)
+ where
+ showIf cond content =
+ if cond && (not $ null content) then [content] else []
+ show _ = ""
+
+-- * API for manipulating porcupine trees globally
+
+-- | The tree manipulated by tasks during their construction
+type VirtualTree = LocationTree VirtualFileNode
+
+-- | The tree manipulated when checking if each location is bound to something
+-- legit
+type PhysicalTree m = LocationTree (PhysicalFileNode m)
+
+-- | The tree manipulated by tasks when they actually run
+type DataAccessTree m = LocationTree (DataAccessNode m)
+
+instance HasDefaultMappingRule VirtualFileNode where
+ getDefaultLocShortcut VirtualFileNode{..} = getDefaultLocShortcut vfnodeFile
+ getDefaultLocShortcut _ = Nothing
+
+-- | Filters the tree to get only the nodes that don't have data and can be
+-- mapped to external files
+--
+-- TODO: Explain the rules. What does it mean for a node to have data?
+-- Can a node with data be mapped to an external file?
+virtualTreeToMappings
+ :: VirtualTree
+ -> Maybe LocationMappings
+virtualTreeToMappings tree = mappingsFromLocTree <$> over filteredLocsInTree rmOpts tree
+ where
+ rmOpts VirtualFileNode{..}
+ | Just VFForCLIOptions <- intent = Nothing -- Nodes with default data are
+ -- by default not put in the
+ -- mappings
+ where intent = vfileDescIntent $ getVFileDescription vfnodeFile
+ rmOpts n = Just n
+
+-- | Filters the tree to get only the nodes that can be embedded in the config file
+--
+-- TODO: Explain which are the nodes that can be embedded in config files.
+-- Explain how they relate to the nodes obtained in the previous function.
+-- Can a node have data and be embedded in a config file?
+-- Can a node be mapped externally and be embedded in a config file?
+--
+-- TODO: It is going to create some confusion having DataTree's and
+-- DataAccessTree's in the discourse. Could we define better what a
+-- DataTree is and see if there are better names?
+embedDataInVirtualTree
+ :: VirtualTree
+ -> Maybe VirtualTree
+embedDataInVirtualTree = over filteredLocsInTree keepOpts
+ where
+ keepOpts n@VirtualFileNode{..}
+ | Just VFForCLIOptions <- intent = Just n
+ | otherwise = Nothing
+ where intent = vfileDescIntent $ getVFileDescription vfnodeFile
+ keepOpts n = Just n
+
+variablesSection :: T.Text
+variablesSection = "variables"
+
+embeddedDataSection :: T.Text
+embeddedDataSection = "data"
+
+mappingsSection :: T.Text
+mappingsSection = "locations"
+
+-- TODO: This function type is suspicious given that it always
+-- yields a singleton list. May be fixed by changing the type or
+-- explaining what the function does. Recursion would work just
+-- as well if the function returned a pair.
+embeddedDataTreeToJSONFields
+ :: T.Text -> VirtualTree -> [(T.Text, Value)]
+embeddedDataTreeToJSONFields thisPath (LocationTree mbOpts sub) =
+ [(thisPath, Object $ opts' <> sub')]
+ where
+ opts' = case mbOpts of
+ VirtualFileNode{..} ->
+ case (vfnodeFile ^? vfileAsBidir) >>= getConvertedEmbeddedValue of
+ Just (Object o) -> o
+ _ -> mempty
+ _ -> mempty
+ sub' = HM.fromList $
+ concatMap (\(k,v) -> embeddedDataTreeToJSONFields (_ltpiName k) v) $ HM.toList sub
+
+
+-- ** VirtualTreeAndMappings: join a virtual tree with the locations it
+-- should be mapped to
+
+-- | A 'VirtualTree' associated with the mapping that should be applied
+-- to it. This is the way to serialize and deserialize a virtual tree
+data VirtualTreeAndMappings = VirtualTreeAndMappings
+ { vtamTree :: VirtualTree
+ , vtamMappings :: Either Loc LocationMappings
+ , vtamVariables :: LocVariableMap }
+
+-- VirtualTreeAndMappings is only 'ToJSON' and not 'FromJSON' because we need
+-- more context to deserialize it. It is done by virtualTreeConfigurationReader
+instance ToJSON VirtualTreeAndMappings where
+ toJSON (VirtualTreeAndMappings tree mappings variables) = Object $
+ (case embedDataInVirtualTree tree of
+ Just t -> HM.fromList $ embeddedDataTreeToJSONFields embeddedDataSection t
+ Nothing -> HM.empty)
+ <>
+ (case virtualTreeToMappings tree of
+ Just m ->
+ HM.singleton mappingsSection $ toJSON $ case mappings of
+ Right m' -> m'
+ Left rootLoc -> mappingRootOnly rootLoc <> m
+ Nothing -> HM.empty)
+ <>
+ (HM.singleton variablesSection $ toJSON variables)
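+
+-- For instance, such a tree could serialize to a YAML config of the following
+-- shape (a sketch; the paths and values are hypothetical):
+--
+-- > data:
+-- >   options: {threshold: 0.5}
+-- > locations:
+-- >   /: .
+-- >   /Inputs/foo: foo.json
+-- > variables:
+-- >   year: "2019"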
+
+-- ** Mapping virtual trees to input files
+
+data LayerOperator = ReplaceLayers | AddLayer
+ deriving (Eq, Show)
+
+type VirtualTreeAndMappingsOverrides =
+ ( LocVariableMap
+ -- The map of variables and their values read from CLI too
+ , [(LocationTreePath, LayerOperator, SerializableLocShortcut)]
+ -- Locations mapped to new layers
+ , LocationTree (VirtualFileNode, Maybe (RecOfOptions SourcedDocField))
+ -- The tree containing the options parsed by optparse-applicative
+ )
+
+splitVarBinding :: String -> Either String (LocVariable, String)
+splitVarBinding (T.splitOn "=" . T.pack -> [T.unpack -> var, T.unpack -> val]) =
+ Right (LocVariable var,val)
+splitVarBinding _ = Left "Var binding must be of the form \"variable=value\""
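+
+-- For instance (a sketch): @splitVarBinding "year=2019"@ yields
+-- @Right (LocVariable "year", "2019")@.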
+
+-- | Reads the data from the input config file. Constructs the parser for the
+-- command-line arguments. Combines both results to create the
+-- 'VirtualTree' (and its mappings) the pipeline should run on.
+virtualTreeConfigurationReader
+ :: VirtualTreeAndMappings
+ -> ConfigurationReader VirtualTreeAndMappings VirtualTreeAndMappingsOverrides
+virtualTreeConfigurationReader VirtualTreeAndMappings{vtamTree=defTree} =
+ ConfigurationReader overridesParser_ nullOverrides_ overrideCfgFromYamlFile_
+ where
+ overridesParser_ =
+ (,,) <$> variablesParser <*> mappingsParser <*> treeOfOptsParser
+ where
+ treeOfOptsParser = traverseOf (traversed . _2 . _Just) parseOptions $
+ fmap nodeAndRecOfOptions defTree
+ variablesParser = HM.fromList <$>
+ many (option (eitherReader splitVarBinding)
+ (long "var"
+ <> help "Set a variable already present in the config file"))
+
+ mappingsParser =
+ many (option (eitherReader locBinding)
+ (long "loc"
+ <> short 'l'
+ <> help "Map a virtual file path to a physical location"))
+ parseLocBinding vpath locOp loc = do
+ p <- fromTextRepr vpath
+ l <- over _Left displayException $ Y.decodeEither' $ TE.encodeUtf8 loc
+ return (p,locOp,l)
+ locBinding (T.splitOn "+=" . T.pack -> [vpath,loc]) =
+ parseLocBinding vpath AddLayer loc
+ locBinding (T.splitOn "=" . T.pack -> [vpath,loc]) =
+ parseLocBinding vpath ReplaceLayers loc
+ locBinding _ =
+ Left "Location mapping must be of the form \"virtual_path(+)=physical_path\""
+
+ nodeAndRecOfOptions :: VirtualFileNode -> (VirtualFileNode, Maybe DocRecOfOptions)
+ nodeAndRecOfOptions n@VirtualFileNode{..} = (n, (vfnodeFile ^? vfileAsBidir) >>= getConvertedEmbeddedValue)
+ nodeAndRecOfOptions n = (n, Nothing)
+ parseOptions :: RecOfOptions DocField -> Parser (RecOfOptions SourcedDocField)
+ parseOptions (RecOfOptions r) = RecOfOptions <$>
+ parseRecFromCLI (tagWithDefaultSource r)
+
+ nullOverrides_ (_,_,t) = allOf (traversed . _2 . _Just) nullRec t
+ nullRec (RecOfOptions RNil) = True
+ nullRec _ = False
+
+ overrideCfgFromYamlFile_ (Object aesonCfg) (cliVars, cliMappings, embeddedDataTree) = ([], vtam)
+ where
+ dataSectionContent = HM.lookup embeddedDataSection aesonCfg
+ mappingsSectionContent = HM.lookup mappingsSection aesonCfg
+ variablesSectionContent = HM.lookup variablesSection aesonCfg
+ addCLIMappings (LocationMappings_ yamlMappings) =
+ LocationMappings_ $ foldl addOne yamlMappings cliMappings
+ where
+ addOne mappings (path, locOp, loc) = HM.alter (go locOp loc) path mappings
+ go AddLayer loc (Just locs) = Just $ locs ++ [loc]
+ go _ loc _ = Just [loc]
+ vtam = VirtualTreeAndMappings
+ <$> traverseOf traversedTreeWithPath
+ (replaceWithDataFromConfig dataSectionContent) embeddedDataTree
+ <*> (Right . addCLIMappings <$> case mappingsSectionContent of
+ Just m -> parseJSONEither m
+ _ -> pure mempty)
+ <*> ((cliVars <>) <$> case variablesSectionContent of
+ Just m -> parseJSONEither m
+ _ -> pure mempty)
+ overrideCfgFromYamlFile_ _ _ = ([], Left "Configuration file doesn't contain a JSON object")
+
+ replaceWithDataFromConfig
+ :: Maybe Value -- The content of the embedded data section
+ -> (LocationTreePath, (VirtualFileNode, Maybe (RecOfOptions SourcedDocField)))
+ -> Either String VirtualFileNode
+ replaceWithDataFromConfig (Just dataSectionContent)
+ (LTP path, (node@VirtualFileNode{..}, mbRecFromCLI)) =
+ let rebuildNode :: (Typeable a', Typeable b')
+ => VirtualFile a' b' -> VirtualFileNode
+ rebuildNode newF = VirtualFileNode{vfnodeFile = newF, ..}
+ err :: String -> Either String a
+ err s = Left $ "replaceWithDataFromConfig: " ++ showVFileOriginalPath vfnodeFile ++ ": " ++ s
+ in case findInAesonVal path dataSectionContent of
+ Right rawObjFromYaml ->
+ case (mbRecFromCLI, vfnodeFile ^? vfileAsBidir) of
+ (Just _, Nothing) ->
+ err "that virtualfile contains an embedded record of options, yet it isn't bidirectional"
+ (Just (RecOfOptions (recFromCLI :: Rec SourcedDocField rs)), Just bidirVFile) -> do
+ -- YAML: yes, CLI: yes
+ -- First we merge all the data present in the yaml:
+ mergedDataFromYaml <- getMergedLayersFromAesonValue bidirVFile rawObjFromYaml
+ (RecOfOptions::DocRec rs -> DocRecOfOptions)
+ -- then we convert it back to a DocRec, as we need to override
+ -- it from the CLI:
+ RecOfOptions (recFromYaml::DocRec rs') <- case getToAtomicFn (bidirVFile ^. vfileSerials) of
+ Just toDocRec -> return (toDocRec mergedDataFromYaml :: DocRecOfOptions)
+ Nothing -> err "that virtualfile contained an embedded record of options, yet it contained \
+ \no function to convert back to DocRecOfOptions"
+ case eqT :: Maybe (rs' :~: rs) of
+ Nothing -> err "not the same record type has been returned \
+ \by getMergedLayersFromAesonValue"
+ Just Refl -> do
+ -- finally, we merge CLI and YAML config and modify the
+ -- node's embedded data:
+ rebuildNode <$> setConvertedEmbeddedValue bidirVFile
+ (RecOfOptions $ rmTags $ rzipWith chooseHighestPriority (tagWithYamlSource recFromYaml) recFromCLI)
+ (Nothing, _) -> do
+ -- YAML: yes, CLI: no
+ mergedDataFromYaml <- getMergedLayersFromAesonValue vfnodeFile rawObjFromYaml (id::Value -> Value)
+ return $ rebuildNode $ vfnodeFile & vfileEmbeddedValue .~ Just mergedDataFromYaml
+ Left _ -> case mbRecFromCLI of
+ Just (RecOfOptions recFromCLI) ->
+ -- YAML: no, CLI: yes
+ rebuildNode <$> setConvertedEmbeddedValue vfnodeFile (RecOfOptions (rmTags recFromCLI))
+ Nothing ->
+ -- YAML: no, CLI: no
+ return node
+ replaceWithDataFromConfig _ (_, (node, _)) = return node
+
+ findInAesonVal path = go path
+ where
+ go [] v = return v
+ go (p:ps) (Object (HM.lookup (_ltpiName p) -> Just v)) = go ps v
+ go _ _ = Left $ "virtualTreeConfigurationReader: " ++
+ (T.unpack $ toTextRepr $ LTP path) ++ " doesn't match any path in the Yaml config"
+
+ getMergedLayersFromAesonValue vf objFromYaml f = do
+ layersFromYaml <- case objFromYaml of
+ Object m
+ | Just v <- HM.lookup "$layers" m ->
+ case v of
+ Array layers -> mapM parseJSONEither $ foldr (:) [] layers
+ _ -> Left $ "If you specify data with $layers, $layers must contain an array."
+ _ -> (:[]) <$> parseJSONEither objFromYaml
+ tryMergeLayersForVFile vf $ map f layersFromYaml
+
+-- ** Transforming a virtual tree to a physical resource tree (ie. a
+-- tree with physical locations attached)
+
+-- | Transforms a virtual file node into a file node with definite physical
+-- locations. Splices into the locs the variables that can be spliced.
+--
+-- See the meaning of the parameters in 'Data.Locations.Mappings.applyMappings'.
+applyOneVFileMapping :: LocVariableMap -> [SomeLocWithVars m] -> VirtualFileNode -> Bool -> PhysicalFileNode m
+applyOneVFileMapping variables configLayers mbVF mappingIsExplicit = buildPhysicalNode mbVF
+ where
+ configLayers' = map (\(SomeGLoc l) -> SomeGLoc $ spliceLocVariables variables l) configLayers
+ buildPhysicalNode VirtualFileNode{..} = PhysicalFileNode layers vfnodeAccesses vfnodeFile
+ where
+ First defExt = vfnodeFile ^. vfileSerials . serialDefaultExt
+ intent = vfileDescIntent $ getVFileDescription vfnodeFile
+      -- TODO: It seems to be the case that there are some constraints to meet
+ -- on a valid physical tree. For instance, that having a VFForCLIOptions
+ -- node with derived layers is invalid?
+ layers | not mappingIsExplicit, Just VFForCLIOptions <- intent = []
+ -- Options usually present in the config file need an _explicit_
+ -- mapping to be present in the config file, if we want them to be
+ -- read from external files instead
+ | otherwise = map resolveExt configLayers'
+ resolveExt (SomeGLoc loc) = SomeGLoc $ setLocTypeIfMissing loc $ T.unpack $ fromMaybe "" defExt
+ buildPhysicalNode _ = MbPhysicalFileNode configLayers' [] Nothing
+
+-- | Binds together a 'VirtualTree' with physical locations and splices
+-- in the variables read from the configuration.
+getPhysicalTreeFromMappings
+ :: (LogThrow m) => VirtualTreeAndMappings -> LocResolutionM m (PhysicalTree m)
+getPhysicalTreeFromMappings (VirtualTreeAndMappings tree mappings variables) =
+ applyMappings (applyOneVFileMapping variables) m' tree
+ where
+ m' = case mappings of
+ Right m -> m
+ Left rootLoc -> mappingRootOnly rootLoc
+
+-- ** Transforming a physical tree to a data access tree (ie. a tree where each
+-- node is just a function that pulls or writes the relevant data)
+
+newtype TaskConstructionError =
+ TaskConstructionError String
+ deriving (Show)
+instance Exception TaskConstructionError
+
+-- TODO: Is this dead-code?
+data DataAccessContext = DAC
+ { locationAccessed :: Value
+ , locationAccessType :: VFNodeAccessType
+ , requiredLocVariables :: [LocVariable]
+ , providedLocVariables :: LocVariableMap
+ , splicedLocation :: Value }
+ deriving (Generic)
+
+instance ToJSON DataAccessContext
+instance ToObject DataAccessContext
+instance LogItem DataAccessContext where
+ payloadKeys v _
+ | v == V3 = AllKeys
+ | v >= V1 = SomeKeys ["locationAccessed", "locationAccessType"]
+ | otherwise = SomeKeys []
+
+makeDataAccessor
+ :: forall m a b a' b'. (LogMask m)
+ => String -- ^ VirtualFile path (for doc)
+ -> VirtualFile a' b' -- ^ The virtual file
+ -> [SomeLocWithVars m] -- ^ Every mapped layer (for doc)
+ -> Maybe b -- ^ Default value (used as base layer)
+ -> LayeredReadScheme b -- ^ How to handle the different layers
+ -> [(ToAtomicFn a, SomeLocWithVars m)] -- ^ Layers to write to
+ -> [(FromStreamFn b, SomeLocWithVars m)] -- ^ Layers to read from
+ -> LocVariableMap -- ^ The map of the values of the repetition indices
+ -> DataAccessor m a b
+makeDataAccessor vpath vf layers mbDefVal readScheme writeLocs readLocs repetKeyMap =
+ DataAccessor{..}
+ where
+ (VFileImportance sevRead sevWrite sevError clockAccess) = vf ^. vfileImportance
+ rkeys = vf ^. vfileSerials.serialRepetitionKeys
+ timeAccess :: String -> Severity -> String -> m t -> m t
+ timeAccess prefix sev loc action
+ | clockAccess = do
+ (r, time) <- clockM action
+ katipAddContext time $
+ logFM sev $ logStr $ prefix ++ " '" ++ loc ++ "' in " ++ showTimeSpec time
+ return r
+ | otherwise = do
+ r <- action
+ logFM sev $ logStr $ prefix ++ " '" ++ loc ++ "'"
+ return r
+ daLocsAccessed = traverse (\(SomeGLoc loc) -> SomeGLoc <$> fillLoc' repetKeyMap loc) layers
+ daPerformWrite input =
+ forM_ writeLocs $ \(ToAtomicFn f, SomeGLoc loc) ->
+ case cast (f input) of
+ Nothing -> error "Some atomic serializer isn't converting to a lazy ByteString"
+ Just bs -> do
+ loc' <- fillLoc repetKeyMap loc
+ katipAddNamespace "dataAccessor" $ katipAddNamespace "writer" $
+ katipAddContext (DAC (toJSON loc) ATWrite rkeys repetKeyMap (toJSON loc')) $ do
+ let runWrite = writeBSS loc' (BSS.fromLazy bs)
+ timeAccess "Wrote" sevWrite (show loc') $
+ withException runWrite $ \ioError ->
+ logFM sevError $ logStr $ displayException (ioError :: IOException)
+ daPerformRead = do
+ dataFromLayers <- forM readLocs (\(FromStreamFn (f :: Stream (Of i) m () -> m b), SomeGLoc loc) ->
+ case eqT :: Maybe (i :~: Strict.ByteString) of
+ Nothing -> error "Some stream reader isn't expecting a stream of strict ByteStrings"
+ Just Refl -> do
+ loc' <- fillLoc repetKeyMap loc
+ katipAddNamespace "dataAccessor" $ katipAddNamespace "reader" $
+ katipAddContext (DAC (toJSON loc) ATRead rkeys repetKeyMap (toJSON loc')) $ do
+ let runRead = readBSS loc' (f . BSS.toChunks)
+ r <- timeAccess "Read" sevRead (show loc') $ withException runRead $ \ioError ->
+ logFM sevError $ logStr $ displayException (ioError :: IOException)
+ return r)
+ let embeddedValAndLayers = maybe id (:) mbDefVal dataFromLayers
+ case (readScheme, embeddedValAndLayers) of
+ (_, [x]) -> return x
+ (LayeredReadWithNull, ls) -> return $ mconcat ls
+ (_, []) -> throwWithPrefix $ vpath ++ " has no layers from which to read"
+ (LayeredRead, l:ls) -> return $ foldl (<>) l ls
+ (SingleLayerRead, ls) -> do
+ logFM WarningS $ logStr $ vpath ++
+ " doesn't support layered mapping. Using only result from last layer '"
+ ++ show (last $ layers) ++ "'"
+ return $ last ls
+
+ fillLoc' rkMap loc = terminateLocWithVars $ spliceLocVariables rkMap loc
+ fillLoc rkMap loc =
+ case fillLoc' rkMap loc of
+ Left e -> katipAddNamespace "dataAccessor" $ throwWithPrefix e
+ Right r -> return r
+
+-- | Transforms a file node with physical locations into a node with a data access
+-- function to run. Matches the location (especially the filetype/extension) to
+-- the readers & writers available in the 'VirtualFile'.
+resolveDataAccess
+ :: forall m m'. (LogMask m, MonadThrow m')
+ => PhysicalFileNode m
+ -> m' (DataAccessNode m)
+resolveDataAccess (PhysicalFileNode{pfnodeFile=vf, ..}) = do
+ -- resolveDataAccess performs some checks when we build the pipeline:
+
+ -- First, that we aren't illegally binding to no layers a VirtualFile that
+ -- will be read without having any default value:
+ case (any (==ATRead) pfnodeAccesses, pfnodeLayers) of
+ (True, [])
+ -> case readScheme of
+ LayeredReadWithNull -> return ()
+ _ -> case mbEmbeddedVal of
+ Just _ -> return ()
+ Nothing ->
+ throwM $ TaskConstructionError $
+ vpath ++ " cannot be mapped to null. It will be read from and doesn't contain any default value."
+ _ -> return ()
+ -- Then, that we aren't writing to an unsupported filetype:
+ writeLocs <- findFunctions (typeOf (undefined :: Lazy.ByteString)) writers
+ -- And finally, that we aren't reading from an unsupported filetype:
+ readLocs <- findFunctions (typeOf (undefined :: Strict.ByteString)) readers
+ return $
+ DataAccessNode pfnodeLayers $
+ makeDataAccessor vpath vf pfnodeLayers
+ mbEmbeddedVal readScheme
+ writeLocs readLocs
+ where
+ readScheme = vf ^. vfileLayeredReadScheme
+ mbEmbeddedVal = vf ^. vfileEmbeddedValue
+
+ vpath = T.unpack $ toTextRepr $ LTP $ vf ^. vfileOriginalPath
+
+ readers = vf ^. vfileSerials . serialReaders . serialReadersFromStream
+ writers = vf ^. vfileSerials . serialWriters . serialWritersToAtomic
+
+ findFunctions :: TypeRep -> HM.HashMap (TypeRep,Maybe FileExt) v -> m' [(v, SomeLocWithVars m)]
+ findFunctions typeRep hm | HM.null hm = return []
+ | otherwise = mapM findFunction pfnodeLayers
+ where
+ findFunction (SomeGLoc loc) = case HM.lookup (typeRep, Just $ T.pack $ getLocType loc) hm of
+ Just f -> return (f, SomeGLoc loc)
+ -- TODO: add VirtualFile path to error
+ Nothing -> throwM $ TaskConstructionError $
+ show loc ++ " is mapped to " ++ vpath ++ " which doesn't support filetype '" ++ getLocType loc ++
+ "'. Accepted filetypes here are: " ++
+ mconcat (intersperse "," [T.unpack ext | (_,Just ext) <- HM.keys hm]) ++ "."
+
+resolveDataAccess (MbPhysicalFileNode layers _ _) =
+ return $ MbDataAccessNode layers $ First Nothing
diff --git a/src/System/TaskPipeline/Repetition.hs b/src/System/TaskPipeline/Repetition.hs
new file mode 100644
index 0000000..8dde3e4
--- /dev/null
+++ b/src/System/TaskPipeline/Repetition.hs
@@ -0,0 +1,119 @@
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TupleSections #-}
+
+module System.TaskPipeline.Repetition
+ ( RepInfo(..)
+ , TRIndex(..)
+ , HasTRIndex(..)
+ , OneOrSeveral(..)
+ , parMapTask
+ , parMapTask_
+ , IndexRange(..)
+ , oneIndex
+ , oneRange
+ , enumIndices
+ , enumTRIndices
+ ) where
+
+import Control.Applicative
+import Control.Arrow.Free (mapA)
+import Control.Lens hiding ((.=))
+import Control.Monad
+import Data.Aeson
+import Data.Aeson.Types (Parser)
+import qualified Data.Text as T
+import Prelude hiding ((.))
+import System.TaskPipeline.PTask
+import System.TaskPipeline.Repetition.Internal
+
+
+-- | Makes a 'PTask' repeatable and maps it in parallel over a list.
+parMapTask
+ :: (HasTRIndex a, KatipContext m)
+ => RepInfo
+ -> PTask m a b
+ -> PTask m [a] [b]
+parMapTask ri =
+ over taskRunnablePart mapA . makeTaskRepeatable ri
+
+-- | Simply repeats a task which takes no input over a list of indices, and
+-- ignores the end result. See 'RepInfo' for how these indices are
+-- used. See 'parMapTask' for a more complete version.
+parMapTask_
+ :: (HasTRIndex idx, KatipContext m)
+ => RepInfo
+ -> PTask m () b
+ -> PTask m [idx] ()
+parMapTask_ ri task =
+ arr (map (, ())) >>> parMapTask ri (arr snd >>> task) >>> arr (const ())
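+
+-- For instance (a sketch, assuming @OverloadedStrings@ and some hypothetical
+-- @square :: PTask m Int Int@):
+--
+-- @
+-- squares :: (KatipContext m) => PTask m [Int] [Int]
+-- squares = parMapTask "i" square
+-- @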
+
+
+-- * A simple type to handle index ranges
+
+data OneRange i = OneIndex i | OneRange i i
+
+toJSONStr :: (ToJSON a) => a -> Either Value T.Text
+toJSONStr a = case toJSON a of
+ String s -> Right s
+ Number n -> Right $ T.pack $ show n
+ o -> Left o
+
+parseJSONStr :: (FromJSON a) => T.Text -> Parser a
+parseJSONStr v = tryNumber v <|> parseJSON (String v)
+ where
+ tryNumber n = case reads $ T.unpack n of
+ [(n',_)] -> parseJSON $ Number n'
+ _ -> fail "Not a number"
+
+instance (ToJSON i) => ToJSON (OneRange i) where
+ toJSON (OneIndex i) = toJSON i
+ toJSON (OneRange a b) = case (toJSONStr a, toJSONStr b) of
+ (Right a', Right b') -> String $ a' <> ".." <> b'
+ (a', b') -> object ["lower" .= toJ a', "upper" .= toJ b']
+ where toJ (Left o) = o
+ toJ (Right s) = String s
+
+instance (FromJSON i) => FromJSON (OneRange i) where
+ parseJSON o@(String s) = case T.splitOn ".." s of
+ [a,b] -> (OneRange <$> parseJSONStr a <*> parseJSONStr b)
+ <|> (OneIndex <$> parseJSON o)
+ _ -> OneIndex <$> parseJSON o
+ parseJSON (Object o) = OneRange <$> o .: "lower" <*> o .: "upper"
+ parseJSON o = OneIndex <$> parseJSON o
+
+-- | Allows reading from a JSON file either one @a@ or an array of @a@s
+newtype OneOrSeveral a = OneOrSeveral {getOneOrSeveral :: [a]}
+
+instance (ToJSON a) => ToJSON (OneOrSeveral a) where
+ toJSON (OneOrSeveral [r]) = toJSON r
+ toJSON (OneOrSeveral rs) = toJSON rs
+
+instance (FromJSON a) => FromJSON (OneOrSeveral a) where
+ parseJSON o@(Array _) = OneOrSeveral <$> parseJSON o
+ parseJSON o = OneOrSeveral . (:[]) <$> parseJSON o
+
+-- | A simple index list that can be used in configuration, and from which a
+-- list of indices can be extracted. The JSON representation of it is more
+-- compact than that of [(i,i)], as ranges are represented by "a..b" strings
+newtype IndexRange i = IndexRange (OneOrSeveral (OneRange i))
+ deriving (FromJSON, ToJSON)
+
+-- | A range of just one index
+oneIndex :: i -> IndexRange i
+oneIndex i = IndexRange $ OneOrSeveral [OneIndex i]
+
+-- | A range of consecutive values
+oneRange :: i -> i -> IndexRange i
+oneRange a b = IndexRange $ OneOrSeveral [OneRange a b]
+
+-- | Gives a list of indices from an index range
+enumIndices :: (Enum i) => IndexRange i -> [i]
+enumIndices (IndexRange (OneOrSeveral rs)) = concatMap toL rs
+ where
+ toL (OneIndex i) = [i]
+ toL (OneRange a b) = [a..b]
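+
+-- For instance (a sketch): the YAML value @["0..2", 5]@ decodes to an
+-- 'IndexRange' for which 'enumIndices' yields @[0,1,2,5]@.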
+
+-- | Gives a list of 'TRIndex' (task repetition indices)
+enumTRIndices :: (Enum i, Show i) => IndexRange i -> [TRIndex]
+enumTRIndices = map (TRIndex . show) . enumIndices
diff --git a/src/System/TaskPipeline/Repetition/Foldl.hs b/src/System/TaskPipeline/Repetition/Foldl.hs
new file mode 100644
index 0000000..91cd2c8
--- /dev/null
+++ b/src/System/TaskPipeline/Repetition/Foldl.hs
@@ -0,0 +1,145 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE BangPatterns #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+
+-- | This module implements a Foldl-based interface for arrow computations
+-- compatible with the <https://hackage.haskell.org/package/foldl foldl
+-- library>. Use 'generalizeA' and 'generalizeM' to convert folds to
+-- 'FoldA'. This is the most general way to repeat a 'PTask' over some input
+-- (list, array, stream, etc.).
+--
+-- This API is still experimental and might be subject to changes in the future
+
+module System.TaskPipeline.Repetition.Foldl
+ ( module Control.Arrow.FoldA
+ , RepInfo(..)
+ , TRIndex(..)
+ , HasTRIndex(..)
+ , generalizeM
+ , generalizeM_
+ , foldlTask
+ , foldStreamTask
+ , runFoldAOverPTask
+ , premapMaybe
+ ) where
+
+import Control.Arrow.FoldA
+import Control.Lens hiding (Fold)
+import Data.Locations
+import Prelude hiding ((.), id)
+import Streaming (Of (..), Stream)
+import qualified Streaming.Prelude as S
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+import System.TaskPipeline.Repetition.Internal
+
+
+-- * Folding data with a PTask
+
+data RunningFoldM m a b =
+ forall x. RFM (x -> a -> m x) !x (x -> m b)
+
+-- | Turns a function creating a 'FoldM' into a 'FoldA' over 'PTasks'
+generalizeM :: (KatipContext m)
+ => (i -> FoldM m a b)
+ -> FoldA (PTask m) i a b
+generalizeM f =
+ FoldA (toTask $ \(Pair (RFM step acc done) x) -> do
+ acc' <- step acc x
+ return $ RFM step acc' done)
+ (toTask $ \i ->
+ case f i of
+ FoldM step start done -> do
+ initAcc <- start
+ return $ RFM step initAcc done)
+ (toTask $ \(RFM _ acc done) -> done acc)
+
+-- | Turns a 'FoldM' in some monad into a 'FoldA' compatible with 'foldlTask'
+--
+-- This is a version of 'generalizeM' for when your initial accumulator doesn't
+-- need to be computed by a PTask
+generalizeM_ :: (KatipContext m)
+ => FoldM m a b -> FoldA (PTask m) i a b
+generalizeM_ (FoldM step start done) =
+ FoldA (toTask $ \(Pair a x) -> step a x)
+ (toTask $ const start)
+ (toTask done)
+
+instance (HasTRIndex a)
+ => HasTRIndex (Pair x a) where
+ getTRIndex (Pair _ a) = getTRIndex a
+
+-- | Runs a 'FoldA' created with 'arrowFold', 'generalizeA', 'unsafeGeneralizeM',
+-- or a composition of such folds.
+--
+-- You shouldn't use 'runFoldAOverPTask' directly in client code, rather you should
+-- specialize it to some collection. See e.g. 'foldStreamTask' or 'foldlTask'.
+runFoldAOverPTask
+ :: (HasTRIndex a, KatipContext m)
+ => (forall ar x. (ArrowChoice ar)
+ => (forall inp out. (inp -> m out) -> ar inp out)
+ -> ar (Pair x a) x
+ -> ar (x, col) (x, r))
+ -- ^ This function receives a function to wrap an action in the @m@ monad
+ -- and the step to repeat. It should consume the collection
+ -> RepInfo -- ^ How to log the repeated task
+ -> FoldA (PTask m) i a b -- ^ The 'FoldA' to run
+ -> PTask m (i, col) (b, r)
+runFoldAOverPTask loopStep ri (FoldA step_ start done) =
+ (reqs, runnable) ^. from splitTask
+ where
+ (reqsStep, runnableStep) = makeTaskRepeatable ri step_ ^. splitTask
+ (reqsStart, runnableStart) = start ^. splitTask
+ (reqsDone, runnableDone) = done ^. splitTask
+ reqs = reqsStart <> reqsStep <> reqsDone
+ runnable =
+ first runnableStart >>> loopStep (withRunnableState . const) runnableStep
+ >>> first runnableDone
+
+-- | Consumes a Stream with a 'FoldA' created with 'arrowFold', 'generalizeA',
+-- 'unsafeGeneralizeM', or a composition of such folds.
+foldStreamTask
+ :: (HasTRIndex a, KatipContext m)
+ => RepInfo -- ^ How to log the repeated task
+ -> FoldA (PTask m) i a b -- ^ The FoldA to run
+ -> PTask m (i, Stream (Of a) m r) (b, r)
+foldStreamTask = runFoldAOverPTask $ \wrap step ->
+ let
+ consumeStream = proc (acc, stream) -> do
+ firstElem <- wrap S.next -< stream
+ case firstElem of
+ Left r -> returnA -< (acc, r)
+ Right (a, stream') -> do
+ !acc' <- step -< Pair acc a
+ consumeStream -< (acc', stream')
+ in consumeStream
+
+-- | Consumes a Foldable with a 'FoldA' over 'PTask'.
+--
+-- See 'arrowFold' to create such a 'FoldA'
+foldlTask
+ :: (Foldable f, HasTRIndex a, KatipContext m)
+ => RepInfo -- ^ How to log the repeated task
+ -> FoldA (PTask m) i a b
+ -> PTask m (i, f a) b
+foldlTask ri fld =
+ arr (second S.each) >>> foldStreamTask ri fld >>> arr fst
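+
+-- For instance (a sketch, assuming @OverloadedStrings@ and @Control.Foldl@
+-- imported as @L@):
+--
+-- @
+-- sumTask :: (KatipContext m) => PTask m ((), [Int]) Int
+-- sumTask = foldlTask "sum" (generalizeM_ (L.generalize L.sum))
+-- @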
+
+-- | Allows filtering out some data before it is taken into account by the
+-- 'FoldA' of 'PTask'
+--
+-- We provide an implementation specific to 'PTask' because a general
+-- implementation requires 'ArrowChoice'
+premapMaybe :: (a -> Maybe a')
+ -> FoldA (PTask m) i a' b
+ -> FoldA (PTask m) i a b
+premapMaybe f (FoldA step start done) = FoldA step' start done
+ where
+ step' = step & over taskRunnablePart
+ (\run -> proc (Pair acc input) ->
+ case f input of
+ Nothing -> returnA -< acc
+ Just input' -> run -< Pair acc input')
diff --git a/src/System/TaskPipeline/Repetition/Internal.hs b/src/System/TaskPipeline/Repetition/Internal.hs
new file mode 100644
index 0000000..0dab24a
--- /dev/null
+++ b/src/System/TaskPipeline/Repetition/Internal.hs
@@ -0,0 +1,119 @@
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
+
+module System.TaskPipeline.Repetition.Internal
+ ( RepInfo(..)
+ , TRIndex(..)
+ , HasTRIndex(..)
+ , makeTaskRepeatable
+ ) where
+
+import Control.Category
+import Control.Lens hiding ((:>))
+import Control.Monad
+import Data.Aeson
+import qualified Data.HashMap.Strict as HM
+import Data.Locations
+import Data.String (IsString(..))
+import Katip
+import Prelude hiding (id, (.))
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+
+
+-- | Gives information about how a task will be repeated. The repInfoIndex will
+-- be used as a suffix in the default bindings to locations accessed by this
+-- task. If repInfoLogging is not Nothing, repInfoIndex will also be mentioned
+-- in the context of each line logged by the task to identify which repetition
+-- of the task is generating this log line. RepInfo is an instance of IsString
+-- so you can use it with OverloadedStrings (in which case repInfoIndex will be
+-- added to the context when verbosity level is at least 1).
+data RepInfo = RepInfo
+ { repInfoIndex :: LocVariable
+ -- ^ A name that will be used as a metavariable in the config file. It may
+ -- also be used by the logger as a context key, to indicate which repetition
+ -- is currently running.
+ , repInfoLogging :: Maybe Verbosity
+  -- ^ The minimal verbosity level at which to display the value associated with
+ -- the repetition index in the logger context. Nothing if we don't want to add
+ -- context.
+ } deriving (Eq, Show)
+
+instance IsString RepInfo where
+ fromString s = RepInfo (fromString s) (Just V1)
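+
+-- For instance, with @OverloadedStrings@, @parMapTask "iter" someTask@ uses
+-- @"iter"@ as the repetition variable and adds it to the logging context from
+-- verbosity 'V1' up (a sketch; @someTask@ is hypothetical).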
+
+-- | Logging context for repeated tasks
+data TaskRepetitionContext = TRC
+ { _repetitionKey :: LocVariable
+ , _repetitionKeyVal :: String
+ , _repetitionKeyVerb :: Verbosity }
+
+instance ToJSON TaskRepetitionContext where
+ toJSON (TRC k v _) = toJSON $ HM.singleton k v
+instance ToObject TaskRepetitionContext
+instance LogItem TaskRepetitionContext where
+ payloadKeys v (TRC _ _ v') | v >= v' = AllKeys
+ | otherwise = SomeKeys []
+
+-- | Task Repetition Index. Is given to functions that repeat tasks for each
+-- iteration.
+newtype TRIndex = TRIndex { unTRIndex :: String }
+ deriving (FromJSON, ToJSON)
+
+instance IsString TRIndex where
+ fromString = TRIndex
+
+-- | The class of every data that can be repeated
+class HasTRIndex a where
+ getTRIndex :: a -> TRIndex
+
+instance HasTRIndex TRIndex where
+ getTRIndex = id
+
+instance HasTRIndex Int where
+ getTRIndex = TRIndex . show
+
+instance HasTRIndex Integer where
+ getTRIndex = TRIndex . show
+
+instance (HasTRIndex i) => HasTRIndex (i,a) where
+ getTRIndex (i,_) = getTRIndex i
+
+-- | Turns a task into one that can be called several times, each time with a
+-- different index value @i@. This index will be used to alter every path
+-- accessed by the task. The first argument gives a name to that index, which
+-- will appear in the configuration file in the default bindings for the
+-- VirtualFiles accessed by this task. The second one controls whether we want
+-- to add to the logging context which repetition is currently running.
+makeTaskRepeatable
+ :: (HasTRIndex a, KatipContext m)
+ => RepInfo
+ -> PTask m a b
+ -> PTask m a b
+makeTaskRepeatable (RepInfo repetitionKey mbVerb) =
+ over splitTask
+ (\(reqTree, runnable) ->
+ ( fmap addKeyToVirtualFile reqTree
+ , modifyingRuntimeState alterState id runnable ))
+ where
+ addKeyToVirtualFile VirtualFileNode{..} =
+ VirtualFileNode
+ {vfnodeFile = vfnodeFile &
+ over (vfileSerials.serialRepetitionKeys) (repetitionKey:)
+ ,..}
+ addKeyToVirtualFile emptyNode = emptyNode
+
+ alterState input =
+ over ptrsKatipContext alterContext
+ . over (ptrsDataAccessTree.traversed) addKeyValToDataAccess
+ where
+ idxStr = unTRIndex $ getTRIndex input
+ newCtxItem = TRC repetitionKey idxStr <$> mbVerb
+ alterContext ctx = case newCtxItem of
+ Nothing -> ctx
+ Just item -> ctx <> liftPayload item
+ addKeyValToDataAccess (DataAccessNode l fn) =
+ DataAccessNode l $ fn . HM.insert repetitionKey idxStr
+ addKeyValToDataAccess emptyNode = emptyNode
diff --git a/src/System/TaskPipeline/Repetition/Streaming.hs b/src/System/TaskPipeline/Repetition/Streaming.hs
new file mode 100644
index 0000000..3edfe32
--- /dev/null
+++ b/src/System/TaskPipeline/Repetition/Streaming.hs
@@ -0,0 +1,127 @@
+{-# LANGUAGE FlexibleContexts #-}
+
+-- | This module contains functions that map a task over a stream. Delaying
+-- effects of tasks like this plays badly with caching. This interface will be
+-- removed in a future version of porcupine.
+
+module System.TaskPipeline.Repetition.Streaming
+ ( STask, ISTask, OSTask
+ , mappingOverStream
+ , listToStreamTask, runStreamTask, streamToListTask
+ , Typeable
+ ) where
+
+import Control.Arrow
+import Control.Category
+import Control.Lens hiding ((:>))
+import Control.Monad
+import Data.Locations
+import Katip
+import Prelude hiding ((.))
+import Streaming (Of (..), Stream)
+import qualified Streaming.Prelude as S
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+import System.TaskPipeline.Repetition.Internal
+
+
+-- * Type aliases for tasks over streams
+
+-- | A 'PTask' mapping an action over a Stream, transforming @a@'s into
+-- @b@'s. Each element in the stream should be associated to an identifier.
+type STask m a b r =
+ PTask m
+ (Stream (Of a) m r)
+ (Stream (Of b) m r)
+
+-- | A 'PTask' that consumes an Input Stream and just returns its result.
+type ISTask m a r =
+ PTask m
+ (Stream (Of a) m r)
+ r
+
+-- | A 'PTask' that emits an Output Stream.
+type OSTask m a b =
+ PTask m
+ a
+ (Stream (Of b) m ())
+
+-- * Running tasks over streams
+
+-- | Turns a task into something that will be repeated once per item in its
+-- input. This is done by transforming the VirtualFiles accessed by the task to
+-- add a 'RepetitionKey' to them, indicating that their final file name should
+-- be modified by adding an identifier to it just before reading or writing.
+-- So each iteration actually accesses different locations in the end.
+--
+-- Calls to 'mappingOverStream' can be nested, this way the underlying VirtualFiles
+-- will have one 'RepetitionKey' per loop (from outermost loop to innermost).
+mappingOverStream
+ :: (HasTRIndex a, CanRunPTask m)
+ => LocVariable -- ^ A variable name, used as a key to indicate which
+ -- repetition we're at. Used in the logger context and
+ -- exposed in the yaml file for each VirtualFile that
+ -- will be repeated by this task
+  -> Maybe Verbosity -- ^ The minimal verbosity level at which to display the
+ -- logger context. (Nothing if we don't want to add
+ -- context)
+ -> PTask m a b -- ^ The base task X to repeat
+  -> STask m a b r -- ^ A task that will repeat X for each input. Each
+                     -- input is associated to an identifier that will be
+ -- appended to every Loc mapped to every leaf in the
+ -- LocationTree given to X.
+mappingOverStream repetitionKey mbVerb =
+ over taskRunnablePart mappingRunnableOverStream
+ . makeTaskRepeatable (RepInfo repetitionKey mbVerb)
+
+{-# DEPRECATED mappingOverStream "Prefer the FoldA API to repeat tasks and consume streams" #-}
+
+-- | IMPORTANT: This requires the 'RunnableTask' to be repeatable. See
+-- 'makeTaskRepeatable'.
+mappingRunnableOverStream
+ :: (CanRunPTask m)
+ => RunnableTask m a b
+ -> RunnableTask m
+ (Stream (Of a) m r)
+ (Stream (Of b) m r)
+mappingRunnableOverStream runnable =
+ withRunnableState $ \state inputStream -> do
+ firstElem <- S.next inputStream
+ case firstElem of
+ Left r -> return (return r) -- Empty input stream
+ Right (firstInput, inputStream') -> do
+ firstResult <- go state firstInput
+ return $
+ firstResult `S.cons` S.mapM (go state) inputStream'
+ where
+ go = execRunnableTask runnable
+ -- NOTE: We "cheat" here: we run the funflow layer of the inner
+ -- task. We should find a way not to have to do that, but when using
+ -- Streaming (which delays effects in a monad) it's really problematic.
+
+-- * Helper functions to create and run streams
+
+-- | Runs the input stream, forgets all its elements and just returns its result
+runStreamTask :: (KatipContext m)
+ => PTask m
+ (Stream (Of t) m r)
+ r
+runStreamTask = toTask S.effects
+
+-- | A 'PTask' converting a list to a stream
+listToStreamTask :: (Monad m)
+ => PTask m
+ [t]
+ (Stream (Of t) m ())
+listToStreamTask = arr S.each
+
+-- | A 'PTask' converting an input stream to a list. WARNING: It can cause
+-- space leaks if the list is too big, as the output list will be eagerly
+-- evaluated. This function is provided only for compatibility with existing
+-- tasks expecting lists. Please consider switching to processing streams
+-- directly. See 'S.toList' for more details.
+streamToListTask :: (KatipContext m)
+ => PTask m
+ (Stream (Of t) m r)
+ [t]
+streamToListTask = toTask (S.toList_ . void)
diff --git a/src/System/TaskPipeline/Run.hs b/src/System/TaskPipeline/Run.hs
new file mode 100644
index 0000000..e81f0ce
--- /dev/null
+++ b/src/System/TaskPipeline/Run.hs
@@ -0,0 +1,269 @@
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# OPTIONS_GHC "-fno-warn-missing-signatures" #-}
+
+module System.TaskPipeline.Run
+ ( PipelineConfigMethod(..)
+ , PipelineCommand(..)
+ , CanRunPTask
+ , Rec(..)
+ , ContextRunner(..)
+ , SimplePTaskM, BasePorcupineContexts
+ , ReaderSoup
+ , FieldWithAccessors
+ , AcceptableArgsAndContexts
+ , (<--)
+ , (:::)
+ , baseContexts, baseContextsWithScribeParams
+ , maxVerbosityLoggerScribeParams
+ , warningsAndErrorsLoggerScribeParams
+ , runPipelineTask
+ , runLocalPipelineTask
+ , simpleRunPTask
+ , runPipelineTaskWithExceptionHandlers
+ , runPipelineCommandOnPTask
+ ) where
+
+import Control.Lens
+import Control.Monad.IO.Class
+import Control.Monad.ReaderSoup
+import Control.Monad.ReaderSoup.Katip ()
+import Data.Locations hiding ((</>))
+import Data.Locations.Accessors
+import Data.Maybe
+import Data.Vinyl.Derived (HasField, rlensf)
+import Katip
+import Prelude hiding ((.))
+import System.Environment (lookupEnv, withArgs)
+import System.FilePath ((</>))
+import System.Posix.Directory (getWorkingDirectory)
+import System.TaskPipeline.CLI
+import System.TaskPipeline.Logger
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+
+
+-- | Tells whether a record of args can be used to run a PTask
+type AcceptableArgsAndContexts args ctxs m =
+ (ArgsForSoupConsumption args, ctxs ~ ContextsFromArgs args
+ ,IsInSoup ctxs "katip", IsInSoup ctxs "resource"
+ ,RunsKatipOver args m)
+
+-- | We need to have some information about how katip will be run, because we
+-- will want to override that from the command-line
+type RunsKatipOver args m =
+ (HasField Rec "katip" args args (ContextRunner KatipContextT m) (ContextRunner KatipContextT m)
+ ,MonadMask m, MonadIO m)
+
+-- | Runs a 'PTask' according to some 'PipelineConfigMethod' and with an input
+-- @i@. In principle, it should be directly called by your @main@ function. It
+-- exits with @ExitFailure 1@ when the 'PTask' raises an uncaught
+-- exception.
+runPipelineTask
+ :: (AcceptableArgsAndContexts args ctxs m)
+ => PipelineConfigMethod o -- ^ Whether to use the CLI and load the yaml
+ -- config or not
+ -> Rec (FieldWithAccessors (ReaderSoup ctxs)) args -- ^ The location
+ -- accessors to use
+ -> PTask (ReaderSoup ctxs) i o -- ^ The whole pipeline task to run
+ -> i -- ^ The pipeline task input
+ -> IO o -- ^ The pipeline task output
+runPipelineTask = runPipelineTaskWithExceptionHandlers []
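+
+-- For instance, a minimal @main@ (a sketch; assumes @OverloadedStrings@, and
+-- the program name, config file and @myPipeline@ are hypothetical):
+--
+-- @
+-- main :: IO ()
+-- main = runPipelineTask (FullConfig "my-app" "my-app.yaml" "." ())
+--                        (baseContexts "my-app")
+--                        myPipeline ()
+-- @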
+
+runPipelineTaskWithExceptionHandlers
+ :: (AcceptableArgsAndContexts args ctxs m)
+ => [Handler IO o] -- ^ Exception handlers in case the pipeline raises
+ -- an exception.
+ -> PipelineConfigMethod o -- ^ Whether to use the CLI and load the yaml
+ -- config or not
+ -> Rec (FieldWithAccessors (ReaderSoup ctxs)) args -- ^ The location
+ -- accessors to use
+ -> PTask (ReaderSoup ctxs) i o -- ^ The whole pipeline task to run
+ -> i -- ^ The pipeline task input
+ -> IO o -- ^ The pipeline task output
+runPipelineTaskWithExceptionHandlers exceptionHandlers configMethod accessors ptask input = do
+ let tree = ptask ^. taskRequirements
+ catches
+ (bindVirtualTreeAndRun configMethod accessors tree $
+ runPipelineCommandOnPTask ptask input)
+ exceptionHandlers
+
+-- | A monad that implements MonadIO, MonadUnliftIO, KatipContext and
+-- MonadResource. For simplest uses of porcupine.
+type SimplePTaskM = ReaderSoup BasePorcupineContexts
+
+-- | Like 'runPipelineTask', for when you don't need to access any resources
+-- other than local files. Uses the 'maxVerbosityLoggerScribeParams' by default.
+runLocalPipelineTask
+ :: PipelineConfigMethod o
+ -> PTask SimplePTaskM i o
+ -> i
+ -> IO o
+runLocalPipelineTask configMethod =
+ runPipelineTask configMethod (baseContexts $ configMethod ^. pipelineConfigMethodProgName)
+
+-- | Runs a 'PTask' without reading any configuration or parsing the CLI, with
+-- only local files being accessible, and using PWD as the root location for all
+-- files read and written.
+simpleRunPTask
+ :: PTask SimplePTaskM i o
+ -> i
+ -> IO o
+simpleRunPTask = runLocalPipelineTask (NoConfig "simpleRunPTask" ".")
+
+-- | Runs the required 'PipelineCommand' on an 'PTask'
+runPipelineCommandOnPTask
+ :: (CanRunPTask m)
+ => PTask m i o
+ -> i
+ -> PipelineCommand
+ -> Maybe o
+ -> PhysicalTree m
+ -> FunflowOpts m
+ -> m o
+runPipelineCommandOnPTask ptask input cmd defRetVal physTree ffopts = do
+ case cmd of
+ RunPipeline -> do
+ dataTree <- traverse resolveDataAccess physTree
+ withTaskState ffopts dataTree $ \initState -> do
+ $(logTM) NoticeS $ logStr $ case flowIdentity ffopts of
+ Just i -> "Using funflow store at '" ++ storePath ffopts ++ "' with identity "
+ ++ show i ++ "." ++
+ (case remoteCacheLoc ffopts of
+ Just l -> "Using remote cache at " ++ show l
+ _ -> "")
+ Nothing -> identityVar ++ " not specified. The cache will not be used."
+ execRunnableTask (ptask ^. taskRunnablePart) initState input
+ ShowTree root showOpts -> do
+ liftIO $ putStrLn $ prettyLocTree root $
+ case physTree ^. inLocTree root of
+ Just t -> fmap (PhysicalFileNodeWithShowOpts showOpts) t
+ _ -> error $ "Path `" ++ showLTP root ++ "' doesn't exist in the porcupine tree"
+ case defRetVal of
+ Just r -> return r
+ Nothing -> error "NOT EXPECTED: runPipelineCommandOnPTask(ShowTree) was not given a default\
+                       \ value to return. Please submit this as a bug."
+
+storeVar,remoteCacheVar,identityVar,coordVar :: String
+storeVar = "FUNFLOW_STORE"
+remoteCacheVar = "FUNFLOW_REMOTE_CACHE"
+coordVar = "FUNFLOW_COORDINATOR"
+identityVar = "FUNFLOW_IDENTITY"
+
+-- | Reads the relevant environment variables to construct the set of parameters
+-- necessary to initialize funflow
+getFunflowOpts :: (MonadIO m, LogThrow m) => LocResolutionM m (FunflowOpts m)
+getFunflowOpts = do
+ pwd <- liftIO getWorkingDirectory
+ givenStore <- lookupEnv' storeVar
+ opts <- FunflowOpts
+ (fromMaybe (pwd </> "_funflow/store") givenStore)
+ <$> (fromMaybe (pwd </> "_funflow/coordinator.db") <$> lookupEnv' coordVar)
+ <*> (lookupEnv' identityVar >>= parseIdentity)
+ <*> (lookupEnv' remoteCacheVar >>= traverse resolveYamlDocToSomeLoc)
+ case (flowIdentity opts, givenStore, remoteCacheLoc opts) of
+ (Nothing, Just _, _) -> warnAboutIdentity storeVar
+ (Nothing, _, Just _) -> warnAboutIdentity remoteCacheVar
+ _ -> return ()
+ return opts
+ where
+ lookupEnv' = liftIO . lookupEnv
+ parseIdentity Nothing = return Nothing
+ parseIdentity (Just "") = return Nothing
+ parseIdentity (Just s) = case reads s of
+ [(i,_)] -> return $ Just i
+ _ -> fail $ identityVar ++ " isn't a valid integer"
+ warnAboutIdentity var = $(logTM) WarningS $ logStr $
+ var ++ " has been given but no " ++ identityVar ++
+ " has been provided. Caching will NOT be performed."
+
+-- | Resolve all the JSON values in the mappings and paths from environment (for
+-- funflow) to locations tied to their respective LocationAccessors
+getPhysTreeAndFFOpts
+ :: (MonadIO m, LogThrow m)
+ => VirtualTreeAndMappings
+ -> AvailableAccessors m
+ -> m (PhysicalTree m, FunflowOpts m)
+getPhysTreeAndFFOpts vtam accessors =
+ flip runReaderT accessors $
+ (,) <$> getPhysicalTreeFromMappings vtam
+ <*> getFunflowOpts
+
+-- | Runs the CLI if using FullConfig, binds every location in the virtual tree
+-- to its final value/path, and passes to the continuation the physical tree.
+bindVirtualTreeAndRun
+ :: (AcceptableArgsAndContexts args ctxs m)
+ => PipelineConfigMethod r -- ^ How to read the configuration
+ -> Rec (FieldWithAccessors (ReaderSoup ctxs)) args
+  -> VirtualTree -- ^ The tree to look for DocRecOfOptions in
+ -> (PipelineCommand
+ -> Maybe r
+ -> PhysicalTree (ReaderSoup ctxs)
+ -> FunflowOpts (ReaderSoup ctxs)
+ -> ReaderSoup ctxs r)
+ -- ^ What to do with the tree
+ -> IO r
+bindVirtualTreeAndRun (NoConfig _ root) accessorsRec tree f =
+ consumeSoup argsRec $ do
+ (physTree, ffPaths) <- getPhysTreeAndFFOpts defaultConfig accessors
+ f RunPipeline Nothing physTree ffPaths
+ where
+ defaultConfig = VirtualTreeAndMappings tree (Left root) mempty
+ (accessors, argsRec) = splitAccessorsFromArgRec accessorsRec
+bindVirtualTreeAndRun (ConfigFileOnly progName configFileURL defRoot) accessorsRec tree f = do
+ -- We deactivate every argument that might have been passed so the only choice
+  -- is to run the pipeline. Given that the parsing of the config file and of
+  -- the command-line are quite related, it is difficult to just remove the CLI
+  -- parsing until that part of the code is refactored to better separate CLI
+  -- parsing and deserialization of the VirtualTreeAndMappings from the config
+  -- file.
+ res <- withArgs ["-qq", "--context-verb", "2", "--log-format", "compact"] $
+         -- No CLI arg is passable, so until we improve CLI parsing as stated
+         -- just above, we limit ourselves to warnings and errors
+ bindVirtualTreeAndRun (FullConfig progName configFileURL defRoot Nothing) accessorsRec tree $
+ \_ _ t o -> Just <$> f RunPipeline Nothing t o
+ case res of
+ Just r -> return r
+ Nothing -> error "NOT EXPECTED: bindVirtualTreeAndRun(ConfigFileOnly) didn't receive a result \
+ \from the pipeline. Please submit this as a bug."
+bindVirtualTreeAndRun (FullConfig progName defConfigFileURL defRoot defRetVal) accessorsRec tree f =
+ withConfigFileSourceFromCLI $ \mbConfigFileSource -> do
+ let configFileSource = fromMaybe (ConfigFileURL (LocalFile defConfigFileURL)) mbConfigFileSource
+ mbConfigFromFile <-
+ tryReadConfigFileSource configFileSource $ \remoteURL ->
+ consumeSoup argsRec $ do
+ -- If config file is remote, we use the accessors and run
+ -- the readerSoup with the default katip params
+ SomeGLoc loc <- flip runReaderT accessors $
+ resolvePathToSomeLoc $ show remoteURL
+ -- TODO: Implement locExists in each accessor and use it
+ -- here. For now we fail if given a remote config that
+ -- doesn't exist.
+ readBSS loc decodeYAMLStream
+ parser <- pipelineCliParser virtualTreeConfigurationReader progName $
+ BaseInputConfig (case configFileSource of
+ ConfigFileURL (LocalFile filep) -> Just filep
+ _ -> Nothing)
+ mbConfigFromFile
+ defaultConfig
+ withCliParser progName "Run a task pipeline" parser defRetVal run
+ where
+ defaultConfig = VirtualTreeAndMappings tree (Left defRoot) mempty
+ (accessors, argsRec) = splitAccessorsFromArgRec accessorsRec
+ run finalConfig cmd lsp performConfigWrites =
+ let -- We change the katip runner, from the options we got from CLI:
+ argsRec' = argsRec & set (rlensf #katip)
+ (ContextRunner (runLogger progName lsp))
+ in
+ consumeSoup argsRec' $ do
+ unPreRun performConfigWrites
+ (physTree, ffPaths) <- getPhysTreeAndFFOpts finalConfig accessors
+ f cmd (Just defRetVal) physTree ffPaths
diff --git a/src/System/TaskPipeline/VirtualFileAccess.hs b/src/System/TaskPipeline/VirtualFileAccess.hs
new file mode 100644
index 0000000..c6c3722
--- /dev/null
+++ b/src/System/TaskPipeline/VirtualFileAccess.hs
@@ -0,0 +1,405 @@
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE PartialTypeSignatures #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
+{-# OPTIONS_GHC -Wall #-}
+
+-- | This module provides some utilities for when the pipeline needs to access
+-- several files organized in layers for each location in the 'LocationTree'
+module System.TaskPipeline.VirtualFileAccess
+ ( -- * Reexports
+ module Data.Locations.LogAndErrors
+
+ -- * High-level API
+ , loadData
+ , loadDataStream
+ , loadDataList
+ , tryLoadDataStream
+ , writeData
+ , writeDataStream
+ , writeDataList
+ , writeEffData
+ , writeDataFold
+ , DataWriter(..), DataReader(..)
+ , getDataWriter, getDataReader
+
+ -- * Lower-level API
+ , EffectSeq(..), EffectSeqFromList(..)
+ , SingletonES(..), ListES(..), StreamES(..)
+ , AccessToPerform(..)
+ , DataAccessor(..)
+ , VFNodeAccessType(..)
+ , SomeGLoc(..), SomeLoc
+ , accessVirtualFile'
+ , getDataAccessorFn
+ , getLocsMappedTo
+
+ -- * Internal API
+ , accessVirtualFile
+ , withVFileInternalAccessFunction
+ , withFolderDataAccessNodes
+ ) where
+
+import Prelude hiding (id, (.))
+
+import Control.Funflow (Properties(..))
+import Control.Lens
+import Control.Monad (forM)
+import Control.Monad.Trans
+import Data.Default
+import qualified Data.Foldable as F
+import qualified Data.HashMap.Strict as HM
+import Data.Locations
+import Data.Locations.Accessors
+import Data.Locations.LogAndErrors
+import Data.Monoid
+import Data.Representable
+import qualified Data.Text as T
+import Data.Typeable
+import Streaming (Of (..), Stream)
+import qualified Streaming.Prelude as S
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+import System.TaskPipeline.Repetition.Internal
+import qualified System.TaskPipeline.Repetition.Foldl as F
+
+
+-- | Uses only the read part of a 'VirtualFile'. It is therefore considered a
+-- pure 'DataSource'.
+loadData
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b -- ^ Use as a 'DataSource'
+ -> PTask m ignored b -- ^ The resulting task. Ignores its input.
+loadData vf =
+ getDataReader vf >>> toTask' props drPerformRead
+ where
+ cacher = case _vfileReadCacher vf of
+ NoCache -> NoCache
+ Cache k s r -> Cache (\i d -> k i (fmap toHashableLocs $ drLocsAccessed d)) s r
+ props = Properties Nothing cacher Nothing
+
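+-- A minimal usage sketch ('myInput' and 'MyConfig' are hypothetical, declared
+-- elsewhere in the pipeline's code; 'DataSource' is the read-only flavour of
+-- 'VirtualFile'):
+--
+-- > myInput :: DataSource MyConfig
+-- >
+-- > readConfig :: (LogThrow m) => PTask m () MyConfig
+-- > readConfig = loadData myInput
+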
+-- | Loads a stream of repeated occurrences of a VirtualFile, from a stream of
+-- indices. The process is lazy: the data will actually be read only when the
+-- resulting stream is consumed, BUT this also means that the reading cannot
+-- be cached.
+loadDataStream
+ :: (HasTRIndex idx, LogThrow m, Typeable a, Typeable b, Monoid r)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSource'
+ -> PTask m (Stream (Of idx) m r) (Stream (Of (idx, b)) m r)
+loadDataStream lv vf =
+ arr (StreamES . S.map (,error "loadDataStream: THIS IS VOID"))
+ >>> accessVirtualFile' (DoRead id) lv vf
+ >>> arr streamFromES
+
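+-- A sketch (hypothetical 'userFile' and 'User' type): one occurrence of the
+-- file is read per index flowing through the stream, the "userId" variable
+-- being spliced into the mapped path:
+--
+-- > loadUsers :: (HasTRIndex idx, LogThrow m, Monoid r)
+-- >           => PTask m (Stream (Of idx) m r) (Stream (Of (idx, User)) m r)
+-- > loadUsers = loadDataStream "userId" userFile
+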
+-- | Loads repeated occurrences of a VirtualFile as a list, from a list of
+-- indices. BEWARE: the process is _strict_: every file will be loaded in
+-- memory, and the list returned is fully evaluated. So you should not use
+-- 'loadDataList' if the number of files to read is very large.
+loadDataList
+ :: (HasTRIndex idx, LogThrow m, Typeable a, Typeable b)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSource'
+ -> PTask m [idx] [(idx, b)]
+loadDataList lv vf =
+ arr (ListES . map (return . (,error "loadDataList: THIS IS VOID")))
+ >>> accessVirtualFile' (DoRead id) lv vf
+ >>> toTask (sequence . getListFromES)
+
+-- | Like 'loadDataStream', but won't stop if loading a single file fails
+tryLoadDataStream
+ :: (Exception e, HasTRIndex idx, LogCatch m, Typeable a, Typeable b, Monoid r)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSource'
+ -> PTask m (Stream (Of idx) m r) (Stream (Of (idx, Either e b)) m r)
+tryLoadDataStream lv vf =
+ arr (StreamES . S.map (,error "tryLoadDataStream: THIS IS VOID"))
+ >>> accessVirtualFile' (DoRead try) lv vf
+ >>> arr streamFromES
+
+-- | Uses only the write part of a 'VirtualFile'. It is therefore considered
+-- a pure 'DataSink'.
+writeData
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b -- ^ Used as a 'DataSink'
+ -> PTask m a ()
+writeData vf =
+ id &&& getDataWriter vf >>> toTask' props (uncurry $ flip dwPerformWrite)
+ where
+ cacher = case _vfileWriteCacher vf of
+ NoCache -> NoCache
+ Cache k s r -> Cache (\i (a, d) -> k i (a, fmap toHashableLocs $ dwLocsAccessed d)) s r
+ props = Properties Nothing cacher Nothing
+
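+-- A minimal usage sketch (hypothetical 'reportFile' and 'Report' type;
+-- 'DataSink' is the write-only flavour of 'VirtualFile'):
+--
+-- > reportFile :: DataSink Report
+-- >
+-- > saveReport :: (LogThrow m) => PTask m Report ()
+-- > saveReport = writeData reportFile
+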
+-- | Like 'writeData', but the data to write first has to be computed by an
+-- action running some effects. Therefore, these effects will be executed only
+-- if the 'VirtualFile' is mapped to something.
+writeEffData
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b -- ^ Used as a 'DataSink'
+ -> PTask m (m a) ()
+writeEffData vf =
+ arr (SingletonES . (fmap ([] :: [TRIndex],)))
+ >>> accessVirtualFile (DoWrite id) [] vf
+ >>> toTask runES
+
+-- | Like 'writeDataList', but takes a stream as an input instead of a list. If
+-- the VirtualFile is not mapped to any physical file (this can be authorized if
+-- the VirtualFile 'canBeUnmapped'), then the input stream's effects will not be
+-- executed. This is why its end result must be a Monoid. See
+-- System.TaskPipeline.Repetition.Foldl for more complex ways to consume a
+-- Stream.
+writeDataStream
+ :: (HasTRIndex idx, LogThrow m, Typeable a, Typeable b, Monoid r)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSink'
+ -> PTask m (Stream (Of (idx, a)) m r) r
+writeDataStream lv vf =
+ arr StreamES
+ >>> accessVirtualFile' (DoWrite id) lv vf
+ >>> toTask runES
+
+-- | The simplest way to consume a stream of data inside a pipeline. Just write
+-- it to repeated occurrences of a 'VirtualFile'.
+writeDataList
+ :: (HasTRIndex idx, LogThrow m, Typeable a, Typeable b)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSink'
+ -> PTask m [(idx, a)] ()
+writeDataList lv vf =
+ arr (ListES . map return)
+ >>> accessVirtualFile' (DoWrite id) lv vf
+ >>> toTask runES
+
+-- | A very simple fold that will just repeatedly write the data to different
+-- occurrences of a 'VirtualFile'.
+writeDataFold :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b -> F.FoldA (PTask m) i a ()
+writeDataFold vf = F.premapInitA (arr $ const ()) $ F.arrowFold (arr snd >>> writeData vf)
+
+-- | Gets a 'DataAccessor' to the 'VirtualFile', i.e. it doesn't read or write
+-- it immediately, but returns a function that makes doing so possible.
+getDataAccessorFn
+ :: (LogThrow m, Typeable a, Typeable b)
+ => [VFNodeAccessType] -- ^ The accesses that will be performed on the DataAccessor
+ -> VirtualFile a b
+ -> PTask m () (LocVariableMap -> DataAccessor m a b)
+getDataAccessorFn accesses vfile = withVFileInternalAccessFunction def accesses vfile
+ (\mkAccessor _ _ -> return mkAccessor)
+
+-- | Gets a 'DataWriter' to the 'VirtualFile', i.e. a function to write to it
+-- that can be passed to cached tasks.
+getDataWriter
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b
+ -> PTask m ignored (DataWriter m a)
+getDataWriter vfile = withVFileInternalAccessFunction def [ATWrite] vfile
+ (\mkAccessor _ _ -> case mkAccessor mempty of
+ DataAccessor w _ l -> return $ DataWriter w l)
+
+-- | Gets a 'DataReader' from the 'VirtualFile', i.e. a function to read from
+-- it that can be passed to cached tasks.
+getDataReader
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b
+ -> PTask m ignored (DataReader m b)
+getDataReader vfile = withVFileInternalAccessFunction def [ATRead] vfile
+ (\mkAccessor _ _ -> case mkAccessor mempty of
+ DataAccessor _ r l -> return $ DataReader r l)
+
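+-- A sketch of the intended use (hypothetical 'outputFile' and 'process'):
+-- the writer, not the whole VirtualFile, is what the task body manipulates,
+-- which is what makes it passable to cached tasks (see
+-- System.TaskPipeline.Caching for the cached variants):
+--
+-- > step :: (LogThrow m) => PTask m Input ()
+-- > step = id &&& getDataWriter outputFile
+-- >        >>> toTask (\(input, writer) -> dwPerformWrite writer (process input))
+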
+-- | Gives a wrapper that should be used when the actual read or write is
+-- performed.
+data AccessToPerform m b b'
+ = DoWrite (m () -> m b')
+ | DoRead (m b -> m b')
+ | DoWriteAndRead (m b -> m b')
+
+-- | A single value, computed from an action in @m@
+newtype SingletonES m a = SingletonES { getSingletonFromES :: m a }
+
+-- | Just a wrapper around [m a]
+newtype ListES m a = ListES { getListFromES :: [m a] }
+
+-- | Just a wrapper around Stream, with arguments reordered
+newtype StreamES r m a = StreamES { getStreamFromES :: Stream (Of a) m r }
+
+-- | A class abstracting over the ways to represent a sequence of effects
+-- in @m@: a single action ('SingletonES'), a list ('ListES') or a stream
+-- ('StreamES').
+class (Monoid (ESResult seq)) => EffectSeq seq where
+ type ESResult seq :: *
+ mapES :: (Monad m) => (a -> b) -> seq m a -> seq m b
+ mapESM :: (Monad m) => (a -> m b) -> seq m a -> seq m b
+ streamFromES :: (Monad m) => seq m a -> Stream (Of a) m (ESResult seq)
+ runES :: (Monad m) => seq m a -> m (ESResult seq)
+ emptyES :: (Monad m) => seq m a
+
+instance EffectSeq SingletonES where
+ type ESResult SingletonES = ()
+ mapES f = SingletonES . fmap f . getSingletonFromES
+ mapESM f = SingletonES . (>>= f) . getSingletonFromES
+ streamFromES (SingletonES act) = lift act >>= S.yield
+ runES (SingletonES act) = act >> return ()
+ emptyES = SingletonES $ return $ error "SingletonES: THIS IS VOID"
+
+instance EffectSeq ListES where
+ type ESResult ListES = ()
+ mapES f = ListES . map (f <$>) . getListFromES
+ mapESM f = ListES . map (>>= f) . getListFromES
+ streamFromES = mapM_ (\act -> lift act >>= S.yield) . getListFromES
+ runES = sequence_ . getListFromES
+ emptyES = ListES []
+
+instance (Monoid r) => EffectSeq (StreamES r) where
+ type ESResult (StreamES r) = r
+ mapES f = StreamES . S.map f . getStreamFromES
+ mapESM f = StreamES . S.mapM f . getStreamFromES
+ streamFromES = getStreamFromES
+ runES = S.effects . getStreamFromES
+ emptyES = StreamES (return mempty)
+
+class (EffectSeq seq) => EffectSeqFromList seq where
+ esFromList :: (Monad m) => [a] -> seq m a
+
+instance EffectSeqFromList ListES where
+ esFromList = ListES . map return
+
+instance (Monoid r) => EffectSeqFromList (StreamES r) where
+ esFromList x = StreamES $ S.each x >> return mempty
+
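+-- For instance (a sketch), the same pure list gives rise to either
+-- representation, with the same effects when run:
+--
+-- > asList   = esFromList [1,2,3] :: ListES IO Int
+-- > asStream = esFromList [1,2,3] :: StreamES () IO Int
+-- >
+-- > -- runES asList   ~ sequence_ [return 1, return 2, return 3]
+-- > -- runES asStream ~ S.effects (S.each [1,2,3]), returning mempty
+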
+-- | Like 'accessVirtualFile', but uses only one repetition variable
+accessVirtualFile' :: forall seq idx m a b b'.
+ (HasTRIndex idx, LogThrow m, Typeable a, Typeable b
+ ,EffectSeq seq)
+ => AccessToPerform m b b'
+ -> LocVariable
+ -> VirtualFile a b -- ^ The VirtualFile to access
+ -> PTask m (seq m (idx, a)) (seq m (idx, b'))
+accessVirtualFile' access repIndex_ vf =
+ arr (mapES $ \(i, a) -> ([i], a))
+ >>> accessVirtualFile access [repIndex_] vf
+ >>> arr (mapES $ first head)
+
+toAccessTypes :: AccessToPerform m b b' -> [VFNodeAccessType]
+toAccessTypes ac = case ac of
+ DoWriteAndRead{} -> [ATWrite,ATRead]
+ DoWrite{} -> [ATWrite]
+ DoRead{} -> [ATRead]
+
+-- | When building the pipeline, stores into the location tree the way to read
+-- or write the required resource. When running the pipeline, accesses the
+-- instances of this resource corresponding to the values of some repetition
+-- indices.
+accessVirtualFile
+ :: forall m a b b' seq idx.
+ (LogThrow m, Typeable a, Typeable b, HasTRIndex idx
+ ,EffectSeq seq)
+ => AccessToPerform m b b'
+ -> [LocVariable] -- ^ The list of repetition indices. Can be empty if the
+ -- file isn't meant to be repeated
+ -> VirtualFile a b -- ^ The VirtualFile to access
+ -> PTask m (seq m ([idx], a)) (seq m ([idx], b')) -- ^ The resulting task reads a stream of indices and
+ -- input values and returns a stream of the same indices
+ -- associated to their outputs.
+accessVirtualFile accessToDo repIndices vfile =
+ withVFileInternalAccessFunction props (toAccessTypes accessToDo) vfile' $
+ \accessFn isMapped inputEffSeq ->
+ case (isMapped, accessToDo) of
+ (False, DoWrite{}) -> return emptyES
+ -- We don't even run the input effects if the
+ -- input shouldn't be written
+ _ -> return $ mapESM (runOnce accessFn) inputEffSeq
+ where
+ runOnce :: (LocVariableMap -> DataAccessor m a b) -> ([idx], a) -> m ([idx], b')
+ runOnce accessFn (ixVals, input) =
+ (ixVals,) <$> case accessToDo of
+ DoWrite wrap -> wrap $ daPerformWrite da input
+ DoRead wrap -> wrap $ daPerformRead da
+ DoWriteAndRead wrap -> wrap $ daPerformWrite da input >> daPerformRead da
+ where
+ da = accessFn lvMap
+ lvMap = HM.fromList $ zip repIndices $ map (unTRIndex . getTRIndex) ixVals
+ vfile' = case repIndices of
+ [] -> vfile
+ _ -> vfile & over (vfileSerials.serialRepetitionKeys) (repIndices++)
+ props = def
+
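+-- For instance (a sketch): with @repIndices = ["year", "month"]@, an element
+-- @(ixVals, a)@ whose indices render to @["2019", "10"]@ is accessed with
+-- @lvMap = HM.fromList [("year","2019"), ("month","10")]@, i.e. one physical
+-- occurrence of the file per combination of index values.
+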
+-- | Executes as a task a function that needs to access the content of the
+-- DataAccessNode of a VirtualFile.
+withVFileInternalAccessFunction
+ :: forall m i o a b.
+ (LogThrow m, Typeable a, Typeable b)
+ => Properties i o
+ -> [VFNodeAccessType] -- ^ The accesses that will be performed on it
+ -> VirtualFile a b -- ^ The VirtualFile to access
+ -> ((LocVariableMap -> DataAccessor m a b) -> Bool -> i -> m o)
+ -- ^ The action to run. It receives a function to access the
+ -- VirtualFile, a Bool telling whether the file is mapped to at least
+ -- one physical location, and the task's input. The LocVariableMap
+ -- can just be empty if the VirtualFile isn't meant to be repeated
+ -> PTask m i o
+withVFileInternalAccessFunction props accessesToDo vfile f =
+ withFolderDataAccessNodes props path (Identity fname) $
+ \(Identity n) input -> case n of
+ DataAccessNode layers (action :: LocVariableMap -> DataAccessor m a' b') ->
+ case (eqT :: Maybe (a :~: a'), eqT :: Maybe (b :~: b')) of
+ (Just Refl, Just Refl)
+ -> f action (not $ null layers) input
+ _ -> err "input or output types don't match"
+ _ -> err "no access action is present in the tree"
+ where
+ path = init $ vfile ^. vfileOriginalPath
+ fname = file (last $ vfile ^. vfileOriginalPath) $ VirtualFileNode accessesToDo vfile
+ err s = throwWithPrefix $
+ "withVFileInternalAccessFunction (" ++ showVFileOriginalPath vfile ++ "): " ++ s
+
+-- | Wraps in a task a function that needs to access some items present in a
+-- subfolder of the 'LocationTree', and marks these accesses as done.
+withFolderDataAccessNodes
+ :: (LogThrow m, Traversable t)
+ => Properties i o
+ -> [LocationTreePathItem] -- ^ Path to folder in 'LocationTree'
+ -> t (LTPIAndSubtree VirtualFileNode) -- ^ Items of interest in the subfolder
+ -> (t (DataAccessNode m) -> i -> m o) -- ^ What to run with these items
+ -> PTask m i o -- ^ The resulting PTask
+withFolderDataAccessNodes props path filesToAccess accessFn =
+ makeTask' props tree runAccess
+ where
+ tree = foldr (\pathItem subtree -> folderNode [ pathItem :/ subtree ])
+ (folderNode $ F.toList filesToAccess) path
+ runAccess virtualTree input = do
+ let mbSubtree = virtualTree ^? atSubfolderRec path
+ subtree <- case mbSubtree of
+ Just s -> return s
+ Nothing -> throwWithPrefix $
+ "path '" ++ show path ++ "' not found in the LocationTree"
+ nodeTags <- forM filesToAccess $ \(filePathItem :/ _) ->
+ case subtree ^? atSubfolder filePathItem . locTreeNodeTag of
+ Nothing -> throwWithPrefix $
+ "path '" ++ show filePathItem ++ "' not found in the LocationTree"
+ Just tag -> return tag
+ accessFn nodeTags input
+
+-- | Returns the locs mapped to some path in the location tree. It *doesn't*
+-- expose this path as a requirement (hence the result list may be empty, as no
+-- mapping might exist). SHOULD NOT BE USED UNLESS loadData/writeData cannot do
+-- what you want.
+getLocsMappedTo :: (LogThrow m)
+ => [LocationTreePathItem] -> PTask m () [SomeLoc m]
+getLocsMappedTo path = runnableWithoutReqs $ withRunnableState $
+ \state _ -> getLocs $ state^.ptrsDataAccessTree
+ where
+ onErr (Left s) = throwWithPrefix $
+ "getLocsMappedTo (" ++ T.unpack (toTextRepr (LTP path)) ++ "): " ++ s
+ onErr (Right x) = return x
+ getLocs tree =
+ case tree ^? (atSubfolderRec path . locTreeNodeTag) of
+ -- NOTE: Will fail on repeated folders (because here we can only access
+ -- the final locations -- with variables spliced in -- in the case of
+ -- nodes with a data access function, not intermediary folders).
+ Just (MbDataAccessNode locsWithVars (First mbAccess)) -> case mbAccess of
+ Just (SomeDataAccess fn) -> onErr $ daLocsAccessed $ fn mempty
+ Nothing -> onErr $ traverse terminateLocWithVars locsWithVars
+ _ -> return []
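+
+-- A minimal usage sketch (hypothetical "outputs" folder): get whichever
+-- physical locations are currently mapped to that path, without requiring
+-- that a mapping exists:
+--
+-- > outputLocs :: (LogThrow m) => PTask m () [SomeLoc m]
+-- > outputLocs = getLocsMappedTo ["outputs"]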