summaryrefslogtreecommitdiff
path: root/src/Control/Funflow/External/Executor.hs
blob: c7c9a5c54c9817f2989f1f4f991b14d226f40d46 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE QuasiQuotes         #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell     #-}
-- | Executor for external tasks.
--
--   An executor will poll for tasks on the co-ordinator, mark them as in
--   progress and then execute them.
--
--   You probably want to start with 'executeLoop'.
module Control.Funflow.External.Executor where

import           Control.Arrow                        (second)
import           Control.Concurrent                   (threadDelay)
import           Control.Concurrent.Async
import           Control.Concurrent.MVar
import           Control.Exception.Safe
import qualified Control.Funflow.ContentStore         as CS
import           Control.Funflow.External
import           Control.Funflow.External.Coordinator
import           Control.Lens
import           Control.Monad                        (forever, mzero, unless)
import           Control.Monad.Trans                  (lift)
import           Control.Monad.Trans.Maybe
import qualified Data.Aeson                           as Json
import qualified Data.ByteString                      as BS
import           Data.Foldable                        (for_)
import           Data.Maybe                           (isJust, isNothing)
import           Data.Monoid                          ((<>))
import qualified Data.Text                            as T
import           Katip                                as K
import           Network.HostName
import           Path
import           Path.IO
import           System.Clock
import           System.Exit                          (ExitCode (..))
import           System.IO                            (Handle, IOMode (..),
                                                       openFile, stderr, stdout)
import           System.Posix.Env                     (getEnv)
import           System.Posix.User
import           System.Process
import           GHC.IO.Handle                        (hClose)

-- | Outcome of a single call to 'execute'.
data ExecutionResult =
    -- | The result already exists in the store and there is no need
    --   to execute.
    Cached
    -- | The computation is already running elsewhere. This is probably
    --   indicative of a bug, because the coordinator should only allow one
    --   instance of a task to be running at any time.
  | AlreadyRunning
    -- | Execution completed successfully after a certain amount of time.
  | Success TimeSpec
    -- | Execution failed after the given amount of time with the given
    --   exit code.
    --   TODO where should logs go?
  | Failure TimeSpec Int
    -- | The executor itself failed to execute the external task.
    --   E.g. because the executable was not found.
  | ExecutorFailure IOException

