1 /**
2  * The condition module provides a primitive for synchronized condition
3  * checking.
4  *
5  * Copyright: Copyright Sean Kelly 2005 - 2009.
6  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7  * Authors:   Sean Kelly
8  * Source:    $(DRUNTIMESRC core/sync/_condition.d)
9  */
10 
11 /*          Copyright Sean Kelly 2005 - 2009.
12  * Distributed under the Boost Software License, Version 1.0.
13  *    (See accompanying file LICENSE or copy at
14  *          http://www.boost.org/LICENSE_1_0.txt)
15  */
16 module hunt.pool.impl.Condition;
17 
18 import core.sync.exception;
19 import hunt.pool.impl.Mutex;
20 import core.time;
21 
22 version (Windows)
23 {
24     private import core.sync.semaphore;
25     private import core.sys.windows.basetsd /+: HANDLE+/;
26     private import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
27         DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
28         LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
29     private import core.sys.windows.windef /+: BOOL, DWORD+/;
30     private import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
31 }
32 else version (Posix)
33 {
34     private import core.sync.config;
35     private import core.stdc.errno;
36     private import core.sys.posix.pthread;
37     private import core.sys.posix.time;
38 }
39 else
40 {
41     static assert(false, "Platform not supported");
42 }
43 
44 
45 ////////////////////////////////////////////////////////////////////////////////
46 // Condition
47 //
48 // void wait();
49 // void notify();
50 // void notifyAll();
51 ////////////////////////////////////////////////////////////////////////////////
52 
53 
/**
 * This class represents a condition variable as conceived by C.A.R. Hoare.  As
 * per Mesa type monitors however, "signal" has been replaced with "notify" to
 * indicate that control is not transferred to the waiter when a notification
 * is sent.
 *
 * In addition to the wait/notify operations, the class counts the threads
 * that are currently inside one of the wait calls; the count is exposed
 * through hasWaiters() and getWaitQueueLength().
 */
