diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Propellor/Bootstrap.hs | 3 | ||||
| -rw-r--r-- | src/Utility/ConcurrentOutput.hs | 173 |
2 files changed, 98 insertions, 78 deletions
diff --git a/src/Propellor/Bootstrap.hs b/src/Propellor/Bootstrap.hs index 6a5d5acb..2318b910 100644 --- a/src/Propellor/Bootstrap.hs +++ b/src/Propellor/Bootstrap.hs @@ -65,7 +65,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ " aptinstall p = "apt-get --no-upgrade --no-install-recommends -y install " ++ p - -- This is the same build deps listed in debian/control. + -- This is the same deps listed in debian/control. debdeps = [ "gnupg" , "ghc" @@ -81,6 +81,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ " , "libghc-mtl-dev" , "libghc-transformers-dev" , "libghc-exceptions-dev" + , "libghc-stm-dev" , "make" ] diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index c6550b84..5535066f 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE BangPatterns #-} + -- | Concurrent output handling. module Utility.ConcurrentOutput ( @@ -14,6 +16,7 @@ import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent +import Control.Concurrent.STM import Control.Concurrent.Async import Data.Maybe import Data.List @@ -25,21 +28,23 @@ import Utility.Monad import Utility.Exception data OutputHandle = OutputHandle - { outputLock :: MVar () -- ^ empty when locked - , outputLockedBy :: MVar Locker + { outputLock :: TMVar (Maybe Locker) } data Locker = GeneralLock - | ProcessLock P.ProcessHandle + | ProcessLock P.ProcessHandle String + +instance Show Locker where + show GeneralLock = "GeneralLock" + show (ProcessLock _ cmd) = "ProcessLock " ++ cmd -- | A shared global variable for the OutputHandle. {-# NOINLINE globalOutputHandle #-} globalOutputHandle :: MVar OutputHandle globalOutputHandle = unsafePerformIO $ newMVar =<< OutputHandle - <$> newMVar () - <*> newEmptyMVar + <$> newTMVarIO Nothing -- | Gets the global OutputHandle. getOutputHandle :: IO OutputHandle @@ -58,61 +63,69 @@ takeOutputLock = void $ takeOutputLock' True tryTakeOutputLock :: IO Bool tryTakeOutputLock = takeOutputLock' False -takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = do +withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a +withLock a = do lck <- outputLock <$> getOutputHandle - go =<< tryTakeMVar lck + atomically (a lck) + +-- The lock TMVar is kept full normally, even if only with Nothing, +-- so if we take it here, that blocks anyone else from trying +-- to take the lock while we are checking it. +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = go =<< withLock tryTakeTMVar where - -- lck was full, and we've emptied it, so we hold the lock now. - go (Just ()) = havelock - -- lck is empty, so someone else is holding the lock. - go Nothing = do - lcker <- outputLockedBy <$> getOutputHandle - v' <- tryTakeMVar lcker - case v' of - Just (ProcessLock h) -> - -- if process has exited, lock is stale - ifM (isJust <$> P.getProcessExitCode h) - ( havelock - , if block - then do - void $ P.waitForProcess h - havelock - else do - putMVar lcker (ProcessLock h) - return False - ) - Just GeneralLock -> do - putMVar lcker GeneralLock - whenblock waitlock - Nothing -> whenblock waitlock + go Nothing = whenblock waitlock + -- Something has the lock. It may be stale, so check it. + -- We must always be sure to fill the TMVar back with Just or Nothing. + go (Just orig) = case orig of + Nothing -> havelock + (Just (ProcessLock h _)) -> + -- when process has exited, lock is stale + ifM (isJust <$> P.getProcessExitCode h) + ( havelock + , if block + then do + hPutStr stderr "WAITFORPROCESS in lock" + hFlush stderr + void $ P.waitForProcess h + hPutStr stderr "WAITFORPROCESS in lock done" + hFlush stderr + havelock + else do + withLock (`putTMVar` orig) + return False + ) + (Just GeneralLock) -> do + withLock (`putTMVar` orig) + whenblock waitlock havelock = do - updateOutputLocker GeneralLock + withLock (`putTMVar` Just GeneralLock) return True - waitlock = do - -- Wait for current lock holder to relinquish - -- it and take the lock. - lck <- outputLock <$> getOutputHandle - takeMVar lck - havelock + + -- Wait for current lock holder (if any) to relinquish + -- it and take the lock for ourselves. + waitlock = withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just (Just _) -> retry + _ -> do + putTMVar l (Just GeneralLock) + return True + whenblock a = if block then a else return False -- | Only safe to call after taking the output lock. dropOutputLock :: IO () -dropOutputLock = do - lcker <- outputLockedBy <$> getOutputHandle - lck <- outputLock <$> getOutputHandle - void $ takeMVar lcker - putMVar lck () +dropOutputLock = withLock $ \l -> do + void $ takeTMVar l + putTMVar l Nothing -- | Only safe to call after takeOutputLock; updates the Locker. updateOutputLocker :: Locker -> IO () -updateOutputLocker l = do - lcker <- outputLockedBy <$> getOutputHandle - void $ tryTakeMVar lcker - putMVar lcker l - modifyMVar_ lcker (const $ return l) +updateOutputLocker locker = withLock $ \l -> do + void $ takeTMVar l + putTMVar l (Just locker) -- | Use this around any IO actions that use `outputConcurrent` -- or `createProcessConcurrent` @@ -124,7 +137,7 @@ withConcurrentOutput a = a `finally` drain where -- Just taking the output lock is enough to ensure that anything -- that was buffering output has had a chance to flush its buffer. - drain = lockOutput (return ()) + drain = lockOutput noop -- | Displays a string to stdout, and flush output so it's displayed. -- @@ -158,28 +171,25 @@ outputConcurrent s = do -- as the output lock becomes free. createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) createProcessConcurrent p - | willoutput (P.std_out p) || willoutput (P.std_err p) = + | willOutput (P.std_out p) || willOutput (P.std_err p) = ifM tryTakeOutputLock - ( do - hPutStrLn stderr "IS NOT CONCURRENT" - firstprocess - , do - hPutStrLn stderr "IS CONCURRENT" - concurrentprocess + ( firstprocess + , concurrentprocess ) | otherwise = P.createProcess p where - willoutput P.Inherit = True - willoutput _ = False + rediroutput ss h + | willOutput ss = P.UseHandle h + | otherwise = ss - rediroutput str h - | willoutput str = P.UseHandle h - | otherwise = str + cmd = case P.cmdspec p of + P.ShellCommand s -> s + P.RawCommand c ps -> unwords (c:ps) firstprocess = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock - updateOutputLocker (ProcessLock h) + updateOutputLocker (ProcessLock h cmd) -- Output lock is still held as we return; the process -- is running now, and once it exits the output lock will -- be stale and can then be taken by something else. @@ -196,8 +206,8 @@ createProcessConcurrent p hClose toouth hClose toerrh buf <- newMVar [] - void $ async $ outputDrainer fromouth stdout buf - void $ async $ outputDrainer fromerrh stderr buf + void $ async $ outputDrainer (P.std_out p) fromouth stdout buf + void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf void $ async $ bufferWriter buf return r @@ -205,6 +215,10 @@ createProcessConcurrent p (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from +willOutput :: P.StdStream -> Bool +willOutput P.Inherit = True +willOutput _ = False + type Buffer = [(Handle, BufferedActivity)] data BufferedActivity @@ -213,17 +227,22 @@ data BufferedActivity | InTempFile FilePath deriving (Eq) --- Drain output from the handle, and buffer it in memory. -outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () -outputDrainer fromh toh buf = do - v <- tryIO $ B.hGetSome fromh 1024 - case v of - Right b | not (B.null b) -> do - modifyMVar_ buf $ addBuffer (toh, Output b) - outputDrainer fromh toh buf - _ -> do - modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) - hClose fromh +-- Drain output from the handle, and buffer it. +outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO () +outputDrainer ss fromh toh buf + | willOutput ss = go + | otherwise = atend + where + go = do + v <- tryIO $ B.hGetSome fromh 1024 + case v of + Right b | not (B.null b) -> do + modifyMVar_ buf $ addBuffer (toh, Output b) + go + _ -> atend + atend = do + modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) + hClose fromh -- Wait to lock output, and once we can, display everything -- that's put into buffer, until the end is signaled by Nothing @@ -262,8 +281,8 @@ addBuffer (toh, Output b) buf hClose h return ((toh, InTempFile tmp) : other) where - b' = B.concat (mapMaybe getOutput this) <> b - (this, other) = partition same buf + !b' = B.concat (mapMaybe getOutput this) <> b + !(this, other) = partition same buf same v = fst v == toh && case snd v of Output _ -> True _ -> False |