-- | Execute an individual task.
--
--   The task is run inside 'CS.withConstructIfMissing' for the task's output
--   hash, so the external process is only started when the store reports the
--   item as missing. The process' stdout and stderr are written to the
--   @stdout@ and @stderr@ metadata files of the item and followed onto the
--   executor's own streams while the process runs.
--
--   Returns
--
--   * 'Cached' if the item was already complete,
--   * 'AlreadyRunning' if the store reports the item as pending,
--   * 'Success' with the elapsed time if the process exited successfully,
--   * 'Failure' with the elapsed time and exit code otherwise,
--   * 'ExecutorFailure' if starting or supervising the process raised an
--     'IOException'.
--
--   Any other exception is logged at error level and re-thrown.
execute :: CS.ContentStore -> TaskDescription -> KatipContextT IO ExecutionResult
execute store td = logError $ do
  status <- CS.withConstructIfMissing store (td ^. tdOutput) $ \fp -> do
    (fpOut, hOut) <- lift $
      CS.createMetadataFile store (td ^. tdOutput) [relfile|stdout|]
    (fpErr, hErr) <- lift $
      CS.createMetadataFile store (td ^. tdOutput) [relfile|stderr|]
    let
      -- 'hClose' on an already closed handle has no effect, so this can be
      -- run both right after the process exits and again during clean-up.
      closeLogHandles = hClose hErr >> hClose hOut
    -- Make sure the metadata handles are released on every exit path:
    -- a parameter that is not ready, a store error, or a process that
    -- could not be spawned would otherwise leak both handles.
    flip finally (lift closeLogHandles) $ do
      let
        -- NOTE(review): stdout is only followed when it IS captured into
        -- the output item ('outputCaptureToRelFile' yields a file). That
        -- looks inverted -- captured stdout is data, uncaptured stdout is
        -- log output -- please confirm the intended behaviour.
        withFollowOutput
          | td ^. tdTask . etWriteToStdOut . to outputCaptureToRelFile
                . to isNothing
          = withFollowFile fpErr stderr
          | otherwise
          = withFollowFile fpErr stderr
          . withFollowFile fpOut stdout
        cmd = T.unpack $ td ^. tdTask . etCommand
        -- Process description: run in the item's build directory and send
        -- both output streams to the metadata files.
        procSpec params textEnv = (proc cmd $ T.unpack <$> params)
          { cwd = Just (fromAbsDir fp)
          , close_fds = True
          , std_err = UseHandle hErr
          , std_out = UseHandle hOut
          , env = map (bimap T.unpack T.unpack) <$> textEnv
          }
        -- How the individual parameter fields are rendered to text.
        convParam = ConvParam
          { convPath = pure . CS.itemPath store
          , convEnv = \e -> T.pack <$> MaybeT (getEnv $ T.unpack e)
          , convUid = lift getEffectiveUserID
          , convGid = lift getEffectiveGroupID
          , convOut = pure fp
          }
      mbParams <- lift $ runMaybeT $
        traverse (paramToText convParam) (td ^. tdTask . etParams)
      mbEnv <- case td ^. tdTask . etEnv of
        EnvInherit -> pure Nothing
        EnvExplicit x ->
          (lift . runMaybeT $ traverse (sequence . second (paramToText convParam)) x)
          >>= \case
            Nothing -> fail "A parameter was not ready"
            jp -> return jp
      params <- case mbParams of
        Nothing     -> fail "A parameter was not ready"
        Just params -> return params

      let
        -- Store items referenced by the task's parameters.
        inputItems :: [CS.Item]
        inputItems = do
          Param fields <- td ^. tdTask . etParams
          ParamPath inputPath <- fields
          case inputPath of
            IPItem item      -> pure item
            -- XXX: Store these references as well.
            IPExternalFile _ -> mzero
            IPExternalDir _  -> mzero
      CS.setInputs store (td ^. tdOutput) inputItems
      CS.setMetadata store (td ^. tdOutput)
        ("external-task"::T.Text)
        (Json.encode (td ^. tdTask))

      start <- lift $ getTime Monotonic
      let theProc = procSpec params mbEnv
      katipAddNamespace "process" . katipAddContext (sl "processId" $ show theProc) $ do
        $(logTM) InfoS "Executing"
        res <- lift $ tryIO $ withCreateProcess theProc $ \_ _ _ ph ->
          -- Error output should be displayed on our stderr stream
          withFollowOutput $ do
            exitCode <- waitForProcess ph
            closeLogHandles
            end <- getTime Monotonic
            case exitCode of
              ExitSuccess   -> do
                -- Copy captured stdout into the output item if requested.
                for_ (td ^. tdTask . etWriteToStdOut . to outputCaptureToRelFile)
                  $ \file -> copyFile fpOut (fp </> file)
                return $ Right (diffTimeSpec start end)
              ExitFailure i ->
                return $ Left (diffTimeSpec start end, i)
        case res of
          -- execution was successful
          Right (Right r) -> return $ Right r
          -- execution failed
          Right (Left e)  -> return $ Left (Right e)
          -- executor itself failed
          Left e          -> return $ Left (Left e)
  case status of
    CS.Missing (Left e)        -> return (ExecutorFailure e)
    CS.Missing (Right (t, ec)) -> return (Failure t ec)
    CS.Pending ()              -> return AlreadyRunning
    CS.Complete (Nothing, _)   -> return Cached
    CS.Complete (Just t, _)    -> return (Success t)
  where
    -- Log any escaping exception before it propagates to the caller.
    logError = flip withException $ \(e::SomeException) ->
      $(logTM) ErrorS . ls $ displayException e

-- | Execute tasks forever, logging to the process' stdout at 'InfoS'
--   severity and above.
executeLoop :: forall c. Coordinator c
            => c
            -> Config c
            -> CS.ContentStore
            -> IO ()
executeLoop coord cfg store = do
  stdoutScribe <- mkHandleScribe ColorIfTerminal stdout InfoS V2
  executeLoopWithScribe coord cfg store stdoutScribe

