previous up next
Go up to Top
Go forward to Footnotes
RISC-Linz logo

Source Code

// --------------------------------------------------------------------------
// $Id: snapshot.tex,v 1.3 1997/11/12 21:59:23 schreine Exp schreine $
// the Chandy-Lamport algorithm for taking distributed snapshots
//
// (c) 1997, Wolfgang Schreiner <Wolfgang.Schreiner@risc.uni-linz.ac.at>
// http://www.risc.uni-linz.ac.at/software/daj
// --------------------------------------------------------------------------
import daj.*;
import java.util.*;

// ----------------------------------------------------------------------------
//
// the application itself
//
// ----------------------------------------------------------------------------
public class Main extends Application
{
  // --------------------------------------------------------------------------
  // main function of application
  // --------------------------------------------------------------------------
  public static void main(String[] args)
  {
    new Main().run();
  }

  // --------------------------------------------------------------------------
  // constructor for application
  // --------------------------------------------------------------------------
  public Main()
  {
    super("Distributed Snapshots", 580, 450);
  }

  // --------------------------------------------------------------------------
  // construction of a hexagon network (just for fun)
  // --------------------------------------------------------------------------
  public void construct()
  {
    // length of first row and half height of hexagon
    int w = 3;
    int h = 2;
    int size = (w+h-1)*(w+h)-w*(w-1)+(w+h);
    Node nodes[][] = new Node[w+h][2*h+1];

    // left upper corner and vertical/horizontal distance
    int x0 = 170;
    int y0 = 50;
    int x = 100;
    int y = 86;

    // random generator for initialization of deposit
    Random random = new Random();

    // create and place nodes
    int number = 0;
    for (int j = 0; j < h+1; j++)
      {
        for (int i = 0; i < w+j; i++)
          {
            int deposit = Math.abs(random.nextInt())%1000;
            nodes[i][j] = 
              node(new Prog(size, number, deposit), String.valueOf(number), 
                   x0+i*x-j*x/2, y0+j*y);
            number++;
        
          }
      }
    for (int j = h+1; j < 2*h+1; j++)
      {
        for (int i = 0; i < w+2*h-j; i++)
          {
            int deposit = Math.abs(random.nextInt())%1000;
            nodes[i][j] = 
              node(new Prog(size, number, deposit), String.valueOf(number), 
                   x0+i*x-(2*h-j)*x/2, y0+j*y);
            number++;
          }
      }

    // link nodes
    for (int j = 0; j < h+1; j++)
      {
        for (int i = 0; i < w+j; i++)
          {
            if (i < w+j-1) link(nodes[i][j], nodes[i+1][j]);
            if (i > 0) link(nodes[i][j], nodes[i-1][j]);
            if (j < h) 
              {
                link(nodes[i][j], nodes[i][j+1]);
                link(nodes[i][j], nodes[i+1][j+1]);
              }
            else
              {
                if (i < w+j-1) link(nodes[i][j], nodes[i][j+1]);
                if (i> 0) link(nodes[i][j], nodes[i-1][j+1]);
              }
            if (j > 0) 
              {
                if (i < w+j-1) link(nodes[i][j], nodes[i][j-1]);
                if (i > 0) link(nodes[i][j], nodes[i-1][j-1]);
              }
          }
      }
    for (int j = h+1; j < 2*h+1; j++)
      {
        for (int i = 0; i < w+2*h-j; i++)
          {
            if (i < w+2*h-j-1) link(nodes[i][j], nodes[i+1][j]);
            if (i > 0) link(nodes[i][j], nodes[i-1][j]);
            if (j < 2*h) 
              {
                if (i > 0) link(nodes[i][j], nodes[i-1][j+1]);
                if (i < w+2*h-j-1) link(nodes[i][j], nodes[i][j+1]);
              }     
            if (j > 0) 
              {
                link(nodes[i][j], nodes[i][j-1]);
                link(nodes[i][j], nodes[i+1][j-1]);
              }
          }
      }
  }

  // --------------------------------------------------------------------------
  // informative message
  // --------------------------------------------------------------------------
  public String getText()
  {
    return "Distributed Snapshots\n \n" +
      "The Chandy-Lamport algorithm\n" +
      "for taking consistent global snapshots\n" +
      "of a running network.";
  }
}

// ----------------------------------------------------------------------------
//
// a program class
//
// ----------------------------------------------------------------------------
class Prog extends Program
{
  // state variables
  public int size;        // network size
  public int number;      // current node number
  public int deposit;     // current deposit
  public Message message; // message currently being processed
  public int index;       // index of channel from which message was received
  public int mode;        // current execution mode

