summaryrefslogtreecommitdiff
path: root/src/Streaming
diff options
context:
space:
mode:
authorYvesPares <>2019-10-09 10:16:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2019-10-09 10:16:00 (GMT)
commit0a26644cffcc096b169b4f1ee3b790b562c01c51 (patch)
tree0404a48f10a79540a26f315f0af2da316474e3a9 /src/Streaming
version 0.1.0.00.1.0.0
Diffstat (limited to 'src/Streaming')
-rw-r--r--src/Streaming/TaskPipelineUtils.hs157
1 files changed, 157 insertions, 0 deletions
diff --git a/src/Streaming/TaskPipelineUtils.hs b/src/Streaming/TaskPipelineUtils.hs
new file mode 100644
index 0000000..5011bb3
--- /dev/null
+++ b/src/Streaming/TaskPipelineUtils.hs
@@ -0,0 +1,157 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE TypeOperators #-}
+
+module Streaming.TaskPipelineUtils
+ ( module S
+ , Of(..)
+ , MonadTrans(..)
+ , MonadIO(..)
+ , StreamFilter(..)
+ , Copy(..)
+ , (&)
+ , S.mapM_
+ , asConduit
+ , intoSink
+ , streamFolder
+ , streamFolderRel
+ , mapCopy
+ , hoistCopy
+ , With(..), elt, ann
+ , StreamWith
+ , mapStreamW
+ , mapStreamWM )
+ where
+
+import Control.Lens hiding ((:>))
+import Control.Monad (forM_)
+import Control.Monad.IO.Class
+import Data.Aeson
+import Data.Conduit (ConduitT, Void, runConduit, (.|))
+import Data.Function ((&))
+import GHC.Generics
+import Streaming
+import Streaming.Conduit (asConduit, fromStreamSource)
+import qualified Streaming.Prelude as S
+import System.Directory (doesDirectoryExist,
+ getDirectoryContents)
+import System.FilePath ((</>))
+
+
+intoSink :: Monad m => ConduitT a Void m b -> Stream (Of a) m r -> m b
+intoSink snk src = runConduit $ fromStreamSource src .| snk
+
+-- TODO: rename to streamFolderRecursive
+streamFolder :: (MonadIO m) => FilePath -> Stream (Of FilePath) m ()
+streamFolder topPath = S.map (topPath </>) $ streamFolderRel topPath
+
+-- TODO: rename to streamFolderRecursiveRel
+streamFolderRel :: (MonadIO m) => FilePath -> Stream (Of FilePath) m ()
+streamFolderRel topPath =
+ aux ""
+ where
+ aux relPath = do
+ names <- liftIO $ getDirectoryContents (topPath </> relPath)
+ let properNames = filter (`notElem` [".", ".."]) names
+ forM_ properNames $ \name -> do
+ let path = relPath </> name
+ isDirectory <- liftIO $ doesDirectoryExist path
+ if isDirectory
+ then aux path
+ else S.yield path
+
+-- | Generalizes 'partitionEithers' from Streaming.Prelude to fork a stream into
+-- several substreams
+-- TODO: This language of filters needs an introduction.
+-- What problem do they solve? Why bare streams aren't suitable for the task?
+class StreamFilter s where
+ type Wanted s :: *
+ type Split s (m :: * -> *) :: * -> *
+ filters :: (Monad m) => s -> Stream (Of (Wanted s)) m r -> Split s m r
+
+instance StreamFilter (a -> Bool) where
+ type Wanted (a -> Bool) = a
+ type Split (a -> Bool) m = Stream (Of a) m
+ filters = S.filter
+
+instance StreamFilter (a -> Maybe b) where
+ type Wanted (a -> Maybe b) = a
+ type Split (a -> Maybe b) m = Stream (Of b) m
+ filters = S.mapMaybe
+
+instance StreamFilter (a -> Either b c) where
+ type Wanted (a -> Either b c) = a
+ type Split (a -> Either b c) m = Stream (Of b) (Stream (Of c) m)
+ filters f = S.partitionEithers . S.map f
+
+instance (StreamFilter s', Wanted s' ~ a) => StreamFilter (Of (a -> Bool) s') where
+ type Wanted (Of (a -> Bool) s') = a
+ type Split (Of (a -> Bool) s') m = Stream (Of a) (Split s' m)
+ filters (f :> s') = hoist (filters s') . S.partition f
+
+instance (StreamFilter s', Wanted s' ~ a) => StreamFilter (Of (a -> Maybe b) s') where
+ type Wanted (Of (a -> Maybe b) s') = a
+ type Split (Of (a -> Maybe b) s') m = Stream (Of b) (Split s' m)
+ filters (f :> s') = filters (f' :> s')
+ where f' a = case f a of
+ Just b -> Left b
+ Nothing -> Right a
+
+instance (StreamFilter s', Wanted s' ~ c) => StreamFilter (Of (a -> Either b c) s') where
+ type Wanted (Of (a -> Either b c) s') = a
+ type Split (Of (a -> Either b c) s') m = Stream (Of b) (Split s' m)
+ filters (f :> s') = hoist (filters s') . S.partitionEithers . S.map f
+
+data Copy = Copy
+
+instance (StreamFilter s') => StreamFilter (Of Copy s') where
+ type Wanted (Of Copy s') = Wanted s'
+ type Split (Of Copy s') m = Stream (Of (Wanted s')) (Split s' m)
+ filters (Copy :> s') = hoist (filters s') . S.copy
+
+-- | Copies stream elements to a layer underneath after applying a function on
+-- them
+-- TODO: Motivate.
+mapCopy
+ :: Monad m
+ => (a -> b) -> Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) (Stream (Of b) m) r
+mapCopy f stream = S.for stream $ \x -> do
+ S.yield x
+ lift $ S.yield $ f x
+
+-- | A version of mapCopy that takes the whole substream of copied values and
+-- merges it downwards
+-- TODO: Motivate.
+hoistCopy
+ :: (Monad m)
+ => (forall n s. (Monad n) => Stream (Of a) n s -> Stream (Of b) n s)
+ -> Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) (Stream (Of b) m) r
+hoistCopy g stream =
+ S.copy stream & hoist (S.effects . flip S.for (lift . S.yield) . g)
+
+-- | Just a simple tuple to annotate stream elements. It is strict in the
+-- annotation.
+-- TODO: Motivate. How is this better than a primitive pair?
+data t `With` ann = With { _ann :: !ann, _elt :: t }
+ deriving (Eq, Generic)
+
+makeLenses ''With
+
+instance (ToJSON t, ToJSON ann) => ToJSON (t `With` ann)
+instance (FromJSON t, FromJSON ann) => FromJSON (t `With` ann)
+
+type StreamWith id a = Stream (Of (a `With` id))
+
+-- TODO: Maybe this function can be defined more generally when the stream
+-- elements are functors.
+mapStreamW :: Monad m => (a -> b) -> StreamWith ann a m r -> StreamWith ann b m r
+mapStreamW f = S.map $ \case
+ With pid a -> With pid (f a)
+
+mapStreamWM :: Monad m => (a -> m b) -> StreamWith ann a m r -> StreamWith ann b m r
+mapStreamWM f = S.mapM $ \case
+ With pid a -> With pid <$> f a