summaryrefslogtreecommitdiff
path: root/Reactor/Observable.hs
blob: e193101cd5f51d9fdeeb6a3117d3db29036e25da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
-- | Push-based observable streams.
--
-- An 'Observable' wraps a function that is run against an 'Observer' and
-- yields a 'Subscription' inside 'Task'. This module provides the type, its
-- class instances and a handful of combinators.
--
-- The commented-out entries in the export list correspond to definitions
-- that are currently disabled (commented out) further down in this file.
module Reactor.Observable
  ( Observable(..)
  , never
  , fby
  , take, drop
  , safe
  , filterMap, (!?)
  , observe
  , counted
  -- , andThen
  , (<||>)
  -- , feed
  -- , head
  -- , safeHead
  ) where

import Prelude hiding (filter, take, drop)
import Control.Applicative
import Control.Exception hiding (handle)
import Control.Monad
import Control.Monad.Error
import Data.Foldable
import qualified Data.Foldable as Foldable
import Data.Functor.Bind
import Data.Functor.Plus
import Data.Functor.Extend
import Data.Functor.Contravariant
import Data.Monoid
import Data.IORef
import Reactor.Atomic
import Reactor.Filtered
import Reactor.Observer
import Reactor.Task
import Reactor.Subscription

-- | A stream of values of type @a@, represented by its subscription function.
newtype Observable a = Observable
  { subscribe :: Observer a -> Task Subscription
    -- ^ Attach an 'Observer' to the stream, producing a 'Subscription'.
  }

-- | Mapping over an 'Observable' adapts the downstream observer with
-- 'contramap' before handing it to the underlying subscription function.
instance Functor Observable where
  fmap f (Observable run) = Observable $ \o -> run (contramap f o)

-- | Combine a stream of functions with a stream of arguments.
--
-- Every function received so far is kept in an 'IORef' (most recent first).
-- Each incoming argument is applied to all functions collected up to that
-- moment; functions that arrive later are not applied to earlier arguments.
-- Errors from either side are passed straight to the observer.
--
-- The observer is completed once both sides have completed. Completion is
-- tracked with a shared countdown plus one flag per side (the same scheme
-- the 'Bind' instance uses): exactly one side observes the countdown reach
-- zero, so the two sides finishing at the same moment cannot both call
-- 'complete'. This assumes 'atomicSubAndFetch' is atomic and that
-- 'clearing' runs its action only the first time a flag is cleared
-- (TODO confirm against "Reactor.Atomic").
instance Apply Observable where
  mf <.> ma = Observable $ \o -> do
    funs <- io $ newIORef []
    -- Number of sides (functions, arguments) that have not completed yet.
    pending <- atomic (2 :: Int)
    fflag <- atomic (1 :: Int)
    aflag <- atomic (1 :: Int)
    let finish flag = clearing flag $ do
          -- The per-side flag keeps a repeated completion from one side
          -- from being counted twice.
          i <- atomicSubAndFetch pending 1
          when (i == 0) $ complete o
    mappend
      <$> subscribe_ mf
            (\f -> io $ atomicModifyIORef funs (\fs -> (f:fs, ())))
            (handle o)
            (finish fflag)
      <*> subscribe_ ma
            (\a -> do
              fs <- io $ readIORef funs
              -- NOTE(review): results are delivered through 'spawn'; if that
              -- runs asynchronously they may reach the observer after
              -- 'complete' -- confirm against "Reactor.Task".
              spawn $ Foldable.mapM_ (\f -> o ! f a) fs)
            (handle o)
            (finish aflag)

-- | Filtering an 'Observable' subscribes the source with a filtered version
-- of the downstream observer.
instance Filtered Observable where
  filter p (Observable run) = Observable $ \o -> run (filter p o)

-- | 'pure' sends its single value to the observer, completes it, and returns
-- the empty subscription. '<*>' is the 'Apply' combinator '<.>'.
instance Applicative Observable where
  pure a = Observable $ \o -> do
    o ! a
    complete o
    return mempty
  mf <*> ma = mf <.> ma

