;;;; "thread.scm", Multi-threading for Scheme ;;; Michael Elhadad - Jan 96 ;;; Imitation of SUN's lwp library in Scheme. ;;; Derived from process.scm in slib (Copyright (C) 1992, 1993 Aubrey Jaffer) ;;; This has a simple scheduler following the rendez-vous paradigm of lwp. ;;; - Policy is: 1/ process with highest priority runs. ;;; 2/ within same priority, no ordering. ;;; - Allow for rendez-vous synchronization: ;;; (msg_send pid msg): send a message to pid and wait for reply ;;; (msg_recv pid msg): wait until pid sends a message ;;; (msg_reply pid reply): reply to pid ;;; - No semaphore yet... ;;; State-transition diagram for threads: ;;; ;;; -> --- Ready----- <--- ;;; / / | \ \ ;;; 1/ 2/ 3| \5 \6 ;;; / v v 4 v \ ;;; W_msg_send W_msg_recv--->W_msg_reply ;;; ;;; 1: thread did (msg_recv tid) was waiting for a msg_send from tid and ;;; got it. ;;; 2: thread does (msg_recv tid) or (msg_recv #f) [any thread] ;;; 3: thread does (msg_send tid msg) ;;; 4: thread did (msg_send tid msg) and tid did (msg_recv tid). ;;; thread now waits for reply from tid. ;;; 5: thread does (msg_send tid msg) and tid had already done (msg_rcv #f). ;;; 6: thread did (msg_send tid msg) and tid now does a (msg_reply msg). (require 'full-continuation) ;; need call/cc (require 'priority-queue) ;; priority queue (require 'queue) ;; FIFO for queue of messages per thread. (require 'struct) ;; records ;; ;; Externals for queues: ;; (make-queue) ;; (queue? x) ;; (queue-empty? q) ;; (queue-front q) ;; (queue-rear q) ;; (queue-push! q datum) ;; (queue-pop! q) ;; (enqueue! q datum) ;; (dequeue! q) ;; ;; Externals for priority-queue: ;; ;; (make-heap comparator) ;; (heap-insert! heap value) ;; (heap-extract-max! heap) ;; (heap:length heap) ;; ;; Needed here for hacking the internals of a heap: ;; Search queue for key using accessor on each element ;; Return the element in the heap. (define (heap:get heap key accessor) (letrec ((loop (lambda (i) (if (> i (heap:length heap)) #f (let ((hi (heap:ref heap i))) (if (eq? (accessor hi) key) hi (loop (1+ i)))))))) (loop 1))) (define (heap-max heap) (heap:ref heap 1)) (define (writeln x) (display x) (newline)) ;;; ============================================================ ;;; Status is: w_msg_send: waiting for a send ;;; w_msg_recv: waiting for a recv ;;; w_msg_reply: waiting for a reply ;;; ready: ready to run ;;; Status is a record (s data). ;;; Status ADT (define-record status (s data)) (define (thread:ready? status) (eq? (status->s status) 'ready)) (define (thread:w_msg_send? status) (eq? (status->s status) 'W_msg_send)) (define (thread:w_msg_recv? status) (eq? (status->s status) 'W_msg_recv)) (define (thread:w_msg_reply? status) (eq? (status->s status) 'W_msg_reply)) ;;; ============================================================ ;;; A thread has an id, a function and a priority. ;;; Status is its current status. ;;; Port: queue of messages not yet processed (FIFO). ;;; Value is the value an interrupted thread is waiting for. ;;; For debugging - add a name field. (define-record thread (id thunk priority status value port name)) (define (thread->thunk-set! t thunk) (vector-set! t 2 thunk)) (define (thread->priority-set! t p) (vector-set! t 3 p)) (define (thread->status-set! t s) (vector-set! t 4 s)) (define (thread->value-set! t v) (vector-set! t 5 v)) (define (thread->enqueue-msg! t v) (enqueue! (thread->port t) v)) (define (thread->dequeue-msg! t) (dequeue! (thread->port t))) (define (thread:priority< proc1 proc2) (< (thread->priority proc1) (thread->priority proc2))) (define thread:newid (let ((thread:id 0)) (lambda () (set! thread:id (1+ thread:id)) thread:id))) ;;; Make THE running thread heap and the blocked thread heap (define thread:q (make-heap thread:priority<)) ;;; Get thread with tid (define (thread:get tid) (heap:get thread:q tid thread->id)) ;; Maximum priority currently stored in heap (define (heap:maxprio heap) (thread->priority (heap:ref heap 1))) ;;; Return the status of thread tid (define (thread:status tid) (let ((t (thread:get tid))) (if t (thread->status t) #f))) (define (thread:maxprio) (heap:maxprio thread:q)) ;; For recv from anybody: 'any matches anyone (define (thread:match tid1 tid2) (or (eq? tid1 tid2) (eq? tid1 'any) (eq? tid2 'any))) (define (thread:name tid) (if (eq? tid 'any) "any" (thread->name (thread:get tid)))) ;;; ============================================================ ;;; The thread calling add-thread can be interrupted if the new thread ;;; has a higher priority. ;;; It eventually will get newid as a value. (define (add-thread! name thunk1 priority initval) (cond ((procedure? thunk1) (let ((newid (thread:newid))) (display "# Adding thread ") (writeln newid) (heap-insert! thread:q (make-thread newid thunk1 priority (make-status 'ready initval) initval (make-queue) name)) (thread:schedule! newid))) (else (slib:error "add-thread!: wrong type argument " thunk1)))) ;;; Remove a thread from the queues (define (destroy-thread! tid) ;; Give to the dead thread max priority then extract it (let ((thread (thread:get tid))) (if thread (begin (thread->priority-set! thread (1+ (thread:maxprio))) (heap:heapify thread:q 1) (heap:extract-max! thread:q) (thread:dump) (thread:schedule! 'ignore))))) ;;; For debugging purposes: state of the queue. (define (thread:dump) (letrec ((loop (lambda (i) (if (> i (heap:length thread:q)) #f (let ((hi (heap:ref thread:q i))) (display "# Thread ") (display (thread->name hi)) (display " (") (display (thread->id hi)) (display ")") (display " priority ") (display (thread->priority hi)) (display " status ") (display (thread->status hi)) (display " port ") (thread:dump-port (thread->port hi)) (newline) (loop (1+ i))))))) (loop 1))) (define (thread:dump-port queue) (if (queue-empty? queue) (display "") (let ((top (dequeue! queue))) (letrec ((loop (lambda (elt) (display elt) (enqueue! queue elt) (if (eq? (queue-front queue) top) (display ">") (begin (display " - ") (loop (dequeue! queue))))))) (display "<") (loop top))))) ;;; ============================================================ ;;; This gets called when the current thread (highest priority ready) is ;;; blocked and waits or when a higher-priority thread becomes ready. ;;; Remember to kill a thread when its thunk terminates. (define (thread:schedule! value) (cond ((not (thread:ready?)) (writeln "No more threads ready") (thread:toplevel 'done)) (else (let ((thread (heap-max thread:q))) (call-with-current-continuation (lambda (cont) ;; Remember state of current thread (thread->thunk-set! thread:cur cont) (thread->value-set! thread:cur value) ;; Activate highest priority thread (set! thread:cur thread) (display "# Switching to ") (writeln (thread->id thread:cur)) ((thread->thunk thread) (thread->value thread)) (display "# thread ") (display (thread->id thread)) (writeln " is dead") ;; Continue on next thread (destroy-thread! (thread->id thread)))))))) ;;; Initialization ;;; In general, thread:init is called from the toplevel ;;; When that happens, toplevel is stored in the q as a thread w/priority 0 (define thread:cur 'ignore) (define thread:toplevel 'ignore) (define (thread:init thunk priority initval) (call-with-current-continuation (lambda (toplevel) (set! thread:toplevel toplevel) (set! thread:cur (make-thread 0 toplevel 0 (make-status 'ready 0) 0 (make-queue) "INIT")) (add-thread! "INIT" thunk priority initval)))) ;;; Switch tid from its current status to a new one. (define (thread:switch-id tid status data) (let ((thread (thread:get tid))) (if thread (thread:switch thread status data)))) (define (thread:switch thread status data) (thread->status-set! thread (make-status status data)) (cond ((eq? status 'ready) (thread->value-set! thread data) (thread:ready! thread)) (else (thread:block! thread)))) ;;; Small hack to keep blocked and ready threads in the same heap: ;;; blocked threads have a negative priority (define (thread:ready! thread) (if (< (thread->priority thread) 0) (begin (thread->priority-set! thread (- (thread->priority thread))) (heap:heapify thread:q 1)))) (define (thread:block! thread) (if (> (thread->priority thread) 0) (begin (thread->priority-set! thread (- (thread->priority thread))) (heap:heapify thread:q 1)))) ;;; Is there a thread ready to run? Yes if max has a positive priority (define (thread:ready?) (and (> (heap:length thread:q) 0) (> (heap:maxprio thread:q) 0))) ;;; ============================================================ ;;; Rendez-vous primitives ;;; ;; cid msg-send to tid: ;; if (w_msg_send tid ?x) and (match cid ?x) ;; - cid -> w_msg_reply tid (5) ;; - tid -> ready msg (1) ;; else ;; - cid -> w_msg_rcv tid (3) + enqueue msg on tid's port ;; If tid dead ;; - return #f ;; Caller gets as a value: ignore. ;; Need to queue the messages sent to tid because several ;; different threads can send a message before a thread receives. (define (msg-send tid msg) (display "# msg-send ") (display (thread->name thread:cur)) (display " sends to ") (display (thread:name tid)) (display " - msg ") (write msg) (display " - ") (let ((stat (thread:status tid))) (cond ((eq? stat #f) #f) ((and (thread:w_msg_send? stat) (thread:match (status->data stat) (thread->id thread:cur))) (display (thread->name thread:cur)) (display " is blocked (5) - ") (display (thread:name tid)) (writeln " is ready (1)") (thread:switch thread:cur 'w_msg_reply tid) (thread:switch-id tid 'ready msg) (thread:schedule! 'ignore)) (else (display (thread->name thread:cur)) (writeln " is blocked (3)") (thread->enqueue-msg! (thread:get tid) (cons (thread->id thread:cur) msg)) (display "# ") (display (thread:name tid)) (display " has port ") (thread:dump-port (thread->port (thread:get tid))) (writeln "") (thread:switch thread:cur 'w_msg_rcv tid) (thread:schedule! 'ignore))))) ;; cid msg-recv from tid ;; if (w_msg_recv tid cid) [found in port] ;; - tid -> w_msg_reply (4) ;; else ;; - cid -> w_msg_send (2) ;; If tid dead ;; - return #f ;; Caller gets as a value: msg sent by sender. (define (msg-recv tid) (display "# msg-recv ") (display (thread->name thread:cur)) (display " recv from ") (display (thread:name tid)) (display " - ") ;; Did someone send a msg in the past? (let ((fromport (thread:found-in-queue tid))) (cond ;; yes fromport is (tid . msg) (fromport (thread:transition4 (car fromport) (cdr fromport))) ;; Nobody sent to us in the past - we accept from anybody ((eq? tid 'any) (thread:transition2 'any)) ;; We only accept from tid (else ;; Check if tid exists (let ((stat (thread:status tid))) (cond ((eq? stat #f) #f) (else (thread:transition2 tid)))))))) ;; Is there a message sent in the past by tid (or a matching tid) ;; Search assumes here that same tid cannot appear twice in the port ;; Since after sending, tid waits until it gets a reply. ;; Elements are of the form: (tid . msg) ;; If found, remove the matching element from the port and return it. (define (thread:found-in-queue tid) (let ((curq (thread->port thread:cur))) (if (queue-empty? curq) #f (let ((top (dequeue! curq))) (letrec ((loop (lambda (elt) (cond ((thread:match tid (car elt)) elt) (else ;; put it back (enqueue! curq elt) (if (eq? (queue-front curq) top) #f (loop (dequeue! curq)))))))) (loop top)))))) (define (thread:transition2 tid) (display (thread->name thread:cur)) (writeln " is blocked (2)") (thread:switch thread:cur 'w_msg_send tid) ;; The value that cid will receive will get updated later ;; in transition 1 when cid is switched back to ready (thread:schedule! 'future)) (define (thread:transition4 tid data) (display (thread:name tid)) (writeln " is blocked (4)") (thread:switch-id tid 'w_msg_reply (thread->id thread:cur)) (thread:schedule! data)) ;; cid msg-reply to tid ;; if (w_msg_reply tid cid) ;; - tid -> ready msg (6) ;; else ;; - return #f ;; Caller gets as a value: ignore (define (msg-reply tid msg) (display "# msg-reply ") (display (thread->name thread:cur)) (display " reply to ") (display (thread:name tid)) (display " - msg ") (write msg) (display " - ") (let ((stat (thread:status tid))) (cond ((eq? stat #f) #f) ((and (thread:w_msg_reply? stat) (thread:match (status->data stat) (thread->id thread:cur))) (display (thread:name tid)) (writeln " is ready (6)") (thread:switch-id tid 'ready msg) (thread:schedule! 'ignore)) (else #f)))) (define (thread:self) (thread->id thread:cur)) ;;; ============================================================ ;;; Thread tests ;;; ;;; Important that driver1 has higher priority than t1 and t2 ;;; so that t1 does not start running before t2 is created. (define t1 '*) (define t2 '*) (define (driver1 x) (set! t1 (add-thread! "T1" thread1 10 '*)) (set! t2 (add-thread! "T2" thread2 10 '*)) (writeln "Back from driver1")) (define (thread1 tid) (display "In thread1 with ") (writeln t2) (let ((reply (msg-send t2 "Hello"))) (display "Back in thread1 with ") (writeln reply))) (define (thread2 tid) (display "In thread2 with ") (writeln t1) (let ((msg (msg-recv t1))) (display "Recv in thread2 with ") (writeln msg) (writeln "Reply in thread2") (msg-reply t1 "All is fine"))) (define (test1) (thread:init driver1 20 'ignore)) ;;; ============================================================ ;;; SameFringe with threads ;;; 3 threads: driver, parse1 and parse2 ;;; This is coroutining. (define (same-fringe tree1 tree2) (thread:init sfdriver 20 (cons tree1 tree2))) (define (test2) (same-fringe '((1) (2 (3))) '(1 (2) 3))) (define p1 '*) (define p2 '*) (define co '*) (define (sfdriver tree-pair) (set! p1 (add-thread! "P1" parsetree 10 (car tree-pair))) (set! p2 (add-thread! "P2" parsetree 10 (cdr tree-pair))) (set! co (add-thread! "CO" comparator1 10 '*)) (writeln "Threads launched")) ;; Traverse whole tree then send end-of-tree message to comparator. ;; Note well that eot cannot be identified from within recursion. (define (parsetree tree) (letrec ((loop (lambda (tree) (cond ((null? tree) #f) ((pair? tree) (loop (car tree)) (loop (cdr tree))) (else (msg-send co (cons tree (thread:self)))))))) (loop tree) (msg-send co (cons 'eot (thread:self))))) ;; Two ways to write the comparator ;; comparator2 allows for more freedom in the scheduling. ;; It requires the use of the (msg-recv 'any) construct. ;; comparator1 works p1 and p2 step by step in a fixed order. (define (comparator1 x) (letrec ((loop (lambda () (let ((val1 (car (msg-recv p1))) (val2 (car (msg-recv p2)))) (if (eq? val1 val2) (if (eq? val1 'eot) (thread:toplevel 'same-fringe) (begin (msg-reply p1 'next) (msg-reply p2 'next) (display "Found a match: ") (writeln val1) (loop))) (thread:toplevel 'diff-fringe)))))) (loop))) (define (comparator2 x) (letrec ((loop (lambda () (let ((msg1 (msg-recv 'any))) (let ((val1 (car msg1)) (first (cdr msg1)) (next '*)) (set! next (if (eq? first p1) p2 p1)) (msg-reply first 'next) (let ((val2 (car (msg-recv next)))) (if (eq? val1 val2) (if (eq? val1 'eot) (thread:toplevel 'same-fringe) (begin (msg-reply next 'next) (display "Found a match: ") (writeln val1) (loop))) (thread:toplevel 'diff-fringe)))))))) (loop)))