WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit e8a2aee

Browse files
authored
Merge pull request #164 from jasagredo/js/debug-auto-label
Cabal flag to control auto-labelling of threads
2 parents a7753ed + f4e6b12 commit e8a2aee

File tree

2 files changed

+129
-40
lines changed

2 files changed

+129
-40
lines changed

Control/Concurrent/Async/Internal.hs

Lines changed: 117 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ import GHC.Exts
5757
import GHC.IO hiding (finally, onException)
5858
import GHC.Conc (ThreadId(..))
5959

60+
#ifdef DEBUG_AUTO_LABEL
61+
import qualified GHC.Stack
62+
#endif
63+
64+
#ifdef DEBUG_AUTO_LABEL
65+
#define CALLSTACK GHC.Stack.HasCallStack =>
66+
#else
67+
#define CALLSTACK
68+
#endif
69+
6070
-- -----------------------------------------------------------------------------
6171
-- STM Async API
6272

@@ -95,40 +105,53 @@ compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2
95105
-- (see module-level documentation for details).
96106
--
97107
-- __Use 'withAsync' style functions wherever you can instead!__
98-
async :: IO a -> IO (Async a)
108+
async ::
109+
CALLSTACK
110+
IO a -> IO (Async a)
99111
async = inline asyncUsing rawForkIO
100112

101113
-- | Like 'async' but using 'forkOS' internally.
102-
asyncBound :: IO a -> IO (Async a)
114+
asyncBound ::
115+
CALLSTACK
116+
IO a -> IO (Async a)
103117
asyncBound = asyncUsing forkOS
104118

105119
-- | Like 'async' but using 'forkOn' internally.
106-
asyncOn :: Int -> IO a -> IO (Async a)
120+
asyncOn ::
121+
CALLSTACK
122+
Int -> IO a -> IO (Async a)
107123
asyncOn = asyncUsing . rawForkOn
108124

109125
-- | Like 'async' but using 'forkIOWithUnmask' internally. The child
110126
-- thread is passed a function that can be used to unmask asynchronous
111127
-- exceptions.
112-
asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
128+
asyncWithUnmask ::
129+
CALLSTACK
130+
((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
113131
asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask)
114132

115133
-- | Like 'asyncOn' but using 'forkOnWithUnmask' internally. The
116134
-- child thread is passed a function that can be used to unmask
117135
-- asynchronous exceptions.
118-
asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
136+
asyncOnWithUnmask ::
137+
CALLSTACK
138+
Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
119139
asyncOnWithUnmask cpu actionWith =
120140
asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)
121141

122-
asyncUsing :: (IO () -> IO ThreadId)
123-
-> IO a -> IO (Async a)
142+
asyncUsing ::
143+
CALLSTACK
144+
(IO () -> IO ThreadId) -> IO a -> IO (Async a)
124145
asyncUsing doFork = \action -> do
125146
var <- newEmptyTMVarIO
147+
let action_plus = debugLabelMe >> action
126148
-- t <- forkFinally action (\r -> atomically $ putTMVar var r)
127149
-- slightly faster:
128150
t <- mask $ \restore ->
129-
doFork $ try (restore action) >>= atomically . putTMVar var
151+
doFork $ try (restore action_plus) >>= atomically . putTMVar var
130152
return (Async t (readTMVar var))
131153

154+
132155
-- | Spawn an asynchronous action in a separate thread, and pass its
133156
-- @Async@ handle to the supplied function. When the function returns
134157
-- or throws an exception, 'uninterruptibleCancel' is called on the @Async@.
@@ -144,41 +167,51 @@ asyncUsing doFork = \action -> do
144167
-- to `withAsync` returns, so nesting many `withAsync` calls requires
145168
-- linear memory.
146169
--
147-
withAsync :: IO a -> (Async a -> IO b) -> IO b
170+
withAsync ::
171+
CALLSTACK
172+
IO a -> (Async a -> IO b) -> IO b
148173
withAsync = inline withAsyncUsing rawForkIO
149174

150175
-- | Like 'withAsync' but uses 'forkOS' internally.
151-
withAsyncBound :: IO a -> (Async a -> IO b) -> IO b
176+
withAsyncBound ::
177+
CALLSTACK
178+
IO a -> (Async a -> IO b) -> IO b
152179
withAsyncBound = withAsyncUsing forkOS
153180

154181
-- | Like 'withAsync' but uses 'forkOn' internally.
155-
withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b
182+
withAsyncOn ::
183+
CALLSTACK
184+
Int -> IO a -> (Async a -> IO b) -> IO b
156185
withAsyncOn = withAsyncUsing . rawForkOn
157186

