summaryrefslogtreecommitdiff
path: root/src/System/TaskPipeline/Repetition/Streaming.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/System/TaskPipeline/Repetition/Streaming.hs')
-rw-r--r--src/System/TaskPipeline/Repetition/Streaming.hs127
1 files changed, 127 insertions, 0 deletions
diff --git a/src/System/TaskPipeline/Repetition/Streaming.hs b/src/System/TaskPipeline/Repetition/Streaming.hs
new file mode 100644
index 0000000..3edfe32
--- /dev/null
+++ b/src/System/TaskPipeline/Repetition/Streaming.hs
@@ -0,0 +1,127 @@
+{-# LANGUAGE FlexibleContexts #-}
+
+-- | This module contains functions that map a task over a stream. Delaying
+-- effects of tasks like this plays badly with caching. This interface will be
+-- removed in a future version of porcupine.
+
+module System.TaskPipeline.Repetition.Streaming
+ ( STask, ISTask, OSTask
+ , mappingOverStream
+ , listToStreamTask, runStreamTask, streamToListTask
+ , Typeable
+ ) where
+
+import Control.Arrow
+import Control.Category
+import Control.Lens hiding ((:>))
+import Control.Monad
+import Data.Locations
+import Katip
+import Prelude hiding ((.))
+import Streaming (Of (..), Stream)
+import qualified Streaming.Prelude as S
+import System.TaskPipeline.PTask
+import System.TaskPipeline.PTask.Internal
+import System.TaskPipeline.Repetition.Internal
+
+
+-- * Type aliases for tasks over streams
+
+-- | A 'PTask' mapping an action over a 'Stream', transforming @a@'s into
+-- @b@'s; the output stream has the same result type @r@ as the input one.
+-- Each element in the stream should be associated to an identifier.
+type STask m a b r =
+  PTask m (Stream (Of a) m r)
+          (Stream (Of b) m r)
+
+-- | A 'PTask' that consumes an input 'Stream' of @a@'s and just returns the
+-- stream's final result @r@.
+type ISTask m a r =
+  PTask m (Stream (Of a) m r)
+          r
+
+-- | A 'PTask' that takes an @a@ and emits an output 'Stream' of @b@'s, whose
+-- final result is unit.
+type OSTask m a b =
+  PTask m a
+          (Stream (Of b) m ())
+
+-- * Running tasks over streams
+
+-- | Turns a task into one that is repeated once for each item of its input
+-- stream. This is done by transforming the VirtualFiles accessed by the task
+-- to add a 'RepetitionKey' to them, indicating that their final file name
+-- should be modified by adding an identifier to it just before reading or
+-- writing. So each iteration actually accesses different locations in the end.
+--
+-- Calls to 'mappingOverStream' can be nested, this way the underlying VirtualFiles
+-- will have one 'RepetitionKey' per loop (from outermost loop to innermost).
+mappingOverStream
+  :: (HasTRIndex a, CanRunPTask m)
+  => LocVariable       -- ^ A variable name, used as a key to indicate which
+                       -- repetition we're at. Used in the logger context and
+                       -- exposed in the yaml file for each VirtualFile that
+                       -- will be repeated by this task
+  -> Maybe Verbosity   -- ^ The minimal verbosity level at which to display the
+                       -- logger context. (Nothing if we don't want to add
+                       -- context)
+  -> PTask m a b       -- ^ The base task X to repeat
+  -> STask m a b r     -- ^ A task that will repeat X for each input. Each
+                       -- input is associated to an identifier that will be
+                       -- appended to every Loc mapped to every leaf in the
+                       -- LocationTree given to X.
+mappingOverStream repetitionKey mbVerb task =
+  over taskRunnablePart mappingRunnableOverStream $
+    makeTaskRepeatable (RepInfo repetitionKey mbVerb) task
+
+{-# DEPRECATED mappingOverStream "Prefer the FoldA API to repeat tasks and consume streams" #-}
+
+-- | Runs a 'RunnableTask' on each element of a stream: the first element (if
+-- any) right away, the following ones through 'S.mapM'.
+--
+-- IMPORTANT: That requires the RunnableTask to be repeatable. See
+-- 'makeTaskRepeatable'.
+mappingRunnableOverStream
+  :: (CanRunPTask m)
+  => RunnableTask m a b
+  -> RunnableTask m (Stream (Of a) m r) (Stream (Of b) m r)
+mappingRunnableOverStream runnable =
+  withRunnableState $ \state inputStream ->
+    let runOne = execRunnableTask runnable state
+        -- NOTE: We "cheat" here: we run the funflow layer of the inner
+        -- task. We should find a way not to have to do that, but when using
+        -- Streaming (which delays effects in a monad) it's really problematic.
+        emptyStream r = return (return r)  -- Empty input stream
+        startWith (firstInput, rest) = do
+          firstResult <- runOne firstInput
+          return (S.cons firstResult (S.mapM runOne rest))
+    in S.next inputStream >>= either emptyStream startWith
+ -- task. We should find a way not to have to do that, but when using
+ -- Streaming (which delays effects in a monad) it's really problematic.
+
+-- * Helper functions to create and run streams
+
+-- | Runs the input stream to its end for its effects only: all its elements
+-- are forgotten and just the stream's final result is returned.
+runStreamTask
+  :: (KatipContext m)
+  => PTask m (Stream (Of t) m r) r
+runStreamTask = toTask S.effects
+
+-- | A 'PTask' converting a list to a stream of the list's elements (built
+-- with 'S.each').
+listToStreamTask
+  :: (Monad m)
+  => PTask m [t] (Stream (Of t) m ())
+listToStreamTask = arr S.each
+
+-- | A 'PTask' converting an input stream to a list. WARNING: It can cause
+-- space leaks if the list is too big, as the output list will be eagerly
+-- evaluated. This function is provided only for compatibility with existing
+-- tasks expecting lists. Please consider switching to processing streams
+-- directly. See 'S.toList' for more details. The stream's own final result
+-- is discarded.
+streamToListTask
+  :: (KatipContext m)
+  => PTask m (Stream (Of t) m r) [t]
+streamToListTask = toTask $ \stream -> S.toList_ (void stream)