This document is derived from the lecture by Prof. Dr. P. Fischer & Dr. S. Köhler at the University of Freiburg in the summer semester 2016 (SS16).

Chapter 1: Basics & System Models

Def.: […] Collection of individual computing devices that can communicate with each other. […] Each processor in a distributed system has its semi-independent agenda, but for various reasons, including sharing of resources, availability, and fault tolerance, processors need to coordinate their actions. – Attiya, Welch 2004


Lots of nondeterminism! Unpredictable delays, failures, actions, concurrency and uncertainty. It is much harder to get this right, so it is important to have theoretical tools to argue about correctness. It is easier to go from theory to practice than vice versa.

Two models

Message Passing


  1. System consists of \(n\) deterministic nodes \(v_1, ..., v_n\) and pairwise communication channels
  2. At each time, each node \(v_i\) has some internal state \(Q_i\)
  3. System is event-based: states change based on discrete events

Schedules and Executions


Shared Memory


Modeling assumptions:




Correctness of Distributed Systems

Three kinds of correctness properties:

Local Schedules

A node \(v\)’s state is determined by \(v\)’s inputs and observable events.


If for two schedules \(S\) and \(S'\) and a node \(v_i\) with the same inputs in \(S\) and \(S'\) we have \(S|i = S'|i\), then the next action performed by \(v_i\) is the same in both schedules \(S\) and \(S'\).

Asynchronous executions

Chapter 2: The Two Generals Problem

Non-trivial distributed services must coordinate their actions. Basic coordination: agreeing on some action / fact.

The problem: two red armies, one blue army. How can the red armies coordinate their attack using only pigeons that might not make it?

Two generals problem formally

Two generals: proved to be unsolvable

Using the indistinguishability proof:


Chain of executions \(E_0, ..., E_k\) such that \(\forall i \in \{1, ..., k\} : E_{i-1} \sim_v E_i\)


Solving two generals: randomized algorithm

Level Algorithm


  1. init levels to \(0\)
  2. in each round: both nodes send current level to each other
  3. new level at \(u\): \(l_u := \text{max}\{l_u, l_v + 1\}\)
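The level algorithm above can be simulated in a few lines of Python (an illustrative sketch, not lecture code; the delivery flags model messages that may be lost):

```python
def level_algorithm(rounds, deliver_u_to_v, deliver_v_to_u):
    """Simulate the level algorithm for two nodes u and v.
    deliver_x_to_y[r] is True iff the round-r message from x to y arrives."""
    l_u, l_v = 0, 0
    for r in range(rounds):
        msg_u, msg_v = l_u, l_v              # both send their current level
        if deliver_v_to_u[r]:
            l_u = max(l_u, msg_v + 1)        # new level at u: max(l_u, l_v + 1)
        if deliver_u_to_v[r]:
            l_v = max(l_v, msg_u + 1)
    return l_u, l_v
```

With perfect delivery both levels reach the number of rounds; regardless of which messages are lost, the two levels never differ by more than one.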


Randomized two generals algorithm

Two nodes \(u\) and \(v\), \(u\) is leader node. Possible inputs \(0\) and \(1\).

  1. Node \(u\) picks random number \(R \in \{1, ..., r\}\)
  2. Nodes run the level algorithm for \(r\) rounds, including their inputs in the messages; node \(u\) also includes \(R\).
  3. At the end, node decides \(1\) if:
    • both inputs are equal to \(1\)
    • node knows \(R\) and has seen both inputs
    • level of the node is \(\geq R\)
  4. Otherwise, decides \(0\)
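The steps above can be sketched as a simulation (illustrative only; the state dictionaries and delivery pattern are my own modeling assumptions, not lecture code):

```python
import random

def randomized_two_generals(input_u, input_v, r, deliver, rng):
    """Sketch of the randomized two generals algorithm. `deliver[rd]` is a
    pair (u_to_v, v_to_u) of booleans for round rd's message deliveries."""
    R = rng.randint(1, r)                        # leader u picks R uniformly
    u = {"level": 0, "R": R, "other_input": None}
    v = {"level": 0, "R": None, "other_input": None}
    for rd in range(r):
        # snapshot outgoing messages before updating any state
        msg_u = (u["level"], input_u, u["R"])    # u also includes R
        msg_v = (v["level"], input_v)
        if deliver[rd][0]:                       # u -> v delivered
            v["level"] = max(v["level"], msg_u[0] + 1)
            v["other_input"] = msg_u[1]
            v["R"] = msg_u[2]
        if deliver[rd][1]:                       # v -> u delivered
            u["level"] = max(u["level"], msg_v[0] + 1)
            u["other_input"] = msg_v[1]
    def decide(node, own_input):                 # decision rule from the notes
        return int(own_input == 1 and node["other_input"] == 1
                   and node["R"] is not None and node["level"] >= node["R"])
    return decide(u, input_u), decide(v, input_v), R
```

With perfect delivery and both inputs \(1\), both nodes reach level \(r \geq R\) and decide \(1\); if all messages are lost, both decide \(0\).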

Lower Bound on Error Probability

For the two generals problem, \(\epsilon\) is:

Chapter 3: Broadcast, Convergecast and Spanning Trees



Source node \(s\) needs to broadcast a message \(M\) to all nodes; all nodes have unique IDs, and each node only knows the IDs of its neighbors.


  1. source node \(s\): send \(M\) to all neighbors
  2. all other nodes: when receiving \(M\) from some neighbor \(v\) for the first time, send \(M\) to all neighbors except \(v\).

After \(r\) rounds, exactly the nodes at distance \(\leq r\) from \(s\) know \(M\).
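The flooding algorithm and the radius claim can be checked with a short synchronous simulation (an illustrative sketch, not lecture code):

```python
def flood(graph, s, rounds):
    """Synchronous flooding: return the set of nodes that know M
    after `rounds` rounds, starting from source s."""
    informed = {s}
    frontier = {s}
    for _ in range(rounds):
        nxt = set()
        for u in frontier:               # every newly informed node forwards M
            for v in graph[u]:
                if v not in informed:
                    nxt.add(v)
        informed |= nxt                  # nodes one hop further now know M
        frontier = nxt
    return informed
```

On a path graph, after \(r\) rounds exactly the nodes within distance \(r\) of the source are informed.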

Diameter of \(G\)

\[\text{diam}(G) := \text{max}_{u, v \in V} \text{dist}_G(u, v) = \text{max}_{s \in V}\text{rad}(G, s)\]

Time complexity of flooding in synchronous systems: \(rad(G, s)\)

\[\frac{\text{diam}(G)}{2} \leq \text{rad}(G) \leq \text{rad}(G, s) \leq \text{diam}(G)\]

Asynchronous complexity

  1. determine running time of execution
  2. asynch. time complexity = max. running time of any exec

Running time of execution: normalize message delays such that maximum delay is \(1\) time unit

Def. Asynchronous Time Complexity: Total time of a worst-case execution in which local computations take time \(0\) and all message delays are at most \(1\) time unit.

Flooding Spanning Tree

Flooding algorithm can be used to compute a spanning tree of the network. Idea: source \(s\) is the root; every other node's parent is the first sender of \(M\).


Convergecast

Opposite of broadcast: given a rooted spanning tree, communicate from all nodes to the root.

Flooding/Echo algorithm

  1. use flooding to construct a tree
  2. use convergecast to report back to the root node when done

Distributed Dijkstra

Unweighted graph, grow tree level by level. Algorithm:

  1. Root node broadcasts “start phase \(r + 1\)” in the current tree
  2. Leaf nodes (level \(r\) nodes) send “join \(r + 1\)” to their neighbors
  3. Node \(v\) receiving “join \(r + 1\)” from neighbor \(u\):
    1. first such message: \(u\) becomes parent of \(v\), \(v\) sends ACK to \(u\)
    2. otherwise, \(v\) sends NACK to \(u\)
  4. After receiving ACK or NACK from all neighbors, level \(r\) nodes report back to root by starting convergecast
  5. When that terminates, the root can start next phase

Distributed Bellman-Ford

Basic idea: Each node \(u\) stores an int \(d_u\) with current guess for distance to root node \(s\). When a node \(u\) can improve \(d_u\), inform neighbors.

  1. init step
    • \(d_s := 0\)
    • \(\forall v \neq s: d_v := \infty\) and \(\text{parent}_v := \bot\)
  2. root \(s\) sends \(1\) to all neighbors
  3. For all other nodes \(u\): when receiving a message \(x\) with \(x < d_u\) from neighbor \(v\):
    • set \(d_u := x\)
    • set \(\text{parent}_u := v\)
    • send \(x + 1\) to all neighbors except \(v\)
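The steps above can be sketched as a simulation under one fixed asynchronous schedule (FIFO message processing; an illustrative sketch, not lecture code):

```python
import math
from collections import deque

def distributed_bellman_ford(graph, s):
    """Simulate distributed Bellman-Ford on an unweighted graph.
    Returns the distance estimates d and the parent pointers."""
    d = {v: math.inf for v in graph}     # d_v := infinity for all v != s
    parent = {v: None for v in graph}
    d[s] = 0
    # root s sends 1 to all neighbors; messages are (dest, value, sender)
    queue = deque((v, 1, s) for v in graph[s])
    while queue:
        u, x, frm = queue.popleft()
        if x < d[u]:                     # improvement: adopt and inform others
            d[u] = x
            parent[u] = frm
            for w in graph[u]:
                if w != frm:
                    queue.append((w, x + 1, u))
    return d, parent
```

At quiescence every \(d_u\) equals the BFS distance from \(s\), and the parent pointers form a BFS tree.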

Distributed BFS Tree Construction


⇒ Optimal!


|                          | Time                                 | Messages                          |
| ------------------------ | ------------------------------------ | --------------------------------- |
| Distributed Dijkstra     | \(O(\text{diam}(G)^2)\)              | \(O(E + V \cdot \text{diam}(G))\) |
| Distributed Bellman-Ford | \(O(\text{diam}(G))\)                | \(O(E \cdot V)\)                  |
| Best known trade-off     | \(O(\text{diam}(G) \cdot \log^3 V)\) | \(O(E + V \cdot \log^3 V)\)       |

Best known trade-off based on synchronizers


Synchronous algorithms are simple, but real systems are asynchronous. Idea: run a synchronous algorithm in an asynchronous system. The synchronizer simulates rounds at all nodes. Important: the local schedules are the same as in a synchronous execution.

Simple local synchronizers

To detect whether all messages of the current round have been received: always send a message (a dummy message if there is nothing to send).

Synchronizer \(S\)

For simple synchronizer: \(T(S) = 1\), \(M(S) = 2|E|\). Other trade-offs possible!

BFS Tree with synchronizer

Slightly better than distributed Bellman-Ford. The best BFS algorithm is based on the best known synchronizer.

Leader Election

How to pick a root node in a distributed system? Example: pick the node with the smallest ID.

For node \(u\) store smallest known ID in variable \(x_u\).

  1. Initial step: \(u\) sets \(x_u := \text{ID}_u\) and sends \(x_u\) to all neighbors
  2. When receiving \(x_v < x_u\) from neighbor \(v\):
    • \(x_u := x_v\)
    • send \(x_u\) to all neighbors except \(v\)
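The flooding-based election above can be simulated under one FIFO message schedule (illustrative sketch; node names double as IDs here, which is my own simplification):

```python
from collections import deque

def elect_leader(graph):
    """Every node floods the smallest ID it has seen; at quiescence
    all x_u equal the globally smallest ID."""
    x = {u: u for u in graph}                    # x_u := ID_u
    # initial broadcast: every node sends its ID to all neighbors
    queue = deque((v, u, x[u]) for u in graph for v in graph[u])
    while queue:
        u, frm, val = queue.popleft()
        if val < x[u]:                           # smaller ID seen: adopt it
            x[u] = val
            for w in graph[u]:                   # forward to all but sender
                if w != frm:
                    queue.append((w, u, val))
    return x
```

On a ring of four nodes, all nodes converge to the minimum ID.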

Problems: the message complexity is high, and when/how to terminate? Both can be solved at some cost.

Chapter 4: Causality, Logical Time and Global States

Logical Clocks

Assign a timestamp to all events in an asynchronous message-passing system

Causal Shuffles

A schedule \(S'\) is a causal shuffle of schedule \(S\) iff \[\forall v \in V: S|v = S'|v\]

Observation: If \(S'\) is a causal shuffle of \(S\), no node can distinguish between \(S\) and \(S'\)

Causal Order

Logical clocks are based on the causal order of events: event \(e\) should occur before \(e'\) if it provably occurs before \(e'\). Then the clock values satisfy \(t_e < t_{e'}\)!

*Event \(e\) provably occurs before \(e'\) iff \(e\) appears before \(e'\) in all causal shuffles of \(S\).*

Lamport’s Happens-Before-Relation

In a message-passing system (MPS): only send and receive events.

Two events \(e\) and \(e'\) occurring at \(u\) and \(u'\). \(t\) and \(t'\) are times when \(e\) and \(e'\) occur.

\(e\) provably occurs before \(e'\) if:

  1. The events occur at the same node and \(e\) occurs before \(e'\)
  2. Event \(e\) is a send event and \(e'\) the recv. event of the same message
  3. There is an event \(e''\) for which we know provably that \(e\) occurs before \(e''\) and \(e''\) occurs before \(e'\)

Def: happens-before-relation: \(\Rightarrow_s\) is pairwise relation on send/receive events of S. Contains:

  1. All pairs \((e, e')\) where \(e\) precedes \(e'\) in \(S\) and both events occur at the same node
  2. All pairs \((e, e')\) where \(e\) is send event and \(e'\) receive event for the same message
  3. All pairs \((e, e')\) where there is a third event \(e''\) such that \[e \Rightarrow_s e'' \wedge e'' \Rightarrow_s e'\]

Happens-Before and Causal Shuffles

Theorem: Given a schedule \(S\) and two events \(e\) and \(e'\), the following statements are equivalent:

Lamport Clocks

  1. Each event \(e\) gets a clock value \(\tau(e) \in \mathbb{N}\)
  2. If \(e\) and \(e'\) are events at the same node and \(e\) precedes \(e'\) then \[\tau(e) < \tau(e')\]
  3. If \(s_M\) and \(r_M\) are send and receive events of some msg. \(M\), then \[\tau(s_M) < \tau(r_M)\]


  1. initial counter at node \(u\) is \(c_u = 0\)
  2. For any non-receive event \(e\) at node \(u\): \(c_u := c_u + 1\); \(\tau(e) := c_u\)
  3. For any send event \(s\) at \(u\), attach value of \(\tau(s)\) to message
  4. For any receive event \(r\) at \(u\) (corresp. send event \(s\)) compute: \[c_u := max\{c_u, \tau(s)\} + 1; \tau(r) := c_u\]
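The four rules above can be packaged into a small helper class (an illustrative sketch, not lecture code):

```python
class LamportClock:
    """Lamport clock for one node, following the rules above."""
    def __init__(self):
        self.c = 0                       # counter c_u, initially 0

    def local_event(self):               # any non-receive event e
        self.c += 1
        return self.c                    # tau(e) := c_u

    def send(self):                      # a send is a non-receive event;
        return self.local_event()        # attach the returned tau(s) to the msg

    def receive(self, tau_s):            # c_u := max(c_u, tau(s)) + 1
        self.c = max(self.c, tau_s) + 1
        return self.c                    # tau(r) := c_u
```

A message's receive timestamp is always larger than its send timestamp, as required by property 3.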


Neiger-Toueg-Welch Clocks


Fidge-Mattern Vector Clocks


  1. All nodes \(u\) keep a vector \(VC(u)\) with an entry for all nodes in \(V\), initialized to \(0\).
  2. For any non-receive event \(e\) at \(u\): \[VC_u(u) := VC_u(u) + 1; VC(e) := VC(u)\]
  3. For any send event \(s\) at node \(u\), attach \(VC(s)\) to the message
  4. For any receive event \(r\) at \(u\) (with corresponding send event \(s\)) compute: \[\forall v \neq u: VC_v(u) := \max \{VC_v(s), VC_v(u)\}; VC_u(u) := VC_u(u) + 1; VC(r) := VC(u)\]
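The vector-clock rules above, plus the happens-before test, as a sketch (illustrative, not lecture code):

```python
class VectorClock:
    """Fidge-Mattern vector clock for one node."""
    def __init__(self, me, nodes):
        self.me = me
        self.vc = {v: 0 for v in nodes}      # one entry per node, init 0

    def local_event(self):                   # any non-receive event
        self.vc[self.me] += 1
        return dict(self.vc)                 # VC(e) := VC(u)

    def send(self):                          # send events attach VC(s)
        return self.local_event()

    def receive(self, vc_s):                 # merge with the sender's vector
        for v in self.vc:
            if v != self.me:
                self.vc[v] = max(self.vc[v], vc_s[v])
        self.vc[self.me] += 1
        return dict(self.vc)

def happened_before(vc_a, vc_b):             # VC(e) < VC(e')  iff  e =>_S e'
    return all(vc_a[v] <= vc_b[v] for v in vc_a) and vc_a != vc_b
```

Unlike Lamport clocks, incomparable vectors identify concurrent events.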

Vector Clocks and Happens-Before

\[VC(e) < VC(e') \leftrightarrow e \Rightarrow_s e'\]

Local Clock vs. Synchronizers

Clock pulses generated by synchronizer can also be seen as logical clocks.


Consistent cuts (Global State)

It is not possible to record the global state at a given time of the execution. Therefore, the best option is to find a global state that could have been true at some time. ⇒ Global Snapshot

Cut: Given a schedule \(S\), a cut is a subset \(C\) of the events of \(S\) such that for all nodes \(v \in V\) the events in \(C\) happening at \(v\) form a prefix of the sequence of events in \(S|v\).

Consistent Cut: Given a schedule \(S\), a consistent cut \(C\) is a cut such that for all events \(e \in C\) and for all events \(f\) in \(S\) it holds that \[f \Rightarrow_s e \rightarrow f \in C\]

Given a schedule \(S\), a cut \(C\) is consistent cut iff for each message \(M\) with send event \(s_M\) and receive event \(r_M\), if \(r_M \in C\) then it also holds that \(s_M \in C\).

Consistent Snapshot

A consistent snapshot is a global system state which is consistent with all local views.

Computing a Consistent Snapshot (with Logical Clocks)

Claim: \(\forall \tau_0 \geq 0 : C(\tau_0)\) is consistent cut.


Goal: capture a consistent snapshot in a running system.

  1. Node \(s\) records its state
  2. When node \(u\) receives a marker message from node \(v\):
    • if \(u\) has not recorded its state then
      • \(u\) records its state
      • \(u\) starts recording messages on all incoming channels
    • else
      • \(u\) stops recording messages on incoming channel from \(v\) to \(u\)
      • set of msg. in transit from \(v\) to \(u\) is set of recorded messages
  3. after node \(u\) records its state:
    • node \(u\) sends marker message on all outgoing channels (before any other message)
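The marker rules above (this is the Chandy-Lamport snapshot algorithm) can be sketched as a simulation; the single round-robin FIFO delivery schedule and the integer node states are my own modeling assumptions:

```python
from collections import deque

MARKER = "MARKER"

def snapshot(graph, in_transit, initiator, states):
    """Sketch of the snapshot algorithm. `states[u]` is u's local state,
    `in_transit` lists (src, dst, payload) messages already under way.
    Channels are FIFO; messages are delivered round-robin."""
    channels = {(u, v): deque() for u in graph for v in graph[u]}
    for src, dst, payload in in_transit:
        channels[(src, dst)].append(payload)
    recorded = {}        # node -> recorded local state
    chan_rec = {}        # (src, dst) -> messages recorded as in transit
    recording = set()    # channels currently being recorded

    def record(u):
        recorded[u] = states[u]
        for v in graph[u]:
            recording.add((v, u))            # record all incoming channels
        for v in graph[u]:
            channels[(u, v)].append(MARKER)  # marker before any later message

    record(initiator)
    progress = True
    while progress:
        progress = False
        for (src, dst), q in channels.items():
            if not q:
                continue
            msg = q.popleft()
            progress = True
            if msg == MARKER:
                if dst not in recorded:
                    record(dst)
                recording.discard((src, dst))    # stop recording this channel
                chan_rec.setdefault((src, dst), [])
            elif (src, dst) in recording:
                chan_rec[(src, dst)] = chan_rec.get((src, dst), []) + [msg]
    return recorded, chan_rec
```

For two nodes with one message in transit, the snapshot records both local states and exactly that message on its channel.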

Chapter 5: Clock Synchronization

Lead question: how to get synchronized clocks?

logical time (“happens-before”) vs. physical time

Properties of clock sync. algorithms


Clock drift: deviation from the nominal rate, dependent on power supply, temperature, etc. TinyNodes have a max. drift of 30-50 ppm (up to 50 µs per second)


Clock sync via UDP. The packet delay is estimated to reduce clock skew.

Propagation delay estimation

Measure the round-trip time (RTT): \(t_1\) is the request send time at A, \(t_2\) the request receive time at B, \(t_3\) the answer send time at B, and \(t_4\) the answer receive time at A.

Delay: \[\delta = \frac{(t_4 - t_1) - (t_3 - t_2)}{2}\] Clock skew: \[\Omega = \frac{(t_2 - (t_1 + \delta)) - (t_4 - (t_3 + \delta))}{2} = \frac{(t_2 - t_1) + (t_3 - t_4)}{2}\]
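The two formulas can be checked numerically (a minimal sketch; the sample timestamps assume B's clock is 10 units ahead and a symmetric one-way delay of 2):

```python
def delay_and_skew(t1, t2, t3, t4):
    """Estimate one-way delay and clock skew from the four RTT timestamps."""
    delta = ((t4 - t1) - (t3 - t2)) / 2          # delay
    omega = ((t2 - t1) + (t3 - t4)) / 2          # clock skew
    return delta, omega
```

With \(t_1 = 0\), \(t_2 = 12\), \(t_3 = 13\), \(t_4 = 5\) the estimator recovers delay \(2\) and skew \(10\).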

Message jitter

Various sources of errors (deterministic and non-deterministic). Fix by timestamping packets at MAC layer.


Precision time protocol (PTP), similar to NTP.

Hardware clock distribution

Synchronous digital circuits require all components to act in sync. The bigger the clock skew, the longer the clock period. Optimize routing, insert buffers.

Clock Sync. tricks in wireless networks

Best tree for tree-based clock sync?

Finding a good tree for clock synchronization is a tough problem: find a spanning tree with small (maximum or average) stretch. This is a hard problem, with approximation algorithms known.

Clock sync tricks (GTSP)

Gradient time sync protocol.

Results in all nodes consistently averaging errors with all neighbors. In tree-like algorithms (e.g. FTSP) we can get a bad local skew.

Theory of clock sync

Formal model

\(A^{\text{max}}\) algorithm

  1. set local clock to the maximum clock value received from any neighbor
  2. if the received value is larger than the previously forwarded value, forward it immediately
  3. forward the local logical clock value at least every \(T\) time steps


Can we do better?

Skew lower bounds

Chapter 6: Consensus




The outputs of all non-failing nodes are equal


If all inputs are equal to \(x\), all outputs are equal to \(x\)

Depends on failure model


All non-failing processes terminate after a finite number of steps


Shared Memory




Def: Usually means Turing-computable ⇒ strong mathematical model

Shared-memory computability: a model of asynchronous concurrent computation. What is wait-free computable on a multiprocessor? Wait-free?

Wait-free Shared Model

*Wait-free* : Every process completes in a finite number of steps, no locks; we assume that we have wait-free atomic registers.

Every processor does:

r = read(c);
if (r == "?") {
    write(c, x_i);
    decide(x_i);
} else {
    decide(r);
}

Read-Modify-Write Memory


Cell \(c\) and function \(f\). Method call: replace the value \(x\) of cell \(c\) with \(f(x)\) and return the old value \(x\) of cell \(c\).

Consensus Number

An object has consensus number \(n\) if it can be used to implement \(n\)-process consensus, but not (\(n + 1\))-process consensus

Example: Atomic read/write registers have consensus number \(1\). Works with \(1\) process; impossibility proof for \(2\).


If you can implement \(X\) from \(Y\) and \(X\) has consensus number \(c\), then \(Y\) has consensus number at least \(c\). ⇒ Useful way of measuring synchronization power


An RMW object is non-trivial if there exists a value \(v\) such that \(v \neq f(v)\):

Any non-trivial RMW object has consensus number at least \(2\). This implies that there is no wait-free implementation of RMW registers from read/write registers.

Interfering RMW

Let \(F\) be a set of functions such that for all \(f_i\) and \(f_j\), either they commute or they overwrite. Any such set of RMW objects has consensus number exactly \(2\).

Synchronous System

One can sometimes tell whether a processor has crashed (timeouts, broken connections)

Simple consensus algorithm

Each process:

  1. broadcast own value
  2. decide on the minimum of all received values

Problem: no consensus if a processor fails

\(f\)-Resilient consensus algorithm

If an algorithm solves consensus for \(f\) failed processes, we say it is an \(f\)-resilient consensus algorithm.

Each process:

  1. broadcast own value
  2. round 2 to round \(f + 1\): broadcast the minimum of the received values unless it has been sent before.
  3. end of round \(f + 1\): decide on the minimum value received

Example: for \(f = 2\) failures, \(f + 1 = 3\) rounds are needed.
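The algorithm can be sketched as a toy round-based simulation; the crash model (a process crashes mid-broadcast, reaching only some receivers) is my own modeling assumption:

```python
def f_resilient_consensus(inputs, f, crashes):
    """Simulate the f-resilient min-consensus algorithm under crash failures.
    inputs: dict process -> input value.
    crashes: dict process -> (round, receivers); the process crashes in that
    round after its broadcast reached only `receivers`."""
    procs = list(inputs)
    known = {p: {inputs[p]} for p in procs}      # values seen so far
    sent = {p: set() for p in procs}             # values already broadcast
    alive = set(procs)
    for r in range(1, f + 2):                    # rounds 1 .. f+1
        msgs = []
        for p in sorted(alive):
            m = min(known[p])
            if m in sent[p]:                     # broadcast min unless sent before
                continue
            sent[p].add(m)
            if p in crashes and crashes[p][0] == r:
                targets = crashes[p][1]          # partial broadcast, then crash
                alive.discard(p)
            else:
                targets = [q for q in procs if q != p]
            for q in targets:
                msgs.append((q, m))
        for q, m in msgs:                        # deliver end-of-round
            known[q].add(m)
    return {p: min(known[p]) for p in alive}     # decisions of survivors
```

Even if the process holding the minimum crashes after reaching a single node, the extra round relays the value and all survivors agree.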

Theorem: If at most \(f \leq n - 2\) of \(n\) nodes of a synchronous message passing system can crash, at least \(f + 1\) rounds are needed to solve consensus.

Byzantine Failures

A node cannot just crash and stop forever; it may also recover, or it may be damaged and behave arbitrarily (e.g. different processes receive different values)

Theorem: There is no \(f\)-resilient Byzantine consensus algorithm for \(n\) nodes for \(f \geq \frac{n}{3}\).

Simple byzantine agreement algorithm

Works for any \(f\) with \(n > 3f\), which is optimal. Only takes \(f + 1\) rounds, which is optimal even for crash failures. Works for any input, not just binary inputs.

Problem: not easy to formalize, and the size of the messages increases exponentially.

Queen Algorithm




King Algorithm




B.A. using authentication


In each round \(r \in \{1, ..., f + 1\}\):

After \(f + 1\) rounds: decide value



Randomized algorithm


In each round \(r\)

  1. Wait for \(n - f\) proposals from round \(r - 1\)
  2. If at least \(n - 2f\) proposals have some value \(y\): set \(x := y\) and decide on \(y\). Else, if at least \(n - 4f\) proposals have some value \(y\): set \(x := y\). Otherwise, choose \(x\) randomly with \(Pr[x = 0] = Pr[x = 1] = \frac{1}{2}\)
  3. Broadcast proposal(\(x\), \(r\))
  4. If decided on a value → stop


The algorithm satisfies the validity and agreement conditions, but not the termination condition. It is also very slow.

Shared coin

A shared coin is a random binary variable that is \(0\) with constant probability and \(1\) with constant probability. Used in randomized algorithm to speed it up.

Assume only crash failures, no byzantine failures!


The subroutine terminates, and at least \(\frac{1}{3}\) of all coins are seen by everybody: there are at least \(f + 1\) coins in at least \(f + 1\) coin sets.

All processes decide \(0\) with constant probability and \(1\) with constant probability.


Note: there is a constant expected time algorithm that tolerates \(f < \frac{n}{2}\) crash failures

Byzantine and asynchronous

Can be solved, but message size remains polynomial and message complexity \(O(n^3)\). Does not use global coin, but tries to detect biased local coinflips from Byzantine processes.

Chapter 8: Distributed System Architectures


One tier

Two tier


Three tier

Multiprocessor Architectures

Shared memory

Shared Disk


Communication Paradigms

Chapter 9: Reliability

Availability: the probability for a server to be ready to serve: \[\frac{\text{MTBF}}{\text{MTBF} + \text{MTTR}}\] MTBF: mean time between failures; MTTR: mean time to repair

⇒ fast recovery is key to high availability
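The availability formula and the recovery remark can be made concrete with a tiny calculation (an illustrative sketch; the sample MTBF/MTTR figures are made up):

```python
def availability(mtbf, mttr):
    """Availability = MTBF / (MTBF + MTTR); any time unit, as long as
    both arguments use the same one."""
    return mtbf / (mtbf + mttr)

def annual_downtime_hours(mtbf, mttr):
    """Expected downtime per year implied by the availability (inputs in hours)."""
    return (1 - availability(mtbf, mttr)) * 365 * 24
```

Halving the repair time (MTTR) improves availability just as effectively as doubling the time between failures, which is why fast recovery is key.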


Three types of failures:

Types of Storage


*Steal+No-Force* : Improves throughput and latency, but makes recovery more complicated.

ARIES Algorithm

*A*lgorithm for *R*ecovery and *I*solation *E*xploiting *S*emantics

Principles of ARIES

  1. Write-Ahead Logging: Record database changes before actual change
  2. Repeating History During Redo: After crash, bring system back to the exact state at crash time, undo active transactions
  3. Logging Changes During Undo: Log changes during transaction undo so that they are not repeated in case of repeated failures


Any change to a database object is first recorded in the log, which must be written to stable storage before the change itself is written to disk.

Log Information

Redo Information: ARIES does page-oriented redo, stores byte images of pages before and after modification.

Undo Information: ARIES assumes logical undo, record the actual tuple changes

Writing Log Records

During normal transaction processing keep in each transaction control block:

ARIES Transaction Rollback

Undo operations must also be recorded in the WAL. Use compensation log records for this purpose. Never undo an undo action, but an undo action might need to be redone.

ARIES Crash Recovery

  1. Analysis Phase
    • read log in forward direction
    • find active transactions where failure happened (“losers”)
  2. Redo Phase
    • Replay the log to bring the system into state as of the time of system failure
    • restore the losers
  3. Undo Phase
    • Roll back all loser transactions

Commit coordination and consensus

Set of independent servers with transactions spanning several servers, i.e. subtransactions at some sites. For a successful commit, all subtransactions have to be committed.

Approach: Commit protocols. Fundamental approach is to rely on leader/coordinator. Leader proposes value or client talks to leader, participants decide and inform coordinator.


2-Phase-Commit (2PC)



Decentralized variant

3-Phase-Commit (3PC)

Unblock 2PC by spreading decision knowledge

Without network partitions all would be good, but with network partitions two new coordinators could arise that arrive at different values.


Safe, nonblocking, making progress if possible, resilient to failures, delays and network partitions



Basic Idea

Dealing with multiple proposals


Leader election concerns


Optimize Paxos regarding message complexity.

Paxos Commit (Idea)

Chapter 10: Distributed Concurrency Control

Storing copies of data on different nodes enables availability, performance and reliability. Data needs to be consistent.

Page model

All data is mapped to read and write operations on pages. Inspect interleavings of resulting page operations to study concurrent execution.

Set of transactions: \(T = \{T_1, ..., T_n\}\) Sequence of \(R\) and \(W\) actions over objects \(A\), \(B\), …

History and Schedule

A transaction is ordered and all write operations can potentially depend on previously read inputs.

A history is a set of complete transactions (begin, commit or abort). Schedule of a history is a prefix of a history. Schedule is serial if it is not interleaved.


A schedule is called (conflict-)serializable if there exists a (conflict-)equivalent serial schedule over the same set of transactions. Create a conflict graph (an edge for each WR, RW, WW conflict); if the graph is acyclic, the schedule is serializable.
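The conflict-graph test can be sketched in a few lines (illustrative; the `(txn, op, obj)` encoding of a schedule is my own):

```python
def is_conflict_serializable(schedule):
    """schedule: list of (txn, op, obj) with op in {'R', 'W'}.
    Two operations conflict if they belong to different transactions,
    access the same object and at least one is a write (WR, RW, WW).
    The schedule is conflict-serializable iff the conflict graph is acyclic."""
    edges = {}
    for i, (ti, oi, x) in enumerate(schedule):
        for tj, oj, y in schedule[i + 1:]:
            if ti != tj and x == y and 'W' in (oi, oj):
                edges.setdefault(ti, set()).add(tj)   # edge ti -> tj
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {t: WHITE for t, _, _ in schedule}
    def has_cycle(u):                      # three-color DFS cycle detection
        color[u] = GRAY
        for v in edges.get(u, ()):
            if color[v] == GRAY or (color[v] == WHITE and has_cycle(v)):
                return True
        color[u] = BLACK
        return False
    return not any(has_cycle(t) for t in list(color) if color[t] == WHITE)
```

The classic lost-update interleaving (each transaction writes an object the other read or wrote earlier) produces a two-cycle and is rejected.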

A transaction manager enforces certain transaction behaviour to exclude non-serializable schedules.

Attempt: 2PL. Every schedule produced according to 2PL is serializable; however, not every serializable schedule can be produced by 2PL, and deadlocks may occur.

Local and global schedules

Partial order on OP with following properties:

Problem: transactions may seem serializable at the local sites, but not globally. How to decide if locally observable sequences imply a globally serializable schedule?

As we do not have replication of data items, whenever there is a conflict in a global schedule, the same conflict must be part of exactly one local schedule. The conflict graph of the global schedule is given by the union of the conflict graphs of the respective local schedules. Either all or no corresponding global schedules are serializable.

Types of federation

Interface to recovery: every global transaction runs the 2-phase-commit protocol. By that protocol the subtransactions of a global transaction synchronize such that either all subtransactions commit, or none of them does.

Homogeneous Concurrency Control

Primary Site 2PL:

Distributed 2PL

Distributed Strong 2PL

Deadlock detection strategies

Serializability by assigning timestamps to transactions

Time stamp protocol TS

Can be combined with Lamport clocks.

Lock-based vs timestamp-based approaches

Heterogeneous Concurrency Control

Rigorous schedules

A local schedule \(S\) of a set of complete transactions is rigorous if for all involved transactions \(T_i\), \(T_j\) the following holds: let \(p_j \in \text{OP}_j\), \(q_i \in \text{OP}_i\), \(i \neq j\), such that \((p_j, q_i) \in \text{conf}(S)\). Then either \(a_j <_S q_i\) or \(c_j <_S q_i\)

Commit-deferred transaction

A global transaction \(T\) is commit-deferred if its commit action is sent by the global transaction manager to the local sites of \(T\) only after the local executions of all subtransactions of \(T\) at those sites have been acknowledged.

A schedule is serializable whenever it is rigorous

Proof by contradiction (assume the schedule is not serializable)

Ticket-based concurrency control

Chapter 11: Replication and (weaker) consistency

Two replication dimensions:


Method specific:

Synchronous replication protocols


Primary Copy

Quorum-Based Protocols

Can be generalized to arbitrary read quorums \(N_R\) and write quorums \(N_W\) such that \(N_R + N_W > N\) (every read quorum intersects every write quorum) and \(2 N_W > N\) (any two write quorums intersect):

⇒ quorum consensus method

CAP Theorem

Of atomic data consistency, system availability, and tolerance to network partitions, only two can be achieved at the same time.

We cannot avoid network partitions, so consistency and availability cannot both be achieved at the same time.

Eventual consistency

Database Consistency: Anomalies

Database consistency classes

DS Consistency classes

Session guarantees

Sticky session guarantees

Causes for unavailability

Consistency Trade-offs

| Consistency level    | Guarantee                          | Consistency | Performance | Availability |
| -------------------- | ---------------------------------- | ----------- | ----------- | ------------ |
| Strong consistency   | See all previous writes            | A           | D           | F            |
| Eventual consistency | See subset of previous writes      | D           | A           | A            |
| Consistent prefix    | See initial sequence of writes     | C           | B           | A            |
| Bounded staleness    | See all “old” writes               | B           | C           | C            |
| Monotonic reads      | See increasing subset of writes    | C           | B           | B            |
| Read my writes       | See all writes performed by reader | C           | C           | C            |