|
Go backward to B.2 Algorithm Go up to B Distributed Snapshots Go forward to B.4 State Program |
|
Embedded into this page is a DAJ program that implements the algorithm described in the previous subsection.
You should see a button labelled "Press Me" embedded below. If you do not see this button but the string "Please enable Java!" instead, you must enable Java on your Web browser and then reload this page.
In case your browser does not support Java, we include a snapshot of the visualization window below.
Below you can find the full source code of the program.
// --------------------------------------------------------------------------
// $Id: snapshot.tex,v 1.3 1997/11/12 21:59:23 schreine Exp $
// the Chandy-Lamport algorithm for taking distributed snapshots
//
// (c) 1997, Wolfgang Schreiner <Wolfgang.Schreiner@risc.uni-linz.ac.at>
// http://www.risc.uni-linz.ac.at/software/daj
// --------------------------------------------------------------------------
import daj.*;
import java.util.*;
// ----------------------------------------------------------------------------
//
// the application itself
//
// ----------------------------------------------------------------------------
public class Main extends Application
{
  // exclusive upper bound of the random initial deposit of a node
  private static final int MAX_DEPOSIT = 1000;

  // --------------------------------------------------------------------------
  // main function of application: creates the application and runs it
  // --------------------------------------------------------------------------
  public static void main(String[] args)
  {
    new Main().run();
  }

  // --------------------------------------------------------------------------
  // constructor for application
  // (the two integers are presumably the window size -- confirm against
  // the DAJ `Application` constructor)
  // --------------------------------------------------------------------------
  public Main()
  {
    super("Distributed Snapshots", 580, 450);
  }

  // --------------------------------------------------------------------------
  // return random initial deposit `d` with 0 <= `d` < MAX_DEPOSIT
  //
  // The sign bit is masked off instead of calling Math.abs():
  // Math.abs(Integer.MIN_VALUE) is still negative, so the former
  // `Math.abs(random.nextInt())%1000` could yield a negative deposit.
  // --------------------------------------------------------------------------
  private static int randomDeposit(Random random)
  {
    return (random.nextInt() & Integer.MAX_VALUE) % MAX_DEPOSIT;
  }

  // --------------------------------------------------------------------------
  // construction of a hexagon network (just for fun)
  //
  // Rows 0..h grow from `w` to `w+h` nodes, rows h+1..2*h shrink again.
  // Every node runs a `Prog` that is told the total number of nodes, its own
  // number (assigned consecutively, row by row) and a random initial deposit.
  // --------------------------------------------------------------------------
  public void construct()
  {
    // length of first row and half height of hexagon
    int w = 3;
    int h = 2;
    // total number of nodes in the hexagon
    int size = (w+h-1)*(w+h)-w*(w-1)+(w+h);
    // nodes[i][j] is the i-th node of row j; the widest row has w+h nodes
    Node nodes[][] = new Node[w+h][2*h+1];

    // left upper corner and vertical/horizontal distance
    int x0 = 170;
    int y0 = 50;
    int x = 100;
    int y = 86;

    // random generator for initialization of deposit
    Random random = new Random();

    // create and place nodes of the upper half (including the middle row);
    // each row is shifted left by half a horizontal step per row
    int number = 0;
    for (int j = 0; j < h+1; j++)
    {
      for (int i = 0; i < w+j; i++)
      {
        int deposit = randomDeposit(random);
        nodes[i][j] =
          node(new Prog(size, number, deposit), String.valueOf(number),
               x0+i*x-j*x/2, y0+j*y);
        number++;
      }
    }

    // create and place nodes of the lower half; rows shrink by one node each
    for (int j = h+1; j < 2*h+1; j++)
    {
      for (int i = 0; i < w+2*h-j; i++)
      {
        int deposit = randomDeposit(random);
        nodes[i][j] =
          node(new Prog(size, number, deposit), String.valueOf(number),
               x0+i*x-(2*h-j)*x/2, y0+j*y);
        number++;
      }
    }

    // link nodes of the upper half; every link is created from the current
    // node to its neighbor, the neighbor creates the reverse link itself
    for (int j = 0; j < h+1; j++)
    {
      for (int i = 0; i < w+j; i++)
      {
        // right and left neighbor in the same row
        if (i < w+j-1) link(nodes[i][j], nodes[i+1][j]);
        if (i > 0) link(nodes[i][j], nodes[i-1][j]);
        if (j < h)
        {
          // next row is longer: both lower neighbors always exist
          link(nodes[i][j], nodes[i][j+1]);
          link(nodes[i][j], nodes[i+1][j+1]);
        }
        else
        {
          // middle row: next row is shorter, border nodes have one neighbor
          if (i < w+j-1) link(nodes[i][j], nodes[i][j+1]);
          if (i > 0) link(nodes[i][j], nodes[i-1][j+1]);
        }
        if (j > 0)
        {
          // previous row is shorter
          if (i < w+j-1) link(nodes[i][j], nodes[i][j-1]);
          if (i > 0) link(nodes[i][j], nodes[i-1][j-1]);
        }
      }
    }

    // link nodes of the lower half
    for (int j = h+1; j < 2*h+1; j++)
    {
      for (int i = 0; i < w+2*h-j; i++)
      {
        // right and left neighbor in the same row
        if (i < w+2*h-j-1) link(nodes[i][j], nodes[i+1][j]);
        if (i > 0) link(nodes[i][j], nodes[i-1][j]);
        if (j < 2*h)
        {
          // next row is shorter
          if (i > 0) link(nodes[i][j], nodes[i-1][j+1]);
          if (i < w+2*h-j-1) link(nodes[i][j], nodes[i][j+1]);
        }
        if (j > 0)
        {
          // previous row is longer: both upper neighbors always exist
          link(nodes[i][j], nodes[i][j-1]);
          link(nodes[i][j], nodes[i+1][j-1]);
        }
      }
    }
  }

