This document is derived from the lecture by Prof. Dr. P. Fischer & Dr. S. Köhler at the University of Freiburg in semester SS16.

Chapter 1: Basics & System Models

Def.: […] Collection of individual computing devices that can communicate with each other. […] Each processor in a distributed system has its semi-independent agenda, but for various reasons, including sharing of resources, availability, and fault tolerance, processors need to coordinate their actions. – Attiya, Welch 2004

Challenges

Lots of nondeterminism! Unpredictable delays, failures, actions, concurrency, and uncertainty. It is much harder to get this right, so it is important to have theoretical tools to argue about correctness. It is easier to go from theory to practice than vice versa.

Two models

Message Passing

  1. Formally:

    1. System consists of \(n\) deterministic nodes \(v_1, ..., v_n\) and pairwise communication channels
    2. At each time, each node \(v_i\) has some internal state \(Q_i\)
    3. System is event-based: states change based on discrete events
    • Internal State: inputs, local variables, some local clocks, history of whole sequence of observed events
    • Types of events
      • Send event \(s_{ij}\): Some node \(v_i\) puts a message on the communication channel to node \(v_j\)
      • Receive Event \(r_{ij}\): Node \(v_i\) received message from \(v_j\)
      • Timing Event: Event triggered at node by local clock
  2. Schedules and Executions

    • Configuration \(C\): Vector of all \(n\) node states (at a given time)
    • Execution Fragment: Sequence of alternating configurations and events
    • Execution: execution fragment that starts with initial config \(C_0\)
    • Schedule: execution without the configurations, but including inputs
  3. Remarks

    • Local state: The local state of \(v_i\) does not include the state of messages sent by \(v_i\)
    • Adversary: within the timing guarantees of the model (synchrony assumptions), the execution/schedule is determined in a worst-case way
    • Deterministic nodes: in the basic model, all nodes are deterministic. This will be relaxed later; the model details and the adversary then become more tricky

Shared Memory

Synchrony

Modeling assumptions:

  1. Synchronous

    • System: System runs in synchronous time steps (rounds). Discrete time. Round \(r\) takes place between time \(r - 1\) and time \(r\)
    • Message passing: Round \(r\): at time \(r - 1\) each process sends out messages, delivered and processed at time \(r\)
    • Shared memory: in each round every process can access one memory cell
  2. Asynchronous

    • System: process speeds and message delay are finite but unpredictable. Assumption: process speeds / message delays are determined in a worst-case way by an adversarial scheduler
    • Message passing: Messages are always delivered but can have arbitrary delays
    • Shared Memory: All processes eventually do their next steps, process speeds are arbitrary

Failures

Correctness of Distributed Systems

Three kinds of correctness properties:

Local Schedules

A node \(v\)’s state is determined by \(v\)’s inputs and observable events.

  1. Indistinguishability

    If for two schedules \(S\) and \(S'\) and for a node \(v_i\) with the same inputs in \(S\) and \(S'\) we have \(S|i = S'|i\), then the next action performed by \(v_i\) is the same in both schedules \(S\) and \(S'\).

Asynchronous executions

Chapter 2: The Two Generals Problem

Non-trivial distributed services must coordinate their actions. Basic coordination: agreeing on some action / fact.

The problem: two red armies, one blue army. How to coordinate attacks by red armies only using pigeons that might not make it.

Two generals problem formally

Two generals: proved to be unsolvable

Using the indistinguishability proof:

Similarity:

Chain of executions \(E_0, ..., E_k\) such that \(\forall i \in \{1, ..., k\} : E_{i-1} \sim_v E_i\)

Contradiction!

Solving two generals: randomized algorithm

  1. Level Algorithm

    • Both nodes compute a level
    • at end, levels differ by at most one
    • levels essentially measure number of successful back and forth transmissions

    Steps:

    1. init levels to \(0\)
    2. in each round: both nodes send current level to each other
    3. new level at \(u\): \(l_u := \text{max}\{l_u, l_v + 1\}\)

    Properties:

    • at all times, the levels differ by at most one
    • if all messages are delivered, both levels are equal to the number of rounds
    • level \(l_u\) of node \(u\) is \(0\) if all messages to \(u\) have been dropped
  2. Randomized two generals algorithm

    Two nodes \(u\) and \(v\), \(u\) is leader node. Possible inputs \(0\) and \(1\).

    1. Node \(u\) picks random number \(R \in \{1, ..., r\}\)
    2. Nodes run the level algorithm for \(r\) rounds. Messages also include the inputs, and node \(u\) also includes \(R\) in its messages.
    3. At the end, a node decides \(1\) if:
      • both inputs are equal to \(1\)
      • node knows \(R\) and has seen both inputs
      • level of the node is \(\geq R\)
    4. Otherwise, decides \(0\)
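
The following Python sketch simulates one run of this algorithm (node \(u\) is the leader, rounds are simultaneous as assumed above). The `delivered` list, which encodes for every round whether the two messages get through, plays the role of the adversary and is purely illustrative.

import random

def two_generals(input_u, input_v, r, delivered):
    # delivered[i] = (u_to_v, v_to_u): whether round i+1's two messages arrive.
    R = random.randint(1, r)                   # u's secret threshold
    level_u = level_v = 0
    u_knows_inputs = False                     # u always knows R and its own input
    v_knows_all = False                        # v must still learn R and u's input
    for u_to_v, v_to_u in delivered:           # r rounds, messages sent simultaneously
        new_u, new_v = level_u, level_v
        if u_to_v:
            new_v = max(level_v, level_u + 1)
            v_knows_all = True
        if v_to_u:
            new_u = max(level_u, level_v + 1)
            u_knows_inputs = True
        level_u, level_v = new_u, new_v
    both_one = (input_u == 1 and input_v == 1)
    dec_u = 1 if both_one and u_knows_inputs and level_u >= R else 0
    dec_v = 1 if both_one and v_knows_all and level_v >= R else 0
    return dec_u, dec_v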

Lower Bound on Error Probability

For the two generals problem, \(\epsilon\) is:

Chapter 3: Broadcast, Convergecast and Spanning Trees

Assumptions:

Broadcast

Source node \(s\) needs to broadcast message \(M\) to all nodes; all nodes have unique IDs, and each node only knows the IDs of its neighbors.

  1. Flooding

    1. source node \(s\): send \(M\) to all neighbors
    2. all other nodes: when receiving \(M\) from some neighbor \(v\) for the first time, send \(M\) to all neighbors except \(v\).

    After \(r\) rounds, exactly the nodes at distance \(\leq r\) from \(s\) know \(M\).

    1. Radius

      • Given a node \(s \in V\), radius of \(s\) in \(G\): \[\text{rad}(G, s) := \text{max}_{v \in V} \text{dist}_G(s, v)\]
      • radius of \(G\): \[\text{rad}(G) := \text{min}_{s \in V}\text{rad}(G, s)\]
    2. Diameter of \(G\)

      \[\text{diam}(G) := \text{max}_{u, v \in V} \text{dist}_G(u, v) = \text{max}_{s \in V}\text{rad}(G, s)\]

    3. Time complexity of flooding in synchronous systems: \(\text{rad}(G, s)\)

      \[\frac{\text{diam}(G)}{2} \leq \text{rad}(G) \leq \text{rad}(G, s) \leq \text{diam}(G)\]

  2. Asynchronous complexity

    • message delays and local computation times are arbitrary
    • for analysis: message delays \(\leq 1\) time unit, local computations take \(0\) time
    1. determine running time of execution
    2. asynch. time complexity = max. running time of any exec

    Running time of execution: normalize message delays such that maximum delay is \(1\) time unit

    Def. Asynchronous Time Complexity: Total time of a worst-case execution in which local computations take time \(0\) and all message delays are at most \(1\) time unit.

    • time complexity of flooding from a source \(s\) in an asynch network \(G\) is \(\text{rad}(G, s)\)
    • message complexity (no. of messages): \(O(|E|)\)
  3. Flooding Spanning Tree

    The flooding algorithm can be used to compute a spanning tree of the network. Idea: source \(s\) is the root; every other node's parent is the first node from which it received \(M\)

    • In synchronous systems: the distance of \(v\) to the root is the round in which \(v\) is reached. The tree preserves graph distances to the root node ⇒ BFS tree
    • In async. system: depth of tree can be \(n - 1\) even if radius/diameter of graph is \(1\).

Convergecast

Opposite of broadcast: given a rooted spanning tree, communicate from all nodes to the root.

  1. Flooding/Echo algorithm

    1. use flooding to construct a tree
    2. use convergecast to report back to the root node when done
    • time complexity: \(O(\text{diam}(G))\) (depth of tree). In async. systems: \(\Omega(n)\)
    • message complexity: \(2|E|\)
    • low diameter spanning trees important! How to construct in async. system?

Distributed Dijkstra

Unweighted graph, grow tree level by level. Algorithm:

  1. Root node broadcasts “start phase \(r + 1\)” in the current tree
  2. Leaf nodes (level \(r\) nodes) send “join \(r + 1\)” to neighbors
  3. Node \(v\) receiving “join \(r + 1\)” from neighbor \(u\):
    1. first such message: \(u\) becomes parent of \(v\), \(v\) sends ACK to \(u\)
    2. otherwise, \(v\) sends NACK to \(u\)
  4. After receiving ACK or NACK from all neighbors, level \(r\) nodes report back to root by starting convergecast
  5. When that terminates, the root can start next phase

Distributed Bellman-Ford

Basic idea: Each node \(u\) stores an integer \(d_u\) with its current guess for the distance to the root node \(s\). When a node \(u\) can improve \(d_u\), it informs its neighbors.

  1. init step
    • \(d_s := 0\)
    • \(\forall v \neq s: d_v := \infty\) and \(\text{parent}_v := \bot\)
  2. root \(s\) sends \(1\) to all neighbors
  3. For all other nodes \(u\). When receive message \(x\) with \(x < d_u\) from neighbor \(v\):
    • set \(d_u := x\)
    • set \(\text{parent}_u := v\)
    • send \(x + 1\) to all neighbors except \(v\)
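
A minimal Python sketch of these rules. The asynchronous network is replaced by a simple work list of pending messages (an assumption for illustration); `adj` maps every node to its list of neighbors.

import math

def bellman_ford_bfs(adj, s):
    # Simulated distributed Bellman-Ford: messages are (receiver, value x, sender).
    d = {v: math.inf for v in adj}
    parent = {v: None for v in adj}
    d[s] = 0
    pending = [(v, 1, s) for v in adj[s]]           # root sends 1 to all neighbors
    while pending:
        u, x, v = pending.pop()
        if x < d[u]:                                # improvement: adopt and forward
            d[u], parent[u] = x, v
            pending += [(w, x + 1, u) for w in adj[u] if w != v]
    return d, parent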

Distributed BFS Tree Construction

  1. Synchronous

    • Time: \(O(\text{diam}(G))\)
    • Messages: \(O(|E|)\)

    ⇒ Optimal!

  2. Asynchronous

    | Algorithm                | Time                             | Messages                      |
    |--------------------------|----------------------------------|-------------------------------|
    | Distributed Dijkstra     | \(O(\text{diam}(G)^2)\)          | \(O(E + V * \text{diam}(G))\) |
    | Distributed Bellman-Ford | \(O(\text{diam}(G))\)            | \(O(E * V)\)                  |
    | Best known trade-off     | \(O(\text{diam}(G) * \log^3 V)\) | \(O(E + V * \log^3 V)\)       |

    Best known trade-off based on synchronizers

Synchronizers

Synchronous algorithms are simple, but real systems are asynchronous. Idea: run a synchronous algorithm in an asynchronous system. The synchronizer simulates rounds at all nodes. Important: the local schedules are the same as in the synchronous execution.

  1. Simple local synchronizers

    • node \(u\) generates clock pulses to start each new round
    • before starting \(r\), \(u\) needs to make sure that all messages of round \(r - 1\) have been received.
    • after starting round \(r\), \(u\) sends all messages of round \(r\).

    To detect if all messages of current round received: always send a message (dummy message).

    • the algorithm correctly allows running a synchronous algorithm in an asynchronous system.
    • if all nodes start the simulation at \(t = 0\), the time complexity for \(R\) rounds is \(R\).
    • the total number of dummy messages to simulate \(R\) rounds is at most \(O(R * |E|)\)
  2. Synchronizer \(S\)

    • time complexity \(T(S)\) is time for simulating one round
    • message complexity \(M(S)\) is messages needed for simulating one round

    For simple synchronizer: \(T(S) = 1\), \(M(S) = 2|E|\). Other trade-offs possible!

  3. BFS Tree with synchronizer

    • time complexity: \(O(\text{diam}(G))\)
    • message complexity: \(O(\text{diam}(G) * |E|)\)

    Slightly better than distributed Bellman-Ford. The best known BFS algorithm is based on the best known synchronizer.

Leader Election

How to pick root node in distributed system? Example: pick node with smallest ID.

For node \(u\) store smallest known ID in variable \(x_u\).

  1. Initial step: \(u\) sets \(x_u := \text{ID}_u\) and sends \(x_u\) to all neighbors
  2. When receiving \(x_v < x_u\) from neighbor \(v\):
    • \(x_u := x_v\)
    • send \(x_u\) to all neighbors except \(v\)

Problems: the message complexity is high, and it is not clear when/how to terminate. Both can be solved at some cost.
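
A minimal Python sketch of the flooding-based minimum-ID rule above; as before, the asynchronous message passing is simulated with a simple work list (an assumption, since the lecture only states the per-node rules).

def elect_leader(adj, ids):
    # adj maps each node to its neighbor list, ids maps each node to its unique ID.
    x = dict(ids)                                         # x_u := ID_u
    pending = [(v, x[u], u) for u in adj for v in adj[u]] # initial broadcast
    while pending:
        u, xv, v = pending.pop()                          # u receives x_v from v
        if xv < x[u]:                                     # smaller ID learned
            x[u] = xv
            pending += [(w, xv, u) for w in adj[u] if w != v]
    return x                                              # node with the minimum ID wins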

Chapter 4: Causality, Logical Time and Global States

Logical Clocks

Assign a timestamp to all events in an asynchronous message-passing system

Causal Shuffles

A schedule \(S'\) is a causal shuffle of schedule \(S\) iff \[\forall v \in V: S|v = S'|v\]

Observation: If \(S'\) is a causal shuffle of \(S\), no node can distinguish between \(S\) and \(S'\)

Causal Order

Logical clocks are based on causal order of events: Event \(e\) should occur before \(e'\) if it provably occurs before \(e'\). Clock value \(t_e < t_{e'}\) !

Event \(e\) provably occurs before \(e'\) iff \(e\) appears before \(e'\) in all causal shuffles of \(S\).

Lamport’s Happens-Before-Relation

MPS: only send and receive events.

Two events \(e\) and \(e'\) occurring at \(u\) and \(u'\). \(t\) and \(t'\) are times when \(e\) and \(e'\) occur.

\(e\) provably occurs before \(e'\) if:

  1. The events occur at the same node and \(e\) occurs before \(e'\)
  2. Event \(e\) is a send event and \(e'\) the recv. event of the same message
  3. There is an event \(e''\) for which we know provably that \(e\) occurs before \(e''\) and \(e''\) occurs before \(e'\)

Def: happens-before-relation: \(\Rightarrow_s\) is pairwise relation on send/receive events of S. Contains:

  1. All pairs \((e, e')\) where \(e\) precedes \(e'\) in \(S\) and both events occur at the same node
  2. All pairs \((e, e')\) where \(e\) is send event and \(e'\) receive event for the same message
  3. All pairs \((e, e')\) where there is a third event \(e''\) such that \[e \Rightarrow_s e'' \wedge e'' \Rightarrow_s e'\]

Happens-Before and Causal Shuffles

Theorem: Given a schedule \(S\) and two events \(e\) and \(e'\), the following statements are equivalent:

Lamport Clocks

  1. Each event \(e\) gets a clock value \(\tau(e) \in \mathbb{N}\)
  2. If \(e\) and \(e'\) are events at the same node and \(e\) precedes \(e'\) then \[\tau(e) < \tau(e')\]
  3. If \(s_M\) and \(r_M\) are send and receive events of some msg. \(M\), then \[\tau(s_M) < \tau(r_M)\]
  1. Algorithm

    1. initial counter at node \(u\) is \(c_u = 0\)
    2. For any non-receive event \(e\) at node \(u\): \(c_u := c_u + 1\); \(\tau(e) := c_u\)
    3. For any send event \(s\) at \(u\), attach value of \(\tau(s)\) to message
    4. For any receive event \(r\) at \(u\) (corresp. send event \(s\)) compute: \[c_u := \max\{c_u, \tau(s)\} + 1; \tau(r) := c_u\]
  2. Discussion

    • Advantage: no change of protocol
    • Disadvantage: large clock jumps
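
A minimal Python sketch of the four steps of the algorithm above; the class and method names are illustrative.

class LamportClock:
    def __init__(self):
        self.c = 0                       # step 1: local counter c_u = 0

    def local_event(self):               # step 2: any non-receive event
        self.c += 1
        return self.c                    # tau(e)

    def send_event(self):                # step 3: attach the returned tau(s) to the message
        return self.local_event()

    def receive_event(self, tau_s):      # step 4: tau_s is the timestamp in the message
        self.c = max(self.c, tau_s) + 1
        return self.c                    # tau(r)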

Neiger-Toueg-Welch Clocks

Idea:

Fidge-Mattern Vector Clocks

  1. Algorithm

    1. All nodes keep a vector \(VC(u)\) with an entry for every node in \(V\), initialized to \(0\).
    2. For any non-receive event \(e\) at \(u\): \[VC_u(u) := VC_u(u) + 1; VC(e) := VC(u)\]
    3. For any send event \(s\) at node \(u\), attach \(VC(s)\) to the message of \(s\)
    4. For any receive event \(r\) at \(u\) (corresp. send event \(s\)) compute: \[\forall v \neq u: VC_v(u) := \max \{VC_v(s), VC_v(u)\}; \quad VC_u(u) := VC_u(u) + 1; \quad VC(r) := VC(u)\]
  2. Vector Clocks and Happens-Before

    \[VC(e) < VC(e') \leftrightarrow e \Rightarrow_s e'\]
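
A minimal Python sketch of the update rules and of the happens-before test above; names are illustrative.

class VectorClock:
    def __init__(self, u, nodes):
        self.u = u
        self.vc = {v: 0 for v in nodes}  # one entry per node, initialized to 0

    def local_event(self):               # any non-receive event
        self.vc[self.u] += 1
        return dict(self.vc)             # VC(e)

    def send_event(self):                # attach the returned VC(s) to the message
        return self.local_event()

    def receive_event(self, vc_s):       # vc_s is the vector attached to the message
        for v, t in vc_s.items():        # component-wise maximum for v != u
            if v != self.u:
                self.vc[v] = max(self.vc[v], t)
        self.vc[self.u] += 1
        return dict(self.vc)             # VC(r)

def happened_before(vc_e, vc_f):
    # VC(e) < VC(f): <= in every component and < in at least one.
    return (all(vc_e[v] <= vc_f[v] for v in vc_e)
            and any(vc_e[v] < vc_f[v] for v in vc_e))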

Local Clock vs. Synchronizers

Clock pulses generated by synchronizer can also be seen as logical clocks.

Properties:

Consistent cuts (Global State)

It is not possible to record the global state at a given time of an execution. Therefore, the best option is to find a global state which could have been true at some time. ⇒ Global Snapshot

Cut: Given a schedule \(S\), a cut is a subset \(C\) of the events of \(S\) such that for all nodes \(v \in V\) the events in \(C\) happening at \(v\) form a prefix of the sequence of events in \(S|v\).

Consistent Cut: Given a schedule \(S\), a consistent cut \(C\) is a cut such that for all events \(e \in C\) and for all events \(f\) in \(S\) it holds that \[f \Rightarrow_s e \rightarrow f \in C\]

Given a schedule \(S\), a cut \(C\) is consistent cut iff for each message \(M\) with send event \(s_M\) and receive event \(r_M\), if \(r_M \in C\) then it also holds that \(s_M \in C\).

Consistent Snapshot

A consistent snapshot is a global system state which is consistent with all local views.

Computing a Consistent Snapshot (with Logical Clocks)

Claim: \(\forall \tau_0 \geq 0 : C(\tau_0)\) is consistent cut.

  1. Algorithm

    Goals: Capture consistent snapshot in running system.

    1. Node \(s\) records its state
    2. When node \(u\) receives a marker message from node \(v\):
      • if \(u\) has not recorded its state then
        • \(u\) records its state
        • \(u\) starts recording messages on all incoming channels
      • else
        • \(u\) stops recording messages on incoming channel from \(v\) to \(u\)
        • set of msg. in transit from \(v\) to \(u\) is set of recorded messages
    3. after node \(u\) records its state:
      • node \(u\) sends marker message on all outgoing channels (before any other message)
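
A minimal Python sketch of these marker rules, written in the standard Chandy-Lamport form (the channel on which the first marker arrives is recorded as empty); the `send` callback and the surrounding message-passing runtime are assumptions for illustration.

class SnapshotNode:
    # Per-node state for the marker algorithm; send(dest, msg) is an assumed
    # callback into the underlying runtime.
    def __init__(self, in_channels, out_channels, send):
        self.out = out_channels
        self.send = send
        self.state = None                        # recorded local state
        self.recording = {v: False for v in in_channels}
        self.in_transit = {v: [] for v in in_channels}

    def record_state(self, local_state):         # called by the initiator s, or on first marker
        self.state = local_state
        for v in self.recording:
            self.recording[v] = True             # start recording all incoming channels
        for v in self.out:
            self.send(v, "MARKER")               # marker before any other message

    def on_message(self, v, msg, local_state):
        if msg == "MARKER":
            if self.state is None:               # first marker seen: record own state
                self.record_state(local_state)
            self.recording[v] = False            # channel v -> u is done; in_transit[v]
                                                 # now holds the messages in transit from v
        elif self.recording[v]:
            self.in_transit[v].append(msg)       # message in transit on channel v -> u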

Chapter 5: Clock Synchronization

Lead question: how to get synchronized clocks?

logical time (“happens-before”) vs. physical time

Properties of clock sync. algorithms

UTC

Clock drift: deviation from the nominal rate, dependent on power supply, temperature, etc. TinyNodes have a max. drift of 30-50 ppm (50 µs per second)

NTP

Clock sync via UDP. The packet delay is estimated to reduce the clock skew.

  1. Propagation delay estimation

    Measure the round-trip time (RTT): \(t_1\) is the time the request is sent by A, \(t_2\) the time the request is received by B, \(t_3\) the time the answer is sent by B, and \(t_4\) the time the answer is received by A.

    Delay: \[\delta = \frac{(t_4 - t_1) - (t_3 - t_2)}{2}\] Clock skew: \[\Omega = \frac{(t_2 - (t_1 + \delta)) - (t_4 - (t_3 + \delta))}{2} = \frac{(t_2 - t_1) + (t_3 - t_4)}{2}\]

  2. Message jitter

    Various sources of errors (deterministic and non-deterministic). Fix by timestamping packets at MAC layer.
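
The two formulas above in code, with a small worked example (the timestamps are made up for illustration).

def ntp_estimate(t1, t2, t3, t4):
    # t1: request sent by A, t2: request received by B,
    # t3: answer sent by B,  t4: answer received by A.
    delay = ((t4 - t1) - (t3 - t2)) / 2            # one-way delay estimate
    skew = ((t2 - t1) + (t3 - t4)) / 2             # offset of B's clock relative to A
    return delay, skew

# Made-up example: B runs 5 time units ahead of A, the one-way delay is 10.
print(ntp_estimate(0, 15, 20, 25))   # (10.0, 5.0)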

PTP

Precision time protocol (PTP), similar to NTP.

Hardware clock distribution

Synchronous digital circuits require all components to act in sync. The bigger the clock skew, the longer the clock period. Optimize routing, insert buffers.

Clock Sync. tricks in wireless networks

Best tree for tree-based clock sync?

Finding a good tree for clock sync is a tough problem: find a spanning tree with small (maximum or average) stretch. This is a hard problem with approximation algorithms.

Clock sync tricks (GTSP)

Gradient time sync protocol.

This results in all nodes consistently averaging errors to all neighbors. In tree-like algorithms (e.g. FTSP) we can get a bad local skew.

Theory of clock sync

Formal model

\(A^{\text{max}}\) algorithm

  1. set local clock to maximum clock value received from any neighbor
  2. if recv. value \(>\) previously forwarded value, forward immediately
  3. at the least, forward the local logical clock value every \(T\) time steps.
  1. Problems:

    • global and local skew can both be \(\Omega(D)\)
    • clock values can jump (\(\beta = \infty\))
  2. Can we do better?

    • make clock continuous
    • global skew cannot be improved
    • local skew can be improved, however (simple ideas don’t work, \(\Omega(1)\) is not possible)
  3. Skew lower bounds

    • The global skew guarantee of any clock sync algorithm is at least \(\frac{D}{2}\).
    • The lower bound on the local skew is \(\Omega(\log_{\frac{\beta - 1}{p}} D)\)

Chapter 6: Consensus

Consensus

  1. Setting

    • \(n\) nodes \(v_1, v_2, ..., v_n\)
    • each node \(v_i\) has an input \(x_i \in \mathbb{D}\)
    • each (non-failing) node \(v_i\) computes an output \(y_i \in \mathbb{D}\)
  2. Agreement

    The outputs of all non-failing nodes are equal

  3. Validity

    If all inputs are equal to \(x\), all outputs are equal to \(x\)

    Depends on failure model

  4. Termination

    All non-failing processes terminate after a finite number of steps

  5. Remarks

    • the two generals problem is a variant of binary consensus with \(n = 2\) nodes
    • model: synchronous communication, messages can be lost
    • validity: if no messages are lost, and both nodes have the same input \(x\), \(x\) needs to be the output
    • the problem cannot be solved in this setting

Shared Memory

  1. Protocol

    • memory cell \(c\)
    • initially \(c\) is in special state ?
    • processor 1 writes \(x_1\) into \(c\), then decides on \(x_1\)
    • processor \(j \neq 1\) reads \(c\) until \(j\) reads something other than ? and then decides on that
  2. Problems

    • unexpected delay
    • heterogeneous architectures: the slowest one slows down the whole system
    • no fault-tolerance

Computability

Def: Usually means Turing-computable ⇒ strong mathematical model

Shared-memory computability: Model of asynchronous concurrent computation. Wait-free computable on a multiprocessor. Wait-free?

Wait-free Shared Model

Wait-free: Every process completes in a finite number of steps, no locks; we assume that we have wait-free atomic registers.

Every processor does:

r = read(c);                     // read the shared cell
if (r == "?") {
   write(c, x_i); decide(x_i);   // cell untouched: write own input and decide on it
} else {
   decide(r);                    // cell already written: adopt that value
}

Read-Modify-Write Memory

  1. Formally

    Cell \(c\) and function \(f\). Method call: atomically replace the value \(x\) of cell \(c\) with \(f(x)\) and return the old value \(x\) of cell \(c\)

Consensus Number

An object has consensus number \(n\) if it can be used to implement \(n\)-process consensus, but not (\(n + 1\))-process consensus

Example: Atomic read/write registers have consensus number \(1\). Works with \(1\) process, impossibility with \(2\).

  1. Theorem

    If you can implement \(X\) from \(Y\) and \(X\) has the consensus number \(c\) then \(Y\) has consensus number at least \(c\). ⇒ Useful way of measuring synchronization power

    Alternative:

    • If \(X\) has consensus number \(c\)
    • And \(Y\) has consensus number \(d < c\)
    • Then there is no way to construct a wait-free implementation of \(X\) by \(Y\)

    An RMW object is non-trivial if there exists a value \(v\) such that \(v \neq f(v)\):

    • Test&Set, Fetch&Inc, Fetch&Add, Swap, Compare&Swap, general RMW
    • But not read

    Any non-trivial RMW object has consensus number at least \(2\). This implies that there is no wait-free implementation of such RMW registers from read/write registers.

  2. Interfering RMW

    Let \(F\) be a set of functions such that for all \(f_i\) and \(f_j\), either they commute or they overwrite. Any such set of RMW objects has consensus number exactly \(2\)
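
A minimal Python sketch of the classic construction behind the "consensus number at least 2" claim: two processes reach consensus using one test&set object plus two announce registers. The lock only emulates the atomicity of the RMW step for this sketch.

import threading

class TestAndSet:
    # A minimal test&set register (a non-trivial RMW object).
    def __init__(self):
        self._lock = threading.Lock()
        self._val = 0

    def test_and_set(self):
        with self._lock:                 # emulate atomicity
            old, self._val = self._val, 1
            return old

def make_two_process_consensus():
    # 2-process consensus from one test&set object plus two announce registers.
    ts = TestAndSet()
    proposals = [None, None]
    def decide(i, x):                    # process i in {0, 1} proposes x
        proposals[i] = x                 # announce own input first
        if ts.test_and_set() == 0:       # winner: decide own value
            return x
        return proposals[1 - i]          # loser: adopt the winner's announced value
    return decide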

Synchronous System

One can sometimes tell if a processor has crashed (timeouts, broken connection)

  1. Simple consensus algorithm

    Each process:

    1. broadcast own value
    2. decide on the minimum of all received values

    Problem: no consensus if a processor fails

  2. \(f\)-Resilient consensus algorithm

    If an algorithm solves consensus for \(f\) failed processes, we say it is an \(f\)-resilient consensus algorithm.

    Each process:

    1. broadcast own value
    2. round 2 to round \(f + 1\): broadcast minimum of received values unless it has been sent before.
    3. end of round \(f + 1\): decide on the minimum value received

    Example: with \(f = 2\) failures, \(f + 1 = 3\) rounds are needed

    Theorem: If at most \(f \leq n - 2\) of \(n\) nodes of a synchronous message passing system can crash, at least \(f + 1\) rounds are needed to solve consensus.
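
A minimal Python sketch of this algorithm for the failure-free case (crashes are not simulated here; a crash would simply remove a node's broadcasts from some round on).

def f_resilient_consensus(inputs, f):
    # inputs maps node -> input value; without crashes every broadcast reaches everyone.
    known = {u: {x} for u, x in inputs.items()}      # values seen so far
    already_sent = {u: {inputs[u]} for u in inputs}
    outbox = {u: {inputs[u]} for u in inputs}        # round 1: broadcast own value
    for _ in range(f):                               # rounds 2 .. f+1
        received = set().union(*outbox.values())
        outbox = {u: set() for u in inputs}
        for u in inputs:
            known[u] |= received
            m = min(known[u])
            if m not in already_sent[u]:             # forward a new minimum only once
                outbox[u].add(m)
                already_sent[u].add(m)
    return {u: min(known[u]) for u in inputs}        # decide on the minimum seen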

Byzantine Failures

A node may not only crash and stop forever, but may also recover or be damaged (different processes receive different values)

Theorem: There is no \(f\)-resilient Byzantine consensus algorithm for \(n\) nodes for \(f \geq \frac{n}{3}\).

Simple byzantine agreement algorithm

Works for any \(f\) and \(n > 3f\), which is optimal. Only takes \(f + 1\) rounds, which is optimal for crash failures. Works for any input, not just binary input.

Problem: not easy to formalize, and the sizes of the messages increase exponentially.

Queen Algorithm

  1. Algorithm:

    • Round 1: Broadcast own value, set own value to value that was received most often.
      • if own value appears \(> \frac{n}{2} + f\) times, support value
      • else: do not support any value
    • Round 2: The queen broadcasts its own value
      • if not supporting any value, set own value to queen’s value
  2. Advantages:

    • nodes only exchange current values, small messages
    • works for any input and not just binary input
  3. Disadvantages:

    • the algorithm requires \(f + 1\) phases consisting of \(2\) rounds each. Twice as many rounds as the optimal algorithm!
    • only works with \(f < \frac{n}{4}\) Byzantine nodes
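
A minimal Python sketch of one phase (two rounds) of the Queen algorithm, evaluated in the failure-free case; in a real run every node applies these rules to the messages it actually received, and Byzantine behaviour is not modelled here.

from collections import Counter

def queen_phase(values, queen, f):
    # One phase: round 1 everybody broadcasts, round 2 the queen broadcasts.
    n = len(values)
    counts = Counter(values.values())          # round 1: all broadcast their values
    most, cnt = counts.most_common(1)[0]
    new, supported = {}, {}
    for u in values:
        new[u] = most                          # adopt the most frequent value
        supported[u] = cnt > n / 2 + f         # support it only with a large majority
    for u in values:                           # round 2: queen broadcasts its value
        if not supported[u]:
            new[u] = new[queen]                # unsupported nodes adopt the queen's value
    return new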

King Algorithm

  1. Algorithm:

    • Round 1: broadcast own value
    • Round 2
      • If some value \(x\) appears \(\geq n - f\) times
        • Broadcast Propose x
      • If some proposal received \(> f\) times
        • set own value to this proposal
    • Round 3
      • The king broadcasts its value
      • if own value received \(< n - f\) proposals, set own value to king’s value
  2. Advantages:

    • Works for any \(f\) and \(n > 3f\), which is optimal
    • Messages are small
    • Works for any input not just binary input
  3. Disadvantage:

    • Algorithm requires \(f + 1\) phases consisting of \(3\) rounds each.

B.A. Using Authentication

  1. Algorithm:

    • if I am \(v\) and own input is \(1\)
      • \(value := 1\)
      • broadcast \(v\) has \(1\)
    • else
      • \(value := 0\)

    In each round \(r \in \{1, ..., f + 1\}\):

    • if \(value = 0\) and accepted \(r\) messages \(v\) has \(1\) in total including a message from \(v\) itself
      • \(value := 1\)
      • broadcast \(v\) has \(1\) plus the \(r\) accepted messages that caused the local value to be set to \(1\).

    After \(f + 1\) rounds: decide value

  2. Advantages

    • works for any number of byzantine nodes
    • only takes \(f + 1\) rounds ⇒ optimal
    • small messages (sub-exponential length)
  3. Disadvantages:

    • If \(v\) is Byzantine, nodes may agree on a value that is not in the original input
    • Only works for binary input
    • requires authenticated messages

Randomized algorithm

  1. Algorithm

    • \(x := \text{own input}; r:= 0\)
    • broadcast proposal(\(x\), \(r\))

    In each round \(r\)

    1. Wait for \(n - f\) proposals from round \(r - 1\)
    2. If at least \(n - 2f\) proposals have some value \(y\)
      • \(x := y\), decide on \(y\)
      else if at least \(n - 4f\) proposals have some value \(y\)
      • \(x := y\)
      else
      • choose \(x\) randomly with \(Pr[x = 0] = Pr[x = 1] = \frac{1}{2}\)
    3. Broadcast proposal(\(x\), \(r\))
    4. If decided on value → stop
  2. Discussion

    Algorithm satisfies validity and agreement conditions, but not termination condition. Also: very slow

  3. Shared coin

    A shared coin is a random binary variable that is \(0\) with constant probability and \(1\) with constant probability. Used in randomized algorithm to speed it up.

    Assume only crash failures, no byzantine failures!

    1. Algorithm

      • Set local coin \(c_i := 0\) with probability \(\frac{1}{n}\), else \(c_i := 1\)
      • Broadcast \(c_i\)
      • Wait for exactly \(n - f\) coins and collect all coins in the local coin set \(s_i\)
      • Broadcast \(s_i\)
      • Wait for exactly \(n - f\) coin sets
      • If at least one coin is \(0\) among all coins in the coin sets
        • return \(0\)
      • Else
        • return \(1\)
    2. Discussion

      The subroutine terminates, and at least \(\frac{1}{3}\) of all coins are seen by everybody. There are at least \(f + 1\) coins in at least \(f + 1\) coin sets.

      All processes decide \(0\) with constant probability and \(1\) with constant probability.

  4. Summary

    • takes a constant number of rounds in expectation
    • handles crash failures even if communication is asynchronous
    • only works if there are \(f < \frac{n}{9}\) crash failures. No byzantine nodes!
    • only works for binary input.

    Note: there is a constant expected time algorithm that tolerates \(f < \frac{n}{2}\) crash failures

  5. Byzantine and asynchronous

    Can be solved, but the message size remains polynomial and the message complexity is \(O(n^3)\). Does not use a global coin, but tries to detect biased local coin flips from Byzantine processes.

Chapter 8: Distributed System Architectures

Client-Server

  1. One tier

    • Client
    • Server: resource management, application, presentation
  2. Two tier

    • Client: presentation
    • Server: resource management, application

    OR

    • Client: presentation, application
    • Server: resource management
  3. Three tier

    • Client: presentation
    • Application Server: application (middleware)
    • Server: resource management

Multiprocessor Architectures

Shared memory

Shared Disk

Shared-Nothing

Communication Paradigms

Chapter 9: Reliability

Availability: probability for a server to be ready/to serve: \[\frac{\text{MTBF}}{\text{MTBF} + \text{MTTR}}\] MTBF: mean time between failures; MTTR: mean time to repair

⇒ fast recovery is key to high availability
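
A small worked example of the availability formula (the numbers are made up for illustration).

def availability(mtbf, mttr):
    # Availability = MTBF / (MTBF + MTTR), both in the same time unit.
    return mtbf / (mtbf + mttr)

# Example: 1000 h between failures, 1 h to repair -> roughly 99.9 % availability.
print(availability(1000, 1))   # 0.999000999...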

Protocols:

Three types of failures:

Types of Storage

Interactions:

Steal+No-Force : Improves throughput and latency, but makes recovery more complicated.

ARIES Algorithm

Algorithm for Recovery and Isolation Exploiting Semantics

  1. Principles of ARIES

    1. Write-Ahead Logging: Record database changes before actual change
    2. Repeating History During Redo: After crash, bring system back to the exact state at crash time, undo active transactions
    3. Logging Changes During Undo: Log changes during transaction undo so that they are not repeated in case of repeated failures
  2. WAL

    Any change to a database object is first recorded in the log, which must be written to stable storage before the change itself is written to disk.

    • To ensure atomicity and prepare for undo, write undo to stable storage before a page update is written to disk
    • To ensure durability, redo info must be written to stable storage at commit time
    1. Log Information

      • LSN: Log Sequence Number, monotonically increasing
      • Type: Begin, Commit, Abort, Update, Compensation
      • TID: Transaction Id
      • PrevLSN: Previous LSN of the same transaction
      • PageID: Page which was modified
      • NextLSN: Next LSN of same transaction
      • Redo Information
      • Undo Information

      Redo Information: ARIES does page-oriented redo, stores byte images of pages before and after modification.

      Undo Information: ARIES assumes logical undo, record the actual tuple changes

  3. Writing Log Records

    • For performance, write to volatile storage first
    • Log is forced to stable storage up to certain LSN
    • Committed transaction = all log records are on stable storage

    During normal transaction processing keep in each transaction control block:

    • LastLSN: LSN of last log record for this transaction
    • NextLSN: LSN of next log record to be processed during rollback
    • When update on page \(p\) is performed, a log record \(r\) is written to WAL and LSN of \(r\) is recorded in page header of \(p\)
  4. ARIES Transaction Rollback

    • Process the log in a backward fashion
    • Start the undo operation at the log entry pointed to by the NextLSN field in the transaction control block of T
    • Find the remaining log entries for T by following the PrevLSN and NextLSN fields in the log
    • Perform the changes in the undo part of the log entry

    Undo operations must be recorded in the WAL. Use compensation log records for this purpose. Never undo an undo action, but an undo action might need to be redone.

  5. ARIES Crash Recovery

    1. Analysis Phase
      • read log in forward direction
      • find the transactions that were active when the failure happened (“losers”)
    2. Redo Phase
      • Replay the log to bring the system into state as of the time of system failure
      • this also restores the changes of the loser transactions (repeating history)
    3. Undo Phase
      • Roll back all loser transactions

Commit coordination and consensus

Set of independent servers with transactions spanning several servers, i.e., subtransactions at some sites. For a successful commit, all subtransactions have to be committed.

Approach: Commit protocols. Fundamental approach is to rely on leader/coordinator. Leader proposes value or client talks to leader, participants decide and inform coordinator.

Algorithms:

2-Phase-Commit (2PC)

    • Coordinator sends vote-request to participants
    • Participant: When receiving a vote-request, return either vote-commit or vote-abort
    • The coordinator collects the votes; if all are vote-commit, it sends global-commit, otherwise global-abort
    • All participants wait for global-* and react accordingly
  1. Decentralized variant

    • Phase 1: The coordinator sends, depending on its vote, vote-commit or vote-abort to the participants
    • Phase 2a: If a participant received vote-abort from the coordinator, it aborts. Otherwise it has received vote-commit and returns either commit or abort to the coordinator and all other participants. If it sends abort, it aborts its local computation
    • Phase 2b: After having received all votes, the coordinator and all participants have all votes available; if all are commit, they commit, otherwise they abort.
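
A minimal sketch of the coordinator's decision rule in the basic (centralized) protocol above; the actual message exchange and timeout handling are omitted.

def two_phase_commit(votes):
    # Coordinator's rule: commit only if every participant voted commit.
    if all(v == "vote-commit" for v in votes):
        return "global-commit"          # broadcast to all participants
    return "global-abort"

# e.g. two_phase_commit(["vote-commit", "vote-abort"]) == "global-abort"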

3-Phase-Commit (3PC)

Unblock 2PC by spreading decision knowledge

Without network splits all would be good, but with network splits two new coordinators could arise that arrive at different values.

Paxos

Safe, nonblocking, making progress if possible, resilient to failures, delays and network partitions

Ideas:

  1. Roles

    • Client: issue a request, await response
    • Acceptor / Voter: Work in quorums, vote on requests. ⇒ Fault-tolerant memory
    • Proposer / Coordinator: Tries to convince acceptors that request is o.k.
    • Learner: replicator, take action when request has been granted
    • Leader / Special Coordinator: distinguished proposer, resolves conflict of multiple leaders
  2. Basic Idea

    • One (or more) node(s) decide(s) to be coordinator/proposer
    • Proposes a value and requests acceptance from other nodes
    • If it fails, it will try again
    • Separate agreement on value from leader acceptance
  3. Dealing with multiple proposals

    • Use globally ordered proposal numbers (site-id:local-number)
    • Only accept the most recent proposal
      • acceptors will keep track of the highest proposal number accepted
    • on acceptance, tell the new proposer the previously accepted value
  4. Algorithm

    • Phase 1a (Prepare): Leader selects proposal number \(n\) and sends a prepare message to quorum of acceptors
    • Phase 1b (Promise):
      • If the proposal number \(n\) is larger than any previous proposal:
        • each acceptor promises not to accept proposals with a lower proposal number
        • sends a promise message including proposal number and value
      • otherwise acceptor sends denial
      • also each acceptor sends the value and number of its last accepted or promised proposal to proposer
    • Phase 2a: Accept
      • If proposer receives (positive) responses from quorum of acceptors
        • choose value to be agreed upon
        • value must be from values of acceptors that have already accepted a value
        • otherwise can choose any value
      • Send accept message to quorum of acceptors including the chosen value
    • Phase 2b: Accepted
      • If acceptor received accept message for most recent proposal it has promised
        • accept value
        • send accepted message to proposer and learner
      • otherwise: send denial and the last proposal number and value it has promised to accept
  5. Leader election concerns

    • Paxos is safe in presence of multiple leaders
    • Not guaranteed to make progress (livelock)
    • tradeoff in election: aggressive (livelock), reluctant (protocol stalled)
    • Partial solution: use leader election timeout
  6. Multi-Paxos

    Optimize Paxos regarding message complexity.

    • The first round can be skipped if the proposer stays the same
    • Only the proposer who succeeded in the 1st round is allowed to skip directly to the 2nd round
  7. Paxos Commit (Idea)

    • Paxos ensures consensus over a quorum
    • Idea: run a Paxos instance on each participant of the transaction
    • Commit is possible if all participants manage to achieve consensus on commit
    • 5 message delays
    • \(F + 1\) acceptors, \(N\) participants
    • Number of messages \((N * F + F + 3N - 1)\)
      • RM sends BeginCommit message to leader
      • leader sends prepare message to every other RM (\(N - 1\))
      • RM sends ballot \(0\) phase 2a Prepared message for its instance of Paxos to \(F + 1\) acceptors (\(N(F + 1)\) messages)
      • For each RM’s instance of Paxos, an acceptor responds to a phase 2a message by sending a phase 2b Prepared message to the leader. (\(F + 1\) messages, bundling!)
      • Single commit to each RM (\(N\) messages)
    • Co-locate acceptors with resource managers
    • Broadcast outcome
    • \(F = 0\) then 2PC

Chapter 10: Distributed Concurrency Control

Storing copies of data on different nodes enables availability, performance and reliability. Data needs to be consistent.

Page model

All data is mapped to read and write operations on pages. Inspect interleavings of resulting page operations to study concurrent execution.

Set of transactions: \(T = \{T_1, ..., T_n\}\) Sequence of \(R\) and \(W\) actions over objects \(A\), \(B\), …

History and Schedule

A transaction is ordered and all write operations can potentially depend on previously read inputs.

A history is a set of complete transactions (begin, commit or abort). Schedule of a history is a prefix of a history. Schedule is serial if it is not interleaved.

  1. Serializability

    A schedule is called (conflict-)serializable if there exists a (conflict-)equivalent serial schedule over the same set of transactions. Create a conflict graph (edges for WR, RW, WW conflicts); if it is acyclic, the schedule is serializable.

    A transaction manager enforces certain transaction behaviour to exclude non serializable schedules.

    Attempt: 2PL. Every schedule produced according to 2PL is serializable; however, not every serializable schedule can be produced by 2PL, and deadlocks may occur.

Local and global schedules

Partial order on OP with following properties:

Problem: transactions may seem serializable at the local sites, but not globally. How to decide if locally observable sequences imply a globally serializable schedule?

As we do not have replication of data items, whenever there is a conflict in a global schedule, the same conflict must be part of exactly one local schedule. The conflict graph of the global schedule is given by the union of the conflict graphs of the respective local schedules. Either all corresponding global schedules are serializable or none is.

Types of federation

Interface to recovery: Every global transaction runs the 2-phase-commit protocol. By that protocol the subtransactions of a global transaction synchronize such that either all subtransactions commit, or none of them.

Homogeneous Concurrency Control

  1. Primary Site 2PL:

    • one site is selected at which lock maintenance is performed exclusively
    • this site thus has global knowledge, and enforcing the 2PL rule for global and local transactions is possible
    • the lock manager simply has to refuse any further locking of a subtransaction \(T_{ij}\) whenever a subtransaction \(T_{ik}\) has started unlocking already
    • Much communication can result in a bottleneck at the primary site
  2. Distributed 2PL

    • When a server wants to start unlocking data items on behalf of a transaction, it communicates with all other servers regarding lock point of the other respective subtransaction
    • server has to receive locking completed message from each of these servers
    • implies extra communication between servers
  3. Distributed Strong 2PL

    • every subtransaction of a global transaction and every local transaction holds its locks until commit
    • with the 2-phase-commit protocol, the 2PL rule is enforced as a side effect

Deadlock detection strategies

Serializability by assigning timestamps to transactions

  1. Time stamp protocol TS

    • each transaction \(T\) has unique timestamp \(Z(T)\) when it started
    • A transaction \(T\) must not write an object which has been read by any \(T'\) where \(Z(T') > Z(T)\)
    • A transaction \(T\) must not write an object which has been written by any \(T'\) where \(Z(T') > Z(T)\)
    • A transaction \(T\) must not read an object which has been written by any \(T'\) where \(Z(T') > Z(T)\)

    Can be combined with Lamport clocks.
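
A minimal Python sketch of the three timestamp rules; the per-object bookkeeping of the largest read/write timestamps (`read_ts`, `write_ts`) is an assumption needed to check the rules and is not spelled out in the notes.

def ts_allowed(op, obj, t, read_ts, write_ts):
    # t is Z(T); read_ts/write_ts map each object to the largest timestamp
    # of a transaction that has read/written it so far (hypothetical bookkeeping).
    if op == "read":
        return t >= write_ts.get(obj, 0)            # rule 3: no reading a younger write
    if op == "write":
        return (t >= read_ts.get(obj, 0)            # rule 1: no overwriting a younger read
                and t >= write_ts.get(obj, 0))      # rule 2: no overwriting a younger write
    return False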

Lock-based vs timestamp-based approaches

Heterogeneous Concurreny Control

  1. Rigorous schedules

    A local schedule \(S\) of a set of complete transactions is rigorous if for all involved transactions \(T_i\), \(T_j\) the following holds: Let \(p_j \in \text{OP}_j\), \(q_i \in \text{OP}_i\), \(i \neq j\), such that \((p_j, q_i) \in \text{conf}(S)\). Then either \(a_j <_s q_i\) or \(c_j <_s q_i\)

  2. Commit-deferred transaction

    A global transaction \(T\) is commit-deferred if its commit action is sent by the global transaction manager to the local sites of \(T\) only after the local executions of all subtransactions of \(T\) at those sites have been acknowledged.

  3. A schedule is serializable whenever it is rigorous

    Proof by contradiction (assume the schedule is not serializable)

  4. Ticket-based concurrency control

    • Each server guarantees serializable local schedules in a way unknown for global transactions
    • Each server maintains a special counter as a database object, called a ticket. Each subtransaction of a global transaction at that server increments (reads and writes) the ticket. Doing so, we introduce explicit conflicts between global transactions running at the same server.
    • Global transaction manager guarantees that the order in which the tickets are accessed by subtransactions will imply linear order on global transactions

Chapter 11: Replication and (weaker) consistency

Two replication dimensions:

Tradeoffs

Method specific:

Synchronous replication protocols

  1. ROWA

    • write change to all replicas
    • read on any single replica
    • expensive write coordination (2PC)
    • cheap, highly available reads
    • low write availability (lower than w/o replication)
  2. Primary Copy

    • write changes initially to a single replica
    • propagate changes in bulk to other replicas
    • coordination with read locks: request from primary
    • reduce write cost
    • increased read cost
  3. Quorum-Based Protocols

    • Idea: Clients request and acquire permission from multiple servers before either reading or writing a replicated data item
    • \(N\) replicas, contact at least \(\frac{N}{2} + 1\) servers to agree on update. Once agreed, all contacted servers process the update assigning new version to updated object
    • For read, client must first contact at least \(\frac{N}{2} + 1\) servers and ask them to send version number of local version. Will then read the replica with highest version number

    Can be generalized to arbitrary read quorum \(N_R\) and write quorum \(N_W\) such that:

    • \(N_R + N_W > N\)
    • \(N_W > \frac{N}{2}\)

    ⇒ quorum consensus method
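
A minimal Python sketch of the quorum conditions and of a quorum read that picks the replica with the highest version number; the data layout (a list of (version, value) pairs) is illustrative.

def check_quorums(n, n_r, n_w):
    # Quorum conditions from above: every read quorum intersects every write
    # quorum, and any two write quorums intersect.
    return n_r + n_w > n and n_w > n / 2

def quorum_read(replicas, n_r):
    # Contact n_r replicas (each a (version, value) pair) and return the
    # pair with the highest version number.
    contacted = replicas[:n_r]                 # any read quorum would do
    return max(contacted, key=lambda pair: pair[0])

# Example: N = 5, N_R = 2, N_W = 4 satisfies both conditions.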

CAP Theorem

Of atomic data consistency, system availability, and tolerance to network partitions, only two can be achieved at any given time.

We cannot avoid network partitions, so consistency and availability cannot both be achieved at the same time.

Eventual consistency

Database Consistency: Anomalies

Database consistency classes

DS Consistency classes

Session guarantees

Sticky session guarantees

Causes for unavailability

Consistency Trade-offs

| Guarantee            | Meaning                            | Consistency | Performance | Availability |
|----------------------|------------------------------------|-------------|-------------|--------------|
| Strong consistency   | See all previous writes            | A           | D           | F            |
| Eventual consistency | See subset of previous writes      | D           | A           | A            |
| Consistent prefix    | See initial sequence of writes     | C           | B           | A            |
| Bounded staleness    | See all “old” writes               | B           | C           | C            |
| Monotonic reads      | See increasing subset of writes    | C           | B           | B            |
| Read my writes       | See all writes performed by reader | C           | C           | C            |