  // snapped local state
  public int snapValue;      // current snap value
  public boolean snapped[];  // snap state of input channels
  public int missingSnaps;   // number of missing snaps of input channels

  // snapped total state
  public int totalValue;     // current network value
  public boolean done[];     // true iff node has reported its snap value
  public int missingValues;  // number of outstanding node reports

  // execution modes
  private final int RUNNING = 0;      // initial state
  private final int SNAPPING = 1;     // constructing snapshot
  private final int BROADCASTING = 2; // broadcasting result
  private final int TERMINATED = 3;   // terminal state

  // --------------------------------------------------------------------------
  // construct program in network of size `s` at node `n` with deposit `d`
  // --------------------------------------------------------------------------
  public Prog(int s, int n, int d)
  { 
    size = s;
    number = n;
    deposit = d;
    message = null;    
    mode = RUNNING;
  } 
 
  // --------------------------------------------------------------------------
  // return random integer `r` with 0 <= `r` < `n`
  // --------------------------------------------------------------------------
  private Random rand = new Random();
  private int random(int n)
  {
    return Math.abs(rand.nextInt())%n;
  }

  // --------------------------------------------------------------------------
  // go into snapping mode
  // --------------------------------------------------------------------------
  public void snap()
  {
    // go into snap mode, store local deposit, reset total value
    mode = SNAPPING;
    snapValue = deposit;
    totalValue = 0;

    // initialize snapping state of channels
    int s = in().getSize();
    snapped = new boolean[s];
    for (int i = 0; i < s; i++)
      snapped[i] = false;    
    missingSnaps = s;

    // initialize report state of nodes
    done = new boolean[size];
    for (int i = 0; i < size; i++)
      done[i] = false;
    missingValues = size;
  }

  // --------------------------------------------------------------------------
  // collect `value` from `sender`
  // --------------------------------------------------------------------------
  public void collect(int sender, int value)
  {
    totalValue += value;
    done[sender] = true;
    missingValues--;
    if (missingValues == 0) 
      mode = TERMINATED;
  }

  // --------------------------------------------------------------------------
  // main function of program
  // --------------------------------------------------------------------------
  public void main()
  { 
    while (mode != TERMINATED)
      {
        // node 0 triggers snapping at its own will
        if (number == 0 && mode == RUNNING && random(10) == 0)
          {
            snap();
            interrupt();
            out().send(new SnapMessage());
          }

        // wait some time for message
        index = in().select(random(5));
        if (index == -1 && deposit > 0)
          {
            // ---------------------------------------------------------------
            // send some money from deposit to some neighbor node
            // ---------------------------------------------------------------
            int value = random(deposit);
            deposit -= value;
            message = new ValueMessage(value);
            index = random(out().getSize());
            out(index).send(message);
            message = null;
            continue;
          }

        // message was received
        message = in(index).receive();

        // ---------------------------------------------------------------
        // value message was received
        // ---------------------------------------------------------------
        if (message instanceof ValueMessage)
          {
            ValueMessage valueMessage = (ValueMessage)message;
            int value = valueMessage.getValue();
            deposit += value;
            
            // snap message
            if (mode == SNAPPING && !snapped[index])
              {
                snapValue += value;
              }
          }

        // ---------------------------------------------------------------
        // snap message was received
        // ---------------------------------------------------------------
        else if (message instanceof SnapMessage)
          {
            SnapMessage snapMessage = (SnapMessage)message;
            if (mode == RUNNING) 
              {
                snap();
                out().send(snapMessage);
              }
            if (!snapped[index])
              {
                snapped[index] = true;
                missingSnaps--;
                if (missingSnaps == 0)
                  {
                    mode = BROADCASTING;
                    collect(number, snapValue);
                    if (number == 0) interrupt();
                    out().send(new ResultMessage(number, snapValue));
                  }
              }
          }

        // ---------------------------------------------------------------
        // result message was received
        // ---------------------------------------------------------------
        else if (message instanceof ResultMessage)
          {
            ResultMessage resultMessage = (ResultMessage)message;
            int sender = resultMessage.getSender();
            if (!done[sender])
              {
                collect(sender, resultMessage.getValue());
                out().send(resultMessage);
              }
          }
        
        // message was handled
        message = null;
      }
    // assert that total amount of money equals the determined value
    assert(new TotalValue(totalValue));
  }

