summaryrefslogtreecommitdiff
path: root/Reactor/Deque.hs
blob: bf6096d86acfdc0c226ad2340f1028374d243595 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
{-# LANGUAGE UndecidableInstances, FlexibleContexts, DeriveDataTypeable #-}

module Reactor.Deque (
    Deque 

  -- * Local stack operations
  , empty        -- :: (MonadIO m, MArray a e IO) => m (Deque a e)
  , push         -- :: (MonadIO m, MArray a e IO) => e -> Deque a e -> m ()
  , pop          -- :: (MonadIO m, MArray a e IO) => Deque a e -> m (Maybe e)

  -- * Performance tuning
  , withCapacity -- :: (MonadIO m, MArray a e IO) => Int -> m (Deque a e)
  , minimumCapacity -- :: Int
  , defaultCapacity -- :: Int

  -- * Work stealing
  , steal        -- :: (MonadIO m, MArray a e IO) => Deque a e -> m (Stolen e)
  , Stolen(..)  
  ) where

-- For an explanation of the implementation, see "Dynamic Circular Work-Stealing Deque"
-- by David Chase and Yossi Lev of Sun Microsystems (SPAA 2005).

import Prelude hiding (read)
import Control.Applicative hiding (empty)
import Data.Bits.Atomic
import Foreign.Ptr
import Foreign.ForeignPtr
import Foreign.Storable
import Data.IORef
import Data.Array.MArray
import Control.Monad
import Control.Monad.IO.Class
import Data.Data
import System.IO.Unsafe

-- | A circular buffer. The 'Int' field is the modulus that 'read' and 'write'
-- apply to a logical index before touching the array; the second field is the
-- mutable array (indexed by 'Int') that stores the elements.
data Buffer a e = Buffer {-# UNPACK #-} !Int !(a Int e) 

-- | Hand-written 'Typeable1' instance, needed because the first parameter of
-- 'Buffer' is itself a two-argument type constructor.
--
-- The representation of @Buffer a@ is 'bufferTyCon' applied to the
-- representation of the bare constructor @a@.  That representation is obtained
-- with 'typeOf2' on a proxy of type @a Int e@; using 'typeOf1' on the same
-- proxy would describe the partially applied @a Int@ and yield the ill-formed
-- @Buffer (a Int)@.
instance Typeable2 a => Typeable1 (Buffer a) where
  typeOf1 tae = mkTyConApp bufferTyCon [typeOf2 (aInte tae)]
    where -- Type-level proxy only; the result is never evaluated.
          aInte :: t a e -> a Int e
          aInte = undefined

-- | Type constructor representation used by the 'Typeable1' instance of 'Buffer'.
bufferTyCon :: TyCon
bufferTyCon = mkTyCon "Reactor.Deque.Buffer"

-- | The 'Int' recorded in a 'Buffer', i.e. the modulus used to wrap indices.
size :: Buffer a e -> Int
size buf = case buf of
  Buffer modulus _ -> modulus

-- | A work-stealing deque backed by a growable circular 'Buffer'.
data Deque a e = Deque 
  { _tb :: ForeignPtr Int 
    -- ^ Two 'Int' cells: element 0 is the top index (see 'peekTop'),
    -- element 1 is the bottom index (see 'peekBottom' / 'pokeBottom').
  , _content :: IORef (Buffer a e)
    -- ^ The current buffer; 'push' replaces it when the buffer is grown.
  }

-- | Debugging aid: renders the top and bottom indices followed by the elements
-- stored between them.
--
-- The mutable state is read through 'unsafePerformIO', so the output is a
-- snapshot taken whenever the result happens to be forced; nothing here
-- synchronises with concurrent 'push', 'pop' or 'steal' calls.
instance (MArray a e IO, Show e) => Show (Deque a e) where
  showsPrec d (Deque tb content) = unsafePerformIO $ do
    (t, b) <- withForeignPtr tb $ \p -> do
      top    <- peekTop p
      bottom <- peekBottom p
      return (top, bottom)
    buffer   <- readIORef content
    contents <- mapM (read buffer) [t .. b - 1]
    let body = showString "Deque (ptr "
             . showsPrec 11 t
             . showChar ' '
             . showsPrec 11 b
             . showString ") (buffer "
             . showsPrec 11 contents
             . showChar ')'
    return (showParen (d > 10) body)

-- | Hand-written 'Typeable1' instance, needed because the first parameter of
-- 'Deque' is itself a two-argument type constructor.
--
-- The representation of @Deque a@ is 'dequeTyCon' applied to the
-- representation of the bare constructor @a@.  That representation is obtained
-- with 'typeOf2' on a proxy of type @a Int e@; using 'typeOf1' on the same
-- proxy would describe the partially applied @a Int@ and yield the ill-formed
-- @Deque (a Int)@.
instance Typeable2 a => Typeable1 (Deque a) where
  typeOf1 dae = mkTyConApp dequeTyCon [typeOf2 (aInte dae)]
    where -- Type-level proxy only; the result is never evaluated.
          aInte :: t a e -> a Int e
          aInte = undefined

-- | Type constructor representation used by the 'Typeable1' instance of 'Deque'.
dequeTyCon :: TyCon
dequeTyCon = mkTyCon "Reactor.Deque.Deque"

-- | Allocate a two-element foreign array and initialise element 0 with the
-- first argument and element 1 with the second.
ptr :: Storable a => a -> a -> IO (ForeignPtr a)
ptr a b = do
  cells <- mallocForeignPtrArray 2
  withForeignPtr cells $ \base ->
    pokeElemOff base 0 a >> pokeElemOff base 1 b
  return cells

-- | Lower bound on the number of array slots allocated by 'bufferWithCapacity'.
minimumCapacity :: Int
minimumCapacity = 16

-- | Capacity passed to 'withCapacity' by 'empty'.
defaultCapacity :: Int
defaultCapacity = 32

-- | Allocate an uninitialised circular buffer able to hold at least the
-- requested number of elements.
--
-- The request is clamped to 'minimumCapacity', and the clamped figure is used
-- both for the array bounds and for the modulus stored in the 'Buffer'.
-- Recording the raw request instead would leave a zero or negative modulus for
-- requests @<= 0@: 'read' and 'write' would then divide by zero (or compute
-- negative indices), and 'grow' could never recover because doubling zero
-- stays zero.
bufferWithCapacity :: MArray a e IO => Int -> IO (Buffer a e)
bufferWithCapacity i = Buffer capacity <$> newArray_ (0, capacity - 1)
  where
    -- Single source of truth for both the modulus and the array size.
    capacity = minimumCapacity `max` i

-- | Create a deque whose top and bottom indices both start at 0 and whose
-- initial buffer is obtained from 'bufferWithCapacity' with the given size.
withCapacity :: (MonadIO m, MArray a e IO) => Int -> m (Deque a e)
withCapacity i = liftIO $ do
  indices <- ptr 0 0
  buffer  <- bufferWithCapacity i
  ref     <- newIORef buffer
  return (Deque indices ref)

-- | Create a deque with no elements, using 'defaultCapacity' as the initial
-- capacity.
empty :: (MonadIO m, MArray a e IO) => m (Deque a e)
empty = withCapacity defaultCapacity
{-# INLINE empty #-}
  
-- | Read the element at a logical index, wrapping the index with the buffer's
-- modulus first.  (Marked in the original as a candidate for @unsafeRead@.)
read :: MArray a e IO => Buffer a e -> Int -> IO e
read (Buffer modulus cells) ix = readArray cells slot
  where
    slot = ix `mod` modulus
{-# INLINE read #-}

-- | Store an element at a logical index, wrapping the index with the buffer's
-- modulus first.  (Marked in the original as a candidate for @unsafeWrite@.)
write :: MArray a e IO => Buffer a e -> Int -> e -> IO ()
write (Buffer modulus cells) ix e = writeArray cells slot e
  where
    slot = ix `mod` modulus
{-# INLINE write #-}

-- | Build a buffer of twice the current 'size' and copy every element whose
-- logical index lies in @[t .. b-1]@ into it, keeping the same logical indices.
-- The source buffer is left untouched.
grow :: MArray a e IO => Buffer a e -> Int -> Int -> IO (Buffer a e) 
grow c b t = do
  bigger <- bufferWithCapacity (2 * size c)
  mapM_ (\ix -> write bigger ix =<< read c ix) [t .. b - 1]
  return bigger
{-# INLINE grow #-}

-- | Load the bottom index, which lives in element 1 of the index array.
peekBottom :: Ptr Int -> IO Int
peekBottom = (`peekElemOff` 1)

-- | Load the top index, which lives in element 0 of the index array.
peekTop :: Ptr Int -> IO Int
peekTop p = peekElemOff p 0

-- | Store a new bottom index into element 1 of the index array.
pokeBottom :: Ptr Int -> Int -> IO ()
pokeBottom p newBottom = pokeElemOff p 1 newBottom

-- | Append an element at the bottom end of the deque.
--
-- If the number of stored elements (@bottom - top@) has reached the buffer's
-- 'size', the buffer is first replaced by one produced by 'grow'.  The element
-- is then written at the current bottom index and the bottom index is advanced
-- by one.
push  :: (MonadIO m, MArray a e IO) => e -> Deque a e -> m ()
push o (Deque tb content) = liftIO $ withForeignPtr tb $ \p -> do
  b <- peekBottom p
  t <- peekTop p
  current <- readIORef content
  -- Decide which buffer receives the element, growing and publishing a new
  -- one when the current buffer is full.
  target <-
    if b - t >= size current
      then do
        bigger <- grow current b t
        writeIORef content bigger
        return bigger
      else return current
  -- Write the slot before advancing bottom, same order as the original.
  write target b o
  pokeBottom p (b + 1)

-- | Outcome of a 'steal' attempt.
data Stolen e 
  = Empty 
    -- ^ 'steal' observed @bottom - top <= 0@, i.e. nothing to take.
  | Abort 
    -- ^ The compare-and-swap on the top index failed; no element was taken.
  | Stolen e
    -- ^ The compare-and-swap succeeded and this element was taken.
  deriving (Data,Typeable,Eq,Ord,Show,Read)

-- | Try to take the element at the top end of the deque.
--
-- Returns 'Empty' when @bottom - top <= 0@.  Otherwise the element at the top
-- index is read and the top index is advanced with a compare-and-swap: on
-- success the element is returned in 'Stolen', on failure the result is
-- 'Abort' and the deque is left as it was.
steal :: (MonadIO m, MArray a e IO) => Deque a e -> m (Stolen e)
steal (Deque tb content) = liftIO $ withForeignPtr tb $ \p -> do
  -- Read order (top, bottom, buffer) is kept exactly as in the original.
  t <- peekTop p
  b <- peekBottom p
  buffer <- readIORef content
  if b - t <= 0
    then return Empty
    else do
      candidate <- read buffer t
      claimed <- compareAndSwapBool p t (t + 1)
      return (if claimed then Stolen candidate else Abort)

{-
steal' :: MArray a e IO => Deque a e -> IO (Maybe e)
steal' deque = do
  s <- steal deque 
  case s of
    Stolen e -> return (Just e)
    Empty -> return Nothing
    Abort -> steal' deque
-}

-- | Remove and return the element at the bottom end of the deque, or
-- 'Nothing' if there is none (or if the last element was lost to a
-- concurrent compare-and-swap on the top index).
--
-- The statement order below is significant: bottom is decremented and stored
-- /before/ top is read.
--
-- NOTE(review): there is no explicit memory barrier between the store to
-- bottom and the load of top; confirm that the target platform / runtime
-- provides the ordering this algorithm relies on.
pop :: (MonadIO m, MArray a e IO) => Deque a e -> m (Maybe e)
pop (Deque tb content) = liftIO $ withForeignPtr tb $ \p -> do
  b <- peekBottom p
  a <- readIORef content
  let b' = b - 1
  -- Tentatively claim the bottom slot by publishing the decremented index.
  pokeBottom p b'
  t <- peekTop p
  let size' = b' - t
  if size' < 0 
    then do
      -- Nothing was stored: reset bottom to top so the deque reads as empty.
      pokeBottom p t
      return Nothing
    else do
      o <- read a b'
      if size' > 0 
        -- More than one element was present, so top is not touched.
        then return (Just o)
        else do
          -- Exactly one element was present: advance top with a
          -- compare-and-swap to decide who gets it.
          result <- compareAndSwapBool p t (t + 1)
          if result 
            then do
              -- CAS succeeded; set bottom to t + 1 (equal to the new top).
              pokeBottom p (t + 1)
              return (Just o)
            else do
              -- CAS failed (top was no longer t); set bottom to t + 1 and
              -- report nothing.
              pokeBottom p (t + 1)
              return Nothing