Contents

1 Guarded Methods and Waiting
2 Guarded suspension
3 Java Mechanics for Guarded Suspension
3.1 Wait() and Notify()
3.1.1 Continuing our Queue Example
3.1.2 Guard Atomicity
3.1.3 Wait Atomicity
3.1.4 Wait/Notify mechanism
3.1.5 Rules of Thumb for Wait and Notify
4 More Sophisticated Primitives
4.1 Semaphores
4.2 Readers-Writers Locks

Guarded Methods and Waiting

Concurrent code has to deal with the possibility that an asynchronous modification to object states may lead to a failure of a thread computation (usually, a pre-condition is not met because another thread changed the state of the object).

Therefore, concurrent code must devise techniques to deal with such potential failures. This is generally done using exceptions and appropriate exception handling code.

Traditionally, methods refuse to perform actions unless they can ensure that these actions will succeed, in part by first checking their preconditions. Recall the class Even from previous lectures. It is considered good practice to check all the invariants and preconditions before performing any actual work. We can change the code in the following way:

class Even {

   /* the internal state counter
    * @INV counter_%2 == 0
    */
   private int counter_ = 0;

   /* default constructor */
   public Even() { }

   /* increment the counter.
    * return the current counter value
    * @PRE counter_%2 == 0 (this is redundant, as it is promised by the @INV)
    * @POST @post(counter_) == @pre(counter_)+2
    */
   public synchronized int add() throws RuntimeException {
      if (counter_%2 != 0) {
         throw new RuntimeException("precondition does not hold");
      }
      ++counter_;
      ++counter_;
      return counter_;
   }
}

In general, there are three basic flavors of policy decisions surrounding failed preconditions/invariants:

• Balking. Throwing an exception if the precondition fails. Here, an exception indicates refusal, not failure, but they have the same consequences to clients. This is what we do in the code above.
• Guarded suspension. Suspending the current method invocation (and its associated thread) until the precondition becomes true.
• Time-outs. Cases falling between balking and suspension, where an upper bound is placed on how long to wait for the precondition to become true.
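To make the time-out policy concrete, here is a small, self-contained sketch (class, field, and method names are hypothetical, and it uses Java's timed Object.wait(), whose mechanics are covered later in this lecture): the caller waits up to a deadline for a condition to become true, and balks if the deadline passes first.

```java
public class TimedAddDemo {
    public static volatile boolean succeeded;

    static boolean hasRoom = false;        // stands in for a real precondition such as "size() < MAX"
    static final Object lock = new Object();

    // Wait up to timeoutMillis for room; fall back to balking afterwards.
    static boolean timedAdd(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (lock) {
            long remaining = deadline - System.currentTimeMillis();
            while (!hasRoom && remaining > 0) {
                lock.wait(remaining);       // suspend, but not forever
                remaining = deadline - System.currentTimeMillis();
            }
            if (!hasRoom) return false;     // time-out: balk instead of waiting on
            // ...perform the add here...
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        succeeded = timedAdd(100);          // no other thread ever makes room
        System.out.println("add succeeded: " + succeeded);
    }
}
```

If some other thread were to set hasRoom and call lock.notifyAll() before the deadline, timedAdd() would return true instead.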

In the remainder of this lecture we will elaborate on guarded suspension. Furthermore, we will introduce the suspension mechanisms offered by the Java RTE, which are applicable in many contexts, guarded suspension being one of them.

Guarded suspension

Guarded suspension and time-outs have no analog in sequential programs, but play central roles in concurrent software. To understand why, consider the following abstract queue class, which supports the add, remove, and size methods.

abstract class BoundedQueueInterface<E> {

   protected final int MAX;

   // @INIT: size() == 0

   public BoundedQueueInterface(int max)
   {
      MAX = max;
   }

   // @INV: size() <= MAX
   public abstract int size();

   // @PRE size() < MAX
   // @POST @post(size()) == @pre(size())+1
   // @POST @post(get()) == e
   public abstract void add(E e);

   // @PRE size() > 0
   // @POST @post(size()) == @pre(size())-1
   // @POST @happened(add(e)) : @post(get()) == e
   public abstract E remove();

}

Guards can be considered as a special form of conditionals. In sequential programs, an if statement can check whether a condition holds upon entry to a method. When the condition is false, there is no point in waiting for it to be true – it can never become true since no other concurrent activity may cause the condition to change. For example, consider the following (incomplete, non-thread-safe) implementation of a queue:

class SimpleQueue<E> extends BoundedQueueInterface<E> {

   // a vector, used to implement the queue
   private Vector<E> vec_ = new Vector<E>();

   public SimpleQueue(int max) { super(max); }

   public void add(E e) {
      if (vec_.size() >= MAX)
         throw new RuntimeException("queue is full");
      vec_.add(e);
   }
}

However, in concurrent programs, asynchronous state changes can happen all the time; although the precondition might not hold now, it may hold at a future date, and it may be worthwhile to wait until it does. In such a case, we would like to implement something along the lines of the following code:

class SimpleQueue<E> extends BoundedQueueInterface<E> {

   // a vector, used to implement the queue
   private Vector<E> vec_ = new Vector<E>();

   public SimpleQueue(int max) { super(max); }

   public void add(E e) {
      wait until (vec_.size() < MAX) and then atomically proceed:
      vec_.add(e);
   }
}

We will discuss why we need to wait until and continue atomically later.

In general, any guard implicitly asserts that, eventually, some other thread will cause the required state changes to occur. Time-outs are a way of softening such assertions, using a balking policy as a backup if the wait continues for too long a time.

In our next lecture we will see that guarded methods pose liveness issues, and we will further investigate the proper use of such techniques. Next, we discuss the guarded-methods mechanism in Java.

Java Mechanics for Guarded Suspension

The Java RTE offers the programmer a single, simple mechanism to facilitate guarded suspension, and suspension in a multi-threaded context in general: the wait and notify constructs. First, we will understand the semantics of this mechanism, and next, we will implement several higher-level constructs which simplify its use.

Wait() and Notify()

Java offers threads the possibility of wait()-ing. That is, threads may wait until some other thread wakes them up. We use this mechanism to implement guarded methods.

To be more concrete, each object in Java has an associated wait set. Threads may enter the wait set of an object o by invoking the wait() method of o. A thread wishing to wake up threads from the wait set of o may call o.notify() to wake up just one thread, or o.notifyAll() to wake up all of the threads waiting in the wait set of o.

To further clarify, the o.wait(), o.notify() and o.notifyAll() methods work as follows (there are some more details which we will discuss in a moment):

• o.wait(). A wait invocation results in the following actions:
1. The current thread (which invoked the method) is blocked.
2. The JVM places the thread in the wait set associated with o.
• o.notify(). A notify invocation results in the following actions:
1. An arbitrarily chosen thread T is removed by the JVM from the wait set associated with o (if such a thread exists).
2. Once unblocked, T is resumed from the point of its wait.
• o.notifyAll(). A notifyAll works like notify except that the steps occur (in effect, simultaneously) for all threads in the wait set of o.

Before discussing the actual use of wait() and notify(), the concept of interrupting threads must be presented. Since threads may sometimes need to stop waiting for a notify() to take place, for example when the program needs to exit, we need a mechanism to remove a thread from a wait set and let it resume its execution. Java implements this by sending a special exception, InterruptedException, to such threads.
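As a small, self-contained illustration (class and field names here are hypothetical), the following program puts a thread into an object's wait set and then interrupts it; the waiting thread resumes its execution by catching InterruptedException:

```java
public class InterruptDemo {
    static final Object lock = new Object();
    public static volatile boolean wasInterrupted = false;

    public static void main(String[] args) throws InterruptedException {
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait();            // wait indefinitely; only an interrupt can release us here
                } catch (InterruptedException e) {
                    wasInterrupted = true;  // resume execution after the interrupt
                }
            }
        });
        waiter.start();
        Thread.sleep(100);                  // give the waiter time to enter the wait set
        waiter.interrupt();                 // remove it from the wait set via InterruptedException
        waiter.join();
        System.out.println("waiter was interrupted: " + wasInterrupted);
    }
}
```

Note that even if the interrupt arrives before the thread actually calls wait(), the interrupt status is remembered and wait() throws immediately.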

When a thread is going to invoke a wait() method, it needs to take into account that an exception might be thrown. Therefore, code using waits usually looks like this:

synchronized (obj) {
     try {
         while (<condition does not hold>)
             obj.wait();
         ... // Perform action appropriate to condition
     } catch (InterruptedException e) {
         // handle the interrupted wait.
     } finally {
         // do something if needed.
     }
}

We shall discuss the need for the while later.

Continuing our Queue Example

Our queue example will be a very simple implementation of the Producer-Consumer design pattern. Consider a scenario in which there are two active entities (threads), one called a Producer, which produces objects which the second entity, the Consumer, needs to work on (recalling our video player example from previous lectures, the Producer may read the file from disk and give chunks of data to the Consumer, which is a decoder thread). Now, we wish to guard the add() and remove() methods, such that all preconditions are respected. In concurrent RTEs, it is usually acceptable to use blocking in guarded methods, so we will want to wait() until the preconditions hold. Consider the following (yet incomplete, and wrong) implementation:

import java.util.Vector;

class SimpleQueue<E> extends BoundedQueueInterface<E> {

        // a vector, used to implement the queue
        private Vector<E> vec_ = new Vector<E>();

        public SimpleQueue(int max) { super(max); }

        public int size(){
                return vec_.size();
        }

        public void add(E e){
                try{
                        if(size()>=MAX)
                                this.wait();
                } catch (InterruptedException ignored){}

                vec_.add(e);
                // wakeup everybody. If someone is waiting in the remove()
                // method, it can now perform the remove.
                this.notifyAll();
        }

        public E remove(){
                try{
                        if(size()==0)
                                this.wait();
                } catch (InterruptedException ignored){}

                E e = vec_.remove(0);
                // wakeup everybody. If someone is waiting in the add()
                // method, it can now perform the add.
                this.notifyAll();
                return e;
        }

}

What we are trying to do here is that each thread trying to remove() from an empty queue (or add() to a full one) will wait in the wait set of our queue object. A thread which successfully add()s an object to the queue will then wake it up, and vice versa.

However, this code is wrong, very very wrong. First, note that we "forgot" to add synchronization. Moreover, two important properties do not hold:

• Guard atomicity - Consider the following scenario, in which there are two consumer threads and one producer. The queue is empty at first, and both of the consumers execute the remove() method. Both of the consumers will block, since the queue size is 0. Now comes along a single producer, which calls add() once. Since the producer calls notifyAll(), both of our consumers will be woken up. Now, both of them (in some order) will execute the vec_.remove(0) method. This is a problem, as the precondition does not hold for at least one of them.

• Wait atomicity - Consider a scenario in which there is exactly one producer and one consumer. The first to run is the consumer, which calls the remove() method. The consumer checks the condition, but is stopped right after (by the scheduler). Now comes along the producer, which calls add() successfully. Note that the producer invoked the notifyAll() method of this; however, there was no thread in the wait set at that time! Now, the consumer is resumed (again, by the scheduler) and calls this.wait(). But the queue is not empty now! The consumer missed the notification.

Guard Atomicity

To solve the first problem, we will use a simple loop. Consider the following code:
public E remove(){
        while(size()==0){
           try{
              this.wait();
           } catch (InterruptedException ignored){}
        }
        E e = vec_.remove(0);
        // wakeup everybody. If someone is waiting in the add() method,
        // it can now perform the add.
        this.notifyAll();
        return e;
}

Using a while loop ensures that even after waking up, the consumer still checks the precondition one more time. The problem is not entirely solved yet (remember, we "forgot" about synchronization), but this is definitely a step forward.

On the other hand, one could ask: why not use notify() instead of notifyAll(), which would ensure exactly one thread is woken up? However, using notify() is dangerous, as will be explained later.

Wait Atomicity

To achieve wait atomicity, we need to ensure no other thread may call notify() between our precondition check and our wait. In other words, we wish to make both the check and the wait atomic with relation to the notify mechanism. Achieving this is simple: use a single lock to protect them. But, let us stop and consider the implications. If we hold a lock and then enter a wait set, no other thread may get its hands on the lock and wake us up! To solve that problem, Java requires and ensures the following: to call the wait(), notify(), or notifyAll() methods of an object, the calling thread must hold that object's monitor (lock). Moreover, upon calling wait(), the thread (involuntarily) releases the lock, and before exiting the wait set of the object, the thread must re-acquire the lock.
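The release of the lock by wait() can be observed directly. In the following self-contained sketch (class and field names are hypothetical), the main thread successfully enters a synchronized block on the very object another thread is wait()-ing on, which would be impossible if the waiter still held the lock:

```java
public class LockReleaseDemo {
    static final Object lock = new Object();
    static volatile boolean gotNotified = false;
    public static volatile boolean finished = false;

    public static void main(String[] args) throws InterruptedException {
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!gotNotified) {                       // guarded wait loop
                    try { lock.wait(); } catch (InterruptedException e) { return; }
                }
            }
        });
        waiter.start();
        Thread.sleep(100);  // let the waiter acquire the lock and call wait()
        // If wait() did not release the lock, this block would deadlock forever:
        synchronized (lock) {
            gotNotified = true;
            lock.notifyAll();
        }
        waiter.join();
        finished = true;
        System.out.println("both threads completed");
    }
}
```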

Wait/Notify mechanism

Following is the complete definition of the Wait/Notify mechanism:

• When a thread T calls o.wait() the wait invocation results in the following actions:
1. The JVM checks that T holds o's lock. If T does not, an exception is thrown (IllegalMonitorStateException).
2. T is blocked, and the lock of o (and only of o) is released.
3. The JVM places T in the wait set associated with o.
• When a thread T calls o.notify() the notify invocation results in the following actions:
1. The JVM checks that T holds o's lock. If T does not, an exception is thrown.
2. An arbitrarily chosen thread T' is removed by the JVM from the wait set associated with o (if such a thread exists).
3. Once unblocked, T' tries to re-acquire the lock of o (and if necessary, T' blocks until it does). Finally, T' is resumed from the point of its wait().
• o.notifyAll(). A notifyAll() works like notify() except that the steps occur (in effect, simultaneously) for all threads in the wait set of o. This results in the Thundering Herd phenomenon, as all the woken threads try to acquire the lock and end up competing with each other.

Note that the check that the caller holds the monitor's lock is performed at runtime, not at compile time.
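A self-contained sketch demonstrating this runtime check (the demo class name is hypothetical): calling notify() without holding the object's lock throws IllegalMonitorStateException, while the same call inside a synchronized block is legal:

```java
public class MonitorCheckDemo {
    public static boolean caught = false;

    public static void main(String[] args) {
        Object o = new Object();
        try {
            o.notify();            // illegal: we do not hold o's lock
        } catch (IllegalMonitorStateException e) {
            caught = true;         // the JVM detected the violation at runtime
        }
        synchronized (o) {
            o.notify();            // legal: we hold the lock (a no-op here, the wait set is empty)
        }
        System.out.println("caught IllegalMonitorStateException: " + caught);
    }
}
```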

The complete and correct implementation of our blocking queue follows:

import java.util.Vector;

class SimpleQueue<E> extends BoundedQueueInterface<E> {

        // a vector, used to implement the queue
        private Vector<E> vec_ = new Vector<E>();

        public SimpleQueue(int max) { super(max); }

        public synchronized int size(){
                return vec_.size();
        }

        public synchronized void add(E e){
                while(size()>=MAX){
                        try{
                                this.wait();
                        } catch (InterruptedException ignored){}
                }

                vec_.add(e);
                // wakeup everybody. If someone is waiting in the remove()
                // method, it can now perform the remove.
                this.notifyAll();
        }

        public synchronized E remove(){
                while(size()==0){
                        try{
                                this.wait();
                        } catch (InterruptedException ignored){}
                }

                E e = vec_.remove(0);
                // wakeup everybody. If someone is waiting in the add()
                // method, it can now perform the add.
                this.notifyAll();
                return e;
        }
}

Rules of Thumb for Wait and Notify

• Guarded wait - For each condition that needs to be waited on, write a guarded wait loop that causes the current thread to block if the guard condition is false.

• Guard atomicity - Condition checks must always be placed in while loops. When a thread is resumed, it doesn't know whether the condition it is waiting for is actually true; it only knows that it has been woken up. Thus, in order to maintain safety, it must check again. Even worse, in rare cases a thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. Checking the condition in a loop ensures this causes no harm.

• Multiple guard atomicity - If there are several conditions that need to be waited for, place them all in the same loop.

• Don't forget to wake up - To ensure liveness, classes must contain code to wake up waiting threads. Every time the value of any field mentioned in a guard changes in a way that might affect the truth value of the condition, waiting threads must be awakened so they can recheck guard conditions.

• notify() vs. notifyAll() - When multiple threads wait in the same wait set of a single object, such that at least two threads are waiting for different conditions, we must use notifyAll() to ensure that no thread misses an event. For example, consider a simple queue which has both an add() method, as above, and an addTwo() method, which adds two objects to the queue at the same time. Now, a thread T1 may be waiting in the add() method, checking that there is room for at least one object, and a thread T2 may be waiting in the addTwo() method, checking that there is room for two objects. Now, suppose that the remove() method calls notify(), and the thread which gets woken up is T2. Alas, there is only room for one object in the queue, so T2 goes back to sleep. Moreover, T1 will not be woken up at all, even though there is space in the queue.

More Sophisticated Primitives

Wait and notify are the basis for higher-level constructs. While wait and notify are very general constructs, other, more specific mechanisms can make your code easier to understand and require fewer details from their users. We review here Semaphores and Readers-Writers locks.

Semaphores

A Semaphore is an abstraction of an object which controls a bounded number of permits. A permit is an abstract concept, which we can think of as a train ticket. Threads may ask the semaphore for a permit (a ticket). If the semaphore has any permits available, one permit will be assigned to the requesting thread. If there is no permit available, the requesting thread is blocked until one such permit exists – for example, when another thread returns the permit it received at an earlier stage.

For example, consider the following (very simple) implementation:

class Semaphore{
        private final int permits_;
        private int free_;

        public Semaphore(int permits){
                permits_ = permits;
                free_ = permits;
        }

        public synchronized void acquire() throws InterruptedException{
                while (free_ <= 0){
                        this.wait();
                }
                free_--;
        }

        public synchronized void release(){
                if (free_ < permits_) free_++;
                this.notifyAll();
        }
}

Java has a Semaphore class of its own, called java.util.concurrent.Semaphore. Note the following properties of the Java Semaphore class:

• The semaphore class is not re-entrant - if one thread calls acquire() twice, the same thread must also call release() twice!
• The semaphore can be released by a thread other than the one that acquired it (unlike a lock), as semaphores have no notion of ownership.
• The constructor for this class optionally accepts a fairness parameter - if thread T1 requested a permit and started waiting before thread T2 did, T1 will receive a permit first.
• Some other services are available, such as tryAcquire() and methods for managing permits.
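A short, self-contained usage sketch of java.util.concurrent.Semaphore (the demo class and field names are hypothetical):

```java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static boolean firstTry, secondTry;

    public static void main(String[] args) throws InterruptedException {
        Semaphore sem = new Semaphore(2);   // two permits; fairness is off by default

        sem.acquire();                      // take the first permit
        sem.acquire();                      // take the second permit
        firstTry = sem.tryAcquire();        // no permits left: returns false immediately instead of blocking
        System.out.println("third acquire succeeded: " + firstTry);

        sem.release();                      // may be called by any thread, not just the "owner"
        secondTry = sem.tryAcquire();       // a permit is free again: returns true
        System.out.println("acquire after release succeeded: " + secondTry);
    }
}
```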

Readers-Writers Locks

The readers/writers problem is about allowing several readers and writers to access a shared resource under different policies. Different policies (variants of the problem) exist. In our example, several readers can access the resource together (to read it), but only one writer can access the resource at any given time. For example, readers will wait if any writer is pending (new data is about to be added, so wait for it first). Only one writer at a time can access the shared resource, and only if no readers are currently reading from it.

public abstract class RW {
  protected int activeReaders_ = 0;  // threads executing read_
  protected int activeWriters_ = 0;  // always zero or one
  protected int waitingReaders_ = 0; // threads not yet in read_
  protected int waitingWriters_ = 0; // same for write_

  protected abstract void read_();   // implement in subclasses
  protected abstract void write_();

  public void read() {
    beforeRead();
    read_();
    afterRead();
  }

  public void write() {
    beforeWrite();
    write_();
    afterWrite();
  }

  protected boolean allowReader() {
    return waitingWriters_ == 0 && activeWriters_ == 0;
  }

  protected boolean allowWriter() {
    return activeReaders_ == 0 && activeWriters_ == 0;
  }

  protected synchronized void beforeRead() {
    ++waitingReaders_;
    while (!allowReader())
      try { wait(); } catch (InterruptedException ex) {}
    --waitingReaders_;
    ++activeReaders_;
  }

  protected synchronized void afterRead()  {
    --activeReaders_;
    notifyAll();  // Will unblock any pending writer
  }

  protected synchronized void beforeWrite() {
    ++waitingWriters_;
    while (!allowWriter())
      try { wait(); } catch (InterruptedException ex) {}
    --waitingWriters_;
    ++activeWriters_;
  }

  protected synchronized void afterWrite() {
    --activeWriters_;
    notifyAll(); // Will unblock waiting writers and waiting readers
  }
}

Note this code handles only the locking, while the actual access to the resource is done in a subclass. Also note that the sharing policy is determined by two simple methods: allowReader() and allowWriter(). This interface nicely encapsulates our concurrency policy in reusable code.
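To illustrate the reuse, here is a self-contained sketch of a concrete subclass (the RW class is restated inline in condensed form, under the assumption that it matches the listing above; the counter subclass is hypothetical): read_() and write_() access a simple counter, while the inherited before/after methods supply all the locking.

```java
public class RWDemo {
    // Condensed restatement of the RW class above, so this sketch compiles on its own.
    static abstract class RW {
        protected int activeReaders_ = 0, activeWriters_ = 0;
        protected int waitingReaders_ = 0, waitingWriters_ = 0;

        protected abstract void read_();
        protected abstract void write_();

        public void read()  { beforeRead();  read_();  afterRead();  }
        public void write() { beforeWrite(); write_(); afterWrite(); }

        protected boolean allowReader() { return waitingWriters_ == 0 && activeWriters_ == 0; }
        protected boolean allowWriter() { return activeReaders_ == 0 && activeWriters_ == 0; }

        protected synchronized void beforeRead() {
            ++waitingReaders_;
            while (!allowReader())
                try { wait(); } catch (InterruptedException ex) {}
            --waitingReaders_; ++activeReaders_;
        }
        protected synchronized void afterRead()  { --activeReaders_; notifyAll(); }
        protected synchronized void beforeWrite() {
            ++waitingWriters_;
            while (!allowWriter())
                try { wait(); } catch (InterruptedException ex) {}
            --waitingWriters_; ++activeWriters_;
        }
        protected synchronized void afterWrite() { --activeWriters_; notifyAll(); }
    }

    // A reader-writer-protected counter: writes increment it, reads snapshot its value.
    static class RWCounter extends RW {
        private int value_ = 0;
        private int lastRead_ = 0;
        protected void read_()  { lastRead_ = value_; }
        protected void write_() { ++value_; }
        public int lastRead()   { return lastRead_; }
    }

    public static int result;

    public static void main(String[] args) {
        RWCounter c = new RWCounter();
        c.write();              // beforeWrite()/afterWrite() handle the locking
        c.write();
        c.read();               // beforeRead()/afterRead() handle the locking
        result = c.lastRead();
        System.out.println("read value: " + result);
    }
}
```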