/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
module hunt.pool.impl.LinkedBlockingDeque;

import hunt.collection;
import hunt.Exceptions;
import hunt.logging.ConsoleLogger;

import core.time;
// import core.sync.condition;
import hunt.pool.impl.Condition;
import hunt.pool.impl.Mutex;
// import core.sync.mutex;

import std.algorithm;
import std.range;


/**
 * An optionally-bounded blocking deque based on linked nodes.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive expansion. The capacity, if unspecified,
 * is equal to {@code int.max}.  Linked nodes are dynamically created
 * upon each insertion unless this would bring the deque above capacity.
 *
 * <p>Most operations run in constant time (ignoring time spent
 * blocking).  Exceptions include {@code removeFirstOccurrence},
 * {@code removeLastOccurrence}, {@code contains}, {@code toArray},
 * {@code clear}, {@code drainTo} and the iterator's {@code remove()},
 * all of which walk the list.
 *
 * <p>All state is guarded by a single lock; two conditions on that lock
 * ({@code notEmpty}, {@code notFull}) are used to park takers and putters.
 *
 * @param <E> the type of elements held in this collection. The
 *        implementation uses {@code null} as the "no element" marker, so
 *        E must be a type that can be compared against {@code null}.
 *
 * Note: This was copied from Apache Harmony and modified to suit the needs of
 *       Commons Pool.
 *
 */
class LinkedBlockingDeque(E) : AbstractDeque!(E) { // , Serializable

    /*
     * Implemented as a simple doubly-linked list protected by a
     * single lock and using conditions to manage blocking.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to jump to "first" (for next links)
     * or "last" (for prev links).
     */

    /*
     * We have "diamond" multiple interface/abstract class inheritance
     * here, and that introduces ambiguities. Often we want the
     * BlockingDeque javadoc combined with the AbstractQueue
     * implementation, so a lot of method specs are duplicated here.
     */



    /**
     * Doubly-linked list node class.
     *
     * @param <E> node item type
     */
    private static class Node(E) {
        /**
         * The item, or null if this node has been removed.
         */
        E item;

        /**
         * One of:
         * - the real predecessor Node
         * - this Node, meaning the predecessor is tail
         * - null, meaning there is no predecessor
         */
        Node!(E) prev;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head
         * - null, meaning there is no successor
         */
        Node!(E) next;

        /**
         * Create a new list node.
         *
         * @param x The list item
         * @param p Previous item
         * @param n Next item
         */
        this(E x, Node!(E) p, Node!(E) n) {
            item = x;
            prev = p;
            next = n;
        }
    }

    /**
     * Pointer to first node.
     * Invariant: (first is null && last is null) ||
     *            (first.prev is null && first.item !is null)
     */
    private Node!(E) first; // @GuardedBy("lock")

    /**
     * Pointer to last node.
     * Invariant: (first is null && last is null) ||
     *            (last.next is null && last.item !is null)
     */
    private Node!(E) last; // @GuardedBy("lock")

    /** Number of items in the deque */
    private int count; // @GuardedBy("lock")

    /** Maximum number of items in the deque */
    private int capacity;

    /** Main lock guarding all access */
    private Mutex lock;

    /** Condition for waiting takes */
    private Condition notEmpty;

    /** Condition for waiting puts */
    private Condition notFull;

    /**
     * Creates a {@code LinkedBlockingDeque} with a capacity of
     * {@code int.max}.
     */
    this() {
        this(int.max);
    }

    /**
     * Creates a {@code LinkedBlockingDeque} with a capacity of
     * {@code int.max} and the given fairness policy.
     *
     * @param fairness forwarded to the two-argument constructor, which
     *        currently does not use it
     */
    this(bool fairness) {
        this(int.max, fairness);
    }