  // --------------------------------------------------------------------------
  // informative message
  // --------------------------------------------------------------------------
  public String getText()
  {
    return "Distributed Snapshots\n \n" +
      "The Chandy-Lamport algorithm\n" +
      "for taking consistent global snapshots\n" +
      "of a running network.";
  }
}
// ----------------------------------------------------------------------------
//
// a program class
//
// ----------------------------------------------------------------------------
class Prog extends Program
{
  // state variables
  // (`deposit` and `message` are read directly by the TotalValue assertion)
  public int size; // network size
  public int number; // current node number
  public int deposit; // current deposit
  public Message message; // message currently being processed
  public int index; // index of channel from which message was received
  public int mode; // current execution mode

  // snapped local state
  public int snapValue; // current snap value
  public boolean snapped[]; // snap state of input channels
  public int missingSnaps; // number of missing snaps of input channels

  // snapped total state
  public int totalValue; // current network value
  public boolean done[]; // true iff node has reported its snap value
  public int missingValues; // number of outstanding node reports

  // execution modes
  private static final int RUNNING = 0; // initial state
  private static final int SNAPPING = 1; // constructing snapshot
  private static final int BROADCASTING = 2; // broadcasting result
  private static final int TERMINATED = 3; // terminal state

  // --------------------------------------------------------------------------
  // construct program in network of size `s` at node `n` with deposit `d`
  // --------------------------------------------------------------------------
  public Prog(int s, int n, int d)
  {
    size = s;
    number = n;
    deposit = d;
    message = null;
    mode = RUNNING;
  }

  // --------------------------------------------------------------------------
  // return random integer `r` with 0 <= `r` < `n` (requires `n` > 0)
  //
  // The sign bit is masked off instead of calling Math.abs():
  // Math.abs(Integer.MIN_VALUE) is still negative and would produce a
  // negative result, i.e. a negative channel index or transfer value.
  // --------------------------------------------------------------------------
  private Random rand = new Random();
  private int random(int n)
  {
    return (rand.nextInt() & Integer.MAX_VALUE) % n;
  }

