summaryrefslogtreecommitdiff
path: root/Database/Handle.hs
blob: 389ec366ddeb652314ea267cc7a71ca1d25a6c93 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
{- Persistent sqlite database handles.
 -
 - Copyright 2015-2018 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

module Database.Handle (
	DbHandle,
	DbConcurrency(..),
	openDb,
	TableName,
	queryDb,
	closeDb,
	commitDb,
	commitDb',
) where

import Utility.Exception
import Utility.FileSystemEncoding

import Database.Persist.Sqlite
import qualified Database.Sqlite as Sqlite
import Control.Monad
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception (throwIO, BlockedIndefinitelyOnMVar(..))
import qualified Data.Text as T
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Logger (runNoLoggingT)
import System.IO

{- A DbHandle is a reference to a worker thread that communicates with
 - the database.
 -
 - It holds the concurrency mode the handle was opened with, the Async
 - of the worker thread started by openDb, and the MVar through which
 - Jobs are submitted to that worker. -}
data DbHandle = DbHandle DbConcurrency (Async ()) (MVar Job)

{- Name of a table that should exist once the database is initialized.
 -
 - The name is spliced into the "SELECT null from ..." probe that
 - runSqliteRobustly runs to check that a new connection is usable. -}
type TableName = String

{- Sqlite only allows a single write to a database at a time; a concurrent
 - write will crash.
 -
 - MultiWriter works around this limitation.
 - The downside of using MultiWriter is that after writing a change to the
 - database, a query using the same DbHandle will not immediately see
 - the change! This is because the change is actually written using a
 - separate database connection, and caching can prevent seeing the change.
 - Also, consider that if multiple processes are writing to a database,
 - you can't rely on seeing values you've just written anyway, as another
 - process may change them.
 -
 - When a database can only be written to by a single process (enforced by
 - a lock file), use SingleWriter. Changes written to the database will
 - always be immediately visible then. Multiple threads can write; their
 - writes will be serialized.
 -}
data DbConcurrency = SingleWriter | MultiWriter

{- Opens the database without performing any migrations, so it must only
 - be used once the database is known to exist and have the right tables.
 -
 - A worker thread is started to talk to the database; the returned
 - DbHandle carries the concurrency mode, that worker, and the MVar used
 - to submit jobs to it. -}
openDb :: DbConcurrency -> FilePath -> TableName -> IO DbHandle
openDb dbconcurrency db tablename = do
	jobqueue <- newEmptyMVar
	workerhandle <- async $ workerThread (T.pack db) tablename jobqueue

	-- work around https://github.com/yesodweb/persistent/issues/474
	fileEncoding stderr

	return (DbHandle dbconcurrency workerhandle jobqueue)

{- Asks the worker thread to shut down and waits until it has finished.
 -
 - Calling this is optional; the worker also shuts itself down once the
 - DbHandle has been garbage collected. -}
closeDb :: DbHandle -> IO ()
closeDb (DbHandle _ worker jobs) = putMVar jobs CloseJob >> wait worker

{- Makes a query using the DbHandle. This should not be used to make
 - changes to the database!
 -
 - Note that the action is not run by the calling thread, but by a
 - worker thread. The result is handed back through a MVar.
 -
 - Any non-async exception, whether thrown by the action itself or while
 - waiting for the worker's answer, reaches the calling thread as an
 - error whose message starts with "sqlite query crashed: " and includes
 - the underlying exception, so the cause of the failure is not lost.
 -
 - Only one action can be run at a time against a given DbHandle.
 - If called concurrently in the same process, this will block until
 - it is able to run.
 -
 - Note that when the DbHandle was opened in MultiWriter mode, recent
 - writes may not be seen by queryDb.
 -}
queryDb :: DbHandle -> SqlPersistM a -> IO a
queryDb (DbHandle _ _ jobs) a = do
	res <- newEmptyMVar
	-- The job runs the action in the worker and always delivers either
	-- its result or the exception it raised.
	putMVar jobs $ QueryJob $
		liftIO . putMVar res =<< tryNonAsync a
	(either throwIO return =<< takeMVar res)
		`catchNonAsync` (\e -> error $ "sqlite query crashed: " ++ show e)

{- Writes a change to the database.
 -
 - In MultiWriter mode, a write can fail when another write is happening
 - at the same time. A failed write is therefore attempted again, with a
 - 1/10th second pause after each failure, for up to 100 attempts (about
 - 10 seconds), which should avoid all but the most exceptional problems.
 - When every attempt has failed, an error reporting the last exception
 - is raised.
 -}
commitDb :: DbHandle -> SqlPersistM () -> IO ()
commitDb h wa = attempt 100 Nothing
  where
	-- Arguments: attempts still allowed, and the most recent failure.
	attempt :: Int -> Maybe SomeException -> IO ()
	attempt 0 lastfailure = error $ "failed to commit changes to sqlite database: " ++ show lastfailure
	attempt remaining _ = commitDb' h wa >>= either (retryafter remaining) return

	retryafter remaining failure = do
		threadDelay 100000 -- 1/10th second
		attempt (remaining - 1) (Just failure)

{- Makes a single attempt at writing a change. A non-async exception
 - raised by the change is returned as a Left rather than thrown.
 -
 - In MultiWriter mode the change is submitted as a RobustChangeJob; in
 - SingleWriter mode it is submitted as a ChangeJob. In SingleWriter mode
 - only, a non-async exception while waiting for the worker's answer is
 - turned into the error "sqlite commit crashed".
 -}
commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
commitDb' (DbHandle concurrency _ jobs) change = do
	result <- newEmptyMVar
	case concurrency of
		MultiWriter -> do
			putMVar jobs $ RobustChangeJob $ \runner ->
				liftIO $ tryNonAsync (runner change) >>= putMVar result
			takeMVar result
		SingleWriter -> do
			putMVar jobs $ ChangeJob $
				tryNonAsync change >>= liftIO . putMVar result
			takeMVar result
				`catchNonAsync` (const $ error "sqlite commit crashed")

{- A unit of work submitted to the worker thread through the MVar held
 - in a DbHandle. -}
data Job
	-- An action run on the worker's current database connection; the
	-- worker then goes on to take the next job.
	= QueryJob (SqlPersistM ())
	-- An action run on the worker's current database connection, after
	-- which the worker leaves its job loop and starts a new run of
	-- runSqliteRobustly.
	| ChangeJob (SqlPersistM ())
	-- An IO action that is handed a runner; the worker supplies a runner
	-- that executes a database action via a separate call to
	-- runSqliteRobustly.
	| RobustChangeJob ((SqlPersistM () -> IO ()) -> IO ())
	-- Tells the worker thread to stop.
	| CloseJob

{- Body of the worker thread behind a DbHandle. It services jobs taken
 - from the MVar until it is told to close, the MVar becomes unreachable,
 - or a non-async exception escapes, in which case a message is written
 - to stderr and the thread ends. -}
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
workerThread db tablename jobs = session
  where
	-- One session is one run of runSqliteRobustly. The job loop
	-- returns True to ask for a fresh session, False to stop.
	session = tryNonAsync (runSqliteRobustly tablename db serve) >>= \outcome ->
		case outcome of
			Right True -> session
			Right False -> return ()
			Left e -> hPutStrLn stderr $
				"sqlite worker thread crashed: " ++ show e

	nextjob :: IO (Either BlockedIndefinitelyOnMVar Job)
	nextjob = try $ takeMVar jobs

	-- The exception is thrown when the MVar is garbage collected,
	-- which means the whole DbHandle is not used any longer.
	-- Shut down cleanly in that case.
	serve = liftIO nextjob >>= either (const $ return False) dispatch

	dispatch CloseJob = return False
	dispatch (QueryJob a) = a >> serve
	-- Leave this sqlite transaction after the change so the database
	-- gets updated on disk.
	dispatch (ChangeJob a) = a >> return True
	-- The change is run in a separate database connection, since
	-- sqlite only supports a single writer at a time, and a write
	-- may crash the database connection that it is made to.
	dispatch (RobustChangeJob a) = do
		liftIO (a (runSqliteRobustly tablename db))
		serve
	
-- Like runSqlite, but more robust.
--
-- Opens a connection to the database, waits for it to become usable, and
-- then runs the action on it, returning the action's result.
--
-- New database connections can sometimes take a while to become usable.
-- This may be due to WAL mode recovering after a crash, or perhaps a bug
-- like described in blob 500f777a6ab6c45ca5f9790e0a63575f8e3cb88f.
-- So, loop until a select succeeds; once one succeeds the connection will
-- stay usable.
--
-- And sqlite sometimes throws ErrorIO when there's not really an IO problem,
-- but perhaps just a short read(). That's caught and retried several times.
--
-- A sqlite exception that is not retried, or that persists after the
-- retries are used up, is rethrown as a userError whose message is the
-- shown exception followed by a note saying which phase failed.
runSqliteRobustly :: TableName -> T.Text -> (SqlPersistM a) -> IO a
runSqliteRobustly tablename db a = do
	conn <- opensettle maxretries
	go conn maxretries
  where
	maxretries = 100 :: Int
	
	rethrow msg e = throwIO $ userError $ show e ++ "(" ++ msg ++ ")"
	
	-- Runs the action on the settled connection, retrying on ErrorIO
	-- until the retry budget is used up.
	go conn retries = do
		r <- try $ runResourceT $ runNoLoggingT $
			withSqlConn (wrapConnection conn) $
				runSqlConn a
		case r of
			Right v -> return v
			Left ex@(Sqlite.SqliteException { Sqlite.seError = e })
				| e == Sqlite.ErrorIO ->
					let retries' = retries - 1
					in if retries' < 1
						then rethrow "after successful open" ex
						else go conn retries'
				| otherwise -> rethrow "after successful open" ex
	
	opensettle retries = do
		conn <- Sqlite.open db
		settle conn retries

	-- Runs an IO action, capturing only sqlite exceptions.
	trysqlite :: IO () -> IO (Either Sqlite.SqliteException ())
	trysqlite = try

	-- Probes the connection with a trivial select until it works.
	settle conn retries = do
		r <- try $ do
			stmt <- Sqlite.prepare conn nullselect
			-- The statement is finalized even when stepping it
			-- fails. Otherwise every failed probe would leak a
			-- prepared statement, and sqlite refuses to close a
			-- connection that still has unfinalized statements,
			-- which would break the close-and-reopen retry below.
			stepped <- trysqlite $ void $ Sqlite.step stmt
			finalized <- trysqlite $ void $ Sqlite.finalize stmt
			-- A failure of the step takes precedence over a
			-- failure of the finalize.
			either throwIO return (stepped >> finalized)
		case r of
			Right _ -> return conn
			Left ex@(Sqlite.SqliteException { Sqlite.seError = e })
				| e == Sqlite.ErrorBusy -> do
					-- Wait and retry any number of times; it 
					-- will stop being busy eventually.
					briefdelay
					settle conn retries
				| e == Sqlite.ErrorIO -> do
					-- Could be a real IO error,
					-- so don't retry indefinitely.
					Sqlite.close conn
					briefdelay
					let retries' = retries - 1
					if retries' < 1
						then rethrow "while opening database connection" ex
						else opensettle retries'
				| otherwise -> rethrow "while opening database connection" ex
	
	-- This should succeed for any table.
	nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1"

	briefdelay = threadDelay 1000 -- 1/1000th second