    /**
     * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this deque
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    this(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity
     * and fairness policy.
     *
     * @param capacity the capacity of this deque
     * @param fairness accepted for API compatibility with the Java
     *        original; it is not used here because the lock is a plain
     *        {@code Mutex}
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    this(int capacity, bool fairness) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
        lock = new Mutex();             // new InterruptibleReentrantLock(fairness);
        notEmpty = new Condition(lock); // lock.newCondition();
        notFull = new Condition(lock);  // lock.newCondition();
    }

    /**
     * Creates a {@code LinkedBlockingDeque} with a capacity of
     * {@code int.max}, initially containing the elements of
     * the given collection, added in traversal order of the
     * collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    this(Collection!E c) {
        this(int.max);

        if (c is null) {
            throw new NullPointerException();
        }

        // linkLast() must only be called with the lock held. The lock is
        // never contended here because the object is still being built.
        lock.lock();
        scope(exit) lock.unlock();

        foreach (E e; c) {
            if (e is null) {
                throw new NullPointerException();
            }
            if (!linkLast(e)) {
                throw new IllegalStateException("Deque full");
            }
        }
    }

    /**
     * Creates a {@code LinkedBlockingDeque} with a capacity of
     * {@code int.max}, initially containing the elements of the given
     * array in index order. A null or empty array yields an empty deque.
     *
     * @param c the elements to initially contain
     * @throws NullPointerException if any element is null
     */
    this(E[] c) {
        this(int.max);

        // Same locking discipline as the collection constructor.
        lock.lock();
        scope(exit) lock.unlock();

        foreach (E e; c) {
            if (e is null) {
                throw new NullPointerException();
            }
            if (!linkLast(e)) {
                throw new IllegalStateException("Deque full");
            }
        }
    }

    // Basic linking and unlinking operations, called only while holding lock