158187
-- | Like 'withAsync' but uses 'forkIOWithUnmask' internally. The
159188
-- child thread is passed a function that can be used to unmask
160189
-- asynchronous exceptions.
161-
withAsyncWithUnmask
162-
:: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
190+
withAsyncWithUnmask ::
191+
CALLSTACK
192+
((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
163193
withAsyncWithUnmask actionWith =
164194
withAsyncUsing rawForkIO (actionWith unsafeUnmask)
165195

166196
-- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally. The
167197
-- child thread is passed a function that can be used to unmask
168198
-- asynchronous exceptions
169-
withAsyncOnWithUnmask
170-
:: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
199+
withAsyncOnWithUnmask ::
200+
CALLSTACK
201+
Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
171202
withAsyncOnWithUnmask cpu actionWith =
172203
withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)
173204

174-
withAsyncUsing :: (IO () -> IO ThreadId)
175-
-> IO a -> (Async a -> IO b) -> IO b
205+
withAsyncUsing ::
206+
CALLSTACK
207+
(IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
176208
-- The bracket version works, but is slow. We can do better by
177209
-- hand-coding it:
178210
withAsyncUsing doFork = \action inner -> do
179211
var <- newEmptyTMVarIO
180212
mask $ \restore -> do
181-
t <- doFork $ try (restore action) >>= atomically . putTMVar var
213+
let action_plus = debugLabelMe >> action
214+
t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var
182215
let a = Async t (readTMVar var)
183216
r <- restore (inner a) `catchAll` \e -> do
184217
uninterruptibleCancel a
@@ -554,11 +587,15 @@ isCancel e
554587
-- > withAsync right $ \b ->
555588
-- > waitEither a b
556589
--
557-
race :: IO a -> IO b -> IO (Either a b)
590+
race ::
591+
CALLSTACK
592+
IO a -> IO b -> IO (Either a b)
558593

559594
-- | Like 'race', but the result is ignored.
560595
--
561-
race_ :: IO a -> IO b -> IO ()
596+
race_ ::
597+
CALLSTACK
598+
IO a -> IO b -> IO ()
562599

563600

564601
-- | Run two @IO@ actions concurrently, and return both results. If
@@ -570,19 +607,25 @@ race_ :: IO a -> IO b -> IO ()
570607
-- > withAsync left $ \a ->
571608
-- > withAsync right $ \b ->
572609
-- > waitBoth a b
573-
concurrently :: IO a -> IO b -> IO (a,b)
610+
concurrently ::
611+
CALLSTACK
612+
IO a -> IO b -> IO (a,b)
574613

575614

576615
-- | Run two @IO@ actions concurrently. If both of them end with @Right@,
577616
-- return both results. If one of then ends with @Left@, interrupt the other
578617
-- action and return the @Left@.
579618
--
580-
concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
619+
concurrentlyE ::
620+
CALLSTACK
621+
IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
581622

582623
-- | 'concurrently', but ignore the result values
583624
--
584625
-- @since 2.1.1
585-
concurrently_ :: IO a -> IO b -> IO ()
626+
concurrently_ ::
627+
CALLSTACK
628+
IO a -> IO b -> IO ()
586629

587630
#define USE_ASYNC_VERSIONS 0
588631

@@ -643,9 +686,11 @@ concurrentlyE left right = concurrently' left right (collect [])
643686
Left ex -> throwIO ex
644687
Right r -> collect (r:xs) m
645688

646-
concurrently' :: IO a -> IO b
647-
-> (IO (Either SomeException (Either a b)) -> IO r)
648-
-> IO r
689+
concurrently' ::
690+
CALLSTACK
691+
IO a -> IO b
692+
-> (IO (Either SomeException (Either a b)) -> IO r)
693+
-> IO r
649694
concurrently' left right collect = do
650695
done <- newEmptyMVar
651696
mask $ \restore -> do
@@ -721,37 +766,49 @@ concurrently_ left right = concurrently' left right (collect 0)
721766
-- for each element of the @Traversable@, so running this on large
722767
-- inputs without care may lead to resource exhaustion (of memory,
723768
-- file descriptors, or other limited resources).
724-
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
769+
mapConcurrently ::
770+
CALLSTACK
771+
Traversable t => (a -> IO b) -> t a -> IO (t b)
725772
mapConcurrently f = runConcurrently . traverse (Concurrently . f)
726773

727774
-- | `forConcurrently` is `mapConcurrently` with its arguments flipped
728775
--
729776
-- > pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url
730777
--
731778
-- @since 2.1.0
732-
forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
779+
forConcurrently ::
780+
CALLSTACK
781+
Traversable t => t a -> (a -> IO b) -> IO (t b)
733782
forConcurrently = flip mapConcurrently
734783

735784
-- | `mapConcurrently_` is `mapConcurrently` with the return value discarded;
736785
-- a concurrent equivalent of 'mapM_'.
737-
mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO ()
786+
mapConcurrently_ ::
787+
CALLSTACK
788+
F.Foldable f => (a -> IO b) -> f a -> IO ()
738789
mapConcurrently_ f = runConcurrently . F.foldMap (Concurrently . void . f)
739790

740791
-- | `forConcurrently_` is `forConcurrently` with the return value discarded;
741792
-- a concurrent equivalent of 'forM_'.
742-
forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO ()
793+
forConcurrently_ ::
794+
CALLSTACK
795+
F.Foldable f => f a -> (a -> IO b) -> IO ()
743796
forConcurrently_ = flip mapConcurrently_
744797

745798
-- | Perform the action in the given number of threads.
746799
--
747800
-- @since 2.1.1
748-
replicateConcurrently :: Int -> IO a -> IO [a]
801+
replicateConcurrently ::
802+
CALLSTACK
803+
Int -> IO a -> IO [a]
749804
replicateConcurrently cnt = runConcurrently . sequenceA . replicate cnt . Concurrently
750805

751806
-- | Same as 'replicateConcurrently', but ignore the results.
752807
--
753808
-- @since 2.1.1
754-
replicateConcurrently_ :: Int -> IO a -> IO ()
809+
replicateConcurrently_ ::
810+
CALLSTACK
811+
Int -> IO a -> IO ()
755812
replicateConcurrently_ cnt = runConcurrently . F.fold . replicate cnt . Concurrently . void
756813

757814
-- -----------------------------------------------------------------------------
@@ -845,14 +902,16 @@ instance (Semigroup a, Monoid a) => Monoid (ConcurrentlyE e a) where
845902
-- | Fork a thread that runs the supplied action, and if it raises an
846903
-- exception, re-runs the action. The thread terminates only when the
847904
-- action runs to completion without raising an exception.
848-
forkRepeat :: IO a -> IO ThreadId
905+
forkRepeat ::
906+
CALLSTACK
907+
IO a -> IO ThreadId
849908
forkRepeat action =
850909
mask $ \restore ->
851910
let go = do r <- tryAll (restore action)
852911
case r of
853912
Left _ -> go
854913
_ -> return ()
855-
in forkIO go
914+
in forkIO (debugLabelMe >> go)
856915

857916
catchAll :: IO a -> (SomeException -> IO a) -> IO a
858917
catchAll = catch
@@ -864,11 +923,29 @@ tryAll = try
864923
-- handler: saves a bit of time when we will be installing our own
865924
-- exception handler.
866925
{-# INLINE rawForkIO #-}
867-
rawForkIO :: IO () -> IO ThreadId
868-
rawForkIO (IO action) = IO $ \ s ->
869-
case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
926+
rawForkIO ::
927+
CALLSTACK
928+
IO () -> IO ThreadId
929+
rawForkIO action = IO $ \ s ->
930+
case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
931+
where
932+
(IO action_plus) = debugLabelMe >> action
870933

871934
{-# INLINE rawForkOn #-}
872-
rawForkOn :: Int -> IO () -> IO ThreadId
873-
rawForkOn (I# cpu) (IO action) = IO $ \ s ->
874-
case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
935+
rawForkOn ::
936+
CALLSTACK
937+
Int -> IO () -> IO ThreadId
938+
rawForkOn (I# cpu) action = IO $ \ s ->
939+
case (forkOn# cpu action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
940+
where
941+
(IO action_plus) = debugLabelMe >> action
942+
943+
debugLabelMe ::
944+
CALLSTACK
945+
IO ()
946+
debugLabelMe =
947+
#ifdef DEBUG_AUTO_LABEL
948+
myThreadId >>= flip labelThread (GHC.Stack.prettyCallStack callStack)
949+
#else
950+
pure ()
951+
#endif

async.cabal

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,16 @@ source-repository head
6464
type: git
6565
location: https://github.com/simonmar/async.git
6666

67+
flag debug-auto-label
68+
description:
69+
Strictly for debugging as it might have a non-negligible overhead.
70+
71+
Enabling this flag will auto-label the threads spawned by @async@. Use it to
72+
find where are unlabelled threads spawned in your program (be it your code or
73+
dependency code).
74+
default: False
75+
manual: True
76+
6777
library
6878
default-language: Haskell2010
6979
other-extensions: CPP, MagicHash, RankNTypes, UnboxedTuples
@@ -74,6 +84,8 @@ library
7484
build-depends: base >= 4.3 && < 4.22,
7585
hashable >= 1.1.2.0 && < 1.6,
7686
stm >= 2.2 && < 2.6
87+
if flag(debug-auto-label)
88+
cpp-options: -DDEBUG_AUTO_LABEL
7789

7890
test-suite test-async
7991
default-language: Haskell2010

0 commit comments

Comments
 (0)