diff options
| author | Joey Hess <joeyh@joeyh.name> | 2015-11-01 16:53:25 -0400 |
|---|---|---|
| committer | Joey Hess <joeyh@joeyh.name> | 2015-11-01 16:53:25 -0400 |
| commit | 592c65d02bf07d053d2fbe8a568f88d1b28e1a65 (patch) | |
| tree | 49a3c1a7aa976043d72ee396681fe11ddd3e0194 /src/Utility | |
| parent | 4d63a9f0ad327cba305e239e51d02e5e33213eda (diff) | |
merge from concurrent-output
Diffstat (limited to 'src/Utility')
| -rw-r--r-- | src/Utility/ConcurrentOutput.hs | 526 | ||||
| -rw-r--r-- | src/Utility/Process/Shim.hs | 10 |
2 files changed, 1 insertions, 535 deletions
diff --git a/src/Utility/ConcurrentOutput.hs b/src/Utility/ConcurrentOutput.hs deleted file mode 100644 index ca1ae7c5..00000000 --- a/src/Utility/ConcurrentOutput.hs +++ /dev/null @@ -1,526 +0,0 @@ -{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-} - --- | --- Copyright: 2013 Joey Hess <id@joeyh.name> --- License: BSD-2-clause --- --- Concurrent output handling. --- --- > import Control.Concurrent.Async --- > import System.Console.Concurrent --- > --- > main = withConcurrentOutput $ --- > outputConcurrent "washed the car\n" --- > `concurrently` --- > outputConcurrent "walked the dog\n" --- > `concurrently` --- > createProcessConcurrent (proc "ls" []) - -module Utility.ConcurrentOutput ( - -- * Concurrent output - withConcurrentOutput, - Outputable(..), - outputConcurrent, - createProcessConcurrent, - waitForProcessConcurrent, - createProcessForeground, - flushConcurrentOutput, - lockOutput, - -- * Low level access to the output buffer - OutputBuffer, - StdHandle(..), - bufferOutputSTM, - outputBufferWaiterSTM, - waitAnyBuffer, - waitCompleteLines, - emitOutputBuffer, -) where - -import System.IO -import System.Posix.IO -import System.Directory -import System.Exit -import Control.Monad -import Control.Monad.IO.Class (liftIO, MonadIO) -import Control.Applicative -import System.IO.Unsafe (unsafePerformIO) -import Control.Concurrent -import Control.Concurrent.STM -import Control.Concurrent.Async -import Data.Maybe -import Data.List -import Data.Monoid -import qualified System.Process as P -import qualified Data.Text as T -import qualified Data.Text.IO as T - -import Utility.Monad -import Utility.Exception - -data OutputHandle = OutputHandle - { outputLock :: TMVar Lock - , outputBuffer :: TMVar OutputBuffer - , errorBuffer :: TMVar OutputBuffer - , outputThreads :: TMVar Integer - } - -data Lock = Locked - --- | A shared global variable for the OutputHandle. -{-# NOINLINE globalOutputHandle #-} -globalOutputHandle :: OutputHandle -globalOutputHandle = unsafePerformIO $ OutputHandle - <$> newEmptyTMVarIO - <*> newTMVarIO (OutputBuffer []) - <*> newTMVarIO (OutputBuffer []) - <*> newTMVarIO 0 - --- | Holds a lock while performing an action. This allows the action to --- perform its own output to the console, without using functions from this --- module. --- --- While this is running, other threads that try to lockOutput will block. --- Any calls to `outputConcurrent` and `createProcessConcurrent` will not --- block, but the output will be buffered and displayed only once the --- action is done. -lockOutput :: (MonadIO m, MonadMask m) => m a -> m a -lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) - --- | Blocks until we have the output lock. -takeOutputLock :: IO () -takeOutputLock = void $ takeOutputLock' True - --- | Tries to take the output lock, without blocking. -tryTakeOutputLock :: IO Bool -tryTakeOutputLock = takeOutputLock' False - -withLock :: (TMVar Lock -> STM a) -> IO a -withLock a = atomically $ a (outputLock globalOutputHandle) - -takeOutputLock' :: Bool -> IO Bool -takeOutputLock' block = do - locked <- withLock $ \l -> do - v <- tryTakeTMVar l - case v of - Just Locked - | block -> retry - | otherwise -> do - -- Restore value we took. - putTMVar l Locked - return False - Nothing -> do - putTMVar l Locked - return True - when locked $ do - (outbuf, errbuf) <- atomically $ (,) - <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer []) - <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer []) - emitOutputBuffer StdOut outbuf - emitOutputBuffer StdErr errbuf - return locked - --- | Only safe to call after taking the output lock. -dropOutputLock :: IO () -dropOutputLock = withLock $ void . takeTMVar - --- | Use this around any actions that use `outputConcurrent` --- or `createProcessConcurrent` --- --- This is necessary to ensure that buffered concurrent output actually --- gets displayed before the program exits. -withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a -withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput - --- | Blocks until any processes started by `createProcessConcurrent` have --- finished, and any buffered output is displayed. --- --- `withConcurrentOutput` calls this at the end; you can call it anytime --- you want to flush output. -flushConcurrentOutput :: IO () -flushConcurrentOutput = do - -- Wait for all outputThreads to finish. - let v = outputThreads globalOutputHandle - atomically $ do - r <- takeTMVar v - if r <= 0 - then putTMVar v r - else retry - -- Take output lock to ensure that nothing else is currently - -- generating output, and flush any buffered output. - lockOutput $ return () - --- | Values that can be output. -class Outputable v where - toOutput :: v -> T.Text - -instance Outputable T.Text where - toOutput = id - -instance Outputable String where - toOutput = toOutput . T.pack - --- | Displays a value to stdout. --- --- No newline is appended to the value, so if you want a newline, be sure --- to include it yourself. --- --- Uses locking to ensure that the whole output occurs atomically --- even when other threads are concurrently generating output. --- --- When something else is writing to the console at the same time, this does --- not block. It buffers the value, so it will be displayed once the other --- writer is done. -outputConcurrent :: Outputable v => v -> IO () -outputConcurrent v = bracket setup cleanup go - where - setup = tryTakeOutputLock - cleanup False = return () - cleanup True = dropOutputLock - go True = do - T.hPutStr stdout (toOutput v) - hFlush stdout - go False = do - let bv = outputBuffer globalOutputHandle - oldbuf <- atomically $ takeTMVar bv - newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf - atomically $ putTMVar bv newbuf - --- | This must be used to wait for processes started with --- `createProcessConcurrent` and `createProcessForeground`. It may also be --- used to wait for processes started by `System.Process.createProcess`. --- --- This is necessary because `System.Process.waitForProcess` has a --- race condition when two threads check the same process. If the race --- is triggered, one thread will successfully wait, but the other --- throws a DoesNotExist exception. -waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode -waitForProcessConcurrent h = do - v <- tryWhenExists (P.waitForProcess h) - case v of - Just r -> return r - Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h - --- | Wrapper around `System.Process.createProcess` that prevents --- multiple processes that are running concurrently from writing --- to stdout/stderr at the same time. --- --- If the process does not output to stdout or stderr, it's run --- by createProcess entirely as usual. Only processes that can generate --- output are handled specially: --- --- A process is allowed to write to stdout and stderr in the usual --- way, assuming it can successfully take the output lock. --- --- When the output lock is held (ie, by another concurrent process, --- or because `outputConcurrent` is being called at the same time), --- the process is instead run with its stdout and stderr --- redirected to a buffer. The buffered output will be displayed as soon --- as the output lock becomes free, or after the command has finished. -createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -createProcessConcurrent p - | willOutput (P.std_out p) || willOutput (P.std_err p) = - ifM tryTakeOutputLock - ( fgProcess p - , bgProcess p - ) - | otherwise = P.createProcess p - --- | Wrapper around `System.Process.createProcess` that makes sure a process --- is run in the foreground, with direct access to stdout and stderr. --- Useful when eg, running an interactive process. --- --- If another process is already running in the foreground, this will block --- until it finishes. Background processes may continue to run while --- this process is running, and their output will be buffered until it --- exits. --- --- The obvious reason you might need to use this is in an example like this: --- --- > main = withConcurrentOutput $ --- > createProcessConcurrent (proc "ls" []) --- > `concurrently` createProcessForeground (proc "vim" []) --- --- Since vim is an interactive program, it needs to run in the foreground. --- If it were started by `createProcessConcurrent`, it would sometimes --- run in the background. --- --- Also, there is actually a race condition when calling --- `createProcessConcurrent` sequentially like this: --- --- > main = withConcurrentOutput $ do --- > (Nothing, Nothing, Nothing, h) <- createProcessConcurrent (proc "ls" []) --- > waitForProcessConcurrent h --- > createProcessConcurrent (proc "vim" []) --- --- Here vim runs about 50% of the time as a background process! Why is --- it not always run in the foreground? The reason is that the previous --- process was run in the foreground, and still holds the output lock. --- `waitForProcessConcurrent` waits for that process, but does not clear --- the output lock immediately. By the time the output lock does clear, --- the vim process may have already started up, in the background. --- --- It would be nice to fix that race, but it can't be fixed without --- an Eq instance for `ProcessHandle`. In any case, when you're using --- this module, you're typically actually doing concurrent things, --- not sequential as in the example above, and so even if the race were --- fixed, you'd still want to use `createProcessForeground` to run vim. -createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -createProcessForeground p = do - takeOutputLock - fgProcess p - -fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -fgProcess p = do - r@(_, _, _, h) <- P.createProcess p - `onException` dropOutputLock - -- Wait for the process to exit and drop the lock. - void $ async $ do - void $ tryIO $ waitForProcessConcurrent h - dropOutputLock - return r - -bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -bgProcess p = do - (toouth, fromouth) <- pipe - (toerrh, fromerrh) <- pipe - let p' = p - { P.std_out = rediroutput (P.std_out p) toouth - , P.std_err = rediroutput (P.std_err p) toerrh - } - registerOutputThread - r <- P.createProcess p' - `onException` unregisterOutputThread - outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth - errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh - void $ async $ bufferWriter [outbuf, errbuf] - return r - where - pipe = do - (from, to) <- createPipe - (,) <$> fdToHandle to <*> fdToHandle from - rediroutput ss h - | willOutput ss = P.UseHandle h - | otherwise = ss - -willOutput :: P.StdStream -> Bool -willOutput P.Inherit = True -willOutput _ = False - --- | Buffered output. -data OutputBuffer = OutputBuffer [OutputBufferedActivity] - deriving (Eq) - -data StdHandle = StdOut | StdErr - -toHandle :: StdHandle -> Handle -toHandle StdOut = stdout -toHandle StdErr = stderr - -bufferFor :: StdHandle -> TMVar OutputBuffer -bufferFor StdOut = outputBuffer globalOutputHandle -bufferFor StdErr = errorBuffer globalOutputHandle - -data OutputBufferedActivity - = Output T.Text - | InTempFile - { tempFile :: FilePath - , endsInNewLine :: Bool - } - deriving (Eq) - -data AtEnd = AtEnd - deriving Eq - -data BufSig = BufSig - -setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd) -setupOutputBuffer h toh ss fromh = do - hClose toh - buf <- newMVar (OutputBuffer []) - bufsig <- atomically newEmptyTMVar - bufend <- atomically newEmptyTMVar - void $ async $ outputDrainer ss fromh buf bufsig bufend - return (h, buf, bufsig, bufend) - --- Drain output from the handle, and buffer it. -outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO () -outputDrainer ss fromh buf bufsig bufend - | willOutput ss = go - | otherwise = atend - where - go = do - t <- T.hGetChunk fromh - if T.null t - then atend - else do - modifyMVar_ buf $ addOutputBuffer (Output t) - changed - go - atend = do - atomically $ putTMVar bufend AtEnd - hClose fromh - changed = atomically $ do - void $ tryTakeTMVar bufsig - putTMVar bufsig BufSig - -registerOutputThread :: IO () -registerOutputThread = do - let v = outputThreads globalOutputHandle - atomically $ putTMVar v . succ =<< takeTMVar v - -unregisterOutputThread :: IO () -unregisterOutputThread = do - let v = outputThreads globalOutputHandle - atomically $ putTMVar v . pred =<< takeTMVar v - --- Wait to lock output, and once we can, display everything --- that's put into the buffers, until the end. --- --- If end is reached before lock is taken, instead add the command's --- buffers to the global outputBuffer and errorBuffer. -bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO () -bufferWriter ts = do - activitysig <- atomically newEmptyTMVar - worker1 <- async $ lockOutput $ - ifM (atomically $ tryPutTMVar activitysig ()) - ( void $ mapConcurrently displaybuf ts - , noop -- buffers already moved to global - ) - worker2 <- async $ void $ globalbuf activitysig - void $ async $ do - void $ waitCatch worker1 - void $ waitCatch worker2 - unregisterOutputThread - where - displaybuf v@(outh, buf, bufsig, bufend) = do - change <- atomically $ - (Right <$> takeTMVar bufsig) - `orElse` - (Left <$> takeTMVar bufend) - l <- takeMVar buf - putMVar buf (OutputBuffer []) - emitOutputBuffer outh l - case change of - Right BufSig -> displaybuf v - Left AtEnd -> return () - globalbuf activitysig = do - ok <- atomically $ do - -- signal we're going to handle it - -- (returns false if the displaybuf already did) - ok <- tryPutTMVar activitysig () - -- wait for end of all buffers - when ok $ - mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts - return ok - when ok $ do - -- add all of the command's buffered output to the - -- global output buffer, atomically - bs <- forM ts $ \(outh, buf, _bufsig, _bufend) -> - (outh,) <$> takeMVar buf - atomically $ - forM_ bs $ \(outh, b) -> - bufferOutputSTM' outh b - --- Adds a value to the OutputBuffer. When adding Output to a Handle, --- it's cheaper to combine it with any already buffered Output to that --- same Handle. --- --- When the total buffered Output exceeds 1 mb in size, it's moved out of --- memory, to a temp file. This should only happen rarely, but is done to --- avoid some verbose process unexpectedly causing excessive memory use. -addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer -addOutputBuffer (Output t) (OutputBuffer buf) - | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other) - | otherwise = do - tmpdir <- getTemporaryDirectory - (tmp, h) <- openTempFile tmpdir "output.tmp" - let !endnl = endsNewLine t' - let i = InTempFile - { tempFile = tmp - , endsInNewLine = endnl - } - T.hPutStr h t' - hClose h - return $ OutputBuffer (i : other) - where - !t' = T.concat (mapMaybe getOutput this) <> t - !(this, other) = partition isOutput buf - isOutput v = case v of - Output _ -> True - _ -> False - getOutput v = case v of - Output t'' -> Just t'' - _ -> Nothing -addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf) - --- | Adds a value to the output buffer for later display. --- --- Note that buffering large quantities of data this way will keep it --- resident in memory until it can be displayed. While `outputConcurrent` --- uses temp files if the buffer gets too big, this STM function cannot do --- so. -bufferOutputSTM :: Outputable v => StdHandle -> v -> STM () -bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)]) - -bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM () -bufferOutputSTM' h (OutputBuffer newbuf) = do - (OutputBuffer buf) <- takeTMVar bv - putTMVar bv (OutputBuffer (newbuf ++ buf)) - where - bv = bufferFor h - --- | A STM action that waits for some buffered output to become --- available, and returns it. --- --- The function can select a subset of output when only some is desired; --- the fst part is returned and the snd is left in the buffer. --- --- This will prevent it from being displayed in the usual way, so you'll --- need to use `emitOutputBuffer` to display it yourself. -outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM [(StdHandle, OutputBuffer)] -outputBufferWaiterSTM selector = do - bs <- forM hs $ \h -> do - let bv = bufferFor h - (selected, rest) <- selector <$> takeTMVar bv - putTMVar bv rest - return selected - if all (== OutputBuffer []) bs - then retry - else do - return (zip hs bs) - where - hs = [StdOut, StdErr] - -waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer) -waitAnyBuffer b = (b, OutputBuffer []) - --- | Use with `outputBufferWaiterSTM` to make it only return buffered --- output that ends with a newline. Anything buffered without a newline --- is left in the buffer. -waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer) -waitCompleteLines (OutputBuffer l) = - let (selected, rest) = span completeline l - in (OutputBuffer selected, OutputBuffer rest) - where - completeline (v@(InTempFile {})) = endsInNewLine v - completeline (Output b) = endsNewLine b - -endsNewLine :: T.Text -> Bool -endsNewLine t = not (T.null t) && T.last t == '\n' - --- | Emits the content of the OutputBuffer to the Handle --- --- If you use this, you should use `lockOutput` to ensure you're the only --- thread writing to the console. -emitOutputBuffer :: StdHandle -> OutputBuffer -> IO () -emitOutputBuffer stdh (OutputBuffer l) = - forM_ (reverse l) $ \ba -> case ba of - Output t -> emit t - InTempFile tmp _ -> do - emit =<< T.readFile tmp - void $ tryWhenExists $ removeFile tmp - where - outh = toHandle stdh - emit t = void $ tryIO $ do - T.hPutStr outh t - hFlush outh diff --git a/src/Utility/Process/Shim.hs b/src/Utility/Process/Shim.hs index 08694d5d..8c9d41d0 100644 --- a/src/Utility/Process/Shim.hs +++ b/src/Utility/Process/Shim.hs @@ -1,12 +1,4 @@ module Utility.Process.Shim (module X, createProcess, waitForProcess) where import System.Process as X hiding (createProcess, waitForProcess) -import Utility.ConcurrentOutput (createProcessConcurrent, waitForProcessConcurrent) -import System.IO -import System.Exit - -createProcess :: CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -createProcess = createProcessConcurrent - -waitForProcess :: ProcessHandle -> IO ExitCode -waitForProcess = waitForProcessConcurrent +import System.Process.Concurrent |
