diff options
| author | Joey Hess <joeyh@joeyh.name> | 2015-11-01 11:30:36 -0400 |
|---|---|---|
| committer | Joey Hess <joeyh@joeyh.name> | 2015-11-01 11:30:36 -0400 |
| commit | 046d7d82b4b309ade5e3508817f1b9b684e57b94 (patch) | |
| tree | b1e6cc3f2d959c7726e3da0c67551927d6a321c8 /src/Utility | |
| parent | 082bfc9f301adc59d7cd26954d8cdc0caf80ec7e (diff) | |
| parent | b218820da0b069e826507150cba118f0fa69d409 (diff) | |
Merge branch 'joeyconfig'
Diffstat (limited to 'src/Utility')
| -rw-r--r-- | src/Utility/ConcurrentOutput.hs | 348 | ||||
| -rw-r--r-- | src/Utility/Process.hs | 28 | ||||
| -rw-r--r-- | src/Utility/Process/Shim.hs | 12 |
3 files changed, 373 insertions, 15 deletions
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs new file mode 100644 index 00000000..c24744a3 --- /dev/null +++ b/src/Utility/ConcurrentOutput.hs @@ -0,0 +1,348 @@ +{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances #-} +{-# OPTIONS_GHC -fno-warn-tabs #-} + +-- | +-- Copyright: 2013 Joey Hess <id@joeyh.name> +-- License: BSD-2-clause +-- +-- Concurrent output handling. +-- +-- > import Control.Concurrent.Async +-- > import Control.Concurrent.Output +-- > +-- > main = withConcurrentOutput $ +-- > outputConcurrent "washed the car\n" +-- > `concurrently` +-- > outputConcurrent "walked the dog\n" +-- > `concurrently` +-- > createProcessConcurrent (proc "ls" []) + +module Utility.ConcurrentOutput ( + withConcurrentOutput, + flushConcurrentOutput, + Outputable(..), + outputConcurrent, + createProcessConcurrent, + waitForProcessConcurrent, + lockOutput, +) where + +import System.IO +import System.Posix.IO +import System.Directory +import System.Exit +import Control.Monad +import Control.Monad.IO.Class (liftIO, MonadIO) +import Control.Applicative +import System.IO.Unsafe (unsafePerformIO) +import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.Async +import Data.Maybe +import Data.List +import Data.Monoid +import qualified System.Process as P +import qualified Data.Set as S +import qualified Data.ByteString as B +import qualified Data.Text as T +import Data.Text.Encoding (encodeUtf8) + +import Utility.Monad +import Utility.Exception + +data OutputHandle = OutputHandle + { outputLock :: TMVar Lock + , outputBuffer :: TMVar Buffer + , outputThreads :: TMVar (S.Set (Async ())) + } + +data Lock = Locked + +-- | A shared global variable for the OutputHandle. +{-# NOINLINE globalOutputHandle #-} +globalOutputHandle :: MVar OutputHandle +globalOutputHandle = unsafePerformIO $ + newMVar =<< OutputHandle + <$> newEmptyTMVarIO + <*> newTMVarIO [] + <*> newTMVarIO S.empty + +-- | Gets the global OutputHandle. +getOutputHandle :: IO OutputHandle +getOutputHandle = readMVar globalOutputHandle + +-- | Holds a lock while performing an action that will display output. +-- While this is running, other threads that try to lockOutput will block, +-- and calls to `outputConcurrent` and `createProcessConcurrent` +-- will result in that concurrent output being buffered and not +-- displayed until the action is done. +lockOutput :: (MonadIO m, MonadMask m) => m a -> m a +lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) + +-- | Blocks until we have the output lock. +takeOutputLock :: IO () +takeOutputLock = void $ takeOutputLock' True + +-- | Tries to take the output lock, without blocking. +tryTakeOutputLock :: IO Bool +tryTakeOutputLock = takeOutputLock' False + +withLock :: (TMVar Lock -> STM a) -> IO a +withLock a = do + lck <- outputLock <$> getOutputHandle + atomically (a lck) + +takeOutputLock' :: Bool -> IO Bool +takeOutputLock' block = do + locked <- withLock $ \l -> do + v <- tryTakeTMVar l + case v of + Just Locked + | block -> retry + | otherwise -> do + -- Restore value we took. + putTMVar l Locked + return False + Nothing -> do + putTMVar l Locked + return True + when locked $ do + bv <- outputBuffer <$> getOutputHandle + buf <- atomically $ swapTMVar bv [] + emitBuffer stdout buf + return locked + +-- | Only safe to call after taking the output lock. +dropOutputLock :: IO () +dropOutputLock = withLock $ void . takeTMVar + +-- | Use this around any IO actions that use `outputConcurrent` +-- or `createProcessConcurrent` +-- +-- This is necessary to ensure that buffered concurrent output actually +-- gets displayed before the program exits. +withConcurrentOutput :: IO a -> IO a +withConcurrentOutput a = a `finally` flushConcurrentOutput + +-- | Blocks until any processes started by `createProcessConcurrent` have +-- finished, and any buffered output is displayed. +flushConcurrentOutput :: IO () +flushConcurrentOutput = do + -- Wait for all outputThreads to finish. + v <- outputThreads <$> getOutputHandle + atomically $ do + r <- takeTMVar v + if r == S.empty + then putTMVar v r + else retry + -- Take output lock to ensure that nothing else is currently + -- generating output, and flush any buffered output. + lockOutput $ return () + +-- | Values that can be output. +class Outputable v where + toOutput :: v -> B.ByteString + +instance Outputable B.ByteString where + toOutput = id + +instance Outputable T.Text where + toOutput = encodeUtf8 + +instance Outputable String where + toOutput = toOutput . T.pack + +-- | Displays a value to stdout, and flush output so it's displayed. +-- +-- Uses locking to ensure that the whole output occurs atomically +-- even when other threads are concurrently generating output. +-- +-- When something else is writing to the console at the same time, this does +-- not block. It buffers the value, so it will be displayed once the other +-- writer is done. +outputConcurrent :: Outputable v => v -> IO () +outputConcurrent v = bracket setup cleanup go + where + setup = tryTakeOutputLock + cleanup False = return () + cleanup True = dropOutputLock + go True = do + B.hPut stdout (toOutput v) + hFlush stdout + go False = do + bv <- outputBuffer <$> getOutputHandle + oldbuf <- atomically $ takeTMVar bv + newbuf <- addBuffer (Output (toOutput v)) oldbuf + atomically $ putTMVar bv newbuf + +-- | This must be used to wait for processes started with +-- `createProcessConcurrent`. +-- +-- This is necessary because `System.Process.waitForProcess` has a +-- race condition when two threads check the same process. If the race +-- is triggered, one thread will successfully wait, but the other +-- throws a DoesNotExist exception. +waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode +waitForProcessConcurrent h = do + v <- tryWhenExists (P.waitForProcess h) + case v of + Just r -> return r + Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h + +-- | Wrapper around `System.Process.createProcess` that prevents +-- multiple processes that are running concurrently from writing +-- to stdout/stderr at the same time. +-- +-- If the process does not output to stdout or stderr, it's run +-- by createProcess entirely as usual. Only processes that can generate +-- output are handled specially: +-- +-- A process is allowed to write to stdout and stderr in the usual +-- way, assuming it can successfully take the output lock. +-- +-- When the output lock is held (by another concurrent process, +-- or because `outputConcurrent` is being called at the same time), +-- the process is instead run with its stdout and stderr +-- redirected to a buffer. The buffered output will be displayed as soon +-- as the output lock becomes free. +createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) +createProcessConcurrent p + | willOutput (P.std_out p) || willOutput (P.std_err p) = + ifM tryTakeOutputLock + ( firstprocess + , concurrentprocess + ) + | otherwise = P.createProcess p + where + rediroutput ss h + | willOutput ss = P.UseHandle h + | otherwise = ss + + firstprocess = do + r@(_, _, _, h) <- P.createProcess p + `onException` dropOutputLock + -- Wait for the process to exit and drop the lock. + void $ async $ do + void $ tryIO $ waitForProcessConcurrent h + dropOutputLock + return r + + concurrentprocess = do + (toouth, fromouth) <- pipe + (toerrh, fromerrh) <- pipe + let p' = p + { P.std_out = rediroutput (P.std_out p) toouth + , P.std_err = rediroutput (P.std_err p) toerrh + } + r <- P.createProcess p' + outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth + errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh + void $ async $ bufferWriter [outbuf, errbuf] + return r + + pipe = do + (from, to) <- createPipe + (,) <$> fdToHandle to <*> fdToHandle from + +willOutput :: P.StdStream -> Bool +willOutput P.Inherit = True +willOutput _ = False + +-- Built up with newest seen output first. +type Buffer = [BufferedActivity] + +data BufferedActivity + = ReachedEnd + | Output B.ByteString + | InTempFile FilePath + deriving (Eq) + +setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ()) +setupBuffer h toh ss fromh = do + hClose toh + buf <- newMVar [] + bufsig <- atomically newEmptyTMVar + void $ async $ outputDrainer ss fromh buf bufsig + return (h, buf, bufsig) + +-- Drain output from the handle, and buffer it. +outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO () +outputDrainer ss fromh buf bufsig + | willOutput ss = go + | otherwise = atend + where + go = do + v <- tryIO $ B.hGetSome fromh 1048576 + case v of + Right b | not (B.null b) -> do + modifyMVar_ buf $ addBuffer (Output b) + changed + go + _ -> atend + atend = do + modifyMVar_ buf $ pure . (ReachedEnd :) + changed + hClose fromh + changed = atomically $ do + void $ tryTakeTMVar bufsig + putTMVar bufsig () + +-- Wait to lock output, and once we can, display everything +-- that's put into the buffers, until the end. +bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO () +bufferWriter ts = do + worker <- async $ void $ lockOutput $ mapConcurrently go ts + v <- outputThreads <$> getOutputHandle + atomically $ do + s <- takeTMVar v + putTMVar v (S.insert worker s) + void $ async $ do + void $ waitCatch worker + atomically $ do + s <- takeTMVar v + putTMVar v (S.delete worker s) + where + go v@(outh, buf, bufsig) = do + void $ atomically $ takeTMVar bufsig + l <- takeMVar buf + putMVar buf [] + emitBuffer outh l + if any (== ReachedEnd) l + then return () + else go v + +emitBuffer :: Handle -> Buffer -> IO () +emitBuffer outh l = forM_ (reverse l) $ \ba -> case ba of + Output b -> do + B.hPut outh b + hFlush outh + InTempFile tmp -> do + B.hPut outh =<< B.readFile tmp + void $ tryWhenExists $ removeFile tmp + ReachedEnd -> return () + +-- Adds a value to the Buffer. When adding Output to a Handle, it's cheaper +-- to combine it with any already buffered Output to that same Handle. +-- +-- When the total buffered Output exceeds 1 mb in size, it's moved out of +-- memory, to a temp file. This should only happen rarely, but is done to +-- avoid some verbose process unexpectedly causing excessive memory use. +addBuffer :: BufferedActivity -> Buffer -> IO Buffer +addBuffer (Output b) buf + | B.length b' <= 1048576 = return (Output b' : other) + | otherwise = do + tmpdir <- getTemporaryDirectory + (tmp, h) <- openTempFile tmpdir "output.tmp" + B.hPut h b' + hClose h + return (InTempFile tmp : other) + where + !b' = B.concat (mapMaybe getOutput this) <> b + !(this, other) = partition isOutput buf + isOutput v = case v of + Output _ -> True + _ -> False + getOutput v = case v of + Output b'' -> Just b'' + _ -> Nothing +addBuffer v buf = return (v:buf) diff --git a/src/Utility/Process.hs b/src/Utility/Process.hs index c4882a01..c6699961e 100644 --- a/src/Utility/Process.hs +++ b/src/Utility/Process.hs @@ -41,9 +41,12 @@ module Utility.Process ( devNull, ) where -import qualified System.Process -import qualified System.Process as X hiding (CreateProcess(..), createProcess, runInteractiveProcess, readProcess, readProcessWithExitCode, system, rawSystem, runInteractiveCommand, runProcess) -import System.Process hiding (createProcess, readProcess, waitForProcess) +import qualified Utility.Process.Shim +import qualified Utility.Process.Shim as X hiding (CreateProcess(..), createProcess, runInteractiveProcess, readProcess, readProcessWithExitCode, system, rawSystem, runInteractiveCommand, runProcess) +import Utility.Process.Shim hiding (createProcess, readProcess, waitForProcess) +import Utility.Misc +import Utility.Exception + import System.Exit import System.IO import System.Log.Logger @@ -58,9 +61,6 @@ import Control.Applicative import Data.Maybe import Prelude -import Utility.Misc -import Utility.Exception - type CreateProcessRunner = forall a. CreateProcess -> ((Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> IO a) -> IO a data StdHandle = StdinHandle | StdoutHandle | StderrHandle @@ -172,22 +172,21 @@ createBackgroundProcess p a = a =<< createProcess p -- returns a transcript combining its stdout and stderr, and -- whether it succeeded or failed. processTranscript :: String -> [String] -> (Maybe String) -> IO (String, Bool) -processTranscript cmd opts = processTranscript' cmd opts Nothing +processTranscript = processTranscript' id -processTranscript' :: String -> [String] -> Maybe [(String, String)] -> (Maybe String) -> IO (String, Bool) -processTranscript' cmd opts environ input = do +processTranscript' :: (CreateProcess -> CreateProcess) -> String -> [String] -> Maybe String -> IO (String, Bool) +processTranscript' modproc cmd opts input = do #ifndef mingw32_HOST_OS {- This implementation interleves stdout and stderr in exactly the order - the process writes them. -} (readf, writef) <- System.Posix.IO.createPipe readh <- System.Posix.IO.fdToHandle readf writeh <- System.Posix.IO.fdToHandle writef - p@(_, _, _, pid) <- createProcess $ + p@(_, _, _, pid) <- createProcess $ modproc $ (proc cmd opts) { std_in = if isJust input then CreatePipe else Inherit , std_out = UseHandle writeh , std_err = UseHandle writeh - , env = environ } hClose writeh @@ -199,12 +198,11 @@ processTranscript' cmd opts environ input = do return (transcript, ok) #else {- This implementation for Windows puts stderr after stdout. -} - p@(_, _, _, pid) <- createProcess $ + p@(_, _, _, pid) <- createProcess $ modproc $ (proc cmd opts) { std_in = if isJust input then CreatePipe else Inherit , std_out = CreatePipe , std_err = CreatePipe - , env = environ } getout <- mkreader (stdoutHandle p) @@ -374,7 +372,7 @@ startInteractiveProcess cmd args environ = do createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) createProcess p = do debugProcess p - System.Process.createProcess p + Utility.Process.Shim.createProcess p -- | Debugging trace for a CreateProcess. debugProcess :: CreateProcess -> IO () @@ -394,6 +392,6 @@ debugProcess p = debugM "Utility.Process" $ unwords -- | Wrapper around 'System.Process.waitForProcess' that does debug logging. waitForProcess :: ProcessHandle -> IO ExitCode waitForProcess h = do - r <- System.Process.waitForProcess h + r <- Utility.Process.Shim.waitForProcess h debugM "Utility.Process" ("process done " ++ show r) return r diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs new file mode 100644 index 00000000..08694d5d --- /dev/null +++ b/src/Utility/Process/Shim.hs @@ -0,0 +1,12 @@ +module Utility.Process.Shim (module X, createProcess, waitForProcess) where + +import System.Process as X hiding (createProcess, waitForProcess) +import Utility.ConcurrentOutput (createProcessConcurrent, waitForProcessConcurrent) +import System.IO +import System.Exit + +createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) +createProcess = createProcessConcurrent + +waitForProcess :: ProcessHandle -> IO ExitCode +waitForProcess = waitForProcessConcurrent |
