From 894e2f7980052f1c331ba7780100ae0ad19856cb Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 27 Oct 2015 23:52:02 -0400 Subject: use execProcessConcurrent everywhere Found a reasonable clean way to make Utility.Process use execProcessConcurrent, while still allowing copying updates to it from git-annex. --- propellor.cabal | 2 ++ 1 file changed, 2 insertions(+) (limited to 'propellor.cabal') diff --git a/propellor.cabal b/propellor.cabal index 7a9d2b5d..63fcaaa5 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -135,6 +135,7 @@ Library Propellor.CmdLine Propellor.Info Propellor.Message + Propellor.Debug Propellor.PrivData Propellor.Engine Propellor.Exception @@ -175,6 +176,7 @@ Library Utility.PartialPrelude Utility.PosixFiles Utility.Process + Utility.Process.Shim Utility.SafeCommand Utility.Scheduled Utility.Table -- cgit v1.3-2-g0d8e From af68ec950b2480749182d0d6838e96fd02c2c285 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 10:37:19 -0400 Subject: split out generic ConcurrentOutput module to Utility --- propellor.cabal | 1 + src/Propellor/Message.hs | 204 +----------------------------------- src/Utility/ConcurrentOutput.hs | 224 ++++++++++++++++++++++++++++++++++++++++ src/Utility/Process/Shim.hs | 2 +- 4 files changed, 229 insertions(+), 202 deletions(-) create mode 100644 src/Utility/ConcurrentOutput.hs (limited to 'propellor.cabal') diff --git a/propellor.cabal b/propellor.cabal index 63fcaaa5..20e82407 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -161,6 +161,7 @@ Library Propellor.Shim Propellor.Property.Chroot.Util Utility.Applicative + Utility.ConcurrentOutput Utility.Data Utility.DataUnits Utility.Directory diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index 3792129b..3b06770c 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE PackageImports #-} - -- | This module handles all display of output to the console when -- propellor is ensuring Properties. -- @@ -22,117 +20,34 @@ module Propellor.Message ( import System.Console.ANSI import System.IO -import System.Posix.IO -import "mtl" Control.Monad.Reader +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative import Control.Monad.IfElse import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent -import Control.Concurrent.Async -import Data.Maybe -import Data.Char -import Data.List -import Data.Monoid -import qualified Data.ByteString as B -import qualified System.Process as P import Propellor.Types +import Utility.ConcurrentOutput import Utility.PartialPrelude import Utility.Monad import Utility.Exception data MessageHandle = MessageHandle { isConsole :: Bool - , outputLock :: MVar () -- ^ empty when locked - , outputLockedBy :: MVar Locker } -data Locker - = GeneralLock - | ProcessLock P.ProcessHandle - -- | A shared global variable for the MessageHandle. {-# NOINLINE globalMessageHandle #-} globalMessageHandle :: MVar MessageHandle globalMessageHandle = unsafePerformIO $ newMVar =<< MessageHandle <$> hIsTerminalDevice stdout - <*> newMVar () - <*> newEmptyMVar -- | Gets the global MessageHandle. getMessageHandle :: IO MessageHandle getMessageHandle = readMVar globalMessageHandle --- | Takes a lock while performing an action. Any other threads --- that try to lockOutput at the same time will block. -lockOutput :: (MonadIO m, MonadMask m) => m a -> m a -lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) - --- | Blocks until we have the output lock. -takeOutputLock :: IO () -takeOutputLock = void $ takeOutputLock' True - --- | Tries to take the output lock, without blocking. -tryTakeOutputLock :: IO Bool -tryTakeOutputLock = takeOutputLock' False - -takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = do - lck <- outputLock <$> getMessageHandle - go =<< tryTakeMVar lck - where - -- lck was full, and we've emptied it, so we hold the lock now. - go (Just ()) = havelock - -- lck is empty, so someone else is holding the lock. - go Nothing = do - lcker <- outputLockedBy <$> getMessageHandle - v' <- tryTakeMVar lcker - case v' of - Just (ProcessLock h) -> - -- if process has exited, lock is stale - ifM (isJust <$> P.getProcessExitCode h) - ( havelock - , if block - then do - void $ P.waitForProcess h - havelock - else do - putMVar lcker (ProcessLock h) - return False - ) - Just GeneralLock -> do - putMVar lcker GeneralLock - whenblock waitlock - Nothing -> whenblock waitlock - - havelock = do - updateOutputLocker GeneralLock - return True - waitlock = do - -- Wait for current lock holder to relinquish - -- it and take the lock. - lck <- outputLock <$> getMessageHandle - takeMVar lck - havelock - whenblock a = if block then a else return False - --- | Only safe to call after taking the output lock. -dropOutputLock :: IO () -dropOutputLock = do - lcker <- outputLockedBy <$> getMessageHandle - lck <- outputLock <$> getMessageHandle - void $ takeMVar lcker - putMVar lck () - --- | Only safe to call after takeOutputLock; updates the Locker. -updateOutputLocker :: Locker -> IO () -updateOutputLocker l = do - lcker <- outputLockedBy <$> getMessageHandle - void $ tryTakeMVar lcker - putMVar lcker l - modifyMVar_ lcker (const $ return l) - -- | Force console output. This can be used when stdout is not directly -- connected to a console, but is eventually going to be displayed at a -- console. @@ -237,116 +152,3 @@ messagesDone = lockOutput $ do whenConsole $ setTitle "propellor: done" hFlush stdout - --- | Wrapper around `System.Process.createProcess` that prevents --- multiple processes that are running concurrently from writing --- to stdout/stderr at the same time. --- --- The first process is allowed to write to --- stdout and stderr in the usual way. --- --- However, if another process runs concurrently with the --- first, any stdout or stderr that would have been displayed by it is --- instead buffered. The buffered output will be displayed the next time it --- is safe to do so (ie, after the first process exits). --- --- Also does debug logging of all commands run. --- --- Unless you manually import System.Process, every part of propellor --- that runs a process uses this. -createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -createProcessConcurrent p - | hasoutput (P.std_out p) || hasoutput (P.std_err p) = - ifM tryTakeOutputLock - ( firstprocess - , concurrentprocess - ) - | otherwise = P.createProcess p - where - hasoutput P.Inherit = True - hasoutput _ = False - - firstprocess = do - r@(_, _, _, h) <- P.createProcess p - `onException` dropOutputLock - updateOutputLocker (ProcessLock h) - -- Output lock is still held as we return; the process - -- is running now, and once it exits the output lock will - -- be stale and can then be taken by something else. - return r - - concurrentprocess = do - (toouth, fromouth) <- pipe - (toerrh, fromerrh) <- pipe - let p' = p - { P.std_out = if hasoutput (P.std_out p) - then P.UseHandle toouth - else P.std_out p - , P.std_err = if hasoutput (P.std_err p) - then P.UseHandle toerrh - else P.std_err p - } - r <- P.createProcess p' - hClose toouth - hClose toerrh - buf <- newMVar [] - void $ async $ outputDrainer fromouth stdout buf - void $ async $ outputDrainer fromerrh stderr buf - void $ async $ bufferWriter buf - return r - - pipe = do - (from, to) <- createPipe - (,) <$> fdToHandle to <*> fdToHandle from - -type Buffer = [(Handle, Maybe B.ByteString)] - --- Drain output from the handle, and buffer it in memory. -outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () -outputDrainer fromh toh buf = do - v <- tryIO $ B.hGetSome fromh 1024 - case v of - Right b | not (B.null b) -> do - modifyMVar_ buf (pure . addBuffer (toh, Just b)) - outputDrainer fromh toh buf - _ -> do - modifyMVar_ buf (pure . (++ [(toh, Nothing)])) - hClose fromh - --- Wait to lock output, and once we can, display everything --- that's put into buffer, until the end is signaled by Nothing --- for both stdout and stderr. -bufferWriter :: MVar Buffer -> IO () -bufferWriter buf = lockOutput (go [stdout, stderr]) - where - go [] = return () - go hs = do - l <- takeMVar buf - forM_ l $ \(h, mb) -> do - maybe noop (B.hPut h) mb - hFlush h - let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs - putMVar buf [] - go hs' - --- The buffer can grow up to 1 mb in size, but after that point, --- it's truncated to avoid propellor using unbounded memory --- when a process outputs a whole lot of stuff. -bufsz :: Int -bufsz = 1000000 - -addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer -addBuffer v@(_, Nothing) buf = buf ++ [v] -addBuffer (toh, Just b) buf = (toh, Just b') : other - where - (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf - b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b - --- Truncate a buffer by removing lines from the front until it's --- small enough. -truncateBuffer :: B.ByteString -> B.ByteString -truncateBuffer b - | B.length b <= bufsz = b - | otherwise = truncateBuffer $ snd $ B.breakByte nl b - where - nl = fromIntegral (ord '\n') diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs new file mode 100644 index 00000000..cf1d166e --- /dev/null +++ b/src/Utility/ConcurrentOutput.hs @@ -0,0 +1,224 @@ +-- | Concurrent output handling. +-- +-- When two threads both try to display a message concurrently, +-- the messages will be displayed sequentially. + +module Utility.ConcurrentOutput ( + lockOutput, + createProcessConcurrent, +) where + +import System.IO +import System.Posix.IO +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) +import Control.Applicative +import System.IO.Unsafe (unsafePerformIO) +import Control.Concurrent +import Control.Concurrent.Async +import Data.Maybe +import Data.Char +import Data.List +import Data.Monoid +import qualified Data.ByteString as B +import qualified System.Process as P + +import Utility.Monad +import Utility.Exception + +data OutputHandle = OutputHandle + { outputLock :: MVar () -- ^ empty when locked + , outputLockedBy :: MVar Locker + } + +data Locker + = GeneralLock + | ProcessLock P.ProcessHandle + +-- | A shared global variable for the OutputHandle. +{-# NOINLINE globalOutputHandle #-} +globalOutputHandle :: MVar OutputHandle +globalOutputHandle = unsafePerformIO $ + newMVar =<< OutputHandle + <$> newMVar () + <*> newEmptyMVar + +-- | Gets the global OutputHandle. +getOutputHandle :: IO OutputHandle +getOutputHandle = readMVar globalOutputHandle + +-- | Holds a lock while performing an action. Any other threads +-- that try to lockOutput at the same time will block. +lockOutput :: (MonadIO m, MonadMask m) => m a -> m a +lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) + +-- | Blocks until we have the output lock. +takeOutputLock :: IO () +takeOutputLock = void $ takeOutputLock' True + +-- | Tries to take the output lock, without blocking. +tryTakeOutputLock :: IO Bool +tryTakeOutputLock = takeOutputLock' False + +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = do + lck <- outputLock <$> getOutputHandle + go =<< tryTakeMVar lck + where + -- lck was full, and we've emptied it, so we hold the lock now. + go (Just ()) = havelock + -- lck is empty, so someone else is holding the lock. + go Nothing = do + lcker <- outputLockedBy <$> getOutputHandle + v' <- tryTakeMVar lcker + case v' of + Just (ProcessLock h) -> + -- if process has exited, lock is stale + ifM (isJust <$> P.getProcessExitCode h) + ( havelock + , if block + then do + void $ P.waitForProcess h + havelock + else do + putMVar lcker (ProcessLock h) + return False + ) + Just GeneralLock -> do + putMVar lcker GeneralLock + whenblock waitlock + Nothing -> whenblock waitlock + + havelock = do + updateOutputLocker GeneralLock + return True + waitlock = do + -- Wait for current lock holder to relinquish + -- it and take the lock. + lck <- outputLock <$> getOutputHandle + takeMVar lck + havelock + whenblock a = if block then a else return False + +-- | Only safe to call after taking the output lock. +dropOutputLock :: IO () +dropOutputLock = do + lcker <- outputLockedBy <$> getOutputHandle + lck <- outputLock <$> getOutputHandle + void $ takeMVar lcker + putMVar lck () + +-- | Only safe to call after takeOutputLock; updates the Locker. +updateOutputLocker :: Locker -> IO () +updateOutputLocker l = do + lcker <- outputLockedBy <$> getOutputHandle + void $ tryTakeMVar lcker + putMVar lcker l + modifyMVar_ lcker (const $ return l) + +-- | Wrapper around `System.Process.createProcess` that prevents +-- multiple processes that are running concurrently from writing +-- to stdout/stderr at the same time. +-- +-- The first process is allowed to write to stdout and stderr in the usual way. +-- +-- However, if another process runs concurrently with the +-- first, any stdout or stderr that would have been displayed by it is +-- instead buffered. The buffered output will be displayed the next time it +-- is safe to do so (ie, after the first process exits). +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +createProcessConcurrent p + | hasoutput (P.std_out p) || hasoutput (P.std_err p) = + ifM tryTakeOutputLock + ( firstprocess + , concurrentprocess + ) + | otherwise = P.createProcess p + where + hasoutput P.Inherit = True + hasoutput _ = False + + firstprocess = do + r@(_, _, _, h) <- P.createProcess p + `onException` dropOutputLock + updateOutputLocker (ProcessLock h) + -- Output lock is still held as we return; the process + -- is running now, and once it exits the output lock will + -- be stale and can then be taken by something else. + return r + + concurrentprocess = do + (toouth, fromouth) <- pipe + (toerrh, fromerrh) <- pipe + let p' = p + { P.std_out = if hasoutput (P.std_out p) + then P.UseHandle toouth + else P.std_out p + , P.std_err = if hasoutput (P.std_err p) + then P.UseHandle toerrh + else P.std_err p + } + r <- P.createProcess p' + hClose toouth + hClose toerrh + buf <- newMVar [] + void $ async $ outputDrainer fromouth stdout buf + void $ async $ outputDrainer fromerrh stderr buf + void $ async $ bufferWriter buf + return r + + pipe = do + (from, to) <- createPipe + (,) <$> fdToHandle to <*> fdToHandle from + +type Buffer = [(Handle, Maybe B.ByteString)] + +-- Drain output from the handle, and buffer it in memory. +outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () +outputDrainer fromh toh buf = do + v <- tryIO $ B.hGetSome fromh 1024 + case v of + Right b | not (B.null b) -> do + modifyMVar_ buf (pure . addBuffer (toh, Just b)) + outputDrainer fromh toh buf + _ -> do + modifyMVar_ buf (pure . (++ [(toh, Nothing)])) + hClose fromh + +-- Wait to lock output, and once we can, display everything +-- that's put into buffer, until the end is signaled by Nothing +-- for both stdout and stderr. +bufferWriter :: MVar Buffer -> IO () +bufferWriter buf = lockOutput (go [stdout, stderr]) + where + go [] = return () + go hs = do + l <- takeMVar buf + forM_ l $ \(h, mb) -> do + maybe noop (B.hPut h) mb + hFlush h + let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs + putMVar buf [] + go hs' + +-- The buffer can grow up to 1 mb in size, but after that point, +-- it's truncated to avoid propellor using unbounded memory +-- when a process outputs a whole lot of stuff. +bufsz :: Int +bufsz = 1000000 + +addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer +addBuffer v@(_, Nothing) buf = buf ++ [v] +addBuffer (toh, Just b) buf = (toh, Just b') : other + where + (this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf + b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b + +-- Truncate a buffer by removing lines from the front until it's +-- small enough. +truncateBuffer :: B.ByteString -> B.ByteString +truncateBuffer b + | B.length b <= bufsz = b + | otherwise = truncateBuffer $ snd $ B.breakByte nl b + where + nl = fromIntegral (ord '\n') diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs index 0da93bf7..202b7c32 100644 --- a/src/Utility/Process/Shim.hs +++ b/src/Utility/Process/Shim.hs @@ -1,7 +1,7 @@ module Utility.Process.Shim (module X, createProcess) where import System.Process as X hiding (createProcess) -import Propellor.Message (createProcessConcurrent) +import Utility.ConcurrentOutput (createProcessConcurrent) import System.IO createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -- cgit v1.3-2-g0d8e From 111ea88d4d7c54e9ab7950962ad22528d54dd959 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 14:46:17 -0400 Subject: fix bad MVar use, use STM I had 2 MVars both involved in the same lock, and it seemed intractable to avoid deadlocks with them. STM makes it easy. At this point, the concurrent process stuff seems to work pretty well, but I'm not 100% sure it's not got some bugs. --- debian/changelog | 1 + debian/control | 2 + propellor.cabal | 6 +- src/Propellor/Bootstrap.hs | 3 +- src/Utility/ConcurrentOutput.hs | 173 ++++++++++++++++++++++------------------ 5 files changed, 104 insertions(+), 81 deletions(-) (limited to 'propellor.cabal') diff --git a/debian/changelog b/debian/changelog index 6c154e1a..f3522b7c 100644 --- a/debian/changelog +++ b/debian/changelog @@ -19,6 +19,7 @@ propellor (2.13.0) UNRELEASED; urgency=medium actions are combined (API change). * Added Propellor.Property.Concurrent for concurrent properties. * execProcess and everything built on it is now concurrent output safe. + * Propellor now depends on stm. * Add File.isCopyOf. Thanks, Per Olofsson. -- Joey Hess Sat, 24 Oct 2015 15:16:45 -0400 diff --git a/debian/control b/debian/control index 7f42c916..2956fdaa 100644 --- a/debian/control +++ b/debian/control @@ -17,6 +17,7 @@ Build-Depends: libghc-mtl-dev, libghc-transformers-dev, libghc-exceptions-dev (>= 0.6), + libghc-stm-dev, Maintainer: Gergely Nagy Standards-Version: 3.9.6 Vcs-Git: git://git.joeyh.name/propellor @@ -39,6 +40,7 @@ Depends: ${misc:Depends}, ${shlibs:Depends}, libghc-mtl-dev, libghc-transformers-dev, libghc-exceptions-dev (>= 0.6), + libghc-stm-dev, git, make, Description: property-based host configuration management in haskell diff --git a/propellor.cabal b/propellor.cabal index 20e82407..da43775f 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -39,7 +39,7 @@ Executable propellor Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions (>= 0.6) + exceptions (>= 0.6), stm if (! os(windows)) Build-Depends: unix @@ -51,7 +51,7 @@ Executable propellor-config Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions + exceptions, stm if (! os(windows)) Build-Depends: unix @@ -62,7 +62,7 @@ Library Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions + exceptions, stm if (! os(windows)) Build-Depends: unix diff --git a/src/Propellor/Bootstrap.hs b/src/Propellor/Bootstrap.hs index 6a5d5acb..2318b910 100644 --- a/src/Propellor/Bootstrap.hs +++ b/src/Propellor/Bootstrap.hs @@ -65,7 +65,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ " aptinstall p = "apt-get --no-upgrade --no-install-recommends -y install " ++ p - -- This is the same build deps listed in debian/control. + -- This is the same deps listed in debian/control. debdeps = [ "gnupg" , "ghc" @@ -81,6 +81,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ " , "libghc-mtl-dev" , "libghc-transformers-dev" , "libghc-exceptions-dev" + , "libghc-stm-dev" , "make" ] diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index c6550b84..5535066f 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE BangPatterns #-} + -- | Concurrent output handling. module Utility.ConcurrentOutput ( @@ -14,6 +16,7 @@ import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent +import Control.Concurrent.STM import Control.Concurrent.Async import Data.Maybe import Data.List @@ -25,21 +28,23 @@ import Utility.Monad import Utility.Exception data OutputHandle = OutputHandle - { outputLock :: MVar () -- ^ empty when locked - , outputLockedBy :: MVar Locker + { outputLock :: TMVar (Maybe Locker) } data Locker = GeneralLock - | ProcessLock P.ProcessHandle + | ProcessLock P.ProcessHandle String + +instance Show Locker where + show GeneralLock = "GeneralLock" + show (ProcessLock _ cmd) = "ProcessLock " ++ cmd -- | A shared global variable for the OutputHandle. {-# NOINLINE globalOutputHandle #-} globalOutputHandle :: MVar OutputHandle globalOutputHandle = unsafePerformIO $ newMVar =<< OutputHandle - <$> newMVar () - <*> newEmptyMVar + <$> newTMVarIO Nothing -- | Gets the global OutputHandle. getOutputHandle :: IO OutputHandle @@ -58,61 +63,69 @@ takeOutputLock = void $ takeOutputLock' True tryTakeOutputLock :: IO Bool tryTakeOutputLock = takeOutputLock' False -takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = do +withLock :: (TMVar (Maybe Locker) -> STM a) -> IO a +withLock a = do lck <- outputLock <$> getOutputHandle - go =<< tryTakeMVar lck + atomically (a lck) + +-- The lock TMVar is kept full normally, even if only with Nothing, +-- so if we take it here, that blocks anyone else from trying +-- to take the lock while we are checking it. +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = go =<< withLock tryTakeTMVar where - -- lck was full, and we've emptied it, so we hold the lock now. - go (Just ()) = havelock - -- lck is empty, so someone else is holding the lock. - go Nothing = do - lcker <- outputLockedBy <$> getOutputHandle - v' <- tryTakeMVar lcker - case v' of - Just (ProcessLock h) -> - -- if process has exited, lock is stale - ifM (isJust <$> P.getProcessExitCode h) - ( havelock - , if block - then do - void $ P.waitForProcess h - havelock - else do - putMVar lcker (ProcessLock h) - return False - ) - Just GeneralLock -> do - putMVar lcker GeneralLock - whenblock waitlock - Nothing -> whenblock waitlock + go Nothing = whenblock waitlock + -- Something has the lock. It may be stale, so check it. + -- We must always be sure to fill the TMVar back with Just or Nothing. + go (Just orig) = case orig of + Nothing -> havelock + (Just (ProcessLock h _)) -> + -- when process has exited, lock is stale + ifM (isJust <$> P.getProcessExitCode h) + ( havelock + , if block + then do + hPutStr stderr "WAITFORPROCESS in lock" + hFlush stderr + void $ P.waitForProcess h + hPutStr stderr "WAITFORPROCESS in lock done" + hFlush stderr + havelock + else do + withLock (`putTMVar` orig) + return False + ) + (Just GeneralLock) -> do + withLock (`putTMVar` orig) + whenblock waitlock havelock = do - updateOutputLocker GeneralLock + withLock (`putTMVar` Just GeneralLock) return True - waitlock = do - -- Wait for current lock holder to relinquish - -- it and take the lock. - lck <- outputLock <$> getOutputHandle - takeMVar lck - havelock + + -- Wait for current lock holder (if any) to relinquish + -- it and take the lock for ourselves. + waitlock = withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just (Just _) -> retry + _ -> do + putTMVar l (Just GeneralLock) + return True + whenblock a = if block then a else return False -- | Only safe to call after taking the output lock. dropOutputLock :: IO () -dropOutputLock = do - lcker <- outputLockedBy <$> getOutputHandle - lck <- outputLock <$> getOutputHandle - void $ takeMVar lcker - putMVar lck () +dropOutputLock = withLock $ \l -> do + void $ takeTMVar l + putTMVar l Nothing -- | Only safe to call after takeOutputLock; updates the Locker. updateOutputLocker :: Locker -> IO () -updateOutputLocker l = do - lcker <- outputLockedBy <$> getOutputHandle - void $ tryTakeMVar lcker - putMVar lcker l - modifyMVar_ lcker (const $ return l) +updateOutputLocker locker = withLock $ \l -> do + void $ takeTMVar l + putTMVar l (Just locker) -- | Use this around any IO actions that use `outputConcurrent` -- or `createProcessConcurrent` @@ -124,7 +137,7 @@ withConcurrentOutput a = a `finally` drain where -- Just taking the output lock is enough to ensure that anything -- that was buffering output has had a chance to flush its buffer. - drain = lockOutput (return ()) + drain = lockOutput noop -- | Displays a string to stdout, and flush output so it's displayed. -- @@ -158,28 +171,25 @@ outputConcurrent s = do -- as the output lock becomes free. createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) createProcessConcurrent p - | willoutput (P.std_out p) || willoutput (P.std_err p) = + | willOutput (P.std_out p) || willOutput (P.std_err p) = ifM tryTakeOutputLock - ( do - hPutStrLn stderr "IS NOT CONCURRENT" - firstprocess - , do - hPutStrLn stderr "IS CONCURRENT" - concurrentprocess + ( firstprocess + , concurrentprocess ) | otherwise = P.createProcess p where - willoutput P.Inherit = True - willoutput _ = False + rediroutput ss h + | willOutput ss = P.UseHandle h + | otherwise = ss - rediroutput str h - | willoutput str = P.UseHandle h - | otherwise = str + cmd = case P.cmdspec p of + P.ShellCommand s -> s + P.RawCommand c ps -> unwords (c:ps) firstprocess = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock - updateOutputLocker (ProcessLock h) + updateOutputLocker (ProcessLock h cmd) -- Output lock is still held as we return; the process -- is running now, and once it exits the output lock will -- be stale and can then be taken by something else. @@ -196,8 +206,8 @@ createProcessConcurrent p hClose toouth hClose toerrh buf <- newMVar [] - void $ async $ outputDrainer fromouth stdout buf - void $ async $ outputDrainer fromerrh stderr buf + void $ async $ outputDrainer (P.std_out p) fromouth stdout buf + void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf void $ async $ bufferWriter buf return r @@ -205,6 +215,10 @@ createProcessConcurrent p (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from +willOutput :: P.StdStream -> Bool +willOutput P.Inherit = True +willOutput _ = False + type Buffer = [(Handle, BufferedActivity)] data BufferedActivity @@ -213,17 +227,22 @@ data BufferedActivity | InTempFile FilePath deriving (Eq) --- Drain output from the handle, and buffer it in memory. -outputDrainer :: Handle -> Handle -> MVar Buffer -> IO () -outputDrainer fromh toh buf = do - v <- tryIO $ B.hGetSome fromh 1024 - case v of - Right b | not (B.null b) -> do - modifyMVar_ buf $ addBuffer (toh, Output b) - outputDrainer fromh toh buf - _ -> do - modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) - hClose fromh +-- Drain output from the handle, and buffer it. +outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO () +outputDrainer ss fromh toh buf + | willOutput ss = go + | otherwise = atend + where + go = do + v <- tryIO $ B.hGetSome fromh 1024 + case v of + Right b | not (B.null b) -> do + modifyMVar_ buf $ addBuffer (toh, Output b) + go + _ -> atend + atend = do + modifyMVar_ buf $ pure . (++ [(toh, ReachedEnd)]) + hClose fromh -- Wait to lock output, and once we can, display everything -- that's put into buffer, until the end is signaled by Nothing @@ -262,8 +281,8 @@ addBuffer (toh, Output b) buf hClose h return ((toh, InTempFile tmp) : other) where - b' = B.concat (mapMaybe getOutput this) <> b - (this, other) = partition same buf + !b' = B.concat (mapMaybe getOutput this) <> b + !(this, other) = partition same buf same v = fst v == toh && case snd v of Output _ -> True _ -> False -- cgit v1.3-2-g0d8e From 5cde1ed21cc912db0b53846196f920fe52835dbc Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 28 Oct 2015 17:03:17 -0400 Subject: fix memory leak, and optimise when command output is very large --- debian/changelog | 3 ++ propellor.cabal | 6 +-- src/Utility/ConcurrentOutput.hs | 113 +++++++++++++++++++++++----------------- 3 files changed, 72 insertions(+), 50 deletions(-) (limited to 'propellor.cabal') diff --git a/debian/changelog b/debian/changelog index 8c4715f5..c5538c7f 100644 --- a/debian/changelog +++ b/debian/changelog @@ -21,6 +21,9 @@ propellor (2.13.0) UNRELEASED; urgency=medium * Made the execProcess exported by propellor, and everything built on it, avoid scrambled output when run concurrently. * Propellor now depends on STM. + * The cabal file now builds propellor with -O. While -O0 makes ghc + take less memory while building propellor, it can lead to bad memory + usage at runtime due to eg, disabled stream fusion. * Add File.isCopyOf. Thanks, Per Olofsson. -- Joey Hess Sat, 24 Oct 2015 15:16:45 -0400 diff --git a/propellor.cabal b/propellor.cabal index da43775f..a07109a7 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -34,7 +34,7 @@ Description: Executable propellor Main-Is: wrapper.hs - GHC-Options: -threaded -O0 -Wall -fno-warn-tabs + GHC-Options: -threaded -O -Wall -fno-warn-tabs Hs-Source-Dirs: src Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, @@ -46,7 +46,7 @@ Executable propellor Executable propellor-config Main-Is: config.hs - GHC-Options: -threaded -O0 -Wall -fno-warn-tabs + GHC-Options: -threaded -O -Wall -fno-warn-tabs Hs-Source-Dirs: src Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, @@ -57,7 +57,7 @@ Executable propellor-config Build-Depends: unix Library - GHC-Options: -O0 -Wall -fno-warn-tabs + GHC-Options: -O -Wall -fno-warn-tabs Hs-Source-Dirs: src Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 5bf973de..be1562ac 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,8 +1,11 @@ {-# LANGUAGE BangPatterns #-} +{-# OPTIONS_GHC -fno-warn-tabs #-} -- | Concurrent output handling. module Utility.ConcurrentOutput ( + takeOutputLock, + dropOutputLock, withConcurrentOutput, outputConcurrent, createProcessConcurrent, @@ -146,6 +149,20 @@ outputConcurrent s = do hFlush stdout -- TODO +-- | This must be used to wait for processes started with +-- `createProcessConcurrent`. +-- +-- This is necessary because `System.Process.waitForProcess` has a +-- race condition when two threads check the same process. If the race +-- is triggered, one thread will successfully wait, but the other +-- throws a DoesNotExist exception. +waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode +waitForProcessConcurrent h = do + v <- tryWhenExists (P.waitForProcess h) + case v of + Just r -> return r + Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h + -- | Wrapper around `System.Process.createProcess` that prevents -- multiple processes that are running concurrently from writing -- to stdout/stderr at the same time. @@ -196,37 +213,20 @@ createProcessConcurrent p , P.std_err = rediroutput (P.std_err p) toerrh } r <- P.createProcess p' - hClose toouth - hClose toerrh - buf <- newMVar [] - void $ async $ outputDrainer (P.std_out p) fromouth stdout buf - void $ async $ outputDrainer (P.std_err p) fromerrh stderr buf - void $ async $ bufferWriter buf + outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth + errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh + void $ async $ bufferWriter [outbuf, errbuf] return r pipe = do (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from --- | This must be used to wait for processes started with --- `createProcessConcurrent`. --- --- This is necessary because `System.Process.waitForProcess` has a --- race condition when two threads check the same process. If the race --- is triggered, one thread will successfully wait, but the other --- throws a DoesNotExist exception. -waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode -waitForProcessConcurrent h = do - v <- tryWhenExists (P.waitForProcess h) - case v of - Just r -> return r - Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h - willOutput :: P.StdStream -> Bool willOutput P.Inherit = True willOutput _ = False -type Buffer = [(Handle, BufferedActivity)] +type Buffer = [BufferedActivity] data BufferedActivity = ReachedEnd @@ -234,43 +234,62 @@ data BufferedActivity | InTempFile FilePath deriving (Eq) +instance Show BufferedActivity where + show ReachedEnd = "ReachedEnd" + show (Output b) = "Output " ++ show (B.length b) + show (InTempFile t) = "InTempFile " ++ t + +setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ()) +setupBuffer h toh ss fromh = do + hClose toh + buf <- newMVar [] + bufsig <- atomically newEmptyTMVar + void $ async $ outputDrainer ss fromh buf bufsig + return (h, buf, bufsig) + -- Drain output from the handle, and buffer it. -outputDrainer :: P.StdStream -> Handle -> Handle -> MVar Buffer -> IO () -outputDrainer ss fromh toh buf +outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO () +outputDrainer ss fromh buf bufsig | willOutput ss = go | otherwise = atend where go = do - v <- tryIO $ B.hGetSome fromh 1024 + v <- tryIO $ B.hGetSome fromh 1048576 case v of Right b | not (B.null b) -> do - modifyMVar_ buf $ addBuffer (toh, Output b) + modifyMVar_ buf $ addBuffer (Output b) + changed go _ -> atend atend = do - modifyMVar_ buf $ pure . ((toh, ReachedEnd) :) + modifyMVar_ buf $ pure . (ReachedEnd :) + changed hClose fromh + changed = atomically $ do + void $ tryTakeTMVar bufsig + putTMVar bufsig () -- Wait to lock output, and once we can, display everything --- that's put into buffer, until the end is signaled by Nothing --- for both stdout and stderr. -bufferWriter :: MVar Buffer -> IO () -bufferWriter buf = lockOutput (go [stdout, stderr]) +-- that's put into the buffers. +bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO () +bufferWriter = void . lockOutput . mapConcurrently go where - go [] = return () - go hs = do + go v@(outh, buf, bufsig) = do + atomically $ takeTMVar bufsig l <- takeMVar buf - forM_ (reverse l) $ \(h, ba) -> case ba of + putMVar buf [] + forM_ (reverse l) $ \ba -> case ba of Output b -> do - B.hPut h b - hFlush h + B.hPut outh b + hFlush outh + return () InTempFile tmp -> do - B.hPut h =<< B.readFile tmp + B.hPut outh =<< B.readFile tmp void $ tryWhenExists $ removeFile tmp ReachedEnd -> return () - let hs' = filter (\h -> not (any (== (h, ReachedEnd)) l)) hs - putMVar buf [] - go hs' + if any (== ReachedEnd) l + then return () + else go v -- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper -- to combine it with any already buffered Output to that same Handle. @@ -278,22 +297,22 @@ bufferWriter buf = lockOutput (go [stdout, stderr]) -- When the total buffered Output exceeds 1 mb in size, it's moved out of -- memory, to a temp file. This should only happen rarely, but is done to -- avoid some verbose process unexpectedly causing excessive memory use. -addBuffer :: (Handle, BufferedActivity) -> Buffer -> IO Buffer -addBuffer (toh, Output b) buf - | B.length b' <= 1000000 = return ((toh, Output b') : other) +addBuffer :: BufferedActivity -> Buffer -> IO Buffer +addBuffer (Output b) buf + | B.length b' <= 1048576 = return (Output b' : other) | otherwise = do tmpdir <- getTemporaryDirectory (tmp, h) <- openTempFile tmpdir "output.tmp" B.hPut h b' hClose h - return ((toh, InTempFile tmp) : other) + return (InTempFile tmp : other) where !b' = B.concat (mapMaybe getOutput this) <> b - !(this, other) = partition same buf - same v = fst v == toh && case snd v of + !(this, other) = partition isOutput buf + isOutput v = case v of Output _ -> True _ -> False - getOutput v = case snd v of + getOutput v = case v of Output b'' -> Just b'' _ -> Nothing -addBuffer v buf = return (buf ++ [v]) +addBuffer v buf = return (v:buf) -- cgit v1.3-2-g0d8e From 39fa051833de3178639974fa4fc7c803c5918f0e Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 29 Oct 2015 00:38:53 -0400 Subject: generalize what can be output This adds a dependency on Text, but I don't mind propellor depending on it and am somewhat surprised it doesn't already. Using Text also lets this use encodeUtf8 instead of the nasty hack it was using to go from String -> ByteString. --- debian/changelog | 2 +- debian/control | 2 ++ propellor.cabal | 6 +++--- src/Propellor/Bootstrap.hs | 1 + src/Utility/ConcurrentOutput.hs | 35 +++++++++++++++++++++++++---------- 5 files changed, 32 insertions(+), 14 deletions(-) (limited to 'propellor.cabal') diff --git a/debian/changelog b/debian/changelog index c5538c7f..6f75bce9 100644 --- a/debian/changelog +++ b/debian/changelog @@ -20,7 +20,7 @@ propellor (2.13.0) UNRELEASED; urgency=medium * Added Propellor.Property.Concurrent for concurrent properties. * Made the execProcess exported by propellor, and everything built on it, avoid scrambled output when run concurrently. - * Propellor now depends on STM. + * Propellor now depends on STM and text. * The cabal file now builds propellor with -O. While -O0 makes ghc take less memory while building propellor, it can lead to bad memory usage at runtime due to eg, disabled stream fusion. diff --git a/debian/control b/debian/control index 2956fdaa..97fb3e6d 100644 --- a/debian/control +++ b/debian/control @@ -18,6 +18,7 @@ Build-Depends: libghc-transformers-dev, libghc-exceptions-dev (>= 0.6), libghc-stm-dev, + libghc-text-dev, Maintainer: Gergely Nagy Standards-Version: 3.9.6 Vcs-Git: git://git.joeyh.name/propellor @@ -41,6 +42,7 @@ Depends: ${misc:Depends}, ${shlibs:Depends}, libghc-transformers-dev, libghc-exceptions-dev (>= 0.6), libghc-stm-dev, + libghc-text-dev, git, make, Description: property-based host configuration management in haskell diff --git a/propellor.cabal b/propellor.cabal index a07109a7..6e871d6b 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -39,7 +39,7 @@ Executable propellor Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions (>= 0.6), stm + exceptions (>= 0.6), stm, text if (! os(windows)) Build-Depends: unix @@ -51,7 +51,7 @@ Executable propellor-config Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions, stm + exceptions, stm, text if (! os(windows)) Build-Depends: unix @@ -62,7 +62,7 @@ Library Build-Depends: MissingH, directory, filepath, base >= 4.5, base < 5, IfElse, process, bytestring, hslogger, unix-compat, ansi-terminal, containers (>= 0.5), network, async, time, QuickCheck, mtl, transformers, - exceptions, stm + exceptions, stm, text if (! os(windows)) Build-Depends: unix diff --git a/src/Propellor/Bootstrap.hs b/src/Propellor/Bootstrap.hs index 2318b910..21772b34 100644 --- a/src/Propellor/Bootstrap.hs +++ b/src/Propellor/Bootstrap.hs @@ -82,6 +82,7 @@ depsCommand = "( " ++ intercalate " ; " (concat [osinstall, cabalinstall]) ++ " , "libghc-transformers-dev" , "libghc-exceptions-dev" , "libghc-stm-dev" + , "libghc-text-dev" , "make" ] diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs index 94cd4202..c24744a3 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/Utility/ConcurrentOutput.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances #-} {-# OPTIONS_GHC -fno-warn-tabs #-} -- | @@ -20,6 +20,7 @@ module Utility.ConcurrentOutput ( withConcurrentOutput, flushConcurrentOutput, + Outputable(..), outputConcurrent, createProcessConcurrent, waitForProcessConcurrent, @@ -40,13 +41,14 @@ import Control.Concurrent.Async import Data.Maybe import Data.List import Data.Monoid -import qualified Data.ByteString as B import qualified System.Process as P import qualified Data.Set as S +import qualified Data.ByteString as B +import qualified Data.Text as T +import Data.Text.Encoding (encodeUtf8) import Utility.Monad import Utility.Exception -import Utility.FileSystemEncoding data OutputHandle = OutputHandle { outputLock :: TMVar Lock @@ -137,27 +139,40 @@ flushConcurrentOutput = do -- generating output, and flush any buffered output. lockOutput $ return () --- | Displays a string to stdout, and flush output so it's displayed. +-- | Values that can be output. +class Outputable v where + toOutput :: v -> B.ByteString + +instance Outputable B.ByteString where + toOutput = id + +instance Outputable T.Text where + toOutput = encodeUtf8 + +instance Outputable String where + toOutput = toOutput . T.pack + +-- | Displays a value to stdout, and flush output so it's displayed. -- --- Uses locking to ensure that the whole string is output atomically +-- Uses locking to ensure that the whole output occurs atomically -- even when other threads are concurrently generating output. -- -- When something else is writing to the console at the same time, this does --- not block. It buffers the string, so it will be displayed once the other +-- not block. It buffers the value, so it will be displayed once the other -- writer is done. -outputConcurrent :: String -> IO () -outputConcurrent s = bracket setup cleanup go +outputConcurrent :: Outputable v => v -> IO () +outputConcurrent v = bracket setup cleanup go where setup = tryTakeOutputLock cleanup False = return () cleanup True = dropOutputLock go True = do - putStr s + B.hPut stdout (toOutput v) hFlush stdout go False = do bv <- outputBuffer <$> getOutputHandle oldbuf <- atomically $ takeTMVar bv - newbuf <- addBuffer (Output (B.pack (decodeW8NUL s))) oldbuf + newbuf <- addBuffer (Output (toOutput v)) oldbuf atomically $ putTMVar bv newbuf -- | This must be used to wait for processes started with -- cgit v1.3-2-g0d8e