  // --------------------------------------------------------------------------
  // go into snapping mode
  // --------------------------------------------------------------------------
  public void snap()
  {
    // go into snap mode, store local deposit, reset total value
    mode = SNAPPING;
    snapValue = deposit;
    totalValue = 0;

    // initialize snapping state of channels
    // (a fresh boolean array is all `false` by language definition)
    int s = in().getSize();
    snapped = new boolean[s];
    missingSnaps = s;

    // initialize report state of nodes
    done = new boolean[size];
    missingValues = size;
  }

  // --------------------------------------------------------------------------
  // collect `value` from `sender`; terminate when all nodes have reported
  // --------------------------------------------------------------------------
  public void collect(int sender, int value)
  {
    totalValue += value;
    done[sender] = true;
    missingValues--;
    if (missingValues == 0)
      mode = TERMINATED;
  }

  // --------------------------------------------------------------------------
  // main function of program
  //
  // Until termination: randomly move money to neighbors while no message is
  // available, and handle value, snap and result messages as they arrive.
  // --------------------------------------------------------------------------
  public void main()
  {
    while (mode != TERMINATED)
    {
      // node 0 triggers snapping at its own will
      if (number == 0 && mode == RUNNING && random(10) == 0)
      {
        snap();
        interrupt();
        out().send(new SnapMessage());
      }

      // wait some time for message
      index = in().select(random(5));
      if (index == -1)
      {
        // no message available: never fall through to `in(index)` with the
        // invalid index -1 (formerly possible when the deposit was empty)
        if (deposit > 0)
        {
          // ---------------------------------------------------------------
          // send some money from deposit to some neighbor node
          // ---------------------------------------------------------------
          int value = random(deposit);
          deposit -= value;
          // keep the value visible in `message` while it is in neither
          // the deposit nor a channel
          message = new ValueMessage(value);
          // note: `index` is reused here as the outgoing channel index
          index = random(out().getSize());
          out(index).send(message);
          message = null;
        }
        continue;
      }

      // message was received
      message = in(index).receive();

      // ---------------------------------------------------------------
      // value message was received
      // ---------------------------------------------------------------
      if (message instanceof ValueMessage)
      {
        ValueMessage valueMessage = (ValueMessage)message;
        int value = valueMessage.getValue();
        deposit += value;
        // value was in transit on a channel that has not yet delivered its
        // snap message: it belongs to the snapped state of that channel
        if (mode == SNAPPING && !snapped[index])
        {
          snapValue += value;
        }
      }
      // ---------------------------------------------------------------
      // snap message was received
      // ---------------------------------------------------------------
      else if (message instanceof SnapMessage)
      {
        SnapMessage snapMessage = (SnapMessage)message;
        if (mode == RUNNING)
        {
          // first snap message: record local state and forward the marker
          snap();
          out().send(snapMessage);
        }
        if (!snapped[index])
        {
          snapped[index] = true;
          missingSnaps--;
          if (missingSnaps == 0)
          {
            // all input channels are snapped: report the local snap value
            mode = BROADCASTING;
            collect(number, snapValue);
            if (number == 0) interrupt();
            out().send(new ResultMessage(number, snapValue));
          }
        }
      }
      // ---------------------------------------------------------------
      // result message was received
      // ---------------------------------------------------------------
      else if (message instanceof ResultMessage)
      {
        ResultMessage resultMessage = (ResultMessage)message;
        int sender = resultMessage.getSender();
        // count and forward every node's report only once
        if (!done[sender])
        {
          collect(sender, resultMessage.getValue());
          out().send(resultMessage);
        }
      }

      // message was handled
      message = null;
    }

    // assert that total amount of money equals the determined value
    assert(new TotalValue(totalValue));
  }