    /**
     * Links provided element as first element, or returns false if full.
     *
     * @param e The element to link as the first element.
     *
     * @return {@code true} if successful, otherwise {@code false}
     */
    private bool linkFirst(E e) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity) {
            return false;
        }
        Node!(E) f = first;
        Node!(E) x = new Node!(E)(e, null, f);
        first = x;
        if (last is null) {
            last = x;
        } else {
            f.prev = x;
        }
        ++count;
        notEmpty.notify();
        return true;
    }

    /**
     * Links provided element as last element, or returns false if full.
     *
     * @param e The element to link as the last element.
     *
     * @return {@code true} if successful, otherwise {@code false}
     */
    private bool linkLast(E e) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity) {
            return false;
        }
        Node!(E) l = last;
        Node!(E) x = new Node!E(e, l, null);
        last = x;
        if (first is null) {
            first = x;
        } else {
            l.next = x;
        }
        ++count;
        notEmpty.notify();
        return true;
    }

    /**
     * Removes and returns the first element, or null if empty.
     *
     * @return The first element or {@code null} if empty
     */
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node!(E) f = first;
        if (f is null) {
            return null;
        }
        Node!(E) n = f.next;
        E item = f.item;
        f.item = null;
        f.next = f; // self-link: tells iterators to restart from "first"
        first = n;
        if (n is null) {
            last = null;
        } else {
            n.prev = null;
        }
        --count;
        notFull.notify();
        return item;
    }

    /**
     * Removes and returns the last element, or null if empty.
     *
     * @return The last element or {@code null} if empty
     */
    private E unlinkLast() {
        // assert lock.isHeldByCurrentThread();
        Node!(E) l = last;
        if (l is null) {
            return null;
        }
        Node!(E) p = l.prev;
        E item = l.item;
        l.item = null;
        l.prev = l; // self-link: tells iterators to restart from "last"
        last = p;
        if (p is null) {
            first = null;
        } else {
            p.next = null;
        }
        --count;
        notFull.notify();
        return item;
    }

    /**
     * Unlinks the provided node.
     *
     * @param x The node to unlink
     */
    private void unlink(Node!(E) x) {
        // assert lock.isHeldByCurrentThread();
        Node!(E) p = x.prev;
        Node!(E) n = x.next;
        if (p is null) {
            unlinkFirst();
        } else if (n is null) {
            unlinkLast();
        } else {
            p.next = n;
            n.prev = p;
            x.item = null;
            // Don't mess with x's links.  They may still be in use by
            // an iterator.
            --count;
            notFull.notify();
        }
    }

    /**
     * Waits on {@code cond} for whatever is left of {@code timeout},
     * measured from {@code start}. Must be called with the lock held.
     *
     * Elapsed time is subtracted from the timeout (instead of computing an
     * absolute deadline) so that a very large timeout cannot overflow.
     *
     * @param cond    the condition to wait on
     * @param start   the instant at which the caller started waiting
     * @param timeout the total time the caller is willing to wait
     *
     * @return {@code false} if no time is left or {@code cond.wait}
     *         returned {@code false}; {@code true} otherwise
     */
    private static bool awaitRemaining(Condition cond, MonoTime start, Duration timeout) {
        Duration remaining = timeout - (MonoTime.currTime - start);
        if (remaining <= Duration.zero) {
            return false;
        }
        return cond.wait(remaining);
    }

    // BlockingDeque methods

    /**
     * Inserts the element at the front of the deque if there is room.
     *
     * @param e element to link
     * @return {@code true} if the element was added, {@code false} if the
     *         deque is full
     * @throws NullPointerException if e is null
     */
    override
    bool offerFirst(E e) {
        if (e is null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            return linkFirst(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the element at the end of the deque if there is room.
     *
     * @param e element to link
     * @return {@code true} if the element was added, {@code false} if the
     *         deque is full
     * @throws NullPointerException if e is null
     */
    override bool offerLast(E e) {
        if (e is null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            return linkLast(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Links the provided element as the first in the queue, waiting until there
     * is space to do so if the queue is full.
     *
     * @param e element to link
     *
     * @throws NullPointerException if e is null
     */
    void putFirst(E e){
        if (e is null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            while (!linkFirst(e)) {
                notFull.wait();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Links the provided element as the last in the queue, waiting until there
     * is space to do so if the queue is full.
     *
     * @param e element to link
     *
     * @throws NullPointerException if e is null
     */
    override void putLast(E e) {
        if (e is null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            while (!linkLast(e)) {
                notFull.wait();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Links the provided element as the first in the queue, waiting up to the
     * specified time to do so if the queue is full.
     *
     * The timeout bounds the total wait: after each wake-up only the
     * remaining time is waited for. One last insertion attempt is made
     * after the time has run out.
     *
     * @param e         element to link
     * @param timeout   length of time to wait
     *
     * @return {@code true} if successful, otherwise {@code false}
     *
     * @throws NullPointerException if e is null
     */
    bool offerFirst(E e, Duration timeout) {
        if (e is null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            MonoTime start = MonoTime.currTime;
            bool isTimeout = false;
            while (!linkFirst(e)) {
                if (isTimeout) {
                    return false;
                }
                isTimeout = !awaitRemaining(notFull, start, timeout);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Links the provided element as the last in the queue, waiting up to the
     * specified time to do so if the queue is full.
     *
     * The timeout bounds the total wait: after each wake-up only the
     * remaining time is waited for. One last insertion attempt is made
     * after the time has run out.
     *
     * @param e         element to link
     * @param timeout   length of time to wait
     *
     * @return {@code true} if successful, otherwise {@code false}
     *
     * @throws NullPointerException if e is null
     */
    override bool offerLast(E e, Duration timeout) {
        if (e is null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            MonoTime start = MonoTime.currTime;
            bool isTimeout = false;
            while (!linkLast(e)) {
                if (isTimeout) {
                    return false;
                }
                isTimeout = !awaitRemaining(notFull, start, timeout);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the first element without waiting.
     *
     * @return the first element, or {@code null} if the deque is empty
     */
    override
    E pollFirst() {
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the last element without waiting.
     *
     * @return the last element, or {@code null} if the deque is empty
     */
    override
    E pollLast() {
        lock.lock();
        try {
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Unlinks the first element in the queue, waiting until there is an element
     * to unlink if the queue is empty.
     *
     * @return the unlinked element
     */
    override E takeFirst() {
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) is null) {
                notEmpty.wait();
            }
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Unlinks the last element in the queue, waiting until there is an element
     * to unlink if the queue is empty.
     *
     * @return the unlinked element
     */
    E takeLast(){
        lock.lock();
        try {
            E x;
            while ( (x = unlinkLast()) is null) {
                notEmpty.wait();
            }
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Unlinks the first element in the queue, waiting up to the specified time
     * to do so if the queue is empty.
     *
     * The timeout bounds the total wait: after each wake-up only the
     * remaining time is waited for. An {@code Exception} raised while
     * waiting is logged (in debug builds) and not propagated; whatever has
     * been obtained so far, normally {@code null}, is returned.
     *
     * @param timeout   length of time to wait
     *
     * @return the unlinked element, or {@code null} if none became
     *         available in time
     */
    override E pollFirst(Duration timeout) {
        lock.lock();
        scope(exit) lock.unlock();

        MonoTime start = MonoTime.currTime;
        bool isTimeout = false;
        E x;
        try {
            // The unlink attempt is evaluated before the timeout flag, so
            // one final attempt is still made after the time has run out.
            while ( (x = unlinkFirst()) is null && !isTimeout) {
                isTimeout = !awaitRemaining(notEmpty, start, timeout);
                // infof("result: %s, isTimeout: %s, in %s", x is null, isTimeout, timeout);
            }
        } catch(Exception ex) {
            debug warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
        }

        return x;
    }

    /**
     * Unlinks the last element in the queue, waiting up to the specified time
     * to do so if the queue is empty.
     *
     * The timeout bounds the total wait: after each wake-up only the
     * remaining time is waited for. An {@code Exception} raised while
     * waiting is logged (in debug builds) and not propagated; whatever has
     * been obtained so far, normally {@code null}, is returned.
     *
     * @param timeout   length of time to wait
     *
     * @return the unlinked element, or {@code null} if none became
     *         available in time
     */
    E pollLast(Duration timeout) {
        lock.lock();
        scope(exit) lock.unlock();

        MonoTime start = MonoTime.currTime;
        bool isTimeout = false;
        E x;
        try {
            while ( (x = unlinkLast()) is null) {
                if (isTimeout) {
                    return null;
                }
                isTimeout = !awaitRemaining(notEmpty, start, timeout);
            }
        } catch(Exception ex) {
            debug warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
        }

        return x;
    }

    /**
     * Returns, without removing, the first element.
     *
     * @return the first element
     * @throws NoSuchElementException if the deque is empty
     */
    override
    E getFirst() {
        E x = peekFirst();
        if (x is null) {
            throw new NoSuchElementException();
        }
        return x;
    }

    /**
     * Returns, without removing, the last element.
     *
     * @return the last element
     * @throws NoSuchElementException if the deque is empty
     */
    override
    E getLast() {
        E x = peekLast();
        if (x is null) {
            throw new NoSuchElementException();
        }
        return x;
    }

    /**
     * Returns, without removing, the first element.
     *
     * @return the first element, or {@code null} if the deque is empty
     */
    override
    E peekFirst() {
        lock.lock();

        try {
            return first is null ? null : first.item;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns, without removing, the last element.
     *
     * @return the last element, or {@code null} if the deque is empty
     */
    override
    E peekLast() {
        lock.lock();
        try {
            return last is null ? null : last.item;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the first element equal ({@code ==}) to {@code o}, searching
     * from the head.
     *
     * @param o element to remove; {@code null} never matches
     * @return {@code true} if an element was removed
     */
    override
    bool removeFirstOccurrence(E o) {
        if (o is null) {
            return false;
        }
        lock.lock();
        try {
            for (Node!(E) p = first; p !is null; p = p.next) {
                if (o == p.item) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the last element equal ({@code ==}) to {@code o}, searching
     * from the tail.
     *
     * @param o element to remove; {@code null} never matches
     * @return {@code true} if an element was removed
     */
    // override
    bool removeLastOccurrence(E o) {
        if (o is null) {
            return false;
        }
        lock.lock();
        try {
            for (Node!(E) p = last; p !is null; p = p.prev) {
                if (o == p.item) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    // BlockingQueue methods

    /**
     * Returns the number of additional elements that this deque can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this deque
     * less the current {@code size} of this deque.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return The number of additional elements the queue is able to accept
     */
    int remainingCapacity() {
        lock.lock();
        try {
            return capacity - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Drains the queue to the specified collection.
     *
     * @param c The collection to add the elements to
     *
     * @return number of elements added to the collection
     *
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this instance
     */
    int drainTo(Collection!E c) {
        return drainTo(c, int.max);
    }

    /**
     * Drains no more than the specified number of elements from the queue to the
     * specified collection.
     *
     * @param c           collection to add the elements to
     * @param maxElements maximum number of elements to remove from the queue;
     *                    a value of zero or less drains nothing
     *
     * @return number of elements added to the collection (never negative)
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this instance
     */
    int drainTo(Collection!E c, int maxElements) {
        if (c is null) {
            throw new NullPointerException();
        }
        if (c is this) {
            throw new IllegalArgumentException();
        }
        lock.lock();
        try {
            // Clamp at zero so a negative maxElements is not reported back
            // as a negative "number of elements added".
            int n = max(0, min(maxElements, count));
            for (int i = 0; i < n; i++) {
                c.add(first.item);   // In this order, in case add() throws.
                unlinkFirst();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    // Collection methods

    /**
     * Returns the number of elements in this deque.
     *
     * @return the number of elements in this deque
     */
    override
    int size() {
        // count is guarded by the lock like the rest of the state.
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns {@code true} if this deque contains the specified element.
     * More formally, returns {@code true} if and only if this deque contains
     * at least one element {@code e} such that {@code o == e}.
     *
     * @param o object to be checked for containment in this deque
     * @return {@code true} if this deque contains the specified element
     */
    override
    bool contains(E o) {
        if (o is null) {
            return false;
        }
        lock.lock();
        try {
            for (Node!(E) p = first; p !is null; p = p.next) {
                if (o == p.item) {
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /*
     * TODO: Add support for more efficient bulk operations.
     *
     * We don't want to acquire the lock for every iteration, but we
     * also want other threads a chance to interact with the
     * collection, especially when count is close to capacity.
     */

    // /**
    //  * Adds all of the elements in the specified collection to this
    //  * queue.  Attempts to addAll of a queue to itself result in
    //  * {@code IllegalArgumentException}. Further, the behavior of
    //  * this operation is undefined if the specified collection is
    //  * modified while the operation is in progress.
    //  *
    //  * @param c collection containing elements to be added to this queue
    //  * @return {@code true} if this queue changed as a result of the call
    //  * @throws ClassCastException
    //  * @throws NullPointerException
    //  * @throws IllegalArgumentException
    //  * @throws IllegalStateException
    //  * @see #add(Object)
    //  */
    // bool addAll(Collection<? extends E> c) {
    //     if (c is null)
    //         throw new NullPointerException();
    //     if (c == this)
    //         throw new IllegalArgumentException();
    //     ReentrantLock lock = this.lock;
    //     lock.lock();
    //     try {
    //         bool modified = false;
    //         foreach(E e ; c)
    //             if (linkLast(e))
    //                 modified = true;
    //         return modified;
    //     } finally {
    //         lock.unlock();
    //     }
    // }

    /**
     * Returns an array containing all of the elements in this deque, in
     * proper sequence (from first to last element).
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this deque.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this deque
     */
    override
    E[] toArray() {
        lock.lock();
        try {
            E[] a = new E[count];
            int k = 0;
            for (Node!(E) p = first; p !is null; p = p.next) {
                a[k++] = p.item;
            }
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * {@inheritDoc}
     */
    // override
    // <T> T[] toArray(T[] a) {
    //     lock.lock();
    //     try {
    //         if (a.length < count) {
    //             a = (T[])java.lang.reflect.Array.newInstance
    //                 (a.getClass().getComponentType(), count);
    //         }
    //         int k = 0;
    //         for (Node!(E) p = first; p !is null; p = p.next) {
    //             a[k++] = (T)p.item;
    //         }
    //         if (a.length > k) {
    //             a[k] = null;
    //         }
    //         return a;
    //     } finally {
    //         lock.unlock();
    //     }
    // }

    /**
     * Returns the superclass string representation, computed while holding
     * the lock.
     */
    override
    string toString() {
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this deque.
     * The deque will be empty after this call returns.
     */
    override
    void clear() {
        lock.lock();
        try {
            for (Node!(E) f = first; f !is null;) {
                f.item = null;
                Node!(E) n = f.next;
                f.prev = null;
                f.next = null;
                f = n;
            }
            first = last = null;
            count = 0;
            // notFull.signalAll();
            notFull.notifyAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this deque in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     * The returned range is "weakly consistent": it tolerates concurrent
     * modification of the deque and may (but is not guaranteed to)
     * reflect modifications made after it was constructed.
     *
     * @return an iterator over the elements in this deque in proper sequence
     */
    override
    InputRange!(E) iterator() {
        return new Itr();
    }

    /**
     * Returns an iterator over the elements in this deque in reverse
     * sequence, from last (tail) to first (head).
     *
     * @return an iterator over the elements in this deque in reverse order
     */
    // override
    InputRange!(E) descendingIterator() {
        return new DescendingItr();
    }

    /**
     * Base class for Iterators for LinkedBlockingDeque
     */
    private abstract class AbstractItr : InputRange!(E) {
        /**
         * The next node to return in next()
         */
         Node!(E) next;

        /**
         * nextItem holds on to item fields because once we claim that
         * an element exists (empty() is false), we must return the item
         * read under lock (in advance()) even if it was in the process of
         * being removed when empty() was called.
         */
        E nextItem;

        /**
         * Node returned by most recent call to popFront. Needed by remove.
         * Reset to null if this element is deleted by a call to remove.
         */
        private Node!(E) lastRet;

        /**
         * Obtain the first node to be returned by the iterator.
         *
         * @return first node
         */
        abstract Node!(E) firstNode();

        /**
         * For a given node, obtain the next node to be returned by the
         * iterator.
         *
         * @param n given node
         *
         * @return next node
         */
        abstract Node!(E) nextNode(Node!(E) n);

        /**
         * Create a new iterator. Sets the initial position.
         */
        this() {
            // set to initial position
            lock.lock();
            try {
                next = firstNode();
                nextItem = next is null ? null : next.item;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Returns the successor node of the given non-null, but
         * possibly previously deleted, node.
         *
         * @param n node whose successor is sought
         * @return successor node
         */
        private Node!(E) succ(Node!(E) n) {
            // Chains of deleted nodes ending in null or self-links
            // are possible if multiple interior nodes are removed.
            for (;;) {
                Node!(E) s = nextNode(n);
                if (s is null) {
                    return null;
                } else if (s.item !is null) {
                    return s;
                } else if (s is n) {
                    // Self-link: n was dequeued, restart from the live end.
                    return firstNode();
                } else {
                    n = s;
                }
            }
        }

        /**
         * Advances next.
         */
        void advance() {
            lock.lock();
            try {
                // assert next !is null;
                next = succ(next);
                nextItem = next is null ? null : next.item;
            } finally {
                lock.unlock();
            }
        }

        /**
         * @return {@code true} when there is no further element
         */
        // override
        bool empty() {
            return next is null;
        }

        /**
         * Returns the current element without advancing.
         *
         * @throws NoSuchElementException if the iterator is exhausted
         */
        // override
        E front() {
            if (next is null) {
                throw new NoSuchElementException();
            }
            return nextItem;
            // lastRet = next;
            // E x = nextItem;
            // advance();
            // return x;
        }

        /**
         * Moves to the next element and remembers the current node so that
         * remove() can delete it.
         *
         * @throws NoSuchElementException if the iterator is exhausted
         */
        void popFront() {
            if (next is null) {
                throw new NoSuchElementException();
            }
            lastRet = next;
            advance();
        }

        /**
         * Removes from the deque the element most recently stepped over by
         * popFront(), unless it has already been removed.
         *
         * @throws IllegalStateException if popFront() has not been called
         *         since construction or since the last remove()
         */
        // override
        void remove() {
            Node!(E) n = lastRet;
            if (n is null) {
                throw new IllegalStateException();
            }
            lastRet = null;
            lock.lock();
            try {
                if (n.item !is null) {
                    unlink(n);
                }
            } finally {
                lock.unlock();
            }
        }

        /** Not supported; always throws NotSupportedException. */
        E moveFront() @property { throw new NotSupportedException(); }


        /**
         * foreach support. Visits the remaining elements, consuming the
         * iterator. The deque lock is held for the whole traversal, so the
         * delegate runs with the lock held.
         *
         * The traversal stops as soon as the delegate returns non-zero
         * (break / return / goto out of the foreach body); the iterator is
         * then left positioned on the element that caused the exit.
         *
         * @return the last value returned by the delegate
         * @throws NullPointerException if dg is null
         */
        int opApply(scope int delegate(E) dg) {
            if(dg is null)
                throw new NullPointerException();

            int result = 0;
            lock.lock();
            scope(exit) {
                lock.unlock();
            }

            while(next !is null) {
                result = dg(nextItem);
                if (result != 0) {
                    break;
                }
                next = succ(next);
                nextItem = next is null ? null : next.item;
            }

            return result;
        }

        /// Ditto, additionally passing a zero-based running index.
        int opApply(scope int delegate(size_t, E) dg) {
            if(dg is null)
                throw new NullPointerException();

            int result = 0;
            lock.lock();
            scope(exit) {
                lock.unlock();
            }

            size_t index = 0;
            while(next !is null) {
                result = dg(index, nextItem);
                if (result != 0) {
                    break;
                }
                next = succ(next);
                nextItem = next is null ? null : next.item;
                index++;
            }

            return result;
        }
    }

    /** Forward iterator */
    private class Itr : AbstractItr {
        override Node!(E) firstNode() { return first; }

        override Node!(E) nextNode(Node!(E) n) { return n.next; }

    }

    /** Descending iterator */
    private class DescendingItr : AbstractItr {
        override Node!(E) firstNode() { return last; }

        override Node!(E) nextNode(Node!(E) n) { return n.prev; }
    }

    /**
     * Saves the state of this deque to a stream (that is, serialize it).
     *
     * @serialData The capacity (int), followed by elements (each an
     * {@code Object}) in the proper order, followed by a null
     * @param s the stream
     */
    // private void writeObject(java.io.ObjectOutputStream s) {
    //     lock.lock();
    //     try {
    //         // Write out capacity and any hidden stuff
    //         s.defaultWriteObject();
    //         // Write out all elements in the proper order.
    //         for (Node!(E) p = first; p !is null; p = p.next) {
    //             s.writeObject(p.item);
    //         }
    //         // Use trailing null as sentinel
    //         s.writeObject(null);
    //     } finally {
    //         lock.unlock();
    //     }
    // }

    /**
     * Reconstitutes this deque from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    // private void readObject(java.io.ObjectInputStream s)
    //     throws java.io.IOException, ClassNotFoundException {
    //     s.defaultReadObject();
    //     count = 0;
    //     first = null;
    //     last = null;
    //     // Read in all elements and place in queue
    //     for (;;) {
    //         @SuppressWarnings("unchecked")
    //         final
    //         E item = (E)s.readObject();
    //         if (item is null) {
    //             break;
    //         }
    //         add(item);
    //     }
    // }

    // Monitoring methods

    /**
     * Returns true if there are threads waiting to take instances from this deque.
     *
     * @return true if there is at least one thread waiting on this deque's notEmpty condition.
     */
    bool hasTakeWaiters() {
        lock.lock();
        try {
            // return lock.hasWaiters(notEmpty);
            version(HUNT_DEBUG) {
                int len = notEmpty.getWaitQueueLength();
                if(len>0) {
                    tracef("waiters: %d", len);
                }
            }
            return notEmpty.hasWaiters();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the length of the queue of threads waiting to take instances from this deque.
     *
     * @return number of threads waiting on this deque's notEmpty condition.
     */
    int getTakeQueueLength() {
        lock.lock();
        scope(exit) lock.unlock();
        return notEmpty.getWaitQueueLength();
    }

    /**
     * Wakes every thread currently waiting to take an object from the
     * deque by notifying all waiters on the notEmpty condition. Any
     * {@code Exception} raised while notifying is logged (in debug builds)
     * and not propagated.
     *
     * NOTE(review): the method name is misspelled ("interupt") but is kept
     * as-is because it is part of the public interface.
     */
    void interuptTakeWaiters() {
        lock.lock();
        scope(exit) lock.unlock();

        try {
            notEmpty.notifyAll();
        } catch(Exception ex) {
            debug warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
        }
    }
}