summaryrefslogtreecommitdiff
path: root/Reactor/Task.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Reactor/Task.hs')
-rw-r--r--Reactor/Task.hs96
1 files changed, 96 insertions, 0 deletions
diff --git a/Reactor/Task.hs b/Reactor/Task.hs
new file mode 100644
index 0000000..fa9cc80
--- /dev/null
+++ b/Reactor/Task.hs
@@ -0,0 +1,96 @@
+{-# LANGUAGE MultiParamTypeClasses, DeriveDataTypeable #-}
+module Reactor.Task
+ ( Task
+ , run
+ , spawn
+ , io
+ ) where
+
+import Control.Applicative
+import Control.Monad
+import Control.Exception
+import Control.Monad.Reader.Class
+import Control.Monad.Error.Class
+import Control.Monad.IO.Class
+import Data.Array.IO
+import Data.Functor.Bind
+import Data.Functor.Plus
+import Reactor.Deque (Deque)
+import Data.Data
+import qualified Reactor.Deque as Deque
+
+newtype Env = Env { envDeque :: Deque IOArray (Task ()) }
+
+mkEnv :: IO Env
+mkEnv = Env <$> Deque.empty
+
+newtype Task a = Task
+ { runTask :: (a -> IO ()) ->
+ (SomeException -> IO ()) ->
+ (Env -> IO ())
+ } deriving Typeable
+
+instance Functor Task where
+ fmap f (Task m) = Task $ \ks -> m (ks . f)
+
+instance Apply Task where
+ Task mf <.> Task ma = Task $ \ks kf e -> mf (\f -> ma (ks . f) kf e) kf e
+
+instance Applicative Task where
+ pure a = Task (\ks _kf _e -> ks a)
+ (<*>) = (<.>)
+
+instance Bind Task where
+ Task mf >>- k = Task (\ks kf e -> mf (\a -> runTask (k a) ks kf e) kf e)
+
+instance Monad Task where
+ return = pure
+ (>>=) = (>>-)
+
+instance MonadReader Env Task where
+ ask = Task (\ks _kf e -> ks e)
+ local f (Task ma) = Task (\ks kf e -> ma ks kf (f e))
+
+instance MonadIO Task where
+ liftIO = io
+
+io :: IO a -> Task a
+io act = Task (\ks _kf _e -> act >>= ks)
+
+instance MonadError SomeException Task where
+ throwError err = Task (\_ks kf _e -> kf err)
+ catchError (Task m) h = Task (\ks kf e -> m ks (\err -> runTask (h err) ks kf e) e)
+
+instance Alt Task where
+ Task ma <!> Task mb = Task (\ks kf e -> ma ks (\_ -> mb ks kf e) e)
+
+instance Plus Task where
+ zero = Task (\_ks kf _e -> kf (toException (ErrorCall "empty")))
+
+instance Alternative Task where
+ (<|>) = (<!>)
+ empty = zero
+
+instance MonadPlus Task where
+ mzero = zero
+ mplus = (<!>)
+
+spawn :: Task () -> Task ()
+spawn task = Task (\_ks _kf e -> Deque.push task (envDeque e))
+
+-- run a single threaded pump, all tasks are placed locally
+run :: Task () -> IO ()
+run task0 = do
+ env <- mkEnv
+ bracket_
+ (register env)
+ (go env task0)
+ (unregister env)
+ where
+ go :: Env -> Task () -> IO ()
+ go env (Task m) = m (success env) (failure env) env
+ success env _ = Deque.pop (envDeque env) >>= maybe (return ()) (go env)
+ failure _env = throw -- TODO: shut down workers?
+ register _env = return () -- TODO: start up if necessary and tell worker threads about us
+ unregister _env = return () -- TODO: shutdown if necessary and tell worker threads about us
+