  // --------------------------------------------------------------------------
  // called for display of program state
  // --------------------------------------------------------------------------
  public String getText()
  {
    String messageString;
    if (message == null)
      messageString = "(null)";
    else
      messageString = message.getText();
    String modeString;
    switch (mode)
      {
      case RUNNING:
        {
          modeString = "RUNNING";
          break;
        }
      case SNAPPING:
        {
          modeString = "SNAPPING";
          break;
        }
      case BROADCASTING:
        {
          modeString = "BROADCASTING";
          break;
        }
      case TERMINATED:
        {
          modeString = "TERMINATED";
          break;
        }
      default:
        {         
          modeString = "(unknown)";
          break;
        }
      }
    if (message == null)
      messageString = "(null)";
    else
      messageString = message.getText();

    if (mode == RUNNING)      
      return 
        "mode: " + modeString +
        "\ndeposit: " + String.valueOf(deposit);
    else if (mode == SNAPPING)
      return 
        "mode: " + modeString +
        "\ndeposit: " + String.valueOf(deposit) +
        "\nsnapValue: "  + String.valueOf(snapValue) +
        "\nmissingSnaps: "  + String.valueOf(missingSnaps);
    else if (mode == BROADCASTING)
      return 
        "mode: " + modeString +
        "\ndeposit: " + String.valueOf(deposit) +
        "\nsnapValue: "  + String.valueOf(snapValue) +
        "\ntotalValue: " + String.valueOf(totalValue) +
        "\nmissingValues: "  + String.valueOf(missingValues);
    else
      return 
        "mode: " + modeString +
        "\ndeposit: " + String.valueOf(deposit) +
        "\nsnapValue: "  + String.valueOf(snapValue) +
        "\ntotalValue: " + String.valueOf(totalValue);
  }
}

// ----------------------------------------------------------------------------
//
// the message classes
//
// ----------------------------------------------------------------------------


// ----------------------------------------------------------------------------
// carries `value`
// ----------------------------------------------------------------------------
class ValueMessage extends Message
{
  private int value;  
  public ValueMessage(int v)
  {
    value = v;
  }
  public int getValue()
  {
    return value;
  }
  public String getText()
  {
    return "VALUE[" + String.valueOf(value) + "]";
  }
}

// ----------------------------------------------------------------------------
// carries no information
// ----------------------------------------------------------------------------
class SnapMessage extends Message
{
  public String getText()
  {
    return "SNAP";
  }
}

// ----------------------------------------------------------------------------
// carries `value` of `sender`
// ----------------------------------------------------------------------------
class ResultMessage extends Message
{
  private int sender;
  private int value;  
  public ResultMessage(int s, int v)
  {
    sender = s;
    value = v;
  }
  public int getSender()
  {
    return sender;
  }
  public int getValue()
  {
    return value;
  }
  public String getText()
  {
    return "RESULT[" + String.valueOf(sender) + ", " + 
      String.valueOf(value) + "]";
  }
}

// ----------------------------------------------------------------------------
//
// global assertion stating that total values of network equals `value`
//
// ----------------------------------------------------------------------------
class TotalValue extends GlobalAssertion
{
  int value; // expected value
  int sum;   // counted value
  public TotalValue(int v)
  {
    value = v;
  }
  public String getText()
  {
    return "invalid value: " + String.valueOf(sum) + 
      "(" + String.valueOf(value) + " expected)";
  }

  // --------------------------------------------------------------------------
  // add values of all messages in `msg`
  // --------------------------------------------------------------------------
  private int add(Message msg[])
  {
    int s = 0;
    for (int i = 0; i < msg.length; i++)
      { 
        if (msg[i] instanceof ValueMessage)
          {
            s += ((ValueMessage)msg[i]).getValue();
          }
      }
    return s;
  }

  // --------------------------------------------------------------------------
  // check whether total value of network equals `value`
  // --------------------------------------------------------------------------
  public boolean assert(Program prog[])
  {
    sum = 0;
    for (int j = 0; j < prog.length; j++)
      { 
        Prog program = (Prog)prog[j];
        // value deposited in node
        sum += program.deposit;
        // value in output channels
        int n = program.out().getSize();
        for (int i = 0; i < n; i++)
          {
            Message msg[] = getMessages(program.out(i));
            sum += add(msg);
          }
        // value in pending message
        if (program.message != null && program.message instanceof ValueMessage)
          {
            sum += ((ValueMessage)program.message).getValue();
          }
      }
    return sum == value;
  }
}

Maintained by: Wolfgang Schreiner
Last Modification: November 13, 1997

previous up next