summaryrefslogtreecommitdiff
path: root/src/Streaming/TaskPipelineUtils.hs
blob: 5011bb3bc7362a9022cae0ad455e471ca94dcc53 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase        #-}
{-# LANGUAGE RankNTypes        #-}
{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeFamilies      #-}
{-# LANGUAGE TypeOperators     #-}

module Streaming.TaskPipelineUtils
  ( module S
  , Of(..)
  , MonadTrans(..)
  , MonadIO(..)
  , StreamFilter(..)
  , Copy(..)
  , (&)
  , S.mapM_
  , asConduit
  , intoSink
  , streamFolder
  , streamFolderRel
  , mapCopy
  , hoistCopy
  , With(..), elt, ann
  , StreamWith
  , mapStreamW
  , mapStreamWM )
  where

import           Control.Lens           hiding ((:>))
import           Control.Monad          (forM_)
import           Control.Monad.IO.Class
import           Data.Aeson
import           Data.Conduit           (ConduitT, Void, runConduit, (.|))
import           Data.Function          ((&))
import           GHC.Generics
import           Streaming
import           Streaming.Conduit      (asConduit, fromStreamSource)
import qualified Streaming.Prelude      as S
import           System.Directory       (doesDirectoryExist,
                                         getDirectoryContents)
import           System.FilePath        ((</>))


-- | Feed every element of a stream into a conduit sink and run the resulting
-- pipeline, returning the sink's result. The stream's own return value is
-- not part of the result.
intoSink :: Monad m => ConduitT a Void m b -> Stream (Of a) m r -> m b
intoSink sink stream = runConduit (source .| sink)
  where
    -- The stream viewed as a conduit producer.
    source = fromStreamSource stream

-- TODO: rename to streamFolderRecursive
-- | Same traversal as 'streamFolderRel', except that each yielded path is
-- prefixed with @topPath@ instead of being relative to it.
streamFolder :: (MonadIO m) => FilePath -> Stream (Of FilePath) m ()
streamFolder topPath = streamFolderRel topPath & S.map (topPath </>)

-- TODO: rename to streamFolderRecursiveRel
-- | Recursively walk the directory @topPath@ and yield the path of every
-- entry that is not a directory. The yielded paths are relative to
-- @topPath@.
--
-- The special entries @.@ and @..@ are skipped. Directories themselves are
-- never yielded, only descended into.
--
-- NOTE: 'doesDirectoryExist' also answers 'True' for a symbolic link to a
-- directory, so such links are descended into as well.
streamFolderRel :: (MonadIO m) => FilePath -> Stream (Of FilePath) m ()
streamFolderRel topPath =
  aux ""
  where
    -- @relPath@ is the directory currently being listed, relative to
    -- @topPath@ (the empty string stands for @topPath@ itself).
    aux relPath = do
      names <- liftIO $ getDirectoryContents (topPath </> relPath)
      let properNames = filter (`notElem` [".", ".."]) names
      forM_ properNames $ \name -> do
        let path = relPath </> name
        -- The directory test has to look at the real location on disk, i.e.
        -- under @topPath@. Testing the bare relative path would resolve it
        -- against the process' working directory and misclassify
        -- subdirectories whenever that is not @topPath@.
        isDirectory <- liftIO $ doesDirectoryExist (topPath </> path)
        if isDirectory
          then aux path
          else S.yield path

-- | Generalizes 'partitionEithers' from Streaming.Prelude to fork a stream into
-- several substreams.
--
-- A value of type @s@ is a description of how to split a stream. The
-- instances in this module cover a single function (@a -> Bool@,
-- @a -> Maybe b@, @a -> Either b c@) and chains of such descriptions built
-- with the ':>' constructor of 'Of', each link adding one stream layer to
-- the result.
--
-- TODO: This language of filters needs an introduction.
-- What problem do they solve? Why bare streams aren't suitable for the task?
class StreamFilter s where
  -- | Element type of the input stream that the description @s@ consumes.
  type Wanted s :: *
  -- | Shape of the result of splitting over the base monad @m@. It is still
  -- to be applied to the return type of the input stream.
  type Split s (m :: * -> *) :: * -> *
  -- | Split the input stream according to the description @s@. The return
  -- value @r@ of the input stream is preserved.
  filters :: (Monad m) => s -> Stream (Of (Wanted s)) m r -> Split s m r

-- | A lone predicate: only the elements satisfying it are kept, the others
-- are dropped.
instance StreamFilter (a -> Bool) where
  type Wanted (a -> Bool) = a
  type Split (a -> Bool) m = Stream (Of a) m
  filters keep stream = S.filter keep stream

-- | A lone partial mapping: elements mapped to 'Just' are kept (unwrapped),
-- elements mapped to 'Nothing' are dropped.
instance StreamFilter (a -> Maybe b) where
  type Wanted (a -> Maybe b) = a
  type Split (a -> Maybe b) m = Stream (Of b) m
  filters select stream = S.mapMaybe select stream

-- | A lone classifier: every element is mapped to an 'Either', then the
-- 'Left' values go to the outer stream and the 'Right' values to the stream
-- layer underneath.
instance StreamFilter (a -> Either b c) where
  type Wanted (a -> Either b c) = a
  type Split (a -> Either b c) m = Stream (Of b) (Stream (Of c) m)
  filters classify stream = S.partitionEithers (S.map classify stream)

-- | A predicate followed by further filters: elements accepted by the
-- predicate are yielded on the outer stream, and the rejected ones are passed
-- to the remaining filters, which shape the layers underneath.
instance (StreamFilter s', Wanted s' ~ a) => StreamFilter (Of (a -> Bool) s') where
  type Wanted (Of (a -> Bool) s') = a
  type Split (Of (a -> Bool) s') m = Stream (Of a) (Split s' m)
  filters (accept :> others) stream =
    hoist (filters others) (S.partition accept stream)

-- | A partial mapping followed by further filters: the 'Just' results are
-- yielded on the outer stream, while elements mapped to 'Nothing' are passed
-- unchanged to the remaining filters. Implemented by delegating to the
-- 'Either'-based instance.
instance (StreamFilter s', Wanted s' ~ a) => StreamFilter (Of (a -> Maybe b) s') where
  type Wanted (Of (a -> Maybe b) s') = a
  type Split (Of (a -> Maybe b) s') m = Stream (Of b) (Split s' m)
  filters (f :> s') = filters (classify :> s')
    where
      -- A hit becomes a 'Left'; a miss keeps the original element as a
      -- 'Right' so that the next filters can still inspect it.
      classify a = maybe (Right a) Left (f a)

-- | A classifier followed by further filters: 'Left' results are yielded on
-- the outer stream, 'Right' results are passed to the remaining filters.
instance (StreamFilter s', Wanted s' ~ c) => StreamFilter (Of (a -> Either b c) s') where
  type Wanted (Of (a -> Either b c) s') = a
  type Split (Of (a -> Either b c) s') m = Stream (Of b) (Split s' m)
  filters (classify :> others) stream =
    hoist (filters others) (S.partitionEithers (S.map classify stream))

-- | Marker to put in a chain of filters (as @Copy :> rest@): it carries no
-- data and is only there to select the @StreamFilter (Of Copy s')@ instance,
-- which duplicates the stream with 'S.copy'.
data Copy = Copy

-- | 'Copy' followed by further filters: every element is duplicated with
-- 'S.copy'. One copy stays on the outer stream, the other is passed to the
-- remaining filters.
instance (StreamFilter s') => StreamFilter (Of Copy s') where
  type Wanted (Of Copy s') = Wanted s'
  type Split (Of Copy s') m = Stream (Of (Wanted s')) (Split s' m)
  filters (Copy :> others) stream = hoist (filters others) (S.copy stream)

-- | For each element @x@ of the outer stream, re-yield @x@ on the outer
-- stream and then yield @f x@ on the stream layer underneath.
-- TODO: Motivate.
mapCopy
  :: Monad m
  => (a -> b) -> Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) (Stream (Of b) m) r
mapCopy f stream = S.for stream emitBoth
  where
    -- Outer layer first, then the mapped value on the inner layer.
    emitBoth x = S.yield x >> lift (S.yield (f x))

-- | A variant of 'mapCopy' where the transformation @g@ is applied to the
-- copied substream as a whole rather than element by element: the outer
-- stream is duplicated, the duplicate is run through @g@, and every value
-- that @g@ produces is yielded on the @b@ layer underneath.
-- TODO: Motivate.
hoistCopy
  :: (Monad m)
  => (forall n s. (Monad n) => Stream (Of a) n s -> Stream (Of b) n s)
  -> Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) (Stream (Of b) m) r
hoistCopy g stream =
  hoist
    -- Transform the duplicate, push its elements one layer down, and run
    -- what remains purely for its effects.
    (\duplicate -> S.effects (S.for (g duplicate) (lift . S.yield)))
    (S.copy stream)

-- | Just a simple tuple to annotate stream elements. It is strict in the
-- annotation (the @_ann@ field carries a bang) and lazy in the element.
--
-- Mind the argument order: the type is written @t \`With\` ann@ (element
-- first), whereas the data constructor takes the annotation first.
-- TODO: Motivate. How is this better than a primitive pair?
data t `With` ann = With { _ann :: !ann, _elt :: t }
  deriving (Eq, Generic)

-- Template Haskell splice generating the lenses for the @_ann@ and @_elt@
-- fields of 'With' (exported by this module as @ann@ and @elt@).
makeLenses ''With

-- JSON instances for 'With'. Both instance bodies are empty, so they rely on
-- the class default methods (made available by the 'Generic' instance
-- derived for 'With').
instance (ToJSON t, ToJSON ann) => ToJSON (t `With` ann)
instance (FromJSON t, FromJSON ann) => FromJSON (t `With` ann)

-- | A stream whose elements of type @a@ are each annotated (through 'With')
-- with a value of type @id@. The synonym is partially applied: the base monad
-- and the return type of the stream are still to be supplied.
type StreamWith id a = Stream (Of (a `With` id))

-- TODO: Maybe this function can be defined more generally when the stream
-- elements are functors.
-- | Apply a pure function to the element part of every annotated item of
-- the stream, leaving the annotations as they are.
mapStreamW :: Monad m => (a -> b) -> StreamWith ann a m r -> StreamWith ann b m r
mapStreamW f = S.map onElement
  where
    onElement (With annotation x) = With annotation (f x)

-- | Monadic counterpart of 'mapStreamW': run an action of the base monad on
-- the element part of every annotated item of the stream, leaving the
-- annotations as they are.
mapStreamWM :: Monad m => (a -> m b) -> StreamWith ann a m r -> StreamWith ann b m r
mapStreamWM f = S.mapM onElement
  where
    onElement (With annotation x) = fmap (With annotation) (f x)