class Condition
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Initializes a condition object which is associated with the supplied
     * mutex object.
     *
     * Params:
     *  m = The mutex with which this condition will be associated.
     *
     * Throws:
     *  SyncError on error.
     */
    this( Mutex m ) nothrow @safe
    {
        version (Windows)
        {
            // The Win32 calls are wrapped in a @trusted lambda, in the same
            // way as the Posix branch below, because this constructor is
            // declared @safe.
            () @trusted
            {
                m_blockLock = CreateSemaphoreA( null, 1, 1, null );
                if ( m_blockLock == m_blockLock.init )
                    throw new SyncError( "Unable to initialize condition" );
                scope(failure) CloseHandle( m_blockLock );

                m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
                if ( m_blockQueue == m_blockQueue.init )
                    throw new SyncError( "Unable to initialize condition" );
                scope(failure) CloseHandle( m_blockQueue );

                InitializeCriticalSection( &m_unblockLock );
            } ();
            m_assocMutex = m;
        }
        else version (Posix)
        {
            m_assocMutex = m;
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                // Where available, bind the condition to CLOCK_MONOTONIC.
                () @trusted
                {
                    pthread_condattr_t attr = void;
                    int rc  = pthread_condattr_init( &attr );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                    rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                    rc = pthread_cond_init( &m_hndl, &attr );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                    rc = pthread_condattr_destroy( &attr );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                } ();
            }
            else
            {
                // Same @trusted wrapping as above: the pthread call must not
                // be made directly from a @safe function.
                () @trusted
                {
                    int rc = pthread_cond_init( &m_hndl, null );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                } ();
            }
        }
    }


    /**
     * Releases the operating system resources created by the constructor.
     */
    ~this()
    {
        version (Windows)
        {
            BOOL rc = CloseHandle( m_blockLock );
            assert( rc, "Unable to destroy condition" );
            rc = CloseHandle( m_blockQueue );
            assert( rc, "Unable to destroy condition" );
            DeleteCriticalSection( &m_unblockLock );
        }
        else version (Posix)
        {
            int rc = pthread_cond_destroy( &m_hndl );
            assert( !rc, "Unable to destroy condition" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the mutex associated with this condition.
     *
     * Returns:
     *  The mutex associated with this condition.
     */
    @property Mutex mutex()
    {
        return m_assocMutex;
    }

    // undocumented function for internal use
    final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
    {
        return m_assocMutex;
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until notified.
     *
     * Throws:
     *  SyncError on error.
     */
    void wait()
    {
        // Bookkeeping for hasWaiters()/getWaitQueueLength() only.  This is
        // deliberately a separate counter from the Windows-internal
        // m_numWaitersBlocked, which belongs to the notification algorithm
        // and is maintained by timedWait()/notify(bool).
        m_numWaiters++;
        scope(exit) {
            m_numWaiters--;
        }

        version (Windows)
        {
            timedWait( INFINITE );
        }
        else version (Posix)
        {
            int rc = pthread_cond_wait( &m_hndl, m_assocMutex.handleAddr() );
            if ( rc )
                throw new SyncError( "Unable to wait for condition" );
        }
    }


    /**
     * Suspends the calling thread until a notification occurs or until the
     * supplied time period has elapsed.
     *
     * Params:
     *  val = The time to wait.
     *
     * In:
     *  val must be non-negative.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( Duration val )
    in
    {
        assert( !val.isNegative );
    }
    do
    {
        // See wait(): counter used only by the waiter-query methods.
        m_numWaiters++;
        scope(exit) {
            m_numWaiters--;
        }

        version (Windows)
        {
            // The Win32 timeout is a 32-bit millisecond count, so longer
            // durations are waited for in chunks.
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            while ( val > maxWaitMillis )
            {
                if ( timedWait( cast(uint)
                               maxWaitMillis.total!"msecs" ) )
                    return true;
                val -= maxWaitMillis;
            }
            return timedWait( cast(uint) val.total!"msecs" );
        }
        else version (Posix)
        {
            timespec t = void;
            mktspec( t, val );

            int rc = pthread_cond_timedwait( &m_hndl,
                                             m_assocMutex.handleAddr(),
                                             &t );
            if ( !rc ) {
                return true;
            }

            if ( rc == ETIMEDOUT )
                return false;
            throw new SyncError( "Unable to wait for condition" );
        }
    }


    /**
     * Notifies one waiter.
     *
     * Throws:
     *  SyncError on error.
     */
    void notify()
    {
        version (Windows)
        {
            notify( false );
        }
        else version (Posix)
        {
            // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times,
            // so we need to keep retrying while it returns EAGAIN.
            //
            // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
            // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
            // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
            // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
            // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
            // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
            // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
            // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

            int rc;
            do {
                rc = pthread_cond_signal( &m_hndl );
            } while ( rc == EAGAIN );
            if ( rc )
                throw new SyncError( "Unable to notify condition" );
        }
    }


    /**
     * Notifies all waiters.
     *
     * Throws:
     *  SyncError on error.
     */
    void notifyAll()
    {
        version (Windows)
        {
            notify( true );
        }
        else version (Posix)
        {
            // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times,
            // so we need to keep retrying while it returns EAGAIN.
            //
            // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
            // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
            // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
            // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
            // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
            // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
            // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
            // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

            int rc;
            do {
                rc = pthread_cond_broadcast( &m_hndl );
            } while ( rc == EAGAIN );
            if ( rc )
                throw new SyncError( "Unable to notify condition" );
        }
    }


    /**
     * Reports whether any thread is currently inside wait() or
     * wait(Duration) on this condition.
     *
     * The underlying counter is a plain int that is only changed on entry to
     * and exit from the wait calls.
     * NOTE(review): those points are presumably reached with the associated
     * mutex held, so the result is only dependable when the caller holds that
     * mutex as well - confirm against callers.
     *
     * Returns:
     *  true if the waiter count is greater than zero.
     */
    bool hasWaiters() {
        return m_numWaiters>0;
    }

    /**
     * Gets the number of threads currently inside wait() or wait(Duration)
     * on this condition.  The same caveat as for hasWaiters() applies.
     *
     * Returns:
     *  The current waiter count.
     */
    int getWaitQueueLength() {
        return m_numWaiters;
    }

private:
    version (Windows)
    {
        /*
         * Blocks on the internal queue semaphore for at most `timeout`
         * milliseconds.  The associated mutex is unlocked while blocked and
         * locked again before returning.
         *
         * Returns true if woken by a notification, false on timeout.
         */
        bool timedWait( DWORD timeout )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            // Pass through the gate; it is held closed by notify() while a
            // batch of waiters is being released.
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            m_assocMutex.unlock();
            scope(failure) m_assocMutex.lock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if ( m_numWaitersBlocked != 0 )
                    {
                        m_numWaitersBlocked--;
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if ( --m_numWaitersToUnblock == 0 )
                {
                    if ( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if ( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if ( ++m_numWaitersGone == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                m_numWaitersBlocked -= m_numWaitersGone;
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                // ReleaseSemaphore reports success with a non-zero value, so
                // it is checked like the other ReleaseSemaphore calls here.
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if ( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for ( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if ( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            m_assocMutex.lock();
            return !timedOut;
        }


        /*
         * Releases one blocked waiter, or all of them when `all` is true.
         * Does nothing when no waiter is blocked.
         */
        void notify( bool all )
        {
            DWORD rc;

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if ( m_numWaitersToUnblock != 0 )
            {
                // A release is already in progress: add to it.
                if ( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if ( all )
                {
                    m_numWaitersToUnblock += m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock++;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if ( m_numWaitersBlocked > m_numWaitersGone )
            {
                // Close the gate so no new waiter joins while releasing.
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if ( 0 != m_numWaitersGone )
                {
                    m_numWaitersBlocked -= m_numWaitersGone;
                    m_numWaitersGone = 0;
                }
                if ( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        Mutex               m_assocMutex;   // external mutex/CS
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0; // algorithm state; touched only by timedWait/notify(bool)
        int                 m_numWaitersToUnblock   = 0;
    }
    else version (Posix)
    {
        Mutex               m_assocMutex;
        pthread_cond_t      m_hndl;
    }

    // Number of threads currently inside wait()/wait(Duration).  Used only
    // by hasWaiters() and getWaitQueueLength(); it takes no part in the
    // notification logic on any platform.
    int                     m_numWaiters            = 0;
}
509 
510 
511 ////////////////////////////////////////////////////////////////////////////////
512 // Unit Tests
513 ////////////////////////////////////////////////////////////////////////////////
514 
// Exercises Condition with real threads: single notification, broadcast
// notification, and timed waits (one that is notified, one that times out).
unittest
{
    import core.thread;
    import hunt.pool.impl.Mutex;
    import core.sync.semaphore;


    // Ten waiter threads each consume `numTries` "ready" tokens that the
    // main thread hands out one notify() at a time.
    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;    // tokens available, guarded by `mutex`
        int  numTotal   = 0;    // tokens consumed, guarded by `mutex`
        int  numDone    = 0;    // completed iterations, guarded by `synLoop`

        void waiter()
        {
            foreach ( attempt; 0 .. numTries )
            {
                synchronized( mutex )
                {
                    // Loop on the predicate: a wakeup alone does not
                    // guarantee that a token is still available.
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        foreach ( n; 0 .. numWaiters )
            group.create( &waiter );

        foreach ( round; 0 .. numTries )
        {
            // Hand out one token per waiter.
            foreach ( j; 0 .. numWaiters )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // Spin until the completion count has reached numWaiters.
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            // Let every waiter proceed to its next iteration.
            foreach ( j; 0 .. numWaiters )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    // All waiters block on the same flag and are released by one notifyAll().
    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        foreach ( n; 0 .. numWaiters )
            group.create( &waiter );

        // Poll until every waiter has registered, then release them all.
        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    // The first timed wait must be ended by a notification, the second one
    // by its timeout.
    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        // Notify only once the waiter has set `waiting` under the mutex.
        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}