1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 module hunt.pool.impl.LinkedBlockingDeque;
18 
19 import hunt.collection;
20 import hunt.Exceptions;
21 import hunt.logging.ConsoleLogger;
22 
23 import core.time;
24 // import core.sync.condition;
25 import hunt.pool.impl.Condition;
26 import hunt.pool.impl.Mutex;
27 // import core.sync.mutex;
28 
29 import std.algorithm;
30 import std.range;
31 
32 
33 /**
34  * An optionally-bounded {@linkplain java.util.concurrent.BlockingDeque blocking
35  * deque} based on linked nodes.
36  *
37  * <p> The optional capacity bound constructor argument serves as a
38  * way to prevent excessive expansion. The capacity, if unspecified,
39  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
40  * dynamically created upon each insertion unless this would bring the
41  * deque above capacity.
42  *
43  * <p>Most operations run in constant time (ignoring time spent
44  * blocking).  Exceptions include {@link #remove(Object) remove},
45  * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
46  * #removeLastOccurrence removeLastOccurrence}, {@link #contains
47  * contains}, {@link #iterator iterator.remove()}, and the bulk
48  * operations, all of which run in linear time.
49  *
50  * <p>This class and its iterator implement all of the
51  * <em>optional</em> methods of the {@link Collection} and {@link
52  * Iterator} interfaces.
53  *
54  * <p>This class is a member of the
55  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
56  * Java Collections Framework</a>.
57  *
58  * @param <E> the type of elements held in this collection
59  *
60  * Note: This was copied from Apache Harmony and modified to suit the needs of
61  *       Commons Pool.
62  *
63  */
64 class LinkedBlockingDeque(E) : AbstractDeque!(E) { // , Serializable 
65 
66     /*
67      * Implemented as a simple doubly-linked list protected by a
68      * single lock and using conditions to manage blocking.
69      *
70      * To implement weakly consistent iterators, it appears we need to
71      * keep all Nodes GC-reachable from a predecessor dequeued Node.
72      * That would cause two problems:
73      * - allow a rogue Iterator to cause unbounded memory retention
74      * - cause cross-generational linking of old Nodes to new Nodes if
75      *   a Node was tenured while live, which generational GCs have a
76      *   hard time dealing with, causing repeated major collections.
77      * However, only non-deleted Nodes need to be reachable from
78      * dequeued Nodes, and reachability does not necessarily have to
79      * be of the kind understood by the GC.  We use the trick of
80      * linking a Node that has just been dequeued to itself.  Such a
81      * self-link implicitly means to jump to "first" (for next links)
82      * or "last" (for prev links).
83      */
84 
85     /*
86      * We have "diamond" multiple interface/abstract class inheritance
87      * here, and that introduces ambiguities. Often we want the
88      * BlockingDeque javadoc combined with the AbstractQueue
89      * implementation, so a lot of method specs are duplicated here.
90      */
91 
92 
93 
94     /**
95      * Doubly-linked list node class.
96      *
97      * @param <E> node item type
98      */
99     private static class Node(E) {
100         /**
101          * The item, or null if this node has been removed.
102          */
103         E item;
104 
105         /**
106          * One of:
107          * - the real predecessor Node
108          * - this Node, meaning the predecessor is tail
109          * - null, meaning there is no predecessor
110          */
111         Node!(E) prev;
112 
113         /**
114          * One of:
115          * - the real successor Node
116          * - this Node, meaning the successor is head
117          * - null, meaning there is no successor
118          */
119         Node!(E) next;
120 
121         /**
122          * Create a new list node.
123          *
124          * @param x The list item
125          * @param p Previous item
126          * @param n Next item
127          */
128         this(E x, Node!(E) p, Node!(E) n) {
129             item = x;
130             prev = p;
131             next = n;
132         }
133     }
134 
135     /**
136      * Pointer to first node.
137      * Invariant: (first is null && last is null) ||
138      *            (first.prev is null && first.item !is null)
139      */
140     private Node!(E) first; // @GuardedBy("lock")
141 
142     /**
143      * Pointer to last node.
144      * Invariant: (first is null && last is null) ||
145      *            (last.next is null && last.item !is null)
146      */
147     private Node!(E) last; // @GuardedBy("lock")
148 
149     /** Number of items in the deque */
150     private int count; // @GuardedBy("lock")
151 
152     /** Maximum number of items in the deque */
153     private int capacity;
154 
155     /** Main lock guarding all access */
156     private Mutex lock;
157 
158     /** Condition for waiting takes */
159     private Condition notEmpty;
160 
161     /** Condition for waiting puts */
162     private Condition notFull;
163 
164     /**
165      * Creates a {@code LinkedBlockingDeque} with a capacity of
166      * {@link Integer#MAX_VALUE}.
167      */
168     this() {
169         this(int.max);
170     }
171 
172     /**
173      * Creates a {@code LinkedBlockingDeque} with a capacity of
174      * {@link Integer#MAX_VALUE} and the given fairness policy.
175      * @param fairness true means threads waiting on the deque should be served
176      * as if waiting in a FIFO request queue
177      */
178     this(bool fairness) {
179         this(int.max, fairness);
180     }
181 
182     /**
183      * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
184      *
185      * @param capacity the capacity of this deque
186      * @throws IllegalArgumentException if {@code capacity} is less than 1
187      */
188     this(int capacity) {
189         this(capacity, false);
190     }
191 
192     /**
193      * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity
194      * and fairness policy.
195      *
196      * @param capacity the capacity of this deque
197      * @param fairness true means threads waiting on the deque should be served
198      * as if waiting in a FIFO request queue
199      * @throws IllegalArgumentException if {@code capacity} is less than 1
200      */
201     this(int capacity, bool fairness) {
202         if (capacity <= 0) {
203             throw new IllegalArgumentException();
204         }
205         this.capacity = capacity;
206         lock = new Mutex(); // new InterruptibleReentrantLock(fairness);
207         notEmpty = new Condition(lock); // lock.newCondition();
208         notFull = new Condition(lock); // lock.newCondition();
209     }
210 
211     /**
212      * Creates a {@code LinkedBlockingDeque} with a capacity of
213      * {@link Integer#MAX_VALUE}, initially containing the elements of
214      * the given collection, added in traversal order of the
215      * collection's iterator.
216      *
217      * @param c the collection of elements to initially contain
218      * @throws NullPointerException if the specified collection or any
219      *         of its elements are null
220      */
221     this(Collection!E c) {
222         this(int.max);
223 
224         foreach(E e ; c) {
225             if (e is null) {
226                 throw new NullPointerException();
227             }
228             if (!linkLast(e)) {
229                 throw new IllegalStateException("Deque full");
230             }
231         }
232     }
233 
234     this(E[] c) {
235         this(int.max);
236         foreach(E e ; c) {
237             if (e is null) {
238                 throw new NullPointerException();
239             }
240             if (!linkLast(e)) {
241                 throw new IllegalStateException("Deque full");
242             }
243         }
244     }
245 
246     // Basic linking and unlinking operations, called only while holding lock
247 
248     /**
249      * Links provided element as first element, or returns false if full.
250      *
251      * @param e The element to link as the first element.
252      *
253      * @return {@code true} if successful, otherwise {@code false}
254      */
255     private bool linkFirst(E e) {
256         // assert lock.isHeldByCurrentThread();
257         if (count >= capacity) {
258             return false;
259         }
260         Node!(E) f = first;
261         Node!(E) x = new Node!(E)(e, null, f);
262         first = x;
263         if (last is null) {
264             last = x;
265         } else {
266             f.prev = x;
267         }
268         ++count;
269         notEmpty.notify();
270         return true;
271     }
272 
273     /**
274      * Links provided element as last element, or returns false if full.
275      *
276      * @param e The element to link as the last element.
277      *
278      * @return {@code true} if successful, otherwise {@code false}
279      */
280     private bool linkLast(E e) {
281         // assert lock.isHeldByCurrentThread();
282         if (count >= capacity) {
283             return false;
284         }
285         Node!(E) l = last;
286         Node!(E) x = new Node!E(e, l, null);
287         last = x;
288         if (first is null) {
289             first = x;
290         } else {
291             l.next = x;
292         }
293         ++count;
294         notEmpty.notify();
295         return true;
296     }
297 
298     /**
299      * Removes and returns the first element, or null if empty.
300      *
301      * @return The first element or {@code null} if empty
302      */
303     private E unlinkFirst() {
304         // assert lock.isHeldByCurrentThread();
305         Node!(E) f = first;
306         if (f is null) {
307             return null;
308         }
309         Node!(E) n = f.next;
310         E item = f.item;
311         f.item = null;
312         f.next = f; // help GC
313         first = n;
314         if (n is null) {
315             last = null;
316         } else {
317             n.prev = null;
318         }
319         --count;
320         notFull.notify();
321         return item;
322     }
323 
324     /**
325      * Removes and returns the last element, or null if empty.
326      *
327      * @return The first element or {@code null} if empty
328      */
329     private E unlinkLast() {
330         // assert lock.isHeldByCurrentThread();
331         Node!(E) l = last;
332         if (l is null) {
333             return null;
334         }
335         Node!(E) p = l.prev;
336         E item = l.item;
337         l.item = null;
338         l.prev = l; // help GC
339         last = p;
340         if (p is null) {
341             first = null;
342         } else {
343             p.next = null;
344         }
345         --count;
346         notFull.notify();
347         return item;
348     }
349 
350     /**
351      * Unlinks the provided node.
352      *
353      * @param x The node to unlink
354      */
355     private void unlink(Node!(E) x) {
356         // assert lock.isHeldByCurrentThread();
357         Node!(E) p = x.prev;
358         Node!(E) n = x.next;
359         if (p is null) {
360             unlinkFirst();
361         } else if (n is null) {
362             unlinkLast();
363         } else {
364             p.next = n;
365             n.prev = p;
366             x.item = null;
367             // Don't mess with x's links.  They may still be in use by
368             // an iterator.
369         --count;
370             notFull.notify();
371         }
372     }
373 
374     // BlockingDeque methods
375 
376     /**
377      * {@inheritDoc}
378      */
379     override
380     bool offerFirst(E e) {
381         if (e is null) {
382             throw new NullPointerException();
383         }
384         lock.lock();
385         try {
386             return linkFirst(e);
387         } finally {
388             lock.unlock();
389         }
390     }
391 
392     /**
393      * {@inheritDoc}
394      */
395     override bool offerLast(E e) {
396         if (e is null) {
397             throw new NullPointerException();
398         }
399         lock.lock();
400         try {
401             return linkLast(e);
402         } finally {
403             lock.unlock();
404         }
405     }
406 
407     /**
408      * Links the provided element as the first in the queue, waiting until there
409      * is space to do so if the queue is full.
410      *
411      * @param e element to link
412      *
413      * @throws NullPointerException if e is null
414      * @throws InterruptedException if the thread is interrupted whilst waiting
415      *         for space
416      */
417     void putFirst(E e){
418         if (e is null) {
419             throw new NullPointerException();
420         }
421         lock.lock();
422         try {
423             while (!linkFirst(e)) {
424                 notFull.wait();
425             }
426         } finally {
427             lock.unlock();
428         }
429     }
430 
431     /**
432      * Links the provided element as the last in the queue, waiting until there
433      * is space to do so if the queue is full.
434      *
435      * @param e element to link
436      *
437      * @throws NullPointerException if e is null
438      * @throws InterruptedException if the thread is interrupted whilst waiting
439      *         for space
440      */
441     override void putLast(E e) {
442         if (e is null) {
443             throw new NullPointerException();
444         }
445         lock.lock();
446         try {
447             while (!linkLast(e)) {
448                 notFull.wait();
449             }
450         } finally {
451             lock.unlock();
452         }
453     }
454 
455     /**
456      * Links the provided element as the first in the queue, waiting up to the
457      * specified time to do so if the queue is full.
458      *
459      * @param e         element to link
460      * @param timeout   length of time to wait
461      * @param unit      units that timeout is expressed in
462      *
463      * @return {@code true} if successful, otherwise {@code false}
464      *
465      * @throws NullPointerException if e is null
466      * @throws InterruptedException if the thread is interrupted whilst waiting
467      *         for space
468      */
469     bool offerFirst(E e, Duration timeout) {
470         if (e is null) {
471             throw new NullPointerException();
472         }
473         // long nanos = unit.toNanos(timeout);
474         lock.lock();
475         bool isTimeout = false;
476         try {
477             while (!linkFirst(e)) {
478                 if (isTimeout) {
479                     return false;
480                 }
481                 // nanos = notFull.awaitNanos(nanos);
482 // TODO: Tasks pending completion -@zxp at 7/10/2019, 1:31:30 PM                
483 // 
484                 isTimeout = !notFull.wait(timeout);
485             }
486             return true;
487         } finally {
488             lock.unlock();
489         }
490     }
491 
492     /**
493      * Links the provided element as the last in the queue, waiting up to the
494      * specified time to do so if the queue is full.
495      *
496      * @param e         element to link
497      * @param timeout   length of time to wait
498      * @param unit      units that timeout is expressed in
499      *
500      * @return {@code true} if successful, otherwise {@code false}
501      *
502      * @throws NullPointerException if e is null
503      * @throws InterruptedException if the thread is interrupted whist waiting
504      *         for space
505      */
506     override bool offerLast(E e, Duration timeout) {
507         if (e is null) {
508             throw new NullPointerException();
509         }
510         // long nanos = unit.toNanos(timeout);
511         lock.lock();
512         bool isTimeout = false;
513         try {
514             while (!linkLast(e)) {
515                 if (isTimeout) {
516                     return false;
517                 }
518                 isTimeout = !notFull.wait(timeout);
519             }
520             return true;
521         } finally {
522             lock.unlock();
523         }
524     }
525 
526     override
527     E pollFirst() {
528         lock.lock();
529         try {
530             return unlinkFirst();
531         } finally {
532             lock.unlock();
533         }
534     }
535 
536     override
537     E pollLast() {
538         lock.lock();
539         try {
540             return unlinkLast();
541         } finally {
542             lock.unlock();
543         }
544     }
545 
546     /**
547      * Unlinks the first element in the queue, waiting until there is an element
548      * to unlink if the queue is empty.
549      *
550      * @return the unlinked element
551      * @throws InterruptedException if the current thread is interrupted
552      */
553     override E takeFirst() {
554         lock.lock();
555         try {
556             E x;
557             while ( (x = unlinkFirst()) is null) {
558                 notEmpty.wait();
559             }
560             return x;
561         } finally {
562             lock.unlock();
563         }
564     }
565 
566     /**
567      * Unlinks the last element in the queue, waiting until there is an element
568      * to unlink if the queue is empty.
569      *
570      * @return the unlinked element
571      * @throws InterruptedException if the current thread is interrupted
572      */
573     E takeLast(){
574         lock.lock();
575         try {
576             E x;
577             while ( (x = unlinkLast()) is null) {
578                 notEmpty.wait();
579             }
580             return x;
581         } finally {
582             lock.unlock();
583         }
584     }
585 
586     /**
587      * Unlinks the first element in the queue, waiting up to the specified time
588      * to do so if the queue is empty.
589      *
590      * @param timeout   length of time to wait
591      * @param unit      units that timeout is expressed in
592      *
593      * @return the unlinked element
594      * @throws InterruptedException if the current thread is interrupted
595      */
596     override E pollFirst(Duration timeout) {
597         lock.lock();
598         scope(exit) lock.unlock();
599         
600         bool isTimeout = false;
601         E x;
602         try {
603             while ( (x = unlinkFirst()) is null && !isTimeout) {
604                 isTimeout = !notEmpty.wait(timeout);
605                 // infof("result: %s, isTimeout: %s, in %s", x is null, isTimeout, timeout);
606             }
607         } catch(Exception ex) {
608             debug warning(ex.msg);
609             version(HUNT_DEBUG) warning(ex);
610         }
611 
612         return x;
613     }
614 
615     /**
616      * Unlinks the last element in the queue, waiting up to the specified time
617      * to do so if the queue is empty.
618      *
619      * @param timeout   length of time to wait
620      * @param unit      units that timeout is expressed in
621      *
622      * @return the unlinked element
623      * @throws InterruptedException if the current thread is interrupted
624      */
625     E pollLast(Duration timeout) {
626         lock.lock();
627         scope(exit) lock.unlock();
628 
629         bool isTimeout = false;
630         E x;
631         try {
632             while ( (x = unlinkLast()) is null) {
633                 if (isTimeout) {
634                     return null;
635                 }
636                 isTimeout = !notEmpty.wait(timeout);
637             }
638         } catch(Exception ex) {
639             debug warning(ex.msg);
640             version(HUNT_DEBUG) warning(ex);
641         }
642 
643         return x;
644     }
645 
646     /**
647      * {@inheritDoc}
648      */
649     override
650     E getFirst() {
651         E x = peekFirst();
652         if (x is null) {
653             throw new NoSuchElementException();
654         }
655         return x;
656     }
657 
658     /**
659      * {@inheritDoc}
660      */
661     override
662     E getLast() {
663         E x = peekLast();
664         if (x is null) {
665             throw new NoSuchElementException();
666         }
667         return x;
668     }
669 
670     override
671     E peekFirst() {
672         lock.lock();
673         
674         try {
675             return first is null ? null : first.item;
676         } finally {
677             lock.unlock();
678         }
679     }
680 
681     override
682     E peekLast() {
683         lock.lock();
684         try {
685             return last is null ? null : last.item;
686         } finally {
687             lock.unlock();
688         }
689     }
690 
691     override
692     bool removeFirstOccurrence(E o) {
693         if (o is null) {
694             return false;
695         }
696         lock.lock();
697         try {
698             for (Node!(E) p = first; p !is null; p = p.next) {
699                 if (o == p.item) {
700                     unlink(p);
701                     return true;
702                 }
703             }
704             return false;
705         } finally {
706             lock.unlock();
707         }
708     }
709 
710     // override
711     bool removeLastOccurrence(E o) {
712         if (o is null) {
713             return false;
714         }
715         lock.lock();
716         try {
717             for (Node!(E) p = last; p !is null; p = p.prev) {
718                 if (o == p.item) {
719                     unlink(p);
720                     return true;
721                 }
722             }
723             return false;
724         } finally {
725             lock.unlock();
726         }
727     }
728 
729     // BlockingQueue methods
730 
731     /**
732      * Returns the number of additional elements that this deque can ideally
733      * (in the absence of memory or resource constraints) accept without
734      * blocking. This is always equal to the initial capacity of this deque
735      * less the current {@code size} of this deque.
736      *
737      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
738      * an element will succeed by inspecting {@code remainingCapacity}
739      * because it may be the case that another thread is about to
740      * insert or remove an element.
741      *
742      * @return The number of additional elements the queue is able to accept
743      */
744     int remainingCapacity() {
745         lock.lock();
746         try {
747             return capacity - count;
748         } finally {
749             lock.unlock();
750         }
751     }
752 
753     /**
754      * Drains the queue to the specified collection.
755      *
756      * @param c The collection to add the elements to
757      *
758      * @return number of elements added to the collection
759      *
760      * @throws UnsupportedOperationException if the add operation is not
761      *         supported by the specified collection
762      * @throws ClassCastException if the class of the elements held by this
763      *         collection prevents them from being added to the specified
764      *         collection
765      * @throws NullPointerException if c is null
766      * @throws IllegalArgumentException if c is this instance
767      */
768     int drainTo(Collection!E c) {
769         return drainTo(c, int.max);
770     }
771 
772     /**
773      * Drains no more than the specified number of elements from the queue to the
774      * specified collection.
775      *
776      * @param c           collection to add the elements to
777      * @param maxElements maximum number of elements to remove from the queue
778      *
779      * @return number of elements added to the collection
780      * @throws UnsupportedOperationException if the add operation is not
781      *         supported by the specified collection
782      * @throws ClassCastException if the class of the elements held by this
783      *         collection prevents them from being added to the specified
784      *         collection
785      * @throws NullPointerException if c is null
786      * @throws IllegalArgumentException if c is this instance
787      */
788     int drainTo(Collection!E c, int maxElements) {
789         if (c is null) {
790             throw new NullPointerException();
791         }
792         if (c is this) {
793             throw new IllegalArgumentException();
794         }
795         lock.lock();
796         try {
797             int n = min(maxElements, count);
798             for (int i = 0; i < n; i++) {
799                 c.add(first.item);   // In this order, in case add() throws.
800                 unlinkFirst();
801             }
802             return n;
803         } finally {
804             lock.unlock();
805         }
806     }
807 
808     // Collection methods
809 
810     /**
811      * Returns the number of elements in this deque.
812      *
813      * @return the number of elements in this deque
814      */
815     override
816     int size() {
817         return count;
818     }
819 
820     /**
821      * Returns {@code true} if this deque contains the specified element.
822      * More formally, returns {@code true} if and only if this deque contains
823      * at least one element {@code e} such that {@code o == e}.
824      *
825      * @param o object to be checked for containment in this deque
826      * @return {@code true} if this deque contains the specified element
827      */
828     override
829     bool contains(E o) {
830         if (o is null) {
831             return false;
832         }
833         lock.lock();
834         try {
835             for (Node!(E) p = first; p !is null; p = p.next) {
836                 if (o == p.item) {
837                     return true;
838                 }
839             }
840             return false;
841         } finally {
842             lock.unlock();
843         }
844     }
845 
846     /*
847      * TODO: Add support for more efficient bulk operations.
848      *
849      * We don't want to acquire the lock for every iteration, but we
850      * also want other threads a chance to interact with the
851      * collection, especially when count is close to capacity.
852      */
853 
854 //     /**
855 //      * Adds all of the elements in the specified collection to this
856 //      * queue.  Attempts to addAll of a queue to itself result in
857 //      * {@code IllegalArgumentException}. Further, the behavior of
858 //      * this operation is undefined if the specified collection is
859 //      * modified while the operation is in progress.
860 //      *
861 //      * @param c collection containing elements to be added to this queue
862 //      * @return {@code true} if this queue changed as a result of the call
863 //      * @throws ClassCastException
864 //      * @throws NullPointerException
865 //      * @throws IllegalArgumentException
866 //      * @throws IllegalStateException
867 //      * @see #add(Object)
868 //      */
869 //     bool addAll(Collection<? extends E> c) {
870 //         if (c is null)
871 //             throw new NullPointerException();
872 //         if (c == this)
873 //             throw new IllegalArgumentException();
874 //         ReentrantLock lock = this.lock;
875 //         lock.lock();
876 //         try {
877 //             bool modified = false;
878 //             foreach(E e ; c)
879 //                 if (linkLast(e))
880 //                     modified = true;
881 //             return modified;
882 //         } finally {
883 //             lock.unlock();
884 //         }
885 //     }
886 
887     /**
888      * Returns an array containing all of the elements in this deque, in
889      * proper sequence (from first to last element).
890      *
891      * <p>The returned array will be "safe" in that no references to it are
892      * maintained by this deque.  (In other words, this method must allocate
893      * a new array).  The caller is thus free to modify the returned array.
894      *
895      * <p>This method acts as bridge between array-based and collection-based
896      * APIs.
897      *
898      * @return an array containing all of the elements in this deque
899      */
900     override
901     E[] toArray() {
902         lock.lock();
903         try {
904             E[] a = new E[count];
905             int k = 0;
906             for (Node!(E) p = first; p !is null; p = p.next) {
907                 a[k++] = p.item;
908             }
909             return a;
910         } finally {
911             lock.unlock();
912         }
913     }
914 
915     /**
916      * {@inheritDoc}
917      */
918     // override
919     // <T> T[] toArray(T[] a) {
920     //     lock.lock();
921     //     try {
922     //         if (a.length < count) {
923     //             a = (T[])java.lang.reflect.Array.newInstance
924     //                 (a.getClass().getComponentType(), count);
925     //         }
926     //         int k = 0;
927     //         for (Node!(E) p = first; p !is null; p = p.next) {
928     //             a[k++] = (T)p.item;
929     //         }
930     //         if (a.length > k) {
931     //             a[k] = null;
932     //         }
933     //         return a;
934     //     } finally {
935     //         lock.unlock();
936     //     }
937     // }
938 
939     override
940     string toString() {
941         lock.lock();
942         try {
943             return super.toString();
944         } finally {
945             lock.unlock();
946         }
947     }
948 
949     /**
950      * Atomically removes all of the elements from this deque.
951      * The deque will be empty after this call returns.
952      */
953     override
954     void clear() {
955         lock.lock();
956         try {
957             for (Node!(E) f = first; f !is null;) {
958                 f.item = null;
959                 Node!(E) n = f.next;
960                 f.prev = null;
961                 f.next = null;
962                 f = n;
963             }
964             first = last = null;
965             count = 0;
966             // notFull.signalAll();
967             notFull.notifyAll();
968         } finally {
969             lock.unlock();
970         }
971     }
972 
973     /**
974      * Returns an iterator over the elements in this deque in proper sequence.
975      * The elements will be returned in order from first (head) to last (tail).
976      * The returned {@code Iterator} is a "weakly consistent" iterator that
977      * will never throw {@link java.util.ConcurrentModificationException
978      * ConcurrentModificationException},
979      * and guarantees to traverse elements as they existed upon
980      * construction of the iterator, and may (but is not guaranteed to)
981      * reflect any modifications subsequent to construction.
982      *
983      * @return an iterator over the elements in this deque in proper sequence
984      */
985     override
986     InputRange!(E) iterator() {
987         return new Itr();
988     }
989 
990     /**
991      * {@inheritDoc}
992      */
993     // override
994     InputRange!(E) descendingIterator() {
995         return new DescendingItr();
996     }
997 
998     /**
999      * Base class for Iterators for LinkedBlockingDeque
1000      */
1001     private abstract class AbstractItr : InputRange!(E) {
1002         /**
1003          * The next node to return in next()
1004          */
1005          Node!(E) next;
1006 
1007         /**
1008          * nextItem holds on to item fields because once we claim that
1009          * an element exists in hasNext(), we must return item read
1010          * under lock (in advance()) even if it was in the process of
1011          * being removed when hasNext() was called.
1012          */
1013         E nextItem;
1014 
1015         /**
1016          * Node returned by most recent call to next. Needed by remove.
1017          * Reset to null if this element is deleted by a call to remove.
1018          */
1019         private Node!(E) lastRet;
1020 
1021         /**
1022          * Obtain the first node to be returned by the iterator.
1023          *
1024          * @return first node
1025          */
1026         abstract Node!(E) firstNode();
1027 
1028         /**
1029          * For a given node, obtain the next node to be returned by the
1030          * iterator.
1031          *
1032          * @param n given node
1033          *
1034          * @return next node
1035          */
1036         abstract Node!(E) nextNode(Node!(E) n);
1037 
1038         /**
1039          * Create a new iterator. Sets the initial position.
1040          */
1041         this() {
1042             // set to initial position
1043             lock.lock();
1044             try {
1045                 next = firstNode();
1046                 nextItem = next is null ? null : next.item;
1047             } finally {
1048                 lock.unlock();
1049             }
1050         }
1051 
1052         /**
1053          * Returns the successor node of the given non-null, but
1054          * possibly previously deleted, node.
1055          *
1056          * @param n node whose successor is sought
1057          * @return successor node
1058          */
1059         private Node!(E) succ(Node!(E) n) {
1060             // Chains of deleted nodes ending in null or self-links
1061             // are possible if multiple interior nodes are removed.
1062             for (;;) {
1063                 Node!(E) s = nextNode(n);
1064                 if (s is null) {
1065                     return null;
1066                 } else if (s.item !is null) {
1067                     return s;
1068                 } else if (s == n) {
1069                     return firstNode();
1070                 } else {
1071                     n = s;
1072                 }
1073             }
1074         }
1075 
1076         /**
1077          * Advances next.
1078          */
1079         void advance() {
1080             lock.lock();
1081             try {
1082                 // assert next !is null;
1083                 next = succ(next);
1084                 nextItem = next is null ? null : next.item;
1085             } finally {
1086                 lock.unlock();
1087             }
1088         }
1089 
1090         // override
1091         bool empty() {
1092             return next is null;
1093         }
1094 
1095         // override
1096         E front() {
1097             if (next is null) {
1098                 throw new NoSuchElementException();
1099             }
1100             return nextItem;
1101             // lastRet = next;
1102             // E x = nextItem;
1103             // advance();
1104             // return x;
1105         }
1106         
1107         void popFront() {
1108             if (next is null) {
1109                 throw new NoSuchElementException();
1110             }
1111             lastRet = next;
1112             advance();
1113         }
1114 
1115         // override
1116         void remove() {
1117             Node!(E) n = lastRet;
1118             if (n is null) {
1119                 throw new IllegalStateException();
1120             }
1121             lastRet = null;
1122             lock.lock();
1123             try {
1124                 if (n.item !is null) {
1125                     unlink(n);
1126                 }
1127             } finally {
1128                 lock.unlock();
1129             }
1130         }
1131         
1132         E moveFront() @property { throw new NotSupportedException(); }
1133 
1134         
1135         int opApply(scope int delegate(E) dg) {
1136             if(dg is null)
1137                 throw new NullPointerException();
1138             
1139             int result = 0;
1140             lock.lock();
1141             scope(exit) {
1142                 lock.unlock();
1143             }
1144 
1145             while(next !is null) {
1146                 result = dg(nextItem);
1147                 next = succ(next);
1148                 nextItem = next is null ? null : next.item;                
1149             }
1150             
1151             return result;
1152         }
1153 
1154         /// Ditto
1155         int opApply(scope int delegate(size_t, E) dg) {
1156             if(dg is null)
1157                 throw new NullPointerException();
1158 
1159             int result = 0;
1160             lock.lock();
1161             scope(exit) {
1162                 lock.unlock();
1163             }
1164 
1165             size_t index = 0;
1166             while(next !is null) {
1167                 result = dg(index, nextItem);
1168                 next = succ(next);
1169                 nextItem = next is null ? null : next.item;
1170                 index++;         
1171             }
1172             
1173             return result;                
1174         }
1175     }
1176 
1177     /** Forward iterator */
1178     private class Itr : AbstractItr {
1179         override Node!(E) firstNode() { return first; }
1180 
1181         override Node!(E) nextNode(Node!(E) n) { return n.next; }
1182 
1183         }
1184 
1185     /** Descending iterator */
1186     private class DescendingItr : AbstractItr {
1187         override Node!(E) firstNode() { return last; }
1188 
1189         override Node!(E) nextNode(Node!(E) n) { return n.prev; }
1190     }
1191 
1192     /**
1193      * Saves the state of this deque to a stream (that is, serialize it).
1194      *
1195      * @serialData The capacity (int), followed by elements (each an
1196      * {@code Object}) in the proper order, followed by a null
1197      * @param s the stream
1198      */
1199     // private void writeObject(java.io.ObjectOutputStream s) {
1200     //     lock.lock();
1201     //     try {
1202     //         // Write out capacity and any hidden stuff
1203     //         s.defaultWriteObject();
1204     //         // Write out all elements in the proper order.
1205     //         for (Node!(E) p = first; p !is null; p = p.next) {
1206     //             s.writeObject(p.item);
1207     //         }
1208     //         // Use trailing null as sentinel
1209     //         s.writeObject(null);
1210     //     } finally {
1211     //         lock.unlock();
1212     //     }
1213     // }
1214 
1215     /**
1216      * Reconstitutes this deque from a stream (that is,
1217      * deserialize it).
1218      * @param s the stream
1219      */
1220     // private void readObject(java.io.ObjectInputStream s)
1221     //     throws java.io.IOException, ClassNotFoundException {
1222     //     s.defaultReadObject();
1223     //     count = 0;
1224     //     first = null;
1225     //     last = null;
1226     //     // Read in all elements and place in queue
1227     //     for (;;) {
1228     //         @SuppressWarnings("unchecked")
1229     //         final
1230     //         E item = (E)s.readObject();
1231     //         if (item is null) {
1232     //             break;
1233     //         }
1234     //         add(item);
1235     //     }
1236     // }
1237 
1238     // Monitoring methods
1239 
1240     /**
1241      * Returns true if there are threads waiting to take instances from this deque. See disclaimer on accuracy in
1242      * {@link java.util.concurrent.locks.ReentrantLock#hasWaiters(Condition)}.
1243      *
1244      * @return true if there is at least one thread waiting on this deque's notEmpty condition.
1245      */
1246     bool hasTakeWaiters() {
1247         lock.lock();
1248         try {
1249             // return lock.hasWaiters(notEmpty);
1250             version(HUNT_DEBUG) {
1251                 int len = notEmpty.getWaitQueueLength();
1252                 if(len>0) {
1253                     tracef("waiters: %d", len);
1254                 }
1255             }
1256             return notEmpty.hasWaiters();
1257         } finally {
1258             lock.unlock();
1259         }
1260     }
1261 
1262     /**
1263      * Returns the length of the queue of threads waiting to take instances from this deque. See disclaimer on accuracy
1264      * in {@link java.util.concurrent.locks.ReentrantLock#getWaitQueueLength(Condition)}.
1265      *
1266      * @return number of threads waiting on this deque's notEmpty condition.
1267      */
1268     int getTakeQueueLength() {
1269         lock.lock();
1270         scope(exit) lock.unlock();
1271         return notEmpty.getWaitQueueLength();
1272     }
1273 
1274     /**
1275      * Interrupts the threads currently waiting to take an object from the pool. See disclaimer on accuracy in
1276      * {@link java.util.concurrent.locks.ReentrantLock#getWaitingThreads(Condition)}.
1277      */
1278     void interuptTakeWaiters() {
1279         lock.lock();
1280         scope(exit) lock.unlock();
1281         
1282         try {
1283             notEmpty.notifyAll();
1284         } catch(Exception ex) {
1285             debug warning(ex.msg);
1286             version(HUNT_DEBUG) warning(ex);
1287         }
1288     }
1289 }