1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
|
{-# LANGUAGE PackageImports #-}
-- | This module handles all display of output to the console when
-- propellor is ensuring Properties.
--
-- When two threads both try to display a message concurrently,
-- the messages will be displayed sequentially.
module Propellor.Message (
getMessageHandle,
isConsole,
forceConsole,
actionMessage,
actionMessageOn,
warningMessage,
infoMessage,
errorMessage,
processChainOutput,
messagesDone,
createProcessConcurrent,
) where
import System.Console.ANSI
import System.IO
import System.Posix.IO
import "mtl" Control.Monad.Reader
import Control.Applicative
import Control.Monad.IfElse
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
import Control.Concurrent.Async
import Data.Maybe
import Data.Char
import Data.List
import Data.Monoid
import qualified Data.ByteString as B
import qualified System.Process as P
import Propellor.Types
import Utility.PartialPrelude
import Utility.Monad
import Utility.Exception
data MessageHandle = MessageHandle
{ isConsole :: Bool
, outputLock :: MVar () -- ^ empty when locked
, outputLockedBy :: MVar Locker
}
data Locker
= GeneralLock
| ProcessLock P.ProcessHandle
-- | A shared global variable for the MessageHandle.
{-# NOINLINE globalMessageHandle #-}
globalMessageHandle :: MVar MessageHandle
globalMessageHandle = unsafePerformIO $
newMVar =<< MessageHandle
<$> hIsTerminalDevice stdout
<*> newMVar ()
<*> newEmptyMVar
-- | Gets the global MessageHandle.
getMessageHandle :: IO MessageHandle
getMessageHandle = readMVar globalMessageHandle
-- | Takes a lock while performing an action. Any other threads
-- that try to lockOutput at the same time will block.
lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)
-- | Blocks until we have the output lock.
takeOutputLock :: IO ()
takeOutputLock = void $ takeOutputLock' True
-- | Tries to take the output lock, without blocking.
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' False
takeOutputLock' :: Bool -> IO Bool
takeOutputLock' block = do
lck <- outputLock <$> getMessageHandle
go =<< tryTakeMVar lck
where
-- lck was full, and we've emptied it, so we hold the lock now.
go (Just ()) = havelock
-- lck is empty, so someone else is holding the lock.
go Nothing = do
lcker <- outputLockedBy <$> getMessageHandle
v' <- tryTakeMVar lcker
case v' of
Just (ProcessLock h) ->
-- if process has exited, lock is stale
ifM (isJust <$> P.getProcessExitCode h)
( havelock
, if block
then do
void $ P.waitForProcess h
havelock
else do
putMVar lcker (ProcessLock h)
return False
)
Just GeneralLock -> do
putMVar lcker GeneralLock
whenblock waitlock
Nothing -> whenblock waitlock
havelock = do
updateOutputLocker GeneralLock
return True
waitlock = do
-- Wait for current lock holder to relinquish
-- it and take the lock.
lck <- outputLock <$> getMessageHandle
takeMVar lck
havelock
whenblock a = if block then a else return False
-- | Only safe to call after taking the output lock.
dropOutputLock :: IO ()
dropOutputLock = do
lcker <- outputLockedBy <$> getMessageHandle
lck <- outputLock <$> getMessageHandle
void $ takeMVar lcker
putMVar lck ()
-- | Only safe to call after takeOutputLock; updates the Locker.
updateOutputLocker :: Locker -> IO ()
updateOutputLocker l = do
lcker <- outputLockedBy <$> getMessageHandle
void $ tryTakeMVar lcker
putMVar lcker l
modifyMVar_ lcker (const $ return l)
-- | Force console output. This can be used when stdout is not directly
-- connected to a console, but is eventually going to be displayed at a
-- console.
forceConsole :: IO ()
forceConsole = modifyMVar_ globalMessageHandle $ \mh ->
pure (mh { isConsole = True })
-- | Only performs the action when at the console, or when console
-- output has been forced.
whenConsole :: IO () -> IO ()
whenConsole a = whenM (isConsole <$> getMessageHandle) a
-- | Shows a message while performing an action, with a colored status
-- display.
actionMessage :: (MonadIO m, MonadMask m, ActionResult r) => Desc -> m r -> m r
actionMessage = actionMessage' Nothing
-- | Shows a message while performing an action on a specified host,
-- with a colored status display.
actionMessageOn :: (MonadIO m, MonadMask m, ActionResult r) => HostName -> Desc -> m r -> m r
actionMessageOn = actionMessage' . Just
actionMessage' :: (MonadIO m, MonadMask m, ActionResult r) => Maybe HostName -> Desc -> m r -> m r
actionMessage' mhn desc a = do
liftIO $ whenConsole $ lockOutput $ do
setTitle $ "propellor: " ++ desc
hFlush stdout
r <- a
liftIO $ lockOutput $ do
whenConsole $
setTitle "propellor: running"
showhn mhn
putStr $ desc ++ " ... "
let (msg, intensity, color) = getActionResult r
colorLine intensity color msg
hFlush stdout
return r
where
showhn Nothing = return ()
showhn (Just hn) = do
whenConsole $
setSGR [SetColor Foreground Dull Cyan]
putStr (hn ++ " ")
whenConsole $
setSGR []
warningMessage :: MonadIO m => String -> m ()
warningMessage s = liftIO $ lockOutput $
colorLine Vivid Magenta $ "** warning: " ++ s
infoMessage :: MonadIO m => [String] -> m ()
infoMessage ls = liftIO $ lockOutput $
mapM_ putStrLn ls
errorMessage :: MonadIO m => String -> m a
errorMessage s = liftIO $ lockOutput $ do
colorLine Vivid Red $ "** error: " ++ s
error "Cannot continue!"
colorLine :: ColorIntensity -> Color -> String -> IO ()
colorLine intensity color msg = do
whenConsole $
setSGR [SetColor Foreground intensity color]
putStr msg
whenConsole $
setSGR []
-- Note this comes after the color is reset, so that
-- the color set and reset happen in the same line.
putStrLn ""
hFlush stdout
-- | Reads and displays each line from the Handle, except for the last line
-- which is a Result.
processChainOutput :: Handle -> IO Result
processChainOutput h = go Nothing
where
go lastline = do
v <- catchMaybeIO (hGetLine h)
case v of
Nothing -> case lastline of
Nothing -> do
return FailedChange
Just l -> case readish l of
Just r -> pure r
Nothing -> do
lockOutput $ do
putStrLn l
hFlush stdout
return FailedChange
Just s -> do
lockOutput $ do
maybe noop (\l -> unless (null l) (putStrLn l)) lastline
hFlush stdout
go (Just s)
-- | Called when all messages about properties have been printed.
messagesDone :: IO ()
messagesDone = lockOutput $ do
whenConsole $
setTitle "propellor: done"
hFlush stdout
-- | Wrapper around `System.Process.createProcess` that prevents
-- multiple processes that are running concurrently from writing
-- to stdout/stderr at the same time.
--
-- The first process is allowed to write to
-- stdout and stderr in the usual way.
--
-- However, if another process runs concurrently with the
-- first, any stdout or stderr that would have been displayed by it is
-- instead buffered. The buffered output will be displayed the next time it
-- is safe to do so (ie, after the first process exits).
--
-- Also does debug logging of all commands run.
--
-- Unless you manually import System.Process, every part of propellor
-- that runs a process uses this.
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
createProcessConcurrent p
| hasoutput (P.std_out p) || hasoutput (P.std_err p) =
ifM tryTakeOutputLock
( firstprocess
, concurrentprocess
)
| otherwise = P.createProcess p
where
hasoutput P.Inherit = True
hasoutput _ = False
firstprocess = do
r@(_, _, _, h) <- P.createProcess p
`onException` dropOutputLock
updateOutputLocker (ProcessLock h)
-- Output lock is still held as we return; the process
-- is running now, and once it exits the output lock will
-- be stale and can then be taken by something else.
return r
concurrentprocess = do
(toouth, fromouth) <- pipe
(toerrh, fromerrh) <- pipe
let p' = p
{ P.std_out = if hasoutput (P.std_out p)
then P.UseHandle toouth
else P.std_out p
, P.std_err = if hasoutput (P.std_err p)
then P.UseHandle toerrh
else P.std_err p
}
r <- P.createProcess p'
hClose toouth
hClose toerrh
buf <- newMVar []
void $ async $ outputDrainer fromouth stdout buf
void $ async $ outputDrainer fromerrh stderr buf
void $ async $ bufferWriter buf
return r
pipe = do
(from, to) <- createPipe
(,) <$> fdToHandle to <*> fdToHandle from
type Buffer = [(Handle, Maybe B.ByteString)]
-- Drain output from the handle, and buffer it in memory.
outputDrainer :: Handle -> Handle -> MVar Buffer -> IO ()
outputDrainer fromh toh buf = do
v <- tryIO $ B.hGetSome fromh 1024
case v of
Right b | not (B.null b) -> do
modifyMVar_ buf (pure . addBuffer (toh, Just b))
outputDrainer fromh toh buf
_ -> do
modifyMVar_ buf (pure . (++ [(toh, Nothing)]))
hClose fromh
-- Wait to lock output, and once we can, display everything
-- that's put into buffer, until the end is signaled by Nothing
-- for both stdout and stderr.
bufferWriter :: MVar Buffer -> IO ()
bufferWriter buf = lockOutput (go [stdout, stderr])
where
go [] = return ()
go hs = do
l <- takeMVar buf
forM_ l $ \(h, mb) -> do
maybe noop (B.hPut h) mb
hFlush h
let hs' = filter (\h -> not (any (== (h, Nothing)) l)) hs
putMVar buf []
go hs'
-- The buffer can grow up to 1 mb in size, but after that point,
-- it's truncated to avoid propellor using unbounded memory
-- when a process outputs a whole lot of stuff.
bufsz :: Int
bufsz = 1000000
addBuffer :: (Handle, Maybe B.ByteString) -> Buffer -> Buffer
addBuffer v@(_, Nothing) buf = buf ++ [v]
addBuffer (toh, Just b) buf = (toh, Just b') : other
where
(this, other) = partition (\v -> fst v == toh && isJust (snd v)) buf
b' = truncateBuffer $ B.concat (mapMaybe snd this) <> b
-- Truncate a buffer by removing lines from the front until it's
-- small enough.
truncateBuffer :: B.ByteString -> B.ByteString
truncateBuffer b
| B.length b <= bufsz = b
| otherwise = truncateBuffer $ snd $ B.breakByte nl b
where
nl = fromIntegral (ord '\n')
|