summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--LICENSE30
-rw-r--r--Reactor/Atomic.hs78
-rw-r--r--Reactor/Deque.hs194
-rw-r--r--Reactor/Filtered.hs14
-rw-r--r--Reactor/Moore.hs34
-rw-r--r--Reactor/Observable.hs216
-rw-r--r--Reactor/Observer.hs38
-rw-r--r--Reactor/Subscription.hs17
-rw-r--r--Reactor/Task.hs96
-rw-r--r--Setup.lhs7
-rw-r--r--reactor.cabal37
11 files changed, 761 insertions, 0 deletions
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..c304044
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,30 @@
+Copyright 2011 Edward Kmett
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the author nor the names of his contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
diff --git a/Reactor/Atomic.hs b/Reactor/Atomic.hs
new file mode 100644
index 0000000..0a0b70c
--- /dev/null
+++ b/Reactor/Atomic.hs
@@ -0,0 +1,78 @@
+{-# LANGUAGE DeriveDataTypeable #-}
+module Reactor.Atomic where
+
+import Control.Monad
+import Control.Monad.IO.Class
+import Data.Bits.Atomic
+import Data.Data
+import Foreign.Ptr
+import Foreign.ForeignPtr
+import Foreign.Storable
+import System.IO.Unsafe
+
+newtype Atomic a = Atomic (ForeignPtr a)
+ deriving (Data, Typeable)
+
+instance (Show a, Storable a) => Show (Atomic a) where
+ showsPrec d (Atomic fp) = showsPrec d $ unsafePerformIO $ withForeignPtr fp peek
+
+atomic :: (MonadIO m, AtomicBits a, Storable a) => a -> m (Atomic a)
+atomic a = liftIO $ do
+ fp <- mallocForeignPtr
+ withForeignPtr fp $ \p -> poke p a
+ return $ Atomic fp
+
+withAtomic :: MonadIO m => Atomic a -> (Ptr a -> IO b) -> m b
+withAtomic (Atomic fp) = liftIO . withForeignPtr fp
+{-# INLINE withAtomic #-}
+
+atomicFetchAndAdd :: (MonadIO m, AtomicBits a) => Atomic a -> a -> m a
+atomicFetchAndAdd fp a = withAtomic fp $ \p -> fetchAndAdd p a
+{-# INLINE atomicFetchAndAdd #-}
+
+atomicFetchAndAnd :: (MonadIO m, AtomicBits a) => Atomic a -> a -> m a
+atomicFetchAndAnd fp a = withAtomic fp $ \p -> fetchAndAnd p a
+{-# INLINE atomicFetchAndAnd #-}
+
+atomicFetch :: (MonadIO m, AtomicBits a) => Atomic a -> m a
+atomicFetch fp = atomicFetchAndAdd fp 0
+{-# INLINE atomicFetch #-}
+
+atomicFetchAndSub :: (MonadIO m, AtomicBits a) => Atomic a -> a -> m a
+atomicFetchAndSub fp a = withAtomic fp $ \p -> fetchAndSub p a
+{-# INLINE atomicFetchAndSub #-}
+
+atomicSubAndFetch :: (MonadIO m, AtomicBits a) => Atomic a -> a -> m a
+atomicSubAndFetch fp a = withAtomic fp $ \p -> subAndFetch p a
+{-# INLINE atomicSubAndFetch #-}
+
+atomicAddAndFetch :: (MonadIO m, AtomicBits a) => Atomic a -> a -> m a
+atomicAddAndFetch fp a = withAtomic fp $ \p -> subAndFetch p a
+{-# INLINE atomicAddAndFetch #-}
+
+atomicCompareAndSwapBool :: (MonadIO m, AtomicBits a) => Atomic a -> a -> a -> m Bool
+atomicCompareAndSwapBool fp old new = withAtomic fp $ \p -> compareAndSwapBool p old new
+{-# INLINE atomicCompareAndSwapBool #-}
+
+atomicCompareAndSwap :: (MonadIO m, AtomicBits a) => Atomic a -> a -> a -> m a
+atomicCompareAndSwap fp old new = withAtomic fp $ \p -> compareAndSwap p old new
+{-# INLINE atomicCompareAndSwap #-}
+
+atomicLockTestAndSet :: (MonadIO m, AtomicBits a) => Atomic a -> m a
+atomicLockTestAndSet fp = withAtomic fp lockTestAndSet
+{-# INLINE atomicLockTestAndSet #-}
+
+atomicLockRelease :: (MonadIO m, AtomicBits a) => Atomic a -> m ()
+atomicLockRelease fp = withAtomic fp lockRelease
+{-# INLINE atomicLockRelease #-}
+
+given :: (MonadIO m, AtomicBits a) => Atomic a -> m () -> m ()
+given flag task = do
+ i <- atomicFetch flag
+ when (i /= 0) task
+
+clearing :: (MonadIO m, AtomicBits a) => Atomic a -> m () -> m ()
+clearing flag task = do
+ i <- atomicFetchAndAnd flag 0
+ when (i /= 0) task
+
diff --git a/Reactor/Deque.hs b/Reactor/Deque.hs
new file mode 100644
index 0000000..bf6096d
--- /dev/null
+++ b/Reactor/Deque.hs
@@ -0,0 +1,194 @@
+{-# LANGUAGE UndecidableInstances, FlexibleContexts, DeriveDataTypeable #-}
+
+module Reactor.Deque (
+ Deque
+
+ -- * Local stack operations
+ , empty -- :: (MonadIO m, MArray a e IO) => IO (Deque a e)
+ , push -- :: (MonadIO m, MArray a e IO) => e -> Deque a e -> IO ()
+ , pop -- :: (MonadIO m, MArray a e IO) => Deque a e -> IO (Maybe e)
+
+ -- * Performance tuning
+ , withCapacity -- :: (MonadIO m, MArray a e IO) => Int -> IO (Deque a e)
+ , minimumCapacity -- :: Int
+ , defaultCapacity -- :: Int
+
+ -- * Work stealing
+ , steal -- :: (MonadIO m, MArray a e IO) => Deque a e -> IO (Stolen e)
+ , Stolen(..)
+ ) where
+
+-- | For an explanation of the implementation, see \"Dynamic Circular Work-Stealing Deque\"
+-- by David Chase and Yossi Lev of Sun Microsystems.
+
+import Prelude hiding (read)
+import Control.Applicative hiding (empty)
+import Data.Bits.Atomic
+import Foreign.Ptr
+import Foreign.ForeignPtr
+import Foreign.Storable
+import Data.IORef
+import Data.Array.MArray
+import Control.Monad
+import Control.Monad.IO.Class
+import Data.Data
+import System.IO.Unsafe
+
+data Buffer a e = Buffer {-# UNPACK #-} !Int !(a Int e)
+
+instance Typeable2 a => Typeable1 (Buffer a) where
+ typeOf1 tae = mkTyConApp bufferTyCon [typeOf1 (aInte tae)]
+ where aInte :: t a e -> a Int e
+ aInte = undefined
+
+bufferTyCon :: TyCon
+bufferTyCon = mkTyCon "Reactor.Deque.Buffer"
+
+size :: Buffer a e -> Int
+size (Buffer i _) = i
+
+data Deque a e = Deque
+ { _tb :: ForeignPtr Int
+ , _content :: IORef (Buffer a e)
+ }
+
+instance (MArray a e IO, Show e) => Show (Deque a e) where
+ showsPrec d (Deque tb content) = unsafePerformIO $ do
+ (t,b) <- withForeignPtr tb $ \p -> (,) <$> peekTop p <*> peekBottom p
+ buffer <- readIORef content
+ contents <- forM [t..b-1] (read buffer)
+ return $ showParen (d > 10) $
+ showString "Deque (ptr " . showsPrec 11 t . showChar ' ' . showsPrec 11 b . showString ") (buffer " . showsPrec 11 contents . showChar ')'
+
+instance Typeable2 a => Typeable1 (Deque a) where
+ typeOf1 dae = mkTyConApp dequeTyCon [typeOf1 (aInte dae)]
+ where aInte :: t a e -> a Int e
+ aInte = undefined
+
+dequeTyCon :: TyCon
+dequeTyCon = mkTyCon "Reactor.Deque.Deque"
+
+ptr :: Storable a => a -> a -> IO (ForeignPtr a)
+ptr a b = do
+ p <- mallocForeignPtrArray 2
+ withForeignPtr p $ \q -> do
+ poke q a
+ pokeElemOff q 1 b
+ return p
+
+minimumCapacity :: Int
+minimumCapacity = 16
+
+defaultCapacity :: Int
+defaultCapacity = 32
+
+bufferWithCapacity :: MArray a e IO => Int -> IO (Buffer a e)
+bufferWithCapacity i =
+ Buffer i <$> newArray_ (0, (minimumCapacity `max` i) - 1)
+
+withCapacity :: (MonadIO m, MArray a e IO) => Int -> m (Deque a e)
+withCapacity i = liftIO (Deque <$> ptr 0 0 <*> (bufferWithCapacity i >>= newIORef))
+
+empty :: (MonadIO m, MArray a e IO) => m (Deque a e)
+empty = withCapacity defaultCapacity
+{-# INLINE empty #-}
+
+-- unsafeRead
+read :: MArray a e IO => Buffer a e -> Int -> IO e
+read (Buffer s c) i = do
+ readArray c (i `mod` s)
+{-# INLINE read #-}
+
+-- unsafeWrite
+write :: MArray a e IO => Buffer a e -> Int -> e -> IO ()
+write (Buffer s c) i e = do
+ writeArray c (i `mod` s) e
+{-# INLINE write #-}
+
+grow :: MArray a e IO => Buffer a e -> Int -> Int -> IO (Buffer a e)
+grow c b t = do
+ c' <- bufferWithCapacity (size c * 2)
+ forM_ [t..b-1] $ \i -> read c i >>= write c' i
+ return c'
+{-# INLINE grow #-}
+
+peekBottom :: Ptr Int -> IO Int
+peekBottom p = peekElemOff p 1
+
+peekTop :: Ptr Int -> IO Int
+peekTop p = peek p
+
+pokeBottom :: Ptr Int -> Int -> IO ()
+pokeBottom p = pokeElemOff p 1
+
+push :: (MonadIO m, MArray a e IO) => e -> Deque a e -> m ()
+push o (Deque tb content) = liftIO $ withForeignPtr tb $ \p -> do
+ b <- peekBottom p
+ t <- peekTop p
+ a <- readIORef content
+ let size' = b - t
+ if size' >= size a
+ then do
+ a' <- grow a b t
+ writeIORef content a'
+ go p a' b
+ else go p a b
+ where
+ go p arr b = do
+ write arr b o
+ pokeBottom p (b + 1)
+
+data Stolen e
+ = Empty
+ | Abort
+ | Stolen e
+ deriving (Data,Typeable,Eq,Ord,Show,Read)
+
+steal :: (MonadIO m, MArray a e IO) => Deque a e -> m (Stolen e)
+steal (Deque tb content) = liftIO $ withForeignPtr tb $ \p -> do
+ t <- peekTop p
+ b <- peekBottom p
+ a <- readIORef content
+ let size' = b - t
+ if size' <= 0
+ then return Empty
+ else do
+ o <- read a t
+ result <- compareAndSwapBool p t (t + 1)
+ return $ if result then Stolen o else Abort
+
+{-
+steal' :: MArray a e IO => Deque a e -> IO (Maybe e)
+steal' deque = do
+ s <- steal deque
+ case s of
+ Stolen e -> return (Just e)
+ Empty -> return Nothing
+ Abort -> steal' deque
+-}
+
+pop :: (MonadIO m, MArray a e IO) => Deque a e -> m (Maybe e)
+pop (Deque tb content) = liftIO $ withForeignPtr tb $ \p -> do
+ b <- peekBottom p
+ a <- readIORef content
+ let b' = b - 1
+ pokeBottom p b'
+ t <- peekTop p
+ let size' = b' - t
+ if size' < 0
+ then do
+ pokeBottom p t
+ return Nothing
+ else do
+ o <- read a b'
+ if size' > 0
+ then return (Just o)
+ else do
+ result <- compareAndSwapBool p t (t + 1)
+ if result
+ then do
+ pokeBottom p (t + 1)
+ return (Just o)
+ else do
+ pokeBottom p (t + 1)
+ return Nothing
diff --git a/Reactor/Filtered.hs b/Reactor/Filtered.hs
new file mode 100644
index 0000000..d106053
--- /dev/null
+++ b/Reactor/Filtered.hs
@@ -0,0 +1,14 @@
+module Reactor.Filtered where
+
+import Prelude hiding (filter)
+import qualified Prelude
+
+class Filtered f where
+ filter :: (a -> Bool) -> f a -> f a
+
+instance Filtered [] where
+ filter = Prelude.filter
+
+instance Filtered Maybe where
+ filter p m@(Just a) | p a = m
+ filter _ _ = Nothing
diff --git a/Reactor/Moore.hs b/Reactor/Moore.hs
new file mode 100644
index 0000000..5995cb2
--- /dev/null
+++ b/Reactor/Moore.hs
@@ -0,0 +1,34 @@
+{-# LANGUAGE DeriveDataTypeable #-}
+module Reactor.Moore
+ ( Moore(..)
+ ) where
+
+import Control.Applicative
+import Control.Comonad
+import Data.Functor.Apply
+import Data.Typeable
+
+data Moore i o = Moore { step :: i -> Moore i o, current :: o }
+ deriving Typeable
+
+instance Functor (Moore i) where
+ fmap g (Moore f o) = Moore (fmap g . f) (g o)
+ b <$ _ = pure b
+
+instance Extend (Moore i) where
+ duplicate m = Moore (duplicate . step m) m
+ extend g m = Moore (extend g . step m) (g m)
+
+instance Comonad (Moore i) where
+ extract (Moore _ o) = o
+
+instance Apply (Moore i) where
+ Moore ff f <.> Moore fa a = Moore (\i -> ff i <.> fa i) (f a)
+ a <. _ = a
+ _ .> b = b
+
+instance Applicative (Moore i) where
+ pure o = m where m = Moore (const m) o
+ (<*>) = (<.>)
+ (<* ) = (<. )
+ ( *>) = ( .>)
diff --git a/Reactor/Observable.hs b/Reactor/Observable.hs
new file mode 100644
index 0000000..e193101
--- /dev/null
+++ b/Reactor/Observable.hs
@@ -0,0 +1,216 @@
+module Reactor.Observable
+ ( Observable(..)
+ , never
+ , fby
+ , take, drop
+ , safe
+ , filterMap, (!?)
+ , observe
+ , counted
+ -- , andThen
+ , (<||>)
+ -- , feed
+ -- , head
+ -- , safeHead
+ ) where
+
+import Prelude hiding (filter, take, drop)
+import Control.Applicative
+import Control.Exception hiding (handle)
+import Control.Monad
+import Control.Monad.Error
+import Data.Foldable
+import qualified Data.Foldable as Foldable
+import Data.Functor.Bind
+import Data.Functor.Plus
+import Data.Functor.Extend
+import Data.Functor.Contravariant
+import Data.Monoid
+import Data.IORef
+import Reactor.Atomic
+import Reactor.Filtered
+import Reactor.Observer
+import Reactor.Task
+import Reactor.Subscription
+
+newtype Observable a = Observable { subscribe :: Observer a -> Task Subscription }
+
+instance Functor Observable where
+ fmap f s = Observable (subscribe s . contramap f)
+
+instance Apply Observable where
+ mf <.> ma = Observable $ \o -> do
+ funs <- io $ newIORef []
+ fflag <- atomic (1 :: Int)
+ aflag <- atomic (1 :: Int)
+ let discard pf sf = do
+ i <- atomicFetchAndAnd pf 0
+ when (i == 1) $ do
+ j <- atomicFetch sf
+ when (j == 0) $ complete o
+ mappend
+ <$> subscribe_ mf
+ (\f -> io $ atomicModifyIORef funs (\fs -> (f:fs, ())))
+ (handle o)
+ (discard fflag aflag)
+ <*> subscribe_ ma
+ (\a -> do
+ fs <- io $ readIORef funs
+ spawn $ Foldable.mapM_ (\f -> o ! f a) fs)
+ (handle o)
+ (discard aflag fflag)
+
+instance Filtered Observable where
+ filter p s = Observable (subscribe s . filter p)
+
+instance Applicative Observable where
+ pure a = Observable $ \o -> (o ! a) *> complete o $> mempty
+ (<*>) = (<.>)
+
+instance Bind Observable where
+ mf >>- k = Observable $ \o -> do
+ counter <- atomic (1 :: Int)
+ topFlag <- atomic (1 :: Int)
+ let detach flag = do
+ clearing flag $ do
+ i <- atomicSubAndFetch counter 1
+ when (i == 0) $ complete o
+ subscribe_ mf
+ (\f -> do
+ flag <- atomic (1 :: Int)
+ _ <- atomicAddAndFetch counter 1
+ () <$ subscribe_ (k f)
+ (o !)
+ (handle o)
+ (detach flag))
+ (handle o)
+ (detach topFlag)
+
+instance Monad Observable where
+ return = pure
+ (>>=) = (>>-)
+
+instance Alt Observable where
+ a <!> b = Observable $ \o -> subscribe_ a
+ (o !)
+ (handle o)
+ (subscribe b o $> ())
+
+instance Plus Observable where
+ zero = Observable (\o -> complete o $> mempty)
+
+instance Alternative Observable where
+ empty = zero
+ (<|>) = (<!>)
+
+instance MonadPlus Observable where
+ mzero = zero
+ mplus = (<!>)
+
+instance Extend Observable where
+ duplicate p = Observable $ \o -> subscribe_ p
+ (\a -> o ! fby a p)
+ (handle o)
+ (complete o)
+ extend f p = Observable $ \o -> subscribe_ p
+ (\a -> o ! f (fby a p))
+ (handle o)
+ (complete o)
+
+safe :: Observable a -> Observable a
+safe p = Observable $ \o -> do
+ alive <- atomic (1 :: Int)
+ subscribe_ p
+ (\a -> given alive $ o ! a)
+ (\e -> clearing alive $ handle o e)
+ (clearing alive $ complete o)
+
+subscribe_ :: Observable a -> (a -> Task ()) -> (SomeException -> Task ()) -> Task () -> Task Subscription
+subscribe_ a f h c = subscribe a (Observer f h c)
+
+filterMap :: (a -> Maybe b) -> Observable a -> Observable b
+filterMap p s = Observable $ \o -> subscribe s $ o ?! p
+
+(!?) :: Observable a -> (a -> Maybe b) -> Observable b
+(!?) = flip filterMap
+
+never :: Observable a
+never = Observable $ \_ -> return mempty
+
+fby :: a -> Observable a -> Observable a
+fby a as = Observable $ \o -> do
+ o ! a
+ subscribe as o
+
+take :: Int -> Observable a -> Observable a
+take n p = Observable $ \o -> do
+ counter <- atomic n
+ subscribe_ p
+ (\a -> do
+ i <- io $ atomicSubAndFetch counter 1
+ when (i >= 0) $ o ! a
+ when (i == 0) $ complete o)
+ (handle o)
+ (do
+ i <- io $ atomicFetchAndAnd counter 0
+ when (i >= 0) $ complete o)
+
+drop :: Int -> Observable a -> Observable a
+drop n p = Observable $ \o -> do
+ counter <- atomic n
+ subscribe_ p
+ (\a -> do
+ i <- io $ atomicSubAndFetch counter 1
+ when (i < 0) $ o ! a)
+ (handle o)
+ (complete o)
+
+-- | Observe both at the same time.
+(<||>) :: Observable a -> Observable a -> Observable a
+p <||> q = Observable $ \o -> mappend <$> subscribe p o <*> subscribe q o
+
+observe :: Foldable f => f a -> Observable a
+observe t = Observable $ \o -> do
+ spawn $ do
+ Foldable.forM_ t (o !)
+ complete o
+ return mempty
+
+counted :: Observable a -> Observable (Int, a)
+counted p = Observable $ \o -> do
+ counter <- atomic 0
+ subscribe_ p
+ (\a -> do
+ i <- atomicFetchAndAdd counter 1
+ o ! (i, a))
+ (handle o)
+ (complete o)
+
+-- do something when the observable completes
+
+{-
+andThen :: Observable a -> Task () -> Task ()
+andThen p t = spawn $ do
+ () <$ subscribe_ p (\_ -> return ()) (\_ -> t) t
+-}
+
+{-
+
+feed :: Moore i o -> Observable i -> Task o
+feed machine p = do
+ m <- io $ newIORef machine
+ callCC $ \resume ->
+ subscribe p
+ (\i -> io $ atomicModifyIORef m $ \m' -> (step m' i, ()))
+ throwError
+ (do Moore _ o <- io $ readIORef m
+ resume o)
+
+safeHead :: Onservable a -> Task (Maybe a)
+safeHead = feed (Moore (pure . Just) Nothing) . take 1
+
+head :: Observable a -> Task a
+head = feed (Moore pure (error "head: empty observable")) . take 1
+
+-- uncons :: Observable a -> Task (Maybe (a, Observable a))
+-}
diff --git a/Reactor/Observer.hs b/Reactor/Observer.hs
new file mode 100644
index 0000000..58e5923
--- /dev/null
+++ b/Reactor/Observer.hs
@@ -0,0 +1,38 @@
+{-# LANGUAGE DeriveDataTypeable #-}
+module Reactor.Observer
+ ( Observer(..)
+ , (?!)
+ ) where
+
+import Prelude hiding (filter)
+import Control.Monad
+import Control.Exception hiding (handle)
+import Control.Monad.Error
+import Data.Monoid
+import Data.Functor.Contravariant
+import Data.Data
+import Reactor.Filtered
+import Reactor.Task
+
+data Observer a = Observer
+ { (!) :: a -> Task ()
+ , handle :: SomeException -> Task ()
+ , complete :: Task ()
+ } deriving Typeable
+
+instance Contravariant Observer where
+ contramap g (Observer f h c) = Observer (f . g) h c
+
+instance Filtered Observer where
+ filter p (Observer f h c) = Observer (\a -> when (p a) (f a)) h c
+
+instance Monoid (Observer a) where
+ mempty = Observer (\_ -> return ()) throwError (return ())
+ p `mappend` q = Observer
+ (\a -> do p ! a; q ! a)
+ (\e -> do handle p e; handle q e)
+ (do complete p; complete q)
+
+-- filter and map in one operation
+(?!) :: Observer b -> (a -> Maybe b) -> Observer a
+Observer f h c ?! p = Observer (maybe (return ()) f . p) h c
diff --git a/Reactor/Subscription.hs b/Reactor/Subscription.hs
new file mode 100644
index 0000000..f496f27
--- /dev/null
+++ b/Reactor/Subscription.hs
@@ -0,0 +1,17 @@
+{-# LANGUAGE DeriveDataTypeable #-}
+module Reactor.Subscription
+ ( Subscription(..)
+ ) where
+
+import Control.Applicative
+import Reactor.Task
+import Data.Monoid
+import Data.Typeable
+
+-- Like in real life, cancelling a subscription may not stop it from sending you stuff immediately!
+newtype Subscription = Subscription { cancel :: Task () }
+ deriving Typeable
+
+instance Monoid Subscription where
+ mempty = Subscription (return ())
+ Subscription a `mappend` Subscription b = Subscription (a *> b)
diff --git a/Reactor/Task.hs b/Reactor/Task.hs
new file mode 100644
index 0000000..fa9cc80
--- /dev/null
+++ b/Reactor/Task.hs
@@ -0,0 +1,96 @@
+{-# LANGUAGE MultiParamTypeClasses, DeriveDataTypeable #-}
+module Reactor.Task
+ ( Task
+ , run
+ , spawn
+ , io
+ ) where
+
+import Control.Applicative
+import Control.Monad
+import Control.Exception
+import Control.Monad.Reader.Class
+import Control.Monad.Error.Class
+import Control.Monad.IO.Class
+import Data.Array.IO
+import Data.Functor.Bind
+import Data.Functor.Plus
+import Reactor.Deque (Deque)
+import Data.Data
+import qualified Reactor.Deque as Deque
+
+newtype Env = Env { envDeque :: Deque IOArray (Task ()) }
+
+mkEnv :: IO Env
+mkEnv = Env <$> Deque.empty
+
+newtype Task a = Task
+ { runTask :: (a -> IO ()) ->
+ (SomeException -> IO ()) ->
+ (Env -> IO ())
+ } deriving Typeable
+
+instance Functor Task where
+ fmap f (Task m) = Task $ \ks -> m (ks . f)
+
+instance Apply Task where
+ Task mf <.> Task ma = Task $ \ks kf e -> mf (\f -> ma (ks . f) kf e) kf e
+
+instance Applicative Task where
+ pure a = Task (\ks _kf _e -> ks a)
+ (<*>) = (<.>)
+
+instance Bind Task where
+ Task mf >>- k = Task (\ks kf e -> mf (\a -> runTask (k a) ks kf e) kf e)
+
+instance Monad Task where
+ return = pure
+ (>>=) = (>>-)
+
+instance MonadReader Env Task where
+ ask = Task (\ks _kf e -> ks e)
+ local f (Task ma) = Task (\ks kf e -> ma ks kf (f e))
+
+instance MonadIO Task where
+ liftIO = io
+
+io :: IO a -> Task a
+io act = Task (\ks _kf _e -> act >>= ks)
+
+instance MonadError SomeException Task where
+ throwError err = Task (\_ks kf _e -> kf err)
+ catchError (Task m) h = Task (\ks kf e -> m ks (\err -> runTask (h err) ks kf e) e)
+
+instance Alt Task where
+ Task ma <!> Task mb = Task (\ks kf e -> ma ks (\_ -> mb ks kf e) e)
+
+instance Plus Task where
+ zero = Task (\_ks kf _e -> kf (toException (ErrorCall "empty")))
+
+instance Alternative Task where
+ (<|>) = (<!>)
+ empty = zero
+
+instance MonadPlus Task where
+ mzero = zero
+ mplus = (<!>)
+
+spawn :: Task () -> Task ()
+spawn task = Task (\_ks _kf e -> Deque.push task (envDeque e))
+
+-- run a single threaded pump, all tasks are placed locally
+run :: Task () -> IO ()
+run task0 = do
+ env <- mkEnv
+ bracket_
+ (register env)
+ (go env task0)
+ (unregister env)
+ where
+ go :: Env -> Task () -> IO ()
+ go env (Task m) = m (success env) (failure env) env
+ success env _ = Deque.pop (envDeque env) >>= maybe (return ()) (go env)
+ failure _env = throw -- TODO: shut down workers?
+ register _env = return () -- TODO: start up if necessary and tell worker threads about us
+ unregister _env = return () -- TODO: shutdown if necessary and tell worker threads about us
+
diff --git a/Setup.lhs b/Setup.lhs
new file mode 100644
index 0000000..6cbd928
--- /dev/null
+++ b/Setup.lhs
@@ -0,0 +1,7 @@
+#!/usr/bin/runhaskell
+> module Main (main) where
+
+> import Distribution.Simple
+
+> main :: IO ()
+> main = defaultMain
diff --git a/reactor.cabal b/reactor.cabal
new file mode 100644
index 0000000..99a9d71
--- /dev/null
+++ b/reactor.cabal
@@ -0,0 +1,37 @@
+name: reactor
+category: Concurrency
+version: 0.1.3
+license: BSD3
+cabal-version: >= 1.2
+license-file: LICENSE
+author: Edward A. Kmett
+maintainer: Edward A. Kmett <ekmett@gmail.com>
+stability: experimental
+homepage: http://comonad.com/reader/
+copyright: Copyright (C) 2011 Edward A. Kmett
+synopsis: Reactor - task parallel reactive programming
+description: Reactor - task parallel reactive programming
+build-type: Simple
+
+library
+ build-depends:
+ base >= 4 && < 4.4,
+ array >= 0.3.0 && < 0.4,
+ semigroupoids >= 1.2.1 && < 1.3,
+ bits-atomic >= 0.1.3 && < 0.2,
+ comonad >= 1.1 && < 1.2,
+ transformers >= 0.2.2 && < 0.3,
+ mtl >= 2.0.1.0 && < 2.1,
+ contravariant >= 0.1.0.1 && < 0.2
+
+ exposed-modules:
+ Reactor.Atomic
+ Reactor.Deque
+ Reactor.Filtered
+ Reactor.Moore
+ Reactor.Observer
+ Reactor.Observable
+ Reactor.Subscription
+ Reactor.Task
+
+ ghc-options: -Wall