This document is derived from the lecture by Prof. Dr. P. Fischer & Dr. S. Köhler at the University of Freiburg in semester SS16.

## Chapter 1: Basics & System Models

Def.: […] Collection of individual computing devices that can communicate with each other. […] Each processor in a distributed system has its semi-independent agenda, but for various reasons, including sharing of resources, availability, and fault tolerance, processors need to coordinate their actions. – Attiya, Welch 2004

### Challenges

• How to organize a distributed system. How to share computation / data, communication infra, …
• Coordination of multiple (heterogeneous) nodes
• Each node has local view only (→ no global time!)
• Agreement on steps to perform
• All of this in presence of asynchrony, message losses and faulty, lazy, malicious or selfish nodes

Lots of nondeterminism! Unpredictable delays, failures, actions, concurrency and uncertainty. This is much harder to get right, so it is important to have theoretical tools to argue about correctness. It is easier to go from theory to practice than vice versa.

### Two models

• Message Passing: Nodes/processes interact by exchanging messages, fully connected topology or arbitrary topology
• Shared Memory: Processes interact by reading/writing from/to common global memory
• Generally, the two models can simulate each other
• Most things discussed hold for both models

### Message Passing

• Used to model large (decentralized) systems and networks
• Except for small-scale systems, real systems are implemented based on exchanging messages
• Certainly the right model for large systems with large number of machines, but also for many other practical systems

#### Formally:

1. System consists of $$n$$ deterministic nodes $$v_1, ..., v_n$$ and pairwise communication channels
2. At each time, each node $$v_i$$ has some internal state $$Q_i$$
3. System is event-based: states change based on discrete events
• Internal State: inputs, local variables, some local clocks, history of whole sequence of observed events
• Types of events
• Send event $$s_{ij}$$: Some node $$v_i$$ puts a message on the communication channel to node $$v_j$$
• Receive Event $$r_{ij}$$: Node $$v_i$$ received message from $$v_j$$
• Timing Event: Event triggered at node by local clock

#### Schedules and Executions

• Configuration $$C$$: Vector of all $$n$$ node states (at given time)
• Execution Fragment: Sequence of alternating configurations and events
• Execution: execution fragment that starts with initial config $$C_0$$
• Schedule: execution without the configurations, but including inputs

#### Remarks

• Local state: The local state of $$v_i$$ does not include the states of messages sent by $$v_i$$
• Adversary: within the timing guarantees of the model (synchrony assumptions), execution/schedule is determined in a worst-case way
• Deterministic nodes: in the basic model, all nodes are deterministic. This will be relaxed. Model details / adversary more tricky

### Shared Memory

• Classic model to study many standard coordination problems
• multi-core processors and multi-threaded programs
• convenient abstraction from programming

### Synchrony

Modeling assumptions:

• Bounded message delays / process speeds: Nodes can measure time differences and there is a (known) upper bound on message delays / time to perform 1 step. Model is equivalent to the synchronous model. $$1$$ round = $$T$$ time units.
• Partial synchrony: There is an upper bound on message delays / process speeds: Either not known to nodes, or only starts to hold at some unknown time

#### Synchronous

• System: System runs in synchronous time steps (rounds). Discrete time. Round $$r$$ takes place between time $$r - 1$$ and time $$r$$
• Message passing: Round $$r$$: at time $$r - 1$$ each process sends out messages, delivered and processed at time $$r$$
• Shared memory: in each round every process can access one memory cell

#### Asynchronous

• System: process speeds and message delay are finite but unpredictable. Assumption: process speeds / message delays are determined in a worst-case way by an adversarial scheduler
• Message passing: Messages are always delivered but can have arbitrary delays
• Shared Memory: All processes eventually do their next steps, process speeds are arbitrary

### Failures

• Crash failure: node stops working at some point in execution, can be in the middle of a round
• Byzantine failure: node starts behaving in a completely arbitrary way, different Byzantine nodes might collude
• Omission failure: node or communication link stops working temporarily
• Resilience: number of failing nodes tolerated

### Correctness of Distributed Systems

Three kinds of correctness properties:

• Safety: Nothing bad ever happens. No bad reachable states in the system, proved using invariants, every possible transition keeps a safe system state
• Liveness: Something good eventually happens. Not a property of a system state but of system executions. Property must start holding at some finite time. Proofs usually depend on other more basic liveness properties, eg: all messages in the system are eventually delivered
• Fairness: Something good eventually happens to everyone. Strong kind of liveness property that avoids starvation (a node cannot make progress).

### Local Schedules

A node $$v$$’s state is determined by $$v$$’s inputs and observable events.

• Schedule Restriction: Given a schedule $$S$$ we define the restriction $$S|i$$ as the subsequence of $$S$$ consisting of $$v_i$$’s inputs and of all events happening at node $$v_i$$.

#### Indistinguishability

If, for two schedules $$S$$ and $$S'$$ and a node $$v_i$$ with the same inputs in $$S$$ and $$S'$$, we have $$S|i = S'|i$$, then the next action performed by $$v_i$$ is the same in both schedules $$S$$ and $$S'$$.

### Asynchronous executions

• Only minimal restrictions on when messages are delivered and when local computations are done
• A schedule is called admissible if
1. there are infinitely many computation steps for each node
2. every message is eventually delivered
• 1 and 2 are fairness conditions
• 1 assumes that nodes do not explicitly terminate
• Alternative condition:
1. every node has either infinitely many computation steps or it reaches an explicit halting state

## Chapter 2: The Two Generals Problem

Non-trivial distributed services must coordinate their actions. Basic coordination: agreeing on some action / fact.

The problem: two red armies, one blue army. How to coordinate attacks by red armies only using pigeons that might not make it.

### Two generals problem formally

• Model: two deterministic nodes, synchronous communication, unreliable messages
• Input: node starts with one of two possible inputs: $$0$$ or $$1$$.
• Output: Each node needs to decide either $$0$$ or $$1$$.
• Agreement: Both nodes must make the same decision
• Validity: If both nodes have same input $$x$$ and no messages are lost, both nodes output $$x$$
• Termination: Bounded number of rounds until termination

### Two generals: proved to be unsolvable

Using the indistinguishability proof:

• Execution $$E$$ is indistinguishable from execution $$E'$$ for some node $$v$$ if $$v$$ sees the same things in both executions.
• If $$E$$ is indistinguishable from $$E'$$ for $$v$$ ($$E|v = E'|v$$), then $$v$$ does the same thing in both executions.

Similarity:

• $$E_i$$ and $$E_j$$ are similar if $$E_i|v = E_j|v$$ for some node $$v$$: $E_i \sim_v E_j \Leftrightarrow E_i|v = E_j|v$

Chain of executions $$E_0, ..., E_k$$ such that $$\forall i \in \{1, ..., k\} : E_{i-1} \sim_v E_i$$

• If $$E_{i-1} \sim_v E_i$$, then $$v$$ outputs same decision in both executions.
• Within one execution both nodes output the same decision (agreement); therefore all nodes output the same value in all executions of the chain

• $$T$$-round protocol, sequence of executions $$E_0, ..., E_k$$ such that: $$\forall i \in \{1, ..., k\} : E_{i-1} \sim_v E_i$$ for some node $$v \in \{v_1, v_2\}$$
• $$E_0$$: both inputs are $$0$$, output needs to be $$0$$, no message loss
• $$E_k$$: both inputs are $$1$$, output needs to be $$1$$, no message loss

• Validity: Both nodes output $$1$$ in $$E_k$$.
• Similarities: Both nodes output $$0$$ in $$E_k$$.

### Solving two generals: randomized algorithm

• Problem can be solved if one of the two generals flips coins
• Agreement is achieved with probability $$1 - \epsilon$$, small $$\epsilon$$

#### Level Algorithm

• Both nodes compute a level
• at end, levels differ by at most one
• levels essentially measure number of successful back and forth transmissions

Steps:

1. init levels to $$0$$
2. in each round: both nodes send current level to each other
3. new level at $$u$$: $$l_u := \text{max}\{l_u, l_v + 1\}$$

Properties:

• at all times, levels differ by at most one
• if all messages are delivered, the two levels are equal to the number of rounds
• level $$l_u$$ of node $$u$$ is $$0$$ if all messages to $$u$$ have been dropped
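
The level rule can be checked with a small simulation. This is a minimal sketch, assuming message loss is encoded as two boolean lists (`to_u[i]` says whether the round-`i` message to `u` arrives); the name `run_levels` and this encoding are illustrative and not part of the lecture.

```python
def run_levels(rounds, to_u, to_v):
    """Simulate the level algorithm for two nodes u and v.

    to_u[i] / to_v[i] state whether the message sent to u / v in
    round i is delivered (hypothetical encoding of message loss).
    """
    l_u = l_v = 0                            # step 1: init levels to 0
    for i in range(rounds):
        sent_by_u, sent_by_v = l_u, l_v      # step 2: both send current level
        if to_u[i]:
            l_u = max(l_u, sent_by_v + 1)    # step 3 at u
        if to_v[i]:
            l_v = max(l_v, sent_by_u + 1)    # step 3 at v
    return l_u, l_v


print(run_levels(5, [True] * 5, [True] * 5))   # (5, 5): no loss
print(run_levels(5, [False] * 5, [True] * 5))  # (0, 1): u never receives anything
```

The two calls illustrate the second and third property; the first property can be checked by enumerating all loss patterns for a small number of rounds.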

#### Randomized two generals algorithm

Two nodes $$u$$ and $$v$$, $$u$$ is leader node. Possible inputs $$0$$ and $$1$$.

1. Node $$u$$ picks random number $$R \in \{1, ..., r\}$$
2. Nodes run level algorithm for $$r$$ rounds. Also include inputs and node $$u$$ also includes $$R$$ in message.
3. At the end, node decides $$1$$ if:
• both inputs are equal to $$1$$
• node knows $$R$$ and has seen both inputs
• level of the node is $$\geq R$$
4. Otherwise, decides $$0$$
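
The decision rule can be sketched on top of the level algorithm. The code below is an illustration under assumptions: `R` is passed in explicitly instead of being drawn at random, loss is encoded as boolean lists, and a node is taken to know the other input (and, for `v`, the value `R`) as soon as it has received at least one message. All names are hypothetical.

```python
def two_generals(x_u, x_v, r, R, to_u, to_v):
    """Run the protocol for r rounds with a fixed R in {1, ..., r}.

    to_u[i] / to_v[i]: whether the round-i message to u / v arrives.
    Returns (decision of u, decision of v).
    """
    l_u = l_v = 0
    u_saw_v = False    # u has seen v's input
    v_saw_u = False    # v has seen u's input and knows R
    for i in range(r):
        sent_by_u, sent_by_v = l_u, l_v
        if to_u[i]:
            l_u, u_saw_v = max(l_u, sent_by_v + 1), True
        if to_v[i]:
            l_v, v_saw_u = max(l_v, sent_by_u + 1), True
    both_one = x_u == 1 and x_v == 1
    dec_u = int(u_saw_v and both_one and l_u >= R)
    dec_v = int(v_saw_u and both_one and l_v >= R)
    return dec_u, dec_v


r = 4
lossy_to_u = [True, True, False, False]
lossy_to_v = [True, True, True, False]
outcomes = [two_generals(1, 1, r, R, lossy_to_u, lossy_to_v)
            for R in range(1, r + 1)]
print(outcomes)   # [(1, 1), (1, 1), (0, 1), (0, 0)]
```

With this loss pattern the final levels are 2 and 3, so the nodes disagree only for $$R = 3$$. Because the levels differ by at most one, at most one value of $$R$$ can separate them for any fixed loss pattern.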

### Lower Bound on Error Probability

• using similar techniques, lower bound on the error probability can be proven.
• stronger validity condition: if at least one input is $$0$$, both nodes need to output $$0$$.

• Prove lower bound by assuming both inputs are $$1$$:
1. if no messages are lost, outputs must be $$1$$
2. otherwise, the nodes will output same value with probability at least $$1 - \epsilon$$.

For the two generals problem, $$\epsilon$$ is:

• strong version: $\epsilon \geq \frac{1}{r}$
• original version: $\epsilon \geq \frac{1}{2r + 1}$

## Chapter 3: Broadcast, Convergecast and Spanning Trees

Assumptions:

• Network: message passing system with arbitrary topology
• undirected graph $$G = (V, E)$$
• messages delivered in finite time
• message delays unpredictable
• algorithms event based

Source node $$s$$ needs to broadcast message $$M$$ to all nodes. All nodes have a unique ID, and each node only knows the IDs of its neighbors.

#### Flooding

1. source node $$s$$: send $$M$$ to all neighbors
2. all other nodes: when receiving $$M$$ from some neighbor $$v$$ for the first time, send $$M$$ to all neighbors except $$v$$.

After $$r$$ rounds, exactly the nodes at distance $$\leq r$$ from $$s$$ know $$M$$.

• Given a node $$s \in V$$, radius of $$s$$ in $$G$$: $\text{rad}(G, s) := \text{max}_{v \in V} \text{dist}_G(s, v)$
• radius of $$G$$: $\text{rad}(G) := \text{min}_{s \in V}\text{rad}(G, s)$
##### Diameter of $$G$$

$\text{diam}(G) := \text{max}_{u, v \in V} \text{dist}_G(u, v) = \text{max}_{s \in V}\text{rad}(G, s)$

##### Time complexity of flooding in synchronous systems: $$rad(G, s)$$

$\frac{\text{diam}(G)}{2} \leq \text{rad}(G) \leq \text{rad}(G, s) \leq \text{diam}(G)$
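
In a synchronous system the round in which a node first receives $$M$$ equals its distance from $$s$$, so flooding can be simulated by a breadth-first search. The sketch below assumes a connected graph given as an adjacency dictionary; the graph and function names are made up for illustration.

```python
from collections import deque


def flood(graph, s):
    """Synchronous flooding from s: maps each node to the round in
    which it first receives M (0 for the source itself)."""
    reached = {s: 0}
    frontier = deque([s])
    while frontier:
        u = frontier.popleft()
        for w in graph[u]:
            if w not in reached:             # M has not been received before
                reached[w] = reached[u] + 1
                frontier.append(w)
    return reached


def rad(graph, s):
    """rad(G, s): number of rounds until every node knows M."""
    return max(flood(graph, s).values())


# Hypothetical example: the path a - b - c - d.
path = {"a": ["b"], "b": ["a", "c"], "c": ["b", "d"], "d": ["c"]}
radius = min(rad(path, s) for s in path)
diameter = max(rad(path, s) for s in path)
print(rad(path, "a"), radius, diameter)   # 3 2 3
```

For the path, $$\frac{3}{2} \leq 2 \leq 3 \leq 3$$ instantiates the chain of inequalities above with $$s = a$$.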

#### Asynchronous complexity

• message delays and times for local computations are arbitrary
• for analysis: message delays $$\leq 1$$ time unit, local computations take $$0$$ time
1. determine running time of execution
2. asynch. time complexity = max. running time of any exec

Running time of execution: normalize message delays such that maximum delay is $$1$$ time unit

Def. Asynchronous Time Complexity: Total time of a worst-case execution in which local computations take time $$0$$ and all message delays are at most $$1$$ time unit.

• time complexity of flooding from a source $$s$$ in an asynch network $$G$$ is $$\text{rad}(G, s)$$
• message complexity (no. of messages): $$O(|E|)$$

#### Flooding Spanning Tree

The flooding algorithm can be used to compute a spanning tree of the network. Idea: Source $$s$$ is the root, and the parent of every other node is the first sender of $$M$$

• In synchronous systems: distance of $$v$$ to the root is the round in which $$v$$ is reached. Tree preserves graph distances to root node ⇒ BFS Tree
• In async. system: depth of tree can be $$n - 1$$ even if radius/diameter of graph is $$1$$.

### Convergecast

Opposite of broadcast, given a rooted spanning tree: communicate from all nodes to the root.

• time complexity: depth of tree
• message complexity: edges in tree
• useful for: max, min, sum, average, median, termination detection, …
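
As an illustration, a sum can be aggregated bottom-up along the tree. This is a sequential sketch of the result of a convergecast, assuming the tree is given as a `children` dictionary; it counts one message per tree edge and reports the depth, which matches the two complexity bullets above. Names and the example tree are hypothetical.

```python
def convergecast_sum(children, values, root):
    """Aggregate the sum of all values at the root of a rooted tree.

    Returns (total, messages, depth). Every non-root node sends exactly
    one message to its parent after hearing from all of its children.
    """
    def visit(u):
        total, messages, depth = values[u], 0, 0
        for c in children.get(u, []):
            sub_total, sub_messages, sub_depth = visit(c)
            total += sub_total
            messages += sub_messages + 1     # one message on tree edge (c, u)
            depth = max(depth, sub_depth + 1)
        return total, messages, depth

    return visit(root)


children = {"r": ["a", "b"], "a": ["c"]}
values = {"r": 1, "a": 2, "b": 3, "c": 4}
print(convergecast_sum(children, values, "r"))   # (10, 3, 2)
```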

#### Flooding/Echo algorithm

1. use flooding to construct a tree
2. use convergecast to report back to the root node when done
• time complexity: $$O(diam(G))$$ (depth of tree). In async. systems: $$\Omega(n)$$
• message complexity: $$2|E|$$
• low diameter spanning trees important! How to construct in async. system?

### Distributed Dijkstra

Unweighted graph, grow tree level by level. Algorithm:

1. Root node broadcasts “start phase $$r + 1$$” in current tree
2. Leaf nodes (level $$r$$ nodes) send “join $$r + 1$$” to neighbors
3. Node $$v$$ receiving “join $$r + 1$$” from neighbor $$u$$:
1. first such message: $$u$$ becomes parent of $$v$$, $$v$$ sends ACK to $$u$$
2. otherwise, $$v$$ sends NACK to $$u$$
4. After receiving ACK or NACK from all neighbors, level $$r$$ nodes report back to root by starting convergecast
5. When that terminates, the root can start next phase
• time complexity: $$O(\text{diam}(G)^2)$$
• message complexity: $$O(|E| + |V| * \text{diam}(G))$$

### Distributed Bellman-Ford

Basic idea: Each node $$u$$ stores an int $$d_u$$ with current guess for distance to root node $$s$$. When a node $$u$$ can improve $$d_u$$, inform neighbors.

1. init step
• $$d_s := 0$$
• $$\forall v \neq s: d_v := \infty$$ and $$\text{parent}_v := \bot$$
2. root $$s$$ sends $$1$$ to all neighbors
3. For all other nodes $$u$$. When receive message $$x$$ with $$x < d_u$$ from neighbor $$v$$:
• set $$d_u := x$$
• set $$\text{parent}_u := v$$
• send $$x + 1$$ to all neighbors except $$v$$
• time complexity: $$rad(G, s) = O(\text{diam}(G))$$
• message complexity: $$O(|E| * |V|)$$
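
The update rule can be tried out in a small asynchronous simulation. In this sketch the adversarial scheduler is replaced by a seeded random choice among all messages in flight, which is an assumption made for illustration; the graph and all names are hypothetical.

```python
import random


def bellman_ford(graph, s, seed=0):
    """Asynchronous distributed Bellman-Ford from root s.

    Returns (dist, parent, number of messages sent).
    """
    rng = random.Random(seed)
    dist = {v: float("inf") for v in graph}
    parent = {v: None for v in graph}
    dist[s] = 0
    in_flight = [(s, w, 1) for w in graph[s]]    # (sender, receiver, x)
    sent = len(in_flight)
    while in_flight:
        # Deliver an arbitrary pending message (here: a random one).
        v, u, x = in_flight.pop(rng.randrange(len(in_flight)))
        if x < dist[u]:                          # u can improve its guess
            dist[u], parent[u] = x, v
            for w in graph[u]:
                if w != v:
                    in_flight.append((u, w, x + 1))
                    sent += 1
    return dist, parent, sent


graph = {
    "s": ["a", "b"], "a": ["s", "b", "c"], "b": ["s", "a", "c"],
    "c": ["a", "b", "d"], "d": ["c"],
}
dist, parent, sent = bellman_ford(graph, "s")
print(dist)
```

Whatever delivery order is chosen, the final `dist` values are the BFS distances; only the number of messages and the intermediate guesses depend on the schedule.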

### Distributed BFS Tree Construction

#### Synchronous

• Time: $$O(\text{diam}(G))$$
• Messages: $$O(|E|)$$

⇒ Optimal!

#### Asynchronous

| Algorithm | Time | Messages |
|---|---|---|
| Distributed Dijkstra | $$O(\text{diam}(G)^2)$$ | $$O(E + V * \text{diam}(G))$$ |
| Distributed Bellman-Ford | $$O(\text{diam}(G))$$ | $$O(E * V)$$ |
| Best known trade-off | $$O(\text{diam}(G) * \log^3 V)$$ | $$O(E + V * \log^3 V)$$ |

Best known trade-off based on synchronizers

### Synchronizers

Synchronous algorithms are simple, but real systems are asynchronous. Idea: run a synchronous algorithm in an asynchronous system. The synchronizer simulates rounds at all nodes. Important: local schedules are the same as in the synchronous execution.

#### Simple local synchronizers

• node $$u$$ generates clock pulses to start each new round
• before starting round $$r$$, $$u$$ needs to make sure that all messages of round $$r - 1$$ have been received.
• after starting round $$r$$, $$u$$ sends all messages of round $$r$$.

To detect if all messages of current round received: always send a message (dummy message).

• algorithm correctly allows to run a synch. algo. in an async. system.
• if all nodes start simulation at $$t = 0$$, time complexity for $$R$$ rounds is $$R$$.
• total number of dummy messages to simulate $$R$$ is at most $$O(R * |E|)$$

#### Synchronizer $$S$$

• time complexity $$T(S)$$ is time for simulating one round
• message complexity $$M(S)$$ is messages needed for simulating one round

For simple synchronizer: $$T(S) = 1$$, $$M(S) = 2|E|$$. Other trade-offs possible!

#### BFS Tree with synchronizer

• time complexity: $$O(\text{diam}(G))$$
• message complexity: $$O(\text{diam}(G) * |E|)$$

Slightly better than distributed Bellman-Ford. Best BFS algorithm is based on best known synchronizer

How to pick root node in distributed system? Example: pick node with smallest ID.

For node $$u$$ store smallest known ID in variable $$x_u$$.

1. Initial step: $$u$$ sets $$x_u := \text{ID}_u$$ and sends $$x_u$$ to all neighbors
2. When receiving $$x_v < x_u$$ from neighbor $$v$$:
• $$x_u := x_v$$
• send $$x_u$$ to all neighbors except $$v$$
• time complexity: $$O(\text{diam}(G))$$
• message complexity: $$O(|V| * |E|)$$

Problems: message complexity is high, when/how to terminate! Can both be solved at some cost.

## Chapter 4: Causality, Logical Time and Global States

### Logical Clocks

Assign a timestamp to all events in an asynchronous message-passing system

• give the nodes notion of time
• logical clock values: numerical values, increase over time, consistent with observable behavior
• objective is not clock synchronization

### Causal Shuffles

A schedule $$S'$$ is a causal shuffle of schedule $$S$$ iff $\forall v \in V: S|v = S'|v$

Observation: If $$S'$$ is a causal shuffle of $$S$$, no node can distinguish between $$S$$ and $$S'$$
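
The definition translates directly into a check on restrictions. The sketch below assumes that events are `(node, label)` pairs, that inputs are ignored, and that both sequences are valid schedules to begin with; the labels are made up for the example.

```python
def restrict(schedule, v):
    """S|v: the subsequence of events that happen at node v."""
    return [e for e in schedule if e[0] == v]


def is_causal_shuffle(s1, s2, nodes):
    """Check S|v == S'|v for every node v."""
    return all(restrict(s1, v) == restrict(s2, v) for v in nodes)


S = [("u", "send m1"), ("w", "send m2"), ("v", "recv m1"), ("v", "recv m2")]
# The two independent send events are swapped: still the same local views.
S1 = [("w", "send m2"), ("u", "send m1"), ("v", "recv m1"), ("v", "recv m2")]
# The two receive events at v are swapped: v sees a different order.
S2 = [("u", "send m1"), ("w", "send m2"), ("v", "recv m2"), ("v", "recv m1")]

print(is_causal_shuffle(S, S1, ["u", "v", "w"]))   # True
print(is_causal_shuffle(S, S2, ["u", "v", "w"]))   # False
```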

### Causal Order

Logical clocks are based on causal order of events: Event $$e$$ should occur before $$e'$$ if it provably occurs before $$e'$$. Clock value $$t_e < t_{e'}$$ !

• causal shuffle ⇒ no node can distinguish
• no causal shuffle ⇒ some node can distinguish

*Event $$e$$ provably occurs before $$e'$$ iff $$e$$ appears before $$e'$$ in all causal shuffles of $$S$$.*

### Lamport’s Happens-Before-Relation

MPS: only send and receive events.

Two events $$e$$ and $$e'$$ occurring at $$u$$ and $$u'$$. $$t$$ and $$t'$$ are times when $$e$$ and $$e'$$ occur.

$$e$$ provably occurs before $$e'$$ if:

1. The events occur at the same node and $$e$$ occurs before $$e'$$
2. Event $$e$$ is a send event and $$e'$$ the recv. event of the same message
3. There is an event $$e''$$ for which we know provably that $$e$$ occurs before $$e''$$ and $$e''$$ occurs before $$e'$$

Def: happens-before-relation: $$\Rightarrow_s$$ is pairwise relation on send/receive events of S. Contains:

1. All pairs $$(e, e')$$ where $$e$$ precedes $$e'$$ in $$S$$ and event of same node
2. All pairs $$(e, e')$$ where $$e$$ is send event and $$e'$$ receive event for the same message
3. All pairs $$(e, e')$$ where there is a third event $$e''$$ such that $e \Rightarrow_s e'' \wedge e'' \Rightarrow_s e'$

### Happens-Before and Causal Shuffles

Theorem: Given a schedule $$S$$ and two events $$e$$ and $$e'$$, the following statements are equivalent:

• $$e$$ happens before $$e'$$ ($$e \Rightarrow_s e'$$)
• $$e$$ precedes $$e'$$ in all causal shuffles $$S'$$ of $$S$$.

### Lamport Clocks

1. Each event $$e$$ gets a clock value $$\tau(e) \in \mathbb{N}$$
2. If $$e$$ and $$e'$$ are events at the same node and $$e$$ precedes $$e'$$ then $\tau(e) < \tau(e')$
3. If $$s_M$$ and $$r_M$$ are send and receive events of some msg. $$M$$, then $\tau(s_M) < \tau(r_M)$
• If $$\tau(e)$$ satisfies 1., 2. and 3.: $e \Rightarrow_s e' \rightarrow \tau(e) < \tau(e')$
• partial order defined by $$\tau(e)$$ is superset of $$\Rightarrow_s$$

#### Algorithm

1. initial counter at node $$u$$ is $$c_u = 0$$
2. For any non-receive event $$e$$ at node $$u$$: $$c_u := c_u + 1$$; $$\tau(e) := c_u$$
3. For any send event $$s$$ at $$u$$, attach value of $$\tau(s)$$ to message
4. For any receive event $$r$$ at $$u$$ (corresp. send event $$s$$) compute: $c_u := max\{c_u, \tau(s)\} + 1; \tau(r) := c_u$
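
The four steps fit into a small class. This is a minimal sketch with hypothetical names (`LamportClock`, `local_or_send`, `receive`); the message transport itself is left out and the timestamp is simply passed as an argument.

```python
class LamportClock:
    def __init__(self):
        self.c = 0                           # step 1: counter starts at 0

    def local_or_send(self):
        """Steps 2 and 3: tick for a non-receive event; for a send event
        the returned value is attached to the message."""
        self.c += 1
        return self.c

    def receive(self, tau_s):
        """Step 4: tau_s is the timestamp attached to the message."""
        self.c = max(self.c, tau_s) + 1
        return self.c


u, v = LamportClock(), LamportClock()
t_s1 = u.local_or_send()     # u sends M1
t_s2 = u.local_or_send()     # u sends M2
t_e = v.local_or_send()      # local event at v
t_r = v.receive(t_s2)        # v receives M2: max(1, 2) + 1
print(t_s1, t_s2, t_e, t_r)  # 1 2 1 3
```

Note that `t_s1` and `t_e` get the same value although the events are unrelated, which is why the order given by $$\tau$$ is only a superset of $$\Rightarrow_s$$.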

#### Discussion

• Advantage: no change of protocol

### Neiger-Toueg-Welch Clocks

Idea:

• nodes have approx. knowledge of real time (→ clock sync!)
• increase their clock value periodically
• combine with lamport clock ideas
• when receiving a message with time stamp larger than current local value, wait with processing message

### Fidge-Mattern Vector Clocks

• Lamport clocks: superset of happens-before rel.
• Logical clock to get $$\Rightarrow_s$$ exactly?

• Each node $$u$$ maintains a vector clock $$VC(u)$$ of clock values ($$\forall v. v \in V : VC_v(u)$$)
• In $$VC(e)$$ assigned by $$u$$ to some event $$e$$ happening at $$u$$, the component $$x_v$$ corresponding to $$v \in V$$ refers to the number of events at node $$v$$ that $$u$$ knows about when $$e$$ occurs.

#### Algorithm

1. All nodes keep a vector $$VC(u)$$ with an entry for all nodes in $$V$$, initialized to $$0$$.
2. For any non-receive event $$e$$ at $$u$$: $VC_u(u) := VC_u(u) + 1; VC(e) := VC(u)$
3. For any send event $$s$$ at node $$u$$, attach $$VC(s)$$ to the message of $$s$$
4. For any receive event $$r$$ at $$u$$ (corresp. send event $$s$$) compute: $\forall v \neq u: VC_v(u) := max \{VC_v(s), VC_v(u)\}; VC_u(u) := VC_u(u) + 1; VC(r) := VC(u)$

#### Vector Clocks and Happens-Before

$VC(e) < VC(e') \leftrightarrow e \Rightarrow_s e'$
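
A compact sketch of the algorithm and of the comparison $$VC(e) < VC(e')$$, read as componentwise $$\leq$$ with at least one strict inequality. Class and function names are illustrative assumptions; vectors are stored as dictionaries keyed by node.

```python
class VectorClock:
    def __init__(self, me, nodes):
        self.me = me
        self.vc = {v: 0 for v in nodes}          # step 1

    def local_or_send(self):
        self.vc[self.me] += 1                    # step 2
        return dict(self.vc)                     # step 3: copy goes on the message

    def receive(self, vc_s):
        for v in self.vc:                        # step 4
            if v != self.me:
                self.vc[v] = max(self.vc[v], vc_s[v])
        self.vc[self.me] += 1
        return dict(self.vc)


def less(a, b):
    """VC(e) < VC(e'): componentwise <= and not equal."""
    return all(a[v] <= b[v] for v in a) and a != b


nodes = ["u", "v", "w"]
u, v, w = (VectorClock(x, nodes) for x in nodes)
s = u.local_or_send()    # u sends M
e = w.local_or_send()    # unrelated event at w
r = v.receive(s)         # v receives M
print(less(s, r), less(e, r), less(r, e))   # True False False
```

The send event precedes the receive event, while the event at `w` is incomparable with both, matching the happens-before relation of this small schedule.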

### Local Clock vs. Synchronizers

Clock pulses generated by synchronizer can also be seen as logical clocks.

Properties:

• superset of happens-before relation
• requires drastic changes of protocol and its behavior
• heavy-weight method to get logical clock values

### Consistent cuts (Global State)

It is not possible to record the global state at any given time of the execution. Therefore, the best option is to find a global state which could have been true at some time. ⇒ Global Snapshot

Cut: Given a schedule $$S$$, a cut is a subset $$C$$ of the events of $$S$$ such that for all nodes $$v \in V$$ the events in $$C$$ happening at $$v$$ form a prefix of the sequence of events in $$S|v$$.

Consistent Cut: Given a schedule $$S$$, a consistent cut $$C$$ is a cut such that for all events $$e \in C$$ and for all events $$f$$ in $$S$$ it holds that $f \Rightarrow_s e \rightarrow f \in C$

Given a schedule $$S$$, a cut $$C$$ is consistent cut iff for each message $$M$$ with send event $$s_M$$ and receive event $$r_M$$, if $$r_M \in C$$ then it also holds that $$s_M \in C$$.
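
The message-based characterization gives a simple test. The sketch below assumes unique event labels, local schedules given as one list per node, and messages given as `(send, receive)` pairs; all names and the example are hypothetical.

```python
def is_cut(local_schedules, cut):
    """The events of the cut at each node must form a prefix of S|v."""
    for events in local_schedules.values():
        in_cut = [e for e in events if e in cut]
        if events[:len(in_cut)] != in_cut:
            return False
    return True


def is_consistent_cut(local_schedules, messages, cut):
    """Consistent iff it is a cut and r_M in C implies s_M in C."""
    return is_cut(local_schedules, cut) and all(
        s_m in cut for (s_m, r_m) in messages if r_m in cut)


local = {"u": ["s1", "a"], "v": ["b", "r1"]}   # S|u and S|v
messages = [("s1", "r1")]                      # one message from u to v

print(is_consistent_cut(local, messages, {"s1", "b"}))   # True
print(is_consistent_cut(local, messages, {"b", "r1"}))   # False: r1 without s1
print(is_consistent_cut(local, messages, {"a", "b"}))    # False: not a prefix at u
```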

### Consistent Snapshot

A consistent snapshot is a global system state which is consistent with all local views.

• Global System State: A vector of intermediate states (in $$S$$) of all nodes and a description of the messages currently in transit
• Consistent Snapshot: A global system state which is an intermediate global state for some causal shuffle of $$S$$

### Computing a Consistent Snapshot (with Logical Clocks)

• Assume that each event $$e$$ has a clock value $$\tau(e)$$ such that for two events $$e$$, $$e'$$: $e \Rightarrow_s e' \rightarrow \tau(e) < \tau(e')$
• Given $$\tau_0$$, define $$C(\tau_0)$$ as set of events $$e$$ with $$\tau(e) \leq \tau_0$$

Claim: $$\forall \tau_0 \geq 0 : C(\tau_0)$$ is consistent cut.

#### Algorithm

Goals: Capture consistent snapshot in running system.

1. Node $$s$$ records its state
2. When node $$u$$ receives a marker message from node $$v$$:
• if $$u$$ has not recorded its state then
• $$u$$ records its state
• $$u$$ starts recording messages on all incoming channels
• else
• $$u$$ stops recording messages on incoming channel from $$v$$ to $$u$$
• set of msg. in transit from $$v$$ to $$u$$ is set of recorded messages
3. after node $$u$$ records its state:
• node $$u$$ sends marker message on all outgoing channels (before any other message)
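
The marker rules can be exercised on a toy system in which nodes hold balances and messages carry transfers. This is a sketch under assumptions: channels are FIFO, and the channel on which a node receives its first marker is recorded as empty (as in the standard Chandy-Lamport formulation of this scheme). The class and its methods are hypothetical.

```python
from collections import deque

MARKER = "marker"


class Snapshot:
    def __init__(self, balances):
        self.balance = dict(balances)
        nodes = list(balances)
        # One FIFO channel per ordered pair of distinct nodes.
        self.chan = {(a, b): deque() for a in nodes for b in nodes if a != b}
        self.state = {}      # node -> recorded local state
        self.open = {}       # channel -> messages recorded so far
        self.transit = {}    # channel -> recorded in-transit messages

    def transfer(self, src, dst, amount):
        self.balance[src] -= amount
        self.chan[(src, dst)].append(amount)

    def record(self, u, skip=None):
        """u records its state, starts recording, then sends markers."""
        self.state[u] = self.balance[u]
        for (a, b) in self.chan:
            if b == u and a != skip:
                self.open[(a, b)] = []
            if a == u:
                self.chan[(a, b)].append(MARKER)

    def deliver(self, src, dst):
        msg = self.chan[(src, dst)].popleft()
        if msg != MARKER:
            self.balance[dst] += msg
            if (src, dst) in self.open:
                self.open[(src, dst)].append(msg)
        elif dst not in self.state:
            self.transit[(src, dst)] = []      # first marker: channel is empty
            self.record(dst, skip=src)
        else:
            self.transit[(src, dst)] = self.open.pop((src, dst))


net = Snapshot({"A": 100, "B": 50})
net.transfer("A", "B", 10)   # A: 90, amount 10 in transit
net.record("A")              # A initiates the snapshot
net.transfer("B", "A", 5)    # B: 45, amount 5 in transit
net.deliver("A", "B")        # B receives 10
net.deliver("A", "B")        # B receives the marker and records 55
net.deliver("B", "A")        # A receives 5 while recording channel B -> A
net.deliver("B", "A")        # marker closes channel B -> A
total = sum(net.state.values()) + sum(sum(m) for m in net.transit.values())
print(net.state, net.transit, total)
```

The recorded states (90 and 55) plus the recorded in-transit amount (5) add up to the 150 units in the system, although no single moment of the run had exactly these local states.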

## Chapter 5: Clock Synchronization

Lead question: how to get synchronized clocks?

logical time (“happens-before”) vs. physical time

### Properties of clock sync. algorithms

• external vs internal sync:
• internal: sync all nodes to common time
• external: sync all nodes to external clock source
• one-shot vs. continuous sync
• online vs. offline time information
• global vs. local sync
• accuracy vs. convergence time and byzantine nodes

### UTC

• atomic clocks: almost no drift, getting smaller and more energy efficient
• access via radio clock signal or GPS
• real time clock (IBM PC)
• HPET (High precision event timer)

Clock drift: deviation from the nominal rate, dependent on power supply, temperature, etc. TinyNodes have a max. drift of 30-50 ppm (up to 50 µs per second)

### NTP

Clock sync via UDP. Packet delay is estimated to reduce clock skew.

#### Propagation delay estimation

Measure Round-Trip time (RTT). $$t_1$$ is request time on A, $$t_2$$ is request received time on B, $$t_3$$ is answer time on B, $$t_4$$ is answer received time on A.

Delay: $\delta = \frac{(t_4 - t_1) - (t_3 - t_2)}{2}$

Clock skew: $\Omega = \frac{(t_2 - (t_1 + \delta)) - (t_4 - (t_3 + \delta))}{2} = \frac{(t_2 - t_1) + (t_3 - t_4)}{2}$
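
A worked instance of the two formulas. The timestamps are invented: B's clock is assumed to be 5 time units ahead of A's, the one-way delay is 2 units in each direction, and B takes 1 unit to answer.

```python
def ntp_estimate(t1, t2, t3, t4):
    """Return (delay, skew) from the four timestamps of one round trip."""
    delay = ((t4 - t1) - (t3 - t2)) / 2
    skew = ((t2 - t1) + (t3 - t4)) / 2
    return delay, skew


# A sends at t1 = 10 (A's clock). The request arrives at A-time 12,
# which is 17 on B's clock. B answers at t3 = 18 (A-time 13), and the
# answer arrives at t4 = 15 on A's clock.
print(ntp_estimate(10, 17, 18, 15))   # (2.0, 5.0)
```

The estimate is exact here because the two one-way delays are equal; with asymmetric delays the skew estimate is off by half the difference.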

#### Message jitter

Various sources of errors (deterministic and non-deterministic). Fix by timestamping packets at MAC layer.

### PTP

Precision time protocol (PTP), similar to NTP.

• commodity network hardware can assist in time sync by timestamping PTP packets at MAC layer
• Packet delay is only estimated on request, one packet needed for synchronization!

### Hardware clock distribution

Synchronous digital circuits require all components to act in sync. The bigger the clock skew, the longer the clock period. Optimize routing, insert buffers.

### Clock Sync. tricks in wireless networks

• reference broadcast sync (RBS), analogous to synchronizing atomic clocks: the sender synchronizes a set of clocks
• time-sync protocol for sensor networks (TPSN), analogous to the Network Time Protocol: estimate the round-trip time to sync more accurately
• flooding time sync protocol (FTSP), analogous to the Precision Time Protocol: timestamp packets at the MAC layer to improve accuracy

### Best tree for tree-based clock sync?

Finding a good tree for clock sync is a tough problem. Find a spanning tree with small (maximum or average) stretch. Hard problem, solved with approximation algorithms.

### Clock sync tricks (GTSP)

• Sync with all neighboring nodes. Broadcast periodic time beacons
• go to average clock value/rate of all neighbors
• try to adapt to clock rate differences

Results in all nodes consistently averaging errors to all neighbors. In tree-like algorithms (e.g. FTSP) we can get a bad local skew.

### Theory of clock sync

• Given: communication network
1. each node with hw clock with drift
2. message delays with jitter
• Goal: synchronize clocks (“logical clocks”). Global and local sync!

• Time (logical clocks) should not be allowed to stand still or jump.
• Logical clocks should always move forward. Sometimes faster, sometimes slower, minimum and maximum speed. As close to correct time as possible.

### Formal model

• Hardware Clock $$H_v(t) = \int_0^t h_v(\tau)d\tau$$ with clock rate $$h_v(t) \in [1 - \rho, 1+ \rho]$$
• Logical clock $$L_v(t)$$ increases at rate at least $$1 - \rho$$ and at most $$\beta$$
• Message delays $$\in [0, 1]$$
• Goal: update logical clock according to hardware clock and messages from neighbors

### $$A^{\text{max}}$$ algorithm

1. set local clock to maximum clock value received from any neighbor
2. if recv. value $$>$$ previously forwarded value, forward immediately
3. at least forward local logical clock value every $$T$$ time step.
• Global skew (max. diff. between two clock values) at most $$(1 + \rho) * D + 2\rho * T$$ ($$D$$: diameter). Can be $$D$$
• Local skew can also be $$D$$

#### Problems:

• global and local skew can both be $$\Omega(D)$$
• clock values can jump ($$\beta = \infty$$)

#### Can we do better?

• make clock continuous
• global skew cannot be improved
• local skew can be improved, however (simple ideas don’t work, $$O(1)$$ is not possible)

#### Skew lower bounds

• The global skew guarantee of any clock sync algorithm is at least $$\frac{D}{2}$$.
• The local skew of any clock sync algorithm is $$\Omega(\log_{\frac{\beta - 1}{\rho}} D)$$

## Chapter 6: Consensus

### Consensus

#### Setting

• $$n$$ nodes $$v_1, v_2, ..., v_n$$
• each node has input $$x_1, x_2, ..., x_n \in \mathbb{D}$$
• each (non-failing) node computes output $$y_1, y_2, ..., y_n \in \mathbb{D}$$

#### Agreement

Output of all non-failing nodes are equal

#### Validity

If all inputs are equal to $$x$$, all outputs are equal to $$x$$

Depends on failure model

#### Termination

All non failing processes terminate after finite number of steps

#### Remarks

• the two generals problem is a variant of binary consensus with $$n = 2$$ nodes
• model: synchronous communication, messages can be lost
• validity: if no messages are lost, and both nodes have the same input $$x$$, $$x$$ needs to be the output
• the problem cannot be solved in this setting

### Shared Memory

• $$n > 1$$ processors
• shared memory accessed simultaneously by multiple processes
• processors can atomically read or write (not both) on shared memory cell

#### Protocol

• memory cell $$c$$
• initially $$c$$ is in special state ?
• processor 1 writes $$x_1$$ into $$c$$, then decides on $$x_1$$
• processor $$j \neq 1$$ reads $$c$$ until $$j$$ reads something else than ? and then decides on that

#### Problems

• unexpected delay
• heterogeneous architectures: the slowest one slows down the whole system
• no fault-tolerance

### Computability

Def: Usually means turing-computable ⇒ strong math. model

Shared-memory computability: Model of asynchronous concurrent computation. Wait-free computable on a multiprocessor. Wait-free?

### Wait-free Shared Model

• $$n > 1$$ processors
• processors can atomically read or write (not both) on shared memory cell
• Processors might crash/stop/become very slow

*Wait-free* : Every process completes in finite number of steps, no locks, we assume that we have wait-free atomic registers.

Every processor does:

    r = read(c);
    if (r == "?") {
        write(c, x_i); decide(x_i);
    } else {
        decide(r);
    }
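
Because the read and the write are separate atomic steps, this protocol does not guarantee agreement. The following sketch replays the protocol under a given interleaving; the schedule encoding and all names are assumptions made for illustration.

```python
def run(schedule, inputs):
    """Execute the read-then-write protocol under a given interleaving.

    schedule lists process ids; each occurrence lets that process take
    its next atomic step (first the read, then the write or decide).
    """
    cell = "?"
    read_value = {}
    decision = {}
    for p in schedule:
        if p not in read_value:
            read_value[p] = cell             # atomic read of c
        elif p not in decision:
            if read_value[p] == "?":
                cell = inputs[p]             # atomic write, then decide
                decision[p] = inputs[p]
            else:
                decision[p] = read_value[p]
    return decision


inputs = {1: 0, 2: 1}
print(run([1, 1, 2, 2], inputs))   # {1: 0, 2: 0}: agreement
print(run([1, 2, 1, 2], inputs))   # {1: 0, 2: 1}: both read "?" first
```

In the second schedule both processes read the initial value before either one writes, so each decides on its own input.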

• Wait-free computation is a tree
• Bivalent system states: outcome not fixed
• Univalent states: Outcome is fixed, might not be known yet. 1-valent and 0-valent states.

• It is impossible to implement a two-dequeuer wait-free FIFO queue with read/write shared memory.
• Proof by reduction, important beyond NP-completeness

• $$n > 1$$ processors
• Wait-free implementation
• processors can atomically read AND write on shared memory cell

#### Formally

Cell $$c$$ and function $$f$$. Method call: Replace value $$x$$ of cell $$c$$ with $$f(x)$$. Return value $$x$$ from cell $$c$$

### Consensus Number

An object has consensus number $$n$$ if it can be used to implement $$n$$-process consensus, but not ($$n + 1$$)-process consensus

Example: Atomic read/write registers have consensus number $$1$$. Works with $$1$$ process, impossibility with $$2$$.

#### Theorem

If you can implement $$X$$ from $$Y$$ and $$X$$ has the consensus number $$c$$ then $$Y$$ has consensus number at least $$c$$. ⇒ Useful way of measuring synchronization power

Alternative:

• If $$X$$ has consensus number $$c$$
• And $$Y$$ has consensus number $$d < c$$
• Then there is no way to construct a wait-free implementation of $$X$$ by $$Y$$

An RMW object is non-trivial if there exists a value $$v$$ such that $$v \neq f(v)$$:

• Test&Set, Fetch&Inc, Fetch&Add, Swap, Compare&Swap, general RMW

Any non-trivial RMW object has consensus number at least $$2$$. Implies no wait-free implementation of RMW registers from read/write registers.
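
A sketch of why one non-trivial RMW cell suffices for two processes: each process first announces its input in its own read/write register and then applies $$f$$ to the cell; the process that still sees the initial value wins. Here a `threading.Lock` stands in for hardware atomicity and test&set ($$f(x) = 1$$) is used as the RMW function. Both choices, like all names, are illustrative assumptions.

```python
import threading


class RMWCell:
    """Read-modify-write cell; the lock models the atomic hardware step."""

    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    def rmw(self, f):
        with self._lock:
            old = self._value
            self._value = f(old)
            return old


def two_process_consensus(inputs):
    announce = [None, None]        # one read/write register per process
    cell = RMWCell(0)              # initial value 0, and f(0) = 1 != 0
    decisions = [None, None]

    def process(i):
        announce[i] = inputs[i]                  # publish own input first
        won = cell.rmw(lambda x: 1) == 0         # first caller sees 0
        decisions[i] = announce[i] if won else announce[1 - i]

    threads = [threading.Thread(target=process, args=(i,)) for i in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return decisions


result = two_process_consensus([3, 7])
print(result[0] == result[1])   # True
```

The loser can safely read the winner's register because the winner wrote it before its own RMW step, which in turn happened before the loser's RMW step.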

#### Interfering RMW

Let $$F$$ be a set of functions such that for all $$f_i$$ and $$f_j$$ either they commute or they overwrite. Any such set of RMW objects has consensus number exactly $$2$$

### Synchronous System

One can sometimes tell if a processor has crashed (timeouts, broken connection)

#### Simple consensus algorithm

Each process:

1. broadcast own value
2. decide on the minimum of all received values

Problem: no consensus if a processor fails

#### $$f$$-Resilient consensus algorithm

If an algorithm solves consensus for $$f$$ failed processes, we say it is an $$f$$-resilient consensus algorithm.

Each process:

1. round 1: broadcast own value
2. round 2 to round $$f + 1$$: broadcast minimum of received values unless it has been sent before.
3. end of round $$f + 1$$: decide on the minimum value received

Example: with $$f = 2$$ failures, $$f + 1 = 3$$ rounds are needed

Theorem: If at most $$f \leq n - 2$$ of $$n$$ nodes of a synchronous message passing system can crash, at least $$f + 1$$ rounds are needed to solve consensus.
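
The effect of the extra rounds can be seen in a small simulation of the min-flooding algorithm with crash failures. The sketch assumes that a crashing node reaches only a chosen subset of recipients in its last round; the encoding of `crashes` and the function name are hypothetical.

```python
def min_flooding(inputs, rounds, crashes):
    """Synchronous min-flooding with crash failures.

    crashes maps a node to (round, recipients): the node crashes in that
    round after its message reached only the listed recipients.
    Returns the decisions of the nodes that are still alive.
    """
    n = len(inputs)
    smallest = list(inputs)          # minimum value seen so far
    last_sent = [None] * n
    alive = [True] * n
    for r in range(1, rounds + 1):
        inbox = [[] for _ in range(n)]
        for u in range(n):
            if not alive[u]:
                continue
            crashing = u in crashes and crashes[u][0] == r
            targets = crashes[u][1] if crashing else range(n)
            if smallest[u] != last_sent[u]:      # not sent before
                for w in targets:
                    inbox[w].append(smallest[u])
                last_sent[u] = smallest[u]
            if crashing:
                alive[u] = False
        for w in range(n):
            if alive[w] and inbox[w]:
                smallest[w] = min(smallest[w], min(inbox[w]))
    return {u: smallest[u] for u in range(n) if alive[u]}


inputs = [0, 1, 2, 3]
crashes = {0: (1, [1])}      # node 0 crashes in round 1, only node 1 hears it
print(min_flooding(inputs, 1, crashes))   # {1: 0, 2: 1, 3: 1}
print(min_flooding(inputs, 2, crashes))   # {1: 0, 2: 0, 3: 0}
```

With one crash, a single round leaves the survivors in disagreement, while $$f + 1 = 2$$ rounds give node 1 a failure-free round to forward the value $$0$$.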

### Byzantine Failures

A node may not only crash and stop forever; it may also recover, or be corrupted and behave inconsistently (different processes receive different values)

Theorem: There is no $$f$$-resilient Byzantine consensus algorithm for $$n$$ nodes for $$f \geq \frac{n}{3}$$.

### Simple byzantine agreement algorithm

• Round 1: exchange all values
• Round 2: exchange received info
• Round 3: exchange all received info again (and figure out that $$q$$ gave inconsistent information about $$p$$)

Works for any $$f$$ and $$n > 3f$$, which is optimal. Only takes $$f + 1$$ rounds, which is optimal even for crash failures. Works for any input, not just binary input.

Problem: not easy to formalize, size of the message increases exponentially.

### Queen Algorithm

• Simple Byzantine agreement that uses small messages
• solves consensus with $$n$$ nodes and $$f$$ failures where $$f < \frac{n}{4}$$ in $$f + 1$$ phases.

#### Algorithm:

• Round 1: Broadcast own value, set own value to value that was received most often.
• if own value appears $$> \frac{n}{2} + f$$ times, support value
• else: do not support any value
• Round 2: The queen broadcasts its own value
• if not supporting any value, set own value to queen’s value

• nodes only exchange current values, small messages
• works for any input and not just binary input

• algorithm requires $$f + 1$$ phases consisting of $$2$$ rounds each. Twice as much as the optimal algorithm!
• only works with $$f < \frac{n}{4}$$ Byzantine nodes
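The two rounds per phase can be sketched as a small simulation. This is an illustration under stated assumptions, not the lecture's reference implementation: node `phase` acts as queen of phase `phase`, and the hypothetical callback `lie(sender, receiver, phase)` models what a Byzantine sender tells a given receiver.

```python
from collections import Counter


def queen_algorithm(inputs, byzantine, f, lie):
    """Simulate f + 1 phases; return the final values of the correct nodes."""
    n = len(inputs)
    value = list(inputs)
    correct = [i for i in range(n) if i not in byzantine]
    for phase in range(f + 1):
        queen = phase  # assumption: node `phase` is the queen of this phase
        # Round 1: everybody broadcasts; adopt the most frequent value.
        adopted, support = {}, {}
        for j in correct:
            received = [lie(i, j, phase) if i in byzantine else value[i]
                        for i in range(n)]
            top, count = Counter(received).most_common(1)[0]
            adopted[j] = top
            support[j] = count > n / 2 + f
        for j in correct:
            value[j] = adopted[j]
        # Round 2: the queen broadcasts; unsupported nodes adopt her value.
        for j in correct:
            if not support[j]:
                value[j] = lie(queen, j, phase) if queen in byzantine else value[queen]
    return {j: value[j] for j in correct}


# n = 9 nodes, f = 2 Byzantine nodes (0 and 1) that tell even and odd
# receivers different values; the queen of the last phase (node 2) is correct.
result = queen_algorithm([0, 0, 0, 1, 0, 1, 1, 0, 1], {0, 1}, 2,
                         lambda sender, receiver, phase: receiver % 2)
print(result)
```

Since there are $$f + 1$$ phases, at least one phase has a correct queen, and after that phase all correct nodes hold the same value.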

### King Algorithm

• Tolerates $$f < \frac{n}{3}$$ byzantine failures and uses small messages.
• also takes $$f + 1$$ phases which each consists of $$3$$ rounds

#### Algorithm:

• Round 1: broadcast own value
• Round 2
• If some value $$x$$ appears $$\geq n - f$$ times, broadcast a proposal for $$x$$
• If some proposal received $$> f$$ times
• set own value to this proposal
• Round 3
• The king broadcasts its value
• if own value received $$< n - f$$ proposals, set own value to king’s value

• Works for any $$f$$ and $$n > 3f$$, which is optimal
• Messages are small
• Works for any input not just binary input

• Algorithm requires $$f + 1$$ phases consisting of $$3$$ rounds each.

### B.A. Using authentication

• reach consensus using authenticated messages
• condition: if a node never sends a message $$m$$, then no correct node ever accepts $$m$$

#### Algorithm:

• if I am $$v$$ and own input is $$1$$
• $$value := 1$$
• broadcast $$v$$ has $$1$$
• else
• $$value := 0$$

In each round $$r \in \{1, ..., f + 1\}$$:

• if $$value = 0$$ and accepted $$r$$ messages $$v$$ has $$1$$ in total including a message from $$v$$ itself
• $$value := 1$$
• broadcast $$v$$ has $$1$$ plus the $$r$$ accepted messages that caused the local value to be set to $$1$$.

After $$f + 1$$ rounds: decide value

• works for any number of byzantine nodes
• only takes $$f + 1$$ rounds ⇒ optimal
• small messages (sub-exponential length)

• If $$v$$ is Byzantine, nodes may agree on a value that is not in the original input
• Only works for binary input
• requires authenticated messages

### Randomized algorithm

• so far, mainly consensus in synchronous systems. Reason: no deterministic algorithm can guarantee consensus in asynchronous systems even if only one process may crash.
• Solve consensus in asynchronous system with randomization
• For the sake of simplicity: input is binary, and $$f < \frac{n}{9}$$ nodes are Byzantine.

#### Algorithm

• $$x := \text{own input}; r:= 0$$
• broadcast proposal($$x$$, $$r$$)

In each round $$r$$

1. Wait for $$n - f$$ proposals from round $$r - 1$$
2. If at least $$n - 2f$$ proposals have some value $$y$$
• $$x := y$$, decide on $$y$$
else if at least $$n - 4f$$ proposals have some value $$y$$
• $$x := y$$
else
• choose $$x$$ randomly with $$Pr[x = 0] = Pr[x = 1] = \frac{1}{2}$$
3. Broadcast proposal($$x$$, $$r$$)
4. If decided on value → stop

#### Discussion

Algorithm satisfies validity and agreement conditions, but not termination condition. Also: very slow

#### Shared coin

A shared coin is a random binary variable that is $$0$$ with constant probability and $$1$$ with constant probability. Used in randomized algorithm to speed it up.

Assume only crash failures, no byzantine failures!

##### Algorithm
• Set local coin $$c_i := 0$$ with probability $$\frac{1}{n}$$, else $$c_i := 1$$
• Broadcast $$c_i$$
• Wait for exactly $$n - f$$ coins and collect all coins in the local coin set $$s_i$$
• Broadcast $$s_i$$
• Wait for exactly $$n - f$$ coin sets
• If at least one coin is $$0$$ among all coins in the coin sets
• return $$0$$
• Else
• return $$1$$
##### Discussion

Subroutine terminates, at least $$\frac{1}{3}$$ of all coins are seen by everybody. There are at least $$f + 1$$ coins in at least $$f + 1$$ coin sets

All processes decide $$0$$ with constant probability and $$1$$ with constant probability.
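For the outcome $$1$$, the constant probability can be checked directly. If all $$n$$ local coins come up $$1$$, nobody can see a $$0$$, and this happens with probability $$(1 - \frac{1}{n})^n$$. The short sketch below evaluates this bound; the function name `prob_all_coins_one` is an assumption for this example. The argument for outcome $$0$$ relies on the coins seen by everybody and is not reproduced here.

```python
import math


def prob_all_coins_one(n):
    """Probability that every one of n local coins is 1."""
    return (1 - 1 / n) ** n


for n in (2, 10, 1000):
    print(n, prob_all_coins_one(n))
print("limit 1/e:", math.exp(-1))
```

The value is $$\frac{1}{4}$$ for $$n = 2$$ and increases towards $$\frac{1}{e}$$, so it is bounded below by a constant independent of $$n$$.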

#### Summary

• takes constant number of rounds in expectation
• handle crash failures even if communication is async
• only works if there are $$f < \frac{n}{9}$$ crash failures. No byzantine nodes!
• only works for binary input.

Note: there is a constant expected time algorithm that tolerates $$f < \frac{n}{2}$$ crash failures

#### Byzantine and asynchronous

Can be solved, but message size remains polynomial and message complexity $$O(n^3)$$. Does not use global coin, but tries to detect biased local coinflips from Byzantine processes.

## Chapter 8: Distributed System Architectures

### Client-Server

#### One tier

• Client
• Server: resource management, application, presentation

#### Two tier

• Client: presentation
• Server: resource management, application

OR

• Client: presentation, application
• Server: resource management

#### Three tier

• Client: presentation
• Application Server: application (middleware)
• Server: resource management

### Multiprocessor Architectures

• high-performance by parallel data management, …
• high-availability (replication)
• extensibility to add processing and storage power

### Shared memory

• Any processor has access to any memory module or disk unit through a fast interconnect. All processors are under the control of a single operating system
• Meta-information and control information can be shared
• high cost: complex hardware required for interlinking of processors and memory modules
• limited extensibility: faster processors will still result in conflicting access to shared-memory
• low availability: memory fault affects all

### Shared Disk

• low cost
• easy migration
• availability: memory faults are isolated
• high complexity: distributed data base system protocols required
• cache consistency
• performance: disk bottleneck

### Shared-Nothing

• high complexity: distributed data base system protocols required

• Interprocess/inter-node Communication: Low-level API (send, receive)
• RPC
• Indirect communication
• Group communication
• Pub/Sub
• Message Queues
• Distributed Shared Memory

## Chapter 9: Reliability

• measure of success with which a system conforms to an authoritative specification of its behavior
• probability that the system does not experience failures within a given period

Availability: probability for a server to be ready to serve: $$\frac{\text{MTBF}}{\text{MTBF} + \text{MTTR}}$$, where MTBF is the mean time between failures and MTTR is the mean time to repair.

⇒ fast recovery is key to high availability
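A small numeric sketch of the formula shows why recovery time matters. The function name `availability` and the example numbers (hours) are assumptions chosen for illustration.

```python
def availability(mtbf, mttr):
    """Fraction of time the server is ready: MTBF / (MTBF + MTTR)."""
    return mtbf / (mtbf + mttr)


# Same failure rate (one failure per 1000 h), different repair times.
print(availability(1000, 10))   # slow recovery
print(availability(1000, 1))    # faster recovery
print(availability(1000, 0.1))  # very fast recovery
```

With the MTBF held fixed, each reduction of the MTTR moves the availability closer to $$1$$.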

### Protocols:

• Local (ARIES): Write-ahead Logging, repeating history on crash
• Distributed Reliability: Commit Protocols, Termination Protocols, Recovery Protocols

### Three types of failures:

• transaction failure
• system failure
• media failure

### Types of Storage

• volatile storage: buffer and main memory
• non-volatile storage: hard disk or SSD
• stable storage: non-volatile storage that survives all types of failures

Interactions:

• Can modified pages be written to disk even if there is no commit (Steal)
• Can we delay writing modified pages after commit (No-Force)

*Steal+No-Force* : Improves throughput and latency, but makes recovery more complicated.

### ARIES Algorithm

*A*lgorithm for *R*ecovery and *I*solation *E*xploiting *S*emantics

• Better alternative to shadow paging
• Works with steal and no-force
• Data pages updated in place
• Uses “logging”:
• Log: Ordered list of REDO/UNDO actions
• Record REDO and UNDO for every update
• Sequential writes to log
• Minimal info written to log

#### Principles of ARIES

1. Write-Ahead Logging: Record database changes before actual change
2. Repeating History During Redo: After crash, bring system back to the exact state at crash time, undo active transactions
3. Logging Changes During Undo: Log changes during transaction undo so that they are not repeated in case of repeated failures

#### WAL

Any change to a database object is first recorded in the log, which must be written to stable storage before the change itself is written to disk.

• To ensure atomicity and prepare for undo, write undo to stable storage before a page update is written to disk
• To ensure durability, redo info must be written to stable storage at commit time
• LSN: Log Sequence Number, monotonically increasing
• Type: Begin, Commit, Abort, Update, Compensation
• TID: Transaction Id
• PrevLSN: Previous LSN of the same transaction
• PageID: Page which was modified
• NextLSN: Next LSN of same transaction
• Redo Information
• Undo Information

Redo Information: ARIES does page-oriented redo, stores byte images of pages before and after modification.

Undo Information: ARIES assumes logical undo, record the actual tuple changes

#### Writing Log Records

• For performance, write to volatile storage first
• Log is forced to stable storage up to certain LSN
• Committed transaction = all log records are on stable storage

During normal transaction processing keep in each transaction control block:

• LastLSN: LSN of last log record for this transaction
• NextLSN: LSN of next log record to be processed during rollback
• When update on page $$p$$ is performed, a log record $$r$$ is written to WAL and LSN of $$r$$ is recorded in page header of $$p$$

#### ARIES Transaction Rollback

• Start the undo operation at the log entry pointed to by the Next field in the transaction control block of T
• Find remaining log entries for T by following Prev and Next fields in log
• Perform changes in undo part of log entry

Undo operations must be recorded in the WAL. Use compensation log records for this purpose. Never undo an undo action, but an undo action might need to be redone.
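The rollback procedure can be sketched with an in-memory log. This is a simplified illustration, not the full ARIES data layout. The field name `undo_next_lsn`, the single-value `redo`/`undo` images, and the assumption that LSNs are simply $$1, 2, 3, …$$ in log order are all choices made for this example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LogRecord:
    lsn: int
    tid: str
    type: str                  # "update" or "compensation"
    prev_lsn: Optional[int]    # previous record of the same transaction
    page: str
    redo: Optional[int]        # value after the change
    undo: Optional[int]        # value before the change (None for a CLR)
    undo_next_lsn: Optional[int] = None  # CLR only: next record still to undo


def rollback(log, pages, tid, last_lsn):
    """Undo all updates of `tid`, writing one compensation record per undo."""
    by_lsn = {r.lsn: r for r in log}
    next_undo, last = last_lsn, last_lsn
    while next_undo is not None:
        rec = by_lsn[next_undo]
        if rec.type == "update":
            pages[rec.page] = rec.undo           # apply the undo information
            clr = LogRecord(len(log) + 1, tid, "compensation", last, rec.page,
                            redo=rec.undo, undo=None,
                            undo_next_lsn=rec.prev_lsn)
            log.append(clr)                      # the undo itself is logged
            by_lsn[clr.lsn] = clr
            last = clr.lsn
            next_undo = rec.prev_lsn
        else:
            # Never undo an undo: skip to what the CLR says is left.
            next_undo = rec.undo_next_lsn
    return last
```

Because every compensation record carries only redo information and a pointer to the next record to undo, a crash during rollback can resume without undoing the same update twice.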

#### ARIES Crash Recovery

1. Analysis Phase
• find active transactions where failure happened (“losers”)
2. Redo Phase
• Replay the log to bring the system into state as of the time of system failure
• restore the losers
3. Undo Phase
• Roll back all loser transactions

### Commit coordination and consensus

Set of independent servers with transactions spanning several servers, i.e. subtransactions at some sites. For a successful commit, all subtransactions have to be committed.

• Nodes: local state, needed for protocol (distributed database)
• Node: fail-stop, fail-recover, extension to byzantine failures possible
• Async communication
• Network may see temporary interruptions and partitions

Approach: Commit protocols. Fundamental approach is to rely on leader/coordinator. Leader proposes value or client talks to leader, participants decide and inform coordinator.

Algorithms:

• 2PC: Simple, effective, but can degrade into blocking behavior
• 3PC: Reduces the period of vulnerability by spreading decision knowledge, but unsafe in presence of network splits
• Paxos, Multi-Paxos, Paxos commit: generalized, safe, nonblocking, may not terminate
• Raft: same goals as Paxos, but simpler to understand and implement

### 2-Phase-Commit (2PC)

1.

• Coordinator sends vote-request to participants
• Participant: When receiving a vote-request, return either vote-commit or vote-abort

2.

• Collect votes; if all are vote-commit, send global-commit, otherwise global-abort
• All participants wait on global-* and react accordingly
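The two phases can be sketched in a failure-free, single-process setting. The class `Participant`, its methods and the state names are assumptions for this illustration. Timeouts, logging and coordinator failure, which cause the blocking behavior of 2PC, are deliberately not modeled.

```python
class Participant:
    def __init__(self, can_commit):
        self.can_commit = can_commit
        self.state = "init"

    def on_vote_request(self):
        # Phase 1: answer the coordinator's vote-request.
        if self.can_commit:
            self.state = "ready"
            return "vote-commit"
        self.state = "aborted"
        return "vote-abort"

    def on_global(self, decision):
        # Phase 2: react to global-commit or global-abort.
        self.state = "committed" if decision == "global-commit" else "aborted"


def two_phase_commit(participants):
    """Coordinator: collect all votes, then broadcast the global decision."""
    votes = [p.on_vote_request() for p in participants]
    if all(v == "vote-commit" for v in votes):
        decision = "global-commit"
    else:
        decision = "global-abort"
    for p in participants:
        p.on_global(decision)
    return decision
```

A participant in state `ready` cannot decide on its own. If the coordinator fails before sending the global decision, such a participant has to wait.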

#### Decentralized variant

• Phase 1: Coordinator sends, depending on its vote, vote-commit or vote-abort to participants
• Phase 2a: If a participant receives vote-abort from the coordinator, it aborts. Otherwise it has received vote-commit and returns either commit or abort to the coordinator and all other participants. If it sends abort, it aborts its local computation
• Phase 2b: After having received all votes, coordinator and all participants have all votes available; if all are commit, they commit, otherwise they abort.

### 3-Phase-Commit (3PC)

Unblock 2PC by spreading decision knowledge

• Phase 1a: Coordinator sends vote-request to participants
• Phase 1b: Participant: On vote-request return vote-commit or vote-abort. On vote-abort, abort local computation
• Phase 2a: Collect all votes, if all are vote-commit send prepare-commit, otherwise send global-abort and halt.
• Phase 2b: Each participant that voted vote-commit waits for prepare-commit or global-abort. On prepare-commit, reply ready-commit.
• Phase 3a: Coordinator waits until all participants have sent ready-commit and then sends global-commit to all
• Phase 3b: Participant waits for global-commit and then commits.

Without network splits all would be good, but with network splits two new coordinators could arise that arrive at different values.

### Paxos

Safe, nonblocking, making progress if possible, resilient to failures, delays and network partitions

Ideas:

• Combine leader election with consensus protocol. Roles may change on the fly, safe with multiple leaders.
• Acceptance majority: $$F + 1$$ (out of $$2F + 1$$) voters agree
• Numbered sequence of proposals

#### Roles

• Client: issue a request, await response
• Acceptor / Voter: Work in quorums, vote on requests. ⇒ Fault-tolerant memory
• Proposer / Coordinator: Tries to convince acceptors that request is o.k.
• Learner: replicator, take action when request has been granted
• Leader / Special Coordinator: distinguished proposer, resolves conflict of multiple leaders

#### Basic Idea

• One (or more) node(s) decide(s) to be coordinator/proposer
• Proposes a value and requests acceptance from other nodes
• If it fails, it will try again
• Separate agreement on value from leader acceptance

#### Dealing with multiple proposals

• Use globally ordered proposal numbers (site-id:local-number)
• Only accept the most recent proposal
• acceptors will keep track of the highest proposal number accepted
• on acceptance, tell the new proposer the previously accepted value

#### Algorithm

• Phase 1a (Prepare): Leader selects proposal number $$n$$ and sends a prepare message to quorum of acceptors
• Phase 1b (Promise):
• If the proposal number $$n$$ is larger than any previous proposal:
• each acceptor promises not to accept proposals with a lower proposal number
• sends a promise message including proposal number and value
• otherwise acceptor sends denial
• also each acceptor sends the value and number of its last accepted or promised proposal to proposer
• Phase 2a: Accept
• If proposer receives (positive) responses from quorum of acceptors
• choose value to be agreed upon
• value must be from values of acceptors that have already accepted a value
• otherwise can choose any value
• Send accept message to quorum of acceptors including the chosen value
• Phase 2b: Accepted
• If acceptor received accept message for most recent proposal it has promised
• accept value
• send accepted message to proposer and learner
• otherwise: send denial and the last proposal number and value it has promised to accept

• Paxos is safe in presence of multiple leaders
• Not guaranteed to make progress (livelock)
• tradeoff in election: aggressive (livelock), reluctant (protocol stalled)
• Partial solution: use leader election timeout
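The acceptor rules and the proposer's choice of value can be sketched as follows. This is a single-process illustration under simplifying assumptions: proposal numbers are plain integers instead of `site-id:local-number`, messages are method calls that never get lost, and the names `Acceptor` and `propose` are made up for this example.

```python
class Acceptor:
    def __init__(self):
        self.promised = -1     # highest proposal number promised so far
        self.accepted = None   # (number, value) of the last accepted proposal

    def prepare(self, n):
        # Phase 1b: promise only if n is larger than any previous proposal.
        if n > self.promised:
            self.promised = n
            return ("promise", self.accepted)
        return ("denial", self.promised)

    def accept(self, n, value):
        # Phase 2b: accept unless a higher-numbered promise was given.
        if n >= self.promised:
            self.promised = n
            self.accepted = (n, value)
            return "accepted"
        return "denial"


def propose(acceptors, n, value):
    """Run phases 1 and 2 for proposal n; return the chosen value or None."""
    quorum = len(acceptors) // 2 + 1
    replies = [a.prepare(n) for a in acceptors]
    promises = [prev for kind, prev in replies if kind == "promise"]
    if len(promises) < quorum:
        return None
    prior = [p for p in promises if p is not None]
    if prior:
        # Must reuse the value of the highest-numbered accepted proposal.
        value = max(prior, key=lambda p: p[0])[1]
    acks = sum(a.accept(n, value) == "accepted" for a in acceptors)
    return value if acks >= quorum else None
```

A second proposer with a higher number does not overwrite the earlier decision; it learns the accepted value in phase 1 and proposes it again. This is what keeps Paxos safe with multiple leaders.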

#### Multi-Paxos

Optimize Paxos regarding message complexity.

• First round can be skipped if proposer stays the same
• Only the proposer who succeeded in the 1st round may skip that round in subsequent instances

#### Paxos Commit (Idea)

• Paxos ensures consensus over a quorum
• Idea: run a Paxos instance for each participant of the transaction
• Commit possible if all participants manage to achieve consensus on commit
• 5 message delays
• $$F + 1$$ acceptors, $$N$$ participants
• Number of messages $$(N * F + F + 3N - 1)$$
• RM sends BeginCommit message to leader
• leader sends prepare message to every other RM ($$N - 1$$)
• RM sends ballot $$0$$ phase 2a Prepared message for its instance of Paxos to $$F + 1$$ acceptors ($$N(F + 1)$$ messages)
• For each RM’s instance of Paxos, an acceptor responds to a phase 2a message by sending a phase 2b prepared message to leader. ($$F + 1$$ messages, bundling!)
• Single commit to each RM ($$N$$ messages)
• Co-locate acceptors with resource managers
• For $$F = 0$$, Paxos Commit corresponds to 2PC

## Chapter 10: Distributed Concurrency Control

Storing copies of data on different nodes enables availability, performance and reliability. Data needs to be consistent.

### Page model

All data is mapped to read and write operations on pages. Inspect interleavings of resulting page operations to study concurrent execution.

Set of transactions: $$T = \{T_1, ..., T_n\}$$ Sequence of $$R$$ and $$W$$ actions over objects $$A$$, $$B$$, …

### History and Schedule

A transaction is ordered and all write operations can potentially depend on previously read inputs.

A history is a set of complete transactions (begin, commit or abort). Schedule of a history is a prefix of a history. Schedule is serial if it is not interleaved.

#### Serializability

A schedule is called (conflict-)serializable if there exists a (conflict-)equivalent serial schedule over the same set of transactions. Create a conflict graph (edge for WR, RW, WW conflicts); if it is acyclic, the schedule is serializable.
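The conflict-graph test can be sketched in a few lines. A schedule is represented here as a list of `(transaction, operation, object)` triples. This representation and the function names are assumptions made for the example.

```python
def conflict_graph(schedule):
    """Edges (Ti, Tj): an operation of Ti conflicts with a later one of Tj."""
    edges = set()
    for i, (t1, op1, obj1) in enumerate(schedule):
        for t2, op2, obj2 in schedule[i + 1:]:
            # Conflict: different transactions, same object, at least one write.
            if t1 != t2 and obj1 == obj2 and "W" in (op1, op2):
                edges.add((t1, t2))
    return edges


def is_conflict_serializable(schedule):
    """True iff the conflict graph is acyclic."""
    edges = conflict_graph(schedule)
    nodes = {t for t, _, _ in schedule}
    while nodes:
        # Repeatedly remove transactions without incoming edges.
        sources = {n for n in nodes
                   if not any(v == n and u in nodes for u, v in edges)}
        if not sources:
            return False  # every remaining node lies on or behind a cycle
        nodes -= sources
    return True


interleaved = [("T1", "R", "A"), ("T2", "W", "A"), ("T1", "W", "A")]
serial = [("T1", "R", "A"), ("T1", "W", "A"), ("T2", "R", "A"), ("T2", "W", "A")]
print(is_conflict_serializable(interleaved), is_conflict_serializable(serial))
```

In the interleaved schedule, the RW conflict gives an edge from `T1` to `T2` and the WW conflict gives an edge back, so the graph has a cycle.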

A transaction manager enforces certain transaction behaviour to exclude non serializable schedules.

Attempt: 2PL. Every schedule according to 2PL is serializable; however, not every serializable schedule can be produced by 2PL, and deadlocks may occur.

### Local and global schedules

Partial order on OP with following properties:

• each data item is read and written by $$T$$ at most once.
• If $$p$$ is read action and $$q$$ write action of $$T$$ and both access same data item then $$p < q$$

Problem: transactions may seem serializable at local sites, but not globally. How to decide if locally observable sequences imply a globally serializable schedule?

As we do not have replication of data items, whenever there is a conflict in a global schedule, the same conflict must be part of exactly one local schedule. The conflict graph of the global schedule is given by the union of the conflict graphs of the respective local schedules. Either all or none of the corresponding global schedules are serializable.

### Types of federation

• homogeneous federation: same services and protocols at all servers. Characterized by distribution transparency. The outside world can not observe distribution.
• heterogeneous federation: Servers are autonomous and independent of each other; no uniformity of services and protocols across federation.

Interface to recovery: Every global transaction runs the 2-phase-commit protocol. By that protocol the subtransactions of a global transaction synchronize such that either all subtransactions commit, or none of them.

### Homogeneous Concurrency Control

#### Primary Site 2PL:

• one site is selected at which lock maintenance is performed exclusively
• this site thus has global knowledge, and enforcing the 2PL rule for global and local transactions is possible
• lock manager simply has to refuse any further locking of a subtransaction $$T_{ij}$$ whenever a subtransaction $$T_{ik}$$ has started unlocking already
• Much communication can result in bottleneck at primary site

#### Distributed 2PL

• When a server wants to start unlocking data items on behalf of a transaction, it communicates with all other servers regarding lock point of the other respective subtransaction
• server has to receive locking completed message from each of these servers
• implies extra communication between servers

#### Distributed Strong 2PL

• every subtransaction of global transaction and every local transaction holds locks until commit
• by running the 2-phase-commit protocol, the 2PL rule is enforced as a side effect

• Centralized detection
• time-out based detection
• edge chasing
• path pushing

### Serializability by assigning timestamps to transactions

• global and local transactions are timestamped; all subtransactions of transaction obtain same timestamp
• timestamps must be system-wide unique and based on synched clocks
• to be system-wide unique, timestamps are values of local clocks concatenated with site ID

#### Time stamp protocol TS

• each transaction $$T$$ has unique timestamp $$Z(T)$$ when it started
• A transaction $$T$$ must not write an object which has been read by any $$T'$$ where $$Z(T') > Z(T)$$
• A transaction $$T$$ must not write an object which has been written by any $$T'$$ where $$Z(T') > Z(T)$$
• A transaction $$T$$ must not read an object which has been written by any $$T'$$ where $$Z(T') > Z(T)$$

Can be combined with Lamport clocks.
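The three rules of the TS protocol can be checked with two timestamps per object: the largest timestamp that has read it and the largest that has written it. The sketch below is an assumption-laden illustration. Timestamps are `(local clock, site id)` tuples as described above, clocks are assumed non-negative, and a rejected operation simply returns `False` (a real system would restart the transaction).

```python
MIN_TS = (-1, -1)  # smaller than any real timestamp (clocks assumed >= 0)


class TimestampScheduler:
    def __init__(self):
        self.read_ts = {}   # object -> largest timestamp that read it
        self.write_ts = {}  # object -> largest timestamp that wrote it

    def read(self, ts, obj):
        # Must not read an object written by a younger transaction.
        if self.write_ts.get(obj, MIN_TS) > ts:
            return False
        self.read_ts[obj] = max(self.read_ts.get(obj, MIN_TS), ts)
        return True

    def write(self, ts, obj):
        # Must not write an object read or written by a younger transaction.
        if self.read_ts.get(obj, MIN_TS) > ts:
            return False
        if self.write_ts.get(obj, MIN_TS) > ts:
            return False
        self.write_ts[obj] = ts
        return True


s = TimestampScheduler()
t_old, t_young = (10, 1), (10, 2)  # same clock value, site id breaks the tie
print(s.read(t_young, "A"))   # allowed
print(s.write(t_old, "A"))    # rejected: A was read by a younger transaction
```

Python compares tuples lexicographically, so the site id only matters when two local clock values are equal. This is how the concatenation makes timestamps system-wide unique.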

### Lock-based vs timestamp-based approaches

• Both guarantee serializability
• Timestamp-based are deadlock-free but risk restarts
• All approaches are pessimistic, check if an operation is possible, then execute it

### Heterogeneous Concurrency Control

• each server runs local transaction manager which guarantees local serializability
• global transaction manager controls execution of global transactions, either by ordering the commits of transactions or by introducing artificial data objects called tickets which have to be accessed by subtransactions

#### Rigorous schedules

A local schedule $$S$$ of a set of complete transactions is rigorous if for all involved transactions $$T_i$$, $$T_j$$ the following holds: Let $$p_j \in \text{OP}_j, q_i \in \text{OP}_i, i \neq j$$ such that $$(p_j, q_i) \in \text{conf}(S)$$. Then either $$a_j <_s q_i$$ or $$c_j <_s q_i$$

#### Commit-deferred transaction

A global transaction $$T$$ is commit-deferred if its commit action is sent by the global transaction manager to the local sites of $$T$$ only after the local executions of all subtransactions of $$T$$ at those sites have been acknowledged.

#### A schedule is serializable whenever it is rigorous

Proof by contradiction (assume the schedule is not serializable)

#### Ticket-based concurrency control

• Each server guarantees serializable local schedules in a way unknown for global transactions
• Each server maintains a special counter as database object, called ticket. Each subtransaction of global transaction at that server increments (reads and writes) the ticket. Doing so we introduce explicit conflicts between global transactions running at same server.
• Global transaction manager guarantees that the order in which the tickets are accessed by subtransactions will imply linear order on global transactions

## Chapter 11: Replication and (weaker) consistency

• Reliable and high-performance computation on a single instance of a data object is prone to failure
• Replicate data to overcome single points of failure and performance bottlenecks

### Two replication dimensions:

• location of change: Either primary copy (only performed on single dedicated replica) or write anywhere
• propagation speed: at commit, all replicas contain change OR delayed

• Location of change:
• primary copy: simple synch
• write anywhere, flexible, no bottleneck
• Propagation speed
• immediate: strongly consistent, potentially long response times
• delayed: fast response, consistency problems

Method specific:

• primary + immediate: resource contention, strong consistency with simple implementation
• write anywhere + immediate: prone to distributed deadlocks
• primary + delayed: typically fast, outdated data
• write anywhere + delayed: fast, serializability not guaranteed

### Synchronous replication protocols

#### ROWA

• write change to all replicas
• read on any single replica
• expensive write coordination (2PC)
• low write availability (lower than w/o replication)

#### Primary Copy

• write change initially to a single replica
• propagate changes in bulk to other replicas
• coordination with read locks: request from primary
• reduce write cost

#### Quorum-Based Protocols

• Idea: Clients request and acquire permission of multiple servers before either reading or writing a replicated data item
• $$N$$ replicas, contact at least $$\frac{N}{2} + 1$$ servers to agree on update. Once agreed, all contacted servers process the update assigning new version to updated object
• For read, client must first contact at least $$\frac{N}{2} + 1$$ servers and ask them to send version number of local version. Will then read the replica with highest version number

Can be generalized to arbitrary read quorum $$N_R$$ and write quorum $$N_W$$ such that:

• $$N_R + N_W > N$$
• $$N_W > \frac{N}{2}$$

⇒ quorum consensus method
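The two quorum conditions and a versioned read can be sketched as follows. Replicas are modeled as `(version, value)` pairs. The function names and this representation are assumptions for the example.

```python
from itertools import combinations


def valid_quorums(n, n_r, n_w):
    """Check N_R + N_W > N (read sees latest write) and N_W > N/2."""
    return n_r + n_w > n and 2 * n_w > n


def quorum_read(replicas, contacted):
    """Return the value with the highest version among contacted replicas."""
    version, value = max((replicas[i] for i in contacted), key=lambda r: r[0])
    return value


# N = 5, N_W = 3: the last write reached replicas 0, 1 and 2.
replicas = [(2, "new"), (2, "new"), (2, "new"), (1, "old"), (1, "old")]
print(valid_quorums(5, 3, 3))
# Every possible read quorum of size N_R = 3 overlaps the write quorum.
print(all(quorum_read(replicas, q) == "new"
          for q in combinations(range(5), 3)))
```

The first condition forces every read quorum to intersect every write quorum, and the second prevents two disjoint write quorums. ROWA is the special case $$N_R = 1$$, $$N_W = N$$.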

### CAP Theorem

Of atomic data consistency, system availability and tolerance to network partitions, only two can be achieved at any given time.

We cannot avoid network partitions, so consistency and availability cannot both be achieved at the same time.

• Distributed ACID-transactions: Consistency has priority
• Large-scale distributed systems: availability has priority

### Eventual consistency

• Specific form of weak consistency
• Guarantees: if no new updates are made to an object, eventually all reads will return the last updated value

• Dirty writes
• Lost update
• Phantom
• Write skew

### Database consistency classes

• ANSI SQL: Prevents typical anomalies by locking
• Cursor Stability
• Snapshot Isolation

### DS Consistency classes

Session guarantees

• Monotonic reads: never return previous values
• Monotonic writes: writes in session appear in order

Sticky session guarantees

• PRAM: serial execution with session
• Causal consistency/PL-2L

### Causes for unavailability

• Preventing lost updates: detecting competing writes needs coordination
• Preventing write skew: generalization of lost updates
• Recency guarantees: a network split may delay a process arbitrarily long

| Consistency level | Guarantee | C | P | A |
|---|---|---|---|---|
| Strong consistency | See all previous writes | A | D | F |
| Eventual consistency | See subset of previous writes | D | A | A |
| Consistent prefix | See initial sequence of writes | C | B | A |
| Bounded staleness | See all “old” writes | B | C | C |
| Monotonic reads | See increasing subset of writes | C | B | B |
| Read my writes | See all writes performed by reader | C | C | C |