-- | Monadic bind: every value of the outer stream is turned into an inner
-- stream by @k@, and the values and errors of all inner streams are forwarded
-- to the same observer.
--
-- A shared counter starts at one (for the outer stream) and is incremented
-- for each inner stream that gets subscribed. Each stream owns a flag passed
-- to 'clearing' when it completes; the completion that brings the counter
-- down to zero completes the observer. Subscriptions of inner streams are
-- discarded; only the outer subscription is returned.
instance Bind Observable where
  source >>- k = Observable $ \o -> do
    live <- atomic (1 :: Int)
    outerDone <- atomic (1 :: Int)
    let release done = clearing done $ do
          remaining <- atomicSubAndFetch live 1
          when (remaining == 0) $ complete o
        onValue x = do
          innerDone <- atomic (1 :: Int)
          _ <- atomicAddAndFetch live 1
          _ <- subscribe_ (k x) (o !) (handle o) (release innerDone)
          return ()
    subscribe_ source onValue (handle o) (release outerDone)
        
-- | The 'Monad' instance reuses 'pure' and the 'Bind' operator '>>-'.
instance Monad Observable where
  return a = pure a
  m >>= k = m >>- k

-- | Sequential alternative: values and errors of the left stream go to the
-- observer; when the left stream completes, the right stream is subscribed
-- with the same observer. The subscription created for the right stream is
-- discarded, so only the left subscription is returned.
instance Alt Observable where
  lhs <!> rhs = Observable $ \o ->
    let continue = () <$ subscribe rhs o
    in subscribe_ lhs (o !) (handle o) continue

-- | The empty stream: completes the observer right away and returns the
-- empty subscription.
instance Plus Observable where
  zero = Observable $ \o -> do
    complete o
    return mempty
  
-- | 'Alternative' is defined through the 'Plus' and 'Alt' instances.
instance Alternative Observable where
  empty = zero
  l <|> r = l <!> r

-- | 'MonadPlus' is defined through the 'Plus' and 'Alt' instances.
instance MonadPlus Observable where
  mzero = zero
  mplus l r = l <!> r

-- | For every value @a@ produced by @p@, 'extend' hands the observer the
-- result of applying @f@ to the stream @'fby' a p@ (that value followed by a
-- fresh subscription to @p@). 'duplicate' is the special case where the
-- stream itself is emitted. Errors and completion of @p@ are forwarded
-- unchanged.
instance Extend Observable where
  duplicate = extend id
  extend f p = Observable $ \o ->
    let emit a = o ! f (fby a p)
    in subscribe_ p emit (handle o) (complete o)

-- | Wrap a stream with a liveness flag. Values are forwarded under 'given',
-- while errors and completion are forwarded under 'clearing' on the same
-- flag. Presumably this stops delivery once the stream has failed or
-- completed -- the exact semantics of 'given' and 'clearing' live in
-- "Reactor.Atomic" (TODO confirm).
safe :: Observable a -> Observable a
safe source = Observable $ \o -> do
  live <- atomic (1 :: Int)
  let onNext a  = given live (o ! a)
      onError e = clearing live (handle o e)
      onDone    = clearing live (complete o)
  subscribe_ source onNext onError onDone

-- | Subscribe using three separate callbacks (value, error, completion)
-- instead of a ready-made 'Observer'.
subscribe_ :: Observable a -> (a -> Task ()) -> (SomeException -> Task ()) -> Task () -> Task Subscription
subscribe_ source onNext onError onDone =
  subscribe source $ Observer onNext onError onDone

-- | Subscribe to the source with the downstream observer adapted by '?!'
-- and the given partial mapping.
filterMap :: (a -> Maybe b) -> Observable a -> Observable b
filterMap p (Observable run) = Observable $ \o -> run (o ?! p)

-- | Operator form of 'filterMap' with the arguments flipped.
(!?) :: Observable a -> (a -> Maybe b) -> Observable b
source !? p = filterMap p source

-- | A stream that ignores its observer entirely: no value, error or
-- completion is ever sent, and the empty subscription is returned.
never :: Observable a
never = Observable (const (return mempty))

-- | \"Followed by\": send the given value to the observer first, then
-- subscribe the observer to the given stream.
fby :: a -> Observable a -> Observable a
fby a as = Observable $ \o -> (o ! a) >> subscribe as o

