TODO phi accrual failure detector
TODO merge in notes from CS192, 6.829
TODO comparison of database replication techniques based on total order broadcast
TODO http://bytepawn.com/readings-in-distributed-systems/
TODO http://www.cs.jhu.edu/~jak/docs/paxos_for_system_builders.pdf
comparisons
- <http://grydz.com/grid/show/nosql-open-source-and-commercial-nosql-data-storage-systems>
- <http://spreadsheets.google.com/ccc?key=0Ale_YaCwKEUVclVFVFlrUWt5aWhQaGQ0OXVCMUl4Vmc&hl=en#gid=0>
- <http://en.wikipedia.org/wiki/NoSQL>
Background
==========
resources
- <http://marcua.net/notes/6.824/>
core
- distributed concurrency control: serializability
- distributed commit: agree to do something or abort
- consensus: agree on a value
- replication: for availability, durability
- paxos/viewstamped replication
- virtual synchrony (spread)
- CAP theorem's semantics
- assume partition of 5 nodes, with X,Y in non-quorum and Z in quorum; X
holds lock on resource, and Y and Z get request for resource
- if consistent & available, then Y can reach X to acquire
- if consistent & partition-tolerant, then Z can acquire, breaking X's lock
- TODO what if lock were on quorum side? TODO tie in with "req" vs "node"
failure
- consistency:
- one-copy serializability
- linearizability
- <http://pl.atyp.us/wordpress/?p=2521>
- <http://pl.atyp.us/wordpress/?p=2532>
- PACELC: if there is a partition (P), how does the system trade off
  availability and consistency (A and C); else (E), when the system is running
  normally in the absence of partitions, how does it trade off latency (L) and
  consistency (C)
- consistency
- eventual consistency: stale reads (OOO if read from diff nodes), divergent
writes (clocks and conflicts)
- strict cons
- event cons: staleness
- monotonic read cons aka timeline cons: can be stale but always in-order
- RYOW aka RYW aka session cons: does not nec imply timeline cons (its
  guarantees are a weaker, per-session subset)
- causal cons: use version numbers (lamport)
- immediate cons
- event cons: multi-writer
- replication: quorums, primary-backup, master-master (quorum sketch after
  this item)
- $R+W>N$; no guarantees unless joins/leaves incur synchronization barriers
- primary-backup: $R=1,W=2,N=3$; only need ack from majority (2), but write
to all 3; majority prevents divergence in case of partition
- quorums stronger guarantee: always read latest; PB read can be stale (if
read from a replica that hasn't applied update yet)
- PB gives in-order, since all updates serialized through primary
- quorums require app-specified ordering, eg cassandra's app timestamps,
since different clients write directly to their quorums (no global
ordering)
- with viewstamped replication, joins/leaves change views; this is when sync
should happen (to bring new node up to speed)
- but sync barriers are expensive, eg full merkle tree comparison or log
shipping
- dynamo uses sloppy quorums, bc trad'l strict quorums implies unavailability
on failures/partitions
- replicate to first $N$ healthy nodes, not first $N$ nodes; hence, the $N$
may change or be entirely different
- dynamo tries to spread quorum across datacenters
- cassandra claims to use strict quorums
- multi-master: requires dist commit and dist concurrency control or eventual
cons
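A minimal quorum sketch (toy code, not any particular system's API; the
app-supplied timestamp plays the role of cassandra's, per above):

    import random

    N, R, W = 5, 3, 3                      # R + W > N: read/write sets overlap
    replicas = [dict() for _ in range(N)]  # per-node map: key -> (ts, value)

    def write(key, value, ts):
        # app supplies the ordering timestamp; a real system picks the first
        # W healthy nodes, here any W of N
        for node in random.sample(range(N), W):
            replicas[node][key] = (ts, value)

    def read(key):
        # any R nodes must intersect the last write's W nodes, so at least
        # one reply carries the latest version
        replies = [replicas[node].get(key, (0, None))
                   for node in random.sample(range(N), R)]
        return max(replies)                # highest timestamp wins

    write("x", "v1", ts=1)
    write("x", "v2", ts=2)
    assert read("x") == (2, "v2")          # never stale, whatever R is sampled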
- design patterns
- tombstones: for deletes in eventually consistent systems
- vector timestamps
- logical clocks
- happened-before relationship
- $a \rightarrow b$ if $a$ happened before $b$ in same process
- $a \rightarrow b$ if $a,b$ are the send, recv of a msg
- Lamport clocks/timestamps (clock sketch after this list)
- timestamp = (counter, process ID); all msgs have unique timestamp; total
order possible
- on msg send, first incr then stamp
- on msg recv, set counter = max(msg.counter, counter), then incr
- $a \rightarrow b \implies L(a) < L(b)$ but not vice versa
- however, key utility: $L(a) \nless L(b) \implies a \nrightarrow b$
- vector clocks/timestamps
- all msgs include vector timestamp (snapshot of global vector clock)
- on update, increment own component
- on msg send, first incr own component
- on msg recv, incr own component and use component-wise max
- time is more advanced on receiver than sender
- $a \rightarrow b \iff V(a) < V(b)$
- version vectors: for syncing replicas
- after sync, both vectors are same: component-wise max
- only for detecting conflicts; resolving is still separate, hard problem
- eg commonly need to maintain complete history, e.g. with counters or
version control (ancestor info needed)
- easier cases include unioning sets
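A minimal sketch of the clock rules above (plain Python; class and method
names are mine):

    class LamportClock:
        def __init__(self):
            self.t = 0

        def send(self):                  # incr first, then stamp the msg
            self.t += 1
            return self.t

        def recv(self, msg_t):           # jump past the sender, then incr
            self.t = max(self.t, msg_t) + 1

    class VectorClock:
        def __init__(self, me, n):
            self.me, self.v = me, [0] * n

        def send(self):                  # incr own component, ship a snapshot
            self.v[self.me] += 1
            return list(self.v)

        def recv(self, msg_v):           # component-wise max, then incr own
            self.v = [max(a, b) for a, b in zip(self.v, msg_v)]
            self.v[self.me] += 1

    def happened_before(va, vb):         # V(a) < V(b) iff a -> b
        return all(x <= y for x, y in zip(va, vb)) and va != vb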
rsync (rolling-checksum sketch below)
- directed (one-way) synchronization
- receiver sends rolling checksums of each non-overlapping block
- sender compares these with its checksums of every (incl. overlapping) block
- rolling checksums efficient to incrementally update
- if match, calculate and compare MD5
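A sketch of an rsync-style rolling weak checksum (illustrative constants; as
noted above, the real protocol pairs this with a strong checksum on matches):

    M = 1 << 16

    def weak_checksum(block):
        # a = plain byte sum, b = position-weighted sum (both mod 2^16)
        a = sum(block) % M
        b = sum((len(block) - i) * x for i, x in enumerate(block)) % M
        return a, b

    def roll(a, b, out_byte, in_byte, blocklen):
        # slide the window one byte in O(1) instead of rescanning the block
        a = (a - out_byte + in_byte) % M
        b = (b - blocklen * out_byte + a) % M
        return a, b

    data = bytes(range(20))
    a, b = weak_checksum(data[0:8])
    a, b = roll(a, b, data[0], data[8], 8)
    assert (a, b) == weak_checksum(data[1:9])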
NAS/SAN
- SAN: block protocols, eg scsi, FC, iscsi, ataoe
- NAS: remote FS, eg CIFS, NFS; NAS heads can be atop SANs
MPI/PVM
- MPI
- lib for apps
- don't mandate thread-safe impls, but allow them
- high perf on high perf systems
- modular: all refs relative to a module, not entire program
- eg linear solver module must be restricted to a subset of processes
- process src/dst must be spec'd by rank in a group not absolute ID
- extensible
- heterogeneous computing (impls working together)
- well-defined behavior: no race conds/avoidable impl-specific behavior
- PVM
- also supporting heterogeneous computing and diff approach to extensibility
- provide portable, heterogeneous env for using clusters over TCP
MPI
- system objects: apps have handles to these
- _group_: set of $n$ processes; each process has a _rank_ in $[0,n)$; group
routines mainly for specifying processes for communicator ctor
- _communicator_: always associated 1:1 with a group; a "tag" passed in MPI
calls; `MPI_COMM_WORLD` represents all tasks
- organize tasks into task groups
industry
- cloudera
- consulting/training services
- products: hadoop distro, desktop gui, sqoop for rdbms import/export
- people
- doug cutting: hadoop/nutch/lucene founder; yahoo
- mike cafarella
- jeff hammerbacher: facebook data team; hive
- consistency
- strict: timestamp everything; impractical
- sequential: some total ordering that's consistent with the local orderings
each computer sees
- Merkle trees
- consistent hashing: when changing hash table width, impose minimal disruption
  (sketch below)
- eg, when adding, take a fair amount from each existing bucket
- chord ring is a way to implement this
- can replicate each node multiple times to make things smoother
- http://citeseer.ist.psu.edu/karger97consistent.html (Karger)
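A minimal consistent-hash ring with virtual nodes (a sketch; hash function and
vnode count are arbitrary choices):

    import bisect, hashlib

    def h(s):
        return int(hashlib.md5(s.encode()).hexdigest(), 16)

    class Ring:
        def __init__(self, nodes, vnodes=100):
            # replicate each node vnodes times to smooth the key distribution
            self.ring = sorted((h("%s#%d" % (n, i)), n)
                               for n in nodes for i in range(vnodes))

        def lookup(self, key):
            # key is owned by the first virtual node clockwise of its hash
            i = bisect.bisect(self.ring, (h(key),)) % len(self.ring)
            return self.ring[i][1]

    ring = Ring(["a", "b", "c"])
    print(ring.lookup("some-key"))
    # adding a 4th node steals ~1/4 of the keys, a fair share from each bucket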
- gossip: decentralized but eventually consistent
general principles for reducing latency
- distributed caches
- CDN
- caching proxy server
- app-level (yslow, ajax)
- edge DNS accelerator
- zero copy
- no serialization
- load-balanced replicas
- TCP offload engine
TODO DSM
- http://www.cs.umd.edu/~keleher/bib/dsmbiblio/dsmbiblio.html
TODO consistency
- http://en.wikipedia.org/wiki/Consistency_model
coordination
- lamport clocks: total ordering; single values
- vector clocks: causal ordering; partial ordering; more information to eg
resolve conflicts; easy to move to total order (eg by appending node ids)
data grids
- data grids: general distributed utilities
- caches, in-mem data structures, txns, pub-sub/msging, membership
- oracle coherence, ibm websphere extreme scale/object grid, terracotta,
gigaspaces, gemstone, jbosscache/jgroups/infinispan, hazelcast
- hazelcast: undocumented consistency guarantees
BFT
- normal operation: why simple approach similar to failstop replication won't
work
      invoke VIEWSTAMP OPERATION -> DEST
      ----------------------------------
      invoke 1,1       X         -> B
      invoke 1,1       Y         -> C
  nobody's been fooled yet at this point, since you need 2f+1 agreements
      invoke 1,2       Z         -> B
      invoke 1,2       Z         -> C
  but at this point both will reply correctly even though they have diff
  states
- what about sending hashes of logs?
- SUNDR: guarantees fork consistency property: if server lies to clients
(drops, reorders, etc.) causing a fork in the clients' histories
- actually what happens: see journal
- a prepared certificate is a set of 2f+1 prepares
- commit point: when $f+1$ non-faulty replicas have a prepared certificate
- read requests: send to all replicas, just wait for enough consistent replies
- view changes: completely diff from paxos
- go through primaries round-robin; no leader election
- R_n to (new) P: <DOVIEWCHANGE, list of prepared certificates>_{R_n}
- P to all: <NEWVIEW, list of $2f+1$ DOVIEWCHANGE msgs>_{P}, plus
  <PREPREPARE, req...> for all the operations that were supposed to
  have been committed in the prev view
- note: 2 replicas can execute the same operation in diff viewstamps; only
thing that matters is that they all run in the same order
- questions
- diff btwn viewstamp replication and paxos?
- what was the point about fork consistency in SUNDR and log hashes?
- isn't the all-to-all comm necessary since the primary could lie?
queueing systems
- AMQP
- rabbitmq: erlang impl;
<http://developian.blogspot.com/2009/11/drizzle-replication-using-rabbitmq-as.html>
- SQS: AWS offering
- yahoo/apache hedwig: for PNUTS replication
TODO dtxns
- http://citeseer.ist.psu.edu/533331.html a fast commit protocol for distributed main memory database systems
- http://citeseer.ist.psu.edu/589561.html
- http://citeseer.ist.psu.edu/238153.html Transaction Routing for Distributed OLTP Systems
- transaction management in the R* distributed database management system
TODO distributed query optimization
- R*
datacenter
==========
amazon aws
- ebs: volumes can only be mounted from 1 instance at a time, so no consistency issues
- behaves like SAN
- autoscaling: set conditions eg CPU, network, disk util
microsoft chicago
- trailer containers
- $.5B, 700K ft^2, 2 levels, 112 containers, 224K servers
- each container: 2K servers, .5MW critical load, 50K lb, air skates
- 11 diesel generators, currently 30MW total load, fully 56MW critical load
FAWN (dga, sosp09)
- best paper; vijay presented
- low-power embedded CPUs + small flash storage
- increasing CPU/IO gap: reduce CPU
- power grows super-linearly with speed
- dynamic power scaling on trad'l systems is inefficient; near min volt at
highest freqs
- flash
- fast random reads: << 1 ms (up to 175x faster than disk)
- power-efficient IO: < 1 W at heavy load (disks at load use 10 W)
- slow random writes
- FAWN-DS: log-structured store
- main-mem hash index: maps 160-bit keys to log pos
- only stores 15-bit fragment of key, 1 valid bit, 4 B log pos = 16 B
- index into hash table is lowest i bits (16 bits); tables have 2^i elts
- key frag is next 15 bits; cmp this
- pseudocode (lookup in the main-mem index):

        index = key & 0xffff             # lowest 16 bits pick the bucket
        keyfrag = (key >> 16) & 0x7fff   # next 15 bits, stored in the bucket
        for i in range(n):               # probe each hash table in turn
            bucket = hash[i][index]
            if (bucket.valid and bucket.keyfrag == keyfrag
                    and readkey(bucket.offset) == key):  # verify full key on flash
                return bucket
        return NOT_FOUND
- periodic snapshots
- typical obj size: 256 B to 1 KB; for 256 MB RAM, yields approx 4 GB
storage
- each node has several virtual IDs in the key ring, so multiple files;
these _semi-random writes_ are still fast
- maintenance: split, merge, compact are all scans that can run
concurrently with ongoing operation (thanks to immutability/main-memory
index)
- TODO
- FAWN-KV: consistent, replicated, HA KV store
- chained replication
- 350 KV queries/Joule: 2 OOM more than disk-based system
People
======
- Michael J Freedman
- Frans Kaashoek
- Robert Morris
- Ion Stoica
- Scott Shenker
vuze aka azureus
- custom msging protocol for extensions
- peer exchange
- superseeding
- vivaldi coords
- kademlia dht
DSM
===
Ivy: Memory Coherence in Shared Virtual Memory Systems (Yale)
- migrate pages among writers; write invalidation
- read-only copies can be replicated
- implements sequential consistency
- centralized manager algorithm (toy sketch below)
- to get a read
- C: RQ to M (read query)
- M: RF to O (read forward)
- O: RD to C (read data)
- C: RC to M (read confirm)
- to get a write
- C2: WQ to M (write query)
- M: IV to C1 (invalidate)
- C1: IC to M (invalidate confirm)
- M: WF to O (write forward)
- O: WD to C2 (write data)
- C2: WC to M (write confirm)
- tricky: read/write confirms are done to order the read/write data requests
- efficiency problems: comm intensive, page granularity, false sharing
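A toy model of the manager's bookkeeping (names are mine, not the paper's;
the RC/WC confirms that order concurrent requests are omitted):

    class PageManager:
        # per page, the manager tracks the owner and the read-copy set
        def __init__(self, page_owner):
            self.owner = page_owner
            self.copies = set()

        def read_fault(self, client):    # RQ: forward (RF) to owner, who
            self.copies.add(client)      # sends a read-only copy (RD)
            return self.owner

        def write_fault(self, client):   # WQ: invalidate (IV) all copies,
            stale = self.copies          # then transfer ownership (WF/WD)
            self.copies = set()
            prev, self.owner = self.owner, client
            return prev, stale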
TreadMarks: DSM on Standard Workstations and Operating Systems
- "better" than Ivy
- implements lazy release consistency with write notices/write diffs
- compatibility with existing apps, since you can always re-write programs to
respect sharing granularity (to avoid false sharing)
- techniques
- LRC: lazy release consistency
- write diffs: just send RLE of changes
- write notices: don't send write diffs immediately, but on demand (even
lazier)
- vector timestamps
- details
- release consistency: just publish changes following a lock release
- eager RC: all coherence actions performed immediately on release operations
- lazy RC: all coherence actions delayed until after a corresponding
subsequent acquire
- even lazier: use write notices
- less work: use write diffs
- coherence actions = publishing all changes
- vector timestamps are necessary for this situation:

        CPU0: al1 x=1 rl1  al2 z=99 rl2
        CPU1:      al1 y=x rl1
        CPU2:           al1 print x, y, z rl1
- want CPU2 to see not just CPU1's release but CPU0's as well
- CPU2 knows about CPU0's change via CPU1's vector timestamp, which includes
CPU0's entry
Consensus
=========
Paxos
- simplified overview: there are proposers and acceptors
- propose
- proposer sends proposal (with unique round number $n$) to all acceptors
- acceptors promise to accept proposal if $n$ is highest $n$ seen (promise
to ignore any lower $n$)
- accept
- if proposer hears back from quorum, send accept requests to quorum with
the value it wants to set
- if acceptor hasn't promised to a higher $n$, tell proposer it accepted
- if proposer gets quorum of accepts, then commit
- normal for all nodes to be both proposer and acceptor
- actual overview: acceptor promise identifies accepted $n$ to prevent future
proposers with higher $n$ from changing the value
      prepare(n) ->
          "if you've already *accepted* a proposal, what's its n and value?"
      <- prep_ok(n_a, v_a)
          (the proposer will adopt this value if it hears it from a majority)
      accept(n, v) ->
      <- acc_ok(n)
      decide ->
- proposal n is chosen when a majority of acceptors sent `acc_ok(n)`
- interesting property of paxos: once proposal is chosen, system will never
change its mind
- problem: if proposers all get prep_ok responses that say nothing was agreed
upon, then there may be conflicts in the accept stage
- this is why we have acc_ok
- pseudocode (runnable sketch at the end of this section):

        proposer(v):
            choose n, unique and higher than any n seen so far
            send prepare(n) to all acceptors
            if prep_ok(n_a, v_a) from majority:
                # adopt the value of the *highest-numbered* accepted
                # proposal, if any (but keep our own n)
                if any v_a != nil: v' = v_a with max n_a
                else: v' = v
                send accept(n, v') to all
                if acc_ok(n) from majority: send decided(v') to all

        acceptor state:
            n_p: highest prepare seen, initially 0
            n_a: n of highest accept seen, initially 0
            v_a: value of highest accept seen, initially nil

        prepare(n) handler:
            if n > n_p:
                n_p = n
                reply prep_ok(n_a, v_a)

        accept(n, v) handler:
            if n >= n_p:  # reject anything lower than the highest *prepare* seen
                n_a = n
                v_a = v
                reply acc_ok(n)

        commit point: majority of acceptors record a particular v_a

        if an acceptor times out (doesn't receive decide):
            ask all servers for v_a, see if a majority agrees
            otherwise become a proposer
- scenario:

        A1: p10 a10v10 p11 a11v10
        A2: p10 a10v10 p11 a11v10
        A3: p10        p11 a10v10 X a11v10   (X: a10v10 rejected, n_p is 11)
- say two proposers both get back a majority of prep_oks; n_p allows us to
reject any accept request that isn't the highest
- vague argument for why other interleavings work out right:
- important: the variable values must persist across crashes; must not be
allowed to forget values
- otherwise nodes that have already accepted a value will forget that they
have done so, and may subsequently reply incorrectly to a different
prepare/accept
- what this effectively means is that nodes must record their variable
updates to disk
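The pseudocode above condensed into a runnable single-decree round
(in-process sketch: messages are function calls; persistence, retries, and
failures are omitted):

    class Acceptor:
        def __init__(self):
            self.n_p, self.n_a, self.v_a = 0, 0, None

        def prepare(self, n):
            if n > self.n_p:
                self.n_p = n
                return ("prep_ok", self.n_a, self.v_a)
            return None                       # promised to a higher n

        def accept(self, n, v):
            if n >= self.n_p:                 # reject below highest prepare
                self.n_p, self.n_a, self.v_a = n, n, v
                return ("acc_ok", n)
            return None

    def propose(acceptors, n, v):
        majority = len(acceptors) // 2 + 1
        oks = [r for r in (a.prepare(n) for a in acceptors) if r]
        if len(oks) < majority:
            return None
        # adopt the value of the highest-numbered accepted proposal, if any
        accepted = [(n_a, v_a) for _, n_a, v_a in oks if v_a is not None]
        v2 = max(accepted)[1] if accepted else v
        accs = [a.accept(n, v2) for a in acceptors]
        if sum(r is not None for r in accs) >= majority:
            return v2                         # chosen; send decided(v2) to all
        return None

    acceptors = [Acceptor() for _ in range(3)]
    print(propose(acceptors, 10, "x"))   # 'x' chosen
    print(propose(acceptors, 11, "y"))   # still 'x': never changes its mind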
life beyond distributed transactions: an apostate's opinion (pat helland)
- dtxns are fragile
Storage & Consistency
=====================
Errors in Database Systems, Eventual Consistency, and the CAP Theorem (Michael
Stonebraker, cacm blog)
- <http://cacm.acm.org/blogs/blog-cacm/83396-errors-in-database-systems-eventual-consistency-and-the-cap-theorem/fulltext>
- most NoSQL disallow multi-partition txns, so consistency applies only to
replication
- LAN partns rare, esp w redundant LANs
- disasters: no eventual WAN consistency since you'll never recover (no
durability)
- app/dbms bugs: corruption/byzantine
- single-node failures: easily survived by many algos
- WANs are redundantly connected
From <http://www.allthingsdistributed.com/2008/12/eventually_consistent.html>:
> The first formal treatment of the consistency vs. latency (and if one wishes,
> availability and tolerance to network partitions) in fact appeared in the
> paper
>
> Hagit Attiya, Jennifer L. Welch: Sequential Consistency versus
> Linearizability. ACM Trans. Comput. Syst. (TOCS) 12(2):91-122 (1994) -
> initially published in the SPAA 1991 conference.
>
> It formally proves that for linearizability both read and write operations
> cannot be faster than the network latency. In sequential consistency, either
> reads or writes can be faster than the network latency, but at least one of
> them has to be slow.
>
> In particular, this means that strong consistency conditions cannot be made
> highly available in partitionable environments, and cannot be implemented in
> a scalable manner.
Measurement and Analysis of TCP Throughput Collapse in Cluster-based Storage
Systems (read 2/25/08)
- cluster storage systems where data is striped across multiple servers
experience tcp throughput collapse
- cause: exceeding output port buffer capacity within switches -> packets
dropped -> tcp timeouts
- solutions: nothing very satisfactory
Chain Replication for Supporting High Throughput and Availability
- simple idea: have a chain of servers (sketch below)
- all writes start at head and return from tail
- all reads go to tail
- "high availability": you can stay up, but you're only reading from tail....
The Chubby Lock Service for Loosely-Coupled Distributed Systems
Rethink the Sync
- TODO
- basically: bluefs allows for speculative IO assuming that cached data is
still valid; allow computations to proceed while buffering linux IO so that
speculative output is not released until data is validated
Tra (Russ Cox)
- sync time >= mod time; time btwn is guaranteed to have no changes
- only benefit of VT pair is bounding the times for dirs; same thing can be
done with VTs, but more verbose repr
- impl: traserv scans fs for changes
- paper bugs
- why is fig 8 known to have no mods at B3? it could've....
- why doesn't d have min of sync times in fig 12?
MapReduce
=========
mapreduce
- [work underappreciated for its answers to systems issues like stragglers and
query fault-tolerance]
- [issues]
- pull-based reducers mean lots of disk seeks
- $O(mr)$, where $m$ is mapper count, $r$ is reducer count; on single disk,
$O(r)$
- bc after each mapper sorts locally, all reducers read a slice from each
sorted map output
hadoop
- _job_: full map-reduce; _task_: an execution slice
- _jobtracker_: master node that accepts jobs from clients, splits jobs into
tasks, and assigns them to _slaves_ (workers)
- _tasktracker_: on each slave; manages task execution
- map task execution
- _split_: portion of input file for a task; defaults to 1 chunk (64MB)
- mapper interface (word-count sketch at the end of this section):

        class Mapper:
            map(key, val, OutputCollector)
            close()   # called when map() has been called on all records in split
- OutputCollector: stores output in a format for reducer
- apply partition function on key, buffer record/partition number in buffer
- spill to disk occasionally; spill sorts by (partition, key)
- _commit phase_: on task completion; merges spills; read when servicing reqs
- _combiner_: optional map-side reducer
- reduce task execution
- each reduce task is assigned a partition of key range
- _shuffle phase_: fetch task partition from each map task's output (HTTP;
default 5 at a time)
- _sort phase_: group records with same key
- _reduce phase_: apply reducer to each key and corresponding list of values
- oozie workflow engine uses DAG of hadoop, pig, ssh, http, email jobs
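A word-count sketch of the map -> partition -> shuffle/sort -> reduce flow
above (in-process toy; spills, combiners, and the HTTP shuffle are omitted):

    from itertools import groupby

    def mapper(key, line, collect):          # map(key, val, OutputCollector)
        for word in line.split():
            collect(word, 1)

    def reducer(word, counts):
        return word, sum(counts)

    def run_job(lines, n_reduce=2):
        # map phase: partition function = hash(key) % n_reduce
        parts = [[] for _ in range(n_reduce)]
        for off, line in enumerate(lines):
            mapper(off, line,
                   lambda k, v: parts[hash(k) % n_reduce].append((k, v)))
        # per reduce task: sort phase groups records with the same key,
        # then the reduce phase applies the reducer to each group
        out = []
        for part in parts:
            for k, grp in groupby(sorted(part), key=lambda kv: kv[0]):
                out.append(reducer(k, [v for _, v in grp]))
        return out

    print(run_job(["a b a", "b c"]))   # per-word counts, grouped by partition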
hive
- TODO (below is just starter)
- hiveql
- from subqueries, inner/outer equijoins, multi-group-by, multi-table
inserts, UDFs
- sampling
- nested collections: arrays, maps, UDTs
- query optimizer
- predicate pushdown
- left outer join: left side preds pushed
- right outer join: right side preds pushed
- full outer join: none pushed
- non-det functions not pushed (rand; specified with java annot)
- map joins: user-specified small in-mem hash tables on mappers; no
sort/reduce needed
- group by optimizations: map-side partial aggs, load balancing for data skew
- extensibility
- disk data formats: text, sequence, ...
- in-mem ('serde') data formats: java, hadoop writable, thrift, ...
- user map-reduce scripts
- UDFs: substr, trim, from_unixtime, ...
- user-defined agg functions: sum, average, ...
- user-defined table functions: explode, ...
mapreduce online (tyson condie, nconway, ..., jmh, cal TR 2009)
- built hadoop online prototype (HOP); submitting to hadoop project
- key idea: pipeline (no materialization)
- features
- online aggregation: see snapshots of results as they are generated ("early
returns")
- continuous data stream processing: indefinite input
- jobs finish faster
- single job: start as many map/reduce tasks as you have slots for
- don't schedule all at once, since not enough slots and too many TCP conns
- if no reducer task to take from a mapper, then just materialize
- buffer records for opportunity to apply sort/combiner
- resort/recombine periodically if reducer not keeping up
- multi-job pipelining
- fault-tolerance: reducer just avoids mixing tentative ("uncommitted") map
task output with "committed"
- online agg: snapshot accuracy is hard; report simple progress metric along
with snapshot results
- continuous mr: continuous online agg
- circular buffer of spillfiles
- [db group meeting discussion]
- they claim that they can't overlap/pipeline mapreduce jobs, but unclear why
- for regular non-online-agg MR jobs, unclear how much pipelining benefits
perf
- seems to be pushing the disk-writing to the reducer
- mappers send smaller sorted units to reducers; reducers are reading from
many input file descriptors
- but reducers need to buffer these somehow; most likely need to spill to
disk
dryad
- TODO
- DAGs, pipelining, explicit materialization
- iterative algos incl. k-means, matrix power iteration, and graph traversal
- cosmos distributed file system
- ecosystem
- scope: sql-like language; by bing team
- dryadlinq: academic release
- nebula: a distributed shell with file descriptors as connectors
manimal: relational optimization for data-intensive programs (mcafarella, webdb10, talk at cloudera)
- dbms efficient, mr slow but flexible
- given unmodified mr programs as bytecode
- apply well understood opts like indexes, col stores
- analyzer -> optimizer -> executor
- special handling of regexes; eg constant regex must start w chars
- eval 16 programs: 4 from pavlo, 12 from mahout
- detected 3/4 sel opt oppty's
- couldn't handle hashtable, but that's popular enough for special handling
- counters, log msgs, etc - we're fine for abusing
- opts: sel, proj, delta compression, dict compression w direct operation
- dict compression can use the compressed value directly if you're just doing
e.g. equality comparisons; no need to decompress/look up in dictionary
- sel: 1% selectivity -> 1% runtime
- proj: huge data reductions
- compression: delta compression not worth the trouble; too small gains
- need semantic analysis of the code
- outstanding problems
- indexing time overhead is another mr job; only makes sense for repeat jobs
- details
- works w writables and text
- overridden recordreader can use the index
- mentioned haloop from uw on optimizing gradient ascent mr jobs
- discussion
- what % of mr programs are java or pig/hive?
- karmasphere: most (90%) mr jobs are in java
- amr thinks it's the other way around; some even ban java
- 99% are python streaming at fb, visa
- flumejava/plume: closer to linq; java lib for mr jobs
- craig chambers' uw project -> google
- hive has indexes
- optimizer is rule-based; cost-based optimizers that take runtime feedback
would be nice
- extra cloudera work (not part of webdb talk)
- learnavro: 'zero to olap in 60s'; mcaf's cloudera hacking
- based on at&t learnpads paper; python done; java wip
- go from raw data files to structured data
- schemadictionary
- inputs: anon structured data + previously seen datasets
- find $k$-closest datasets to anon, + schema mappings
- schema type info, data statistics
- "how likely is it that this data was drawn from the same distrib?"
- after user picks one of the 3, label anon data using mapping
- dist computation function
- tool to auto pick the correct visualization
Block stores
============
DRBD
- 2-node block-level replication, either sync or async
- active/active only for FSs like OCFS2 or GFS2 (slower FSs), not ext3 etc.
- supports online recovery
- <http://lwn.net/Articles/329543/>
Filesystems
===========
ceph (sage weil, osdi06) TODO incomplete
- in linux 2.6.34; weil's ucsc phd thesis, then founded new dream/dreamhost
- near-POSIX semantics; has linux VFS client
- extends interface & selectively relaxes consistency for app needs & for
perf
- O_LAZY: don't need to disable read caching & write buffering when file
opened by multiple clients
- HPC community proposed extensions to posix
- metadata server (MDS) cluster manages namespace and coordinates security,
consistency, and coherence
- _dynamic subtree partitioning_: can redundantly replicate portions for perf
- measure metadata popularity using exponentially decaying counters; each op
affects parent dir up to root
- 2PC subtree authority transfers
- CRUSH distribution function designed for heterogeneous, unreliable _object
storage devices_ (OSDs, i.e. backends)
- file broken into objects mapped into placement groups (PGs) using simple
hash fn
- PGs assigned to OSDs using CRUSH
- ceph monitor cluster manages MDS clusters
- just has to handle MDS heartbeats
- uses Paxos w leasing mechanism
- largest scalability test: 430 nodes, 128 MDS, per-MDS perf 50% of 1-MDS cluster, ~250K metadata ops/s (enough for 1000s of OSDs)
- OSDs only comm w peers
- actively migrates data to new nodes and re-replicates data on failed nodes
- uses (and contributes to) btrfs; orig used own EBOFS (extent and b-tree
object file system)
- tuned for obj semantics & other features eg async notif of commits to disk
- other features: snapshots; file/capacity accounting at dir level
pohmelfs TODO
farsite distributed file system (MSR, 2000)
- replicates each file onto multiple desktops
Reclaiming Space from Duplicate Files in a Serverless Distributed File System (2002) [32 citations — 2 self]
- over half of all consumed space is occupied by duplicate files
- convergent encryption: coalesce duplicates to take single file's space
- database for aggregating file content and location information in
decentralized, scalable, fault-tolerant manner
frangipani: scalable distributed file system (1997)
- built on petal block storage
xtreemfs
- WAN operation
- TODO
Replication in the Harp File System (Liskov, Ghemawat, et al)
- primary copy replication: single primary server waits on multiple backup
servers before returning
- primary ensures enough backups to guarantee operation effects will survive
all subsequent failovers
- failover algorithm masks failures, removes failed node
- operations are immediately recorded into volatile memory log
- write-behind to disk
- UPS for power failures
- each file managed by a _group_ (with one master)
- modifications require 2PC
- non-modifications don't require 2PC, but instead TODO
- **replication method**
- **overview**
- _view change_: each group configuration is a _view_
- as with any replication scheme that tolerates network partitions, require
$2n+1$ for $n$ failures
- only actually need to store data on $n+1$ of the machines; can be
propagated on failures (partition)
- _witnesses_: replicas that don't store data
- 1 _designated primary_ (will act as primary whenever it can), $n$
_designated backups_, $n$ _designated witnesses_
- arrange groups so that each node is designated primary of one group,
designated backup of another, designated witness of a third; this
distributes workload
- **normal-case processing** (toy model of the log pointers at the end of
  this entry)
- log of _event records_ with growing IDs of operations, both phase 1 and
committed
- _commit point (CP)_: ID of latest committed op (separates phase 1 and
committed)
- 2PC
- primary logs, sends logged info to backups
- backups only accept in log order; ack for $n$ means got up through $n$
- primary commits by advancing CP and send CP to backups
- applying changes
- log is WAL no-steal: changes not applied on disk (FS) until committed
- _application point_: ID of latest op that has started being applied to
the local FS
- _lower bound (LB)_: ID of latest op that has finished being applied to
the local FS
- primary and backups exchange LBs
- _global LB_: min(all LBs); may truncate things beyond this
- recovery: log brings recovering node up to date
- log is _redo_: only sufficient to redo op after failure
- reads can be serviced immediately without 2PC
- results reflect committed, no uncommitted (i.e. serialize these reads
at the CP)
- if primary is partitioned off, then the result of the read may not
reflect a write that has been committed in the new view
- compromises _external consistency_, which requires comm outside FS
- make this unlikely by using loosely synchronized clocks
- each message from backup to primary contains a time equal to backup's
clock + $\delta$ (few hundred ms)
- backup promises to not start new view until that time
- expect that starting a new view will not be delayed, since $\delta$
will have passed before new view is ready to run
- primary needs to comm with the backup about a read op only if the
time of its local clock is greater than the promised time
- access time: can enable loose consistency on this
- return immediately from primary, before committing
- may be lost if there's a failure/partition
- **view changes**
- views have _view numbers_
- promoted witnesses may not have resources (eg FS) so must never trunc log
- TODO left off here
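A toy model of the log pointers above (hypothetical names; the 2PC messaging
and view changes are omitted):

    class HarpLog:
        def __init__(self):
            self.log = []   # event records; ID = index + 1
            self.cp = 0     # commit point: latest committed op
            self.ap = 0     # application point: latest op being applied to FS
            self.lb = 0     # lower bound: latest op fully applied to FS

        def append(self, op):            # phase 1: log it, send to backups
            self.log.append(op)
            return len(self.log)

        def commit(self, op_id):         # phase 2: advance CP, send to backups
            self.cp = max(self.cp, op_id)

        def apply_one(self, fs_apply):   # write-behind, no-steal: only
            if self.lb < self.cp:        # committed ops ever touch the FS
                self.ap = self.lb + 1
                fs_apply(self.log[self.ap - 1])
                self.lb = self.ap

    def global_lb(logs):
        # min over primary + backups; entries up to here may be truncated
        return min(l.lb for l in logs)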
The Google File System
- 3x replication
GFS Evolution
- main problem: master has memory index of all files' metadata
- orig master per cell per DC, but isolation was difficult and 1-master not
scalable in # files; moved to multi-cell, multi-master
- multi-cell: multiple masters (isolated FSs) over shared pool of
chunkservers; app must manually partition over masters
- namespaces: static partitioning of one FS namespace over multi-cell
- started having more online apps like gmail
- 64MB chunks too big but smaller chunks -> more files; 1MB compromise; see
above discussion on master scalability
- orig GFS was for throughput not latency
- also needed more availability; 1-master is SPOF
- orig master failover was manual: cell could be down for an hr
- then auto, but slow (minutes); now down to 10s
- generally, bigtable solves a bunch of problems
- bigtable is very failure-aware & responds faster
- multihoming
- bigtable txn log: have 2 logs, write to one and failover to other, merging
later
- gmail uses multihoming, for availability and to hide gfs problems
- many disks claimed to the linux driver that they supported a range of ide
  protocol versions, but actually responded reliably only to more recent ones,
  silently corrupting data
- motivated GFS checksums, but these were E2E so covered everything (eg TCP
corruption)
- loose/eventual consistency
- apps can read stale data
- client failures are big issue
- multi-writers are problematic; eg RecordAppend interface (dupes, ordering)
- general google approaches
- get things working reasonably well, then focus on scaling, not so much on
efficiency; usually never focus on a binary, except for GFS master perf
- had flexibility since apps & infrastructure all by google
- <http://queue.acm.org/detail.cfm?id=1594206>
- <http://cacm.acm.org/magazines/2010/3/76283-gfs-evolution-on-fast-forward/fulltext>
wheelfs: Don't Give Up On Distributed File Systems (strib)
- specify hints about consistency
CloudStore aka Kosmos File System (KFS)
- from kosmix auto-portal site
- hadoop/hypertable/hbase integration [compat with HDFS?]
- for: mainly write-once/read-many loads; Ms of big files; mostly seq access
- replication (configurable per-file) with async re-replication on failure
- periodic chunk rebalancing; elasticity
- for data integrity, verify checksum on read; re-replicate to recover
- leases for client cache consistency
- chunk versioning to detect stale chunks
- C++, STL, boost, aio, log4cpp; solaris, linux; fuse
DHTs
====
UsenetDHT (Emil Sit, NSDI08)
- TODO: read paper
- high bandwidth for peering and high storage/durability requirements
- related work
- DHash, OpenDHT optimized for small objects (eg CFS)
- Glacier: focus on catastrophic failure
- Total Recall: ...
- Coral: perf, but a cache, so no durability
- UsenetDHT: shared Usenet server
- > 80 TB/yr
- _Passing Tone_: algo for data maintenance
- re-replication on failures
- durability for immut data
- using only pairwise sync
- supports obj expiration
- enhanced DHash for perf
- bulk data
- 6MB/s per server
- goal: reduce distribution inefficiency and storage reqs
- share articles among eg universities, ISPs
- nodes do full exchange of headers *only*
- challenges
- in talk and paper
- respond to each failure?
- syncs expensive
- TODO
- in paper
- limited disk cap: can spurious repairs be avoided
- rand disk IO
- Passing Tone (toy sketch after this list)
- $r_L$ reps for durab
- extra replicas to mask transient failures
- repair decisions need only pairwise syncing
- two algos:
- local: replication among current replica set
- global: ensures correct placement
- data placement
- replicate to $r_L$ successors
- idea from Carbonite system: extra replicas to mask transient failures
- make new copies when $< r_L$ avail
- predecessors enable local decisions
- chord only know about immed pred
- forces immed succ to handle maint
- coord hard: constant writes/expirations
- pred list allows nodes to handle own maint
- given $r_L$ and preds, know responsible range
- extra reps can be reclaimed for space if nec
- local maint
- calc respons range from preds
- sync with nbrs over range to
- identify objs missing locally
- make local reps of those objs
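A sketch of the local-maintenance step (toy ring arithmetic; names are mine):

    def responsible_range(my_id, preds, r_L):
        # I hold replicas for keys owned by my r_L predecessors:
        # the ring interval (preds[r_L - 1], my_id]
        return preds[r_L - 1], my_id

    def in_range(k, lo, hi):
        # clockwise ring interval (lo, hi], which may wrap around zero
        return lo < k <= hi if lo < hi else k > lo or k <= hi

    def local_maintenance(my_store, neighbor_store, lo, hi):
        # pairwise sync: copy objects in my range that a neighbor has
        # and I am missing locally
        missing = {k: v for k, v in neighbor_store.items()
                   if in_range(k, lo, hi) and k not in my_store}
        my_store.update(missing)
        return missing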
Kademlia (Petar Maymounkov, David Mazieres)
- simplest
Pastry
Chord: A Scalable Peer-to-Peer Lookup Service for Internet Applications
Programming
===========
Friday: global debugging (paper notes)
- intro
- objective: track global state
- contributions [basically new debugger]
- primitives detect events with watch/breakpoints
- attach commands to these
- liblog
- for libc/posix c/c++ apps
- each thread logs side effects of non-det syscalls (recvfrom, select)
- maintains causally consistent group replay by inserting lamport clocks in
messages
- incrementally deployable
- can simulate a reasonably large # nodes on debugging machine
- still hard to debug - find needle in haystack
- design
- watch/breakpoints
- breakpoints are the same
- implementation [watchpoints]
- write-protected pages
- alternatives
- hardware - limited # watchpoint regs (usu. 8) not enough
- single-stepping - too slow
- local breakpoints - hard to analyze program to know where to insert
them
- periodic sampling - coarse-grained
- implementation complexity
- replicated gdb functionality
- replaced gdb's calling facilities due to conflicts with
write-protection
- commands
- scripting language (running in own world; effectively 'global' state)
- + syntax to access app state, current node, lamport/real clocks, etc
- language choice
- python - easy to embed
- syntax
- translate to gdb's 'print'/'set'
- string interpolation
- memory protection on app function invocs
- general data structure marshalling -> raw bytes, enough for equivalence
testing
- limitations
- false positives slow down replay
- can recompile app to spread out stack