author    YvesPares <>                  2019-10-09 10:16:00 (GMT)
committer hdiff <hdiff@hdiff.luite.com> 2019-10-09 10:16:00 (GMT)
commit    0a26644cffcc096b169b4f1ee3b790b562c01c51 (patch)
tree      0404a48f10a79540a26f315f0af2da316474e3a9
version   0.1.0.0
-rw-r--r--  LICENSE  21
-rw-r--r--  examples/example-radon/ExampleRadon.hs  169
-rw-r--r--  examples/example-radon/Plotting.hs  79
-rw-r--r--  examples/example0.1/Example0_1.hs  36
-rw-r--r--  examples/example0/Example0.hs  30
-rw-r--r--  examples/example1/Example1.hs  75
-rw-r--r--  examples/example2/Example2.hs  78
-rw-r--r--  porcupine-core.cabal  440
-rw-r--r--  src/Control/Arrow/FoldA.hs  189
-rw-r--r--  src/Data/Locations.hs  17
-rw-r--r--  src/Data/Locations/Accessors.hs  381
-rw-r--r--  src/Data/Locations/FunflowRemoteCache.hs  66
-rw-r--r--  src/Data/Locations/Loc.hs  381
-rw-r--r--  src/Data/Locations/LocVariable.hs  14
-rw-r--r--  src/Data/Locations/LocationTree.hs  283
-rw-r--r--  src/Data/Locations/LogAndErrors.hs  58
-rw-r--r--  src/Data/Locations/Mappings.hs  259
-rw-r--r--  src/Data/Locations/SerializationMethod.hs  745
-rw-r--r--  src/Data/Locations/VirtualFile.hs  394
-rw-r--r--  src/Data/Representable.hs  24
-rw-r--r--  src/Porcupine.hs  15
-rw-r--r--  src/Porcupine/Foldl.hs  13
-rw-r--r--  src/Porcupine/Run.hs  5
-rw-r--r--  src/Porcupine/Serials.hs  7
-rw-r--r--  src/Porcupine/Tasks.hs  5
-rw-r--r--  src/Porcupine/VFiles.hs  15
-rw-r--r--  src/Streaming/TaskPipelineUtils.hs  157
-rw-r--r--  src/System/ClockHelpers.hs  32
-rw-r--r--  src/System/TaskPipeline.hs  15
-rw-r--r--  src/System/TaskPipeline/CLI.hs  437
-rw-r--r--  src/System/TaskPipeline/Caching.hs  112
-rw-r--r--  src/System/TaskPipeline/ConfigurationReader.hs  150
-rw-r--r--  src/System/TaskPipeline/Logger.hs  117
-rw-r--r--  src/System/TaskPipeline/Options.hs  84
-rw-r--r--  src/System/TaskPipeline/PTask.hs  218
-rw-r--r--  src/System/TaskPipeline/PTask/Internal.hs  286
-rw-r--r--  src/System/TaskPipeline/PorcupineTree.hs  689
-rw-r--r--  src/System/TaskPipeline/Repetition.hs  119
-rw-r--r--  src/System/TaskPipeline/Repetition/Foldl.hs  145
-rw-r--r--  src/System/TaskPipeline/Repetition/Internal.hs  119
-rw-r--r--  src/System/TaskPipeline/Repetition/Streaming.hs  127
-rw-r--r--  src/System/TaskPipeline/Run.hs  269
-rw-r--r--  src/System/TaskPipeline/VirtualFileAccess.hs  405
43 files changed, 7280 insertions, 0 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..2b55c2c
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Tweag I/O, NovaDiscovery
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE. \ No newline at end of file
diff --git a/examples/example-radon/ExampleRadon.hs b/examples/example-radon/ExampleRadon.hs
new file mode 100644
index 0000000..1695e6b
--- /dev/null
+++ b/examples/example-radon/ExampleRadon.hs
@@ -0,0 +1,169 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE DuplicateRecordFields #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeApplications #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE Arrows #-}
+{-# OPTIONS_GHC -Wwarn -Wno-missing-signatures -Wno-name-shadowing #-}
+
+-- This example is loosely based on the series of blog posts by Thomas Wiecki
+-- https://twiecki.io/blog/2014/03/17/bayesian-glms-3/ .
+
+import Control.Monad
+import Data.Aeson
+import qualified Data.Csv as Csv
+import Data.DocRecord
+import qualified Data.Text as T
+import qualified Data.Vector as V
+import Data.Functor
+import GHC.Generics
+import Porcupine
+import Prelude hiding (id, (.))
+import qualified Control.Foldl as L
+import Graphics.Vega.VegaLite as VL
+import Control.Monad.Bayes.Class
+import Control.Monad.Bayes.Sampler
+import Control.Monad.Bayes.Weighted
+import Control.Monad.Bayes.Traced
+import Numeric.Log
+
+import Plotting -- In the same folder
+
+
+data RadonObservation = RadonObservation
+ { state :: !T.Text
+ , county :: !T.Text
+ , basement :: !T.Text
+ , log_radon :: !Double }
+ deriving (Generic, FromJSON, ToJSON
+ ,Csv.FromNamedRecord, Csv.ToNamedRecord, Csv.DefaultOrdered)
+
+-- | We want to read each RadonObservation as a set of Records. This supports
+-- reading from CSV files with headers and from JSON files. The Vector cannot
+-- directly be read from the CSV, as we would not know whether the columns are
+-- positional or nominal. This is why we use the 'Records' wrapper here (for
+-- nominal columns). This requires our datatype to instantiate
+-- Csv.From/ToNamedRecord
+radonObsSerials :: BidirSerials (V.Vector RadonObservation)
+radonObsSerials = dimap Records fromRecords $ -- We wrap/unwrap the Records
+ someBidirSerial (CSVSerial "csv" True ',')
+ <>
+ someBidirSerial JSONSerial
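+
+-- A hedged note: with the generically-derived 'Csv.DefaultOrdered' instance,
+-- the CSV header should match the record field names (assuming cassava's
+-- default generic options), i.e.:
+--
+-- > state,county,basement,log_radon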
+
+radonObsFile :: DataSource (V.Vector RadonObservation)
+radonObsFile = dataSource ["data", "radon"] radonObsSerials
+
+filteredCsvFile :: DataSink (V.Vector RadonObservation)
+filteredCsvFile = dataSink ["debug", "radon-filtered"] radonObsSerials
+
+vegaliteSerials :: PureSerials VegaLite
+vegaliteSerials =
+ lmap VL.toHtml (somePureSerial $ PlainTextSerial $ Just "html")
+ <> lmap VL.fromVL (somePureSerial JSONSerial)
+
+writeViz name = writeData (dataSink ["viz", name] vegaliteSerials)
+
+data Summary = Summary { numRows :: Int
+ , uniqueStates :: [T.Text]
+ , numUniqueCounties :: Int }
+ deriving (Show)
+
+foldSummary :: L.Fold RadonObservation Summary
+foldSummary = Summary <$> L.length
+ <*> L.premap state L.nub
+ <*> (L.premap county L.nub <&> length)
+
+data ModelParams = ModelParams
+ { rateWithB :: Double -- ^ proportion of houses that have a basement
+ , radonWithB :: Double -- ^ radon level in houses with basement
+ , radonWithoutB :: Double -- ^ radon level in houses without basement
+ , noiseWithB :: Double -- ^ variation around radonWithB
+ , noiseWithoutB :: Double -- ^ variation around radonWithoutB
+ } deriving (Eq, Show, Generic, ToJSON, FromJSON)
+
+priorModel :: MonadSample m => m ModelParams
+priorModel =
+ ModelParams <$> uniform 0 1
+ <*> uniform 0 10
+ <*> uniform 0 10
+ <*> uniform 0 10
+ <*> uniform 0 10
+
+likelihood :: ModelParams -> (Bool, Double) -> Log Double
+likelihood params (hasBasement, radonObserved) =
+ case hasBasement of
+ True -> let radonModel = radonWithB params
+ noiseModel = noiseWithB params
+ rate = realToFrac $ rateWithB params
+ in rate * normalPdf radonModel noiseModel radonObserved
+ False -> let radonModel = radonWithoutB params
+ noiseModel = noiseWithoutB params
+ rate = realToFrac $ 1 - rateWithB params
+ in rate * normalPdf radonModel noiseModel radonObserved
+
+model :: MonadInfer m => [(Bool, Double)] -> m ModelParams
+model observations = do
+ params <- priorModel
+ mapM_ (score . likelihood params) observations
+ return params
+
+posteriorForward :: MonadSample m => m ModelParams -> m (Bool, Double)
+posteriorForward model = do
+ params <- model
+ hasBasement <- bernoulli (rateWithB params)
+ value <- case hasBasement of
+ True -> normal (radonWithB params) (noiseWithB params)
+ False -> normal (radonWithoutB params) (noiseWithoutB params)
+ return (hasBasement, value)
+
+sampleFlatLinRegModel :: (LogThrow m) => PTask m () ()
+sampleFlatLinRegModel = proc () -> do
+ radonObs <- loadData radonObsFile -< ()
+ writeData filteredCsvFile -< radonObs
+ let (summary,xs,ys) = flip L.fold radonObs $
+ (,,) <$> foldSummary
+ <*> L.premap ((== "Y") . basement) L.list
+ <*> L.premap log_radon L.list
+ xLbl = "has basement"
+ yLbl = "log radon"
+ logInfo -< show summary
+
+ vizSize <- getOption ["viz", "options"]
+ (docField @"vizSize" (400,400) "Width & height of visualisations") -< ()
+ writeViz "1" -< plot vizSize
+ (S $ scatter2 xLbl yLbl (-3,5))
+ (Cols [(xLbl, VL.Booleans xs)
+ ,(yLbl, VL.Numbers ys)])
+ nsamples <- getOption ["sampling", "options"]
+ (docField @"nsamples" 5000 "Number of samples to draw") -< ()
+ samples <- ioTask -<
+ sampleIOfixed $ prior $ mh nsamples $ model (zip xs ys)
+ writeViz "2" -< plot vizSize
+ (H [[density2DPlot "radonWithB" "radonWithoutB" (0,2) (0,2)]
+ ,[density2DPlot "noiseWithB" "noiseWithoutB" (0,2) (0,2)]])
+ (J samples)
+
+ samples <- ioTask -<
+ sampleIOfixed $ prior $ mh nsamples $ posteriorForward $ model (zip xs ys)
+ let (xModel, yModel) = unzip samples
+ writeViz "3" -< plot vizSize
+ (S $ scatter2 xLbl yLbl (-3,5))
+ (Cols [(xLbl, VL.Booleans xModel)
+ ,(yLbl, VL.Numbers yModel)])
+
+
+runIn topdir = runPipelineTask
+ (FullConfig "example-radon" -- Name of the executable (for --help)
+ "example-radon.yaml" -- Default config file path
+ topdir -- Default root directory for mappings
+ ())
+ (baseContexts "")
+ sampleFlatLinRegModel ()
+
+main :: IO ()
+main = runIn "examples/example-radon"
diff --git a/examples/example-radon/Plotting.hs b/examples/example-radon/Plotting.hs
new file mode 100644
index 0000000..f4522da
--- /dev/null
+++ b/examples/example-radon/Plotting.hs
@@ -0,0 +1,79 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ExistentialQuantification #-}
+
+module Plotting where
+
+import Data.Aeson
+import Graphics.Vega.VegaLite
+import Data.Text (Text, pack)
+
+
+barPlot :: Text -> VLSpec
+barPlot xName =
+ let enc = encoding
+ . position X [PName xName, PmType Nominal, PAxis [AxGrid True, AxTitle xName]]
+ . position Y [PName "binnedData", PAggregate Count, PmType Quantitative, PAxis [AxGrid False, AxTitle "count"]]
+ in asSpec $ [mark Bar [MOpacity 1.0, MColor "#a3c6de"], enc []]
+
+barPlot2 :: Text -> Text -> VLSpec
+barPlot2 xName yName =
+ let enc = encoding
+ . position X [PName xName, PmType Nominal, PAxis [AxGrid True, AxTitle xName]]
+ . position Y [PName yName, PmType Quantitative, PAxis [AxGrid False, AxTitle yName]]
+ in asSpec $ [mark Bar [MOpacity 1.0, MColor "#a3c6de"], enc []]
+
+linePlot :: Text -> Text -> VLSpec
+linePlot xName yName =
+ let enc = encoding
+ . position X [PName xName, PmType Quantitative, PAxis [AxGrid True, AxTitle xName]]
+ . position Y [PName yName, PmType Quantitative, PAxis [AxGrid False, AxTitle yName]]
+ in asSpec $ [mark Line [MColor "green"], enc []]
+
+density2DPlot :: Text -> Text -> (Double, Double) -> (Double, Double) -> VLSpec
+density2DPlot xName yName (xmin, xmax) (ymin, ymax) =
+ let enc = encoding
+ . position X [PName xName
+ ,PScale [SDomain (DNumbers [xmin, xmax])]
+ ,PBin [Step 0.1], PmType Quantitative, PAxis [AxGrid True, AxTitle xName]]
+ . position Y [PName yName
+ ,PScale [SDomain (DNumbers [ymin, ymax])]
+ ,PBin [Step 0.1]
+ ,PmType Quantitative
+ ,PAxis [AxGrid True, AxTitle yName]]
+ . color [ MAggregate Count, MName "col", MmType Quantitative
+ , MScale [{-SReverse False,-} SScheme "blues" [0.0, 1.0]]]
+ in asSpec $ [mark Rect [MClip True], enc []]
+
+scatter2 :: Text -> Text -> (Double, Double) -> VLSpec
+scatter2 xName yName (ymin, ymax) =
+ let enc = encoding
+ . position X [PName xName, PmType Nominal, PAxis [AxGrid True, AxTitle xName]]
+ . position Y [PName yName
+ , PScale [SDomain (DNumbers [ymin, ymax])]
+ , PmType Quantitative
+ , PAxis [AxGrid True, AxTitle yName]]
+ in asSpec $ [mark Tick [MClip True], enc []]
+
+data SpecGrid = H [[VLSpec]] | V [[VLSpec]] | L [VLSpec] | S VLSpec
+
+data InputData = Cols [(Text, DataValues)]
+ | forall j. (ToJSON j) => J j
+ | File FilePath
+
+plot :: (Double, Double) -> SpecGrid -> InputData -> VegaLite
+plot (figw,figh) gridOfLayers dat =
+ let desc = description "Plot"
+ dat' = case dat of
+ Cols cols -> foldl (.) (dataFromColumns []) (map (uncurry dataColumn) cols) []
+ J o -> dataFromJson (toJSON o) []
+ File fp -> dataFromSource (pack fp) []
+ conf = configure
+ -- . configuration (Axis [ DomainWidth 1 ])
+ . configuration (SelectionStyle [ ( Single, [ On "dblclick" ] ) ])
+ . configuration (View [ViewStroke (Just "transparent")])
+ spec = case gridOfLayers of
+ S l -> layer [l]
+ L ls -> layer ls
+ H lss -> hConcat (map (asSpec . (:[]) . layer) lss)
+ V lss -> vConcat (map (asSpec . (:[]) . layer) lss)
+ in toVegaLite [width figw, height figh, conf [], desc, dat', spec]
diff --git a/examples/example0.1/Example0_1.hs b/examples/example0.1/Example0_1.hs
new file mode 100644
index 0000000..517921f
--- /dev/null
+++ b/examples/example0.1/Example0_1.hs
@@ -0,0 +1,36 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE TypeApplications #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE Arrows #-}
+
+import Data.DocRecord
+import qualified Data.Text.Lazy as T
+import Porcupine
+import Prelude hiding (id, (.))
+
+
+yzCompress :: T.Text -> T.Text
+yzCompress = T.concat . map counts . T.group
+ where
+ counts s = T.pack (show (T.length s)) <> T.take 1 s <> ","
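+
+-- A doctest-style sketch of the run-length encoding (assuming
+-- OverloadedStrings in the session):
+--
+-- >>> yzCompress "aaabbc"
+-- "3a,2b,1c,"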
+
+resultFile :: DataSink T.Text
+resultFile = dataSink ["result"] $
+ somePureSerial (PlainTextSerial (Just "txt"))
+ <> lmap yzCompress
+ (somePureSerial (PlainTextSerial (Just "yz")))
+
+myTask :: (LogThrow m) => PTask m () ()
+myTask = proc () -> do
+ (FV chars :& FV nums :& _) <-
+ getOptions ["options"]
+ ( docField @"chars" "a" "The chars to repeat"
+ :& docField @"replications" [10::Int] "The numbers of replications"
+ :& RNil) -< ()
+ let txt = T.concat $
+ zipWith (\s n -> T.replicate (fromIntegral n) (T.singleton s)) chars nums
+ writeData resultFile -< txt
+
+main :: IO ()
+main = runLocalPipelineTask (FullConfig "example0.1" "example0_1.yaml" "." ()) myTask ()
diff --git a/examples/example0/Example0.hs b/examples/example0/Example0.hs
new file mode 100644
index 0000000..a8e74c3
--- /dev/null
+++ b/examples/example0/Example0.hs
@@ -0,0 +1,30 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE TypeApplications #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE Arrows #-}
+
+import Data.DocRecord
+import qualified Data.Text.Lazy as T
+import Porcupine
+
+
+resultFile :: DataSink T.Text
+resultFile = dataSink ["result"] $
+ somePureSerial (PlainTextSerial (Just "txt"))
+
+myTask :: (LogThrow m) => PTask m () ()
+myTask = proc () -> do
+ (FV char :& FV num :& _) <- getMyOptions -< ()
+ let txt = T.replicate (fromIntegral num) (T.singleton char)
+ writeData resultFile -< txt
+ where
+ getMyOptions =
+ getOptions ["options"]
+ ( docField @"char" 'a' "The character to repeat"
+ :& docField @"replications" (10::Int) "The number of replications"
+ :& RNil)
+
+main :: IO ()
+-- main = simpleRunPTask myTask ()
+main = runLocalPipelineTask (FullConfig "example0" "example0.yaml" "." ()) myTask ()
diff --git a/examples/example1/Example1.hs b/examples/example1/Example1.hs
new file mode 100644
index 0000000..b81df38
--- /dev/null
+++ b/examples/example1/Example1.hs
@@ -0,0 +1,75 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeApplications #-}
+
+import Data.Aeson
+import Data.DocRecord
+import qualified Data.HashMap.Strict as HM
+import qualified Data.Text as T
+import GHC.Generics
+import Porcupine
+
+
+data User = User { userName :: T.Text
+ , userSurname :: T.Text
+ , userAge :: Int }
+ deriving (Generic)
+instance FromJSON User
+
+newtype Analysis = Analysis { numLetters :: HM.HashMap Char Int }
+ deriving (Generic)
+instance ToJSON Analysis
+
+-- | How to load users
+userFile :: DataSource User
+userFile = dataSource ["Inputs", "User"]
+ (somePureDeserial JSONSerial)
+
+-- | How to write analysis
+analysisFile :: DataSink Analysis
+analysisFile = dataSink ["Outputs", "Analysis"]
+ (somePureSerial JSONSerial)
+
+-- | The simple computation we want to perform
+computeAnalysis :: User -> Analysis
+computeAnalysis (User name surname _) = Analysis $
+ HM.fromListWith (+) $ [(c,1) | c <- T.unpack name]
+ ++ [(c,1) | c <- T.unpack surname]
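+
+-- For instance, a hypothetical @User "Ann" "Lee" 30@ yields the counts
+-- A:1, n:2, L:1, e:2 (HashMap iteration order is unspecified).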
+
+-- | The task combining the three previous operations.
+--
+-- This task may look very opaque from the outside, having no parameters and no
+-- return value. But we will be able to reuse it over different users without
+-- having to change it at all.
+analyseOneUser :: (LogThrow m) => PTask m () ()
+analyseOneUser =
+ loadData userFile >>> arr computeAnalysis >>> writeData analysisFile
+
+mainTask :: (LogThrow m) => PTask m () ()
+mainTask =
+ -- First we get the ids of the users that we want to analyse. We need only one
+ -- field that will contain a range of values, see IndexRange. By default, this
+ -- range contains just one value, zero.
+ getOption ["Settings"] (docField @"users" (oneIndex (0::Int)) "The user ids to load")
+ -- We turn the range we read into a full lazy list:
+ >>> arr enumTRIndices
+ -- Then we just map over these ids and call analyseOneUser each time:
+ >>> parMapTask_ "userId" analyseOneUser
+
+main :: IO ()
+main = runPipelineTask (FullConfig "example1" "porcupine-example1.yaml" "porcupine-core/examples/example1/data" ())
+ -- The CLI/Yaml configuration to use (prog name,
+ -- default config file to create, and default root to
+ -- use for the porcupine tree)
+ (baseContexts "")
+ -- The contexts to use. 'baseContexts' is the
+ -- minimum. It gives out katip logging and local files
+ -- access (through ResourceT). The string param is the
+ -- top namespace for the logger. When we use
+ -- FullConfig (and therefore CLI), the progName for
+ -- the CLI given above ("example1") will be inherited
+ -- by the logger, so we can leave it blank
+ mainTask ()
diff --git a/examples/example2/Example2.hs b/examples/example2/Example2.hs
new file mode 100644
index 0000000..e9e043c
--- /dev/null
+++ b/examples/example2/Example2.hs
@@ -0,0 +1,78 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeApplications #-}
+
+import Data.Aeson
+import qualified Data.Text as T
+import GHC.Generics
+import Porcupine
+
+import Prelude hiding (id, (.))
+
+
+-- This example uses porcupine to read data that represents the evolution of a
+-- given stock on given dates, and gives back the average and standard
+-- deviation of the stock over those dates.
+
+data Stockdaily = Stockdaily {date :: String , close :: Double}
+ deriving (Generic)
+instance FromJSON Stockdaily
+
+newtype Stock = Stock { chart :: [Stockdaily] }
+ deriving (Generic)
+instance FromJSON Stock
+
+getCloseStock :: Stock -> [Double]
+getCloseStock s = map close (chart s)
+
+-- | How to load Stock prices
+stockFile :: DataSource Stock
+stockFile = dataSource ["Inputs", "Stock"]
+ (somePureDeserial JSONSerial)
+
+-- | How to write the smoothed stock prices
+globalMatrix :: DataSink (Tabular [[Double]])
+globalMatrix = dataSink ["Outputs" , "globalData"]
+ (somePureSerial (CSVSerial (T.pack "csv") False ','))
+
+avg :: [Double] -> Double
+avg list = let s = sum list
+ n = fromIntegral (length list)
+ in s/n
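+
+-- A doctest-style sketch (note that an empty list yields NaN):
+--
+-- >>> avg [1, 2, 3]
+-- 2.0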
+
+msliding :: Int -> [a] -> [[a]]
+msliding n p = case p of
+ [] -> []
+ (_:xs) -> [take n p] ++ (msliding n xs)
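+
+-- A doctest-style sketch; note that the trailing windows are shorter than @n@:
+--
+-- >>> msliding 3 [1, 2, 3, 4]
+-- [[1,2,3],[2,3,4],[3,4],[4]]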
+
+-- | The simple computation we want to perform
+computeSmoothedCurve :: Stock -> [Double]
+computeSmoothedCurve s = curve
+ where
+ price = getCloseStock s
+ curve = map avg (msliding 10 price)
+
+analyseStocks :: (LogThrow m) => PTask m () ()
+analyseStocks =
+ arr (const ["aapl"::TRIndex, "fb" , "googl"]) -- We want the stocks for some
+ -- fixed set of companies
+ >>> loadDataList "company" stockFile
+ >>> arr (Tabular Nothing . map (\(_idx,stock) -> computeSmoothedCurve stock))
+ >>> writeData globalMatrix
+
+main :: IO ()
+main = runPipelineTask (FullConfig "example2" "porcupine-example2.yaml" "porcupine-core/examples/example2/data" ())
+ -- The CLI/Yaml configuration to use (prog name,
+ -- default config file to create, and default root to
+ -- use for the porcupine tree)
+ (baseContexts "")
+ -- The contexts to use. 'baseContexts' is the
+ -- minimum. It gives out katip logging and local files
+ -- access (through ResourceT). The string param is the
+ -- top namespace for the logger. When we use
+ -- FullConfig (and therefore CLI), the progName for
+ -- the CLI given above ("example2") will be inherited
+ -- by the logger, so we can leave it blank
+ analyseStocks ()
diff --git a/porcupine-core.cabal b/porcupine-core.cabal
new file mode 100644
index 0000000..8402259
--- /dev/null
+++ b/porcupine-core.cabal
@@ -0,0 +1,440 @@
+cabal-version: 1.12
+
+-- This file has been generated from package.yaml by hpack version 0.32.0.
+--
+-- see: https://github.com/sol/hpack
+--
+-- hash: f4672c1d7de06695f07c27b5c5621ceffb7e30abb9469d2c87f60c0dbf8a6fff
+
+name: porcupine-core
+version: 0.1.0.0
+synopsis: Express portable, composable and reusable data tasks and pipelines
+description: porcupine is centered around the PTask datatype, which represents a
+ computation that will request access to some resources (both in read and write
+ modes) and require some options (parameters, with docstrings and default
+ values). PTasks are composable both sequentially and in parallel, into a
+ pipeline of tasks. The resources and parameters are organized in a tree which
+ will be automatically exposed to the outside world. This makes the pipeline
+ self-documented, and makes it so any option or file required at some point by
+ any task can be visualized and set/remapped (via a combination of a YAML/JSON
+ config file and command-line arguments) before the pipeline runs. That
+ means that the PTasks are completely agnostic of their data inputs, and that
+ new data sources can be added without having to change any of the tasks' logic
+ or even their types. This is done via the LocationAccessor
+ typeclass. `porcupine-core` provides only access to local files (via
+ resourcet), other location accessors will be in separate packages. See for
+ instance the <https://hackage.haskell.org/package/porcupine-http
+ `porcupine-http`> package to access HTTP URLs. PTasks also provide caching
+ thanks to the funflow package. See the README at
+ <https://github.com/tweag/porcupine#README.md> and the examples in
+ the `porcupine-core` package.
+category: Data, Arrows, Combinators, Control, Pipes, Streaming, Numerical, JSON
+homepage: https://github.com/tweag/porcupine#readme
+bug-reports: https://github.com/tweag/porcupine/issues
+maintainer: Yves Parès <yves.pares@tweag.io>
+copyright: 2018 EURL Tweag, NovaDiscovery
+license: MIT
+license-file: LICENSE
+build-type: Simple
+
+source-repository head
+ type: git
+ location: https://github.com/tweag/porcupine
+
+library
+ exposed-modules:
+ Control.Arrow.FoldA
+ Data.Locations
+ Data.Locations.Accessors
+ Data.Locations.FunflowRemoteCache
+ Data.Locations.Loc
+ Data.Locations.LocationTree
+ Data.Locations.LocVariable
+ Data.Locations.LogAndErrors
+ Data.Locations.Mappings
+ Data.Locations.SerializationMethod
+ Data.Locations.VirtualFile
+ Data.Representable
+ Porcupine
+ Porcupine.Foldl
+ Porcupine.Run
+ Porcupine.Serials
+ Porcupine.Tasks
+ Porcupine.VFiles
+ Streaming.TaskPipelineUtils
+ System.ClockHelpers
+ System.TaskPipeline
+ System.TaskPipeline.Caching
+ System.TaskPipeline.CLI
+ System.TaskPipeline.ConfigurationReader
+ System.TaskPipeline.Logger
+ System.TaskPipeline.Options
+ System.TaskPipeline.PorcupineTree
+ System.TaskPipeline.PTask
+ System.TaskPipeline.PTask.Internal
+ System.TaskPipeline.Repetition
+ System.TaskPipeline.Repetition.Foldl
+ System.TaskPipeline.Repetition.Internal
+ System.TaskPipeline.Repetition.Streaming
+ System.TaskPipeline.Run
+ System.TaskPipeline.VirtualFileAccess
+ other-modules:
+ Paths_porcupine_core
+ hs-source-dirs:
+ src
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , aeson-pretty
+ , attoparsec
+ , base >=4.10 && <5
+ , binary
+ , binary-orphans
+ , bytestring
+ , cassava
+ , clock
+ , conduit
+ , containers
+ , contravariant
+ , data-default
+ , deepseq
+ , directory
+ , docrecords ==0.1.*
+ , filepath
+ , foldl
+ , formatting
+ , funflow >=1.5.0
+ , hashable
+ , katip ==0.8.*
+ , lens
+ , monad-control
+ , mtl
+ , optparse-applicative
+ , path
+ , profunctors
+ , reader-soup ==0.1.*
+ , resourcet
+ , safe-exceptions
+ , store
+ , streaming
+ , streaming-bytestring
+ , streaming-conduit
+ , streaming-utils
+ , template-haskell
+ , temporary
+ , text
+ , transformers
+ , transformers-base
+ , unix
+ , unliftio-core
+ , unordered-containers
+ , url
+ , vector
+ , vinyl
+ , yaml
+ , zlib
+ default-language: Haskell2010
+
+executable example-radon
+ main-is: ExampleRadon.hs
+ other-modules:
+ Plotting
+ Paths_porcupine_core
+ hs-source-dirs:
+ examples/example-radon
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , aeson-pretty
+ , attoparsec
+ , base >=4.10 && <5
+ , binary
+ , binary-orphans
+ , bytestring
+ , cassava
+ , clock
+ , conduit
+ , containers
+ , contravariant
+ , data-default
+ , deepseq
+ , directory
+ , docrecords
+ , filepath
+ , foldl
+ , formatting
+ , funflow >=1.5.0
+ , hashable
+ , hvega
+ , katip ==0.8.*
+ , lens
+ , log-domain
+ , monad-bayes
+ , monad-control
+ , mtl
+ , optparse-applicative
+ , path
+ , porcupine-core
+ , profunctors
+ , reader-soup ==0.1.*
+ , resourcet
+ , safe-exceptions
+ , store
+ , streaming
+ , streaming-bytestring
+ , streaming-conduit
+ , streaming-utils
+ , template-haskell
+ , temporary
+ , text
+ , transformers
+ , transformers-base
+ , unix
+ , unliftio-core
+ , unordered-containers
+ , url
+ , vector
+ , vinyl
+ , yaml
+ , zlib
+ default-language: Haskell2010
+
+executable example0
+ main-is: Example0.hs
+ other-modules:
+ Paths_porcupine_core
+ hs-source-dirs:
+ examples/example0
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , aeson-pretty
+ , attoparsec
+ , base >=4.10 && <5
+ , binary
+ , binary-orphans
+ , bytestring
+ , cassava
+ , clock
+ , conduit
+ , containers
+ , contravariant
+ , data-default
+ , deepseq
+ , directory
+ , docrecords ==0.1.*
+ , filepath
+ , foldl
+ , formatting
+ , funflow >=1.5.0
+ , hashable
+ , katip ==0.8.*
+ , lens
+ , monad-control
+ , mtl
+ , optparse-applicative
+ , path
+ , porcupine-core
+ , profunctors
+ , reader-soup ==0.1.*
+ , resourcet
+ , safe-exceptions
+ , store
+ , streaming
+ , streaming-bytestring
+ , streaming-conduit
+ , streaming-utils
+ , template-haskell
+ , temporary
+ , text
+ , transformers
+ , transformers-base
+ , unix
+ , unliftio-core
+ , unordered-containers
+ , url
+ , vector
+ , vinyl
+ , yaml
+ , zlib
+ default-language: Haskell2010
+
+executable example0.1
+ main-is: Example0_1.hs
+ other-modules:
+ Paths_porcupine_core
+ hs-source-dirs:
+ examples/example0.1
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , aeson-pretty
+ , attoparsec
+ , base >=4.10 && <5
+ , binary
+ , binary-orphans
+ , bytestring
+ , cassava
+ , clock
+ , conduit
+ , containers
+ , contravariant
+ , data-default
+ , deepseq
+ , directory
+ , docrecords ==0.1.*
+ , filepath
+ , foldl
+ , formatting
+ , funflow >=1.5.0
+ , hashable
+ , katip ==0.8.*
+ , lens
+ , monad-control
+ , mtl
+ , optparse-applicative
+ , path
+ , porcupine-core
+ , profunctors
+ , reader-soup ==0.1.*
+ , resourcet
+ , safe-exceptions
+ , store
+ , streaming
+ , streaming-bytestring
+ , streaming-conduit
+ , streaming-utils
+ , template-haskell
+ , temporary
+ , text
+ , transformers
+ , transformers-base
+ , unix
+ , unliftio-core
+ , unordered-containers
+ , url
+ , vector
+ , vinyl
+ , yaml
+ , zlib
+ default-language: Haskell2010
+
+executable example1
+ main-is: Example1.hs
+ other-modules:
+ Paths_porcupine_core
+ hs-source-dirs:
+ examples/example1
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , aeson-pretty
+ , attoparsec
+ , base >=4.10 && <5
+ , binary
+ , binary-orphans
+ , bytestring
+ , cassava
+ , clock
+ , conduit
+ , containers
+ , contravariant
+ , data-default
+ , deepseq
+ , directory
+ , docrecords ==0.1.*
+ , filepath
+ , foldl
+ , formatting
+ , funflow >=1.5.0
+ , hashable
+ , katip ==0.8.*
+ , lens
+ , monad-control
+ , mtl
+ , optparse-applicative
+ , path
+ , porcupine-core
+ , profunctors
+ , reader-soup ==0.1.*
+ , resourcet
+ , safe-exceptions
+ , store
+ , streaming
+ , streaming-bytestring
+ , streaming-conduit
+ , streaming-utils
+ , template-haskell
+ , temporary
+ , text
+ , transformers
+ , transformers-base
+ , unix
+ , unliftio-core
+ , unordered-containers
+ , url
+ , vector
+ , vinyl
+ , yaml
+ , zlib
+ default-language: Haskell2010
+
+executable example2
+ main-is: Example2.hs
+ other-modules:
+ Paths_porcupine_core
+ hs-source-dirs:
+ examples/example2
+ ghc-options: -Wall
+ build-depends:
+ aeson
+ , aeson-pretty
+ , attoparsec
+ , base >=4.10 && <5
+ , binary
+ , binary-orphans
+ , bytestring
+ , cassava
+ , clock
+ , conduit
+ , containers
+ , contravariant
+ , data-default
+ , deepseq
+ , directory
+ , docrecords ==0.1.*
+ , filepath
+ , foldl
+ , formatting
+ , funflow >=1.5.0
+ , hashable
+ , katip ==0.8.*
+ , lens
+ , monad-control
+ , mtl
+ , optparse-applicative
+ , path
+ , porcupine-core
+ , profunctors
+ , reader-soup ==0.1.*
+ , resourcet
+ , safe-exceptions
+ , store
+ , streaming
+ , streaming-bytestring
+ , streaming-conduit
+ , streaming-utils
+ , template-haskell
+ , temporary
+ , text
+ , transformers
+ , transformers-base
+ , unix
+ , unliftio-core
+ , unordered-containers
+ , url
+ , vector
+ , vinyl
+ , yaml
+ , zlib
+ default-language: Haskell2010
diff --git a/src/Control/Arrow/FoldA.hs b/src/Control/Arrow/FoldA.hs
new file mode 100644
index 0000000..3916167
--- /dev/null
+++ b/src/Control/Arrow/FoldA.hs
@@ -0,0 +1,189 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE ExistentialQuantification #-}
+
+-- | This module defines the type 'FoldA', which is a generalization of 'Fold'
+-- from the `foldl` package.
+--
+-- This module intentionally doesn't provide a function to run the FoldA over a
+-- Foldable in any arrow, because that function may depend on the arrow.
+module Control.Arrow.FoldA
+ ( module Control.Foldl
+ , FoldA(..)
+ , FoldA'
+ , Pair(..)
+ , arrowFold
+ , arrowFold_
+ , generalizeA
+ , generalizeA_
+ , specializeA
+ , premapA
+ , premapInitA
+ , postmapA
+ , prefilterA
+
+ , toTup
+ , fromTup
+ , (&&&&)
+ , (****)
+ ) where
+
+import Prelude hiding ((.), id)
+
+import Control.Arrow
+import Control.Category
+import Control.Foldl
+import Data.Profunctor
+
+
+ret :: (Arrow arr) => t -> arr a t
+ret = arr . const
+{-# INLINE ret #-}
+
+data Pair a b = Pair !a !b
+
+toTup :: Pair a b -> (a,b)
+toTup (Pair x y) = (x,y)
+{-# INLINE toTup #-}
+
+fromTup :: (a,b) -> Pair a b
+fromTup (x,y) = Pair x y
+{-# INLINE fromTup #-}
+
+applyP :: Pair (a -> b) a -> b
+applyP (Pair f x) = f x
+{-# INLINE applyP #-}
+
+uncurryP :: (a -> b -> c) -> Pair a b -> c
+uncurryP f (Pair x y) = f x y
+{-# INLINE uncurryP #-}
+
+(****) :: (Arrow a) => a b c -> a b' c' -> a (Pair b b') (Pair c c')
+a1 **** a2 =
+ arr toTup >>> (a1 *** a2) >>> arr fromTup
+{-# INLINE (****) #-}
+
+(&&&&) :: (Arrow a) => a b c -> a b c' -> a b (Pair c c')
+a1 &&&& a2 = a1 &&& a2 >>> arr fromTup
+{-# INLINE (&&&&) #-}
+
+secondP :: (Arrow a) => a c c' -> a (Pair b c) (Pair b c')
+secondP ar =
+ arr toTup >>> second ar >>> arr fromTup
+
+
+-- | This is a generalization of 'Control.Foldl.Fold' that
+-- allows computing on arrows.
+--
+-- 'FoldA (->) ()' is isomorphic to 'Fold', as witnessed
+-- by 'generalizeA' and 'specializeA'.
+--
+-- 'FoldA (Kleisli m) ()' is isomorphic to 'FoldM m'.
+--
+-- We must keep an extra type parameter @i@ that allows initializing the
+-- accumulator; otherwise we would need 'ArrowApply' every time we want to
+-- create a 'FoldA' from an accumulator computed by a previous arrow
+-- computation.
+data FoldA arr i a b =
+ forall x. FoldA (arr (Pair x a) x) (arr i x) (arr x b)
+
+-- | A fold that will directly receive its initial accumulator
+type FoldA' arr a b = FoldA arr b a b
+
+-- | Turns a function that returns a 'Fold' into a 'FoldA' that feeds its
+-- initializer to that function
+generalizeA :: (Arrow arr) => (i -> Fold a b) -> FoldA arr i a b
+generalizeA f =
+ FoldA (arr $ \(Pair (Fold step !acc done) x) ->
+ Fold step (step acc x) done)
+ (arr f)
+ (arr $ \(Fold _ acc done) -> done acc)
+
+-- | Turns a 'Fold' into a 'FoldA' that just ignores its initializer
+generalizeA_ :: (Arrow arr) => Fold a b -> FoldA arr i a b
+generalizeA_ (Fold step start done) =
+ FoldA (arr $ uncurryP step) (ret start) (arr done)
+
+-- | Turns a 'FoldA' over pure functions into a pure 'Fold'
+specializeA :: FoldA (->) () a b -> Fold a b
+specializeA (FoldA step start done) =
+ Fold (\x a -> step $ Pair x a) (start ()) done
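+
+-- As a sanity check of the isomorphism mentioned above (a sketch; 'fold' and
+-- 'sum' come from Control.Foldl, re-exported by this module):
+--
+-- >>> fold (specializeA (generalizeA_ Control.Foldl.sum)) [1, 2, 3 :: Int]
+-- 6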
+
+instance (Arrow arr) => Functor (FoldA arr i a) where
+ fmap f (FoldA step start done) = FoldA step start done'
+ where
+ done' = done >>> arr (f $!)
+ {-# INLINE fmap #-}
+
+instance (Arrow arr) => Applicative (FoldA arr i a) where
+ pure x = FoldA (ret ()) (ret ()) (ret x)
+ {-# INLINE pure #-}
+
+ FoldA stepL startL doneL <*> FoldA stepR startR doneR =
+ let step =
+ arr (\(Pair (Pair xL xR) a) ->
+ Pair (Pair xL a) (Pair xR a))
+ >>> (stepL **** stepR)
+ start = startL &&&& startR
+ done = (doneL **** doneR) >>> arr applyP
+ in FoldA step start done
+ {-# INLINE (<*>) #-}
+
+instance (Arrow arr) => Profunctor (FoldA arr i) where
+ rmap = fmap
+ lmap f (FoldA step start done) = FoldA step' start done
+ where
+ step' = arr (\(Pair x a) -> Pair x (f a)) >>> step
+ {-# INLINE lmap #-}
+
+-- | Changes the input type from which the accumulator is initialized
+premapInitA :: (Arrow arr)
+ => arr i' i -> FoldA arr i a b -> FoldA arr i' a b
+premapInitA ar (FoldA step start done) =
+ FoldA step (ar >>> start) done
+
+-- | Transforms all the inputs arriving at the 'FoldA'
+premapA :: (Arrow arr)
+ => arr a b -> FoldA arr i b r -> FoldA arr i a r
+premapA ar (FoldA step start done) =
+ FoldA (secondP ar >>> step) start done
+{-# INLINABLE premapA #-}
+
+-- | Changes the output of the 'FoldA'
+postmapA :: (Category arr)
+ => FoldA arr i a b -> arr b r -> FoldA arr i a r
+postmapA (FoldA step start done) ar =
+ FoldA step start (done >>> ar)
+{-# INLINABLE postmapA #-}
+
+prefilterA :: (ArrowChoice arr)
+ => arr a Bool -> FoldA arr i a r -> FoldA arr i a r
+prefilterA fltr (FoldA step start done) =
+ FoldA (proc (Pair x a) -> do
+ b <- fltr -< a
+ if b
+ then step -< Pair x a
+ else returnA -< x)
+ start
+ done
+{-# INLINABLE prefilterA #-}
+
+-- | Creates a 'FoldA' from an arrow computation.
+arrowFold :: (Arrow a)
+ => a (acc,input) acc -- ^ The folding task
+ -> FoldA' a input acc
+arrowFold step =
+ FoldA (arr onInput >>> step) id id
+ where
+ onInput (Pair acc x) = (acc,x)
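+
+-- For instance (a pure-arrow sketch): @arrowFold (arr (uncurry (+)))@ yields a
+-- @FoldA' (->) Int Int@ that sums the inputs into the initial accumulator.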
+
+-- | Creates a 'FoldA' that will never alter its accumulator's initial value,
+-- just pass it around
+arrowFold_ :: (Arrow a)
+ => a (acc,input) ()
+ -> FoldA a acc input ()
+arrowFold_ task =
+ rmap (const ()) $
+ arrowFold $ proc (acc,input) -> do
+ task -< (acc,input)
+ returnA -< acc
diff --git a/src/Data/Locations.hs b/src/Data/Locations.hs
new file mode 100644
index 0000000..a1a5a6b
--- /dev/null
+++ b/src/Data/Locations.hs
@@ -0,0 +1,17 @@
+module Data.Locations
+ ( module Data.Locations.Loc
+ , module Data.Locations.LocationTree
+ , module Data.Locations.LocVariable
+ , module Data.Locations.Mappings
+ , module Data.Locations.VirtualFile
+ , module Data.Locations.SerializationMethod
+ , module Data.Locations.LogAndErrors
+ ) where
+
+import Data.Locations.Loc
+import Data.Locations.LocationTree
+import Data.Locations.LocVariable
+import Data.Locations.LogAndErrors
+import Data.Locations.Mappings
+import Data.Locations.SerializationMethod
+import Data.Locations.VirtualFile
diff --git a/src/Data/Locations/Accessors.hs b/src/Data/Locations/Accessors.hs
new file mode 100644
index 0000000..5642453
--- /dev/null
+++ b/src/Data/Locations/Accessors.hs
@@ -0,0 +1,381 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DefaultSignatures #-}
+{-# LANGUAGE DeriveTraversable #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
+{-# OPTIONS_GHC "-fno-warn-incomplete-uni-patterns" #-}
+{-# OPTIONS_GHC "-fno-warn-missing-signatures" #-}
+{-# OPTIONS_GHC "-fno-warn-redundant-constraints" #-}
+
+module Data.Locations.Accessors
+ ( module Control.Monad.ReaderSoup.Resource
+ , FromJSON(..), ToJSON(..)
+ , LocationAccessor(..)
+ , LocOf, LocWithVarsOf
+ , SomeGLoc(..), SomeLoc, SomeLocWithVars
+ , SomeHashableLocs
+ , toHashableLocs
+ , FieldWithAccessors
+ , Rec(..), ElField(..)
+ , MayProvideLocationAccessors(..)
+ , SomeLocationAccessor(..)
+ , AvailableAccessors
+ , LocResolutionM
+ , BasePorcupineContexts
+ , (<--)
+ , baseContexts, baseContextsWithScribeParams
+ , pattern L
+ , splitAccessorsFromArgRec
+ , withParsedLocs, withParsedLocsWithVars, resolvePathToSomeLoc, resolveYamlDocToSomeLoc
+ , writeLazyByte, readLazyByte, readText, writeText
+ ) where
+
+import Control.Funflow.ContentHashable
+import Control.Lens (over, (^.), _1)
+import Control.Monad.IO.Unlift
+import Control.Monad.ReaderSoup
+import Control.Monad.ReaderSoup.Resource
+import Control.Monad.Trans.Resource
+import Data.Aeson
+import qualified Data.ByteString.Lazy as LBS
+import qualified Data.ByteString.Streaming as BSS
+import qualified Data.HashMap.Strict as HM
+import Data.Locations.Loc
+import Data.Locations.LogAndErrors
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as TE
+import qualified Data.Text.Lazy as LT
+import qualified Data.Text.Lazy.Encoding as LTE
+import Data.Vinyl
+import Data.Vinyl.Functor
+import qualified Data.Yaml as Y
+import GHC.TypeLits
+import Katip
+import System.Directory (createDirectoryIfMissing,
+ createFileLink,
+ doesPathExist)
+import qualified System.FilePath as Path
+import qualified System.IO.Temp as Tmp
+import System.TaskPipeline.Logger
+
+
+-- | A location with no variables left to be instantiated.
+type LocOf l = GLocOf l String
+
+-- | A location that contains variables needing to be instantiated.
+type LocWithVarsOf l = GLocOf l StringWithVars
+
+-- | Creates some Loc type, indexed over a symbol (see ReaderSoup for how that
+-- symbol should be used), and equipped with functions to access it in some
+-- Monad
+class ( MonadMask m, MonadIO m
+ , TypedLocation (GLocOf l) )
+ => LocationAccessor m (l::Symbol) where
+
+ -- | Generalized location. The implementation is completely at the discretion
+ -- of the LocationAccessor, but it must be serializable in json, and it must
+ -- be able to contain "variable bits" (that will correspond for instance to
+ -- indices). These "variable bits" must be exposed through the parameter @a@
+ -- in @GLocOf l a@, and @GLocOf l@ must be a Traversable. @a@ will always be
+ -- an instance of 'IsLocString'. The rest of the implementation of
+ -- 'LocationAccessor' doesn't have to work in the most general case @GLocOf l
+ -- a@, as when all variables have been replaced by their final values, @a@ is
+ -- just @String@.
+ data GLocOf l :: * -> *
+
+ locExists :: LocOf l -> m Bool
+
+ writeBSS :: LocOf l -> BSS.ByteString m r -> m r
+
+ readBSS :: LocOf l -> (BSS.ByteString m () -> m b) -> m b
+
+ copy :: LocOf l -> LocOf l -> m ()
+ copy locFrom locTo = readBSS locFrom (writeBSS locTo)
+
+ withLocalBuffer :: (FilePath -> m a) -> LocOf l -> m a
+ -- If we have a local resource accessor, we use it:
+ default withLocalBuffer :: (MonadResource m)
+ => (FilePath -> m a) -> LocOf l -> m a
+ withLocalBuffer f loc =
+ Tmp.withSystemTempDirectory "pipeline-tools-tmp" writeAndUpload
+ where
+ writeAndUpload tmpDir = do
+ let tmpFile = tmpDir Path.</> "out"
+ res <- f tmpFile
+ _ <- readBSS (L (localFile tmpFile)) (writeBSS loc)
+ return res
+
+-- | Reifies an instance of LocationAccessor
+data SomeLocationAccessor m where
+ SomeLocationAccessor :: (KnownSymbol l, LocationAccessor m l)
+ => Label l -> SomeLocationAccessor m
+
+-- | This class is meant to be implemented by every label used in the reader
+-- soup. It tells which LocationAccessors this label provides (usually zero or
+-- one).
+class MayProvideLocationAccessors m l where
+ getLocationAccessors :: Label l -> [SomeLocationAccessor m]
+ default getLocationAccessors :: (KnownSymbol l, LocationAccessor m l)
+ => Label l -> [SomeLocationAccessor m]
+ getLocationAccessors x = [SomeLocationAccessor x]
+
+-- | By default, no accessor is provided
+instance {-# OVERLAPPABLE #-} MayProvideLocationAccessors m l where
+ getLocationAccessors _ = []
+
+-- | Packs together the args to run a context of the ReaderSoup, and if
+-- available, an instance of LocationAccessor
+type FieldWithAccessors m =
+ Compose ((,) [SomeLocationAccessor m]) ElField
+
+-- | Much like (=:) builds an ElField, (<--) builds a Field composed with
+-- LocationAccessors (if available)
+(<--) :: (KnownSymbol l, MayProvideLocationAccessors m l)
+ => Label l -> args -> FieldWithAccessors m (l:::args)
+lbl <-- args = Compose (getLocationAccessors lbl, lbl =: args)
+
+-- | All the LocationAccessors available to the system during a run, so that
+-- when we encounter an Aeson Value corresponding to some LocOf, we may try them
+-- all and use the first one that matches.
+newtype AvailableAccessors m = AvailableAccessors [SomeLocationAccessor m]
+
+-- | Retrieves the list of all available LocationAccessors
+--
+-- The ArgsForSoupConsumption constraint is redundant, but it is placed here to
+-- help type inference when using this function.
+splitAccessorsFromArgRec
+ :: (ArgsForSoupConsumption args)
+ => Rec (FieldWithAccessors (ReaderSoup (ContextsFromArgs args))) args
+ -> ( AvailableAccessors (ReaderSoup (ContextsFromArgs args))
+ , Rec ElField args )
+splitAccessorsFromArgRec = over _1 AvailableAccessors . rtraverse getCompose
+ -- `(,) a` is an Applicative if a is a Monoid, so this will merge all the lists
+ -- of SomeLocationAccessors
+
+-- * Making "resource" a LocationAccessor
+
+checkLocal :: String -> Loc -> (LocalFilePath -> p) -> p
+checkLocal _ (LocalFile fname) f = f fname
+checkLocal funcName loc _ = error $ funcName ++ ": location " ++ show loc ++ " isn't a LocalFile"
+
+-- | Accessing local resources
+instance (MonadResource m, MonadMask m) => LocationAccessor m "resource" where
+ newtype GLocOf "resource" a = L (URL a)
+ deriving (Functor, Foldable, Traversable, ToJSON, TypedLocation)
+ locExists (L l) = checkLocal "locExists" l $
+ liftIO . doesPathExist . (^. pathWithExtensionAsRawFilePath)
+ writeBSS (L l) body = checkLocal "writeBSS" l $ \path -> do
+ let raw = path ^. pathWithExtensionAsRawFilePath
+ liftIO $ createDirectoryIfMissing True (Path.takeDirectory raw)
+ BSS.writeFile raw body
+ readBSS (L l) f = checkLocal "readBSS" l $ \path ->
+ f $ BSS.readFile $ path ^. pathWithExtensionAsRawFilePath
+ withLocalBuffer f (L l) = checkLocal "withLocalBuffer" l $ \path ->
+ f $ path ^. pathWithExtensionAsRawFilePath
+ copy (L l1) (L l2) =
+ checkLocal "copy" l1 $ \path1 ->
+ checkLocal "copy (2nd argument)" l2 $ \path2 ->
+ liftIO $ createFileLink
+ (path1 ^. pathWithExtensionAsRawFilePath)
+ (path2 ^. pathWithExtensionAsRawFilePath)
+
+instance (MonadResource m, MonadMask m) => MayProvideLocationAccessors m "resource"
+
+instance (IsLocString a) => Show (GLocOf "resource" a) where
+ show (L l) = show l -- Not automatically derived to avoid the 'L' constructor
+ -- being added
+
+instance (IsLocString a) => FromJSON (GLocOf "resource" a) where
+ parseJSON v = do
+ loc <- parseJSON v
+ case loc of
+ LocalFile{} -> return $ L loc
+ _ -> fail "Isn't a local file"
+
+
+-- * Treating locations in a general manner
+
+-- | Some generalized location. Wraps a @GLocOf l a@ where @l@ is a
+-- 'LocationAccessor' in monad @m@.
+data SomeGLoc m a = forall l. (LocationAccessor m l) => SomeGLoc (GLocOf l a)
+
+instance Functor (SomeGLoc m) where
+ fmap f (SomeGLoc l) = SomeGLoc $ fmap f l
+instance Foldable (SomeGLoc m) where
+ foldMap f (SomeGLoc l) = foldMap f l
+instance Traversable (SomeGLoc m) where
+ traverse f (SomeGLoc l) = SomeGLoc <$> traverse f l
+
+type SomeLoc m = SomeGLoc m String
+type SomeLocWithVars m = SomeGLoc m StringWithVars
+
+instance Show (SomeLoc m) where
+ show (SomeGLoc l) = show l
+instance Show (SomeLocWithVars m) where
+ show (SomeGLoc l) = show l
+
+instance ToJSON (SomeLoc m) where
+ toJSON (SomeGLoc l) = toJSON l
+instance ToJSON (SomeLocWithVars m) where
+ toJSON (SomeGLoc l) = toJSON l
+
+-- | 'SomeLoc' turned into something that can be hashed
+newtype SomeHashableLocs = SomeHashableLocs [Value]
+ -- TODO: We go through Aeson.Value representation of the locations to update
+ -- the hash. That's not terribly efficient, we should measure if that's a
+ -- problem.
+
+instance (Monad m) => ContentHashable m SomeHashableLocs where
+ contentHashUpdate ctx (SomeHashableLocs vals) = contentHashUpdate ctx vals
+
+toHashableLocs :: [SomeLoc m] -> SomeHashableLocs
+toHashableLocs = SomeHashableLocs . map toJSON
+
+-- * Some helper functions to directly write/read bytestrings into/from
+-- locations
+
+writeLazyByte
+ :: (LocationAccessor m l)
+ => LocOf l
+ -> LBS.ByteString
+ -> m ()
+writeLazyByte loc = writeBSS loc . BSS.fromLazy
+
+-- The following functions are DEPRECATED, because converting to a lazy
+-- ByteString with BSS.toLazy_ isn't actually lazy
+
+readLazyByte
+ :: (LocationAccessor m l)
+ => LocOf l
+ -> m LBS.ByteString
+readLazyByte loc = readBSS loc BSS.toLazy_
+
+readText
+ :: (LocationAccessor m l)
+ => LocOf l
+ -> m T.Text
+readText loc =
+ LT.toStrict . LTE.decodeUtf8 <$> readLazyByte loc
+
+writeText
+ :: (LocationAccessor m l)
+ => LocOf l
+ -> T.Text
+ -> m ()
+writeText loc = writeBSS loc . BSS.fromStrict . TE.encodeUtf8
+
+
+-- * Base contexts, providing LocationAccessor to local filesystem resources
+
+type BasePorcupineContexts =
+ '[ "katip" ::: ContextFromName "katip"
+ , "resource" ::: ContextFromName "resource" ]
+
+-- | Use it as the base of the record you give to 'runPipelineTask'. Use '(:&)'
+-- to stack other contexts and LocationAccessors on top of it
+baseContexts topNamespace =
+ #katip <-- ContextRunner (runLogger topNamespace maxVerbosityLoggerScribeParams)
+ :& #resource <-- useResource
+ :& RNil
+
+-- | Like 'baseContexts' but allows you to set the 'LoggerScribeParams'. Useful
+-- when no CLI is used (see 'NoConfig' and 'ConfigFileOnly')
+baseContextsWithScribeParams topNamespace scribeParams =
+ #katip <-- ContextRunner (runLogger topNamespace scribeParams)
+ :& #resource <-- useResource
+ :& RNil
+
+-- * Parsing and resolving locations, tying them to one LocationAccessor
+
+-- | The context in which aeson Values can be resolved to actual Locations
+type LocResolutionM m = ReaderT (AvailableAccessors m) m
+
+newtype ErrorsFromAccessors = ErrorsFromAccessors Object
+ deriving (ToObject, ToJSON)
+instance LogItem ErrorsFromAccessors where
+ payloadKeys _ _ = AllKeys
+
+errsFromAccs :: Object -> ErrorsFromAccessors
+errsFromAccs = ErrorsFromAccessors . HM.singleton "errorsFromAccessors" . Object
+
+-- | Finds in the accessors list a way to parse a list of JSON values that
+-- should correspond to some `LocWithVarsOf l` type
+withParsedLocsWithVars
+ :: (LogThrow m)
+ => [Value]
+ -> (forall l. (LocationAccessor m l)
+ => [LocWithVarsOf l] -> LocResolutionM m r)
+ -> LocResolutionM m r
+withParsedLocsWithVars aesonVals f = do
+ AvailableAccessors allAccessors <- ask
+ case allAccessors of
+ [] -> throwWithPrefix $ "List of accessors is empty"
+ _ -> return ()
+ loop allAccessors mempty
+ where
+ showJ = LT.unpack . LT.intercalate ", " . map (LTE.decodeUtf8 . encode)
+ loop [] errCtxs =
+ katipAddContext (errsFromAccs errCtxs) $
+ throwWithPrefix $ "Location(s) " ++ showJ aesonVals
+ ++ " cannot be used by the location accessors in place."
+ loop (SomeLocationAccessor (lbl :: Label l) : accs) errCtxs =
+ case mapM fromJSON aesonVals of
+ Success a -> f (a :: [LocWithVarsOf l])
+ Error e -> loop accs (errCtxs <>
+ HM.singleton (T.pack $ symbolVal lbl) (String $ T.pack e))
+
+-- | Finds in the accessors list a way to parse a list of JSON values that
+-- should correspond to some `LocOf l` type
+withParsedLocs :: (LogThrow m)
+ => [Value]
+ -> (forall l. (LocationAccessor m l)
+ => [LocOf l] -> LocResolutionM m r)
+ -> LocResolutionM m r
+withParsedLocs aesonVals f = do
+ AvailableAccessors allAccessors <- ask
+ case allAccessors of
+ [] -> throwWithPrefix $ "List of accessors is empty"
+ _ -> return ()
+ loop allAccessors mempty
+ where
+ showJ = LT.unpack . LT.intercalate ", " . map (LTE.decodeUtf8 . encode)
+ loop [] errCtxs =
+ katipAddContext (errsFromAccs errCtxs) $
+ throwWithPrefix $ "Location(s) " ++ showJ aesonVals
+ ++ " cannot be used by the location accessors in place."
+ loop (SomeLocationAccessor (lbl :: Label l) : accs) errCtxs =
+ case mapM fromJSON aesonVals of
+ Success a -> f (a :: [LocOf l])
+ Error e -> loop accs (errCtxs <>
+ HM.singleton (T.pack $ symbolVal lbl) (String $ T.pack e))
+
+-- | The string will be parsed as a YAML value. It can be a simple string or the
+-- representation used by some location accessor. Every accessor will be
+-- tried. Will fail if no accessor can handle the YAML value.
+resolveYamlDocToSomeLoc
+ :: (LogThrow m)
+ => String
+ -> LocResolutionM m (SomeLoc m)
+resolveYamlDocToSomeLoc doc = do
+ val <- Y.decodeThrow $ TE.encodeUtf8 $ T.pack doc
+ withParsedLocs [val] $ \[l] -> return $ SomeGLoc l
+
+-- | For locations which can be expressed as a simple String. The path will be
+-- used as a JSON string. Will fail if no accessor can handle the path.
+resolvePathToSomeLoc
+ :: (LogThrow m)
+ => FilePath
+ -> LocResolutionM m (SomeLoc m)
+resolvePathToSomeLoc p =
+ withParsedLocs [String $ T.pack p] $ \[l] -> return $ SomeGLoc l
diff --git a/src/Data/Locations/FunflowRemoteCache.hs b/src/Data/Locations/FunflowRemoteCache.hs
new file mode 100644
index 0000000..d501db2
--- /dev/null
+++ b/src/Data/Locations/FunflowRemoteCache.hs
@@ -0,0 +1,66 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+module Data.Locations.FunflowRemoteCache
+ ( locationCacher
+ , LiftCacher(..)
+ ) where
+
+import Control.Exception.Safe
+import Control.Funflow.ContentHashable (ContentHash, hashToPath)
+import qualified Control.Funflow.RemoteCache as Remote
+import Control.Lens
+import Control.Monad.Trans
+import Data.Bifunctor (first)
+import Data.Locations.Accessors
+import Data.Locations.Loc
+import Katip
+import Path (toFilePath)
+import System.FilePath (dropTrailingPathSeparator)
+
+
+hashToFilePath :: ContentHash -> FilePath
+hashToFilePath = dropTrailingPathSeparator . toFilePath . hashToPath
+
+newtype LocationCacher m = LocationCacher (SomeLoc m)
+
+tryS :: (MonadCatch m) => m a -> m (Either String a)
+tryS = fmap (over _Left (displayException :: SomeException -> String)) . try
+
+instance (KatipContext m)
+ => Remote.Cacher m (LocationCacher m) where
+ push (LocationCacher (SomeGLoc rootLoc)) = Remote.pushAsArchive aliasPath $ \hash body -> do
+ let loc = rootLoc `addSubdirToLoc` hashToFilePath hash
+ katipAddNamespace "remoteCacher" $ logFM DebugS $ logStr $
+ "Writing to file " ++ show loc
+ writeLazyByte loc body
+ pure Remote.PushOK
+ where
+ aliasPath from_ to_ = first show <$> tryS
+ (copy
+ (rootLoc `addSubdirToLoc` hashToFilePath from_)
+ (rootLoc `addSubdirToLoc` hashToFilePath to_))
+ pull (LocationCacher (SomeGLoc rootLoc)) = Remote.pullAsArchive $ \hash ->
+ katipAddNamespace "remoteCacher" $ do
+ let loc = rootLoc `addSubdirToLoc` hashToFilePath hash
+ readResult <- tryS $ readLazyByte loc
+ case readResult of
+ Right bs -> do
+ logFM DebugS $ logStr $ "Found in remote cache " ++ show loc
+ return $ Remote.PullOK bs
+ Left err -> do
+ katipAddContext (sl "errorFromRemoteCache" err) $
+ logFM DebugS $ logStr $ "Not in remote cache " ++ show loc
+ return $ Remote.PullError err
+
+locationCacher :: Maybe (SomeLoc m) -> Maybe (LocationCacher m)
+locationCacher = fmap LocationCacher
+
+newtype LiftCacher cacher = LiftCacher cacher
+
+instance (MonadTrans t, Remote.Cacher m cacher, Monad (t m)) =>
+ Remote.Cacher (t m) (LiftCacher cacher) where
+ push (LiftCacher c) hash hash2 path = lift $ Remote.push c hash hash2 path
+ pull (LiftCacher c) hash path = lift $ Remote.pull c hash path
diff --git a/src/Data/Locations/Loc.hs b/src/Data/Locations/Loc.hs
new file mode 100644
index 0000000..1cc9c7c
--- /dev/null
+++ b/src/Data/Locations/Loc.hs
@@ -0,0 +1,381 @@
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveDataTypeable #-}
+{-# LANGUAGE DeriveFoldable #-}
+{-# LANGUAGE DeriveFunctor #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE DeriveTraversable #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE StaticPointers #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeSynonymInstances #-}
+{-# OPTIONS_GHC -Wall #-}
+
+module Data.Locations.Loc where
+
+import Control.Applicative
+import Control.Funflow.ContentHashable
+import Control.Lens
+import Control.Monad (foldM)
+import Data.Aeson
+import Data.Binary (Binary)
+import Data.Char (toLower)
+import qualified Data.HashMap.Strict as HM
+import Data.Locations.LocVariable
+import Data.Representable
+import Data.Store (Store)
+import Data.String
+import qualified Data.Text as T
+import Data.Typeable
+import GHC.Generics (Generic)
+import qualified Network.URL as URL
+import qualified System.Directory as Dir (createDirectoryIfMissing)
+import qualified System.FilePath as Path
+
+
+-- | Each location bit can be a simple chunk of string, or a variable name
+-- waiting to be spliced in.
+data StringOrVariable
+ = SoV_String FilePath -- ^ A raw filepath part, to be used as is
+ | SoV_Variable LocVariable -- ^ A variable name
+ deriving (Eq, Generic, ToJSON, FromJSON, Store)
+
+instance Show StringOrVariable where
+ show (SoV_String s) = s
+ show (SoV_Variable (LocVariable v)) = "{" ++ v ++ "}"
+
+locBitContent :: Lens' StringOrVariable String
+locBitContent f (SoV_String p) = SoV_String <$> f p
+locBitContent f (SoV_Variable (LocVariable v)) = SoV_Variable . LocVariable <$> f v
+
+-- | A newtype so that we can redefine the Show instance
+newtype StringWithVars = StringWithVars [StringOrVariable]
+ deriving (Generic, Store)
+
+instance Semigroup StringWithVars where
+ StringWithVars l1 <> StringWithVars l2 = StringWithVars $ concatSoV_Strings $ l1++l2
+instance Monoid StringWithVars where
+ mempty = StringWithVars []
+
+instance Show StringWithVars where
+ show (StringWithVars l) = concatMap show l
+
+-- | Get all the variable names still in the loc string and possibly replace
+-- them.
+locStringVariables :: Traversal' StringWithVars StringOrVariable
+locStringVariables f (StringWithVars bits) = StringWithVars . concatSoV_Strings <$> traverse f' bits
+ where f' c@SoV_String{} = pure c
+ f' c@SoV_Variable{} = f c
+
+-- | Ensures that consecutive 'SoV_String' chunks are concatenated together
+concatSoV_Strings :: [StringOrVariable] -> [StringOrVariable]
+concatSoV_Strings (SoV_String p1 : SoV_String p2 : rest) =
+ concatSoV_Strings (SoV_String (p1++p2) : rest)
+concatSoV_Strings (x : rest) = x : concatSoV_Strings rest
+concatSoV_Strings [] = []
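+
+-- A doctest-style sketch (using the Show instances above):
+--
+-- >>> concatSoV_Strings [SoV_String "a", SoV_String "b", SoV_Variable (LocVariable "x")]
+-- [ab,{x}]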
+
+data PathWithExtension a = PathWithExtension { _pathWithoutExt :: a, _pathExtension :: String }
+ deriving (Eq, Ord, Generic, ToJSON, FromJSON, Functor, Foldable, Traversable, Binary, Store)
+
+instance (Monad m, ContentHashable m a) => ContentHashable m (PathWithExtension a)
+
+makeLenses ''PathWithExtension
+
+firstNonEmptyExt :: String -> String -> String
+firstNonEmptyExt "" b = b
+firstNonEmptyExt a _ = a
+
+instance (Semigroup a) => Semigroup (PathWithExtension a) where
+ -- Concats the filepaths /without considering extension/ and then chooses one
+ -- non-empty extension, right-biased.
+ PathWithExtension p e <> PathWithExtension p' e' =
+ PathWithExtension (p<>p') $ firstNonEmptyExt e' e
+instance (Monoid a) => Monoid (PathWithExtension a) where
+ mempty = PathWithExtension mempty ""
+
+-- | Converts a 'PathWithExtension' to/from a simple string to be used as-is.
+pathWithExtensionAsRawFilePath :: (IsLocString a) => Iso' (PathWithExtension a) FilePath
+pathWithExtensionAsRawFilePath = iso to_ from_
+ where
+ to_ (PathWithExtension p e) = case e of
+ "" -> p'
+ _ -> p'++"."++e
+ where p' = p ^. locStringAsRawString
+ from_ fp = let (p,e) = splitExtension' fp
+ in PathWithExtension (p ^. from locStringAsRawString) e
+
+instance (IsLocString a) => IsString (PathWithExtension a) where
+ fromString p = p ^. from pathWithExtensionAsRawFilePath
+
+instance (IsLocString a) => Show (PathWithExtension a) where
+ show p = fmap (view locStringAsRawString) p ^. pathWithExtensionAsRawFilePath
+
+data QParam a = QParam a a
+ deriving (Eq, Ord, Generic, Functor, Foldable, Traversable, Binary, Store)
+
+instance (Monad m, ContentHashable m a) => ContentHashable m (QParam a)
+
+instance (IsLocString a) => Show (QParam a) where
+ show = show . view (from asQParam)
+
+asQParam :: (IsLocString a) => Iso' (String,String) (QParam a)
+asQParam = iso to_ from_
+ where
+ to_ (x,y) = QParam (x ^. from locStringAsRawString) (y ^. from locStringAsRawString)
+ from_ (QParam x y) = (x ^. locStringAsRawString, y ^. locStringAsRawString)
+
+-- | Location's main type. A value of type 'URL' denotes a file or a
+-- folder that may be local or hosted remotely
+data URL a
+ = LocalFile { filePath :: PathWithExtension a }
+ | RemoteFile { rfProtocol :: String
+ , rfServerName :: String
+ , rfPortNumber :: Maybe Integer
+ , rfPathWithExtension :: PathWithExtension a
+ , rfLocParams :: [QParam a] }
+ deriving ( Eq, Ord, Generic
+ , Functor, Foldable, Traversable, Binary, Store )
+
+instance (Monad m, Typeable a, ContentHashable m a) => ContentHashable m (URL a)
+
+instance (IsLocString a) => Show (URL a) where
+ show LocalFile{ filePath } = show filePath
+ show RemoteFile{ rfProtocol, rfServerName, rfPathWithExtension, rfPortNumber, rfLocParams } =
+ rfProtocol ++ "://" ++ rfServerName ++ port ++ "/" ++ show rfPathWithExtension ++ qs
+ where
+ port = case rfPortNumber of
+ Nothing -> ""
+ Just p -> ":" <> show p
+ qs = case rfLocParams of
+ [] -> ""
+ _ -> "?" ++ URL.exportParams (map (view (from asQParam)) rfLocParams)
+
+urlPathWithExtension :: Lens' (URL a) (PathWithExtension a)
+urlPathWithExtension f (LocalFile fp) = LocalFile <$> f fp
+urlPathWithExtension f RemoteFile{rfPathWithExtension=fp,..} =
+ (\fp' -> RemoteFile{rfPathWithExtension=fp',..}) <$> f fp
+
+-- | A 'URL' that might contain some named holes, called variables, that we
+-- first have to replace with values before we can get a definite physical
+-- location.
+type LocWithVars = URL StringWithVars
+
+-- | A 'URL' that can directly be accessed as is.
+type Loc = URL String
+
+type LocalFilePath = PathWithExtension String
+
+-- | Creates a 'Loc' from a simple literal string
+localFile :: FilePath -> Loc
+localFile s = LocalFile $ s ^. from pathWithExtensionAsRawFilePath
+
+-- | Creates a 'LocWithVars' that will only contain a chunk, no variables
+locWithVarsFromLoc :: (Functor f) => f String -> f StringWithVars
+locWithVarsFromLoc = fmap (StringWithVars . (:[]) . SoV_String)
+
+-- | A map that can be used to splice variables in a 'LocWithVars'
+type LocVariableMap = HM.HashMap LocVariable String
+
+-- | Splices in the variables present in the hashmap
+spliceLocVariables :: (Functor f) => LocVariableMap -> f StringWithVars -> f StringWithVars
+spliceLocVariables vars = fmap $ over locStringVariables $ \v -> case v of
+ SoV_Variable vname ->
+ case HM.lookup vname vars of
+ Just val -> SoV_String val
+ Nothing -> v
+ _ -> error "spliceLocVariables: Should not happen"
+
+-- | Yields @Left _@ if any of the given StringWithVars contains variables.
+terminateLocWithVars :: (Traversable f) => f StringWithVars -> Either String (f String)
+terminateLocWithVars = traverse terminateStringWithVars
+ where
+ terminateStringWithVars (StringWithVars [SoV_String s]) = Right s
+ terminateStringWithVars locString = Left $
+ "Variable(s) " ++ show (locString ^.. locStringVariables)
+ ++ " in '" ++ show locString ++ "' haven't been given a value"
+
+-- | Means that @a@ can represent file paths
+class (Monoid a) => IsLocString a where
+ locStringAsRawString :: Iso' a String
+ parseLocString :: String -> Either String a
+
+parseLocStringAndExt :: (IsLocString a) => String -> Either String (PathWithExtension a)
+parseLocStringAndExt s =
+ PathWithExtension <$> parseLocString p <*> refuseVarRefs "extension" e
+ where (p, e) = splitExtension' s
+
+splitExtension' :: FilePath -> (FilePath, String)
+splitExtension' fp = let (f,e) = Path.splitExtension fp in
+ case e of '.':e' -> (f,e')
+ _ -> (f,e)
+
+instance IsLocString String where
+ locStringAsRawString = id
+ parseLocString = Right
+
+parseStringWithVars :: String -> Either String StringWithVars
+parseStringWithVars s = (StringWithVars . reverse . map (over locBitContent reverse) . filter isFull)
+ <$> foldM oneChar [] s
+ where
+ oneChar (SoV_Variable _ : _) '{' = Left "Cannot nest {...}"
+ oneChar acc '{' = return $ SoV_Variable (LocVariable "") : acc
+ oneChar (SoV_String _ : _) '}' = Left "'}' terminates nothing"
+ oneChar acc '}' = return $ SoV_String "" : acc
+ oneChar (hd : rest) c = return $ over locBitContent (c:) hd : rest
+ oneChar [] c = return [SoV_String [c]]
+
+ isFull (SoV_String "") = False
+ isFull _ = True
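+
+-- For illustration, a couple of assumed input/output pairs (doctest-style,
+-- relying on the custom Show instance for 'StringWithVars'):
+--
+-- >>> parseStringWithVars "report-{date}.csv"
+-- Right report-{date}.csv
+-- >>> parseStringWithVars "a{b{c}}"
+-- Left "Cannot nest {...}"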
+
+-- | @refuseVarRefs p s == Right s@ if `s` contains no variables.
+-- Otherwise, yields an error message.
+refuseVarRefs :: String -> String -> Either String String
+refuseVarRefs place s = do
+ l <- parseStringWithVars s
+ case l of
+ (StringWithVars []) -> return ""
+ (StringWithVars [SoV_String p]) -> return p
+ _ -> Left $ "Variable references {...} are not allowed in the " ++ place ++ " part of a URL"
+
+instance IsLocString StringWithVars where
+ locStringAsRawString = iso show from_
+ where from_ s = StringWithVars [SoV_String s]
+ parseLocString = parseStringWithVars
+
+-- | The main way to parse an 'URL'. Variables are not allowed in the protocol
+-- and server parts.
+parseURL :: (IsLocString a) => String -> Either String (URL a)
+parseURL "." = Right $ LocalFile $ PathWithExtension ("." ^. from locStringAsRawString) ""
+parseURL literalPath = do
+  url <- maybe (Left $ "parseURL: Invalid URL '" ++ literalPath ++ "'") Right $
+         URL.importURL literalPath
+ case URL.url_type url of
+ URL.Absolute h ->
+ RemoteFile <$> (refuseVarRefs "protocol" $ getProtocol $ URL.protocol h)
+ <*> (refuseVarRefs "server" $ URL.host h)
+ <*> (Right $ URL.port h)
+ <*> (parseLocStringAndExt $ URL.url_path url)
+ <*> (map (uncurry QParam) <$>
+ mapMOf (traversed.both) parseLocString (URL.url_params url))
+ URL.HostRelative -> LocalFile <$> (parseLocStringAndExt $ "/" ++ URL.url_path url)
+ URL.PathRelative -> LocalFile <$> (parseLocStringAndExt $ URL.url_path url)
+ where getProtocol (URL.RawProt h) = map toLower h
+ getProtocol (URL.HTTP False) = "http"
+ getProtocol (URL.HTTP True) = "https"
+ getProtocol (URL.FTP False) = "ftp"
+ getProtocol (URL.FTP True) = "ftps"
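+
+-- A hedged doctest-style example of 'parseURL' (outputs rely on the Show
+-- instance for 'URL' above):
+--
+-- >>> parseURL "http://example.com/data/file.json?user=someone" :: Either String Loc
+-- Right http://example.com/data/file.json?user=someone
+-- >>> parseURL "data/file.json" :: Either String Loc
+-- Right data/file.json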
+
+instance (IsLocString a) => IsString (URL a) where
+ fromString s = case parseURL s of
+ Right l -> l
+ Left e -> error e
+
+instance (IsLocString a) => Representable (PathWithExtension a) where
+ toTextRepr = T.pack . show
+ fromTextRepr x = case parseLocStringAndExt $ T.unpack x of
+ Left _ -> empty
+ Right x' -> pure x'
+
+instance (IsLocString a) => Representable (URL a) where
+ toTextRepr = T.pack . show
+ fromTextRepr x = case parseURL $ T.unpack x of
+ Left _ -> empty
+ Right x' -> pure x'
+
+instance (IsLocString a) => FromJSON (URL a) where
+ parseJSON (String j) = fromTextRepr j
+ parseJSON _ = fail "URL must be read from a JSON String"
+
+instance (IsLocString a) => ToJSON (URL a) where
+ toJSON = String . toTextRepr
+
+-- | The equivalent of </> from the `filepath` package, on 'PathWithExtension's
+appendToPathWithExtensionAsSubdir :: (IsLocString a) => PathWithExtension a -> String -> PathWithExtension a
+fp `appendToPathWithExtensionAsSubdir` s = view (from pathWithExtensionAsRawFilePath) $
+ (fp^.pathWithExtensionAsRawFilePath) Path.</> s
+
+-- | Appends a path to a location. The Loc is considered to be a folder, so its
+-- possible extension will be /ignored/.
+(</>) :: (IsLocString a) => URL a -> String -> URL a
+f </> p = f & over urlPathWithExtension (`appendToPathWithExtensionAsSubdir` p)
+infixl 4 </>
+
+-- | Alias for '</>'
+(<//>) :: (IsLocString a) => URL a -> String -> URL a
+(<//>) = (</>)
+infixl 4 <//>
+
+-- | Replaces a Loc extension
+(-<.>) :: Loc -> String -> Loc
+f -<.> ext = f & urlPathWithExtension . pathExtension .~ ext
+infixl 3 -<.>
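+
+-- A small sketch of combining these operators (assumed outputs, relying on
+-- the Show instances above):
+--
+-- >>> localFile "/tmp/run" </> "results" -<.> "json"
+-- /tmp/run/results.json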
+
+-- | Initialises the directory denoted by a Loc, so that we can safely write
+-- into it afterwards. For a local filesystem, this means creating it.
+initDir :: Loc -> IO ()
+initDir f@LocalFile{} =
+ Dir.createDirectoryIfMissing True $ f ^. urlPathWithExtension . pathWithoutExt
+initDir _ = pure ()
+
+-- | Analog to 'Path.takeDirectory' for generalized locations
+takeDirectory :: Loc -> Loc
+takeDirectory = over (urlPathWithExtension . pathWithoutExt) Path.takeDirectory . dropExtension
+
+-- | Analog of 'Path.dropExtension'
+dropExtension :: URL a -> URL a
+dropExtension f = f & urlPathWithExtension . pathExtension .~ ""
+
+-- | The class of all locations that can be mapped to VirtualFiles in a
+-- configuration file.
+class (Traversable f
+ -- Just ensure that `forall a. (IsLocString a) => (FromJSON (f a),
+ -- ToJSON (f a))`:
+ ,FromJSON (f String), FromJSON (f StringWithVars)
+ ,ToJSON (f String), ToJSON (f StringWithVars)
+ -- `forall a. (IsLocString a) => (Show (f a))`:
+ ,Show (f String), Show (f StringWithVars)) => TypedLocation f where
+
+  -- TODO: Find a way to replace get/setLocType by a Lens. This displeased
+  -- GeneralizedNewtypeDeriving when making LocationAccessor and trying to
+  -- automatically derive instances of TypedLocation
+ getLocType :: f a -> String
+
+ -- | Access the file type part of a location
+ --
+ -- For locations that encode types as extensions, this would access the
+ -- extension. But for others (like HTTP urls), locType would probably need to
+ -- translate it first to a mime type, or some other implementation-specific
+  -- way to represent resource types
+ setLocType :: f a -> (String -> String) -> f a
+
+  -- | Use the location as a directory and append a "subdir" to it. Depending
+  -- on the implementation this "subdir" relationship may be purely semantic
+  -- (the result doesn't have to physically be a subdirectory of the input)
+ --
+ -- Note: this isn't a path, the subdir shouldn't contain any slashes
+ addSubdirToLoc :: (IsLocString a) => f a -> String -> f a
+
+  -- | Apply a mapping shortcut (represented as a partial file path, with its
+  -- extension) to the location. For now, non URL-based locations should raise
+  -- an error
+ --
+ -- Note: contrary to 'addSubdirToLoc', the 'PathWithExtension' MAY contain
+ -- slashes
+ useLocAsPrefix :: (IsLocString a) => f a -> PathWithExtension a -> f a
+
+instance TypedLocation URL where
+ setLocType l f = l & over (urlPathWithExtension . pathExtension) f
+ getLocType = view (urlPathWithExtension . pathExtension)
+ addSubdirToLoc = (</>)
+ useLocAsPrefix l p = l & over urlPathWithExtension (<> p)
+
+-- | Sets the file type of a location
+overrideLocType :: (TypedLocation f) => f a -> String -> f a
+overrideLocType loc newExt = setLocType loc (newExt `firstNonEmptyExt`)
+
+-- | Sets the file type of a location unless it already has one
+setLocTypeIfMissing :: (TypedLocation f) => f a -> String -> f a
+setLocTypeIfMissing loc newExt = setLocType loc (`firstNonEmptyExt` newExt)
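+
+-- A hedged doctest-style illustration of the helper above (assumed outputs):
+--
+-- >>> setLocTypeIfMissing (localFile "data/file") "json"
+-- data/file.json
+-- >>> setLocTypeIfMissing (localFile "data/file.csv") "json"
+-- data/file.csv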
diff --git a/src/Data/Locations/LocVariable.hs b/src/Data/Locations/LocVariable.hs
new file mode 100644
index 0000000..760a20f
--- /dev/null
+++ b/src/Data/Locations/LocVariable.hs
@@ -0,0 +1,14 @@
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+
+module Data.Locations.LocVariable where
+
+import Data.Aeson
+import Data.Hashable (Hashable)
+import Data.Store (Store)
+import Data.String
+
+
+-- | Just a variable name
+newtype LocVariable = LocVariable { unLocVariable :: String }
+ deriving (IsString, Show, ToJSON, FromJSON, Eq, Hashable
+ ,FromJSONKey, ToJSONKey, Store)
diff --git a/src/Data/Locations/LocationTree.hs b/src/Data/Locations/LocationTree.hs
new file mode 100644
index 0000000..f26c692
--- /dev/null
+++ b/src/Data/Locations/LocationTree.hs
@@ -0,0 +1,283 @@
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveFoldable #-}
+{-# LANGUAGE DeriveFunctor #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE DeriveTraversable #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE TypeSynonymInstances #-}
+{-# OPTIONS_GHC -Wall #-}
+{-# LANGUAGE DataKinds #-}
+{-# OPTIONS_GHC -fno-warn-type-defaults #-}
+
+-- | Abstraction layer around the filesystem, so that inputs and outputs may be
+-- redirected to various files without the logic code having to know.
+module Data.Locations.LocationTree
+ (
+ -- * Types
+ LocationTree(..)
+ , LocationTreePathItem(..), LocationTreePath(..)
+ , LTPIAndSubtree(..)
+ , (:||)(..), _Unprioritized, _Prioritized
+ -- * Functions
+ , locTreeNodeTag, locTreeSubfolders
+ , inLocTree
+ , allSubLocTrees, traversedTreeWithPath
+ , atSubfolder, atSubfolderRec
+ , filteredLocsInTree
+ , subtractPathFromTree
+ , singLTP
+ , showLTPIName
+ , ltpiName
+ , locNode, folderNode, fileEmpty, file
+ , splitLocTree, joinLocTrees
+ , locTreeToDataTree
+ , prettyLocTree
+ , apLocationTree
+ , showLTP
+ )
+where
+
+import Control.Applicative
+import Control.Lens
+import Data.Aeson
+import Data.Binary
+import Data.Hashable
+import qualified Data.HashMap.Strict as HM
+import Data.List
+import Data.Maybe
+import Data.Representable
+import Data.String
+import qualified Data.Text as T
+import qualified Data.Tree as DT
+import GHC.Generics (Generic)
+-- import Data.Tree.Pretty
+-- import Diagrams.TwoD.Layout.Tree
+
+
+-- | A very simple virtual filesystem. Defines a hierarchy of virtual locations,
+-- and some rules for how to store and read files. Each type of pipeline
+-- (solving, exploration) will need its 'LocationTree', that can be obtained by
+-- composing the 'LocationTree's of the tasks it contains.
+--
+-- In a project using pipeline-tools, the project's code will only use virtual
+-- paths, and it won't know what actual physical location is behind that path.
+--
+-- We could use a DocRecord to represent a LocationTree. Maybe we'll refactor it
+-- to use DocRecords in the future, but for now it was simpler to use a simple
+-- tree of maps.
+data LocationTree a = LocationTree
+ { _locTreeNodeTag :: a -- ^ In the case of a 'BareLocationTree', indicates
+                         -- the preferred serialization method of the content
+ -- of that node. If that node is a folder, the
+ -- serialization method can mean that its content
+ -- will be packed at a single place (for instance
+ -- one table in a database to group several virtual
+ -- JSON files)
+ , _locTreeSubfolders :: HM.HashMap LocationTreePathItem (LocationTree a)
+ -- ^ The content of the node. Is empty for a terminal file.
+ }
+ deriving (Eq, Show, Functor, Foldable, Traversable)
+
+instance (Monoid a) => Monoid (LocationTree a) where
+ mempty = LocationTree mempty mempty
+
+instance (Semigroup a) => Semigroup (LocationTree a) where
+ LocationTree m1 s1 <> LocationTree m2 s2 =
+ LocationTree (m1 <> m2) (HM.unionWith (<>) s1 s2)
+
+-- | LocationTree cannot be an applicative because pure cannot construct an
+-- infinite tree (since HashMaps are strict in their keys), but <*> can be
+-- implemented, and a LocationTree is already a Functor. Branches that don't
+-- match are just abandoned.
+apLocationTree :: LocationTree (a -> b) -> LocationTree a -> LocationTree b
+apLocationTree (LocationTree f sub) (LocationTree x sub') = LocationTree (f x) sub''
+ where
+ sub'' = HM.intersectionWith apLocationTree sub sub'
+
+-- | Identifies a folder or file-like object in the 'LocationTree'.
+newtype LocationTreePathItem
+ = LTPI { _ltpiName :: T.Text -- ^ Name of the file or folder
+ -- , _ltpiIsRepeatable :: Bool -- ^ If true, then will correspond to
+ -- -- several files or folders, where _ltpiName
+ -- -- is just a prefix followed by a number
+ -- -- (e.g. patient01, patient02, etc.)
+ -- , _ltpiIsTerminal :: Bool -- ^ If true, is a pure file that must be
+ -- -- deserizalized as a whole
+ }
+ deriving (Eq, Ord, Show, Generic, Hashable, Binary)
+
+showLTPIName :: LocationTreePathItem -> String
+showLTPIName = T.unpack . _ltpiName
+
+singleFolder :: T.Text -> LocationTreePathItem
+singleFolder x = LTPI x -- False False
+
+instance IsString LocationTreePathItem where
+ fromString = singleFolder . T.pack
+
+-- | A path in a 'LocationTree'
+newtype LocationTreePath = LTP [LocationTreePathItem]
+ deriving (Eq, Ord, Show, Generic, Hashable, Binary)
+
+instance Representable LocationTreePath where
+ toTextRepr (LTP l) = mconcat $ "/" : intersperse "/" (map _ltpiName l)
+ fromTextRepr =
+ pure . LTP . map singleFolder . filter (not . T.null) . T.splitOn "/"
+
+showLTP :: LocationTreePath -> String
+showLTP = T.unpack . toTextRepr
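+
+-- A doctest-style sketch of the textual representation (assuming
+-- OverloadedStrings; outputs follow the derived Show instances):
+--
+-- >>> toTextRepr (LTP ["inputs", "config"])
+-- "/inputs/config"
+-- >>> fromTextRepr "/inputs/config" :: Maybe LocationTreePath
+-- Just (LTP [LTPI {_ltpiName = "inputs"},LTPI {_ltpiName = "config"}])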
+
+instance ToJSON LocationTreePath where
+ toJSON = String . toTextRepr
+
+instance FromJSON LocationTreePath where
+ parseJSON (String t) = fromTextRepr t
+ parseJSON _ = mempty
+
+instance ToJSONKey LocationTreePath
+instance FromJSONKey LocationTreePath
+
+instance Semigroup LocationTreePath where
+ (LTP a) <> (LTP b) = LTP $ a ++ b
+instance Monoid LocationTreePath where
+ mempty = LTP []
+
+singLTP :: LocationTreePathItem -> LocationTreePath
+singLTP = LTP . (:[])
+
+-- | Allows filtering a tree and removing some nodes
+filteredLocsInTree
+ :: Traversal (LocationTree a) (Maybe (LocationTree b)) a (Maybe b)
+filteredLocsInTree f (LocationTree a sub) =
+ liftA2 LocationTree <$> f a
+ <*> (Just . HM.fromList . catMaybes <$> traverse onSub (HM.toList sub))
+ where
+ onSub (k,t) = fmap (k,) <$> filteredLocsInTree f t
+
+-- | Access or edit a subtree
+inLocTree :: LocationTreePath -> Lens' (LocationTree a) (Maybe (LocationTree a))
+inLocTree path f t = fromJust <$> go path (Just t)
+ where
+ go _ Nothing = f Nothing
+ go (LTP []) mbT = f mbT
+ go (LTP (p:ps)) (Just (LocationTree m s)) = rebuild <$> go (LTP ps) (HM.lookup p s)
+ where
+ rebuild Nothing | HM.null s' = Nothing
+ | otherwise = Just $ LocationTree m s'
+ where s' = HM.delete p s
+ rebuild (Just res) = Just $ LocationTree m $ HM.insert p res s
+
+-- | Find all the subtrees, indexed by their 'LocationTreePath'
+allSubLocTrees
+ :: Traversal (LocationTree a) (LocationTree b)
+ (LocationTreePath, LocationTree a) b
+allSubLocTrees f = go []
+ where go ps n@(LocationTree _ sub) = LocationTree
+ <$> f (LTP $ reverse ps, n)
+ <*> itraverse (\p n' -> go (p:ps) n') sub
+
+-- | Traverse all the nodes, indexed by their 'LocationTreePath'
+traversedTreeWithPath
+ :: Traversal (LocationTree a) (LocationTree b)
+ (LocationTreePath, a) b
+traversedTreeWithPath f = go []
+ where go ps (LocationTree n sub) = LocationTree
+ <$> f (LTP $ reverse ps, n)
+ <*> itraverse (\p n' -> go (p:ps) n') sub
+
+-- | Removes a path from a 'LocationTree'.
+subtractPathFromTree :: LocationTree a -> LocationTreePath -> LocationTree a
+subtractPathFromTree tree path = tree & inLocTree path .~ Nothing
+
+-- | Just a tuple-like type. An entry for the map of contents at some path in a
+-- 'LocationTree'
+data LTPIAndSubtree a = LocationTreePathItem :/ LocationTree a
+ deriving (Eq, Show, Functor, Foldable, Traversable)
+
+infixr 5 :/
+
+locNode :: a -> [LTPIAndSubtree a] -> LocationTree a
+locNode a = LocationTree a . HM.fromList . map (\(x:/y) -> (x,y))
+
+-- | A shortcut for 'locNode' for folders
+folderNode :: (Monoid a) => [LTPIAndSubtree a] -> LocationTree a
+folderNode = locNode mempty
+
+fileEmpty :: (Monoid a)
+ => LocationTreePathItem
+ -> LTPIAndSubtree a
+fileEmpty i = i :/ mempty
+file :: LocationTreePathItem
+ -> a
+ -> LTPIAndSubtree a
+file i a = i :/ LocationTree a mempty
+
+instance (Monoid a) => IsString (LTPIAndSubtree a) where
+ fromString = fileEmpty . fromString
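+
+-- A minimal sketch of building a tree with these helpers and querying it
+-- (names and node tags are illustrative only):
+--
+-- >>> let tree = folderNode [ "inputs" :/ folderNode [ file "config" ["yaml"] ] ]
+-- >>> fmap _locTreeNodeTag (tree ^. inLocTree (LTP ["inputs", "config"]))
+-- Just ["yaml"]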
+
+-- | Like Either, but equipped with Semigroup/Monoid instances that prioritize
+-- 'Prioritized' (right) values over 'Unprioritized' (left) ones
+data a :|| b = Unprioritized a | Prioritized b
+infixr 5 :||
+
+instance (Semigroup a, Semigroup b) => Semigroup (a :|| b) where
+ (<>) (Prioritized x) (Prioritized x') = Prioritized (x<>x')
+ (<>) (Unprioritized x) (Unprioritized x') = Unprioritized (x<>x')
+ (<>) p@(Prioritized _) _ = p
+ (<>) _ p@(Prioritized _) = p
+instance (Monoid a, Monoid b) => Monoid (a :|| b) where
+ mempty = Unprioritized mempty
+
+-- | Merges two trees of different node types, prioritizing those of the second
+-- tree when a node exists in both trees
+joinLocTrees
+ :: (Monoid a, Monoid b)
+ => LocationTree a -> LocationTree b -> LocationTree (a :|| b)
+joinLocTrees ta tb = fmap Unprioritized ta <> fmap Prioritized tb
+
+-- | Splits a 'LocationTree' of @a :|| b@ into two trees that will have the same
+-- structure but not the same nodes
+splitLocTree :: LocationTree (a :|| b) -> (LocationTree (Maybe a), LocationTree (Maybe b))
+splitLocTree (LocationTree n sub) = case n of
+ Unprioritized a -> (LocationTree (Just a) subA, LocationTree Nothing subB)
+ Prioritized b -> (LocationTree Nothing subA, LocationTree (Just b) subB)
+ where
+ subA = HM.fromList subAL
+ subB = HM.fromList subBL
+ (subAL, subBL) = unzip $
+ map (\(path, ltree) ->
+ let (na, nb) = splitLocTree ltree
+ in ((path, na), (path, nb)))
+ (HM.toList sub)
+
+makeLenses ''LocationTree
+makeLenses ''LocationTreePathItem
+makePrisms ''(:||)
+
+
+atSubfolder :: Applicative f => LocationTreePathItem -> (LocationTree a -> f (LocationTree a)) -> LocationTree a -> f (LocationTree a)
+atSubfolder pathItem = locTreeSubfolders . at pathItem . _Just
+
+atSubfolderRec :: (Applicative f, Foldable t) => t LocationTreePathItem -> (LocationTree a -> f (LocationTree a)) -> LocationTree a -> f (LocationTree a)
+atSubfolderRec path =
+ foldr (\pathItem subtree -> atSubfolder pathItem . subtree) id path
+
+locTreeToDataTree :: LocationTreePath -> LocationTree b -> DT.Tree (LocationTreePathItem, b)
+locTreeToDataTree (LTP root) t = toCanonicalTree root' t
+ where
+ root' = case root of
+ [] -> "/"
+ _ -> last root
+ toCanonicalTree p (LocationTree n sub) =
+ DT.Node (p,n) $ map (uncurry toCanonicalTree) $ HM.toList sub
+
+prettyLocTree :: (Show a) => LocationTreePath -> LocationTree a -> String
+prettyLocTree root t = DT.drawTree t'
+ where
+ str (p,n) = T.unpack (_ltpiName p) ++ ": " ++ show n
+ t' = str <$> locTreeToDataTree root t
diff --git a/src/Data/Locations/LogAndErrors.hs b/src/Data/Locations/LogAndErrors.hs
new file mode 100644
index 0000000..675ccbe
--- /dev/null
+++ b/src/Data/Locations/LogAndErrors.hs
@@ -0,0 +1,58 @@
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+-- | This module contains some helper functions for logging info and throwing
+-- errors
+
+module Data.Locations.LogAndErrors
+ ( module Control.Exception.Safe
+ , KatipContext
+ , LogThrow, LogCatch, LogMask
+ , TaskRunError(..)
+ , logAndThrowM
+ , throwWithPrefix
+ ) where
+
+import Control.Exception.Safe
+import qualified Data.Text as T
+import GHC.Stack
+import Katip
+
+
+-- | An error when running a pipeline of tasks
+newtype TaskRunError = TaskRunError String
+
+instance Show TaskRunError where
+ show (TaskRunError s) = s
+
+instance Exception TaskRunError where
+ displayException (TaskRunError s) = s
+
+getTaskErrorPrefix :: (KatipContext m) => m String
+getTaskErrorPrefix = do
+ Namespace ns <- getKatipNamespace
+ case ns of
+ [] -> return ""
+ _ -> return $ T.unpack $ T.intercalate "." ns <> ": "
+
+-- | Just an alias for monads that can throw errors and log them
+type LogThrow m = (KatipContext m, MonadThrow m, HasCallStack)
+
+-- | Just an alias for monads that can throw, catch errors and log them
+type LogCatch m = (KatipContext m, MonadCatch m)
+
+-- | Just an alias for monads that can throw, catch, mask errors and log them
+type LogMask m = (KatipContext m, MonadMask m)
+
+-- | A replacement for throwM. Logs an error (using displayException) and throws
+logAndThrowM :: (LogThrow m, Exception e) => e -> m a
+logAndThrowM exc = do
+ logFM ErrorS $ logStr $ displayException exc
+ throwM exc
+
+-- | Logs an error and throws a 'TaskRunError'
+throwWithPrefix :: (LogThrow m) => String -> m a
+throwWithPrefix msg = do
+ logFM ErrorS $ logStr msg
+ prefix <- getTaskErrorPrefix
+ throwM $ TaskRunError $ prefix ++ msg
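+
+-- A hedged usage sketch (hypothetical pipeline code; 'readInput' and its
+-- argument are made up for illustration):
+--
+-- readInput :: (LogThrow m) => Maybe FilePath -> m FilePath
+-- readInput Nothing   = throwWithPrefix "no input file was mapped"
+-- readInput (Just fp) = return fp
+--
+-- The thrown 'TaskRunError' carries the current Katip namespace as a prefix,
+-- e.g. "pipeline.loading: no input file was mapped".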
diff --git a/src/Data/Locations/Mappings.hs b/src/Data/Locations/Mappings.hs
new file mode 100644
index 0000000..b469eb6
--- /dev/null
+++ b/src/Data/Locations/Mappings.hs
@@ -0,0 +1,259 @@
+{-# OPTIONS_GHC -fno-warn-type-defaults #-}
+{-# LANGUAGE DeriveFunctor #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE TypeApplications #-}
+{-# LANGUAGE ViewPatterns #-}
+{-# OPTIONS_GHC -Wall #-}
+{-# OPTIONS_GHC "-fno-warn-incomplete-uni-patterns" #-}
+
+module Data.Locations.Mappings
+ ( LocationMappings, LocationMappings_(..)
+ , HasDefaultMappingRule(..)
+ , LocShortcut(..), SerializableLocShortcut
+ --, allLocsInMappings
+ , mappingsFromLocTree
+ , mappingRootOnly
+ , insertMappings
+ , propagateMappings
+ , applyMappings
+ ) where
+
+import Control.Arrow ((***))
+import Control.Lens
+import Data.Aeson
+import qualified Data.HashMap.Strict as HM
+import Data.List
+import Data.Locations.Accessors
+import Data.Locations.Loc
+import Data.Locations.LocationTree
+import Data.Locations.LogAndErrors
+import Data.Locations.SerializationMethod (FileExt)
+import Data.Maybe
+import Data.Representable
+import qualified Data.Text as T
+
+
+-- * The 'LocationMappings' type
+
+newtype LocationMappings_ n = LocationMappings_
+ (HM.HashMap LocationTreePath [n])
+ deriving (Functor, Show)
+
+-- | Describes how physical locations are mapped to an application's
+-- LocationTree. This is the type that is written to the pipeline yaml config
+-- file under the "locations" section.
+type LocationMappings = LocationMappings_ SerializableLocShortcut
+
+instance Monoid (LocationMappings_ n) where
+ mempty = LocationMappings_ mempty
+
+instance Semigroup (LocationMappings_ n) where
+ (LocationMappings_ m) <> (LocationMappings_ m') = LocationMappings_ $
+ HM.unionWith (++) m m'
+
+instance (ToJSON n) => ToJSON (LocationMappings_ n) where
+ toJSON (LocationMappings_ m) = Object $ HM.fromList $
+ map (toTextRepr *** layersToJSON) $ HM.toList m
+ where
+ layersToJSON [] = Null
+ layersToJSON [l] = toJSON l
+ layersToJSON layers = toJSON layers
+
+instance FromJSON LocationMappings where
+ parseJSON (Object m) = LocationMappings_ . HM.fromList <$>
+ mapM (\(k, v) -> (,) <$> fromTextRepr k <*> parseJSONLayers v) (HM.toList m)
+ where
+ parseJSONLayers Null = pure []
+ parseJSONLayers j@Array{} = parseJSON j
+ parseJSONLayers j = (:[]) <$> parseJSON j
+ parseJSON _ = mempty
+
+-- -- | Lists all the physical paths that have been associated to some virtual
+-- -- location
+-- allLocsInMappings :: LocationMappings -> [LocWithVars]
+-- allLocsInMappings (LocationMappings_ m) =
+-- [ loc
+-- | (_,layers) <- HM.toList m, FullySpecifiedLoc loc <- layers ]
+
+
+-- * How to get pre-filled default mappings from an existing LocationTree
+
+-- | Means that we can possibly derive a default @LocShortcut@ from @a@
+class HasDefaultMappingRule a where
+ getDefaultLocShortcut :: a -> Maybe (LocShortcut x)
+ -- ^ Nothing means that the @a@ should not be mapped by default
+
+-- | Pre-fills the mappings from the context of a 'LocationTree', with extra
+-- metadata saying whether each node should be explicitly mapped or unmapped.
+mappingsFromLocTree :: (HasDefaultMappingRule a) => LocationTree a -> LocationMappings
+mappingsFromLocTree (LocationTree node subtree) | HM.null subtree =
+ LocationMappings_ $
+ HM.singleton (LTP [])
+ (case getDefaultLocShortcut node of
+ Just shortcuts -> [shortcuts]
+ Nothing -> [])
+mappingsFromLocTree (LocationTree _ sub) =
+ LocationMappings_ (mconcat $ map f $ HM.toList sub)
+ where
+ f (ltpi, t) =
+ HM.fromList $ map appendPath $ HM.toList m
+ where
+ appendPath (LTP path, maps) = (LTP $ ltpi : path, maps)
+ LocationMappings_ m = mappingsFromLocTree t
+
+-- | Creates a 'LocationMappings_' where the whole LocationTree is mapped to a
+-- single folder
+mappingRootOnly :: Loc -> LocationMappings
+mappingRootOnly l = LocationMappings_ $
+ HM.singleton (LTP [])
+ [FullySpecifiedLoc $ toJSON $ locWithVarsFromLoc l]
+
+
+-- * How to parse mappings to and from JSON
+
+-- | A location with variables where some parts may have been elided
+data LocShortcut a
+ = DeriveWholeLocFromTree FileExt
+ -- ^ Means that this loc path and name should be inherited from locs up the
+ -- virtual tree.
+ | DeriveLocPrefixFromTree (PathWithExtension StringWithVars)
+ -- ^ Means that this loc path should be inherited from locs up the resource
+ -- tree. Its name should be a concatenation of the corresponding name in the
+ -- tree and the PathWithExtension provided
+ | FullySpecifiedLoc a
+ -- ^ Means that this shortcut is a full location
+ deriving (Show)
+
+-- | A 'LocShortcut' where fully specified locs are aeson Values ready to be
+-- parsed by some LocationAccessor. It is parsed from the mappings in the
+-- configuration file.
+type SerializableLocShortcut = LocShortcut Value
+
+-- | A 'LocShortcut' where fully specified locs have been parsed, and resolved
+-- to be tied to some specific LocationAccessor. It isn't serializable in JSON,
+-- hence the separation between this and 'SerializableLocShortcut'.
+type ResolvedLocShortcut m = LocShortcut (SomeLocWithVars m)
+
+-- The underscore sign in the serialized form means "reuse inherited";
+-- depending on its position it can mean either the file path, the extension,
+-- or both.
+instance ToJSON SerializableLocShortcut where
+ toJSON (DeriveWholeLocFromTree ext) = String $ case ext of
+ "" -> "_"
+ _ -> "_." <> ext
+ toJSON (DeriveLocPrefixFromTree l) = String $ "_" <> toTextRepr l
+ toJSON (FullySpecifiedLoc v) = v
+
+instance FromJSON SerializableLocShortcut where
+ parseJSON (String "_") = pure $ DeriveWholeLocFromTree ""
+ parseJSON (String (T.uncons -> Just ('_', s))) = case parseLocStringAndExt $ T.unpack s of
+ Left e -> fail e
+ Right r -> pure $ DeriveLocPrefixFromTree r
+ parseJSON v = pure $ FullySpecifiedLoc v
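+
+-- For illustration, a hedged sketch of what the "locations" section of a
+-- pipeline config could look like (paths are made up; see the ToJSON instance
+-- above for the "_" convention):
+--
+-- locations:
+--   /: /home/user/project        # a fully specified location
+--   /inputs/config: _            # whole path derived from the tree
+--   /outputs: s3://bucket/out    # layers can live on remote storage too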
+
+-- * How to apply mappings to a LocationTree to get the physical locations bound
+-- to each of its nodes
+
+-- | Returns a new 'LocationTree', updated from the mappings. Paths in the
+-- 'LocationMappings_' that don't correspond to anything in the 'LocationTree'
+-- will just be ignored
+insertMappings
+ :: LocationMappings
+ -> LocationTree a
+ -> LocationTree (a, Maybe [SerializableLocShortcut])
+insertMappings (LocationMappings_ m) tree = foldl' go initTree $ HM.toList m
+ where
+ initTree = fmap (,Nothing) tree
+      -- By default, each node is set to "no mapping defined"...
+ go t (path, layers) = t &
+ inLocTree path . _Just . locTreeNodeTag . _2 .~ Just layers
+ -- ...then we update the tree for each mapping present in the
+ -- LocationMappings
+
+-- | For each location in the tree, gives it a final list of physical locations,
+-- as /layers/ (which can be empty)
+propagateMappings :: forall m a b.
+ ([SomeLocWithVars m] -> a -> Bool -> b)
+ -> LocationTree (a, Maybe [ResolvedLocShortcut m])
+ -> LocationTree b
+propagateMappings f tree = propagateMappings' [] tree
+ where
+    -- if a folder is explicitly set to null (i.e. if no layers exist for this
+    -- folder), then we recursively unmap everything it contains, ignoring every
+    -- submapping that might exist:
+ propagateMappings' _ t@(LocationTree (_, Just []) _) = fmap unmap t
+ where unmap (n, _) = f [] n True
+ -- if a folder is mapped, we propagate the mapping downwards:
+ propagateMappings' inheritedLayers (LocationTree (thisNode, mbTheseMappings) thisSub) =
+ LocationTree thisNode' $ imap recur thisSub
+ where
+ theseLayers = applyInheritedLayersToShortcuts inheritedLayers mbTheseMappings
+ thisNode' = f theseLayers thisNode (isJust mbTheseMappings)
+ recur fname subtree = propagateMappings' sublayers subtree
+ where
+ addSubdir :: SomeLocWithVars m -> SomeLocWithVars m
+ addSubdir (SomeGLoc l) = SomeGLoc $ addSubdirToLoc l $ T.unpack (_ltpiName fname)
+ sublayers = fmap addSubdir theseLayers
+
+-- | Given a list of loc layers inherited from further up the tree, fills in the
+-- blanks in the loc shortcuts given for one node of the tree in order to get
+-- the final loc layers mapped to this node.
+applyInheritedLayersToShortcuts
+ :: forall m.
+ [SomeLocWithVars m] -- ^ Inherited layers
+ -> Maybe [ResolvedLocShortcut m] -- ^ LocShortcuts mapped to the node
+ -> [SomeLocWithVars m] -- ^ Final layers mapped to this node
+applyInheritedLayersToShortcuts inheritedLayers Nothing = inheritedLayers
+applyInheritedLayersToShortcuts inheritedLayers (Just shortcuts) =
+ concatMap fillShortcut shortcuts
+ where
+ fillShortcut = \case
+ FullySpecifiedLoc l -> [l]
+ DeriveLocPrefixFromTree fp ->
+ flip map inheritedLayers $ \(SomeGLoc l) ->
+ SomeGLoc @m $ useLocAsPrefix l fp
+ DeriveWholeLocFromTree ext ->
+ flip map inheritedLayers $ \(SomeGLoc l) ->
+ SomeGLoc @m $ overrideLocType l (T.unpack ext)
+
+-- | In a context where we have LocationAccessors available, we parse the
+-- locations in a 'SerializableLocShortcut' and obtain a 'ResolvedLocShortcut'
+resolveLocShortcut :: (LogThrow m) => SerializableLocShortcut -> LocResolutionM m (ResolvedLocShortcut m)
+resolveLocShortcut (DeriveWholeLocFromTree ext) = return $ DeriveWholeLocFromTree ext
+resolveLocShortcut (DeriveLocPrefixFromTree path) = return $ DeriveLocPrefixFromTree path
+resolveLocShortcut (FullySpecifiedLoc value) =
+ withParsedLocsWithVars [value] $ \[resolvedLoc] ->
+ return $ FullySpecifiedLoc $ SomeGLoc resolvedLoc
+
+-- | Transform a tree to one where unmapped nodes have been changed to 'mempty'
+-- and mapped nodes have been associated to their physical location. A function
+-- is applied to ask each node to integrate its final mappings, with a Bool to
+-- tell whether the mapping for a node was explicit (True) or not (False),
+-- i.e. whether it was explicitly declared in the config file or derived from
+-- the mapping of a parent folder. @b@ is often some file type or
+-- metadata that's required in the mapping.
+--
+-- TODO: Maybe change the callback type to
+-- @DerivedOrExplicit [SomeLocWithVars m] -> a -> b@ with
+-- @data DerivedOrExplicit a = Derived a | Explicit a@
+applyMappings :: (LogThrow m)
+ => ([SomeLocWithVars m] -> a -> Bool -> b)
+ -- ^ Add physical locations (if they exist)
+ -- to a node
+ -> LocationMappings -- ^ Mappings to apply
+ -> LocationTree a -- ^ Original tree
+ -> LocResolutionM m (LocationTree b) -- ^ Tree with physical locations
+applyMappings f mappings loctree = do
+ let treeWithShortcuts = insertMappings mappings loctree
+ resolve (node, Nothing) = return (node, Nothing)
+ resolve (node, Just shortcuts) =
+ (\rs -> (node, Just rs)) <$> mapM resolveLocShortcut shortcuts
+ treeWithResolvedShortcuts <- traverse resolve treeWithShortcuts
+ return $ propagateMappings f treeWithResolvedShortcuts
diff --git a/src/Data/Locations/SerializationMethod.hs b/src/Data/Locations/SerializationMethod.hs
new file mode 100644
index 0000000..49e751c
--- /dev/null
+++ b/src/Data/Locations/SerializationMethod.hs
@@ -0,0 +1,745 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DefaultSignatures #-}
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveFunctor #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE Rank2Types #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE StandaloneDeriving #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE TypeOperators #-}
+{-# OPTIONS_GHC -Wall #-}
+
+module Data.Locations.SerializationMethod where
+
+import Control.Lens hiding ((:>))
+import Control.Funflow.ContentHashable
+import Data.Aeson as A
+import qualified Data.Attoparsec.Lazy as AttoL
+import qualified Data.Binary.Builder as BinBuilder
+import qualified Data.ByteString as BS
+import qualified Data.ByteString.Lazy as LBS
+import qualified Data.ByteString.Streaming as BSS
+import Data.Char (ord)
+import qualified Data.Csv as Csv
+import qualified Data.Csv.Builder as CsvBuilder
+import qualified Data.Csv.Parser as CsvParser
+import Codec.Compression.Zlib as Zlib
+import Data.DocRecord
+import Data.DocRecord.OptParse (RecordUsableWithCLI)
+import qualified Data.HashMap.Strict as HM
+import Data.Locations.LocVariable
+import Data.Locations.LogAndErrors
+import Data.Maybe
+import Data.Monoid (First (..))
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as TE
+import qualified Data.Text.Lazy as LT
+import qualified Data.Text.Lazy.Encoding as LTE
+import Data.Typeable
+import qualified Data.Vector as V
+import qualified Data.Yaml as Y
+import Katip
+import GHC.Generics
+import Streaming
+import qualified Streaming.Prelude as S
+import qualified Streaming.Zip as SZip
+
+
+-- | A file extension
+type FileExt = T.Text
+
+type FromAtomicFn' i a = i -> Either String a
+
+-- | How to read an @a@ from some identified type @i@, which is meant to be a
+-- general-purpose intermediate representation, like 'A.Value'.
+data FromAtomicFn a =
+ forall i. (Typeable i) => FromAtomicFn (FromAtomicFn' i a)
+deriving instance Functor FromAtomicFn
+
+instance Show (FromAtomicFn a) where
+ show _ = "<FromAtomicFn>"
+
+fromAtomicFn
+ :: forall i a. (Typeable i)
+ => [Maybe FileExt]
+ -> FromAtomicFn' i a
+ -> HM.HashMap (TypeRep,Maybe FileExt) (FromAtomicFn a)
+fromAtomicFn exts f = HM.fromList $ map (\ext -> ((argTypeRep,ext), FromAtomicFn f)) exts
+ where
+ argTypeRep = typeOf (undefined :: i)
+
+allFromAtomicFnsWithType :: forall i ext a. (Typeable i)
+ => HM.HashMap (TypeRep,Maybe ext) (FromAtomicFn a)
+ -> [(ext, FromAtomicFn' i a)]
+allFromAtomicFnsWithType = mapMaybe fltr . HM.toList
+ where
+ wanted = typeOf (undefined :: i)
+ fltr ((_,Nothing),_) = Nothing
+ fltr ((tr,Just ext), FromAtomicFn (f :: FromAtomicFn' i' a))
+ | tr == wanted = case eqT :: Maybe (i:~:i') of
+ Just Refl -> Just (ext, f)
+ Nothing -> error $ "allFromAtomicFnsWithType: some function doesn't deal with type "
+ ++ show wanted ++ " when it should"
+ | otherwise = Nothing
+
+
+type FromStreamFn' i a =
+ forall m. (LogMask m) => Stream (Of i) m () -> m a
+
+-- | How to read an @a@ from some @Stream (Of i) m r@
+data FromStreamFn a =
+ forall i. (Typeable i) => FromStreamFn (FromStreamFn' i a)
+
+instance Functor FromStreamFn where
+ fmap f (FromStreamFn g) = FromStreamFn $ \s -> do
+ f <$> g s
+
+instance Show (FromStreamFn a) where
+ show _ = "<FromStreamFn>"
+
+fromStreamFn
+ :: forall i a. (Typeable i)
+ => [Maybe FileExt]
+ -> FromStreamFn' i a
+ -> HM.HashMap (TypeRep,Maybe FileExt) (FromStreamFn a)
+fromStreamFn exts f = HM.fromList $ map (\ext -> ((argTypeRep,ext), FromStreamFn f)) exts
+ where
+ argTypeRep = typeOf (undefined :: i)
+
+newtype FromStreamFn'' i a = FromStreamFn'' (FromStreamFn' i a)
+
+allFromStreamFnsWithType :: forall i ext a. (Typeable i)
+ => HM.HashMap (TypeRep,Maybe ext) (FromStreamFn a)
+ -> [(ext, FromStreamFn'' i a)]
+allFromStreamFnsWithType = mapMaybe fltr . HM.toList
+ where
+ wanted = typeOf (undefined :: i)
+ fltr ((_,Nothing),_) = Nothing
+ fltr ((tr,Just ext), FromStreamFn (f :: FromStreamFn' i' a))
+ | tr == wanted = case eqT :: Maybe (i:~:i') of
+ Just Refl -> Just (ext, FromStreamFn'' f)
+ Nothing -> error $ "allFromStreamFnsWithType: some function doesn't deal with type "
+ ++ show wanted ++ " when it should"
+ | otherwise = Nothing
+
+-- | A function to read @a@ from a 'DocRec'
+data ReadFromConfigFn a = forall rs. (Typeable rs) => ReadFromConfigFn (DocRec rs -> a)
+deriving instance Functor ReadFromConfigFn
+
+instance Show (ReadFromConfigFn a) where
+ show _ = "<ReadFromConfigFn>"
+
+-- | Here, "serial" is short for "serialization method". 'SerialReaders' is the
+-- **covariant** part of 'SerialsFor'. It describes the different ways a serial
+-- can be used to obtain data.
+data SerialReaders a = SerialReaders
+ { -- TODO: Establish whether we should remove readersFromAtomic? It is often
+ -- equivalent to reading from a stream of just one element, and therefore
+ -- mostly duplicates code.
+ _serialReadersFromAtomic ::
+ HM.HashMap (TypeRep,Maybe FileExt) (FromAtomicFn a)
+ -- ^ How to read data from an intermediate type (like 'A.Value' or
+ -- 'T.Text'). As much as possible these intermediate atomic
+ -- representations should be **strict**.
+ , _serialReadersFromStream ::
+ HM.HashMap (TypeRep,Maybe FileExt) (FromStreamFn a)
+ -- ^ How to read data from a stream of intermediate data types (like
+ -- strict ByteStrings). Each one of them being strict as much as
+ -- possible.
+ }
+ deriving (Functor, Show)
+
+makeLenses ''SerialReaders
+
+instance Semigroup (SerialReaders a) where
+ SerialReaders a s <> SerialReaders a' s' =
+ SerialReaders (HM.unionWith const a a') (HM.unionWith const s s')
+instance Monoid (SerialReaders a) where
+ mempty = SerialReaders mempty mempty
+
+-- | How to turn an @a@ into some identified type @i@, which is meant to be a
+-- general purpose intermediate representation, like 'A.Value' or even 'T.Text'.
+data ToAtomicFn a =
+ forall i. (Typeable i) => ToAtomicFn (a -> i)
+
+instance Show (ToAtomicFn a) where
+ show _ = "<ToAtomicFn>"
+
+toAtomicFn :: forall i a. (Typeable i)
+ => [Maybe FileExt]
+ -> (a -> i)
+ -> HM.HashMap (TypeRep,Maybe FileExt) (ToAtomicFn a)
+toAtomicFn exts f = HM.fromList $ map (\ext -> ((argTypeRep,ext), ToAtomicFn f)) exts
+ where
+ argTypeRep = typeOf (undefined :: i)
+
+allToAtomicFnsWithType :: forall i ext a. (Typeable i)
+ => HM.HashMap (TypeRep,Maybe ext) (ToAtomicFn a)
+ -> [(ext, a -> i)]
+allToAtomicFnsWithType = mapMaybe fltr . HM.toList
+ where
+ wanted = typeOf (undefined :: i)
+ fltr ((_,Nothing),_) = Nothing
+ fltr ((tr,Just ext), ToAtomicFn (f :: a -> i'))
+ | tr == wanted = case eqT :: Maybe (i:~:i') of
+ Just Refl -> Just (ext, f)
+ Nothing -> error $ "allToAtomicFnsWithType: some function doesn't deal with type "
+ ++ show wanted ++ " when it should"
+ | otherwise = Nothing
+
+-- -- | How to turn an @a@ into some @Stream (Of i) m ()@
+-- data ToStreamFn a =
+-- forall i. (Typeable i)
+-- => ToStreamFn (forall m. (LogMask m)
+-- => a -> Stream (Of i) m ())
+
+-- instance Show (ToStreamFn a) where
+-- show _ = "<ToStreamFn>"
+
+-- singletonToStreamFn
+-- :: forall i a. (Typeable i)
+-- => Maybe FileExt
+-- -> (forall m. (LogMask m) => a -> Stream (Of i) m ())
+-- -> HM.HashMap (TypeRep,Maybe FileExt) (ToStreamFn a)
+-- singletonToStreamFn ext f = HM.singleton (argTypeRep,ext) (ToStreamFn f)
+-- where argTypeRep = typeOf (undefined :: i)
+
+-- | The contravariant part of 'ReadFromConfigFn'. Permits writing default
+-- values of the input config
+data WriteToConfigFn a = forall rs. (Typeable rs, RecordUsableWithCLI rs)
+ => WriteToConfigFn (a -> DocRec rs)
+
+instance Show (WriteToConfigFn a) where
+ show _ = "<WriteToConfigFn>"
+
+-- | The writing part of a serial. 'SerialWriters' describes the different ways
+-- a serial can be used to serialize (write) data.
+data SerialWriters a = SerialWriters
+ { _serialWritersToAtomic :: HM.HashMap (TypeRep,Maybe FileExt) (ToAtomicFn a)
+ -- ^ How to write the data to an intermediate type (like 'A.Value'). As
+ -- much as possible this intermediate type should be **lazy**.
+
+ -- , _serialWritersToStream :: HM.HashMap (TypeRep,Maybe FileExt) (ToStreamFn a)
+ -- -- ^ How to write the data to an external file or storage.
+ }
+ deriving (Show)
+
+makeLenses ''SerialWriters
+
+instance Semigroup (SerialWriters a) where
+ SerialWriters a <> SerialWriters a' = SerialWriters (HM.unionWith const a a')
+instance Monoid (SerialWriters a) where
+ mempty = SerialWriters mempty
+
+instance Contravariant SerialWriters where
+ contramap f sw = SerialWriters
+ { _serialWritersToAtomic = fmap (\(ToAtomicFn f') -> ToAtomicFn $ f' . f)
+ (_serialWritersToAtomic sw)
+ -- , _serialWritersToStream = fmap (\(ToStreamFn f') -> ToStreamFn $ f' . f)
+ -- (_serialWritersToStream sw)
+ }
+
+-- | Links a serialization method to a preferred file extension, if this is
+-- relevant.
+class SerializationMethod serial where
+ -- | If @Just x@, @x@ should correspond to one of the keys in
+ -- _serialReadersFromStream or _serialWritersToAtomic.
+ getSerialDefaultExt :: serial -> Maybe FileExt
+ getSerialDefaultExt _ = Nothing
+
+-- | Tells whether some type @a@ can be serialized by some _serial_ (serialization
+-- method).
+class (SerializationMethod serial) => SerializesWith serial a where
+ getSerialWriters :: serial -> SerialWriters a
+
+-- | Tells whether some type @a@ can be deserialized by some _serial_
+-- (serialization method).
+class (SerializationMethod serial) => DeserializesWith serial a where
+ getSerialReaders :: serial -> SerialReaders a
+
+-- * Serialization to/from JSON and YAML, which both use the same intermediary
+-- type, Data.Aeson.Value
+
+-- | Has 'SerializesWith' & 'DeserializesWith' instances that permit
+-- storing/loading JSON and YAML files and 'A.Value's.
+data JSONSerial = JSONSerial -- ^ Expects @.json@ files by default, but supports
+ -- @.yaml@/@.yml@ files too "for free"
+ | YAMLSerial -- ^ Expects @.yaml@/@.yml@ files by default, but
+ -- supports @.json@ files too "for free"
+
+-- | For when you want a JSON **only** or YAML **only** serialization, but tied to a
+-- specific extension. It's more restrictive than 'JSONSerial' in the sense that
+-- JSONSerialWithExt cannot read values from the configuration (because in
+-- the config we only have an Aeson Value, without an associated extension, so
+-- we cannot know for sure this Value corresponds to the expected extension)
+data JSONSerialWithExt = JSONSerialWithExt FileExt
+ -- ^ Expects files of a given extension, ONLY
+ -- formatted in JSON (YAML not provided "for free")
+ | YAMLSerialWithExt FileExt
+ -- ^ Expects files of a given extension, ONLY
+ -- formatted in YAML (JSON not provided "for free")
+
+instance SerializationMethod JSONSerial where
+ getSerialDefaultExt JSONSerial = Just "json"
+ getSerialDefaultExt YAMLSerial = Just "yaml"
+
+instance SerializationMethod JSONSerialWithExt where
+ getSerialDefaultExt (JSONSerialWithExt e) = Just e
+ getSerialDefaultExt (YAMLSerialWithExt e) = Just e
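+
+-- A doctest-style sketch: the two serials differ only in their default
+-- extension (each can still read/write the other format "for free"):
+--
+-- >>> getSerialDefaultExt JSONSerial
+-- Just "json"
+-- >>> getSerialDefaultExt YAMLSerial
+-- Just "yaml"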
+
+-- | To lazy bytestring of JSON
+toAtomicJSON, toAtomicYAML
+ :: ToJSON a
+ => [FileExt] -> HM.HashMap (TypeRep, Maybe FileExt) (ToAtomicFn a)
+toAtomicJSON exts =
+ toAtomicFn (map Just exts) A.encode
+
+-- | To lazy bytestring of YAML
+toAtomicYAML exts =
+ toAtomicFn (map Just exts) $ LBS.fromStrict . Y.encode
+
+instance (ToJSON a) => SerializesWith JSONSerial a where
+ getSerialWriters _srl = mempty
+ { _serialWritersToAtomic =
+ toAtomicFn [Nothing] A.toJSON -- To A.Value, doesn't need an extension
+ <> toAtomicJSON ["json"]
+ <> toAtomicYAML ["yaml","yml"] }
+
+instance (ToJSON a) => SerializesWith JSONSerialWithExt a where
+ getSerialWriters (JSONSerialWithExt ext) = mempty
+ { _serialWritersToAtomic = toAtomicJSON [ext] }
+ getSerialWriters (YAMLSerialWithExt ext) = mempty
+ { _serialWritersToAtomic = toAtomicYAML [ext] }
+
+parseJSONEither :: (A.FromJSON t) => A.Value -> Either String t
+parseJSONEither x = case A.fromJSON x of
+ A.Success s -> Right s
+ A.Error r -> Left r
+{-# INLINE parseJSONEither #-}
+
+-- | From strict bytestring of JSON
+fromAtomicJSON, fromAtomicYAML
+ :: FromJSON a
+ => [FileExt] -> HM.HashMap (TypeRep, Maybe FileExt) (FromAtomicFn a)
+fromAtomicJSON exts =
+ fromAtomicFn (map Just exts) A.eitherDecodeStrict
+
+-- | From strict bytestring of YAML
+fromAtomicYAML exts =
+ fromAtomicFn (map Just exts) $
+ over _Left displayException . Y.decodeEither'
+
+-- | From a stream of strict bytestrings of JSON
+fromJSONStream, fromYAMLStream
+ :: FromJSON a
+ => [FileExt] -> HM.HashMap (TypeRep, Maybe FileExt) (FromStreamFn a)
+fromJSONStream exts = fromStreamFn (map Just exts) $ \strm -> do
+ BSS.toStrict_ (BSS.fromChunks strm) >>= decodeJ
+  -- TODO: Enhance this so we don't have to accumulate the whole input in memory
+ where
+ decodeJ x = case A.eitherDecodeStrict x of
+ Right y -> return y
+ Left msg -> throwWithPrefix msg
+
+-- | From a stream of strict bytestrings of YAML
+fromYAMLStream exts = fromStreamFn (map Just exts) (decodeYAMLStream . BSS.fromChunks)
+
+decodeYAMLStream :: (LogThrow m, FromJSON a) => BSS.ByteString m () -> m a
+decodeYAMLStream strm = do
+  BSS.toStrict_ strm >>= decodeY -- TODO: same as above
+ where
+ decodeY x = case Y.decodeEither' x of
+ Right y -> return y
+ Left exc -> logAndThrowM exc
+
+instance (FromJSON a) => DeserializesWith JSONSerial a where
+ getSerialReaders _srl = mempty
+ { _serialReadersFromAtomic =
+ fromAtomicFn [Nothing] parseJSONEither -- From A.Value, doesn't need an
+ -- extension
+ <> fromAtomicJSON ["json"]
+ <> fromAtomicYAML ["yaml","yml"]
+ , _serialReadersFromStream =
+ fromJSONStream ["json"]
+ -- TODO: Add reading from a stream of JSON objects (which would
+ -- therefore be considered a JSON array of objects?)
+ <>
+ fromYAMLStream ["yaml","yml"] }
+
+instance (FromJSON a) => DeserializesWith JSONSerialWithExt a where
+ getSerialReaders (JSONSerialWithExt ext) = mempty
+ { _serialReadersFromAtomic = fromAtomicJSON [ext]
+ , _serialReadersFromStream = fromJSONStream [ext] }
+ getSerialReaders (YAMLSerialWithExt ext) = mempty
+ { _serialReadersFromAtomic = fromAtomicYAML [ext]
+ , _serialReadersFromStream = fromYAMLStream [ext] }
+
+
+-- * Helpers to write to and from binary representations
+
+class ToBinaryBuilder serial a where
+ toBinaryBuilder :: serial -> a -> BinBuilder.Builder
+
+-- | Recommendation: instances should implement fromLazyByteString and
+-- fromByteStream whenever possible.
+class FromByteStream serial a where
+ fromLazyByteString :: serial -> LBS.ByteString -> Either String a
+ fromLazyByteString s = fromStrictByteString s . LBS.toStrict
+ fromStrictByteString :: serial -> BS.ByteString -> Either String a
+ fromStrictByteString s = fromLazyByteString s . LBS.fromStrict
+ fromByteStream :: (LogThrow m) => serial -> BSS.ByteString m () -> m a
+ fromByteStream s bss = do
+ bs <- BSS.toLazy_ bss -- This default implementation is stricter than
+ -- it needs to be
+ case fromLazyByteString s bs of
+ Left msg -> throwWithPrefix msg
+ Right y -> return y
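+
+-- A hedged sketch of a minimal instance: thanks to the mutually recursive
+-- defaults, providing just one of the two pure decoders is enough
+-- ('RawSerial' is an illustrative type, not part of this module):
+--
+-- data RawSerial = RawSerial
+-- instance FromByteStream RawSerial BS.ByteString where
+--   fromStrictByteString _ = Right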
+
+getSerialWriters_ToBinaryBuilder
+ :: (SerializationMethod srl, ToBinaryBuilder srl a) => srl -> SerialWriters a
+getSerialWriters_ToBinaryBuilder srl = mempty
+ { _serialWritersToAtomic =
+ toAtomicFn [getSerialDefaultExt srl] $
+ BinBuilder.toLazyByteString . toBinaryBuilder srl }
+
+getSerialReaders_FromByteStream
+ :: (SerializationMethod srl, FromByteStream srl a) => srl -> SerialReaders a
+getSerialReaders_FromByteStream srl = mempty
+ { _serialReadersFromStream =
+ fromStreamFn [getSerialDefaultExt srl] (fromByteStream srl . BSS.fromChunks)
+ , _serialReadersFromAtomic = -- From strict bytestring
+ fromAtomicFn [getSerialDefaultExt srl] (fromStrictByteString srl)
+ }
+
+-- * Serialization to/from CSV
+
+-- | Data whose header is not known in advance, that can be converted to/from
+-- CSV while keeping track of that header
+data Tabular a = Tabular
+ { tabularHeader :: Maybe [T.Text]
+ , tabularData :: a }
+ deriving (Show, Generic, ToJSON, FromJSON)
+
+-- | Data that can be converted to/from CSV, with prior knowledge of the
+-- headers
+newtype Records a = Records { fromRecords :: a }
+
+instance (Show a) => Show (Records a) where
+ show = show . fromRecords
+
+instance (ToJSON a) => ToJSON (Records a) where
+ toJSON = toJSON . fromRecords
+
+instance (FromJSON a) => FromJSON (Records a) where
+ parseJSON = fmap Records . parseJSON
+
+-- | Can serialize and deserialize any @Tabular a@ from a CSV file
+data CSVSerial = CSVSerial
+ { csvSerialExt :: FileExt
+ -- ^ The extension to use (csv, tsv, txt, etc.)
+ , csvSerialHasHeader :: Bool
+ -- ^ The csv file contains a header (to skip or to read/write). Must be True
+ -- if we want to read 'Records' from it
+ , csvSerialDelimiter :: Char
+ -- ^ The character (@,@, @\t@, etc.) to use as a field delimiter.
+ }
+
+instance SerializationMethod CSVSerial where
+ getSerialDefaultExt = Just . csvSerialExt
+
+instance (Foldable f, Csv.ToRecord a) => ToBinaryBuilder CSVSerial (Tabular (f a)) where
+ toBinaryBuilder (CSVSerial _ hasHeader delim) (Tabular mbHeader dat) =
+ mbAddHeader $ foldMap encField dat
+ where
+ mbAddHeader | hasHeader = maybe id (<>) (encHeader <$> mbHeader)
+ | otherwise = id
+ encodeOpts = Csv.defaultEncodeOptions {Csv.encDelimiter = fromIntegral $ ord delim}
+ encHeader = CsvBuilder.encodeRecordWith encodeOpts
+ encField = CsvBuilder.encodeRecordWith encodeOpts
+
+instance (Foldable f, Csv.ToNamedRecord a, Csv.DefaultOrdered a)
+ => ToBinaryBuilder CSVSerial (Records (f a)) where
+ toBinaryBuilder (CSVSerial _ hasHeader delim) (Records dat) =
+ mbAddHeader $ foldMap encField dat
+ where
+ mbAddHeader | hasHeader = (encHeader (Csv.headerOrder (undefined :: a)) <>)
+ | otherwise = id
+ encodeOpts = Csv.defaultEncodeOptions {Csv.encDelimiter = fromIntegral $ ord delim}
+ encHeader = CsvBuilder.encodeHeaderWith encodeOpts
+ encField = CsvBuilder.encodeDefaultOrderedNamedRecordWith encodeOpts
+
+instance (Csv.FromRecord a) => FromByteStream CSVSerial (Tabular (V.Vector a)) where
+ fromLazyByteString (CSVSerial _ hasHeader delim) bs = do
+ (mbHeader, rest) <- if hasHeader
+ then case AttoL.parse (CsvParser.header delim') bs of
+ AttoL.Fail _ _ err -> Left err
+ AttoL.Done rest r -> return (Just r, rest)
+ else return (Nothing, bs)
+ let mbHeader' = map TE.decodeUtf8 . V.toList <$> mbHeader
+ Tabular mbHeader' <$> Csv.decodeWith decOpts Csv.NoHeader rest
+ where
+ delim' = fromIntegral $ ord delim
+ decOpts = Csv.defaultDecodeOptions {Csv.decDelimiter=delim'}
+
+instance (Csv.FromNamedRecord a) => FromByteStream CSVSerial (Records (V.Vector a)) where
+  fromLazyByteString (CSVSerial _ hasHeader delim) bs =
+    if not hasHeader
+    then Left "Cannot read named records (Records) from a CSV file without a header"
+    else do
+ (_, v) <- Csv.decodeByNameWith decOpts bs
+ return $ Records v
+ where
+ decOpts = Csv.defaultDecodeOptions {Csv.decDelimiter=fromIntegral $ ord delim}
+
+instance (Foldable f, Csv.ToRecord a) => SerializesWith CSVSerial (Tabular (f a)) where
+ getSerialWriters = getSerialWriters_ToBinaryBuilder
+
+instance (Foldable f, Csv.ToNamedRecord a, Csv.DefaultOrdered a)
+ => SerializesWith CSVSerial (Records (f a)) where
+ getSerialWriters = getSerialWriters_ToBinaryBuilder
+
+instance (Csv.FromRecord a) => DeserializesWith CSVSerial (Tabular (V.Vector a)) where
+ getSerialReaders = getSerialReaders_FromByteStream
+
+instance (Csv.FromNamedRecord a) => DeserializesWith CSVSerial (Records (V.Vector a)) where
+ getSerialReaders = getSerialReaders_FromByteStream
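+
+-- A hedged example value: a standard comma-separated file with a header row,
+-- using the ".csv" extension:
+--
+-- >>> let csv = CSVSerial "csv" True ','
+-- >>> getSerialDefaultExt csv
+-- Just "csv"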
+
+-- * "Serialization" to/from bytestrings
+
+-- | ByteStringSerial is just a reader of strict ByteStrings and writer of lazy
+-- ByteStrings. It's the simplest SerializationMethod possible
+newtype ByteStringSerial = ByteStringSerial { bsSerialSpecificExt :: Maybe FileExt }
+
+instance SerializationMethod ByteStringSerial where
+ getSerialDefaultExt (ByteStringSerial ext) = ext
+
+instance SerializesWith ByteStringSerial LBS.ByteString where
+ getSerialWriters (ByteStringSerial ext) = mempty
+ { _serialWritersToAtomic = toAtomicFn [ext] id }
+ -- TODO: Add base64 encoding so it can be read/written from/to JSON strings
+ -- too
+
+-- We only deserialize *strict* bytestrings, so as not to hide the fact that
+-- the whole stream has to be accumulated in memory if you want to break away
+-- from it
+
+instance DeserializesWith ByteStringSerial BS.ByteString where
+ getSerialReaders (ByteStringSerial ext) = mempty
+ { _serialReadersFromAtomic =
+ fromAtomicFn [ext] Right
+ , _serialReadersFromStream =
+ fromStreamFn [ext] S.mconcat_ }
+
+-- * Serialization to/from plain text
+
+-- | Can read from text files or raw input strings in the pipeline configuration
+-- file. Should be used only for small files or input strings. If only some
+-- specific extension should be accepted, specify it; otherwise just use Nothing.
+newtype PlainTextSerial = PlainTextSerial { plainTextSerialSpecificExt :: Maybe FileExt }
+
+instance SerializationMethod PlainTextSerial where
+ getSerialDefaultExt (PlainTextSerial ext) = ext
+
+instance SerializesWith PlainTextSerial T.Text where
+ getSerialWriters (PlainTextSerial ext) = mempty
+ { _serialWritersToAtomic =
+ toAtomicFn [Nothing] (\t -> LT.fromChunks [t]) -- To lazy text
+ <> toAtomicFn [ext] (\t -> LTE.encodeUtf8 $ LT.fromChunks [t]) -- To lazy bytestring
+ <> toAtomicFn [ext] toJSON -- To A.Value
+ }
+
+instance SerializesWith PlainTextSerial LT.Text where
+ getSerialWriters (PlainTextSerial ext) = mempty
+ { _serialWritersToAtomic =
+ toAtomicFn [Nothing] id -- To lazy text
+ <> toAtomicFn [ext] LTE.encodeUtf8 -- To lazy bytestring
+ <> toAtomicFn [ext] toJSON -- To A.Value
+ }
+
+instance DeserializesWith PlainTextSerial T.Text where
+ getSerialReaders (PlainTextSerial ext) = mempty
+ { _serialReadersFromAtomic =
+ fromAtomicFn [Nothing] Right
+ <> fromAtomicFn [ext] parseJSONEither
+ <> fromAtomicFn [ext] (Right . TE.decodeUtf8)
+ , _serialReadersFromStream =
+ fromStreamFn [ext] S.mconcat_
+ <> fromStreamFn [ext] (fmap TE.decodeUtf8 . S.mconcat_)
+ }
+
+-- * Serialization of options
+
+-- | Contains any set of options that should be exposed via the CLI
+data RecOfOptions field where
+ RecOfOptions :: (Typeable rs, RecordUsableWithCLI rs) => Rec field rs -> RecOfOptions field
+
+type DocRecOfOptions = RecOfOptions DocField
+
+-- | A serialization method used for options which can have a default value,
+-- that can be exposed through the configuration.
+data OptionsSerial a = forall rs. (Typeable rs, RecordUsableWithCLI rs)
+ => OptionsSerial (a -> DocRec rs) (DocRec rs -> a)
+instance SerializationMethod (OptionsSerial a)
+instance SerializesWith (OptionsSerial a) a where
+ getSerialWriters (OptionsSerial f _) = mempty
+ { _serialWritersToAtomic =
+ toAtomicFn [Nothing] (RecOfOptions . f) }
+instance DeserializesWith (OptionsSerial a) a where
+ getSerialReaders (OptionsSerial _ (f :: DocRec rs -> a)) = mempty
+ { _serialReadersFromAtomic =
+ let conv :: DocRecOfOptions -> Either String a
+ conv (RecOfOptions r) = case cast r of
+ Just r' -> Right $ f r'
+ Nothing -> Left "OptionsSerial: _serialReadersFromAtomic: Not the right fields"
+ in fromAtomicFn [Nothing] conv }
+
+
+-- * Combining serializers and deserializers into one structure
+
+-- | Can serialize @a@ and deserialize @b@.
+data SerialsFor a b = SerialsFor
+ { _serialWriters :: SerialWriters a
+ , _serialReaders :: SerialReaders b
+ , _serialDefaultExt :: First FileExt
+ , _serialRepetitionKeys :: [LocVariable] }
+ deriving (Show)
+
+makeLenses ''SerialsFor
+
+-- | An equivalent of 'Void', to avoid orphan instances
+data NoWrite
+
+instance (Monad m) => ContentHashable m NoWrite where
+ contentHashUpdate ctx _ = contentHashUpdate ctx ()
+
+-- | Just for symmetry with 'NoWrite'
+data NoRead = NoRead
+ deriving (Eq, Ord, Show)
+
+instance Semigroup NoRead where
+ _ <> _ = NoRead
+instance Monoid NoRead where
+ mempty = NoRead
+
+instance (Monad m) => ContentHashable m NoRead where
+ contentHashUpdate ctx _ = contentHashUpdate ctx ()
+
+-- | Can serialize and deserialize @a@. Use 'dimap' to transform it.
+type BidirSerials a = SerialsFor a a
+
+-- | Can only serialize @a@. Use 'lmap' to transform it.
+type PureSerials a = SerialsFor a NoRead
+
+-- | Can only deserialize @a@. Use 'rmap' to transform it.
+type PureDeserials a = SerialsFor NoWrite a
+
+instance Profunctor SerialsFor where
+ lmap f (SerialsFor sers desers ext rk) = SerialsFor (contramap f sers) desers ext rk
+ rmap f (SerialsFor sers desers ext rk) = SerialsFor sers (fmap f desers) ext rk
+
+instance Semigroup (SerialsFor a b) where
+ SerialsFor s d ext rk <> SerialsFor s' d' ext' _ =
+ SerialsFor (s<>s') (d<>d') (ext<>ext') rk
+instance Monoid (SerialsFor a b) where
+ mempty = SerialsFor mempty mempty mempty []
+
+-- | Changes the serialization method used by default, by setting the default
+-- file extension
+setDefaultSerial :: FileExt -> SerialsFor a b -> SerialsFor a b
+setDefaultSerial = set serialDefaultExt . First . Just
+
+-- | Packs together ways to serialize and deserialize some data @a@
+someBidirSerial :: (SerializesWith s a, DeserializesWith s a) => s -> BidirSerials a
+someBidirSerial s =
+ SerialsFor (getSerialWriters s) (getSerialReaders s) (First $ getSerialDefaultExt s) []
+
+makeBidir :: PureSerials a -> PureDeserials a -> BidirSerials a
+makeBidir (SerialsFor sers _ ext rk) (SerialsFor _ desers ext' _) =
+ SerialsFor sers desers (ext<>ext') rk
+
+-- | Packs together ways to serialize some data @a@
+somePureSerial :: (SerializesWith s a) => s -> PureSerials a
+somePureSerial s =
+ SerialsFor (getSerialWriters s) mempty (First $ getSerialDefaultExt s) []
+
+-- | Packs together ways to deserialize some data @a@
+somePureDeserial :: (DeserializesWith s a) => s -> PureDeserials a
+somePureDeserial s =
+ SerialsFor mempty (getSerialReaders s) (First $ getSerialDefaultExt s) []
+
+eraseSerials :: SerialsFor a b -> PureDeserials b
+eraseSerials (SerialsFor _ desers ext rk) = SerialsFor mempty desers ext rk
+
+eraseDeserials :: SerialsFor a b -> PureSerials a
+eraseDeserials (SerialsFor sers _ ext rk) = SerialsFor sers mempty ext rk
+
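+-- A minimal sketch of how these combinators compose (assuming
+-- OverloadedStrings; the use of 'Int' through its Text representation is a
+-- hypothetical example):
+--
+-- > textSerials :: BidirSerials T.Text
+-- > textSerials = someBidirSerial (PlainTextSerial (Just "txt"))
+-- >
+-- > intSerials :: BidirSerials Int
+-- > intSerials = dimap (T.pack . show) (read . T.unpack) textSerials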
+
+-- * Retrieve conversion functions from a 'SerialsFor' @a@ @b@
+
+-- | Tries to get a conversion function to some type @i@
+getToAtomicFn :: forall i a b. (Typeable i) => SerialsFor a b -> Maybe (a -> i)
+getToAtomicFn ser = do
+ ToAtomicFn (f :: a -> i') <-
+ HM.lookup (typeOf (undefined :: i),Nothing) (ser ^. serialWriters . serialWritersToAtomic)
+ case eqT :: Maybe (i' :~: i) of
+ Just Refl -> return f
+ Nothing -> error $ "getToAtomicFn: Some conversion function isn't properly indexed. Should not happen"
+
+-- | Tries to get a conversion function from some type @i@
+getFromAtomicFn :: forall i a b. (Typeable i) => SerialsFor a b -> Maybe (FromAtomicFn' i b)
+getFromAtomicFn ser = do
+ FromAtomicFn (f :: FromAtomicFn' i' b) <-
+ HM.lookup (typeOf (undefined :: i),Nothing) (ser ^. serialReaders . serialReadersFromAtomic)
+ case eqT :: Maybe (i' :~: i) of
+ Just Refl -> return f
+ Nothing -> error $ "getFromAtomicFn: Some conversion function isn't properly indexed. Should not happen"
+
+
+-- * Serialization for compressed formats
+
+-- | Wraps all the functions in the serials so that for each supported serial
+-- (extension) @xxx@, we now also support @xxxzlib@. Doesn't change the default
+-- extension
+addZlibSerials :: SerialsFor a b -> SerialsFor a b
+addZlibSerials = over serialWriters (over serialWritersToAtomic editTA)
+ . over serialReaders (over serialReadersFromAtomic editFA
+ . over serialReadersFromStream editFS)
+ where
+ editTA hm = (hm <>) $ mconcat $ flip map (allToAtomicFnsWithType hm) $
+ \(ext, f) ->
+ toAtomicFn [Just $ ext <> "zlib"] $ Zlib.compress . f -- Lazy bytestring
+ editFA hm = (hm <>) $ mconcat $ flip map (allFromAtomicFnsWithType hm) $
+ \(ext, f) ->
+ fromAtomicFn [Just $ ext <> "zlib"] $
+ f . LBS.toStrict . Zlib.decompress . LBS.fromStrict -- Strict bytestring
+ editFS hm = (hm <>) $ mconcat $ flip map (allFromStreamFnsWithType hm) $
+ \(ext, FromStreamFn'' f) ->
+ fromStreamFn [Just $ ext <> "zlib"] $
+ f . BSS.toChunks . SZip.decompress SZip.defaultWindowBits . BSS.fromChunks
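+
+-- For instance (a sketch, assuming OverloadedStrings): a Text serial that
+-- also reads and writes "txtzlib"-compressed files:
+--
+-- > compressedText :: BidirSerials T.Text
+-- > compressedText = addZlibSerials $ someBidirSerial (PlainTextSerial (Just "txt"))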
+
+-- | Adds warnings when deserializing values /from a stream/
+addDeserialWarnings :: (b -> [String]) -> SerialsFor a b -> SerialsFor a b
+addDeserialWarnings f = serialReaders . serialReadersFromStream . traversed %~ addW
+ where
+ addW (FromStreamFn g) = FromStreamFn $ \s -> do
+ a <- g s
+ let warnings = f a
+ mapM_ (logFM WarningS . logStr) warnings
+ return a
+
+
+-- -- | Traverses to the repetition keys stored in the access functions of a
+-- -- 'SerialsFor'
+-- serialsRepetitionKeys :: Traversal' (SerialsFor a b) [LocVariable]
+-- serialsRepetitionKeys f (SerialsFor writers readers ext rk) =
+-- rebuild <$> (serialWritersToOutputFile . traversed . writeToLocRepetitionKeys) f writers
+-- <*> (serialReadersFromInputFile . traversed . readFromLocRepetitionKeys) f readers
+-- where
+-- rebuild w r = SerialsFor w r ext rk
diff --git a/src/Data/Locations/VirtualFile.hs b/src/Data/Locations/VirtualFile.hs
new file mode 100644
index 0000000..f2d23cf
--- /dev/null
+++ b/src/Data/Locations/VirtualFile.hs
@@ -0,0 +1,394 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE Rank2Types #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+
+module Data.Locations.VirtualFile
+ ( LocationTreePathItem
+ , module Data.Locations.SerializationMethod
+ , Profunctor(..)
+ , VirtualFile(..), LayeredReadScheme(..)
+ , BidirVirtualFile, DataSource, DataSink
+ , VFileIntent(..), VFileDescription(..)
+ , RecOfOptions(..)
+ , VFileImportance(..)
+ , Cacher(..)
+ , vfileSerials
+ , vfileAsBidir, vfileImportance
+ , vfileEmbeddedValue
+ , getConvertedEmbeddedValue, setConvertedEmbeddedValue
+ , tryMergeLayersForVFile
+ , vfileOriginalPath, showVFileOriginalPath
+ , vfileLayeredReadScheme
+ , vfileVoided
+ , vfiReadSuccess, vfiWriteSuccess, vfiError
+ , dataSource, dataSink, bidirVirtualFile
+ , makeSink, makeSource
+ , documentedFile
+ , withEmbeddedValue
+ , usesLayeredMapping, canBeUnmapped, unmappedByDefault
+ , usesCacherWithIdent
+ , getVFileDescription
+ , describeVFileAsSourceSink, describeVFileExtensions, describeVFileTypes
+ , describeVFileAsRecOfOptions
+ , clockVFileAccesses
+ , defaultCacherWithIdent
+ ) where
+
+import Control.Funflow
+import Control.Funflow.ContentHashable
+import Control.Lens
+import Data.Aeson (Value)
+import Data.Default
+import Data.DocRecord
+import qualified Data.HashMap.Strict as HM
+import qualified Data.HashSet as HS
+import Data.List (intersperse)
+import Data.List.NonEmpty (NonEmpty (..))
+import Data.Locations.Accessors
+import Data.Locations.Loc
+import Data.Locations.LocationTree
+import Data.Locations.LocVariable
+import Data.Locations.Mappings (HasDefaultMappingRule (..),
+ LocShortcut (..))
+import Data.Locations.SerializationMethod
+import Data.Maybe
+import Data.Monoid (First (..))
+import Data.Profunctor (Profunctor (..))
+import Data.Representable
+import Data.Semigroup (sconcat)
+import Data.Store (Store)
+import qualified Data.Text as T
+import Data.Type.Equality
+import Data.Typeable
+import Katip
+
+
+-- * The general 'VirtualFile' type
+
+-- | Tells how the file is meant to be read
+data LayeredReadScheme b where
+ SingleLayerRead :: LayeredReadScheme b
+ -- No layered reading accepted
+ LayeredRead :: Semigroup b => LayeredReadScheme b
+ -- A layered reading combining all the layers with (<>)
+ LayeredReadWithNull :: Monoid b => LayeredReadScheme b
+ -- Like 'LayeredRead', and handles mapping to no layer (mempty)
+
+-- | Tells how the accesses to this 'VirtualFile' should be logged
+data VFileImportance = VFileImportance
+ { _vfiReadSuccess :: Severity
+ , _vfiWriteSuccess :: Severity
+ , _vfiError :: Severity
+ , _vfiClockAccess :: Bool }
+ deriving (Show)
+
+makeLenses ''VFileImportance
+
+instance Default VFileImportance where
+ def = VFileImportance InfoS NoticeS ErrorS False
+
+-- | A virtual file in the location tree to which we can write @a@ and from
+-- which we can read @b@.
+data VirtualFile a b = VirtualFile
+ { _vfileOriginalPath :: [LocationTreePathItem]
+ , _vfileLayeredReadScheme :: LayeredReadScheme b
+ , _vfileEmbeddedValue :: Maybe b
+ , _vfileMappedByDefault :: Bool
+ , _vfileImportance :: VFileImportance
+ , _vfileDocumentation :: Maybe T.Text
+ , _vfileWriteCacher :: Cacher (a, Either String SomeHashableLocs) ()
+ , _vfileReadCacher :: Cacher (Either String SomeHashableLocs) b
+ , _vfileSerials :: SerialsFor a b }
+
+makeLenses ''VirtualFile
+
+-- How we derive the default configuration for mapping some VirtualFile
+instance HasDefaultMappingRule (VirtualFile a b) where
+ getDefaultLocShortcut vf = if vf ^. vfileMappedByDefault
+ then Just $
+ case vf ^? vfileSerials . serialRepetitionKeys . filtered (not . null) of
+ Nothing -> DeriveWholeLocFromTree defExt
+ -- LIMITATION: For now we suppose that every reading/writing function in
+ -- the serials has the same repetition keys
+ Just rkeys -> DeriveLocPrefixFromTree $
+ let toVar rkey = SoV_Variable rkey
+ locStr = StringWithVars $ (SoV_String "-")
+ : intersperse (SoV_String "-") (map toVar rkeys)
+ in PathWithExtension locStr $ T.unpack defExt
+ else Nothing
+ where
+ defExt =
+ case vf ^. vfileSerials . serialDefaultExt of
+ First (Just ext) -> ext
+ _ -> T.pack ""
+
+-- For now, given the requirements of PTask, VirtualFile has to be a Monoid
+-- because a VirtualTree also has to be one.
+instance Semigroup (VirtualFile a b) where
+ VirtualFile p l v m i d wc rc s <> VirtualFile _ _ _ _ _ _ _ _ s' =
+ VirtualFile p l v m i d wc rc (s<>s')
+instance Monoid (VirtualFile a b) where
+ mempty = VirtualFile [] SingleLayerRead Nothing True def Nothing NoCache NoCache mempty
+
+-- | The Profunctor instance is forgetful: it forgets the mapping scheme and
+-- the caching properties.
+instance Profunctor VirtualFile where
+ dimap f g (VirtualFile p _ v m i d _ _ s) =
+ VirtualFile p SingleLayerRead (g <$> v) m i d NoCache NoCache $ dimap f g s
+
+
+-- * Obtaining a description of how the 'VirtualFile' should be used
+
+-- | Describes how a virtual file is meant to be used
+data VFileIntent =
+ VFForWriting | VFForReading | VFForRW | VFForCLIOptions
+ deriving (Show, Eq)
+
+-- | Gives the purpose of the 'VirtualFile'. Used to document the pipeline and check
+-- mappings to physical files.
+data VFileDescription = VFileDescription
+ { vfileDescIntent :: Maybe VFileIntent
+ -- ^ How is the 'VirtualFile' meant to be used
+ , vfileDescEmbeddableInConfig :: Bool
+ -- ^ True if the data can be read directly from the
+ -- pipeline's config file
+ , vfileDescEmbeddableInOutput :: Bool
+ -- ^ True if the data can be written directly in the
+ -- pipeline's output location tree
+ , vfileDescPossibleExtensions :: [FileExt]
+ -- ^ Possible extensions for the files this virtual file
+ -- can be mapped to (preferred extension is the first)
+ } deriving (Show)
+
+-- | Gives a 'VFileDescription'. To be used on files stored in the
+-- VirtualTree.
+getVFileDescription :: VirtualFile a b -> VFileDescription
+getVFileDescription vf =
+ VFileDescription intent readableFromConfig writableInOutput exts
+ where
+ (SerialsFor
+ (SerialWriters toA)
+ (SerialReaders fromA fromS)
+ prefExt
+ _) = _vfileSerials vf
+ intent
+ | HM.null fromA && HM.null fromS && HM.null toA = Nothing
+ | HM.null fromA && HM.null fromS = Just VFForWriting
+ | HM.null toA = Just VFForReading
+ | Just _ <- vf ^. vfileEmbeddedValue = Just VFForCLIOptions
+ | otherwise = Just VFForRW
+ extSet = HS.fromList . mapMaybe snd . HM.keys
+ otherExts = extSet toA <> extSet fromA <> extSet fromS
+ exts = case prefExt of
+ First (Just e) -> e:(HS.toList $ HS.delete e otherExts)
+ _ -> HS.toList otherExts
+ typeOfAesonVal = typeOf (undefined :: Value)
+ readableFromConfig = (typeOfAesonVal,Nothing) `HM.member` fromA
+ writableInOutput = (typeOfAesonVal,Nothing) `HM.member` toA
+
+describeVFileAsSourceSink :: VirtualFile a b -> String
+describeVFileAsSourceSink vf =
+ sourceSink
+ ++ (if vfileDescEmbeddableInConfig vfd then " (embeddable)" else "")
+ ++ (case vf ^. vfileSerials.serialRepetitionKeys of
+ [] -> ""
+ lvs -> " repeated over " ++ concat
+ (intersperse ", " (map (("\""++) . (++"\"") . unLocVariable) lvs)))
+ where
+ sourceSink = case vfileDescIntent vfd of
+ Nothing -> ""
+ Just i -> case i of
+ VFForWriting -> "DATA SINK"
+ VFForReading -> "DATA SOURCE"
+ VFForRW -> "BIDIR VFILE"
+ VFForCLIOptions -> "OPTION SOURCE"
+ vfd = getVFileDescription vf
+
+describeVFileAsRecOfOptions :: (Typeable a, Typeable b) => VirtualFile a b -> Int -> String
+describeVFileAsRecOfOptions vf charLimit =
+ case (vf ^? vfileAsBidir) >>= getConvertedEmbeddedValue of
+ Just (RecOfOptions record :: DocRecOfOptions) ->
+ "\n--- Fields ---\n" ++ T.unpack (showDocumentation charLimit record)
+ _ -> ""
+
+describeVFileExtensions :: VirtualFile a b -> String
+describeVFileExtensions vf =
+ "Accepts " ++ T.unpack (T.intercalate (T.pack ", ") (vfileDescPossibleExtensions vfd))
+ where vfd = getVFileDescription vf
+
+describeVFileTypes :: forall a b. (Typeable a, Typeable b) => VirtualFile a b -> Int -> String
+describeVFileTypes _ charLimit
+ | a == b = "Receives & emits: " ++ cap (show a)
+ | b == typeOf (undefined :: NoRead) = "Receives " ++ cap (show a)
+ | a == typeOf (undefined :: NoWrite) = "Emits " ++ cap (show b)
+ | otherwise = "Receives " ++ cap (show a) ++ " & emits " ++ cap (show b)
+ where
+ cap x | length x >= charLimit = take charLimit x ++ "..."
+ | otherwise = x
+ a = typeOf (undefined :: a)
+ b = typeOf (undefined :: b)
+
+-- | Just for logs and error messages
+showVFileOriginalPath :: VirtualFile a b -> String
+showVFileOriginalPath = T.unpack . toTextRepr . LTP . _vfileOriginalPath
+
+-- | Embeds a value inside the 'VirtualFile'. This value will be considered the
+-- base layer if we read extra @b@'s from external physical files.
+withEmbeddedValue :: b -> VirtualFile a b -> VirtualFile a b
+withEmbeddedValue = set vfileEmbeddedValue . Just
+
+-- | Indicates that the file uses layered mapping
+usesLayeredMapping :: (Semigroup b) => VirtualFile a b -> VirtualFile a b
+usesLayeredMapping =
+ vfileLayeredReadScheme .~ LayeredRead
+
+-- | Indicates that the file uses layered mapping, and additionally can be left
+-- unmapped (i.e. mapped to null)
+canBeUnmapped :: (Monoid b) => VirtualFile a b -> VirtualFile a b
+canBeUnmapped =
+ vfileLayeredReadScheme .~ LayeredReadWithNull
+
+-- | Indicates that the file should be mapped to null by default
+unmappedByDefault :: (Monoid b) => VirtualFile a b -> VirtualFile a b
+unmappedByDefault =
+ (vfileLayeredReadScheme .~ LayeredReadWithNull)
+ . (vfileMappedByDefault .~ False)
+
+-- | Gives a documentation to the 'VirtualFile'
+documentedFile :: T.Text -> VirtualFile a b -> VirtualFile a b
+documentedFile doc = vfileDocumentation .~ Just doc
+
+-- | Sets the file's reads and writes to be cached. Useful if the file is bound
+-- to a source/sink that takes time to respond, such as an HTTP endpoint, or
+-- that uses an expensive text serialization method (like JSON or XML).
+usesCacherWithIdent :: (ContentHashable Identity a, Store b)
+ => Int -> VirtualFile a b -> VirtualFile a b
+usesCacherWithIdent ident =
+ (vfileWriteCacher .~ defaultCacherWithIdent ident)
+ . (vfileReadCacher .~ defaultCacherWithIdent ident)
+
+-- * Creating VirtualFiles and converting between their different subtypes
+-- (bidir files, sources and sinks)
+
+-- | A virtual file which depending on the situation can be written or read
+type BidirVirtualFile a = VirtualFile a a
+
+-- | A virtual file that's only readable
+type DataSource a = VirtualFile NoWrite a
+
+-- | A virtual file that's only writable
+type DataSink a = VirtualFile a NoRead
+
+-- | Creates a virtual file from its virtual path and ways to
+-- serialize/deserialize the data. You should prefer 'dataSink' and
+-- 'dataSource' for clarity when the file is meant to be read-only or
+-- write-only.
+virtualFile :: [LocationTreePathItem] -> SerialsFor a b -> VirtualFile a b
+virtualFile path sers = VirtualFile path SingleLayerRead Nothing True def Nothing NoCache NoCache sers
+
+-- | Creates a virtual file from its virtual path and ways to deserialize the
+-- data.
+dataSource :: [LocationTreePathItem] -> SerialsFor a b -> DataSource b
+dataSource path = makeSource . virtualFile path
+
+-- | Creates a virtual file from its virtual path and ways to serialize the
+-- data.
+dataSink :: [LocationTreePathItem] -> SerialsFor a b -> DataSink a
+dataSink path = makeSink . virtualFile path
+
+-- | Like 'virtualFile', but constrained to bidirectional serials, for clarity
+bidirVirtualFile :: [LocationTreePathItem] -> BidirSerials a -> BidirVirtualFile a
+bidirVirtualFile = virtualFile
+
+-- | Turns the 'VirtualFile' into a pure source
+makeSource :: VirtualFile a b -> DataSource b
+makeSource vf = vf{_vfileSerials=eraseSerials $ _vfileSerials vf
+ ,_vfileWriteCacher=NoCache}
+
+-- | Turns the 'VirtualFile' into a pure sink
+makeSink :: VirtualFile a b -> DataSink a
+makeSink vf = vf{_vfileSerials=eraseDeserials $ _vfileSerials vf
+ ,_vfileLayeredReadScheme=LayeredReadWithNull
+ ,_vfileReadCacher=NoCache
+ ,_vfileEmbeddedValue=Nothing}
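+
+-- A short sketch of declaring sources and sinks (the paths and types are
+-- hypothetical; we assume OverloadedStrings for the path items):
+--
+-- > rawText :: DataSource T.Text
+-- > rawText = dataSource ["Inputs", "RawText"] $
+-- >   somePureDeserial (PlainTextSerial (Just "txt"))
+-- >
+-- > result :: DataSink T.Text
+-- > result = dataSink ["Outputs", "Result"] $
+-- >   somePureSerial (PlainTextSerial (Just "txt"))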
+
+
+-- * Traversals to the content of the VirtualFile, when it already embeds some
+-- value
+
+-- | If we have the internal proof that a VirtualFile is actually bidirectional,
+-- we convert it.
+vfileAsBidir :: forall a b. (Typeable a, Typeable b)
+ => Traversal' (VirtualFile a b) (BidirVirtualFile a)
+vfileAsBidir f vf = case eqT :: Maybe (a :~: b) of
+ Just Refl -> f vf
+ Nothing -> pure vf
+
+-- | Gives access to a version of the VirtualFile without type params. The
+-- original path isn't settable.
+vfileVoided :: Lens' (VirtualFile a b) (VirtualFile NoWrite NoRead)
+vfileVoided f (VirtualFile p l v m i d wc rc s) =
+ rebuild <$> f (VirtualFile p SingleLayerRead Nothing m i d NoCache NoCache mempty)
+ where
+ rebuild (VirtualFile _ _ _ m' i' d' _ _ _) =
+ VirtualFile p l v m' i' d' wc rc s
+
+-- | If the 'VirtualFile' has an embedded value convertible to type @i@, we get
+-- it.
+getConvertedEmbeddedValue
+ :: (Typeable i)
+ => BidirVirtualFile a
+ -> Maybe i
+getConvertedEmbeddedValue vf = do
+ toA <- getToAtomicFn (vf ^. vfileSerials)
+ toA <$> vf ^. vfileEmbeddedValue
+
+-- | If the 'VirtualFile' can hold an embedded value of type @a@ that's
+-- convertible from type @i@, we set it. Note that the conversion may fail; we
+-- return Left if the VirtualFile couldn't be set.
+setConvertedEmbeddedValue
+ :: forall a b i. (Typeable i)
+ => VirtualFile a b
+ -> i
+ -> Either String (VirtualFile a b)
+setConvertedEmbeddedValue vf i =
+ case getFromAtomicFn (vf ^. vfileSerials) of
+ Nothing -> Left $ showVFileOriginalPath vf ++
+ ": no conversion function is available to transform type " ++ show (typeOf (undefined :: i))
+ Just fromA -> do
+ i' <- fromA i
+ return $ vf & vfileEmbeddedValue .~ Just i'
+
+-- | Tries to convert each @i@ layer to type @b@ and to find a
+-- Monoid/Semigroup instance for @b@ in the vfileLayeredReadScheme, so we can
+-- merge these layers. If we have more than one layer, this will fail when the
+-- file doesn't use LayeredRead.
+tryMergeLayersForVFile
+ :: forall a b i. (Typeable i)
+ => VirtualFile a b
+ -> [i]
+ -> Either String b
+tryMergeLayersForVFile vf layers = let ser = vf ^. vfileSerials in
+ case getFromAtomicFn ser of
+ Nothing -> Left $ showVFileOriginalPath vf ++
+ ": no conversion functions are available to transform back and forth type "
+ ++ show (typeOf (undefined :: i))
+ Just fromA -> do
+ case (layers, vf^.vfileLayeredReadScheme) of
+ ([], LayeredReadWithNull) -> return mempty
+ ([], _) -> Left $ "tryMergeLayersForVFile: " ++ showVFileOriginalPath vf
+ ++ " doesn't support mapping to no layers"
+ ([x], _) -> fromA x
+ (x:xs, LayeredRead) -> sconcat <$> traverse fromA (x:|xs)
+ (xs, LayeredReadWithNull) -> mconcat <$> traverse fromA xs
+ (_, _) -> Left $ "tryMergeLayersForVFile: " ++ showVFileOriginalPath vf
+ ++ " cannot use several layers of data"
+
+-- | Sets vfileImportance . vfiClockAccess to True. This way each access to the
+-- file will be clocked and logged.
+clockVFileAccesses :: VirtualFile a b -> VirtualFile a b
+clockVFileAccesses = vfileImportance . vfiClockAccess .~ True
diff --git a/src/Data/Representable.hs b/src/Data/Representable.hs
new file mode 100644
index 0000000..8109179
--- /dev/null
+++ b/src/Data/Representable.hs
@@ -0,0 +1,24 @@
+{-# LANGUAGE OverloadedStrings #-}
+
+module Data.Representable where
+
+import Control.Applicative
+import Data.Text
+
+-- | A class for small objects that can be printed and read back from Text.
+--
+-- > fromTextRepr . toTextRepr == pure
+--
+class Representable a where
+ toTextRepr :: a -> Text
+ fromTextRepr :: (Alternative f) => Text -> f a
+
+instance (Representable a) => Representable (Maybe a) where
+ toTextRepr Nothing = ""
+ toTextRepr (Just x) = toTextRepr x
+ fromTextRepr "" = pure Nothing
+ fromTextRepr t = Just <$> fromTextRepr t
+
+instance Representable Text where
+ toTextRepr = id
+ fromTextRepr = pure
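+
+-- A minimal sketch of a user-defined instance (the @Port@ wrapper is
+-- hypothetical):
+--
+-- > newtype Port = Port Int
+-- >
+-- > instance Representable Port where
+-- >   toTextRepr (Port n) = pack (show n)
+-- >   fromTextRepr t = case reads (unpack t) of
+-- >     [(n, "")] -> pure (Port n)
+-- >     _         -> empty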
diff --git a/src/Porcupine.hs b/src/Porcupine.hs
new file mode 100644
index 0000000..41b081f
--- /dev/null
+++ b/src/Porcupine.hs
@@ -0,0 +1,15 @@
+-- | The top-level module for porcupine. Suitable for examples. Prefer
+-- selectively importing the needed Porcupine.* modules in bigger applications
+-- that define serials and tasks in separate files.
+
+module Porcupine
+ ( module Porcupine.Serials
+ , module Porcupine.VFiles
+ , module Porcupine.Tasks
+ , module Porcupine.Run )
+where
+
+import Porcupine.Serials
+import Porcupine.VFiles
+import Porcupine.Tasks
+import Porcupine.Run
diff --git a/src/Porcupine/Foldl.hs b/src/Porcupine/Foldl.hs
new file mode 100644
index 0000000..8cf5e59
--- /dev/null
+++ b/src/Porcupine/Foldl.hs
@@ -0,0 +1,13 @@
+-- | This module implements a Foldl-based interface for arrow computations
+-- compatible with the <https://hackage.haskell.org/package/foldl foldl
+-- library>. Use 'generalizeA' and 'generalizeM' to convert folds to
+-- 'FoldA'. This is the most general way to repeat a PTask over some input
+-- (list, array, stream, etc.).
+--
+-- This API is still experimental and might be subject to changes in the future
+
+module Porcupine.Foldl
+ ( module System.TaskPipeline.Repetition.Foldl )
+where
+
+import System.TaskPipeline.Repetition.Foldl
diff --git a/src/Porcupine/Run.hs b/src/Porcupine/Run.hs
new file mode 100644
index 0000000..b52cb7c
--- /dev/null
+++ b/src/Porcupine/Run.hs
@@ -0,0 +1,5 @@
+module Porcupine.Run
+ ( module System.TaskPipeline.Run )
+ where
+
+import System.TaskPipeline.Run
diff --git a/src/Porcupine/Serials.hs b/src/Porcupine/Serials.hs
new file mode 100644
index 0000000..ee08632
--- /dev/null
+++ b/src/Porcupine/Serials.hs
@@ -0,0 +1,7 @@
+module Porcupine.Serials
+ ( module Data.Locations.SerializationMethod
+ , Profunctor(..) )
+where
+
+import Data.Locations.SerializationMethod
+import Data.Profunctor
diff --git a/src/Porcupine/Tasks.hs b/src/Porcupine/Tasks.hs
new file mode 100644
index 0000000..b7dc487
--- /dev/null
+++ b/src/Porcupine/Tasks.hs
@@ -0,0 +1,5 @@
+module Porcupine.Tasks
+ ( module System.TaskPipeline )
+ where
+
+import System.TaskPipeline
diff --git a/src/Porcupine/VFiles.hs b/src/Porcupine/VFiles.hs
new file mode 100644
index 0000000..1b5d15d
--- /dev/null
+++ b/src/Porcupine/VFiles.hs
@@ -0,0 +1,15 @@
+module Porcupine.VFiles
+ ( VirtualFile(..), VFileImportance(..)
+ , BidirVirtualFile, DataSource, DataSink
+ , LocationTreePathItem
+ , Store
+ , documentedFile
+ , usesLayeredMapping, canBeUnmapped, unmappedByDefault
+ , usesCacherWithIdent
+ , clockVFileAccesses
+ , bidirVirtualFile, dataSource, dataSink
+ , makeSource, makeSink )
+where
+
+import Data.Locations.VirtualFile
+import Data.Store
diff --git a/src/Streaming/TaskPipelineUtils.hs b/src/Streaming/TaskPipelineUtils.hs
new file mode 100644
index 0000000..5011bb3
--- /dev/null
+++ b/src/Streaming/TaskPipelineUtils.hs
@@ -0,0 +1,157 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
+
+module Streaming.TaskPipelineUtils
+ ( module S
+ , Of(..)
+ , MonadTrans(..)
+ , MonadIO(..)
+ , StreamFilter(..)
+ , Copy(..)
+ , (&)
+ , S.mapM_
+ , asConduit
+ , intoSink
+ , streamFolder
+ , streamFolderRel
+ , mapCopy
+ , hoistCopy
+ , With(..), elt, ann
+ , StreamWith
+ , mapStreamW
+ , mapStreamWM )
+ where
+
+import Control.Lens hiding ((:>))
+import Control.Monad (forM_)
+import Control.Monad.IO.Class
+import Data.Aeson
+import Data.Conduit (ConduitT, Void, runConduit, (.|))
+import Data.Function ((&))
+import GHC.Generics
+import Streaming
+import Streaming.Conduit (asConduit, fromStreamSource)
+import qualified Streaming.Prelude as S
+import System.Directory (doesDirectoryExist,
+ getDirectoryContents)
+import System.FilePath ((</>))
+
+
+intoSink :: Monad m => ConduitT a Void m b -> Stream (Of a) m r -> m b
+intoSink snk src = runConduit $ fromStreamSource src .| snk
+
+-- TODO: rename to streamFolderRecursive
+streamFolder :: (MonadIO m) => FilePath -> Stream (Of FilePath) m ()
+streamFolder topPath = S.map (topPath </>) $ streamFolderRel topPath
+
+-- TODO: rename to streamFolderRecursiveRel
+streamFolderRel :: (MonadIO m) => FilePath -> Stream (Of FilePath) m ()
+streamFolderRel topPath =
+ aux ""
+ where
+ aux relPath = do
+ names <- liftIO $ getDirectoryContents (topPath </> relPath)
+ let properNames = filter (`notElem` [".", ".."]) names
+ forM_ properNames $ \name -> do
+ let path = relPath </> name
+ isDirectory <- liftIO $ doesDirectoryExist (topPath </> path)
+ if isDirectory
+ then aux path
+ else S.yield path
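+
+-- A usage sketch (the directory is hypothetical):
+--
+-- > printAll :: IO ()
+-- > printAll = S.mapM_ putStrLn (streamFolder "/tmp/data")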
+
+-- | Generalizes 'partitionEithers' from Streaming.Prelude to fork a stream into
+-- several substreams
+-- TODO: This language of filters needs an introduction.
+-- What problem do they solve? Why aren't bare streams suitable for the task?
+class StreamFilter s where
+ type Wanted s :: *
+ type Split s (m :: * -> *) :: * -> *
+ filters :: (Monad m) => s -> Stream (Of (Wanted s)) m r -> Split s m r
+
+instance StreamFilter (a -> Bool) where
+ type Wanted (a -> Bool) = a
+ type Split (a -> Bool) m = Stream (Of a) m
+ filters = S.filter
+
+instance StreamFilter (a -> Maybe b) where
+ type Wanted (a -> Maybe b) = a
+ type Split (a -> Maybe b) m = Stream (Of b) m
+ filters = S.mapMaybe
+
+instance StreamFilter (a -> Either b c) where
+ type Wanted (a -> Either b c) = a
+ type Split (a -> Either b c) m = Stream (Of b) (Stream (Of c) m)
+ filters f = S.partitionEithers . S.map f
+
+instance (StreamFilter s', Wanted s' ~ a) => StreamFilter (Of (a -> Bool) s') where
+ type Wanted (Of (a -> Bool) s') = a
+ type Split (Of (a -> Bool) s') m = Stream (Of a) (Split s' m)
+ filters (f :> s') = hoist (filters s') . S.partition f
+
+instance (StreamFilter s', Wanted s' ~ a) => StreamFilter (Of (a -> Maybe b) s') where
+ type Wanted (Of (a -> Maybe b) s') = a
+ type Split (Of (a -> Maybe b) s') m = Stream (Of b) (Split s' m)
+ filters (f :> s') = filters (f' :> s')
+ where f' a = case f a of
+ Just b -> Left b
+ Nothing -> Right a
+
+instance (StreamFilter s', Wanted s' ~ c) => StreamFilter (Of (a -> Either b c) s') where
+ type Wanted (Of (a -> Either b c) s') = a
+ type Split (Of (a -> Either b c) s') m = Stream (Of b) (Split s' m)
+ filters (f :> s') = hoist (filters s') . S.partitionEithers . S.map f
+
+data Copy = Copy
+
+instance (StreamFilter s') => StreamFilter (Of Copy s') where
+ type Wanted (Of Copy s') = Wanted s'
+ type Split (Of Copy s') m = Stream (Of (Wanted s')) (Split s' m)
+ filters (Copy :> s') = hoist (filters s') . S.copy
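+
+-- A sketch of chaining filters (the predicates are arbitrary examples):
+-- splitting a stream of Ints into evens (top layer) and odds (the layer
+-- underneath):
+--
+-- > splitParity :: Monad m
+-- >             => Stream (Of Int) m r
+-- >             -> Stream (Of Int) (Stream (Of Int) m) r
+-- > splitParity = filters (even :> (odd :: Int -> Bool))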
+
+-- | Copies stream elements to a layer underneath after applying a function on
+-- them
+-- TODO: Motivate.
+mapCopy
+ :: Monad m
+ => (a -> b) -> Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) (Stream (Of b) m) r
+mapCopy f stream = S.for stream $ \x -> do
+ S.yield x
+ lift $ S.yield $ f x
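+
+-- A sketch of a use case: collecting both the original elements and their
+-- renderings in a single pass (assuming the imports of this module):
+--
+-- > collectBoth :: Monad m => Stream (Of Int) m r -> m ([String], [Int])
+-- > collectBoth s = do
+-- >   strs :> (ints :> _) <- S.toList (S.toList (mapCopy show (hoist lift s)))
+-- >   return (strs, ints)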
+
+-- | A version of mapCopy that takes the whole substream of copied values and
+-- merges it downwards
+-- TODO: Motivate.
+hoistCopy
+ :: (Monad m)
+ => (forall n s. (Monad n) => Stream (Of a) n s -> Stream (Of b) n s)
+ -> Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) (Stream (Of b) m) r
+hoistCopy g stream =
+ S.copy stream & hoist (S.effects . flip S.for (lift . S.yield) . g)
+
+-- | Just a simple tuple to annotate stream elements. It is strict in the
+-- annotation.
+-- TODO: Motivate. How is this better than a primitive pair?
+data t `With` ann = With { _ann :: !ann, _elt :: t }
+ deriving (Eq, Generic)
+
+makeLenses ''With
+
+instance (ToJSON t, ToJSON ann) => ToJSON (t `With` ann)
+instance (FromJSON t, FromJSON ann) => FromJSON (t `With` ann)
+
+type StreamWith id a = Stream (Of (a `With` id))
+
+-- TODO: Maybe this function can be defined more generally when the stream
+-- elements are functors.
+mapStreamW :: Monad m => (a -> b) -> StreamWith ann a m r -> StreamWith ann b m r
+mapStreamW f = S.map $ \case
+ With pid a -> With pid (f a)
+
+mapStreamWM :: Monad m => (a -> m b) -> StreamWith ann a m r -> StreamWith ann b m r
+mapStreamWM f = S.mapM $ \case
+ With pid a -> With pid <$> f a
diff --git a/src/System/ClockHelpers.hs b/src/System/ClockHelpers.hs
new file mode 100644
index 0000000..244e221
--- /dev/null
+++ b/src/System/ClockHelpers.hs
@@ -0,0 +1,32 @@
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+
+module System.ClockHelpers
+ ( Clock(..)
+ , TimeSpec(..)
+ , diffTimeSpec
+ , getTime
+ , showTimeSpec
+ , clockM
+ ) where
+
+import Control.Monad.IO.Class
+import Data.Aeson
+import Katip
+import System.Clock
+
+
+clockM :: (MonadIO m) => m a -> m (a, TimeSpec)
+clockM act = f <$> time <*> act <*> time
+ where
+ time = liftIO $ getTime Realtime
+ f start res end = (res, end `diffTimeSpec` start)
+
+showTimeSpec :: TimeSpec -> String
+showTimeSpec ts = show (fromIntegral (toNanoSecs ts) / (10**9) :: Double) ++ "s"
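+
+-- A usage sketch (the file path is hypothetical):
+--
+-- > timedRead :: IO ()
+-- > timedRead = do
+-- >   (contents, dt) <- clockM (readFile "input.txt")
+-- >   putStrLn $ show (length contents) ++ " chars read in " ++ showTimeSpec dt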
+
+-- To be able to use TimeSpec as part of katip contexts (katipAddContext)
+instance ToJSON TimeSpec
+instance ToObject TimeSpec
+instance LogItem TimeSpec where
+ payloadKeys v _ | v >= V1 = AllKeys
+ | otherwise = SomeKeys []
diff --git a/src/System/TaskPipeline.hs b/src/System/TaskPipeline.hs
new file mode 100644
index 0000000..ec49546
--- /dev/null
+++ b/src/System/TaskPipeline.hs
@@ -0,0 +1,15 @@
+module System.TaskPipeline
+ ( module System.TaskPipeline.PTask
+ , module System.TaskPipeline.PorcupineTree
+ , module System.TaskPipeline.VirtualFileAccess
+ , module System.TaskPipeline.Options
+ , module System.TaskPipeline.Repetition
+ , module Data.Locations.LogAndErrors
+ ) where
+
+import Data.Locations.LogAndErrors
+import System.TaskPipeline.Options
+import System.TaskPipeline.PTask
+import System.TaskPipeline.Repetition
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.VirtualFileAccess
diff --git a/src/System/TaskPipeline/CLI.hs b/src/System/TaskPipeline/CLI.hs
new file mode 100644
index 0000000..d5d71f4
--- /dev/null
+++ b/src/System/TaskPipeline/CLI.hs
@@ -0,0 +1,437 @@
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TupleSections #-}
+{-# OPTIONS_GHC -Wall #-}
+
+module System.TaskPipeline.CLI
+ ( module System.TaskPipeline.ConfigurationReader
+ , PipelineCommand(..)
+ , PipelineConfigMethod(..)
+ , BaseInputConfig(..)
+ , PostParsingAction(..)
+ , PreRun(..)
+ , ConfigFileSource(..)
+ , tryReadConfigFileSource
+ , cliYamlParser
+ , execCliParser
+ , withCliParser
+ , pipelineCliParser
+
+ , pipelineConfigMethodProgName
+ , pipelineConfigMethodDefRoot
+ , pipelineConfigMethodConfigFile
+ , pipelineConfigMethodDefReturnVal
+ , withConfigFileSourceFromCLI
+ ) where
+
+import Control.Lens hiding (argument)
+import Control.Monad.IO.Class
+import Data.Aeson as A
+import qualified Data.Aeson.Encode.Pretty as A
+import qualified Data.ByteString as BS
+import qualified Data.ByteString.Lazy as LBS
+import Data.Char (toLower)
+import qualified Data.HashMap.Lazy as HashMap
+import Data.Locations
+import Data.Maybe
+import Data.Representable
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as T
+import qualified Data.Yaml as Y
+import Katip
+import Options.Applicative
+import System.Directory
+import System.Environment (getArgs, withArgs)
+import System.IO (stdin)
+import System.TaskPipeline.ConfigurationReader
+import System.TaskPipeline.Logger
+import System.TaskPipeline.PorcupineTree (PhysicalFileNodeShowOpts(..))
+
+
+-- | The command to parse from the CLI
+data PipelineCommand
+ = RunPipeline
+ | ShowTree LocationTreePath PhysicalFileNodeShowOpts
+
+-- | Tells whether and how command-line options should be used. @r@ is the type
+-- of the result of the PTask that this PipelineConfigMethod will be used for
+-- (see @runPipelineTask@). _pipelineConfigMethodDefReturnVal will be used as a
+-- return value for the pipeline e.g. when the user just wants to write the
+-- config template, and not run the pipeline.
+data PipelineConfigMethod r
+ = NoConfig { _pipelineConfigMethodProgName :: String
+ , _pipelineConfigMethodDefRoot :: Loc }
+ -- ^ No CLI and no config file are used, in which case we require a program
+ -- name (for logs) and the root directory.
+ | ConfigFileOnly { _pipelineConfigMethodProgName :: String
+ , _pipelineConfigMethodConfigFile :: LocalFilePath
+ , _pipelineConfigMethodDefRoot :: Loc }
+ -- ^ Config file is read and loaded if it exists. No CLI is used.
+ | FullConfig { _pipelineConfigMethodProgName :: String
+ , _pipelineConfigMethodConfigFile :: LocalFilePath
+ , _pipelineConfigMethodDefRoot :: Loc
+ , _pipelineConfigMethodDefReturnVal :: r }
+ -- ^ Both config file and CLI will be used. We require a name (for the help
+ -- page in the CLI), the config file path, the default 'LocationTree' root
+ -- mapping, and a default value to return when the CLI is called with a
+ -- command other than 'run' (in which case the pipeline will not run).
+
+makeLenses ''PipelineConfigMethod
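+
+-- A sketch of a typical value (the program name, config file and root are
+-- hypothetical; we assume the IsString instances for 'Loc' and
+-- 'LocalFilePath' that porcupine's examples rely on):
+--
+-- > cfgMethod :: PipelineConfigMethod ()
+-- > cfgMethod = FullConfig "my-app" "my-app.yaml" "./data" ()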
+
+-- | Runs the CLI unless a YAML or JSON config file is given on the
+-- command line
+withCliParser
+ :: String
+ -> String
+ -> Parser (Maybe (a, cmd), LoggerScribeParams, [PostParsingAction])
+ -> r
+ -> (a -> cmd -> LoggerScribeParams -> PreRun -> IO r)
+ -> IO r
+withCliParser progName progDesc_ cliParser defRetVal f = do
+ (mbArgs, lsp, actions) <- execCliParser progName progDesc_ cliParser
+ case mbArgs of
+ Just (cfg, cmd) ->
+ f cfg cmd lsp $ PreRun $ mapM_ processAction actions
+ Nothing -> runLogger progName lsp $ do
+ mapM_ processAction actions
+ return defRetVal
+ where
+ processAction (PostParsingLog s l) = logFM s l
+ processAction (PostParsingWrite configFile cfg) = do
+ let rawFile = configFile ^. pathWithExtensionAsRawFilePath
+ case configFile of
+ PathWithExtension "-" _ ->
+ error "Config was read from stdin, cannot overwrite it"
+ PathWithExtension _ e | e `elem` ["yaml","yml"] ->
+ liftIO $ Y.encodeFile rawFile cfg
+ PathWithExtension _ "json" ->
+ liftIO $ LBS.writeFile rawFile $ A.encodePretty cfg
+ _ -> error $ "Config file has unknown format"
+ logFM NoticeS $ logStr $ "Wrote file '" ++ rawFile ++ "'"
+
+data ConfigFileSource = YAMLStdin | ConfigFileURL Loc
+
+-- | Tries to read a YAML or JSON config file path (or "-" for stdin) from the
+-- first CLI argument; the remaining arguments are passed on to the callback.
+withConfigFileSourceFromCLI
+ :: (Maybe ConfigFileSource -> IO b) -- If a filepath has been read as first argument
+ -> IO b
+withConfigFileSourceFromCLI f = do
+ cliArgs <- liftIO getArgs
+ case cliArgs of
+ [] -> f Nothing
+ filename : rest -> do
+ case parseURL filename of
+ Left _ -> f Nothing
+ Right (LocalFile (PathWithExtension "-" ext)) ->
+ if ext == "" || allowedExt ext
+ then withArgs rest $ f $ Just YAMLStdin
+ else error $ filename ++ ": Only JSON or YAML config can be read from stdin"
+ Right loc | getLocType loc == "" -> f Nothing
+ -- No extension, therefore probably not a filepath
+ | allowedExt (getLocType loc) -> withArgs rest $ f $ Just $ ConfigFileURL loc
+ | otherwise -> error $ filename ++ ": Only JSON or YAML config files allowed"
+ where
+ allowedExt = (`elem` ["yaml","yml","json"]) . map toLower
+
+tryReadConfigFileSource :: (FromJSON cfg)
+ => ConfigFileSource -> (Loc -> IO cfg) -> IO (Maybe cfg)
+tryReadConfigFileSource configFileSource ifRemote =
+ case configFileSource of
+ YAMLStdin ->
+ Just <$> (BS.hGetContents stdin >>= Y.decodeThrow)
+ ConfigFileURL (LocalFile lfp) -> do
+ let p = lfp ^. pathWithExtensionAsRawFilePath
+ yamlFound <- doesFileExist p
+ if yamlFound
+ then Just <$> Y.decodeFileThrow p
+ else return Nothing
+ ConfigFileURL remoteLoc -> -- If config is remote, it must always be present
+ -- for now
+ Just <$> ifRemote remoteLoc
+
+data BaseInputConfig cfg = BaseInputConfig
+ { bicSourceFile :: Maybe LocalFilePath
+ , bicLoadedConfig :: Maybe Y.Value
+ , bicDefaultConfig :: cfg
+ }
+
+-- | Creates a command line parser that will return an action returning the
+-- configuration and the chosen subcommand, or Nothing if the user simply asked
+-- to save some overrides and the program should stop. It _does not_ mean
+-- that an error has occurred, just that the program should not continue.
+cliYamlParser
+ :: (ToJSON cfg)
+ => String -- ^ The program name
+ -> BaseInputConfig cfg -- ^ Default configuration
+ -> ConfigurationReader cfg overrides
+ -> [(Parser cmd, String, String)] -- ^ [(Parser cmd, Command repr, Command help string)]
+ -> cmd -- ^ Default command
+ -> IO (Parser (Maybe (cfg, cmd), LoggerScribeParams, [PostParsingAction]))
+cliYamlParser progName baseInputCfg inputParsing cmds defCmd = do
+ return $ pureCliParser progName baseInputCfg inputParsing cmds defCmd
+
+-- | A shortcut to run a parser and define the program help strings
+execCliParser
+ :: String
+ -> String
+ -> Parser a
+ -> IO a
+execCliParser header_ progDesc_ parser_ = do
+ let opts = info (helper <*> parser_)
+ ( fullDesc
+ <> header header_
+ <> progDesc progDesc_ )
+ execParser opts
+
+pureCliParser
+ :: (ToJSON cfg)
+ => String -- ^ The program name
+ -> BaseInputConfig cfg -- ^ The base configuration we read
+ -> ConfigurationReader cfg overrides
+ -> [(Parser cmd, String, String)] -- ^ [(Parser cmd, Command repr, Command help string)]
+ -> cmd -- ^ Default command
+ -> Parser (Maybe (cfg, cmd), LoggerScribeParams, [PostParsingAction])
+ -- ^ (Config and command, actions to run to
+ -- override the yaml file)
+pureCliParser progName baseInputConfig cfgCLIParsing cmds defCmd =
+ (case bicSourceFile baseInputConfig of
+ Nothing -> empty
+ Just f -> subparser $
+ command "write-config-template"
+ (info
+ (pure (Nothing, maxVerbosityLoggerScribeParams
+ ,[PostParsingWrite f (bicDefaultConfig baseInputConfig)]))
+ (progDesc $ "Write a default configuration file in " <> (f^.pathWithExtensionAsRawFilePath))))
+ <|>
+ handleOptions progName baseInputConfig cliOverriding
+ <$> ((subparser $
+ (case bicSourceFile baseInputConfig of
+ Nothing -> mempty
+ Just f ->
+ command "save" $
+ info (pure Nothing) $
+ progDesc $ "Just save the command line overrides in " <> (f^.pathWithExtensionAsRawFilePath))
+ <>
+ foldMap
+ (\(cmdParser, cmdShown, cmdInfo) ->
+ command cmdShown $
+ info (Just . (,cmdShown) <$> cmdParser) $
+ progDesc cmdInfo)
+ cmds)
+ <|>
+ pure (Just (defCmd, "")))
+ <*> (case bicSourceFile baseInputConfig of
+ Nothing -> pure False
+ Just f ->
+ switch ( long "save"
+ <> short 's'
+ <> help ("Save overrides in the " <> (f^.pathWithExtensionAsRawFilePath) <> " before running.") ))
+ <*> overridesParser cliOverriding
+ where
+ cliOverriding = addScribeParamsParsing cfgCLIParsing
+
+severityShortcuts :: Parser Severity
+severityShortcuts =
+ numToSeverity <$> liftA2 (-)
+ (length <$>
+ (many
+ (flag' ()
+ ( short 'q'
+ <> long "quiet"
+ <> help "Print only warning (-q) or error (-qq) messages. Cancels out with -v."))))
+ (length <$>
+ (many
+ (flag' ()
+ ( long "verbose"
+ <> short 'v'
+ <> help "Print info (-v) and debug (-vv) messages. Cancels out with -q."))))
+ where
+ numToSeverity (-1) = InfoS
+ numToSeverity 0 = NoticeS
+ numToSeverity 1 = WarningS
+ numToSeverity 2 = ErrorS
+ numToSeverity 3 = CriticalS
+ numToSeverity 4 = AlertS
+ numToSeverity x | x>0 = EmergencyS
+ | otherwise = DebugS
+
+-- | Parses the CLI options that will be given to Katip's logger scribe
+parseScribeParams :: Parser LoggerScribeParams
+parseScribeParams = LoggerScribeParams
+ <$> ((option (eitherReader severityParser)
+ ( long "severity"
+ <> help "Control exactly which minimal severity level will be logged (used instead of -q or -v)"))
+ <|>
+ severityShortcuts)
+ <*> (numToVerbosity <$>
+ option auto
+ ( long "context-verb"
+ <> short 'c'
+ <> help "A number from 0 to 3 (default: 0). Controls the amount of context to show per log line"
+ <> value (0 :: Int)))
+ <*> (option (eitherReader loggerFormatParser)
+ ( long "log-format"
+ <> help "Selects a format for the log: 'pretty' (default, only for human consumption), 'compact' (pretty but more compact), 'json' or 'bracket'"
+ <> value PrettyLog))
+ where
+ severityParser = \case
+ "debug" -> Right DebugS
+ "info" -> Right InfoS
+ "notice" -> Right NoticeS
+ "warning" -> Right WarningS
+ "error" -> Right ErrorS
+ "critical" -> Right CriticalS
+ "alert" -> Right AlertS
+ "emergency" -> Right EmergencyS
+ s -> Left $ s ++ " isn't a valid severity level"
+ numToVerbosity 0 = V0
+ numToVerbosity 1 = V1
+ numToVerbosity 2 = V2
+ numToVerbosity _ = V3
+ loggerFormatParser "pretty" = Right PrettyLog
+ loggerFormatParser "compact" = Right CompactLog
+ loggerFormatParser "json" = Right JSONLog
+ loggerFormatParser "bracket" = Right BracketLog
+ loggerFormatParser s = Left $ s ++ " isn't a valid log format"
+
+-- | Modifies a CLI parser so that it also features verbosity and severity flags
+addScribeParamsParsing :: ConfigurationReader cfg ovs -> ConfigurationReader (LoggerScribeParams, cfg) (LoggerScribeParams, ovs)
+addScribeParamsParsing super = ConfigurationReader
+ { overridesParser = (,) <$> parseScribeParams <*> overridesParser super
+ , nullOverrides = \(_, ovs) -> nullOverrides super ovs
+ , overrideCfgFromYamlFile = \yaml (scribeParams, ovs) ->
+ let (warns, res) = overrideCfgFromYamlFile super yaml ovs
+ in (warns, (scribeParams,) <$> res)
+ }
+
+-- | Some action to be carried out after the parser is done. Writing the config
+-- file is done here, as is the logging of config.
+data PostParsingAction
+ = PostParsingLog Severity LogStr -- ^ Log a message
+ | forall a. (ToJSON a) => PostParsingWrite LocalFilePath a
+ -- ^ Write to a file and log a message about it
+
+-- | Wraps the actions to override the config file
+newtype PreRun = PreRun {unPreRun :: forall m. (KatipContext m, MonadIO m) => m ()}
+
+handleOptions
+ :: forall cfg cmd overrides.
+ (ToJSON cfg)
+ => String -- ^ Program name
+ -> BaseInputConfig cfg
+ -> ConfigurationReader (LoggerScribeParams, cfg) (LoggerScribeParams, overrides)
+ -> Maybe (cmd, String) -- ^ Command to run (and a name/description for it). If
+ -- Nothing, means we should just save the config
+ -> Bool -- ^ Whether to save the overrides
+ -> (LoggerScribeParams, overrides) -- ^ overrides
+ -> (Maybe (cfg, cmd), LoggerScribeParams, [PostParsingAction])
+ -- ^ (Config and command, actions to run to override
+ -- the yaml file)
+handleOptions progName (BaseInputConfig _ Nothing _) _ Nothing _ _ = error $
+ "No config found and nothing to save. Please run `" ++ progName ++ " write-config-template' first."
+handleOptions progName (BaseInputConfig mbCfgFile mbCfg defCfg) cliOverriding mbCmd saveOverridesAlong overrides =
+ let defaultCfg = toJSON defCfg
+ (cfgWarnings, cfg) = case mbCfg of
+ Just c -> mergeWithDefault [] defaultCfg c
+ Nothing -> ([PostParsingLog DebugS $ logStr $
+ "Config file" ++ configFile' ++ " is not found. Treated as empty."]
+ ,defaultCfg)
+ (overrideWarnings, mbScribeParamsAndCfgOverriden) =
+ overrideCfgFromYamlFile cliOverriding cfg overrides
+ allWarnings = cfgWarnings ++ map (PostParsingLog WarningS . logStr) overrideWarnings
+ in case mbScribeParamsAndCfgOverriden of
+ Right (lsp, cfgOverriden) ->
+ case mbCmd of
+ Nothing -> (Nothing, lsp, allWarnings ++
+ [PostParsingWrite (fromJust mbCfgFile) cfgOverriden])
+ Just (cmd, _cmdShown) ->
+ let actions =
+ allWarnings ++
+ (if saveOverridesAlong
+ then [PostParsingWrite (fromJust mbCfgFile) cfgOverriden]
+ else []) ++
+ [PostParsingLog DebugS $ logStr $ "Running `" <> T.pack progName
+ <> "' with the following config:\n"
+ <> T.decodeUtf8 (Y.encode cfgOverriden)]
+ in (Just (cfgOverriden, cmd), lsp, actions)
+ Left err -> dispErr err
+ where
+ configFile' = case mbCfgFile of Nothing -> ""
+ Just f -> " " ++ f ^. pathWithExtensionAsRawFilePath
+ dispErr err = error $
+ (if nullOverrides cliOverriding overrides
+ then "C"
+ else "Overriden c") ++ "onfig" <> shownFile <> " is not valid:\n " ++ err
+ where
+ shownFile = case mbCfgFile of
+ Just f -> " from " <> show f
+ Nothing -> ""
+
+mergeWithDefault :: [T.Text] -> Y.Value -> Y.Value -> ([PostParsingAction], Y.Value)
+mergeWithDefault path (Object o1) (Object o2) =
+ let newKeys = HashMap.keys $ o2 `HashMap.difference` o1
+ warnings = map (\key -> PostParsingLog DebugS $ logStr $
+ "The key '" <> T.intercalate "." (reverse $ key:path) <>
+ "' isn't present in the default configuration. " <>
+ "Please make sure it isn't a typo.")
+ newKeys
+ (subWarnings, merged) =
+ sequenceA $ HashMap.unionWithKey
+ (\key (_,v1) (_,v2) -> mergeWithDefault (key:path) v1 v2)
+ (fmap pure o1)
+ (fmap pure o2)
+ in (warnings ++ subWarnings, Object merged)
+mergeWithDefault _ _ v = pure v
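+
+-- A sketch of the semantics: merging a default config
+-- @{"a": 1, "b": {"c": 2}}@ with a file containing @{"b": {"c": 3, "d": 4}}@
+-- yields @{"a": 1, "b": {"c": 3, "d": 4}}@, along with one 'PostParsingLog'
+-- action warning that the key "b.d" is absent from the default configuration.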
+
+parseShowTree :: Parser PipelineCommand
+parseShowTree = ShowTree <$> parseRoot <*> parseShowOpts
+ where
+ parseRoot = argument (eitherReader (fromTextRepr . T.pack))
+ (help "Path from which to display the porcupine tree" <> value (LTP []))
+ parseShowOpts = PhysicalFileNodeShowOpts
+ <$> flag False True
+ (long "mappings"
+ <> short 'm'
+ <> help "Show mappings of virtual files")
+ <*> flag True False
+ (long "no-serials"
+ <> short 'S'
+ <> help "Don't show if the virtual file can be used as a source or a sink")
+ <*> flag True False
+ (long "no-fields"
+ <> short 'F'
+ <> help "Don't show the option fields and their docstrings")
+ <*> flag False True
+ (long "types"
+ <> short 't'
+ <> help "Show types written to virtual files")
+ <*> flag False True
+ (long "accesses"
+ <> short 'a'
+ <> help "Show how virtual files will be accessed")
+ <*> flag True False
+ (long "no-extensions"
+ <> short 'E'
+ <> help "Don't show the possible extensions for physical files")
+ <*> option auto
+ (long "num-chars"
+ <> short 'c'
+ <> help "The number of characters to show for the type (default: 60)"
+ <> value (60 :: Int))
+
+pipelineCliParser
+ :: (ToJSON cfg)
+ => (cfg -> ConfigurationReader cfg overrides)
+ -> String
+ -> BaseInputConfig cfg
+ -> IO (Parser (Maybe (cfg, PipelineCommand), LoggerScribeParams, [PostParsingAction]))
+pipelineCliParser getCliOverriding progName baseInputConfig =
+ cliYamlParser progName baseInputConfig (getCliOverriding $ bicDefaultConfig baseInputConfig)
+ [(pure RunPipeline, "run", "Run the pipeline")
+ ,(parseShowTree, "show-tree", "Show the porcupine tree of the pipeline")]
+ RunPipeline
diff --git a/src/System/TaskPipeline/Caching.hs b/src/System/TaskPipeline/Caching.hs
new file mode 100644
index 0000000..dbc62c1
--- /dev/null
+++ b/src/System/TaskPipeline/Caching.hs
@@ -0,0 +1,112 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE TupleSections #-}
+
+-- | Functions in this module are necessary only if you want fine control over
+-- the caching of some actions. When you want to perform several reads and
+-- writes from and to VirtualFiles as part of a /single/ cached task, the
+-- recommended way is to use:
+--
+-- - 'getDataReader'/'getDataWriter' to obtain the accessors
+-- - 'toTask'' to create the cached task, to which you give the accessors
+--
+-- Given the accessors are hashable, the files that are bound to them are
+-- incorporated into the hash, so binding them to new files will re-trigger the
+-- task.
+
+module System.TaskPipeline.Caching
+ ( toTaskAndWrite
+ , toTaskAndWrite_
+
+ -- * Re-exports
+
+ , module Data.Locations.LogAndErrors
+ , Properties(..)
+ , defaultCacherWithIdent
+ , Default(..)
+ ) where
+
+import qualified Control.Exception.Safe as SE
+import Control.Funflow
+import Data.Default (Default (..))
+import Data.Locations.LogAndErrors
+import Data.Locations.VirtualFile
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask
+import System.TaskPipeline.VirtualFileAccess
+
+import Prelude hiding (id, (.))
+
+
+-- | For when the result of the lifted function just needs to be written, not
+-- returned.
+toTaskAndWrite_
+ :: (LogCatch m, Typeable b, Typeable ignored)
+ => Properties (a, DataWriter m b) () -- ^ Location types aren't ContentHashable, but
+ -- all are convertible to JSON. We need that to
+ -- hash on locations so the task is repeated if
+ -- we bind to new locations.
+ -> VirtualFile b ignored -- ^ The VirtualFile to write
+ -> (a -> m b) -- ^ The function to lift. Won't be executed if
+ -- the file isn't mapped
+ -> PTask m a ()
+toTaskAndWrite_ props vf f =
+ toTaskAndWrite props id vf (fmap (,()) . f) (const $ return ())
+{-# INLINE toTaskAndWrite_ #-}
+
+
+-- | Similar to 'toTask'', but caches a write action of the result too. In this
+-- case we use the filepath bound to the VirtualFile to compute the hash. That
+-- means that if the VirtualFile is bound to something else, the step will be
+-- re-executed.
+toTaskAndWrite
+ :: (LogCatch m, Typeable b, Typeable ignored)
+ => Properties (a', DataWriter m b) c -- ^ Location types aren't ContentHashable, but
+ -- all are convertible to JSON. We need that to
+ -- hash on locations so the task is repeated if
+ -- we bind to new locations.
+ -> (a -> a') -- ^ If the input mustn't or cannot be fully
+ -- hashed, you can select a subset of it or
+ -- transform it into a hashable intermediate
+ -- representation (like aeson Value). Else just
+ -- use 'id'
+ -> VirtualFile b ignored -- ^ The VirtualFile to write. If the file
+ -- isn't mapped, the action won't be performed,
+ -- and the task will return the default result.
+ -> (a -> m (b,c)) -- ^ The function to lift. First item of the
+ -- returned tuple will be written to the
+ -- VirtualFile. The second will be returned by
+ -- the task, so it must be loadable from the
+ -- store.
+ -> (a -> m c) -- ^ Called when the VirtualFile isn't mapped,
+ -- and therefore no @b@ needs to be computed
+ -> PTask m a c
+toTaskAndWrite props inputHashablePart vf action actionWhenNotMapped = proc input -> do
+ writer <- getDataWriter vf -< ()
+ throwTask <<< toTask' props' cached -< (input,writer)
+ where
+ cached (input,writer) | null (dwLocsAccessed writer)
+ = Right <$> actionWhenNotMapped input
+ | otherwise
+ = do
+ res <- SE.try $ action input
+ case res of
+ Right (outputForVFile, outputForStore) -> do
+ dwPerformWrite writer outputForVFile
+ return $ Right outputForStore
+ Left err -> return $ Left (err::SomeException)
+
+ props' = props { cache = cache'
+ , mdpolicy = updMdw <$> mdpolicy props }
+ getH (input,writer) = (inputHashablePart input,writer)
+ cache' = case cache props of
+ NoCache -> NoCache
+ Cache key sv rv ->
+ let key' salt = key salt . getH
+ sv' (Left e) = error $
+ "toTaskAndWrite: An exception occured during the cached function: "
+ ++ displayException e
+ sv' (Right x) = sv x
+ rv' = Right . rv
+ in Cache key' sv' rv'
+ updMdw mdWriter i (Right o) = mdWriter (getH i) o
+ updMdw _ _ (Left _) = []
diff --git a/src/System/TaskPipeline/ConfigurationReader.hs b/src/System/TaskPipeline/ConfigurationReader.hs
new file mode 100644
index 0000000..005815f
--- /dev/null
+++ b/src/System/TaskPipeline/ConfigurationReader.hs
@@ -0,0 +1,150 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# OPTIONS_GHC -Wall #-}
+
+module System.TaskPipeline.ConfigurationReader
+ ( ConfigurationReader(..)
+ , docRecBasedConfigurationReader
+ , genericAesonBasedConfigurationReader
+ , parseJSONEither
+ ) where
+
+import Control.Applicative
+import Control.Lens
+import Control.Monad
+import qualified Data.Aeson as A
+import Data.DocRecord
+import Data.DocRecord.OptParse
+import qualified Data.HashMap.Lazy as HashMap
+import Data.Locations.Loc
+import Data.Locations.SerializationMethod (parseJSONEither)
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as T
+import qualified Data.Yaml as Y
+import Options.Applicative
+
+
+-- | How to override a YAML file config from the command-line to return some
+-- config type @cfg@
+data ConfigurationReader cfg overrides = ConfigurationReader
+ { overridesParser :: Parser overrides
+ -- ^ Generate a parser from default cfg
+ , nullOverrides :: overrides -> Bool
+ -- ^ True if no override has been provided on the CLI
+ , overrideCfgFromYamlFile
+ :: A.Value -> overrides -> ([String], Either String cfg)
+ -- ^ How to override the config read from YAML file. Returns: (Warnings,
+ -- Overriden config or an error).
+ }
+
+-- | defCfg must be a 'DocRec' here. Uses it to generate one option per field in
+-- the DocRec, along with its documentation.
+docRecBasedConfigurationReader
+ :: (RecordUsableWithCLI rs)
+ => DocRec rs -> ConfigurationReader (DocRec rs) (Rec SourcedDocField rs)
+docRecBasedConfigurationReader defCfg = ConfigurationReader{..}
+ where
+ overridesParser = parseRecFromCLI $ tagWithDefaultSource defCfg
+ -- The parser will set the source to CLI for each modified
+ -- field. Unmodified fields' source will remain Default
+ nullOverrides :: Rec SourcedDocField rs -> Bool
+ nullOverrides RNil = True
+ nullOverrides _ = False
+ overrideCfgFromYamlFile aesonCfg cliOverrides = ([], result)
+ where
+ result = do
+ yamlCfg <- tagWithYamlSource <$> parseJSONEither aesonCfg
+ return $ rmTags $ rzipWith chooseHighestPriority yamlCfg cliOverrides
+ -- CLI overrides YAML and YAML overrides Default. This
+ -- way, options from the CLI that have not been changed
+ -- from their Default value do not erase the value
+ -- specified in the JSON file.
+
+-- | Generates a --override/-o CLI option that takes a "key=value" argument and
+-- can be repeated. Also has a -q flag that activates quiet mode. Doesn't make
+-- assumptions on the types of values, but doesn't do extra checks and doesn't
+-- display any extra documentation.
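+--
+-- For instance, a hedged sketch of an invocation (hypothetical executable name
+-- and config fields):
+--
+-- > my-pipeline -o input.path=data.csv -o model.iterations=100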
+genericAesonBasedConfigurationReader
+ :: (A.FromJSON cfg)
+ => LocalFilePath -> [(String, Char, String)] -> ConfigurationReader cfg [String]
+genericAesonBasedConfigurationReader configFile shortcuts =
+ ConfigurationReader genParser null
+ (\origCfg overrides ->
+ let (warnings, result) =
+ overrideConfigFromKeyValues origCfg overrides
+ in (warnings, parseJSONEither =<< result))
+ where
+ genParser = foldr1 (liftA2 (++)) overrideArgs
+ mkOption (l,s,h,f) = f <$>
+ many (strOption
+ ( long l <> short s <> metavar "yaml.path=YAML_VALUE" <> help h ))
+ mkShortcut (l,s,p) =
+ ( l,s,"A shortcut for `-o "<>p<>".yaml.path=YAML_VALUE'"
+ , map ((p++".")++) )
+ overrideArgs = map mkOption $
+ ("override", 'o', "Override a field value in the " <> configFile ^. pathWithExtensionAsRawFilePath <>
+ " configuration.", id)
+ : map mkShortcut shortcuts
+
+overrideConfigFromKeyValues :: A.Value -> [String] -> ([String], Either String A.Value)
+overrideConfigFromKeyValues origCfg overrides =
+ case foldM parseAndOverride ([], origCfg) $ map T.pack overrides of
+ Left s -> ([], Left s)
+ Right (w, c) -> (w, Right c)
+ where
+ badPath fullPath =
+ Left $ "Path `" ++ fullPath ++ "' malformed."
+ pathNotFound fullPath fields =
+ Left $ "Path `" ++ fullPath
+ ++ "' contains unknown nested field(s): " ++ show fields
+ parseAndOverride (w, cfg) override = case T.splitOn "=" override of
+ [path, val] -> case Y.decodeEither' $ T.encodeUtf8 val of
+ Right jsonVal -> do
+ (w', cfg') <- doOverride cfg (T.unpack path) (T.splitOn "." path) jsonVal
+ return (w'++w, cfg')
+ Left e -> Left $ "`" ++ T.unpack path ++ "': `"
+ ++ T.unpack val ++ "' is not valid yaml: got "
+ ++ show e
+ _ -> badPath $ T.unpack override
+ doOverride _ _ [] v = Right ([],v)
+ doOverride (A.Object cfg) fullPath (k:ks) v =
+ case HashMap.lookup k cfg of
+ Just cfg' -> do
+ (w, cfg'') <- doOverride cfg' fullPath ks v
+ Right $ checkTypeAndInsert w fullPath cfg' k cfg'' cfg
+ Nothing -> case ks of
+ [] -> Right $
+ ( ["`" ++ fullPath ++
+ "': This field does not exist in config file, it will be added (but beware of typos!)"]
+ , A.Object $ HashMap.insert k v cfg)
+ k' -> pathNotFound fullPath k'
+ doOverride _ fullPath k _ = pathNotFound fullPath k
+
+jsonType :: A.Value -> String
+jsonType a = case a of
+ A.String _ -> "a string"
+ A.Object _ -> "an object"
+ A.Number _ -> "a number"
+ A.Array _ -> "an array"
+ A.Bool _ -> "a bool"
+ A.Null -> "a null"
+
+checkTypeAndInsert :: [String]
+ -> String
+ -> A.Value
+ -> T.Text
+ -> A.Value
+ -> HashMap.HashMap T.Text A.Value
+ -> ([String], A.Value)
+checkTypeAndInsert w fullPath v' k v m =
+ let i = A.Object $ HashMap.insert k v m
+ t = jsonType v
+ t' = jsonType v'
+ in if t == t'
+ then (w,i)
+ else
+ (["`" ++ fullPath ++ "': Overriding " ++ t'
+ ++ " with " ++ t] ++ w
+ , i)
diff --git a/src/System/TaskPipeline/Logger.hs b/src/System/TaskPipeline/Logger.hs
new file mode 100644
index 0000000..461dc3d
--- /dev/null
+++ b/src/System/TaskPipeline/Logger.hs
@@ -0,0 +1,117 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+
+module System.TaskPipeline.Logger
+ ( LoggerScribeParams(..)
+ , LoggerFormat(..)
+ , Severity(..)
+ , Verbosity(..)
+ , maxVerbosityLoggerScribeParams
+ , warningsAndErrorsLoggerScribeParams
+ , log
+ , runLogger
+ ) where
+
+import Control.Exception.Safe
+import Control.Monad.IO.Class (MonadIO, liftIO)
+import Data.Aeson
+import Data.Aeson.Encode.Pretty (encodePrettyToTextBuilder)
+import Data.Aeson.Text (encodeToTextBuilder)
+import qualified Data.HashMap.Strict as HM
+import Data.String
+import Data.Text (Text)
+import Data.Text.Lazy.Builder hiding (fromString)
+import Katip
+import Katip.Core
+import System.IO (stdout)
+
+
+-- | Switch between the different types of formatters for the log
+data LoggerFormat
+ = PrettyLog -- ^ Just shows the log messages, colored, with namespace, and
+ -- pretty-prints the JSON context. For human consumption.
+ | CompactLog -- ^ Like pretty, but prints JSON context just on one line
+ | JSONLog -- ^ JSON-formatted log, from katip
+ | BracketLog -- ^ Regular bracket log, from katip
+ deriving (Eq, Show)
+
+-- | Scribe parameters for the logger. Defines a severity threshold, a verbosity level and a format.
+data LoggerScribeParams = LoggerScribeParams
+ { loggerSeverityThreshold :: Severity
+ , loggerVerbosity :: Verbosity
+ , loggerFormat :: LoggerFormat
+ }
+ deriving (Eq, Show)
+
+-- | Show log messages from Debug level up, with V2 verbosity.
+maxVerbosityLoggerScribeParams :: LoggerScribeParams
+maxVerbosityLoggerScribeParams = LoggerScribeParams DebugS V2 PrettyLog
+
+-- | Show log messages from Warning level up, with V2 verbosity and one-line logs.
+warningsAndErrorsLoggerScribeParams :: LoggerScribeParams
+warningsAndErrorsLoggerScribeParams = LoggerScribeParams WarningS V2 CompactLog
+
+-- | Starts a logger.
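+--
+-- A minimal usage sketch:
+--
+-- > runLogger "my-prog" maxVerbosityLoggerScribeParams $
+-- >   logFM InfoS (logStr "Hello from the pipeline")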
+runLogger
+ :: (MonadMask m, MonadIO m)
+ => String
+ -> LoggerScribeParams
+ -> KatipContextT m a
+ -> m a
+runLogger progName (LoggerScribeParams sev verb logFmt) x = do
+ let logFmt' :: LogItem t => ItemFormatter t
+ logFmt' = case logFmt of
+ PrettyLog -> prettyFormat True
+ CompactLog -> prettyFormat False
+ BracketLog -> bracketFormat
+ JSONLog -> jsonFormat
+ handleScribe <- liftIO $
+ mkHandleScribeWithFormatter logFmt' ColorIfTerminal stdout (permitItem sev) verb
+ let mkLogEnv = liftIO $
+ registerScribe "stdout" handleScribe defaultScribeSettings
+ =<< initLogEnv (fromString progName) "devel"
+ bracket mkLogEnv (liftIO . closeScribes) $ \le ->
+ runKatipContextT le () "main" $ x
+
+-- | Doesn't log time, host, file location etc. Colors the whole message and
+-- displays context AFTER the message.
+prettyFormat :: LogItem a => Bool -> ItemFormatter a
+prettyFormat usePrettyJSON withColor verb Item{..} =
+ colorize withColor "40" (mconcat $ map fromText $ intercalateNs _itemNamespace) <>
+ fromText " " <>
+ colorBySeverity' withColor _itemSeverity (mbSeverity <> unLogStr _itemMessage) <>
+ colorize withColor "2" ctx
+ where
+ ctx = case toJSON $ payloadObject verb _itemPayload of
+ Object hm | HM.null hm -> mempty
+ c -> if usePrettyJSON
+ then fromText "\n" <> encodePrettyToTextBuilder c
+ else fromText " " <> encodeToTextBuilder c
+ -- We spell out the severity levels that aren't distinguished by color
+ mbSeverity = case _itemSeverity of
+ CriticalS -> fromText "[CRITICAL] "
+ AlertS -> fromText "[ALERT] "
+ EmergencyS -> fromText "[EMERGENCY] "
+ _ -> mempty
+
+-- | Like 'colorBySeverity' from katip, but works on builders
+colorBySeverity' :: Bool -> Severity -> Builder -> Builder
+colorBySeverity' withColor severity msg = case severity of
+ EmergencyS -> red msg
+ AlertS -> red msg
+ CriticalS -> red msg
+ ErrorS -> red msg
+ WarningS -> yellow msg
+ NoticeS -> bold msg
+ DebugS -> grey msg
+ _ -> msg
+ where
+ bold = colorize withColor "1"
+ red = colorize withColor "31"
+ yellow = colorize withColor "33"
+ grey = colorize withColor "2"
+
+colorize :: Bool -> Text -> Builder -> Builder
+colorize withColor c s
+ | withColor = fromText "\ESC[" <> fromText c <> fromText "m" <> s <> fromText "\ESC[0m"
+ | otherwise = s
diff --git a/src/System/TaskPipeline/Options.hs b/src/System/TaskPipeline/Options.hs
new file mode 100644
index 0000000..4d02279
--- /dev/null
+++ b/src/System/TaskPipeline/Options.hs
@@ -0,0 +1,84 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE GADTs #-}
+{-# OPTIONS_GHC -Wall #-}
+{-# OPTIONS_GHC -Wno-unticked-promoted-constructors #-}
+
+module System.TaskPipeline.Options
+ ( -- * API
+ getOptions
+ , getOption
+ , optionsVirtualFile
+ , optionVirtualFile
+ -- * Re-exports from docrecords
+ , DocRec, Rec(..), (^^.), (^^?), (^^?!), (=:)
+ , PathWithType(..)
+ , docField
+ , pattern FV
+ ) where
+
+import Data.Aeson
+import Data.DocRecord
+import Data.DocRecord.OptParse
+import Data.Locations.SerializationMethod
+import Data.Locations.VirtualFile
+import Data.Typeable
+import GHC.TypeLits (KnownSymbol)
+import System.TaskPipeline.PTask
+import System.TaskPipeline.VirtualFileAccess
+
+import Prelude hiding (id, (.))
+
+
+-- | Field Value. Allows you to directly pattern match on the output of
+-- 'getOptions'/'loadData'
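+--
+-- A hedged sketch (hypothetical single-field record):
+--
+-- > showThreshold (FV thr :& RNil) = show (thr :: Double)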
+pattern FV :: a -> DocField (s:|:a)
+pattern FV v <- DocField _ (Right (Field v))
+
+-- | Creates a 'VirtualFile' from a default set of options (as a DocRec). To be
+-- used with 'loadData'.
+optionsVirtualFile
+ :: forall rs. (Typeable rs, RecordUsableWithCLI rs)
+ => [LocationTreePathItem] -- ^ The path for the options in the LocationTree
+ -> DocRec rs -- ^ The DocRec containing the fields with their
+ -- docs and default values
+ -> BidirVirtualFile (DocRec rs)
+optionsVirtualFile path defOpts =
+ withEmbeddedValue defOpts $
+ bidirVirtualFile path $
+ someBidirSerial (OptionsSerial id id :: OptionsSerial (DocRec rs))
+ <> someBidirSerial YAMLSerial
+
+-- | Just like 'optionsVirtualFile', but for a single field
+optionVirtualFile
+ :: (KnownSymbol s, Typeable t, ToJSON t, FieldFromCLI ('[s] :|: t))
+ => [LocationTreePathItem] -- ^ The path for the option field in the LocationTree
+ -> DocField ('[s] :|: t) -- ^ The field, usually with default value (created
+ -- with 'docField')
+ -> BidirVirtualFile t
+optionVirtualFile path field =
+ dimap (field =:) (^^?! field) $
+ optionsVirtualFile path (field :& RNil)
+
+-- | Add a set of options (as a DocRec) to the 'LocationTree', in order to
+-- expose them to the user, and returns the final values of these options
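+--
+-- A hedged sketch (hypothetical fields, assuming docrecords' usual
+-- type-application style for 'docField'):
+--
+-- > getOptions ["options"]
+-- >   (  docField @"nIterations" (10::Int) "Number of iterations"
+-- >   :& docField @"seed" (42::Int) "Random seed"
+-- >   :& RNil )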
+getOptions
+ :: (LogThrow m, Typeable rs, RecordUsableWithCLI rs)
+ => [LocationTreePathItem] -- ^ The path for the options in the LocationTree
+ -> DocRec rs -- ^ The DocRec containing the fields with their
+ -- docs and default values
+ -> PTask m () (DocRec rs) -- ^ A PTask that returns the new option values,
+ -- overridden by the user
+getOptions path = loadData . optionsVirtualFile path
+
+-- | Just like 'getOptions', but for a single field.
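+--
+-- For instance, a hedged sketch (hypothetical field):
+--
+-- > getOption ["settings"] (docField @"threshold" (0.5::Double) "Detection threshold")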
+getOption
+ :: (LogThrow m, KnownSymbol s, Typeable t, ToJSON t, FieldFromCLI ('[s] :|: t))
+ => [LocationTreePathItem] -- ^ The path for the option field in the LocationTree
+ -> DocField ('[s] :|: t) -- ^ The field (created with 'docField')
+ -> PTask m () t -- ^ A PTask that returns the new option,
+ -- overridden by the user
+getOption path = loadData . optionVirtualFile path
diff --git a/src/System/TaskPipeline/PTask.hs b/src/System/TaskPipeline/PTask.hs
new file mode 100644
index 0000000..c70e556
--- /dev/null
+++ b/src/System/TaskPipeline/PTask.hs
@@ -0,0 +1,218 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE InstanceSigs #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
+{-# OPTIONS_GHC -Wall #-}
+
+
+module System.TaskPipeline.PTask
+ ( module Control.Category
+ , module Control.Arrow
+ , module Data.Locations.LogAndErrors
+ , PTask
+ , Severity(..)
+ , CanRunPTask
+ , Properties
+ , tryTask, throwTask, clockTask, clockTask'
+ , catchAndLog, throwStringTask
+ , toTask, toTask'
+ , ioTask, stepIO, stepIO'
+ , taskUsedFiles
+ , taskRequirements
+ , taskRunnablePart
+ , taskDataAccessTree
+ , taskInSubtree
+ , voidTask
+ , addContextToTask
+ , addStaticContextToTask
+ , addNamespaceToTask
+ , nameTask
+ , logTask
+ , logDebug, logInfo, logNotice, logWarning, logError
+ ) where
+
+import Prelude hiding (id, (.))
+
+import Control.Arrow
+import qualified Control.Arrow.Free as AF
+import Control.Category
+import Control.DeepSeq (NFData (..), force)
+import Control.Exception (evaluate)
+import Control.Funflow (Properties, stepIO, stepIO')
+import Control.Lens
+import Data.Locations
+import Data.Locations.LogAndErrors
+import Data.String
+import Katip
+import System.ClockHelpers
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask.Internal
+
+
+-- | A task that discards its input and returns ()
+voidTask :: PTask m a ()
+voidTask = arr (const ())
+
+-- | Just a shortcut for when you want an IO step that requires no input
+ioTask :: (KatipContext m) => PTask m (IO a) a
+ioTask = stepIO id
+
+-- | Catches an error happening in a task. Leaves the tree intact if an error
+-- occurred.
+tryTask
+ :: PTask m a b -> PTask m a (Either SomeException b)
+tryTask = AF.try
+
+-- | A version of 'tryTask' that just logs when an error happens
+catchAndLog :: (KatipContext m)
+ => Severity -> PTask m a b -> PTask m a (Maybe b)
+catchAndLog severity task =
+ tryTask task
+ >>> toTask (\i ->
+ case i of
+ Left e -> do
+ logFM severity $ logStr $ displayException (e::SomeException)
+ return Nothing
+ Right x -> return $ Just x)
+
+-- | Fails the whole pipeline if an exception occurred, or just continues as
+-- normal
+throwTask :: (Exception e, LogThrow m) => PTask m (Either e b) b
+throwTask = arr (over _Left displayException) >>> throwStringTask
+
+-- | Fails the whole pipeline if an exception occurred, or just continues as
+-- normal
+throwStringTask :: (LogThrow m) => PTask m (Either String b) b
+throwStringTask = toTask $ \i ->
+ case i of
+ Left e -> throwWithPrefix e
+ Right r -> return r
+
+-- | Turn an action into a PTask. BEWARE! The resulting 'PTask' will have NO
+-- requirements, so if the action uses files or resources, they won't appear in
+-- the LocationTree.
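+--
+-- A minimal sketch:
+--
+-- > greet :: (KatipContext m) => PTask m String ()
+-- > greet = toTask $ \name -> logFM InfoS $ logStr ("Hello, " ++ name)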
+toTask :: (KatipContext m)
+ => (a -> m b) -> PTask m a b
+toTask = makeTask mempty . const
+
+-- | A version of 'toTask' that can perform caching. It's analogous to
+-- funflow's wrap', except the action passed here is just a plain function (it
+-- will be wrapped later as a funflow effect).
+toTask' :: (KatipContext m)
+ => Properties a b -> (a -> m b) -> PTask m a b
+toTask' props = makeTask' props mempty . const
+
+-- | Measures the time taken by a 'PTask'.
+clockTask
+ :: (KatipContext m) => PTask m a b -> PTask m a (b, TimeSpec)
+clockTask task = proc input -> do
+ start <- time -< ()
+ output <- task -< input
+ end <- time -< ()
+ returnA -< (output, end `diffTimeSpec` start)
+ where
+ time = stepIO $ const $ getTime Realtime
+
+-- | Measures the time taken by a 'PTask' and the deep evaluation of its result.
+clockTask'
+ :: (NFData b, KatipContext m) => PTask m a b -> PTask m a (b, TimeSpec)
+clockTask' task = clockTask $
+ task >>> stepIO (evaluate . force)
+
+-- | Logs a message during the pipeline execution
+logTask :: (KatipContext m) => PTask m (Severity, String) ()
+logTask = toTask $ \(sev, s) -> logFM sev $ logStr s
+
+-- | Logs a message at a predefined severity level
+logDebug, logInfo, logNotice, logWarning, logError :: (KatipContext m) => PTask m String ()
+logDebug = arr (DebugS,) >>> logTask
+logInfo = arr (InfoS,) >>> logTask
+logNotice = arr (NoticeS,) >>> logTask
+logWarning = arr (WarningS,) >>> logTask
+logError = arr (ErrorS,) >>> logTask
+
+-- | To access and transform the requirements of the PTask before it runs
+taskRequirements :: Lens' (PTask m a b) VirtualTree
+taskRequirements = splitTask . _1
+
+-- | To access and transform all the 'VirtualFiles' used by this 'PTask'. The
+-- parameters of the VirtualFiles will remain hidden, but all the metadata is
+-- accessible. NOTE: The original path of the files isn't settable.
+taskUsedFiles :: Traversal' (PTask m a b) (VirtualFile NoWrite NoRead)
+taskUsedFiles = taskRequirements . traversed . vfnodeFileVoided
+
+-- | Gives access to the 'RunnableTask' inside the PTask. It is the PTask,
+-- devoid of its requirements. It is also an Arrow, and additionally an
+-- ArrowChoice, so by using 'over taskRunnablePart' you can access a structure
+-- in which you can use /case/ and /if/ statements.
+taskRunnablePart :: Lens (PTask m a b) (PTask m a' b')
+ (RunnableTask m a b) (RunnableTask m a' b')
+taskRunnablePart = splitTask . _2
+
+-- | To transform the state of the PTask when it will run
+taskReaderState :: Setter' (PTask m a b) (PTaskState m)
+taskReaderState = taskRunnablePart . runnableTaskReaderState
+
+-- | To transform the 'DataAccessTree' of the PTask when it will run
+taskDataAccessTree :: Setter' (PTask m a b) (DataAccessTree m)
+taskDataAccessTree = taskReaderState . ptrsDataAccessTree
+
+-- | Adds some context to a task, to be used by the logger. That bit of
+-- context is dynamic, which is why we wrap the task into a new one that
+-- expects the 'LogItem' as part of its input. See 'katipAddContext'. If your
+-- bit of context can be known statically (ie. before the pipeline actually
+-- runs), prefer 'addStaticContextToTask'.
+addContextToTask :: (LogItem i, Monad m) => PTask m a b -> PTask m (i,a) b
+addContextToTask = over taskRunnablePart $ modifyingRuntimeState
+ (\(item,_) -> over ptrsKatipContext (<> liftPayload item))
+ snd
+
+-- | Adds to a task some context that is known _before_ the pipeline runs. The
+-- 'LogItem' to add is therefore static and can be given just as an argument.
+addStaticContextToTask :: (LogItem i) => i -> PTask m a b -> PTask m a b
+addStaticContextToTask item =
+ over (taskReaderState . ptrsKatipContext) (<> liftPayload item)
+
+-- | Adds a namespace to the task. See 'katipAddNamespace'. Like context in
+-- 'addStaticContextToTask', the namespace is meant to be static, that's why we
+-- give it as a parameter to 'addNamespaceToTask', instead of creating a PTask
+-- that expects the namespace as an input.
+--
+-- NOTE: Prefer the use of 'nameTask', which records the time spent within the
+-- task. Directly use 'addNamespaceToTask' only if that time tracking hurts
+-- performance.
+addNamespaceToTask :: String -> PTask m a b -> PTask m a b
+addNamespaceToTask ns =
+ over (taskReaderState . ptrsKatipNamespace) (<> fromString ns)
+
+-- | This gives the task a name, making porcupine aware that this task should be
+-- considered an entity by itself. This has two effects: it changes the logging
+-- output by wrapping it in a namespace (as per 'addNamespaceToTask'), and it
+-- measures and logs (at InfoS level) the time spent within that task.
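+--
+-- For instance (hypothetical task): @nameTask "preprocessing" myTask@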
+nameTask :: (KatipContext m) => String -> PTask m a b -> PTask m a b
+nameTask ns task =
+ addNamespaceToTask ns $
+ clockTask task
+ >>> toTask (\(output, time) -> do
+ katipAddContext time $
+ logFM InfoS $ logStr $ "Finished task '" ++ ns ++ "' in " ++ showTimeSpec time
+ return output)
+
+-- | Moves the 'VirtualTree' associated to the task deeper in the final
+-- tree. This can be used to solve conflicts between tasks that have
+-- 'VirtualTree's that are identical (for instance input files for a model if
+-- you want to solve several models, in which case you'd want for instance to
+-- add an extra level at the root of the tree with the model name).
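+--
+-- For instance (hypothetical): @taskInSubtree ["model1"] task@ moves all of
+-- @task@'s requirements under a @model1/@ folder in the virtual tree.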
+taskInSubtree :: [LocationTreePathItem] -> PTask m a b -> PTask m a b
+taskInSubtree path = over splitTask $ \(reqTree, runnable) ->
+ let reqTree' = foldr (\pathItem rest -> folderNode [pathItem :/ rest]) reqTree path
+ runnable' = runnable & over (runnableTaskReaderState . ptrsDataAccessTree)
+ (view $ atSubfolderRec path)
+ in (reqTree', runnable')
diff --git a/src/System/TaskPipeline/PTask/Internal.hs b/src/System/TaskPipeline/PTask/Internal.hs
new file mode 100644
index 0000000..b1826f8
--- /dev/null
+++ b/src/System/TaskPipeline/PTask/Internal.hs
@@ -0,0 +1,286 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeSynonymInstances #-}
+
+-- | This module exposes the 'PTask' arrow along with some low-level functions
+-- to create and run a 'PTask'.
+
+module System.TaskPipeline.PTask.Internal
+ ( PTask(..)
+ , PTaskState
+ , RunnableTask
+ , FunflowRunConfig(..)
+ , CanRunPTask
+ , FunflowOpts(..)
+ , ptrsKatipContext
+ , ptrsKatipNamespace
+ , ptrsFunflowRunConfig
+ , ptrsDataAccessTree
+ , splitTask
+ , runnableTaskReaderState
+ , makeTask
+ , makeTask'
+ , modifyingRuntimeState
+ , withRunnableState
+ , withRunnableState'
+ , execRunnableTask
+ , runnableWithoutReqs
+ , withTaskState
+ ) where
+
+import Prelude hiding (id, (.))
+
+import Control.Arrow
+import Control.Arrow.AppArrow
+import Control.Arrow.Async
+import Control.Arrow.Free (ArrowError)
+import Control.Category
+import Control.Funflow
+import qualified Control.Funflow.ContentStore as CS
+import Control.Funflow.External.Coordinator
+import Control.Funflow.External.Coordinator.SQLite
+import qualified Control.Funflow.RemoteCache as Remote
+import Control.Lens
+import Control.Monad.Trans
+import Control.Monad.Trans.Control
+import Control.Monad.Trans.Reader
+import Control.Monad.Trans.State
+import Control.Monad.Trans.Writer
+import Data.Default
+import Data.Locations.Accessors
+import Data.Locations.FunflowRemoteCache
+import Data.Locations.LocationTree
+import Data.Locations.LogAndErrors
+import Katip.Core (Namespace)
+import Katip.Monadic
+import Path
+import System.TaskPipeline.PorcupineTree
+
+
+-- | PTask functions like mappingOverStream make it necessary to recursively run
+-- some flows. Until we find a better solution than running flows in flows, this
+-- is how we do it. These are the arguments to
+-- Control.Funflow.Exec.Simple.runFlowEx
+data FunflowRunConfig m = forall c rc. (Coordinator c, Remote.Cacher m rc) => FunflowRunConfig
+ { _ffrcCoordinator :: !c
+ , _ffrcCoordinatorConfig :: !(Config c)
+ , _ffrcContentStore :: !CS.ContentStore
+ , _ffrcFlowIdentity :: !(Maybe Int)
+ , _ffrcRemoteCache :: !rc
+ }
+
+-- | This is the state that will be shared by the whole PTask pipeline once it
+-- starts running.
+data PTaskState m = PTaskState
+ { _ptrsKatipContext :: !LogContexts
+ , _ptrsKatipNamespace :: !Namespace
+ , _ptrsFunflowRunConfig :: !(FunflowRunConfig m)
+ , _ptrsDataAccessTree :: !(DataAccessTree m) }
+
+makeLenses ''PTaskState
+
+-- | The part of a 'PTask' that will be run once the whole pipeline is composed
+-- and the tree of requirements has been bound to physical locations. It is
+-- important to note that while both 'PTask' and 'RunnableTask' are Arrows,
+-- only 'RunnableTask' is an ArrowChoice.
+type RunnableTask m =
+ AppArrow
+ (Reader (PTaskState m)) -- The reader layer contains the mapped
+ -- tree. Will be used only as an applicative.
+ (Flow (InnerEffect m) SomeException)
+
+-- | The constraints that must be satisfied by the base monad m so that a @PTask
+-- m@ can be run
+type CanRunPTask m = (MonadBaseControl IO m, LogMask m)
+
+-- | Runs a 'RunnableTask' given its state
+execRunnableTask
+ :: (CanRunPTask m)
+ => RunnableTask m a b -> PTaskState m -> a -> m b
+execRunnableTask
+ (AppArrow act)
+ st@PTaskState{_ptrsFunflowRunConfig=FunflowRunConfig{..}}
+ input =
+ flip evalStateT [] $
+ runFlowEx _ffrcCoordinator _ffrcCoordinatorConfig
+ _ffrcContentStore (LiftCacher _ffrcRemoteCache) id _ffrcFlowIdentity
+ (runReader act st)
+ input
+
+-- | A task is an Arrow that turns @a@ into @b@. It runs in some monad @m@.
+-- Each 'PTask' will expose its requirements in terms of resource it wants to
+-- access in the form of a virtual tree (implemented as a 'LocationTree' of
+-- 'VirtualFile's). These trees of requirements are aggregated when the tasks
+-- are combined with each other, so once the full pipeline has been composed
+-- (through Arrow composition), its 'pTaskVirtualTree' will contain the
+-- complete requirements of the pipeline.
+newtype PTask m a b = PTask
+ (AppArrow
+ (Writer VirtualTree) -- The writer layer accumulates the requirements. It will
+ -- be used only as an applicative.
+ (RunnableTask m)
+ a b)
+ deriving (Category, Arrow, ArrowError SomeException, Functor, Applicative)
+ -- PTask doesn't instantiate ArrowChoice. That's intentional, even if an
+ -- instance could be automatically derived. The ArrowChoice implementation for
+ -- `AppArrow (Writer x) arr` isn't sane for PTasks, as the monoid state (the
+ -- PTask requirements) would be accumulated by (|||) in a way that's
+ -- indistinguishable from (>>>), that is to say one that doesn't differentiate
+ -- VirtualFiles that _will_ be used from those that _may_ be used. Maybe in
+ -- the future we will implement ArrowChoice/ArrowPlus/ArrowZero in a saner way
+ -- (it will even be necessary if we want to implement serialization methods
+ -- with PTasks themselves, and have serial method selection based on file
+ -- format or mapping metadata), but in that case it's necessary that the
+ -- pipeline configuration file reflects this "either-or" nature of VirtualFiles.
+
+flowToPTask :: Flow (InnerEffect m) SomeException a b -> PTask m a b
+flowToPTask = PTask . appArrow . appArrow
+
+-- | The type of effects we can run. The reader layer is executed by 'wrap'',
+-- which is why it doesn't appear in the Flow part of the 'RunnableTask' type.
+type OuterEffect m =
+ AsyncA (ReaderT (PTaskState m) m)
+
+-- | The effects run inside the flow have to handle some dynamic modifications
+-- of the state (for instance from task inputs) that have to be applied to each
+-- state passed to 'wrap'. We store these modifications as a stack of functions.
+type InnerEffect m =
+ AsyncA (StateT [PTaskState m -> PTaskState m] m)
+
+instance (KatipContext m)
+ => ArrowFlow (OuterEffect m) SomeException (PTask m) where
+ step' props f = flowToPTask $ step' props f
+ stepIO' props f = flowToPTask $ stepIO' props f
+ external f = flowToPTask $ external f
+ external' props f = flowToPTask $ external' props f
+ -- wrap' transmits the Reader state of the PTask down to the flow:
+ wrap' props (AsyncA rdrAct) = runnableWithoutReqs $
+ withRunnableState' props $ \outerState input ->
+ runReaderT (rdrAct input) outerState
+ putInStore f = flowToPTask $ putInStore f
+ getFromStore f = flowToPTask $ getFromStore f
+ internalManipulateStore f = flowToPTask $ internalManipulateStore f
+
+withOuterState
+ :: (ArrowFlow (AsyncA m) ex arr)
+ => Properties a b
+ -> (t -> a -> m b)
+ -> AppArrow (Reader t) arr a b
+withOuterState props f =
+ AppArrow $ reader $ \outerState ->
+ wrap' props $ AsyncA $ \input ->
+ f outerState input
+
+-- | The task will be executed with a new state modifier pushed on the modifiers
+-- stack.
+modifyingRuntimeState
+ :: (Monad m)
+ => (a -> PTaskState m -> PTaskState m)
+ -> (a -> a')
+ -> RunnableTask m a' b
+ -> RunnableTask m a b
+modifyingRuntimeState alterState alterInput ar = pushState >>> ar >>> popState
+ where
+ pushState =
+ withOuterState def $ \_ x -> do
+ modify (alterState x :)
+ return (alterInput x)
+ popState =
+ withOuterState def $ \_ x -> do
+ modify popMod
+ return x
+ popMod [] = error $
+ "modifyingRunnableState: Modifiers list shouldn't be empty!"
+ popMod (_:ms) = ms
+
+-- | At the 'RunnableTask' level, access the reader state and run an action
+withRunnableState' :: (KatipContext m)
+ => Properties a b -> (PTaskState m -> a -> m b) -> RunnableTask m a b
+withRunnableState' props f = withOuterState props $ \outerState input -> do
+ mods <- get
+ let ptrs = foldr ($) outerState mods
+ lift $
+ localKatipContext (const $ _ptrsKatipContext ptrs) $
+ localKatipNamespace (const $ _ptrsKatipNamespace ptrs) $
+ f ptrs input
+
+-- | 'withRunnableState'' without caching.
+withRunnableState :: (KatipContext m)
+ => (PTaskState m -> a -> m b) -> RunnableTask m a b
+withRunnableState = withRunnableState' def
+
+-- | Wraps a 'RunnableTask' into a 'PTask' that declares no requirements
+runnableWithoutReqs :: RunnableTask m a b -> PTask m a b
+runnableWithoutReqs = PTask . appArrow
+
+-- | An Iso to the requirements and the runnable part of a 'PTask'
+splitTask :: Iso (PTask m a b) (PTask m a' b')
+ (VirtualTree, RunnableTask m a b)
+ (VirtualTree, RunnableTask m a' b')
+splitTask = iso to_ from_
+ where
+ to_ (PTask (AppArrow wrtrAct)) = swap $ runWriter wrtrAct
+ from_ = PTask . AppArrow . writer . swap
+ swap (a,b) = (b,a)
+
+-- | Permits applying a function to the Reader state of a 'RunnableTask' when
+-- it runs.
+runnableTaskReaderState :: Setter' (RunnableTask m a b) (PTaskState m)
+runnableTaskReaderState = lens unAppArrow (const AppArrow) . setting local
+
+-- | Makes a task from a tree of requirements and a function. The 'Properties'
+-- indicate whether we can cache this task.
+makeTask' :: (KatipContext m)
+ => Properties a b
+ -> LocationTree VirtualFileNode
+ -> (DataAccessTree m -> a -> m b)
+ -> PTask m a b
+makeTask' props tree f =
+ (tree, withRunnableState' props (f . _ptrsDataAccessTree)) ^. from splitTask
+
+-- | Makes a task from a tree of requirements and a function. This is the entry
+-- point to PTasks
+makeTask :: (KatipContext m)
+ => LocationTree VirtualFileNode
+ -> (DataAccessTree m -> a -> m b)
+ -> PTask m a b
+makeTask = makeTask' def
+
+data FunflowOpts m = FunflowOpts
+ { storePath :: FilePath
+ , coordPath :: FilePath
+ , flowIdentity :: Maybe Int
+ , remoteCacheLoc :: Maybe (SomeLoc m) }
+ deriving (Show)
+
+withFunflowRunConfig
+ :: (LogMask m)
+ => FunflowOpts m
+ -> (FunflowRunConfig m -> m r)
+ -> m r
+withFunflowRunConfig ffopts f = do
+ storePath' <- parseAbsDir $ storePath ffopts
+ coordPath' <- parseAbsDir $ coordPath ffopts
+ let cacher = locationCacher $ remoteCacheLoc ffopts
+ CS.withStore storePath' (\store ->
+ f $ FunflowRunConfig SQLite coordPath' store (flowIdentity ffopts) cacher)
+
+-- | Given a 'KatipContext' and a 'DataAccessTree', gets the initial state to
+-- give to 'execRunnableTask'
+withTaskState :: (LogMask m)
+ => FunflowOpts m
+ -> DataAccessTree m
+ -> (PTaskState m -> m r) -> m r
+withTaskState ffPaths tree f =
+ withFunflowRunConfig ffPaths $ \ffconfig -> do
+ ctx <- getKatipContext
+ ns <- getKatipNamespace
+ f $ PTaskState ctx ns ffconfig tree
diff --git a/src/System/TaskPipeline/PorcupineTree.hs b/src/System/TaskPipeline/PorcupineTree.hs
new file mode 100644
index 0000000..bf2cf94
--- /dev/null
+++ b/src/System/TaskPipeline/PorcupineTree.hs
@@ -0,0 +1,689 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeOperators #-}
+{-# LANGUAGE ViewPatterns #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# OPTIONS_GHC -fno-warn-missing-pattern-synonym-signatures #-}
+{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
+
+-- | This file defines all the states of the _porcupine tree_.
+--
+-- A porcupine tree is a "LocationTree" containing the resources for a pipeline.
+-- The porcupine trees of subtasks are aggregated into the resource tree of the
+-- whole pipeline.
+--
+-- The porcupine tree can be in three different states:
+--
+-- * "VirtualTree": It contains only VirtualFiles. It is the state of
+-- the porcupine tree that is used by the tasks to declare their requirements,
+-- and by the configuration manager to write the default tree and mappings
+-- and read back the conf
+--
+-- * "PhysicalTree": It contains physical locations. Once the
+-- configuration and mappings to physical files have been read,
+-- each node is attached its corresponding physical locations. The locations for
+-- the node which have no explicit mappings in the configuration are derived from the
+-- nodes higher in the hierarchy (this implies that the only mandatory mapping is the
+-- root of the tree). Physical trees are used to check that each virtual file
+-- is compatible with the physical locations bound to it.
+--
+-- * "DataAccessTree": It contains the functions that allow to read or write
+-- to the resources. This tree is created after every physical location has
+-- been checked according to the following rules:
+--
+-- * if a physical location has an extension that is not recognized by the
+-- VirtualFile it is bound to, the process fails
+--
+-- * if a VirtualFile has at the same time physical locations bound AND
+-- embedded data, then the embedded data is considered to be the
+-- _rightmost_ layer (ie. the one overriding all the other ones), so that
+-- the data of this VirtualFile can be easily overridden by just changing
+-- the config file.
+--
+-- The VirtualFiles of a VirtualTree, when written to the configuration file,
+-- are divided into 2 sections: "data" and "locations". "data" is a tree of the
+-- json objects corresponding to the VirtualFiles that have default data which
+-- can be represented as 'Value's (keeping the structure of the virtual
+-- tree). "locations" contains a flat json object of all the other nodes, mapped
+-- in a "/virtual/path: physical_paths" fashion.
+--
+-- Indeed, each node can be mapped to _several_ physical paths, which we call
+-- _layers_. We require everything that is read from a VirtualFile to be a
+-- Monoid, so that we can combine the content of each layer into one value. This
+-- Monoid should be right-biased (ie. in the expression @a <> b@, @b@ overwrites
+-- the contents of @a@).
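+--
+-- For instance, a hedged sketch of a @locations@ section that binds one
+-- virtual path to two layers (hypothetical paths):
+--
+-- > locations:
+-- >   /inputs/config: ["defaults.yaml", "overrides.yaml"]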
+--
+-- TODO: The physical tree still has variables in the locations. When
+-- are these spliced? What are the use cases for physical locations with
+-- variables, and how do they differ from virtual locations?
+--
+module System.TaskPipeline.PorcupineTree where
+
+import Control.Exception.Safe
+import Control.Funflow.ContentHashable
+import Control.Lens hiding ((:>))
+import Control.Monad
+import Data.Aeson
+import qualified Data.ByteString as Strict
+import qualified Data.ByteString.Lazy as Lazy
+import qualified Data.ByteString.Streaming as BSS
+import Data.DocRecord
+import Data.DocRecord.OptParse
+import qualified Data.HashMap.Strict as HM
+import Data.List (intersperse)
+import Data.Locations
+import Data.Locations.Accessors
+import Data.Maybe
+import Data.Monoid (First (..))
+import Data.Representable
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as TE
+import qualified Data.Text.Lazy as LT
+import qualified Data.Text.Lazy.Encoding as LTE
+import Data.Typeable
+import qualified Data.Yaml as Y
+import GHC.Generics (Generic)
+import Katip
+import Options.Applicative
+import Streaming
+import System.ClockHelpers
+import System.TaskPipeline.ConfigurationReader
+
+
+-- * API for manipulating porcupine tree _nodes_
+
+-- | The internal part of a 'VirtualFileNode', closing over the type params of
+-- the 'VirtualFile'
+data SomeVirtualFile where
+ SomeVirtualFile :: (Typeable a, Typeable b)
+ => VirtualFile a b
+ -> SomeVirtualFile
+
+instance Semigroup SomeVirtualFile where
+ SomeVirtualFile vf <> SomeVirtualFile vf' = case cast vf' of
+ Just vf'' -> SomeVirtualFile (vf <> vf'')
+ Nothing -> error $ "Two differently typed VirtualFiles share the same virtual path "
+ ++ (T.unpack $ toTextRepr (LTP $ _vfileOriginalPath vf))
+ ++ ":\n file 1 type: " ++ show (typeOf vf)
+ ++ "\n file 2 type: " ++ show (typeOf vf')
+
+-- | Packs together the two functions that will read and write a location, along
+-- with the actual locations that will be accessed. This interface makes it
+-- possible to separate the resolution of locations from their access. A
+-- @DataAccessor@ is usually meant to be used inside a 'toTask'. A @DataAccessor@
+-- is hashable (via the locations it accesses) so a cached task can take a
+-- @DataAccessor@ as an input, and this task will be re-triggered if the
+-- physical location(s) bound to this accessor change(s).
+data DataAccessor m a b = DataAccessor
+ { daPerformWrite :: a -> m ()
+ , daPerformRead :: m b
+ , daLocsAccessed :: Either String [SomeLoc m] }
+
+instance (Monad m) => ContentHashable m (DataAccessor m' a b) where
+ contentHashUpdate ctx = contentHashUpdate ctx . fmap toHashableLocs . daLocsAccessed
+
+-- | Like a 'DataAccessor' but only with the writer part
+data DataWriter m a = DataWriter
+ { dwPerformWrite :: a -> m ()
+ , dwLocsAccessed :: Either String [SomeLoc m] }
+
+instance (Monad m) => ContentHashable m (DataWriter m' a) where
+ contentHashUpdate ctx = contentHashUpdate ctx . fmap toHashableLocs . dwLocsAccessed
+
+-- | Like a 'DataAccessor' but only with the reader part
+data DataReader m a = DataReader
+ { drPerformRead :: m a
+ , drLocsAccessed :: Either String [SomeLoc m] }
+
+instance (Monad m) => ContentHashable m (DataReader m' a) where
+ contentHashUpdate ctx = contentHashUpdate ctx . fmap toHashableLocs . drLocsAccessed
+
+-- | The internal part of a 'DataAccessNode', closing over the type params of the
+-- access function.
+data SomeDataAccess m where
+ SomeDataAccess :: (Typeable a, Typeable b)
+ => (LocVariableMap -> DataAccessor m a b) -> SomeDataAccess m
+
+-- | Tells the type of accesses that some VirtualFile will undergo. They are
+-- accumulated through the whole pipeline.
+data VFNodeAccessType = ATWrite | ATRead
+ deriving (Eq, Show, Generic)
+
+instance ToJSON VFNodeAccessType
+instance FromJSON VFNodeAccessType
+
+-- | The nodes of a "VirtualTree"
+data VirtualFileNode = MbVirtualFileNode [VFNodeAccessType] (Maybe SomeVirtualFile)
+-- | A non-empty 'VirtualFileNode'
+pattern VirtualFileNode {vfnodeAccesses, vfnodeFile} =
+ MbVirtualFileNode vfnodeAccesses (Just (SomeVirtualFile vfnodeFile))
+
+-- | vfnodeFile is a @Traversal'@ into the VirtualFile contained in a
+-- VirtualFileNode, but hiding its real read/write types.
+vfnodeFileVoided :: Traversal' VirtualFileNode (VirtualFile NoWrite NoRead)
+vfnodeFileVoided f (VirtualFileNode at vf) =
+ VirtualFileNode at <$> vfileVoided f vf
+vfnodeFileVoided _ vfn = pure vfn
+
+-- | The nodes of a "PhysicalTree"
+data PhysicalFileNode m = MbPhysicalFileNode [SomeLocWithVars m] [VFNodeAccessType] (Maybe SomeVirtualFile)
+-- | A non-empty 'PhysicalFileNode'
+pattern PhysicalFileNode {pfnodeLayers, pfnodeAccesses, pfnodeFile} =
+ MbPhysicalFileNode pfnodeLayers pfnodeAccesses (Just (SomeVirtualFile pfnodeFile))
+
+-- | The nodes of a "DataAccessTree"
+data DataAccessNode m = MbDataAccessNode [SomeLocWithVars m] (First (SomeDataAccess m))
+ -- Data access function isn't a semigroup, hence the use of First here instead
+ -- of Maybe.
+-- | A non-empty 'DataAccessNode'
+pattern DataAccessNode l x = MbDataAccessNode l (First (Just (SomeDataAccess x)))
+
+
+instance Semigroup VirtualFileNode where
+ MbVirtualFileNode ats vf <> MbVirtualFileNode ats' vf' =
+ MbVirtualFileNode (ats <> ats') (vf <> vf')
+instance Monoid VirtualFileNode where
+ mempty = MbVirtualFileNode [] mempty
+-- TODO: It is dubious that composing DataAccessNodes is really needed in the
+-- end. Find a way to remove that.
+instance Semigroup (DataAccessNode m) where
+ MbDataAccessNode locs f <> MbDataAccessNode _ f' = MbDataAccessNode locs $ f <> f'
+instance Monoid (DataAccessNode m) where
+ mempty = MbDataAccessNode [] mempty
+
+toJSONTxt :: SomeLocWithVars m -> T.Text
+toJSONTxt (SomeGLoc a) = case toJSON a of
+ String s -> s
+ v -> LT.toStrict $ LTE.decodeUtf8 $ encode v
+
+-- | How to Show a PhysicalFileNode
+data PhysicalFileNodeShowOpts = PhysicalFileNodeShowOpts
+ { pfshowWithMappings :: Bool
+ , pfshowWithSerials :: Bool
+ , pfshowWithOptions :: Bool
+ , pfshowWithTypes :: Bool
+ , pfshowWithAccesses :: Bool
+ , pfshowWithExtensions :: Bool
+ , pfshowTypeNumChars :: Int }
+
+data PhysicalFileNodeWithShowOpts m =
+ PhysicalFileNodeWithShowOpts PhysicalFileNodeShowOpts (PhysicalFileNode m)
+
+instance Show (PhysicalFileNodeWithShowOpts m) where
+ show (PhysicalFileNodeWithShowOpts PhysicalFileNodeShowOpts{..}
+ PhysicalFileNode{..}) =
+ mconcat $ intersperse "\n " $
+ (showIf pfshowWithMappings $
+ (if null pfnodeLayers
+ then "<no mapping>"
+ else T.unpack (mconcat
+ (intersperse "\n + "
+ (map toJSONTxt pfnodeLayers)))))
+ ++ (showIf pfshowWithSerials $
+ describeVFileAsSourceSink pfnodeFile)
+ ++ (showIf pfshowWithTypes $
+ describeVFileTypes pfnodeFile pfshowTypeNumChars)
+ ++ (showIf pfshowWithExtensions $
+ describeVFileExtensions pfnodeFile)
+ ++ (showIf pfshowWithAccesses $
+ "Accessed with " ++ show pfnodeAccesses)
+ ++ (showIf pfshowWithOptions $
+ describeVFileAsRecOfOptions pfnodeFile pfshowTypeNumChars)
+ where
+ showIf cond content =
+ if cond && (not $ null content) then [content] else []
+ show _ = ""
+
+-- * API for manipulating porcupine trees globally
+
+-- | The tree manipulated by tasks during their construction
+type VirtualTree = LocationTree VirtualFileNode
+
+-- | The tree manipulated when checking if each location is bound to something
+-- legit
+type PhysicalTree m = LocationTree (PhysicalFileNode m)
+
+-- | The tree manipulated by tasks when they actually run
+type DataAccessTree m = LocationTree (DataAccessNode m)
+
+instance HasDefaultMappingRule VirtualFileNode where
+ getDefaultLocShortcut VirtualFileNode{..} = getDefaultLocShortcut vfnodeFile
+ getDefaultLocShortcut _ = Nothing
+
+-- | Filters the tree to get only the nodes that don't have data and can be
+-- mapped to external files
+--
+-- TODO: Explain the rules. What does it mean for a node to have data?
+-- Can a node with data be mapped to an external file?
+virtualTreeToMappings
+ :: VirtualTree
+ -> Maybe LocationMappings
+virtualTreeToMappings tree = mappingsFromLocTree <$> over filteredLocsInTree rmOpts tree
+ where
+ rmOpts VirtualFileNode{..}
+ | Just VFForCLIOptions <- intent = Nothing -- Nodes with default data are
+ -- by default not put in the
+ -- mappings
+ where intent = vfileDescIntent $ getVFileDescription vfnodeFile
+ rmOpts n = Just n
+
+-- | Filters the tree to get only the nodes that can be embedded in the config file
+--
+-- TODO: Explain which are the nodes that can be embedded in config files.
+-- Explain how they relate to the nodes obtained in the previous function.
+-- Can a node have data and be embedded in a config file?
+-- Can a node be mapped externally and be embedded in a config file?
+--
+-- TODO: It is going to create some confusion having DataTree's and
+-- DataAccessTree's in the discourse. Could we define better what a
+-- DataTree is and see if there are better names?
+embedDataInVirtualTree
+ :: VirtualTree
+ -> Maybe VirtualTree
+embedDataInVirtualTree = over filteredLocsInTree keepOpts
+ where
+ keepOpts n@VirtualFileNode{..}
+ | Just VFForCLIOptions <- intent = Just n
+ | otherwise = Nothing
+ where intent = vfileDescIntent $ getVFileDescription vfnodeFile
+ keepOpts n = Just n
+
+variablesSection :: T.Text
+variablesSection = "variables"
+
+embeddedDataSection :: T.Text
+embeddedDataSection = "data"
+
+mappingsSection :: T.Text
+mappingsSection = "locations"
+
+-- TODO: This function type is suspicious given that it always
+-- yields a singleton list. May be fixed by changing the type or
+-- explaining what the function does. Recursion would work just
+-- as well if the function returned a pair.
+embeddedDataTreeToJSONFields
+ :: T.Text -> VirtualTree -> [(T.Text, Value)]
+embeddedDataTreeToJSONFields thisPath (LocationTree mbOpts sub) =
+ [(thisPath, Object $ opts' <> sub')]
+ where
+ opts' = case mbOpts of
+ VirtualFileNode{..} ->
+ case (vfnodeFile ^? vfileAsBidir) >>= getConvertedEmbeddedValue of
+ Just (Object o) -> o
+ _ -> mempty
+ _ -> mempty
+ sub' = HM.fromList $
+ concatMap (\(k,v) -> embeddedDataTreeToJSONFields (_ltpiName k) v) $ HM.toList sub
+
+
+-- ** VirtualTreeAndMappings: join a virtual tree with the locations it
+-- should be mapped to
+
+-- | A 'VirtualTree' associated with the mapping that should be applied
+-- to it. This is the way to serialize and deserialize a virtual tree
+data VirtualTreeAndMappings = VirtualTreeAndMappings
+ { vtamTree :: VirtualTree
+ , vtamMappings :: Either Loc LocationMappings
+ , vtamVariables :: LocVariableMap }
+
+-- VirtualTreeAndMappings is only 'ToJSON' and not 'FromJSON' because we need
+-- more context to deserialize it. This is done by 'virtualTreeConfigurationReader'
+instance ToJSON VirtualTreeAndMappings where
+ toJSON (VirtualTreeAndMappings tree mappings variables) = Object $
+ (case embedDataInVirtualTree tree of
+ Just t -> HM.fromList $ embeddedDataTreeToJSONFields embeddedDataSection t
+ Nothing -> HM.empty)
+ <>
+ (case virtualTreeToMappings tree of
+ Just m ->
+ HM.singleton mappingsSection $ toJSON $ case mappings of
+ Right m' -> m'
+ Left rootLoc -> mappingRootOnly rootLoc <> m
+ Nothing -> HM.empty)
+ <>
+ (HM.singleton variablesSection $ toJSON variables)
+
+-- ** Mapping virtual trees to input files
+
+data LayerOperator = ReplaceLayers | AddLayer
+ deriving (Eq, Show)
+
+type VirtualTreeAndMappingsOverrides =
+ ( LocVariableMap
+ -- The map of variables and their values read from CLI too
+ , [(LocationTreePath, LayerOperator, SerializableLocShortcut)]
+ -- Locations mapped to new layers
+ , LocationTree (VirtualFileNode, Maybe (RecOfOptions SourcedDocField))
+ -- The tree containing the options parsed by optparse-applicative
+ )
+
+splitVarBinding :: String -> Either String (LocVariable, String)
+splitVarBinding (T.splitOn "=" . T.pack -> [T.unpack -> var, T.unpack -> val]) =
+ Right (LocVariable var,val)
+splitVarBinding _ = Left "Var binding must be of the form \"variable=value\""
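+
+-- For instance: @splitVarBinding "year=2019" == Right (LocVariable "year", "2019")@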
+
+-- | Reads the data from the input config file. Constructs the parser for the
+-- command-line arguments. Combines both results to create the
+-- 'VirtualTree' (and its mappings) the pipeline should run on.
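+--
+-- Mapping bindings on the CLI follow the syntax sketched below (hypothetical
+-- paths; @=@ replaces the layers of a virtual path, @+=@ adds a layer):
+--
+-- > -l /inputs/data=data.csv
+-- > -l /inputs/data+=extra.csv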
+virtualTreeConfigurationReader
+ :: VirtualTreeAndMappings
+ -> ConfigurationReader VirtualTreeAndMappings VirtualTreeAndMappingsOverrides
+virtualTreeConfigurationReader VirtualTreeAndMappings{vtamTree=defTree} =
+ ConfigurationReader overridesParser_ nullOverrides_ overrideCfgFromYamlFile_
+ where
+ overridesParser_ =
+ (,,) <$> variablesParser <*> mappingsParser <*> treeOfOptsParser
+ where
+ treeOfOptsParser = traverseOf (traversed . _2 . _Just) parseOptions $
+ fmap nodeAndRecOfOptions defTree
+ variablesParser = HM.fromList <$>
+ many (option (eitherReader splitVarBinding)
+ (long "var"
+ <> help "Set a variable already present in the config file"))
+
+ mappingsParser =
+ many (option (eitherReader locBinding)
+ (long "loc"
+ <> short 'l'
+ <> help "Map a virtual file path to a physical location"))
+ parseLocBinding vpath locOp loc = do
+ p <- fromTextRepr vpath
+ l <- over _Left displayException $ Y.decodeEither' $ TE.encodeUtf8 loc
+ return (p,locOp,l)
+ locBinding (T.splitOn "+=" . T.pack -> [vpath,loc]) =
+ parseLocBinding vpath AddLayer loc
+ locBinding (T.splitOn "=" . T.pack -> [vpath,loc]) =
+ parseLocBinding vpath ReplaceLayers loc
+ locBinding _ =
+ Left "Location mapping must be of the form \"virtual_path(+)=physical_path\""
+
+ nodeAndRecOfOptions :: VirtualFileNode -> (VirtualFileNode, Maybe DocRecOfOptions)
+ nodeAndRecOfOptions n@VirtualFileNode{..} = (n, (vfnodeFile ^? vfileAsBidir) >>= getConvertedEmbeddedValue)
+ nodeAndRecOfOptions n = (n, Nothing)
+ parseOptions :: RecOfOptions DocField -> Parser (RecOfOptions SourcedDocField)
+ parseOptions (RecOfOptions r) = RecOfOptions <$>
+ parseRecFromCLI (tagWithDefaultSource r)
+
+ nullOverrides_ (_,_,t) = allOf (traversed . _2 . _Just) nullRec t
+ nullRec (RecOfOptions RNil) = True
+ nullRec _ = False
+
+ overrideCfgFromYamlFile_ (Object aesonCfg) (cliVars, cliMappings, embeddedDataTree) = ([], vtam)
+ where
+ dataSectionContent = HM.lookup embeddedDataSection aesonCfg
+ mappingsSectionContent = HM.lookup mappingsSection aesonCfg
+ variablesSectionContent = HM.lookup variablesSection aesonCfg
+ addCLIMappings (LocationMappings_ yamlMappings) =
+ LocationMappings_ $ foldl addOne yamlMappings cliMappings
+ where
+ addOne mappings (path, locOp, loc) = HM.alter (go locOp loc) path mappings
+ go AddLayer loc (Just locs) = Just $ locs ++ [loc]
+ go _ loc _ = Just [loc]
+ vtam = VirtualTreeAndMappings
+ <$> traverseOf traversedTreeWithPath
+ (replaceWithDataFromConfig dataSectionContent) embeddedDataTree
+ <*> (Right . addCLIMappings <$> case mappingsSectionContent of
+ Just m -> parseJSONEither m
+ _ -> pure mempty)
+ <*> ((cliVars <>) <$> case variablesSectionContent of
+ Just m -> parseJSONEither m
+ _ -> pure mempty)
+ overrideCfgFromYamlFile_ _ _ = ([], Left "Configuration file doesn't contain a JSON object")
+
+ replaceWithDataFromConfig
+ :: Maybe Value -- The content of the embedded data section
+ -> (LocationTreePath, (VirtualFileNode, Maybe (RecOfOptions SourcedDocField)))
+ -> Either String VirtualFileNode
+ replaceWithDataFromConfig (Just dataSectionContent)
+ (LTP path, (node@VirtualFileNode{..}, mbRecFromCLI)) =
+ let rebuildNode :: (Typeable a', Typeable b')
+ => VirtualFile a' b' -> VirtualFileNode
+ rebuildNode newF = VirtualFileNode{vfnodeFile = newF, ..}
+ err :: String -> Either String a
+ err s = Left $ "replaceWithDataFromConfig: " ++ showVFileOriginalPath vfnodeFile ++ ": " ++ s
+ in case findInAesonVal path dataSectionContent of
+ Right rawObjFromYaml ->
+ case (mbRecFromCLI, vfnodeFile ^? vfileAsBidir) of
+ (Just _, Nothing) ->
+ err "that virtualfile contains an embedded record of options, yet it isn't bidirectional"
+ (Just (RecOfOptions (recFromCLI :: Rec SourcedDocField rs)), Just bidirVFile) -> do
+ -- YAML: yes, CLI: yes
+ -- First we merge all the data present in the yaml:
+ mergedDataFromYaml <- getMergedLayersFromAesonValue bidirVFile rawObjFromYaml
+ (RecOfOptions::DocRec rs -> DocRecOfOptions)
+ -- then we convert it back to a DocRec, as we need to override
+ -- it from the CLI:
+ RecOfOptions (recFromYaml::DocRec rs') <- case getToAtomicFn (bidirVFile ^. vfileSerials) of
+ Just toDocRec -> return (toDocRec mergedDataFromYaml :: DocRecOfOptions)
+ Nothing -> err "that virtualfile contained an embedded record of options, yet it contained \
+ \no function to convert back to DocRecOfOptions"
+ case eqT :: Maybe (rs' :~: rs) of
+ Nothing -> err "not the same record type has been returned \
+ \by getMergedLayersFromAesonValue"
+ Just Refl -> do
+ -- finally, we merge CLI and YAML config and modify the
+ -- node's embedded data:
+ rebuildNode <$> setConvertedEmbeddedValue bidirVFile
+ (RecOfOptions $ rmTags $ rzipWith chooseHighestPriority (tagWithYamlSource recFromYaml) recFromCLI)
+ (Nothing, _) -> do
+ -- YAML: yes, CLI: no
+ mergedDataFromYaml <- getMergedLayersFromAesonValue vfnodeFile rawObjFromYaml (id::Value -> Value)
+ return $ rebuildNode $ vfnodeFile & vfileEmbeddedValue .~ Just mergedDataFromYaml
+ Left _ -> case mbRecFromCLI of
+ Just (RecOfOptions recFromCLI) ->
+ -- YAML: no, CLI: yes
+ rebuildNode <$> setConvertedEmbeddedValue vfnodeFile (RecOfOptions (rmTags recFromCLI))
+ Nothing ->
+ -- YAML: no, CLI: no
+ return node
+ replaceWithDataFromConfig _ (_, (node, _)) = return node
+
+ findInAesonVal path = go path
+ where
+ go [] v = return v
+ go (p:ps) (Object (HM.lookup (_ltpiName p) -> Just v)) = go ps v
+ go _ _ = Left $ "virtualTreeConfigurationReader: " ++
+ (T.unpack $ toTextRepr $ LTP path) ++ " doesn't match any path in the Yaml config"
+
+ getMergedLayersFromAesonValue vf objFromYaml f = do
+ layersFromYaml <- case objFromYaml of
+ Object m
+ | Just v <- HM.lookup "$layers" m ->
+ case v of
+ Array layers -> mapM parseJSONEither $ foldr (:) [] layers
+ _ -> Left $ "If you specify data with $layers, $layers must contain an array."
+ _ -> (:[]) <$> parseJSONEither objFromYaml
+ tryMergeLayersForVFile vf $ map f layersFromYaml
+
+-- ** Transforming a virtual tree to a physical resource tree (ie. a
+-- tree with physical locations attached)
+
+-- | Transform a virtual file node into a file node with definite physical
+-- locations. Splices into the locations the variables that can be spliced.
+--
+-- See the meaning of the parameters in 'Data.Locations.Mappings.applyMappings'.
+applyOneVFileMapping :: LocVariableMap -> [SomeLocWithVars m] -> VirtualFileNode -> Bool -> PhysicalFileNode m
+applyOneVFileMapping variables configLayers mbVF mappingIsExplicit = buildPhysicalNode mbVF
+ where
+ configLayers' = map (\(SomeGLoc l) -> SomeGLoc $ spliceLocVariables variables l) configLayers
+ buildPhysicalNode VirtualFileNode{..} = PhysicalFileNode layers vfnodeAccesses vfnodeFile
+ where
+ First defExt = vfnodeFile ^. vfileSerials . serialDefaultExt
+ intent = vfileDescIntent $ getVFileDescription vfnodeFile
+ -- TODO: It seems to be the case that there are some constraints to meet
+ -- on a valid physical tree. For instance, that having a VFForCLIOptions
+ -- node with derived layers is invalid?
+ layers | not mappingIsExplicit, Just VFForCLIOptions <- intent = []
+ -- Options usually present in the config file need an _explicit_
+ -- mapping to be present in the config file, if we want them to be
+ -- read from external files instead
+ | otherwise = map resolveExt configLayers'
+ resolveExt (SomeGLoc loc) = SomeGLoc $ setLocTypeIfMissing loc $ T.unpack $ fromMaybe "" defExt
+ buildPhysicalNode _ = MbPhysicalFileNode configLayers' [] Nothing
+
+-- | Binds together a 'VirtualTree' with physical locations and splices
+-- in the variables read from the configuration.
+getPhysicalTreeFromMappings
+ :: (LogThrow m) => VirtualTreeAndMappings -> LocResolutionM m (PhysicalTree m)
+getPhysicalTreeFromMappings (VirtualTreeAndMappings tree mappings variables) =
+ applyMappings (applyOneVFileMapping variables) m' tree
+ where
+ m' = case mappings of
+ Right m -> m
+ Left rootLoc -> mappingRootOnly rootLoc
+
+-- ** Transforming a physical tree to a data access tree (ie. a tree where each
+-- node is just a function that pulls or writes the relevant data)
+
+newtype TaskConstructionError =
+ TaskConstructionError String
+ deriving (Show)
+instance Exception TaskConstructionError
+
+-- TODO: Is this dead-code?
+data DataAccessContext = DAC
+ { locationAccessed :: Value
+ , locationAccessType :: VFNodeAccessType
+ , requiredLocVariables :: [LocVariable]
+ , providedLocVariables :: LocVariableMap
+ , splicedLocation :: Value }
+ deriving (Generic)
+
+instance ToJSON DataAccessContext
+instance ToObject DataAccessContext
+instance LogItem DataAccessContext where
+ payloadKeys v _
+ | v == V3 = AllKeys
+ | v >= V1 = SomeKeys ["locationAccessed", "locationAccessType"]
+ | otherwise = SomeKeys []
+
+makeDataAccessor
+ :: forall m a b a' b'. (LogMask m)
+ => String -- ^ VirtualFile path (for doc)
+ -> VirtualFile a' b' -- ^ The virtual file
+ -> [SomeLocWithVars m] -- ^ Every mapped layer (for doc)
+ -> Maybe b -- ^ Default value (used as base layer)
+ -> LayeredReadScheme b -- ^ How to handle the different layers
+ -> [(ToAtomicFn a, SomeLocWithVars m)] -- ^ Layers to write to
+ -> [(FromStreamFn b, SomeLocWithVars m)] -- ^ Layers to read from
+ -> LocVariableMap -- ^ The map of the values of the repetition indices
+ -> DataAccessor m a b
+makeDataAccessor vpath vf layers mbDefVal readScheme writeLocs readLocs repetKeyMap =
+ DataAccessor{..}
+ where
+ (VFileImportance sevRead sevWrite sevError clockAccess) = vf ^. vfileImportance
+ rkeys = vf ^. vfileSerials.serialRepetitionKeys
+ timeAccess :: String -> Severity -> String -> m t -> m t
+ timeAccess prefix sev loc action
+ | clockAccess = do
+ (r, time) <- clockM action
+ katipAddContext time $
+ logFM sev $ logStr $ prefix ++ " '" ++ loc ++ "' in " ++ showTimeSpec time
+ return r
+ | otherwise = do
+ r <- action
+ logFM sev $ logStr $ prefix ++ " '" ++ loc ++ "'"
+ return r
+ daLocsAccessed = traverse (\(SomeGLoc loc) -> SomeGLoc <$> fillLoc' repetKeyMap loc) layers
+ daPerformWrite input =
+ forM_ writeLocs $ \(ToAtomicFn f, SomeGLoc loc) ->
+ case cast (f input) of
+ Nothing -> error "Some atomic serializer isn't converting to a lazy ByteString"
+ Just bs -> do
+ loc' <- fillLoc repetKeyMap loc
+ katipAddNamespace "dataAccessor" $ katipAddNamespace "writer" $
+ katipAddContext (DAC (toJSON loc) ATWrite rkeys repetKeyMap (toJSON loc')) $ do
+ let runWrite = writeBSS loc' (BSS.fromLazy bs)
+ timeAccess "Wrote" sevWrite (show loc') $
+ withException runWrite $ \ioError ->
+ logFM sevError $ logStr $ displayException (ioError :: IOException)
+ daPerformRead = do
+ dataFromLayers <- forM readLocs (\(FromStreamFn (f :: Stream (Of i) m () -> m b), SomeGLoc loc) ->
+ case eqT :: Maybe (i :~: Strict.ByteString) of
+ Nothing -> error "Some stream reader isn't expecting a stream of strict ByteStrings"
+ Just Refl -> do
+ loc' <- fillLoc repetKeyMap loc
+ katipAddNamespace "dataAccessor" $ katipAddNamespace "reader" $
+ katipAddContext (DAC (toJSON loc) ATRead rkeys repetKeyMap (toJSON loc')) $ do
+ let runRead = readBSS loc' (f . BSS.toChunks)
+ r <- timeAccess "Read" sevRead (show loc') $ withException runRead $ \ioError ->
+ logFM sevError $ logStr $ displayException (ioError :: IOException)
+ return r)
+ let embeddedValAndLayers = maybe id (:) mbDefVal dataFromLayers
+ case (readScheme, embeddedValAndLayers) of
+ (_, [x]) -> return x
+ (LayeredReadWithNull, ls) -> return $ mconcat ls
+ (_, []) -> throwWithPrefix $ vpath ++ " has no layers from which to read"
+ (LayeredRead, l:ls) -> return $ foldl (<>) l ls
+ (SingleLayerRead, ls) -> do
+ logFM WarningS $ logStr $ vpath ++
+ " doesn't support layered mapping. Using only result from last layer '"
+ ++ show (last layers) ++ "'"
+ return $ last ls
+
+ fillLoc' rkMap loc = terminateLocWithVars $ spliceLocVariables rkMap loc
+ fillLoc rkMap loc =
+ case fillLoc' rkMap loc of
+ Left e -> katipAddNamespace "dataAccessor" $ throwWithPrefix e
+ Right r -> return r
+
+-- | Transforms a file node with physical locations into a node with a data access
+-- function to run. Matches the locations (especially the filetype/extension) to
+-- the readers & writers available in the 'VirtualFile'.
+resolveDataAccess
+ :: forall m m'. (LogMask m, MonadThrow m')
+ => PhysicalFileNode m
+ -> m' (DataAccessNode m)
+resolveDataAccess (PhysicalFileNode{pfnodeFile=vf, ..}) = do
+ -- resolveDataAccess performs some checks when we build the pipeline:
+
+ -- First, that we aren't illegally binding to no layers a VirtualFile that
+ -- will be read without having any default value:
+ case (any (==ATRead) pfnodeAccesses, pfnodeLayers) of
+ (True, [])
+ -> case readScheme of
+ LayeredReadWithNull -> return ()
+ _ -> case mbEmbeddedVal of
+ Just _ -> return ()
+ Nothing ->
+ throwM $ TaskConstructionError $
+ vpath ++ " cannot be mapped to null. It will be read from and doesn't contain any default value."
+ _ -> return ()
+ -- Then, that we aren't writing to an unsupported filetype:
+ writeLocs <- findFunctions (typeOf (undefined :: Lazy.ByteString)) writers
+ -- And finally, that we aren't reading from an unsupported filetype:
+ readLocs <- findFunctions (typeOf (undefined :: Strict.ByteString)) readers
+ return $
+ DataAccessNode pfnodeLayers $
+ makeDataAccessor vpath vf pfnodeLayers
+ mbEmbeddedVal readScheme
+ writeLocs readLocs
+ where
+ readScheme = vf ^. vfileLayeredReadScheme
+ mbEmbeddedVal = vf ^. vfileEmbeddedValue
+
+ vpath = T.unpack $ toTextRepr $ LTP $ vf ^. vfileOriginalPath
+
+ readers = vf ^. vfileSerials . serialReaders . serialReadersFromStream
+ writers = vf ^. vfileSerials . serialWriters . serialWritersToAtomic
+
+ findFunctions :: TypeRep -> HM.HashMap (TypeRep,Maybe FileExt) v -> m' [(v, SomeLocWithVars m)]
+ findFunctions typeRep hm | HM.null hm = return []
+ | otherwise = mapM findFunction pfnodeLayers
+ where
+ findFunction (SomeGLoc loc) = case HM.lookup (typeRep, Just $ T.pack $ getLocType loc) hm of
+ Just f -> return (f, SomeGLoc loc)
+ -- TODO: add VirtualFile path to error
+ Nothing -> throwM $ TaskConstructionError $
+ show loc ++ " is mapped to " ++ vpath ++ " which doesn't support filetype '" ++ getLocType loc ++
+ "'. Accepted filetypes here are: " ++
+ mconcat (intersperse "," [T.unpack ext | (_,Just ext) <- HM.keys hm]) ++ "."
+
+resolveDataAccess (MbPhysicalFileNode layers _ _) =
+ return $ MbDataAccessNode layers $ First Nothing
diff --git a/src/System/TaskPipeline/Repetition.hs b/src/System/TaskPipeline/Repetition.hs
new file mode 100644
index 0000000..8dde3e4
--- /dev/null
+++ b/src/System/TaskPipeline/Repetition.hs
@@ -0,0 +1,119 @@
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TupleSections #-}
+
+module System.TaskPipeline.Repetition
+ ( RepInfo(..)
+ , TRIndex(..)
+ , HasTRIndex(..)
+ , OneOrSeveral(..)
+ , parMapTask
+ , parMapTask_
+ , IndexRange(..)
+ , oneIndex
+ , oneRange
+ , enumIndices
+ , enumTRIndices
+ ) where
+
+import Control.Applicative
+import Control.Arrow.Free (mapA)
+import Control.Lens hiding ((.=))
+import Control.Monad
+import Data.Aeson
+import Data.Aeson.Types (Parser)
+import qualified Data.Text as T
+import Prelude hiding ((.))
+import System.TaskPipeline.PTask
+import System.TaskPipeline.Repetition.Internal
+
+
+-- | Makes a 'PTask' repeatable and maps it in parallel over a list.
+parMapTask
+ :: (HasTRIndex a, KatipContext m)
+ => RepInfo
+ -> PTask m a b
+ -> PTask m [a] [b]
+parMapTask ri =
+ over taskRunnablePart mapA . makeTaskRepeatable ri
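+
+-- An illustrative sketch (not part of the API): given a hypothetical
+-- @processOne :: PTask m (Int, Input) Output@, 'parMapTask' splices the
+-- "item" index into every path accessed by @processOne@:
+--
+-- > processAll :: (KatipContext m) => PTask m [(Int, Input)] [Output]
+-- > processAll = parMapTask "item" processOne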
+
+-- | Simply repeats a task which takes no input, once per index in a list, and
+-- ignores the end result. See 'RepInfo' for how these indices are
+-- used. See 'parMapTask' for a more complete version.
+parMapTask_
+ :: (HasTRIndex idx, KatipContext m)
+ => RepInfo
+ -> PTask m () b
+ -> PTask m [idx] ()
+parMapTask_ ri task =
+ arr (map (, ())) >>> parMapTask ri (arr snd >>> task) >>> arr (const ())
+
+
+-- * A simple type to handle index ranges
+
+data OneRange i = OneIndex i | OneRange i i
+
+toJSONStr :: (ToJSON a) => a -> Either Value T.Text
+toJSONStr a = case toJSON a of
+ String s -> Right s
+ Number n -> Right $ T.pack $ show n
+ o -> Left o
+
+parseJSONStr :: (FromJSON a) => T.Text -> Parser a
+parseJSONStr v = tryNumber v <|> parseJSON (String v)
+ where
+ tryNumber n = case reads $ T.unpack n of
+ [(n',_)] -> parseJSON $ Number n'
+ _ -> fail "Not a number"
+
+instance (ToJSON i) => ToJSON (OneRange i) where
+ toJSON (OneIndex i) = toJSON i
+ toJSON (OneRange a b) = case (toJSONStr a, toJSONStr b) of
+ (Right a', Right b') -> String $ a' <> ".." <> b'
+ (a', b') -> object ["lower" .= toJ a', "upper" .= toJ b']
+ where toJ (Left o) = o
+ toJ (Right s) = String s
+
+instance (FromJSON i) => FromJSON (OneRange i) where
+ parseJSON o@(String s) = case T.splitOn ".." s of
+ [a,b] -> (OneRange <$> parseJSONStr a <*> parseJSONStr b)
+ <|> (OneIndex <$> parseJSON o)
+ _ -> OneIndex <$> parseJSON o
+ parseJSON (Object o) = OneRange <$> o .: "lower" <*> o .: "upper"
+ parseJSON o = OneIndex <$> parseJSON o
+
+-- | Allows reading either a single @a@ or an array of @a@ from a JSON file
+newtype OneOrSeveral a = OneOrSeveral {getOneOrSeveral :: [a]}
+
+instance (ToJSON a) => ToJSON (OneOrSeveral a) where
+ toJSON (OneOrSeveral [r]) = toJSON r
+ toJSON (OneOrSeveral rs) = toJSON rs
+
+instance (FromJSON a) => FromJSON (OneOrSeveral a) where
+ parseJSON o@(Array _) = OneOrSeveral <$> parseJSON o
+ parseJSON o = OneOrSeveral . (:[]) <$> parseJSON o
+
+-- | A simple index list that can be used in configuration, and from which a
+-- list of indices can be extracted. Its JSON representation is more compact
+-- than that of @[(i,i)]@, as ranges are represented by @"a..b"@ strings
+newtype IndexRange i = IndexRange (OneOrSeveral (OneRange i))
+ deriving (FromJSON, ToJSON)
+
+-- | A range of just one index
+oneIndex :: i -> IndexRange i
+oneIndex i = IndexRange $ OneOrSeveral [OneIndex i]
+
+-- | A range of consecutive values
+oneRange :: i -> i -> IndexRange i
+oneRange a b = IndexRange $ OneOrSeveral [OneRange a b]
+
+-- | Gives a list of indices from an index range
+enumIndices :: (Enum i) => IndexRange i -> [i]
+enumIndices (IndexRange (OneOrSeveral rs)) = concatMap toL rs
+ where
+ toL (OneIndex i) = [i]
+ toL (OneRange a b) = [a..b]
+
+-- | Gives a list of TaskRepetitionIndex
+enumTRIndices :: (Enum i, Show i) => IndexRange i -> [TRIndex]
+enumTRIndices = map (TRIndex . show) . enumIndices
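+
+-- A quick sketch of how an 'IndexRange' behaves (values illustrative): in a
+-- config file it can be written as @1@, @"1..4"@ or @[1, "3..5"]@, and once
+-- decoded:
+--
+-- > enumIndices (oneRange 1 4 :: IndexRange Int) == [1,2,3,4]
+-- > enumIndices (oneIndex 9 :: IndexRange Int) == [9]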
diff --git a/src/System/TaskPipeline/Repetition/Foldl.hs b/src/System/TaskPipeline/Repetition/Foldl.hs
new file mode 100644
index 0000000..91cd2c8
--- /dev/null
+++ b/src/System/TaskPipeline/Repetition/Foldl.hs
@@ -0,0 +1,145 @@
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE BangPatterns #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+
+-- | This module implements a Foldl-based interface for arrow computations
+-- compatible with the <https://hackage.haskell.org/package/foldl foldl
+-- library>. Use 'generalizeA' and 'generalizeM' to convert folds to
+-- 'FoldA'. This is the most general way to repeat a 'PTask' over some input
+-- (list, array, stream, etc.).
+--
+-- This API is still experimental and might be subject to changes in the future
+
+module System.TaskPipeline.Repetition.Foldl
+ ( module Control.Arrow.FoldA
+ , RepInfo(..)
+ , TRIndex(..)
+ , HasTRIndex(..)
+ , generalizeM
+ , generalizeM_
+ , foldlTask
+ , foldStreamTask
+ , runFoldAOverPTask
+ , premapMaybe
+ ) where
+
+import Control.Arrow.FoldA
+import Control.Lens hiding (Fold)
+import Data.Locations
+import Prelude hiding ((.), id)
+import Streaming (Of (..), Stream)
+import qualified Streaming.Prelude as S
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+import System.TaskPipeline.Repetition.Internal
+
+
+-- * Folding data with a PTask
+
+data RunningFoldM m a b =
+ forall x. RFM (x -> a -> m x) !x (x -> m b)
+
+-- | Turns a function creating a 'FoldM' into a 'FoldA' over 'PTasks'
+generalizeM :: (KatipContext m)
+ => (i -> FoldM m a b)
+ -> FoldA (PTask m) i a b
+generalizeM f =
+ FoldA (toTask $ \(Pair (RFM step acc done) x) -> do
+ acc' <- step acc x
+ return $ RFM step acc' done)
+ (toTask $ \i ->
+ case f i of
+ FoldM step start done -> do
+ initAcc <- start
+ return $ RFM step initAcc done)
+ (toTask $ \(RFM _ acc done) -> done acc)
+
+-- | Turns a 'FoldM' in some monad into a 'FoldA' compatible with 'foldlTask'
+--
+-- This is a version of 'generalizeM' for when your initial accumulator doesn't
+-- need to be computed by a PTask
+generalizeM_ :: (KatipContext m)
+ => FoldM m a b -> FoldA (PTask m) i a b
+generalizeM_ (FoldM step start done) =
+ FoldA (toTask $ \(Pair a x) -> step a x)
+ (toTask $ const start)
+ (toTask done)
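+
+-- For instance (a sketch, assuming the foldl package is imported qualified):
+--
+-- > import qualified Control.Foldl as L
+-- >
+-- > sumFold :: (KatipContext m) => FoldA (PTask m) i Double Double
+-- > sumFold = generalizeM_ (L.generalize L.sum)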
+
+instance (HasTRIndex a)
+ => HasTRIndex (Pair x a) where
+ getTRIndex (Pair _ a) = getTRIndex a
+
+-- | Runs a 'FoldA' created with 'arrowFold', 'generalizeA', 'unsafeGeneralizeM',
+-- or a composition of such folds.
+--
+-- You shouldn't use 'runFoldAOverPTask' directly in client code; rather, you
+-- should specialize it to some collection. See e.g. 'foldStreamTask' or 'foldlTask'.
+runFoldAOverPTask
+ :: (HasTRIndex a, KatipContext m)
+ => (forall ar x. (ArrowChoice ar)
+ => (forall inp out. (inp -> m out) -> ar inp out)
+ -> ar (Pair x a) x
+ -> ar (x, col) (x, r))
+ -- ^ This function receives a function to wrap an action in the @m@ monad
+ -- and the step to repeat. It should consume the collection
+ -> RepInfo -- ^ How to log the repeated task
+ -> FoldA (PTask m) i a b -- ^ The 'FoldA' to run
+ -> PTask m (i, col) (b, r)
+runFoldAOverPTask loopStep ri (FoldA step_ start done) =
+ (reqs, runnable) ^. from splitTask
+ where
+ (reqsStep, runnableStep) = makeTaskRepeatable ri step_ ^. splitTask
+ (reqsStart, runnableStart) = start ^. splitTask
+ (reqsDone, runnableDone) = done ^. splitTask
+ reqs = reqsStart <> reqsStep <> reqsDone
+ runnable =
+ first runnableStart >>> loopStep (withRunnableState . const) runnableStep
+ >>> first runnableDone
+
+-- | Consumes a Stream with a 'FoldA' created with 'arrowFold', 'generalizeA',
+-- 'unsafeGeneralizeM', or a composition of such folds.
+foldStreamTask
+ :: (HasTRIndex a, KatipContext m)
+ => RepInfo -- ^ How to log the repeated task
+ -> FoldA (PTask m) i a b -- ^ The FoldA to run
+ -> PTask m (i, Stream (Of a) m r) (b, r)
+foldStreamTask = runFoldAOverPTask $ \wrap step ->
+ let
+ consumeStream = proc (acc, stream) -> do
+ firstElem <- wrap S.next -< stream
+ case firstElem of
+ Left r -> returnA -< (acc, r)
+ Right (a, stream') -> do
+ !acc' <- step -< Pair acc a
+ consumeStream -< (acc', stream')
+ in consumeStream
+
+-- | Consumes a Foldable with a 'FoldA' over 'PTask'.
+--
+-- See 'arrowFold' to create such a 'FoldA'
+foldlTask
+ :: (Foldable f, HasTRIndex a, KatipContext m)
+ => RepInfo -- ^ How to log the repeated task
+ -> FoldA (PTask m) i a b
+ -> PTask m (i, f a) b
+foldlTask ri fld =
+ arr (second S.each) >>> foldStreamTask ri fld >>> arr fst
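+
+-- A usage sketch building on the @sumFold@ sketch above (and assuming
+-- OverloadedStrings for the 'RepInfo' literal): each element must carry a
+-- repetition index, here via the @(Int, Double)@ instance of 'HasTRIndex':
+--
+-- > sumSnd :: (KatipContext m) => FoldA (PTask m) i (Int, Double) Double
+-- > sumSnd = generalizeM_ (L.generalize (L.premap snd L.sum))
+-- >
+-- > total :: (KatipContext m) => PTask m ((), [(Int, Double)]) Double
+-- > total = foldlTask "sample" sumSnd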
+
+-- | Allows filtering out some data before it is taken into account by the
+-- 'FoldA' of 'PTask'
+--
+-- We provide an implementation specific to 'PTask' because a general
+-- implementation requires ArrowChoice
+premapMaybe :: (a -> Maybe a')
+ -> FoldA (PTask m) i a' b
+ -> FoldA (PTask m) i a b
+premapMaybe f (FoldA step start done) = FoldA step' start done
+ where
+ step' = step & over taskRunnablePart
+ (\run -> proc (Pair acc input) ->
+ case f input of
+ Nothing -> returnA -< acc
+ Just input' -> run -< Pair acc input')
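+
+-- For example (a sketch), to drop NaN samples before they reach a fold:
+--
+-- > finiteOnly :: FoldA (PTask m) i Double b -> FoldA (PTask m) i Double b
+-- > finiteOnly = premapMaybe (\x -> if isNaN x then Nothing else Just x)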
diff --git a/src/System/TaskPipeline/Repetition/Internal.hs b/src/System/TaskPipeline/Repetition/Internal.hs
new file mode 100644
index 0000000..0dab24a
--- /dev/null
+++ b/src/System/TaskPipeline/Repetition/Internal.hs
@@ -0,0 +1,119 @@
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
+
+module System.TaskPipeline.Repetition.Internal
+ ( RepInfo(..)
+ , TRIndex(..)
+ , HasTRIndex(..)
+ , makeTaskRepeatable
+ ) where
+
+import Control.Category
+import Control.Lens hiding ((:>))
+import Control.Monad
+import Data.Aeson
+import qualified Data.HashMap.Strict as HM
+import Data.Locations
+import Data.String (IsString(..))
+import Katip
+import Prelude hiding (id, (.))
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+
+
+-- | Gives information about how a task will be repeated. The repInfoIndex will
+-- be used as a suffix in the default bindings to locations accessed by this
+-- task. If repInfoLogging is not Nothing, repInfoIndex will also be mentioned
+-- in the context of each line logged by the task to identify which repetition
+-- of the task is generating this log line. RepInfo is an instance of IsString
+-- so you can use it with OverloadedStrings (in which case repInfoIndex will be
+-- added to the context when verbosity level is at least 1).
+data RepInfo = RepInfo
+ { repInfoIndex :: LocVariable
+ -- ^ A name that will be used as a metavariable in the config file. It may
+ -- also be used by the logger as a context key, to indicate which repetition
+ -- is currently running.
+ , repInfoLogging :: Maybe Verbosity
+ -- ^ The minimal verbosity level at which to display the value associated with
+ -- the repetition index in the logger context. Nothing if we don't want to add
+ -- context.
+ } deriving (Eq, Show)
+
+instance IsString RepInfo where
+ fromString s = RepInfo (fromString s) (Just V1)
+
+-- | Logging context for repeated tasks
+data TaskRepetitionContext = TRC
+ { _repetitionKey :: LocVariable
+ , _repetitionKeyVal :: String
+ , _repetitionKeyVerb :: Verbosity }
+
+instance ToJSON TaskRepetitionContext where
+ toJSON (TRC k v _) = toJSON $ HM.singleton k v
+instance ToObject TaskRepetitionContext
+instance LogItem TaskRepetitionContext where
+ payloadKeys v (TRC _ _ v') | v >= v' = AllKeys
+ | otherwise = SomeKeys []
+
+-- | Task Repetition Index. It is given to functions that repeat tasks, once
+-- per iteration.
+newtype TRIndex = TRIndex { unTRIndex :: String }
+ deriving (FromJSON, ToJSON)
+
+instance IsString TRIndex where
+ fromString = TRIndex
+
+-- | The class of every type of data over which a task can be repeated
+class HasTRIndex a where
+ getTRIndex :: a -> TRIndex
+
+instance HasTRIndex TRIndex where
+ getTRIndex = id
+
+instance HasTRIndex Int where
+ getTRIndex = TRIndex . show
+
+instance HasTRIndex Integer where
+ getTRIndex = TRIndex . show
+
+instance (HasTRIndex i) => HasTRIndex (i,a) where
+ getTRIndex (i,_) = getTRIndex i
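+
+-- Thanks to the instances above, repeating over indexed pairs works out of
+-- the box; for instance (a sketch):
+--
+-- > unTRIndex (getTRIndex (3 :: Int, "payload")) == "3"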
+
+-- | Turns a task into one that can be called several times, each time with a
+-- different index value @i@. This index will be used to alter every path
+-- accessed by the task. The first argument gives a name to that index, which
+-- will appear in the configuration file in the default bindings for the
+-- VirtualFiles accessed by this task. The second one controls whether we want
+-- to add to the logging context which repetition is currently running.
+makeTaskRepeatable
+ :: (HasTRIndex a, KatipContext m)
+ => RepInfo
+ -> PTask m a b
+ -> PTask m a b
+makeTaskRepeatable (RepInfo repetitionKey mbVerb) =
+ over splitTask
+ (\(reqTree, runnable) ->
+ ( fmap addKeyToVirtualFile reqTree
+ , modifyingRuntimeState alterState id runnable ))
+ where
+ addKeyToVirtualFile VirtualFileNode{..} =
+ VirtualFileNode
+ {vfnodeFile = vfnodeFile &
+ over (vfileSerials.serialRepetitionKeys) (repetitionKey:)
+ ,..}
+ addKeyToVirtualFile emptyNode = emptyNode
+
+ alterState input =
+ over ptrsKatipContext alterContext
+ . over (ptrsDataAccessTree.traversed) addKeyValToDataAccess
+ where
+ idxStr = unTRIndex $ getTRIndex input
+ newCtxItem = TRC repetitionKey idxStr <$> mbVerb
+ alterContext ctx = case newCtxItem of
+ Nothing -> ctx
+ Just item -> ctx <> liftPayload item
+ addKeyValToDataAccess (DataAccessNode l fn) =
+ DataAccessNode l $ fn . HM.insert repetitionKey idxStr
+ addKeyValToDataAccess emptyNode = emptyNode
diff --git a/src/System/TaskPipeline/Repetition/Streaming.hs b/src/System/TaskPipeline/Repetition/Streaming.hs
new file mode 100644
index 0000000..3edfe32
--- /dev/null
+++ b/src/System/TaskPipeline/Repetition/Streaming.hs
@@ -0,0 +1,127 @@
+{-# LANGUAGE FlexibleContexts #-}
+
+-- | This module contains functions that map a task over a stream. Delaying
+-- effects of tasks like this plays badly with caching. This interface will be
+-- removed in a future version of porcupine.
+
+module System.TaskPipeline.Repetition.Streaming
+ ( STask, ISTask, OSTask
+ , mappingOverStream
+ , listToStreamTask, runStreamTask, streamToListTask
+ , Typeable
+ ) where
+
+import Control.Arrow
+import Control.Category
+import Control.Lens hiding ((:>))
+import Control.Monad
+import Data.Locations
+import Katip
+import Prelude hiding ((.))
+import Streaming (Of (..), Stream)
+import qualified Streaming.Prelude as S
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+import System.TaskPipeline.Repetition.Internal
+
+
+-- * Type aliases for tasks over streams
+
+-- | A 'PTask' mapping an action over a Stream, transforming @a@s into
+-- @b@s. Each element in the stream should be associated with an identifier.
+type STask m a b r =
+ PTask m
+ (Stream (Of a) m r)
+ (Stream (Of b) m r)
+
+-- | A 'PTask' that consumes an input Stream and just returns its result.
+type ISTask m a r =
+ PTask m
+ (Stream (Of a) m r)
+ r
+
+-- | A 'PTask' that emits an output Stream.
+type OSTask m a b =
+ PTask m
+ a
+ (Stream (Of b) m ())
+
+-- * Running tasks over streams
+
+-- | Turns a task into one that will be repeated once per item in its input.
+-- This is done by transforming every VirtualFile accessed by the task to add a
+-- 'RepetitionKey' to it, indicating that its final file name should be
+-- modified by adding an identifier to it just before reading or writing it.
+-- So each iteration actually accesses different locations in the end.
+--
+-- Calls to 'mappingOverStream' can be nested; this way the underlying VirtualFiles
+-- will have one 'RepetitionKey' per loop (from outermost loop to innermost).
+mappingOverStream
+ :: (HasTRIndex a, CanRunPTask m)
+ => LocVariable -- ^ A variable name, used as a key to indicate which
+ -- repetition we're at. Used in the logger context and
+ -- exposed in the yaml file for each VirtualFile that
+ -- will be repeated by this task
+ -> Maybe Verbosity -- ^ The minimal verbosity level at which to display the
+ -- logger context. (Nothing if we don't want to add
+ -- context)
+ -> PTask m a b -- ^ The base task X to repeat
+ -> STask m a b r -- ^ A task that will repeat X for each input. Each
+ -- input is associated with an identifier that will be
+ -- appended to every Loc mapped to every leaf in the
+ -- LocationTree given to X.
+mappingOverStream repetitionKey mbVerb =
+ over taskRunnablePart mappingRunnableOverStream
+ . makeTaskRepeatable (RepInfo repetitionKey mbVerb)
+
+{-# DEPRECATED mappingOverStream "Prefer the FoldA API to repeat tasks and consume streams" #-}
+
+-- | IMPORTANT: This requires the RunnableTask to be repeatable. See
+-- 'makeTaskRepeatable'.
+mappingRunnableOverStream
+ :: (CanRunPTask m)
+ => RunnableTask m a b
+ -> RunnableTask m
+ (Stream (Of a) m r)
+ (Stream (Of b) m r)
+mappingRunnableOverStream runnable =
+ withRunnableState $ \state inputStream -> do
+ firstElem <- S.next inputStream
+ case firstElem of
+ Left r -> return (return r) -- Empty input stream
+ Right (firstInput, inputStream') -> do
+ firstResult <- go state firstInput
+ return $
+ firstResult `S.cons` S.mapM (go state) inputStream'
+ where
+ go = execRunnableTask runnable
+ -- NOTE: We "cheat" here: we run the funflow layer of the inner
+ -- task. We should find a way not to have to do that, but when using
+ -- Streaming (which delays effects in a monad) it's really problematic.
+
+-- * Helper functions to create and run streams
+
+-- | Runs the input stream, forgets all its elements and just returns its result
+runStreamTask :: (KatipContext m)
+ => PTask m
+ (Stream (Of t) m r)
+ r
+runStreamTask = toTask S.effects
+
+-- | A 'PTask' converting a list to a stream
+listToStreamTask :: (Monad m)
+ => PTask m
+ [t]
+ (Stream (Of t) m ())
+listToStreamTask = arr S.each
+
+-- | A 'PTask' converting an input stream to a list. WARNING: It can cause
+-- space leaks if the list is too big, as the output list will be eagerly
+-- evaluated. This function is provided only for compatibility with existing
+-- tasks expecting lists. Please consider switching to processing streams
+-- directly. See 'S.toList' for more details.
+streamToListTask :: (KatipContext m)
+ => PTask m
+ (Stream (Of t) m r)
+ [t]
+streamToListTask = toTask (S.toList_ . void)
diff --git a/src/System/TaskPipeline/Run.hs b/src/System/TaskPipeline/Run.hs
new file mode 100644
index 0000000..e81f0ce
--- /dev/null
+++ b/src/System/TaskPipeline/Run.hs
@@ -0,0 +1,269 @@
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE OverloadedLabels #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# OPTIONS_GHC "-fno-warn-missing-signatures" #-}
+
+module System.TaskPipeline.Run
+ ( PipelineConfigMethod(..)
+ , PipelineCommand(..)
+ , CanRunPTask
+ , Rec(..)
+ , ContextRunner(..)
+ , SimplePTaskM, BasePorcupineContexts
+ , ReaderSoup
+ , FieldWithAccessors
+ , AcceptableArgsAndContexts
+ , (<--)
+ , (:::)
+ , baseContexts, baseContextsWithScribeParams
+ , maxVerbosityLoggerScribeParams
+ , warningsAndErrorsLoggerScribeParams
+ , runPipelineTask
+ , runLocalPipelineTask
+ , simpleRunPTask
+ , runPipelineTaskWithExceptionHandlers
+ , runPipelineCommandOnPTask
+ ) where
+
+import Control.Lens
+import Control.Monad.IO.Class
+import Control.Monad.ReaderSoup
+import Control.Monad.ReaderSoup.Katip ()
+import Data.Locations hiding ((</>))
+import Data.Locations.Accessors
+import Data.Maybe
+import Data.Vinyl.Derived (HasField, rlensf)
+import Katip
+import Prelude hiding ((.))
+import System.Environment (lookupEnv, withArgs)
+import System.FilePath ((</>))
+import System.Posix.Directory (getWorkingDirectory)
+import System.TaskPipeline.CLI
+import System.TaskPipeline.Logger
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+
+
+-- | Tells whether a record of args can be used to run a PTask
+type AcceptableArgsAndContexts args ctxs m =
+ (ArgsForSoupConsumption args, ctxs ~ ContextsFromArgs args
+ ,IsInSoup ctxs "katip", IsInSoup ctxs "resource"
+ ,RunsKatipOver args m)
+
+-- | We need to have some information about how katip will be run, because we
+-- will want to override that from the command-line
+type RunsKatipOver args m =
+ (HasField Rec "katip" args args (ContextRunner KatipContextT m) (ContextRunner KatipContextT m)
+ ,MonadMask m, MonadIO m)
+
+-- | Runs a 'PTask' according to some 'PipelineConfigMethod' and with an input
+-- @i@. In principle, it should be directly called by your @main@ function. It
+-- exits with @ExitFailure 1@ when the 'PTask' raises an uncaught
+-- exception.
+runPipelineTask
+ :: (AcceptableArgsAndContexts args ctxs m)
+ => PipelineConfigMethod o -- ^ Whether to use the CLI and load the yaml
+ -- config or not
+ -> Rec (FieldWithAccessors (ReaderSoup ctxs)) args -- ^ The location
+ -- accessors to use
+ -> PTask (ReaderSoup ctxs) i o -- ^ The whole pipeline task to run
+ -> i -- ^ The pipeline task input
+ -> IO o -- ^ The pipeline task output
+runPipelineTask = runPipelineTaskWithExceptionHandlers []
+
+runPipelineTaskWithExceptionHandlers
+ :: (AcceptableArgsAndContexts args ctxs m)
+ => [Handler IO o] -- ^ Exception handlers in case the pipeline raises
+ -- an exception.
+ -> PipelineConfigMethod o -- ^ Whether to use the CLI and load the yaml
+ -- config or not
+ -> Rec (FieldWithAccessors (ReaderSoup ctxs)) args -- ^ The location
+ -- accessors to use
+ -> PTask (ReaderSoup ctxs) i o -- ^ The whole pipeline task to run
+ -> i -- ^ The pipeline task input
+ -> IO o -- ^ The pipeline task output
+runPipelineTaskWithExceptionHandlers exceptionHandlers configMethod accessors ptask input = do
+ let tree = ptask ^. taskRequirements
+ catches
+ (bindVirtualTreeAndRun configMethod accessors tree $
+ runPipelineCommandOnPTask ptask input)
+ exceptionHandlers
+
+-- | A monad that implements MonadIO, MonadUnliftIO, KatipContext and
+-- MonadResource. For the simplest uses of porcupine.
+type SimplePTaskM = ReaderSoup BasePorcupineContexts
+
+-- | Like 'runPipelineTask', for when you don't need to access any resources
+-- other than local files. Uses the 'maxVerbosityLoggerScribeParams' by default.
+runLocalPipelineTask
+ :: PipelineConfigMethod o
+ -> PTask SimplePTaskM i o
+ -> i
+ -> IO o
+runLocalPipelineTask configMethod =
+ runPipelineTask configMethod (baseContexts $ configMethod ^. pipelineConfigMethodProgName)
+
+-- | Runs a PTask without reading any configuration or parsing the CLI, with
+-- only local files being accessible, and using PWD as the root location for all
+-- files read and written.
+simpleRunPTask
+ :: PTask SimplePTaskM i o
+ -> i
+ -> IO o
+simpleRunPTask = runLocalPipelineTask (NoConfig "simpleRunPTask" ".")
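+
+-- A typical @main@ is then just (a sketch; @myTask@ is a hypothetical
+-- @PTask SimplePTaskM () ()@):
+--
+-- > main :: IO ()
+-- > main = simpleRunPTask myTask ()
+--
+-- or, to expose a YAML config and the CLI (file name illustrative, and
+-- assuming OverloadedStrings for the root location literal):
+--
+-- > main = runLocalPipelineTask (FullConfig "my-app" "config.yaml" "." ()) myTask ()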
+
+-- | Runs the required 'PipelineCommand' on a 'PTask'
+runPipelineCommandOnPTask
+ :: (CanRunPTask m)
+ => PTask m i o
+ -> i
+ -> PipelineCommand
+ -> Maybe o
+ -> PhysicalTree m
+ -> FunflowOpts m
+ -> m o
+runPipelineCommandOnPTask ptask input cmd defRetVal physTree ffopts = do
+ case cmd of
+ RunPipeline -> do
+ dataTree <- traverse resolveDataAccess physTree
+ withTaskState ffopts dataTree $ \initState -> do
+ $(logTM) NoticeS $ logStr $ case flowIdentity ffopts of
+ Just i -> "Using funflow store at '" ++ storePath ffopts ++ "' with identity "
+ ++ show i ++ "." ++
+ (case remoteCacheLoc ffopts of
+ Just l -> "Using remote cache at " ++ show l
+ _ -> "")
+ Nothing -> identityVar ++ " not specified. The cache will not be used."
+ execRunnableTask (ptask ^. taskRunnablePart) initState input
+ ShowTree root showOpts -> do
+ liftIO $ putStrLn $ prettyLocTree root $
+ case physTree ^. inLocTree root of
+ Just t -> fmap (PhysicalFileNodeWithShowOpts showOpts) t
+ _ -> error $ "Path `" ++ showLTP root ++ "' doesn't exist in the porcupine tree"
+ case defRetVal of
+ Just r -> return r
+ Nothing -> error "NOT EXPECTED: runPipelineCommandOnPTask(ShowTree) was not given a default\
+ \value to return. Please submit this as a bug."
+
+storeVar,remoteCacheVar,identityVar,coordVar :: String
+storeVar = "FUNFLOW_STORE"
+remoteCacheVar = "FUNFLOW_REMOTE_CACHE"
+coordVar = "FUNFLOW_COORDINATOR"
+identityVar = "FUNFLOW_IDENTITY"
+
+-- | Reads the relevant environment variables to construct the set of parameters
+-- necessary to initialize funflow
+getFunflowOpts :: (MonadIO m, LogThrow m) => LocResolutionM m (FunflowOpts m)
+getFunflowOpts = do
+ pwd <- liftIO getWorkingDirectory
+ givenStore <- lookupEnv' storeVar
+ opts <- FunflowOpts
+ (fromMaybe (pwd </> "_funflow/store") givenStore)
+ <$> (fromMaybe (pwd </> "_funflow/coordinator.db") <$> lookupEnv' coordVar)
+ <*> (lookupEnv' identityVar >>= parseIdentity)
+ <*> (lookupEnv' remoteCacheVar >>= traverse resolveYamlDocToSomeLoc)
+ case (flowIdentity opts, givenStore, remoteCacheLoc opts) of
+ (Nothing, Just _, _) -> warnAboutIdentity storeVar
+ (Nothing, _, Just _) -> warnAboutIdentity remoteCacheVar
+ _ -> return ()
+ return opts
+ where
+ lookupEnv' = liftIO . lookupEnv
+ parseIdentity Nothing = return Nothing
+ parseIdentity (Just "") = return Nothing
+ parseIdentity (Just s) = case reads s of
+ [(i,_)] -> return $ Just i
+ _ -> fail $ identityVar ++ " isn't a valid integer"
+ warnAboutIdentity var = $(logTM) WarningS $ logStr $
+ var ++ " has been given but no " ++ identityVar ++
+ " has been provided. Caching will NOT be performed."
+
+-- | Resolves all the JSON values in the mappings, and the paths from the
+-- environment (for funflow), to locations tied to their respective LocationAccessors
+getPhysTreeAndFFOpts
+ :: (MonadIO m, LogThrow m)
+ => VirtualTreeAndMappings
+ -> AvailableAccessors m
+ -> m (PhysicalTree m, FunflowOpts m)
+getPhysTreeAndFFOpts vtam accessors =
+ flip runReaderT accessors $
+ (,) <$> getPhysicalTreeFromMappings vtam
+ <*> getFunflowOpts
+
+-- | Runs the CLI if using FullConfig, binds every location in the virtual tree
+-- to its final value/path, and passes the physical tree to the continuation.
+bindVirtualTreeAndRun
+ :: (AcceptableArgsAndContexts args ctxs m)
+ => PipelineConfigMethod r -- ^ How to read the configuration
+ -> Rec (FieldWithAccessors (ReaderSoup ctxs)) args
+ -> VirtualTree -- ^ The tree to look for DocRecOfOptions in
+ -> (PipelineCommand
+ -> Maybe r
+ -> PhysicalTree (ReaderSoup ctxs)
+ -> FunflowOpts (ReaderSoup ctxs)
+ -> ReaderSoup ctxs r)
+ -- ^ What to do with the tree
+ -> IO r
+bindVirtualTreeAndRun (NoConfig _ root) accessorsRec tree f =
+ consumeSoup argsRec $ do
+ (physTree, ffPaths) <- getPhysTreeAndFFOpts defaultConfig accessors
+ f RunPipeline Nothing physTree ffPaths
+ where
+ defaultConfig = VirtualTreeAndMappings tree (Left root) mempty
+ (accessors, argsRec) = splitAccessorsFromArgRec accessorsRec
+bindVirtualTreeAndRun (ConfigFileOnly progName configFileURL defRoot) accessorsRec tree f = do
+ -- We deactivate every argument that might have been passed, so the only
+ -- choice is to run the pipeline. Since the parsing of the config file and of
+ -- the command line are quite entangled, the CLI parsing cannot simply be
+ -- removed until that part of the code is refactored to better separate CLI
+ -- parsing from the deserialization of the VirtualTreeAndMappings read from
+ -- the config file
+ res <- withArgs ["-qq", "--context-verb", "2", "--log-format", "compact"] $
+ -- No CLI arg can be passed, so until we improve the CLI parsing as
+ -- stated just above, we limit ourselves to warnings and errors
+ bindVirtualTreeAndRun (FullConfig progName configFileURL defRoot Nothing) accessorsRec tree $
+ \_ _ t o -> Just <$> f RunPipeline Nothing t o
+ case res of
+ Just r -> return r
+ Nothing -> error "NOT EXPECTED: bindVirtualTreeAndRun(ConfigFileOnly) didn't receive a result\
+ \from the pipeline. Please submit this as a bug."
+bindVirtualTreeAndRun (FullConfig progName defConfigFileURL defRoot defRetVal) accessorsRec tree f =
+ withConfigFileSourceFromCLI $ \mbConfigFileSource -> do
+ let configFileSource = fromMaybe (ConfigFileURL (LocalFile defConfigFileURL)) mbConfigFileSource
+ mbConfigFromFile <-
+ tryReadConfigFileSource configFileSource $ \remoteURL ->
+ consumeSoup argsRec $ do
+ -- If config file is remote, we use the accessors and run
+ -- the readerSoup with the default katip params
+ SomeGLoc loc <- flip runReaderT accessors $
+ resolvePathToSomeLoc $ show remoteURL
+ -- TODO: Implement locExists in each accessor and use it
+ -- here. For now we fail if given a remote config that
+ -- doesn't exist.
+ readBSS loc decodeYAMLStream
+ parser <- pipelineCliParser virtualTreeConfigurationReader progName $
+ BaseInputConfig (case configFileSource of
+ ConfigFileURL (LocalFile filep) -> Just filep
+ _ -> Nothing)
+ mbConfigFromFile
+ defaultConfig
+ withCliParser progName "Run a task pipeline" parser defRetVal run
+ where
+ defaultConfig = VirtualTreeAndMappings tree (Left defRoot) mempty
+ (accessors, argsRec) = splitAccessorsFromArgRec accessorsRec
+ run finalConfig cmd lsp performConfigWrites =
+ let -- We change the katip runner, from the options we got from CLI:
+ argsRec' = argsRec & set (rlensf #katip)
+ (ContextRunner (runLogger progName lsp))
+ in
+ consumeSoup argsRec' $ do
+ unPreRun performConfigWrites
+ (physTree, ffPaths) <- getPhysTreeAndFFOpts finalConfig accessors
+ f cmd (Just defRetVal) physTree ffPaths
diff --git a/src/System/TaskPipeline/VirtualFileAccess.hs b/src/System/TaskPipeline/VirtualFileAccess.hs
new file mode 100644
index 0000000..c6c3722
--- /dev/null
+++ b/src/System/TaskPipeline/VirtualFileAccess.hs
@@ -0,0 +1,405 @@
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE PartialTypeSignatures #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
+{-# OPTIONS_GHC -Wall #-}
+
+-- | This module provides some utilities for when the pipeline needs to access
+-- several files organized in layers for each location in the 'LocationTree'
+module System.TaskPipeline.VirtualFileAccess
+ ( -- * Reexports
+ module Data.Locations.LogAndErrors
+
+ -- * High-level API
+ , loadData
+ , loadDataStream
+ , loadDataList
+ , tryLoadDataStream
+ , writeData
+ , writeDataStream
+ , writeDataList
+ , writeEffData
+ , writeDataFold
+ , DataWriter(..), DataReader(..)
+ , getDataWriter, getDataReader
+
+ -- * Lower-level API
+ , EffectSeq(..), EffectSeqFromList(..)
+ , SingletonES(..), ListES(..), StreamES(..)
+ , AccessToPerform(..)
+ , DataAccessor(..)
+ , VFNodeAccessType(..)
+ , SomeGLoc(..), SomeLoc
+ , accessVirtualFile'
+ , getDataAccessorFn
+ , getLocsMappedTo
+
+ -- * Internal API
+ , accessVirtualFile
+ , withVFileInternalAccessFunction
+ , withFolderDataAccessNodes
+ ) where
+
+import Prelude hiding (id, (.))
+
+import Control.Funflow (Properties(..))
+import Control.Lens
+import Control.Monad (forM)
+import Control.Monad.Trans
+import Data.Default
+import qualified Data.Foldable as F
+import qualified Data.HashMap.Strict as HM
+import Data.Locations
+import Data.Locations.Accessors
+import Data.Locations.LogAndErrors
+import Data.Monoid
+import Data.Representable
+import qualified Data.Text as T
+import Data.Typeable
+import Streaming (Of (..), Stream)
+import qualified Streaming.Prelude as S
+import System.TaskPipeline.PorcupineTree
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+import System.TaskPipeline.Repetition.Internal
+import qualified System.TaskPipeline.Repetition.Foldl as F
+
+
+-- | Uses only the read part of a 'VirtualFile'. It is therefore considered a
+-- pure 'DataSource'.
+loadData
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b -- ^ Use as a 'DataSource'
+ -> PTask m ignored b -- ^ The resulting task. Ignores its input.
+loadData vf =
+ getDataReader vf >>> toTask' props drPerformRead
+ where
+ cacher = case _vfileReadCacher vf of
+ NoCache -> NoCache
+ Cache k s r -> Cache (\i d -> k i (fmap toHashableLocs $ drLocsAccessed d)) s r
+ props = Properties Nothing cacher Nothing
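+
+-- For instance (a sketch; @users@ is a hypothetical 'VirtualFile' used as a
+-- 'DataSource' of @[User]@):
+--
+-- > loadUsers :: (LogThrow m) => PTask m () [User]
+-- > loadUsers = loadData users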
+
+-- | Loads a stream of repeated occurrences of a VirtualFile, from a stream of
+-- indices. The process is lazy: the data will only be read when the
+-- resulting stream is consumed, BUT this also means that the reading cannot be
+-- cached.
+loadDataStream
+ :: (HasTRIndex idx, LogThrow m, Typeable a, Typeable b, Monoid r)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSource'
+ -> PTask m (Stream (Of idx) m r) (Stream (Of (idx, b)) m r)
+loadDataStream lv vf =
+ arr (StreamES . S.map (,error "loadDataStream: THIS IS VOID"))
+ >>> accessVirtualFile' (DoRead id) lv vf
+ >>> arr streamFromES
+
+-- | Loads repeated occurrences of a VirtualFile as a list, from a list of
+-- indices. BEWARE: the process is _strict_: every file will be loaded in
+-- memory, and the returned list is fully evaluated. So you should not use
+-- 'loadDataList' if the number of files to read is too large.
+loadDataList
+ :: (HasTRIndex idx, LogThrow m, Typeable a, Typeable b)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSource'
+ -> PTask m [idx] [(idx, b)]
+loadDataList lv vf =
+ arr (ListES . map (return . (,error "loadDataList: THIS IS VOID")))
+ >>> accessVirtualFile' (DoRead id) lv vf
+ >>> toTask (sequence . getListFromES)
+
+-- | Like 'loadDataStream', but won't stop if loading a single file fails
+tryLoadDataStream
+ :: (Exception e, HasTRIndex idx, LogCatch m, Typeable a, Typeable b, Monoid r)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSource'
+ -> PTask m (Stream (Of idx) m r) (Stream (Of (idx, Either e b)) m r)
+tryLoadDataStream lv vf =
+ arr (StreamES . S.map (,error "loadDataStream: THIS IS VOID"))
+ >>> accessVirtualFile' (DoRead try) lv vf
+ >>> arr streamFromES
+
+-- | Uses only the write part of a 'VirtualFile'. It is therefore considered a
+-- pure 'DataSink'.
+writeData
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b -- ^ Used as a 'DataSink'
+ -> PTask m a ()
+writeData vf =
+ id &&& getDataWriter vf >>> toTask' props (uncurry $ flip dwPerformWrite)
+ where
+ cacher = case _vfileWriteCacher vf of
+ NoCache -> NoCache
+ Cache k s r -> Cache (\i (a, d) -> k i (a, fmap toHashableLocs $ dwLocsAccessed d)) s r
+ props = Properties Nothing cacher Nothing
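+
+-- For instance (a sketch; @report@ is a hypothetical 'VirtualFile' used as a
+-- 'DataSink' for @Report@ values):
+--
+-- > saveReport :: (LogThrow m) => PTask m Report ()
+-- > saveReport = writeData report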
+
+-- | Like 'writeData', but the data to write first has to be computed by an
+-- action running some effects. Therefore, these effects will be executed only
+-- if the 'VirtualFile' is mapped to something.
+writeEffData
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b -- ^ Used as a 'DataSink'
+ -> PTask m (m a) ()
+writeEffData vf =
+ arr (SingletonES . (fmap ([] :: [TRIndex],)))
+ >>> accessVirtualFile (DoWrite id) [] vf
+ >>> toTask runES
+
+-- | Like 'writeDataList', but takes a stream as an input instead of a list. If
+-- the VirtualFile is not mapped to any physical file (which can be authorized
+-- if the VirtualFile 'canBeUnmapped'), then the input stream's effects will not
+-- be executed. This is why its end result must be a Monoid. See
+-- System.TaskPipeline.Repetition.Foldl for more complex ways to consume a
+-- Stream.
+writeDataStream
+ :: (HasTRIndex idx, LogThrow m, Typeable a, Typeable b, Monoid r)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSink'
+ -> PTask m (Stream (Of (idx, a)) m r) r
+writeDataStream lv vf =
+ arr StreamES
+ >>> accessVirtualFile' (DoWrite id) lv vf
+ >>> toTask runES
+
+-- | The simplest way to consume a series of data inside a pipeline: just write
+-- it to repeated occurrences of a 'VirtualFile'.
+writeDataList
+ :: (HasTRIndex idx, LogThrow m, Typeable a, Typeable b)
+ => LocVariable
+ -> VirtualFile a b -- ^ Used as a 'DataSink'
+ -> PTask m [(idx, a)] ()
+writeDataList lv vf =
+ arr (ListES . map return)
+ >>> accessVirtualFile' (DoWrite id) lv vf
+ >>> toTask runES
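+
+-- For instance (a sketch reusing the hypothetical @report@ above, with
+-- OverloadedStrings for the 'LocVariable' literal; "chunk" is spliced into
+-- each written file's name):
+--
+-- > saveReports :: (LogThrow m) => PTask m [(Int, Report)] ()
+-- > saveReports = writeDataList "chunk" report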
+
+-- | A very simple fold that will just repeatedly write the data to different
+-- occurrences of a 'VirtualFile'.
+writeDataFold :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b -> F.FoldA (PTask m) i a ()
+writeDataFold vf = F.premapInitA (arr $ const ()) $ F.arrowFold (arr snd >>> writeData vf)
+
+-- | Gets a 'DataAccessor' to the 'VirtualFile', i.e. doesn't read or write it
+-- immediately but returns a function that will make it possible.
+getDataAccessorFn
+ :: (LogThrow m, Typeable a, Typeable b)
+ => [VFNodeAccessType] -- ^ The accesses that will be performed on the DataAccessor
+ -> VirtualFile a b
+ -> PTask m () (LocVariableMap -> DataAccessor m a b)
+getDataAccessorFn accesses vfile = withVFileInternalAccessFunction def accesses vfile
+ (\mkAccessor _ _ -> return mkAccessor)
+
+-- | Gets a 'DataWriter' to the 'VirtualFile', i.e. a function to write to it
+-- that can be passed to cached tasks.
+getDataWriter
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b
+ -> PTask m ignored (DataWriter m a)
+getDataWriter vfile = withVFileInternalAccessFunction def [ATWrite] vfile
+ (\mkAccessor _ _ -> case mkAccessor mempty of
+ DataAccessor w _ l -> return $ DataWriter w l)
+
+-- | Gets a 'DataReader' from the 'VirtualFile', i.e. a function to read from it
+-- that can be passed to cached tasks.
+getDataReader
+ :: (LogThrow m, Typeable a, Typeable b)
+ => VirtualFile a b
+ -> PTask m ignored (DataReader m b)
+getDataReader vfile = withVFileInternalAccessFunction def [ATRead] vfile
+ (\mkAccessor _ _ -> case mkAccessor mempty of
+ DataAccessor _ r l -> return $ DataReader r l)
+
+-- | Gives a wrapper that should be used when the actual read or write is
+-- performed.
+data AccessToPerform m b b'
+ = DoWrite (m () -> m b')
+ | DoRead (m b -> m b')
+ | DoWriteAndRead (m b -> m b')
+
+-- | A single value, computed from an action in @m@
+newtype SingletonES m a = SingletonES { getSingletonFromES :: m a }
+
+-- | Just a wrapper around [m a]
+newtype ListES m a = ListES { getListFromES :: [m a] }
+
+-- | Just a wrapper around Stream, with arguments reordered
+newtype StreamES r m a = StreamES { getStreamFromES :: Stream (Of a) m r }
+
+-- | A class abstracting over @Stream (Of a) m ()@ and @[m a]@.
+class (Monoid (ESResult seq)) => EffectSeq seq where
+ type ESResult seq :: *
+ mapES :: (Monad m) => (a -> b) -> seq m a -> seq m b
+ mapESM :: (Monad m) => (a -> m b) -> seq m a -> seq m b
+ streamFromES :: (Monad m) => seq m a -> Stream (Of a) m (ESResult seq)
+ runES :: (Monad m) => seq m a -> m (ESResult seq)
+ emptyES :: (Monad m) => seq m a
+
+instance EffectSeq SingletonES where
+ type ESResult SingletonES = ()
+ mapES f = SingletonES . fmap f . getSingletonFromES
+ mapESM f = SingletonES . (>>= f) . getSingletonFromES
+ streamFromES (SingletonES act) = lift act >>= S.yield
+ runES (SingletonES act) = act >> return ()
+ emptyES = SingletonES $ return $ error "SingletonES: THIS IS VOID"
+
+instance EffectSeq ListES where
+ type ESResult ListES = ()
+ mapES f = ListES . map (f <$>) . getListFromES
+ mapESM f = ListES . map (>>= f) . getListFromES
+ streamFromES = mapM_ (\act -> lift act >>= S.yield) . getListFromES
+ runES = sequence_ . getListFromES
+ emptyES = ListES []
+
+instance (Monoid r) => EffectSeq (StreamES r) where
+ type ESResult (StreamES r) = r
+ mapES f = StreamES . S.map f . getStreamFromES
+ mapESM f = StreamES . S.mapM f . getStreamFromES
+ streamFromES = getStreamFromES
+ runES = S.effects . getStreamFromES
+ emptyES = StreamES (return mempty)
+
+class (EffectSeq seq) => EffectSeqFromList seq where
+ esFromList :: (Monad m) => [a] -> seq m a
+
+instance EffectSeqFromList ListES where
+ esFromList = ListES . map return
+
+instance (Monoid r) => EffectSeqFromList (StreamES r) where
+ esFromList x = StreamES $ S.each x >> return mempty
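+
+-- A quick sketch of how these wrappers are consumed uniformly (in IO for
+-- simplicity):
+--
+-- > runES (esFromList [1, 2, 3] :: ListES IO Int) -- runs 3 trivial effects
+-- > runES (esFromList [1, 2, 3] :: StreamES () IO Int) -- same, via a Stream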
+
+-- | Like 'accessVirtualFile', but uses only one repetition variable
+accessVirtualFile' :: forall seq idx m a b b'.
+ (HasTRIndex idx, LogThrow m, Typeable a, Typeable b
+ ,EffectSeq seq)
+ => AccessToPerform m b b'
+ -> LocVariable
+ -> VirtualFile a b -- ^ The VirtualFile to access
+ -> PTask m (seq m (idx, a)) (seq m (idx, b'))
+accessVirtualFile' access repIndex_ vf =
+ arr (mapES $ \(i, a) -> ([i], a))
+ >>> accessVirtualFile access [repIndex_] vf
+ >>> arr (mapES $ first head)
+
+toAccessTypes :: AccessToPerform m b b' -> [VFNodeAccessType]
+toAccessTypes ac = case ac of
+ DoWriteAndRead{} -> [ATWrite,ATRead]
+ DoWrite{} -> [ATWrite]
+ DoRead{} -> [ATRead]
+
+-- | When building the pipeline, stores into the location tree the way to read
+-- or write the required resource. When running the pipeline, accesses the
+-- instances of this resource corresponding to the values of some repetition
+-- indices.
+accessVirtualFile
+ :: forall m a b b' seq idx.
+ (LogThrow m, Typeable a, Typeable b, HasTRIndex idx
+ ,EffectSeq seq)
+ => AccessToPerform m b b'
+ -> [LocVariable] -- ^ The list of repetition indices. Can be empty if the
+ -- file isn't meant to be repeated
+ -> VirtualFile a b -- ^ The VirtualFile to access
+ -> PTask m (seq m ([idx], a)) (seq m ([idx], b')) -- ^ The resulting task reads a stream of indices and
+ -- input values and returns a stream of the same indices
+ -- associated to their outputs.
+accessVirtualFile accessToDo repIndices vfile =
+ withVFileInternalAccessFunction props (toAccessTypes accessToDo) vfile' $
+ \accessFn isMapped inputEffSeq ->
+ case (isMapped, accessToDo) of
+ (False, DoWrite{}) -> return emptyES
+ -- We don't even run the input effects if the
+ -- input shouldn't be written
+ _ -> return $ mapESM (runOnce accessFn) inputEffSeq
+ where
+ runOnce :: (LocVariableMap -> DataAccessor m a b) -> ([idx], a) -> m ([idx], b')
+ runOnce accessFn (ixVals, input) =
+ (ixVals,) <$> case accessToDo of
+ DoWrite wrap -> wrap $ daPerformWrite da input
+ DoRead wrap -> wrap $ daPerformRead da
+ DoWriteAndRead wrap -> wrap $ daPerformWrite da input >> daPerformRead da
+ where
+ da = accessFn lvMap
+ lvMap = HM.fromList $ zip repIndices $ map (unTRIndex . getTRIndex) ixVals
+ vfile' = case repIndices of
+ [] -> vfile
+ _ -> vfile & over (vfileSerials.serialRepetitionKeys) (repIndices++)
+ props = def
+
+-- | Executes as a task a function that needs to access the content of the
+-- DataAccessNode of a VirtualFile.
+withVFileInternalAccessFunction
+ :: forall m i o a b.
+ (LogThrow m, Typeable a, Typeable b)
+ => Properties i o
+ -> [VFNodeAccessType] -- ^ The accesses that will be performed on it
+ -> VirtualFile a b -- ^ The VirtualFile to access
+ -> ((LocVariableMap -> DataAccessor m a b) -> Bool -> i -> m o)
+ -- ^ The action to run. It receives a function to access the
+ -- VirtualFile and a Bool telling whether the file has been mapped.
+ -- The LocVariableMap can just be empty if the VirtualFile isn't
+ -- meant to be repeated
+ -> PTask m i o
+withVFileInternalAccessFunction props accessesToDo vfile f =
+ withFolderDataAccessNodes props path (Identity fname) $
+ \(Identity n) input -> case n of
+ DataAccessNode layers (action :: LocVariableMap -> DataAccessor m a' b') ->
+ case (eqT :: Maybe (a :~: a'), eqT :: Maybe (b :~: b')) of
+ (Just Refl, Just Refl)
+ -> f action (not $ null layers) input
+ _ -> err "input or output types don't match"
+ _ -> err "no access action is present in the tree"
+ where
+ path = init $ vfile ^. vfileOriginalPath
+ fname = file (last $ vfile ^. vfileOriginalPath) $ VirtualFileNode accessesToDo vfile
+ err s = throwWithPrefix $
+ "withVFileInternalAccessFunction (" ++ showVFileOriginalPath vfile ++ "): " ++ s
+
+-- | Wraps in a task a function that needs to access some items present in a
+-- subfolder of the 'LocationTree', and marks these accesses as done.
+withFolderDataAccessNodes
+ :: (LogThrow m, Traversable t)
+ => Properties i o
+ -> [LocationTreePathItem] -- ^ Path to folder in 'LocationTree'
+ -> t (LTPIAndSubtree VirtualFileNode) -- ^ Items of interest in the subfolder
+ -> (t (DataAccessNode m) -> i -> m o) -- ^ What to run with these items
+ -> PTask m i o -- ^ The resulting PTask
+withFolderDataAccessNodes props path filesToAccess accessFn =
+ makeTask' props tree runAccess
+ where
+ tree = foldr (\pathItem subtree -> folderNode [ pathItem :/ subtree ])
+ (folderNode $ F.toList filesToAccess) path
+ runAccess virtualTree input = do
+ let mbSubtree = virtualTree ^? atSubfolderRec path
+ subtree <- case mbSubtree of
+ Just s -> return s
+ Nothing -> throwWithPrefix $
+ "path '" ++ show path ++ "' not found in the LocationTree"
+ nodeTags <- forM filesToAccess $ \(filePathItem :/ _) ->
+ case subtree ^? atSubfolder filePathItem . locTreeNodeTag of
+ Nothing -> throwWithPrefix $
+ "path '" ++ show filePathItem ++ "' not found in the LocationTree"
+ Just tag -> return tag
+ accessFn nodeTags input
+
+-- | Returns the locs mapped to some path in the location tree. It *doesn't*
+-- expose this path as a requirement (hence the result list may be empty, as no
+-- mapping might exist). SHOULD NOT BE USED UNLESS loadData/writeData cannot do
+-- what you want.
+getLocsMappedTo :: (LogThrow m)
+ => [LocationTreePathItem] -> PTask m () [SomeLoc m]
+getLocsMappedTo path = runnableWithoutReqs $ withRunnableState $
+ \state _ -> getLocs $ state^.ptrsDataAccessTree
+ where
+ onErr (Left s) = throwWithPrefix $
+ "getLocsMappedTo (" ++ T.unpack (toTextRepr (LTP path)) ++ "): " ++ s
+ onErr (Right x) = return x
+ getLocs tree =
+ case tree ^? (atSubfolderRec path . locTreeNodeTag) of
+ -- NOTE: Will fail on repeated folders (because here we can only access
+ -- the final locations -- with variables spliced in -- in the case of
+ -- nodes with a data access function, not intermediary folders).
+ Just (MbDataAccessNode locsWithVars (First mbAccess)) -> case mbAccess of
+ Just (SomeDataAccess fn) -> onErr $ daLocsAccessed $ fn mempty
+ Nothing -> onErr $ traverse terminateLocWithVars locsWithVars
+ _ -> return []