-- | Same as 'executeLoop', but allows specifying a custom 'Scribe' for logging.
--
--   The coordinator value itself is not inspected; it only selects the
--   coordinator type @c@ whose 'Config' is used to open the connection.
executeLoopWithScribe :: forall c. Coordinator c
                      => c
                      -> Config c
                      -> CS.ContentStore
                      -> Scribe
                      -> IO ()
executeLoopWithScribe _ cfg store handleScribe =
  bracket acquireLogEnv closeScribes $ \logEnv ->
    runKatipContextT logEnv () "executeLoop" $ do
      $(logTM) InfoS "Initialising connection to coordinator."
      hook :: Hook c <- lift $ initialise cfg
      executor <- Executor <$> lift getHostName

      let -- Known failures that do not affect the executor's ability to
          -- execute further tasks are logged and otherwise ignored.
          -- Certain store errors can occur if an item is forcibly removed
          -- while the executor is constructing it, or if the executor
          -- picked up a corresponding outdated task from the queue.
          -- XXX: The store should distinguish between recoverable
          --   and unrecoverable errors.
          recoverStoreError = handle $ \(err :: CS.StoreError) ->
            $(logTM) WarningS (ls (displayException err))

          -- Run one task and report elapsed time plus success or exit code.
          runTask task =
            katipAddContext (sl "task" $ task ^. tdOutput) $ do
              $(logTM) DebugS "Checking task"
              execute store task >>= \case
                Cached -> do
                  $(logTM) InfoS "Task was cached"
                  return (0, Right ())
                Success elapsed -> do
                  $(logTM) InfoS "Task completed successfully"
                  return (elapsed, Right ())
                Failure elapsed code -> do
                  $(logTM) WarningS "Task failed"
                  return (elapsed, Left code)
                ExecutorFailure err -> do
                  $(logTM) ErrorS $ "Executor failed: " <> ls (displayException err)
                  return (0, Left 2)
                AlreadyRunning -> do
                  -- XXX:
                  --   This should not happen and indicates a programming
                  --   error or invalid state.
                  --   Putting the task back on the queue would cause a loop.
                  --   Marking the task done is wrong too, because a later
                  --   completion of the already running external task would
                  --   mark it as done a second time.
                  --   Marking it as ongoing is not possible, as we have no
                  --   information on the executor where it is running.
                  $(logTM) ErrorS $
                    "Received an already running task from the coordinator "
                    <> showLS (task ^. tdOutput)
                  error $
                    "Received an already running task from the coordinator "
                    ++ show (task ^. tdOutput)

      forever . recoverStoreError $ do
        $(logTM) DebugS "Awaiting task from coordinator."
        popped <- withPopTask hook executor runTask
        -- Nothing was popped: wait a second before polling again.
        unless (isJust popped) . lift $ threadDelay 1000000
  where
    -- Fresh log environment with the supplied scribe registered as "stdout".
    acquireLogEnv =
      initLogEnv "FFExecutorD" "production"
        >>= registerScribe "stdout" handleScribe defaultScribeSettings

-- | @withFollowFile in out action@ follows the file @in@
--   and prints contents to @out@ as they appear, for as long as
--   @action@ is running. The result of @action@ is returned.
--
--   The file must exist. Doesn't handle file truncation.
--
--   The input handle is closed on every exit path, including when @action@
--   throws. 'IOException's raised by the follower itself are swallowed so
--   that a broken output stream does not abort @action@.
withFollowFile :: Path Abs File -> Handle -> IO a -> IO a
withFollowFile infile outhandle action = do
  mv <- newEmptyMVar
  bracket (openFile (fromAbsFile infile) ReadMode) hClose $ \inhandle -> do
    let
      -- Forward everything that is currently readable from the file.
      copyAvailable = do
        chunk <- BS.hGetSome inhandle 4096
        unless (BS.null chunk) $ BS.hPut outhandle chunk >> copyAvailable
      loop = do
        copyAvailable
        done <- isJust <$> tryTakeMVar mv
        if done
          -- The action may have produced output between our last read and
          -- the completion signal; drain once more so the tail is not lost.
          then copyAvailable
          else threadDelay 10000 >> loop
    snd <$> concurrently (tryIO loop) (action <* putMVar mv ())