  // --------------------------------------------------------------------------
  // called for display of program state
  //
  // Shows mode and deposit, plus the snapshot fields that are meaningful in
  // the current mode.
  // --------------------------------------------------------------------------
  public String getText()
  {
    String modeString;
    switch (mode)
    {
      case RUNNING:
        modeString = "RUNNING";
        break;
      case SNAPPING:
        modeString = "SNAPPING";
        break;
      case BROADCASTING:
        modeString = "BROADCASTING";
        break;
      case TERMINATED:
        modeString = "TERMINATED";
        break;
      default:
        modeString = "(unknown)";
        break;
    }

    String text =
      "mode: " + modeString +
      "\ndeposit: " + String.valueOf(deposit);
    if (mode == RUNNING)
      return text;

    text += "\nsnapValue: " + String.valueOf(snapValue);
    if (mode == SNAPPING)
      return text + "\nmissingSnaps: " + String.valueOf(missingSnaps);

    text += "\ntotalValue: " + String.valueOf(totalValue);
    if (mode == BROADCASTING)
      return text + "\nmissingValues: " + String.valueOf(missingValues);

    return text;
  }
}
// ----------------------------------------------------------------------------
//
// the message classes
//
// ----------------------------------------------------------------------------
// ----------------------------------------------------------------------------
// carries `value`
// ----------------------------------------------------------------------------
class ValueMessage extends Message
{
  // amount of money transported by this message
  private final int value;

  // create a message carrying the amount `v`
  public ValueMessage(int v)
  {
    this.value = v;
  }

  // return the carried amount
  public int getValue()
  {
    return this.value;
  }

  // textual form "VALUE[<value>]"
  public String getText()
  {
    return "VALUE[" + this.value + "]";
  }
}
// ----------------------------------------------------------------------------
// carries no information
// ----------------------------------------------------------------------------
class SnapMessage extends Message
{
  // fixed text shown for every snap message
  private static final String TEXT = "SNAP";

  // textual form of the message
  public String getText()
  {
    return TEXT;
  }
}
// ----------------------------------------------------------------------------
// carries `value` of `sender`
// ----------------------------------------------------------------------------
class ResultMessage extends Message
{
  // number of the node that reports its snap value
  private final int sender;
  // the reported snap value
  private final int value;

  // create a report of `v` on behalf of node `s`
  public ResultMessage(int s, int v)
  {
    this.sender = s;
    this.value = v;
  }

  // return the number of the reporting node
  public int getSender()
  {
    return this.sender;
  }

  // return the reported value
  public int getValue()
  {
    return this.value;
  }

  // textual form "RESULT[<sender>, <value>]"
  public String getText()
  {
    String text = "RESULT[" + this.sender;
    text = text + ", " + this.value + "]";
    return text;
  }
}
// ----------------------------------------------------------------------------
//
// global assertion stating that total values of network equals `value`
//
// ----------------------------------------------------------------------------
class TotalValue extends GlobalAssertion
{
  int value; // expected value
  int sum; // counted value (result of the last check)

  // create an assertion expecting the network total `v`
  public TotalValue(int v)
  {
    this.value = v;
  }

  // message describing the mismatch between counted and expected value
  public String getText()
  {
    return "invalid value: " + this.sum +
      "(" + this.value + " expected)";
  }

  // --------------------------------------------------------------------------
  // add values of all value messages in `msg`; other messages count as zero
  // --------------------------------------------------------------------------
  private int add(Message msg[])
  {
    int total = 0;
    for (int k = 0; k != msg.length; k++)
    {
      Message current = msg[k];
      if (!(current instanceof ValueMessage))
        continue;
      total += ((ValueMessage)current).getValue();
    }
    return total;
  }

  // --------------------------------------------------------------------------
  // check whether total value of network equals `value`
  //
  // The total is accumulated in `sum`: for every program its deposit, the
  // value messages in its output channels and a value message it currently
  // holds in `message`.
  // --------------------------------------------------------------------------
  public boolean assert(Program prog[])
  {
    this.sum = 0;
    int p = 0;
    while (p < prog.length)
    {
      Prog node = (Prog)prog[p];

      // value deposited in node
      this.sum += node.deposit;

      // value in output channels
      int channels = node.out().getSize();
      for (int c = 0; c < channels; c++)
      {
        this.sum += add(getMessages(node.out(c)));
      }

      // value in pending message (instanceof is false for null)
      if (node.message instanceof ValueMessage)
      {
        this.sum += ((ValueMessage)node.message).getValue();
      }

      p++;
    }
    return this.value == this.sum;
  }
}