-- | @'take' n p@ forwards at most the first @n@ values of @p@ and then
-- completes the observer.
--
-- * For @n <= 0@ nothing can be forwarded, so the observer is completed
--   immediately and @p@ is not subscribed at all (mirroring how list
--   'Prelude.take' yields the empty list for non-positive counts).
--
-- * If @p@ completes before @n@ values were seen, the observer is completed
--   at that point.
--
-- * If @p@ completes after the @n@-th value was already forwarded, nothing
--   more happens: the observer was completed when the @n@-th value arrived
--   and must not be completed a second time.
--
-- Errors from @p@ are forwarded unconditionally. The upstream subscription
-- is not cancelled after the @n@-th value; later values are just ignored.
take :: Int -> Observable a -> Observable a
take n p
  | n <= 0 = Observable $ \o -> complete o $> mempty
  | otherwise = Observable $ \o -> do
      -- Number of values that may still be forwarded.
      counter <- atomic n
      subscribe_ p
        (\a -> do
          i <- io $ atomicSubAndFetch counter 1
          when (i >= 0) $ o ! a
          -- The value that exhausts the budget also completes the observer.
          when (i == 0) $ complete o)
        (handle o)
        (do
          -- Zero the budget and look at what was left. Strictly positive
          -- means the n-th value never arrived, so completion is still owed;
          -- zero (or less) means the observer has already been completed.
          i <- io $ atomicFetchAndAnd counter 0
          when (i > 0) $ complete o)
      
-- | @'drop' n p@ suppresses the first @n@ values of @p@ and forwards the
-- rest. A counter initialised to @n@ is decremented for every value; a value
-- is forwarded only when the decremented counter is negative. Errors and
-- completion are forwarded unchanged.
drop :: Int -> Observable a -> Observable a
drop n p = Observable $ \o -> do
  remaining <- atomic n
  let forward a = do
        i <- io $ atomicSubAndFetch remaining 1
        when (i < 0) (o ! a)
  subscribe_ p forward (handle o) (complete o)

-- | Observe both at the same time.
--
-- Values and errors of both streams are forwarded to the observer as they
-- arrive. The observer is completed exactly once, after /both/ streams have
-- completed: a shared countdown starts at two and each stream decrements it
-- on completion, guarded by its own flag so that a stream signalling
-- completion more than once is only counted once (the same scheme the
-- 'Bind' instance uses; assumes 'clearing' runs its action only the first
-- time a flag is cleared -- TODO confirm against "Reactor.Atomic").
--
-- The returned subscription is the 'mappend' of both subscriptions.
(<||>) :: Observable a -> Observable a -> Observable a
p <||> q = Observable $ \o -> do
  -- Number of source streams that have not completed yet.
  pending <- atomic (2 :: Int)
  pFlag <- atomic (1 :: Int)
  qFlag <- atomic (1 :: Int)
  let finish flag = clearing flag $ do
        i <- atomicSubAndFetch pending 1
        -- Only the last stream to finish completes the observer.
        when (i == 0) $ complete o
  mappend
    <$> subscribe_ p (o !) (handle o) (finish pFlag)
    <*> subscribe_ q (o !) (handle o) (finish qFlag)

-- | Turn any 'Foldable' container into a stream. A task handed to 'spawn'
-- sends every element to the observer in fold order and then completes it;
-- the subscription returned to the caller is empty.
observe :: Foldable f => f a -> Observable a
observe t = Observable $ \o -> do
  spawn (Foldable.mapM_ (o !) t >> complete o)
  return mempty

-- | Pair every value with a running index. The index comes from a counter
-- that starts at 0 and is advanced by one per value via
-- 'atomicFetchAndAdd'. Errors and completion are forwarded unchanged.
counted :: Observable a -> Observable (Int, a)
counted p = Observable $ \o -> do
  next <- atomic 0
  let tag a = atomicFetchAndAdd next 1 >>= \i -> o ! (i, a)
  subscribe_ p tag (handle o) (complete o)

-- do something when the observable completes

{-
andThen :: Observable a -> Task () -> Task ()
andThen p t = spawn $ do
  () <$ subscribe_ p (\_ -> return ()) (\_ -> t) t
-}

{-

feed :: Moore i o -> Observable i -> Task o
feed machine p = do
  m <- io $ newIORef machine
  callCC $ \resume -> 
    subscribe p 
      (\i -> io $ atomicModifyIORef m $ \m' -> (step m' i, ()))
      throwError 
      (do Moore _ o <- io $ readIORef m
          resume o)

safeHead :: Observable a -> Task (Maybe a)
safeHead = feed (Moore (pure . Just) Nothing) . take 1

head :: Observable a -> Task a 
head = feed (Moore pure (error "head: empty observable")) . take 1

-- uncons :: Observable a -> Task (Maybe (a, Observable a))
-}