/**
 * The condition module provides a primitive for synchronized condition
 * checking.
 *
 * Copyright: Copyright Sean Kelly 2005 - 2009.
 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
 * Authors:   Sean Kelly
 * Source:    $(DRUNTIMESRC core/sync/_condition.d)
 */

/*          Copyright Sean Kelly 2005 - 2009.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
module hunt.pool.impl.Condition;

import core.sync.exception;
import hunt.pool.impl.Mutex;
import core.time;

version (Windows)
{
    private import core.sync.semaphore;
    private import core.sys.windows.basetsd /+: HANDLE+/;
    private import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
        DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
        LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
    private import core.sys.windows.windef /+: BOOL, DWORD+/;
    private import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
}
else version (Posix)
{
    private import core.sync.config;
    private import core.stdc.errno;
    private import core.sys.posix.pthread;
    private import core.sys.posix.time;
}
else
{
    static assert(false, "Platform not supported");
}


////////////////////////////////////////////////////////////////////////////////
// Condition
//
// void wait();
// void notify();
// void notifyAll();
////////////////////////////////////////////////////////////////////////////////


/**
 * This class represents a condition variable as conceived by C.A.R. Hoare.  As
 * per Mesa type monitors however, "signal" has been replaced with "notify" to
 * indicate that control is not transferred to the waiter when a notification
 * is sent.
 *
 * In addition to the usual wait/notify operations this class keeps a count of
 * the threads currently inside one of the wait() overloads, exposed through
 * hasWaiters() and getWaitQueueLength().
 */
