Chapter 14. Threads

Continuations make the implementation of threads almost trivial. The trick is in the trampoline. Our old trampoline method repeatedly called Bounce() on the current continuation to get the next continuation, until a continuation returned undef:

sub trampoline {
    my ($cont) = @_;
    $cont = $cont->Bounce() while defined $cont;
}

If you think about it, a continuation already represents a single thread of computation. The trampoline is just managing that single thread, ensuring that it does not consume too much stack. Suppose that trampoline(), instead of just repeatedly invoking the current continuation, kept a queue of continuations and, after bouncing the one at the front of the queue, put the result onto the back of the queue (if the result was not undef), looping until the queue was empty. This version does exactly that:

sub trampoline {
    while (@thread_queue) {
        my $cont = shift @thread_queue;
        $cont = $cont->Bounce();
        push @thread_queue, $cont if defined $cont;
    }
}

Note that it no longer takes a continuation as an argument; instead it gets the next continuation from the front of the queue. @thread_queue is a new lexical “my” variable in the PScm package.
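To see the round-robin behaviour in isolation, here is a minimal, self-contained sketch that does not depend on PScm. The ToyCont class and the counter() helper are hypothetical stand-ins invented for this illustration; only the body of trampoline() mirrors the listing above.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a PScm continuation (an assumption, not PScm code):
# Bounce() runs one step and returns the next continuation, or undef when done.
package ToyCont;
sub new    { my ($class, $code) = @_; return bless { code => $code }, $class }
sub Bounce { my ($self) = @_; return $self->{code}->() }

package main;

my @thread_queue;    # one pending continuation per live thread
my @trace;           # records the order in which steps actually ran

# Same loop as the queue-based trampoline in the text.
sub trampoline {
    while (@thread_queue) {
        my $cont = shift @thread_queue;
        $cont = $cont->Bounce();
        push @thread_queue, $cont if defined $cont;
    }
}

# Build a toy thread that takes $steps steps, logging its name at each step.
sub counter {
    my ($name, $steps) = @_;
    return ToyCont->new(sub {
        push @trace, $name;
        return $steps > 1 ? counter($name, $steps - 1) : undef;
    });
}

push @thread_queue, counter('a', 2), counter('b', 3);
trampoline();
print "@trace\n";    # prints "a b a b b"
```

Each thread gets exactly one bounce per turn, so the two threads alternate until the shorter one returns undef and drops out of the queue.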

We place new threads on that queue with a new_thread() method, also in the PScm package:

sub new_thread {
    my ($self, $cont) = @_;
    push @thread_queue, $cont;
}

Very simple: it takes a continuation and pushes it onto the queue.

Next we need a way of creating threads from the PScheme language. This is done using a new special form spawn. spawn takes no arguments and returns 0 to one thread and 1 to the other. This means you can write code that does different things in different threads by testing the result, much like the UNIX fork system call does:

> (if (spawn)
>   (begin
>     (print "hello")
>     (print "hello")
>     1)
>   (begin
>     (print "goodbye")
>     (print "goodbye")
>     (exit)))
"hello"
"goodbye"
"hello"
"goodbye"
1

Notice that the two threads run interleaved, taking turns on the trampoline rather than truly in parallel. One thread calls (exit), so only the result 1 from the other thread gets printed.

spawn is a new special form in PScm::SpecialForm::Spawn, and it's surprisingly easy to implement:

286 package PScm::SpecialForm::Spawn;
287 
288 use base qw(PScm::SpecialForm);
289 use PScm::Continuation;
290 
291 sub Apply {
292     my ($self, $form, $env, $cont) = @_;
293 
294     PScm->new_thread(cont {
295         $cont->Cont(new PScm::Expr::Number(0));
296     });
297 
298     $cont->Cont(new PScm::Expr::Number(1));
299 }

On Line 294 it calls new_thread() with a new continuation that will call the current continuation with argument 0, and on Line 298 it directly calls the current continuation with an argument of 1. This is so easy I feel like I have cheated, but really that's all there is to it. The new continuation will get executed in turn when the Cont() on Line 298 returns control to the trampoline, and the trampoline will continue executing any threads on its queue until all threads have finished and the queue is empty.
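The same idea can be sketched outside the interpreter. In the toy below, which is an assumption-laden illustration rather than PScm code, a continuation is a plain code ref taking one value, a thread is a zero-argument thunk that returns the next thunk or undef, and say_then() is a hypothetical helper standing in for print followed by "the rest of the program".

```perl
use strict;
use warnings;

my @thread_queue;    # thunks; each returns the next thunk, or undef when done
my @output;          # what the toy threads "print"

sub trampoline {
    while (@thread_queue) {
        my $thunk = shift @thread_queue;
        $thunk = $thunk->();
        push @thread_queue, $thunk if defined $thunk;
    }
}

# Toy spawn: queue a new thread that resumes $k with 0,
# then resume $k with 1 in the current thread.
sub spawn {
    my ($k) = @_;
    push @thread_queue, sub { $k->(0) };
    return $k->(1);
}

# Hypothetical helper: a thunk that logs $text and then continues with $next.
sub say_then {
    my ($text, $next) = @_;
    return sub { push @output, $text; return $next };
}

# Everything that happens after (spawn), as a function of spawn's result.
my $after_spawn = sub {
    my ($flag) = @_;
    return $flag
        ? say_then('hello',   say_then('hello',   undef))
        : say_then('goodbye', say_then('goodbye', undef));
};

push @thread_queue, sub { spawn($after_spawn) };
trampoline();
print "$_\n" for @output;    # hello, goodbye, hello, goodbye (one per line)
```

Because the same continuation is invoked twice, once per thread, the code after the spawn runs in both threads and differs only in the value it receives.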

exit is even more trivial. It has to be a special form because individual primitives do not get called in tail position, but all that it has to do is to return undef to the trampoline:

package PScm::SpecialForm::Exit;

use base qw(PScm::SpecialForm);

sub Apply {
    my ($self, $form, $env, $cont) = @_;
    return undef;
}

1;

Incidentally, exit provides a useful way of terminating the interactive interpreter. Typing (exit) at the prompt while only one thread is running will result in an empty @thread_queue, so the trampoline will finish.

All that remains is to wire this into the repl:

sub ReadEvalPrint {
    my ($infh, $outfh) = @_;

    $outfh ||= new FileHandle(">-");
    my $reader      = new PScm::Read($infh);
    my $initial_env;
    $initial_env = new PScm::Env(
        let          => new PScm::SpecialForm::Let(),
        '*'          => new PScm::Primitive::Multiply(),
        '-'          => new PScm::Primitive::Subtract(),
        '+'          => new PScm::Primitive::Add(),
        if           => new PScm::SpecialForm::If(),
        lambda       => new PScm::SpecialForm::Lambda(),
        list         => new PScm::Primitive::List(),
        car          => new PScm::Primitive::Car(),
        cdr          => new PScm::Primitive::Cdr(),
        cons         => new PScm::Primitive::Cons(),
        letrec       => new PScm::SpecialForm::LetRec(),
        'let*'       => new PScm::SpecialForm::LetStar(),
        eval         => new PScm::SpecialForm::Eval(),
        macro        => new PScm::SpecialForm::Macro(),
        quote        => new PScm::SpecialForm::Quote(),
        'set!'       => new PScm::SpecialForm::Set(),
        begin        => new PScm::SpecialForm::Begin(),
        define       => new PScm::SpecialForm::Define(),
        'make-class' => new PScm::SpecialForm::MakeClass(),
        'call/cc'    => new PScm::SpecialForm::CallCC(),
        print        => new PScm::SpecialForm::Print($outfh),
        spawn        => new PScm::SpecialForm::Spawn(),
        exit         => new PScm::SpecialForm::Exit(),
    );

    $initial_env->Define(
        PScm::Expr::Symbol->new("root"),
        PScm::Class::Root->new($initial_env)
    );
    __PACKAGE__->new_thread(cont { repl($initial_env, $reader, $outfh) });
    trampoline();
}

Apart from the addition of spawn and exit to the initial environment, there is only one change. ReadEvalPrint() uses new_thread() to add the initial thread (a continuation that starts the repl) to the @thread_queue, then calls trampoline() with no arguments, rather than passing the continuation directly to trampoline().

14.1. Variations

A more complete thread implementation would also provide a mechanism for collecting the result of one thread in another, with a wait command. That is not so easy: you'd need to put the waiting thread on a separate queue, and have the exit command take an argument and put it somewhere that the wait command could find.
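One possible shape for that exercise, sketched as a standalone toy rather than as PScm code: threads are thunks on a run queue, and thread_exit(), thread_wait(), the thread ids, %result and %waiting are all hypothetical names introduced here. How threads would acquire ids inside the real interpreter is left open.

```perl
use strict;
use warnings;

my @thread_queue;    # runnable thunks; each returns the next thunk or undef
my %result;          # thread id => value that thread passed to exit
my %waiting;         # thread id => continuations parked until it exits
my @log;

sub trampoline {
    while (@thread_queue) {
        my $thunk = shift @thread_queue;
        $thunk = $thunk->();
        push @thread_queue, $thunk if defined $thunk;
    }
}

# exit with a value: record it, make any waiters runnable, end this thread.
sub thread_exit {
    my ($id, $value) = @_;
    $result{$id} = $value;
    my $parked = delete $waiting{$id};
    for my $k (@{ $parked || [] }) {
        push @thread_queue, sub { $k->($value) };
    }
    return undef;
}

# wait: continue at once if the thread has finished, otherwise park $k.
sub thread_wait {
    my ($id, $k) = @_;
    return $k->($result{$id}) if exists $result{$id};
    push @{ $waiting{$id} }, $k;
    return undef;    # the waiting thread leaves the run queue for now
}

# The main thread waits for the worker straight away.
my $main = sub {
    return thread_wait('worker', sub {
        my ($value) = @_;
        push @log, "main got $value";
        return undef;
    });
};

# The worker takes one step, then exits with 42.
my $worker = sub {
    push @log, 'worker step';
    return sub { thread_exit('worker', 42) };
};

push @thread_queue, $main, $worker;
trampoline();
print "$_\n" for @log;    # "worker step", then "main got 42"
```

The waiting thread is simply absent from the run queue until the exit moves its continuation back, which is the "separate queue" the text describes.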

You would also need to be able to prevent concurrent access to sections of code, best done with some sort of atomic semaphore operation. But atomicity is easy to guarantee at the level of the interpreter internals, as long as no continuations are called during the claiming of a semaphore.
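As a rough sketch of why atomicity comes cheaply here, the toy below implements a binary semaphore on top of a thunk-based run queue. It is not PScm code: sem_claim(), sem_release() and worker() are hypothetical names, and the test-and-set is atomic only because it happens inside a single Perl call with no continuation invoked in between.

```perl
use strict;
use warnings;

my @thread_queue;    # runnable thunks; each returns the next thunk or undef
my @log;
my $sem = { held => 0, waiters => [] };    # toy binary semaphore

sub trampoline {
    while (@thread_queue) {
        my $thunk = shift @thread_queue;
        $thunk = $thunk->();
        push @thread_queue, $thunk if defined $thunk;
    }
}

# Claim: test and set in one step, so no other thread can run in between.
sub sem_claim {
    my ($s, $k) = @_;
    if (!$s->{held}) {
        $s->{held} = 1;
        return $k;                    # carry on at the next bounce
    }
    push @{ $s->{waiters} }, $k;      # park until the holder releases
    return undef;
}

# Release: hand ownership straight to a waiter if there is one.
sub sem_release {
    my ($s) = @_;
    if (my $next = shift @{ $s->{waiters} }) {
        push @thread_queue, $next;    # the semaphore stays held, by $next
    }
    else {
        $s->{held} = 0;
    }
    return;
}

# A thread whose two-step critical section is guarded by the semaphore.
sub worker {
    my ($name) = @_;
    return sub {
        return sem_claim($sem, sub {
            push @log, "$name enter";
            return sub {
                push @log, "$name leave";
                sem_release($sem);
                return undef;         # this thread is finished
            };
        });
    };
}

push @thread_queue, worker('a'), worker('b');
trampoline();
print "$_\n" for @log;    # a enter, a leave, b enter, b leave
```

Without the semaphore the two critical sections would interleave (a enter, b enter, a leave, b leave); with it, thread b is parked until thread a releases.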

These variations are exercises you can try at home.

14.2. Tests

A simple test for spawn is in Listing 28.

Listing 28. t/CPS_Spawn.t
use strict;
use warnings;
use Test::More;
use lib 't/lib';
use PScm::Test tests => 3;

BEGIN { use_ok('PScm') }

eval_ok(<<EOF, <<EOR, 'spawn');
(if (spawn)
    (begin
        (print "hello")
        (print "hello")
        1)
    (begin
      (print "goodbye")
      (print "goodbye")
      (exit)))
EOF
"hello"
"goodbye"
"hello"
"goodbye"
1
EOR

eval_ok('(exit)', '', 'exit');

# vim: ft=perl

Full source code for this version of the interpreter is available at
http://billhails.net/Book/releases/PScm-0.1.10.tgz
Last updated Sun Mar 14 10:43:08 2010 UST