summaryrefslogtreecommitdiff
path: root/Reactor/Task.hs
blob: fa9cc80eddb9eb86f4cd5bb8f4fa53999c130577 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
{-# LANGUAGE MultiParamTypeClasses, DeriveDataTypeable #-}
module Reactor.Task
  ( Task
  , run
  , spawn
  , io
  ) where

import Control.Applicative
import Control.Monad
import Control.Exception
import Control.Monad.Reader.Class
import Control.Monad.Error.Class
import Control.Monad.IO.Class
import Data.Array.IO
import Data.Functor.Bind
import Data.Functor.Plus
import Reactor.Deque (Deque)
import Data.Data
import qualified Reactor.Deque as Deque

-- | Environment threaded through every running 'Task'.
newtype Env = Env
  { -- | Queue of pending tasks: 'spawn' pushes onto it and 'run' pops from it.
    envDeque :: Deque IOArray (Task ())
  }

-- | Build a fresh environment whose task deque is 'Deque.empty'.
mkEnv :: IO Env
mkEnv = do
  deque <- Deque.empty
  return (Env deque)

-- | A computation in continuation-passing style.
--
-- Running a task means supplying, in this order: a continuation for the
-- result, a continuation for a 'SomeException' failure, and the 'Env' the
-- task executes in.
newtype Task a = Task
  { runTask
      :: (a -> IO ())             -- success continuation
      -> (SomeException -> IO ()) -- failure continuation
      -> Env                      -- environment the task runs in
      -> IO ()
  } deriving Typeable

-- | Maps over the result by applying the function just before the value is
-- handed to the success continuation.
instance Functor Task where
  fmap f task = Task (\onResult -> runTask task (\a -> onResult (f a)))

-- | Runs the function task first and the argument task second, then passes
-- the applied result to the success continuation. Both tasks receive the
-- same failure continuation and environment.
instance Apply Task where
  tf <.> ta = Task $ \ks kf e ->
    let withFunction f = runTask ta (\a -> ks (f a)) kf e
    in runTask tf withFunction kf e

-- | 'pure' immediately feeds its argument to the success continuation;
-- '<*>' is the 'Apply' instance's '<.>'.
instance Applicative Task where
  pure x = Task $ \ks _ _ -> ks x
  tf <*> ta = tf <.> ta

-- | Runs the first task and, on success, runs the task produced from its
-- result with the original continuations and environment.
instance Bind Task where
  task >>- next = Task $ \ks kf e ->
    let continue a = runTask (next a) ks kf e
    in runTask task continue kf e
  
-- | Monadic sequencing, defined in terms of the 'Applicative' and 'Bind'
-- instances.
instance Monad Task where
  return = pure
  task >>= next = task >>- next

-- | Gives tasks read access to their 'Env'.
instance MonadReader Env Task where
  -- Hand the current environment to the success continuation.
  ask = Task $ \ks _ env -> ks env
  -- Run the task under the transformed environment; the continuations are
  -- passed through unchanged.
  local adjust task = Task $ \ks kf env -> runTask task ks kf (adjust env)

-- | 'liftIO' is 'io'.
instance MonadIO Task where
  liftIO action = io action

-- | Lift an 'IO' action into a 'Task': run the action and pass its result to
-- the success continuation.
--
-- The failure continuation is never used here, so an exception thrown by the
-- action propagates as an ordinary 'IO' exception rather than through the
-- task's failure continuation.
io :: IO a -> Task a
io action = Task $ \ks _ _ -> do
  result <- action
  ks result

-- | Error handling through the failure continuation.
instance MonadError SomeException Task where
  -- Skip the success continuation and report the error.
  throwError exc = Task $ \_ kf _ -> kf exc
  -- Run the task with a failure continuation that runs the handler instead;
  -- the handler itself runs with the original continuations and environment.
  catchError task handler = Task $ \ks kf e ->
    let recover exc = runTask (handler exc) ks kf e
    in runTask task ks recover e

-- | Left-biased choice: run the left task and, only if it fails, discard its
-- error and run the right task with the original continuations.
instance Alt Task where
  lhs <!> rhs = Task $ \ks kf e ->
    let fallback _ = runTask rhs ks kf e
    in runTask lhs ks fallback e

-- | 'zero' is the task that always fails, reporting @ErrorCall "empty"@ to
-- the failure continuation.
instance Plus Task where
  zero = Task $ \_ kf _ -> kf emptyError
    where
      emptyError = toException (ErrorCall "empty")

-- | 'Alternative' defined via the 'Alt' and 'Plus' instances.
instance Alternative Task where
  empty = zero
  lhs <|> rhs = lhs <!> rhs

-- | 'MonadPlus' defined via the 'Alt' and 'Plus' instances.
instance MonadPlus Task where
  mzero = zero
  mplus lhs rhs = lhs <!> rhs

-- | Schedule a task by pushing it onto the environment's deque, then carry
-- on with the current task.
--
-- The failure continuation is not used: 'spawn' itself does not report
-- errors through it.
spawn :: Task () -> Task ()
spawn task = Task $ \ks _kf e -> do
  Deque.push task (envDeque e)
  -- The success continuation must be invoked: it is what lets anything
  -- sequenced after 'spawn' proceed and, under 'run', it is the step that
  -- pops queued tasks off the deque. Without it the computation simply stops
  -- and the pushed task is never picked up.
  ks ()

-- | Run a single-threaded pump: execute the given task, then keep popping
-- and executing tasks from the local deque until it is empty. All spawned
-- tasks are placed on that local deque.
--
-- If any task reports a failure, the exception is rethrown in 'IO' and the
-- pump stops; tasks still queued at that point are not run.
run :: Task () -> IO ()
run task0 = do
  env <- mkEnv
  -- Argument order of 'bracket_' is: acquire, release, body. The pump must be
  -- the body (last argument) so that 'unregister' is the guaranteed cleanup
  -- step rather than the other way round.
  bracket_
    (register env)
    (unregister env)
    (go env task0)
  where
    -- Execute one task with the pump's continuations.
    go :: Env -> Task () -> IO ()
    go env (Task m) = m (success env) (failure env) env

    -- A task finished: run the next queued task, or stop when none is left.
    success :: Env -> () -> IO ()
    success env _ = Deque.pop (envDeque env) >>= maybe (return ()) (go env)

    -- A task failed: rethrow. 'throwIO' raises the exception when the action
    -- is executed, which gives predictable ordering inside 'IO'.
    failure :: Env -> SomeException -> IO ()
    failure _env = throwIO -- TODO: shut down workers?

    register :: Env -> IO ()
    register _env = return () -- TODO: start up if necessary and tell worker threads about us

    unregister :: Env -> IO ()
    unregister _env = return () -- TODO: shutdown if necessary and tell worker threads about us