class Condition
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Initializes a condition object which is associated with the supplied
     * mutex object.
     *
     * Params:
     *  m = The mutex with which this condition will be associated.
     *
     * Throws:
     *  SyncError on error.
     */
    this( Mutex m ) nothrow @safe
    {
        version (Windows)
        {
            m_blockLock = CreateSemaphoreA( null, 1, 1, null );
            if ( m_blockLock == m_blockLock.init )
                throw new SyncError( "Unable to initialize condition" );
            scope(failure) CloseHandle( m_blockLock );

            m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
            if ( m_blockQueue == m_blockQueue.init )
                throw new SyncError( "Unable to initialize condition" );
            scope(failure) CloseHandle( m_blockQueue );

            InitializeCriticalSection( &m_unblockLock );
            m_assocMutex = m;
        }
        else version (Posix)
        {
            m_assocMutex = m;
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                // Use the monotonic clock for timed waits where the platform
                // allows selecting the clock of a condition variable.
                () @trusted
                {
                    pthread_condattr_t attr = void;
                    int rc  = pthread_condattr_init( &attr );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                    rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                    rc = pthread_cond_init( &m_hndl, &attr );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                    rc = pthread_condattr_destroy( &attr );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                } ();
            }
            else
            {
                // The pthread C bindings are not @safe, so this call needs the
                // same @trusted wrapper as the branch above to be usable from
                // this @safe constructor.
                () @trusted
                {
                    int rc = pthread_cond_init( &m_hndl, null );
                    if ( rc )
                        throw new SyncError( "Unable to initialize condition" );
                } ();
            }
        }
    }


    /**
     * Releases the operating system resources held by this condition.
     */
    ~this()
    {
        version (Windows)
        {
            BOOL rc = CloseHandle( m_blockLock );
            assert( rc, "Unable to destroy condition" );
            rc = CloseHandle( m_blockQueue );
            assert( rc, "Unable to destroy condition" );
            DeleteCriticalSection( &m_unblockLock );
        }
        else version (Posix)
        {
            int rc = pthread_cond_destroy( &m_hndl );
            assert( !rc, "Unable to destroy condition" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the mutex associated with this condition.
     *
     * Returns:
     *  The mutex associated with this condition.
     */
    @property Mutex mutex()
    {
        return m_assocMutex;
    }

    // undocumented function for internal use
    final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
    {
        return m_assocMutex;
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until notified.
     *
     * The associated mutex is expected to be locked by the caller; it is
     * released while waiting and locked again before this call returns.
     *
     * Throws:
     *  SyncError on error.
     */
    void wait()
    {
        // Both updates happen while the associated mutex is held by this
        // thread (before it is released for the wait, and after it has been
        // re-acquired).  A dedicated counter is used so that the bookkeeping of
        // the Windows implementation (m_numWaitersBlocked) is left untouched.
        m_numWaiters++;
        scope(exit) m_numWaiters--;

        version (Windows)
        {
            timedWait( INFINITE );
        }
        else version (Posix)
        {
            int rc = pthread_cond_wait( &m_hndl, m_assocMutex.handleAddr() );
            if ( rc )
                throw new SyncError( "Unable to wait for condition" );
        }
    }


    /**
     * Suspends the calling thread until a notification occurs or until the
     * supplied time period has elapsed.
     *
     * Params:
     *  val = The time to wait.
     *
     * In:
     *  val must be non-negative.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( Duration val )
    in
    {
        assert( !val.isNegative );
    }
    do
    {
        // See wait(): counted separately from the Windows algorithm state.
        m_numWaiters++;
        scope(exit) m_numWaiters--;

        version (Windows)
        {
            // INFINITE is uint.max, so the longest finite wait that can be
            // requested in one call is uint.max - 1 milliseconds; longer
            // durations are split into several waits.
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            while ( val > maxWaitMillis )
            {
                if ( timedWait( cast(uint)
                               maxWaitMillis.total!"msecs" ) )
                    return true;
                val -= maxWaitMillis;
            }
            return timedWait( cast(uint) val.total!"msecs" );
        }
        else version (Posix)
        {
            timespec t = void;
            mktspec( t, val );

            int rc = pthread_cond_timedwait( &m_hndl,
                                             m_assocMutex.handleAddr(),
                                             &t );
            if ( !rc )
                return true;
            if ( rc == ETIMEDOUT )
                return false;
            throw new SyncError( "Unable to wait for condition" );
        }
    }


    /**
     * Notifies one waiter.
     *
     * Throws:
     *  SyncError on error.
     */
    void notify()
    {
        version (Windows)
        {
            notify( false );
        }
        else version (Posix)
        {
            // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times,
            // so we need to keep retrying while it returns EAGAIN.
            //
            // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
            // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
            // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
            // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
            // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
            // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
            // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
            // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

            int rc;
            do {
                rc = pthread_cond_signal( &m_hndl );
            } while ( rc == EAGAIN );
            if ( rc )
                throw new SyncError( "Unable to notify condition" );
        }
    }


    /**
     * Notifies all waiters.
     *
     * Throws:
     *  SyncError on error.
     */
    void notifyAll()
    {
        version (Windows)
        {
            notify( true );
        }
        else version (Posix)
        {
            // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times,
            // so we need to keep retrying while it returns EAGAIN.
            //
            // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
            // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
            // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
            // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
            // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
            // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
            // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
            // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

            int rc;
            do {
                rc = pthread_cond_broadcast( &m_hndl );
            } while ( rc == EAGAIN );
            if ( rc )
                throw new SyncError( "Unable to notify condition" );
        }
    }


    /**
     * Reports whether any thread is currently inside one of the wait()
     * overloads of this condition.
     *
     * The counter is read without any locking here; it is only updated by
     * waiters while they hold the associated mutex, so callers that need a
     * stable answer should hold that mutex as well.
     *
     * Returns:
     *  true if at least one thread is waiting.
     */
    bool hasWaiters() {
        return m_numWaiters > 0;
    }

    /**
     * Gets the number of threads currently inside one of the wait() overloads
     * of this condition.  The same locking remark as for hasWaiters() applies.
     *
     * Returns:
     *  The number of waiting threads.
     */
    int getWaitQueueLength() {
        return m_numWaiters;
    }

private:
    version (Windows)
    {
        /*
         * Waits on the block queue for at most `timeout` milliseconds
         * (INFINITE for no limit).  The associated mutex is unlocked for the
         * duration of the wait and locked again before returning.
         *
         * Returns true if the wait ended because of a notification and false
         * if it timed out.
         */
        bool timedWait( DWORD timeout )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            // pass through the gate and register as a blocked waiter
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            m_assocMutex.unlock();
            scope(failure) m_assocMutex.lock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if ( m_numWaitersBlocked != 0 )
                    {
                        m_numWaitersBlocked--;
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if ( --m_numWaitersToUnblock == 0 )
                {
                    if ( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if ( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if ( ++m_numWaitersGone == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                m_numWaitersBlocked -= m_numWaitersGone;
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                // ReleaseSemaphore reports success with a nonzero value, so it
                // must not be compared against WAIT_OBJECT_0 (which is zero).
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if ( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for ( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if ( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            m_assocMutex.lock();
            return !timedOut;
        }


        /*
         * Wakes one blocked waiter, or all of them when `all` is true.  Does
         * nothing when no waiter is blocked.
         */
        void notify( bool all )
        {
            DWORD rc;

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if ( m_numWaitersToUnblock != 0 )
            {
                // a previous notification is still being delivered
                if ( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if ( all )
                {
                    m_numWaitersToUnblock += m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock++;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if ( m_numWaitersBlocked > m_numWaitersGone )
            {
                // close the gate; it is reopened by the last unblocked waiter
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if ( 0 != m_numWaitersGone )
                {
                    m_numWaitersBlocked -= m_numWaitersGone;
                    m_numWaitersGone = 0;
                }
                if ( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        Mutex               m_assocMutex;   // external mutex/CS
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0; // owned by timedWait/notify only
        int                 m_numWaitersToUnblock   = 0;
    }
    else version (Posix)
    {
        Mutex               m_assocMutex;
        pthread_cond_t      m_hndl;
    }

    // Number of threads currently inside wait() / wait(Duration); backs
    // hasWaiters() and getWaitQueueLength() on every platform.
    int m_numWaiters = 0;
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////

unittest
{
    import core.thread;
    import hunt.pool.impl.Mutex;
    import core.sync.semaphore;


    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;
        int  numPost    = 0;

        void waiter()
        {
            for ( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex )
                {
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        for ( int i = 0; i < numTries; ++i )
        {
            for ( int j = 0; j < numWaiters; ++j )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            for ( int j = 0; j < numWaiters; ++j )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
        // every waiter has left wait(), so the count must be back at zero
        assert( !condReady.hasWaiters() );
        assert( condReady.getWaitQueueLength() == 0 );
    }


    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    // We hold the mutex and every waiter has bumped numReady,
                    // so each of them is parked inside wait() right now.
                    assert( condReady.hasWaiters() );
                    assert( condReady.getWaitQueueLength() == numWaiters );
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
        assert( !condReady.hasWaiters() );
        assert( condReady.getWaitQueueLength() == 0 );
    }


    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
        assert( !condReady.hasWaiters() );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}