diff options
| author | Joey Hess <joeyh@joeyh.name> | 2015-11-01 16:53:25 -0400 |
|---|---|---|
| committer | Joey Hess <joeyh@joeyh.name> | 2015-11-01 16:53:25 -0400 |
| commit | 592c65d02bf07d053d2fbe8a568f88d1b28e1a65 (patch) | |
| tree | 49a3c1a7aa976043d72ee396681fe11ddd3e0194 | |
| parent | 4d63a9f0ad327cba305e239e51d02e5e33213eda (diff) | |
merge from concurrent-output
| -rw-r--r-- | propellor.cabal | 4 | ||||
| -rw-r--r-- | src/Propellor/Gpg.hs | 5 | ||||
| -rw-r--r-- | src/Propellor/Message.hs | 2 | ||||
| -rw-r--r-- | src/Propellor/PrivData.hs | 5 | ||||
| -rw-r--r-- | src/Propellor/Property/Chroot.hs | 2 | ||||
| -rw-r--r-- | src/Propellor/Property/Docker.hs | 2 | ||||
| -rw-r--r-- | src/System/Console/Concurrent.hs | 39 | ||||
| -rw-r--r-- | src/System/Console/Concurrent/Internal.hs (renamed from src/Utility/ConcurrentOutput.hs) | 176 | ||||
| -rw-r--r-- | src/System/Process/Concurrent.hs | 34 | ||||
| -rw-r--r-- | src/Utility/Process/Shim.hs | 10 |
10 files changed, 172 insertions, 107 deletions
diff --git a/propellor.cabal b/propellor.cabal index 6e871d6b..ccba846f 100644 --- a/propellor.cabal +++ b/propellor.cabal @@ -161,7 +161,6 @@ Library Propellor.Shim Propellor.Property.Chroot.Util Utility.Applicative - Utility.ConcurrentOutput Utility.Data Utility.DataUnits Utility.Directory @@ -185,6 +184,9 @@ Library Utility.Tmp Utility.UserInfo Utility.QuickCheck + System.Console.Concurrent + System.Console.Concurrent.Internal + System.Process.Concurrent source-repository head type: git diff --git a/src/Propellor/Gpg.hs b/src/Propellor/Gpg.hs index 9c58a5d1..960c70d3 100644 --- a/src/Propellor/Gpg.hs +++ b/src/Propellor/Gpg.hs @@ -7,6 +7,8 @@ import System.Directory import Data.Maybe import Data.List.Utils import Control.Monad +import System.Console.Concurrent +import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..)) import Propellor.PrivData.Paths import Propellor.Message @@ -16,7 +18,6 @@ import Utility.Monad import Utility.Misc import Utility.Tmp import Utility.FileSystemEncoding -import Utility.ConcurrentOutput type KeyId = String @@ -129,7 +130,7 @@ gitCommit msg ps = do ps'' <- gpgSignParams ps' if isNothing msg then do - (_, _, _, p) <- createProcessForeground $ + (_, _, _, ConcurrentProcessHandle p) <- createProcessForeground $ proc "git" (toCommand ps'') checkSuccessProcess p else boolSystem "git" ps'' diff --git a/src/Propellor/Message.hs b/src/Propellor/Message.hs index 7df5104a..e964c664 100644 --- a/src/Propellor/Message.hs +++ b/src/Propellor/Message.hs @@ -25,9 +25,9 @@ import Control.Monad.IO.Class (liftIO, MonadIO) import Control.Applicative import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent +import System.Console.Concurrent import Propellor.Types -import Utility.ConcurrentOutput import Utility.PartialPrelude import Utility.Monad import Utility.Exception diff --git a/src/Propellor/PrivData.hs b/src/Propellor/PrivData.hs index 6b77f782..a1e34abc 100644 --- a/src/Propellor/PrivData.hs +++ b/src/Propellor/PrivData.hs @@ -36,6 +36,8 @@ import "mtl" Control.Monad.Reader import qualified Data.Map as M import qualified Data.Set as S import qualified Data.ByteString.Lazy as L +import System.Console.Concurrent +import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..)) import Propellor.Types import Propellor.Types.PrivData @@ -54,7 +56,6 @@ import Utility.FileMode import Utility.Env import Utility.Table import Utility.FileSystemEncoding -import Utility.ConcurrentOutput import Utility.Process -- | Allows a Property to access the value of a specific PrivDataField, @@ -194,7 +195,7 @@ editPrivData field context = do hClose th maybe noop (\p -> writeFileProtected' f (`L.hPut` privDataByteString p)) v editor <- getEnvDefault "EDITOR" "vi" - (_, _, _, p) <- createProcessForeground $ proc editor [f] + (_, _, _, ConcurrentProcessHandle p) <- createProcessForeground $ proc editor [f] unlessM (checkSuccessProcess p) $ error "Editor failed; aborting." PrivData <$> readFile f diff --git a/src/Propellor/Property/Chroot.hs b/src/Propellor/Property/Chroot.hs index 0c00e8f4..8d1a2388 100644 --- a/src/Propellor/Property/Chroot.hs +++ b/src/Propellor/Property/Chroot.hs @@ -27,11 +27,11 @@ import qualified Propellor.Property.Systemd.Core as Systemd import qualified Propellor.Property.File as File import qualified Propellor.Shim as Shim import Propellor.Property.Mount -import Utility.ConcurrentOutput import qualified Data.Map as M import Data.List.Utils import System.Posix.Directory +import System.Console.Concurrent -- | Specification of a chroot. Normally you'll use `debootstrapped` or -- `bootstrapped` to construct a Chroot value. diff --git a/src/Propellor/Property/Docker.hs b/src/Propellor/Property/Docker.hs index f2dbaaf5..0cc8212b 100644 --- a/src/Propellor/Property/Docker.hs +++ b/src/Propellor/Property/Docker.hs @@ -56,7 +56,6 @@ import qualified Propellor.Property.Cmd as Cmd import qualified Propellor.Shim as Shim import Utility.Path import Utility.ThreadScheduler -import Utility.ConcurrentOutput import Control.Concurrent.Async hiding (link) import System.Posix.Directory @@ -65,6 +64,7 @@ import Prelude hiding (init) import Data.List hiding (init) import Data.List.Utils import qualified Data.Map as M +import System.Console.Concurrent installed :: Property NoInfo installed = Apt.installed ["docker.io"] diff --git a/src/System/Console/Concurrent.hs b/src/System/Console/Concurrent.hs new file mode 100644 index 00000000..efbfaa15 --- /dev/null +++ b/src/System/Console/Concurrent.hs @@ -0,0 +1,39 @@ +-- | +-- Copyright: 2015 Joey Hess <id@joeyh.name> +-- License: BSD-2-clause +-- +-- Concurrent output handling. +-- +-- > import Control.Concurrent.Async +-- > import System.Console.Concurrent +-- > +-- > main = withConcurrentOutput $ +-- > outputConcurrent "washed the car\n" +-- > `concurrently` +-- > outputConcurrent "walked the dog\n" +-- > `concurrently` +-- > createProcessConcurrent (proc "ls" []) + +module System.Console.Concurrent ( + -- * Concurrent output + withConcurrentOutput, + Outputable(..), + outputConcurrent, + ConcurrentProcessHandle, + createProcessConcurrent, + waitForProcessConcurrent, + createProcessForeground, + flushConcurrentOutput, + lockOutput, + -- * Low level access to the output buffer + OutputBuffer, + StdHandle(..), + bufferOutputSTM, + outputBufferWaiterSTM, + waitAnyBuffer, + waitCompleteLines, + emitOutputBuffer, +) where + +import System.Console.Concurrent.Internal + diff --git a/src/Utility/ConcurrentOutput.hs b/src/System/Console/Concurrent/Internal.hs index ca1ae7c5..caef9833 100644 --- a/src/Utility/ConcurrentOutput.hs +++ b/src/System/Console/Concurrent/Internal.hs @@ -1,40 +1,14 @@ {-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-} -- | --- Copyright: 2013 Joey Hess <id@joeyh.name> +-- Copyright: 2015 Joey Hess <id@joeyh.name> -- License: BSD-2-clause -- --- Concurrent output handling. +-- Concurrent output handling, internals. -- --- > import Control.Concurrent.Async --- > import System.Console.Concurrent --- > --- > main = withConcurrentOutput $ --- > outputConcurrent "washed the car\n" --- > `concurrently` --- > outputConcurrent "walked the dog\n" --- > `concurrently` --- > createProcessConcurrent (proc "ls" []) +-- May change at any time. -module Utility.ConcurrentOutput ( - -- * Concurrent output - withConcurrentOutput, - Outputable(..), - outputConcurrent, - createProcessConcurrent, - waitForProcessConcurrent, - createProcessForeground, - flushConcurrentOutput, - lockOutput, - -- * Low level access to the output buffer - OutputBuffer, - StdHandle(..), - bufferOutputSTM, - outputBufferWaiterSTM, - waitAnyBuffer, - waitCompleteLines, - emitOutputBuffer, -) where +module System.Console.Concurrent.Internal where import System.IO import System.Posix.IO @@ -62,6 +36,8 @@ data OutputHandle = OutputHandle , outputBuffer :: TMVar OutputBuffer , errorBuffer :: TMVar OutputBuffer , outputThreads :: TMVar Integer + , processWaiters :: TMVar [Async ()] + , waitForProcessLock :: TMVar () } data Lock = Locked @@ -74,6 +50,8 @@ globalOutputHandle = unsafePerformIO $ OutputHandle <*> newTMVarIO (OutputBuffer []) <*> newTMVarIO (OutputBuffer []) <*> newTMVarIO 0 + <*> newTMVarIO [] + <*> newEmptyTMVarIO -- | Holds a lock while performing an action. This allows the action to -- perform its own output to the console, without using functions from this @@ -185,20 +163,69 @@ outputConcurrent v = bracket setup cleanup go newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf atomically $ putTMVar bv newbuf --- | This must be used to wait for processes started with --- `createProcessConcurrent` and `createProcessForeground`. It may also be --- used to wait for processes started by `System.Process.createProcess`. +newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle + +toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) +toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h) + +-- | Use this to wait for processes started with +-- `createProcessConcurrent` and `createProcessForeground`, and get their +-- exit status. -- --- This is necessary because `System.Process.waitForProcess` has a --- race condition when two threads check the same process. If the race --- is triggered, one thread will successfully wait, but the other --- throws a DoesNotExist exception. -waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode -waitForProcessConcurrent h = do - v <- tryWhenExists (P.waitForProcess h) - case v of - Just r -> return r - Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h +-- Note that such processes are actually automatically waited for +-- internally, so not calling this exiplictly will not result +-- in zombie processes. This behavior differs from `P.waitForProcess` +waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode +waitForProcessConcurrent (ConcurrentProcessHandle h) = checkexit + where + checkexit = maybe waitsome return =<< P.getProcessExitCode h + waitsome = maybe checkexit return =<< bracket lock unlock go + lck = waitForProcessLock globalOutputHandle + lock = atomically $ tryPutTMVar lck () + unlock True = atomically $ takeTMVar lck + unlock False = return () + go True = do + let v = processWaiters globalOutputHandle + l <- atomically $ readTMVar v + if null l + -- Avoid waitAny [] which blocks forever; + then Just <$> P.waitForProcess h + else do + -- Wait for any of the running + -- processes to exit. It may or may not + -- be the one corresponding to the + -- ProcessHandle. If it is, + -- getProcessExitCode will succeed. + void $ tryIO $ waitAny l + hFlush stdout + return Nothing + go False = do + -- Another thread took the lck first. Wait for that thread to + -- wait for one of the running processes to exit. + atomically $ do + putTMVar lck () + takeTMVar lck + return Nothing + +-- Registers an action that waits for a process to exit, +-- adding it to the processWaiters list, and removing it once the action +-- completes. +asyncProcessWaiter :: IO () -> IO () +asyncProcessWaiter waitaction = do + regdone <- newEmptyTMVarIO + waiter <- async $ do + self <- atomically (takeTMVar regdone) + waitaction `finally` unregister self + register waiter regdone + where + v = processWaiters globalOutputHandle + register waiter regdone = atomically $ do + l <- takeTMVar v + putTMVar v (waiter:l) + putTMVar regdone waiter + unregister waiter = atomically $ do + l <- takeTMVar v + putTMVar v (filter (/= waiter) l) -- | Wrapper around `System.Process.createProcess` that prevents -- multiple processes that are running concurrently from writing @@ -215,71 +242,39 @@ waitForProcessConcurrent h = do -- or because `outputConcurrent` is being called at the same time), -- the process is instead run with its stdout and stderr -- redirected to a buffer. The buffered output will be displayed as soon --- as the output lock becomes free, or after the command has finished. -createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +-- as the output lock becomes free. +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) createProcessConcurrent p | willOutput (P.std_out p) || willOutput (P.std_err p) = ifM tryTakeOutputLock ( fgProcess p , bgProcess p ) - | otherwise = P.createProcess p + | otherwise = do + r@(_, _, _, h) <- P.createProcess p + asyncProcessWaiter $ do + void $ P.waitForProcess h + return (toConcurrentProcessHandle r) -- | Wrapper around `System.Process.createProcess` that makes sure a process -- is run in the foreground, with direct access to stdout and stderr. -- Useful when eg, running an interactive process. --- --- If another process is already running in the foreground, this will block --- until it finishes. Background processes may continue to run while --- this process is running, and their output will be buffered until it --- exits. --- --- The obvious reason you might need to use this is in an example like this: --- --- > main = withConcurrentOutput $ --- > createProcessConcurrent (proc "ls" []) --- > `concurrently` createProcessForeground (proc "vim" []) --- --- Since vim is an interactive program, it needs to run in the foreground. --- If it were started by `createProcessConcurrent`, it would sometimes --- run in the background. --- --- Also, there is actually a race condition when calling --- `createProcessConcurrent` sequentially like this: --- --- > main = withConcurrentOutput $ do --- > (Nothing, Nothing, Nothing, h) <- createProcessConcurrent (proc "ls" []) --- > waitForProcessConcurrent h --- > createProcessConcurrent (proc "vim" []) --- --- Here vim runs about 50% of the time as a background process! Why is --- it not always run in the foreground? The reason is that the previous --- process was run in the foreground, and still holds the output lock. --- `waitForProcessConcurrent` waits for that process, but does not clear --- the output lock immediately. By the time the output lock does clear, --- the vim process may have already started up, in the background. --- --- It would be nice to fix that race, but it can't be fixed without --- an Eq instance for `ProcessHandle`. In any case, when you're using --- this module, you're typically actually doing concurrent things, --- not sequential as in the example above, and so even if the race were --- fixed, you'd still want to use `createProcessForeground` to run vim. -createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) createProcessForeground p = do takeOutputLock fgProcess p -fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) fgProcess p = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock -- Wait for the process to exit and drop the lock. - void $ async $ do - void $ tryIO $ waitForProcessConcurrent h + asyncProcessWaiter $ do + void $ P.waitForProcess h dropOutputLock - return r + return (toConcurrentProcessHandle r) -bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) bgProcess p = do (toouth, fromouth) <- pipe (toerrh, fromerrh) <- pipe @@ -288,12 +283,13 @@ bgProcess p = do , P.std_err = rediroutput (P.std_err p) toerrh } registerOutputThread - r <- P.createProcess p' + r@(_, _, _, h) <- P.createProcess p' `onException` unregisterOutputThread + asyncProcessWaiter $ void $ P.waitForProcess h outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh void $ async $ bufferWriter [outbuf, errbuf] - return r + return (toConcurrentProcessHandle r) where pipe = do (from, to) <- createPipe diff --git a/src/System/Process/Concurrent.hs b/src/System/Process/Concurrent.hs new file mode 100644 index 00000000..0e00e4fd --- /dev/null +++ b/src/System/Process/Concurrent.hs @@ -0,0 +1,34 @@ +-- | +-- Copyright: 2015 Joey Hess <id@joeyh.name> +-- License: BSD-2-clause +-- +-- The functions exported by this module are intended to be drop-in +-- replacements for those from System.Process, when converting a whole +-- program to use System.Console.Concurrent. + +module System.Process.Concurrent where + +import System.Console.Concurrent +import System.Console.Concurrent.Internal (ConcurrentProcessHandle(..)) +import System.Process hiding (createProcess, waitForProcess) +import System.IO +import System.Exit + +-- | Calls `createProcessConcurrent` +-- +-- You should use the waitForProcess in this module on the resulting +-- ProcessHandle. Using System.Process.waitForProcess instead can have +-- mildly unexpected results. +createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) +createProcess p = do + (i, o, e, ConcurrentProcessHandle h) <- createProcessConcurrent p + return (i, o, e, h) + +-- | Calls `waitForProcessConcurrent` +-- +-- You should only use this on a ProcessHandle obtained by calling +-- createProcess from this module. Using this with a ProcessHandle +-- obtained from System.Process.createProcess etc will have extremely +-- unexpected results; it can wait a very long time before returning. +waitForProcess :: ProcessHandle -> IO ExitCode +waitForProcess = waitForProcessConcurrent . ConcurrentProcessHandle diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs index 08694d5d..8c9d41d0 100644 --- a/src/Utility/Process/Shim.hs +++ b/src/Utility/Process/Shim.hs @@ -1,12 +1,4 @@ module Utility.Process.Shim (module X, createProcess, waitForProcess) where import System.Process as X hiding (createProcess, waitForProcess) -import Utility.ConcurrentOutput (createProcessConcurrent, waitForProcessConcurrent) -import System.IO -import System.Exit - -createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -createProcess = createProcessConcurrent - -waitForProcess :: ProcessHandle -> IO ExitCode -waitForProcess = waitForProcessConcurrent +import System.Process.Concurrent |
