From dd0cdfef4730a4aeb47bccc5bf7e4ea5929e4c3a Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Mon, 3 Aug 2020 14:44:33 -0400 Subject: [PATCH 1/4] roachtest: correctly start crdb in acceptance/rapid_restart --start-single-node was needed. Without it, I think the test's killing of a node raced with that node dieing by itself, and sometimes the race resulted in `cockroach stop` first seeing the process but then `/bin/bash: line 8: kill: (16016) - No such proces` Fixes #52060 Release note: None --- pkg/cmd/roachtest/rapid_restart.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/rapid_restart.go b/pkg/cmd/roachtest/rapid_restart.go index 6da2ecb684bf..303dab829016 100644 --- a/pkg/cmd/roachtest/rapid_restart.go +++ b/pkg/cmd/roachtest/rapid_restart.go @@ -47,7 +47,7 @@ func runRapidRestart(ctx context.Context, t *test, c *cluster) { exitCh := make(chan error, 1) go func() { err := c.RunE(ctx, nodes, - `mkdir -p {log-dir} && ./cockroach start --insecure --store={store-dir} `+ + `mkdir -p {log-dir} && ./cockroach start-single-node --insecure --store={store-dir} `+ `--log-dir={log-dir} --cache=10% --max-sql-memory=10% `+ `--listen-addr=:{pgport:1} --http-port=$[{pgport:1}+1] `+ `> {log-dir}/cockroach.stdout 2> {log-dir}/cockroach.stderr`) From d1d27baec8c18085749df9179e2a29b7de3aa7f2 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Mon, 3 Aug 2020 15:16:07 -0400 Subject: [PATCH 2/4] roachtest: remove confusing "--- PASS" log lines Two workloads were printing PASS/FAIL lines from inside the predicates passed to Searcher.Search. This is really confusing when reading the test's output, because they don't correspond to the test's disposition. Release note: None --- pkg/cmd/roachtest/kvbench.go | 29 +++++++++++++---------------- pkg/cmd/roachtest/tpcc.go | 4 ++-- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/pkg/cmd/roachtest/kvbench.go b/pkg/cmd/roachtest/kvbench.go index f54b53969929..5af5451da428 100644 --- a/pkg/cmd/roachtest/kvbench.go +++ b/pkg/cmd/roachtest/kvbench.go @@ -314,18 +314,22 @@ func runKVBench(ctx context.Context, t *test, c *cluster, b kvBenchSpec) { } close(resultChan) res := <-resultChan - failErr := res.failureError(b) - if failErr == nil { - ttycolor.Stdout(ttycolor.Green) - t.l.Printf(`--- PASS: kv workload maintained an average latency of %0.1fms`+ - ` with avg throughput of %d`, res.latency(), res.throughput()) + + var color ttycolor.Code + var msg string + pass := res.latency() <= b.LatencyThresholdMs + if pass { + color = ttycolor.Green + msg = "PASS" } else { - ttycolor.Stdout(ttycolor.Red) - t.l.Printf(`--- FAIL: kv workload maintained an average latency of %0.1fms (threshold: %0.1fms)`+ - ` with avg throughput of %d`, res.latency(), b.LatencyThresholdMs, res.throughput()) + color = ttycolor.Red + msg = "FAIL" } + ttycolor.Stdout(color) + t.l.Printf(`--- SEARCH ITER %s: kv workload avg latency: %0.1fms (threshold: %0.1fms), avg throughput: %d`, + msg, res.latency(), b.LatencyThresholdMs, res.throughput()) ttycolor.Stdout(ttycolor.Reset) - return failErr == nil, nil + return pass, nil } if res, err := s.Search(searchPredicate); err != nil { t.Fatal(err) @@ -381,10 +385,3 @@ func (r kvBenchResult) throughput() int { // compute the average throughput here but not much more than that. return int(float64(r.Cumulative[`write`].TotalCount()) / r.Elapsed.Seconds()) } - -func (r kvBenchResult) failureError(b kvBenchSpec) error { - if r.latency() <= b.LatencyThresholdMs { - return nil - } - return errors.Errorf(`average latency is too high %0.1fms`, r.latency()) -} diff --git a/pkg/cmd/roachtest/tpcc.go b/pkg/cmd/roachtest/tpcc.go index 033504e5e102..b1fcae47ea2f 100644 --- a/pkg/cmd/roachtest/tpcc.go +++ b/pkg/cmd/roachtest/tpcc.go @@ -890,11 +890,11 @@ func runTPCCBench(ctx context.Context, t *test, c *cluster, b tpccBenchSpec) { // Print the result. if failErr == nil { ttycolor.Stdout(ttycolor.Green) - t.l.Printf("--- PASS: tpcc %d resulted in %.1f tpmC (%.1f%% of max tpmC)\n\n", + t.l.Printf("--- SEARCH ITER PASS: TPCC %d resulted in %.1f tpmC (%.1f%% of max tpmC)\n\n", warehouses, res.TpmC(), res.Efficiency()) } else { ttycolor.Stdout(ttycolor.Red) - t.l.Printf("--- FAIL: tpcc %d resulted in %.1f tpmC and failed due to %v", + t.l.Printf("--- SEARCH ITER FAIL: TPCC %d resulted in %.1f tpmC and failed due to %v", warehouses, res.TpmC(), failErr) } ttycolor.Stdout(ttycolor.Reset) From 0fe825af1c8a7e563cd684877e64e04274d0f5f7 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Wed, 15 Jul 2020 22:32:25 -0400 Subject: [PATCH 3/4] backupccl: distribute RESTORE work using DistSQL This commit replaces restore's old method of coordinating work on a single node to instead use DistSQL. It creates a 2 stage DistSQL flow. The first stage assigns chunks to SplitAndScatter processors which forward spans that they scattered to the second stage made up of the RestoreData processors which ingest the data. The SplitAndScatter processors attempt to send the work to the RestoreData colocated with the leaseholder of the span after the scattering. Release note: None. --- pkg/ccl/backupccl/backup.pb.go | 278 ++++++++------ pkg/ccl/backupccl/backup.proto | 1 + pkg/ccl/backupccl/restore_data_processor.go | 1 + pkg/ccl/backupccl/restore_job.go | 340 ++++-------------- .../backupccl/restore_processor_planning.go | 252 +++++++++++++ .../backupccl/split_and_scatter_processor.go | 2 +- pkg/sql/rowexec/processors.go | 2 +- 7 files changed, 480 insertions(+), 396 deletions(-) create mode 100644 pkg/ccl/backupccl/restore_processor_planning.go diff --git a/pkg/ccl/backupccl/backup.pb.go b/pkg/ccl/backupccl/backup.pb.go index 87695aeb7e29..9ed2ca61e268 100644 --- a/pkg/ccl/backupccl/backup.pb.go +++ b/pkg/ccl/backupccl/backup.pb.go @@ -54,7 +54,7 @@ func (x MVCCFilter) String() string { return proto.EnumName(MVCCFilter_name, int32(x)) } func (MVCCFilter) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{0} + return fileDescriptor_backup_41c41766bcab725b, []int{0} } type EncryptionInfo_Scheme int32 @@ -74,7 +74,7 @@ func (x EncryptionInfo_Scheme) String() string { return proto.EnumName(EncryptionInfo_Scheme_name, int32(x)) } func (EncryptionInfo_Scheme) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{3, 0} + return fileDescriptor_backup_41c41766bcab725b, []int{3, 0} } type ScheduledBackupExecutionArgs_BackupType int32 @@ -97,7 +97,7 @@ func (x ScheduledBackupExecutionArgs_BackupType) String() string { return proto.EnumName(ScheduledBackupExecutionArgs_BackupType_name, int32(x)) } func (ScheduledBackupExecutionArgs_BackupType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{5, 0} + return fileDescriptor_backup_41c41766bcab725b, []int{5, 0} } // RowCount tracks the size and row/index entry counts. @@ -111,7 +111,7 @@ func (m *RowCount) Reset() { *m = RowCount{} } func (m *RowCount) String() string { return proto.CompactTextString(m) } func (*RowCount) ProtoMessage() {} func (*RowCount) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{0} + return fileDescriptor_backup_41c41766bcab725b, []int{0} } func (m *RowCount) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -189,7 +189,7 @@ func (m *BackupManifest) Reset() { *m = BackupManifest{} } func (m *BackupManifest) String() string { return proto.CompactTextString(m) } func (*BackupManifest) ProtoMessage() {} func (*BackupManifest) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{1} + return fileDescriptor_backup_41c41766bcab725b, []int{1} } func (m *BackupManifest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -232,7 +232,7 @@ func (m *BackupManifest_File) Reset() { *m = BackupManifest_File{} } func (m *BackupManifest_File) String() string { return proto.CompactTextString(m) } func (*BackupManifest_File) ProtoMessage() {} func (*BackupManifest_File) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{1, 0} + return fileDescriptor_backup_41c41766bcab725b, []int{1, 0} } func (m *BackupManifest_File) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -267,7 +267,7 @@ func (m *BackupManifest_DescriptorRevision) Reset() { *m = BackupManifes func (m *BackupManifest_DescriptorRevision) String() string { return proto.CompactTextString(m) } func (*BackupManifest_DescriptorRevision) ProtoMessage() {} func (*BackupManifest_DescriptorRevision) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{1, 1} + return fileDescriptor_backup_41c41766bcab725b, []int{1, 1} } func (m *BackupManifest_DescriptorRevision) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -301,7 +301,7 @@ func (m *BackupManifest_Progress) Reset() { *m = BackupManifest_Progress func (m *BackupManifest_Progress) String() string { return proto.CompactTextString(m) } func (*BackupManifest_Progress) ProtoMessage() {} func (*BackupManifest_Progress) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{1, 2} + return fileDescriptor_backup_41c41766bcab725b, []int{1, 2} } func (m *BackupManifest_Progress) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -335,7 +335,7 @@ func (m *BackupManifest_Tenant) Reset() { *m = BackupManifest_Tenant{} } func (m *BackupManifest_Tenant) String() string { return proto.CompactTextString(m) } func (*BackupManifest_Tenant) ProtoMessage() {} func (*BackupManifest_Tenant) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{1, 3} + return fileDescriptor_backup_41c41766bcab725b, []int{1, 3} } func (m *BackupManifest_Tenant) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -370,7 +370,7 @@ func (m *BackupPartitionDescriptor) Reset() { *m = BackupPartitionDescri func (m *BackupPartitionDescriptor) String() string { return proto.CompactTextString(m) } func (*BackupPartitionDescriptor) ProtoMessage() {} func (*BackupPartitionDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{2} + return fileDescriptor_backup_41c41766bcab725b, []int{2} } func (m *BackupPartitionDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -410,7 +410,7 @@ func (m *EncryptionInfo) Reset() { *m = EncryptionInfo{} } func (m *EncryptionInfo) String() string { return proto.CompactTextString(m) } func (*EncryptionInfo) ProtoMessage() {} func (*EncryptionInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{3} + return fileDescriptor_backup_41c41766bcab725b, []int{3} } func (m *EncryptionInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -446,7 +446,7 @@ func (m *StatsTable) Reset() { *m = StatsTable{} } func (m *StatsTable) String() string { return proto.CompactTextString(m) } func (*StatsTable) ProtoMessage() {} func (*StatsTable) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{4} + return fileDescriptor_backup_41c41766bcab725b, []int{4} } func (m *StatsTable) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -481,7 +481,7 @@ func (m *ScheduledBackupExecutionArgs) Reset() { *m = ScheduledBackupExe func (m *ScheduledBackupExecutionArgs) String() string { return proto.CompactTextString(m) } func (*ScheduledBackupExecutionArgs) ProtoMessage() {} func (*ScheduledBackupExecutionArgs) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{5} + return fileDescriptor_backup_41c41766bcab725b, []int{5} } func (m *ScheduledBackupExecutionArgs) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -509,15 +509,16 @@ var xxx_messageInfo_ScheduledBackupExecutionArgs proto.InternalMessageInfo // RestoreProgress is the information that the RestoreData processor sends back // to the restore coordinator to update the job progress. type RestoreProgress struct { - Summary RowCount `protobuf:"bytes,1,opt,name=summary,proto3" json:"summary"` - ProgressIdx int64 `protobuf:"varint,2,opt,name=progressIdx,proto3" json:"progressIdx,omitempty"` + Summary RowCount `protobuf:"bytes,1,opt,name=summary,proto3" json:"summary"` + ProgressIdx int64 `protobuf:"varint,2,opt,name=progressIdx,proto3" json:"progressIdx,omitempty"` + DataSpan roachpb.Span `protobuf:"bytes,3,opt,name=dataSpan,proto3" json:"dataSpan"` } func (m *RestoreProgress) Reset() { *m = RestoreProgress{} } func (m *RestoreProgress) String() string { return proto.CompactTextString(m) } func (*RestoreProgress) ProtoMessage() {} func (*RestoreProgress) Descriptor() ([]byte, []int) { - return fileDescriptor_backup_e6a836364b82952d, []int{6} + return fileDescriptor_backup_41c41766bcab725b, []int{6} } func (m *RestoreProgress) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1264,6 +1265,14 @@ func (m *RestoreProgress) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintBackup(dAtA, i, uint64(m.ProgressIdx)) } + dAtA[i] = 0x1a + i++ + i = encodeVarintBackup(dAtA, i, uint64(m.DataSpan.Size())) + n20, err := m.DataSpan.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n20 return i, nil } @@ -1571,6 +1580,8 @@ func (m *RestoreProgress) Size() (n int) { if m.ProgressIdx != 0 { n += 1 + sovBackup(uint64(m.ProgressIdx)) } + l = m.DataSpan.Size() + n += 1 + l + sovBackup(uint64(l)) return n } @@ -3733,6 +3744,36 @@ func (m *RestoreProgress) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DataSpan", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBackup + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBackup + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.DataSpan.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipBackup(dAtA[iNdEx:]) @@ -3859,109 +3900,110 @@ var ( ErrIntOverflowBackup = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("ccl/backupccl/backup.proto", fileDescriptor_backup_e6a836364b82952d) } +func init() { proto.RegisterFile("ccl/backupccl/backup.proto", fileDescriptor_backup_41c41766bcab725b) } -var fileDescriptor_backup_e6a836364b82952d = []byte{ - // 1613 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x57, 0xcd, 0x6f, 0xdb, 0xc8, +var fileDescriptor_backup_41c41766bcab725b = []byte{ + // 1630 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x58, 0xcd, 0x6f, 0xdb, 0xc8, 0x15, 0x17, 0x25, 0x5a, 0x1f, 0x4f, 0xb2, 0xac, 0x4c, 0x9c, 0x84, 0x55, 0x53, 0x49, 0xd1, 0x62, - 0x51, 0xf5, 0x03, 0x14, 0xd6, 0x69, 0x9a, 0xc2, 0x87, 0xa2, 0xd6, 0x87, 0x13, 0xfa, 0x23, 0x48, + 0x51, 0xf5, 0x03, 0x14, 0xd6, 0x69, 0x9a, 0xd6, 0x87, 0xa2, 0xd6, 0x87, 0x13, 0xfa, 0x23, 0x48, 0x29, 0xc7, 0x87, 0x00, 0x85, 0x30, 0x22, 0xc7, 0x12, 0x61, 0x8a, 0x64, 0x38, 0x23, 0xad, 0xb5, - 0xc7, 0xfe, 0x05, 0xbd, 0xf6, 0xd4, 0xfe, 0x01, 0xfd, 0x43, 0x72, 0x29, 0xb0, 0x97, 0x02, 0x8b, - 0x1e, 0xdc, 0xae, 0x72, 0xe9, 0xb9, 0xc7, 0xc5, 0x1e, 0x16, 0x33, 0x24, 0x45, 0x66, 0xbd, 0x5e, - 0x2b, 0xf1, 0xed, 0xe9, 0xf1, 0xbd, 0xdf, 0xcc, 0xbc, 0x8f, 0xdf, 0x7b, 0x82, 0xaa, 0x61, 0xd8, - 0xed, 0x11, 0x36, 0xce, 0x67, 0x5e, 0x2c, 0xa9, 0x9e, 0xef, 0x32, 0x17, 0x3d, 0x30, 0x5c, 0xe3, - 0xdc, 0x77, 0xb1, 0x31, 0x51, 0x0d, 0xc3, 0x56, 0x57, 0x56, 0xd5, 0xca, 0x68, 0x66, 0xd9, 0x66, - 0xdb, 0x72, 0xce, 0xdc, 0xc0, 0xb4, 0x7a, 0x47, 0x98, 0x79, 0xa3, 0x36, 0xf6, 0xac, 0x50, 0x85, - 0x22, 0x95, 0x89, 0x19, 0x0e, 0x75, 0x75, 0xfa, 0xc6, 0x6e, 0x53, 0x86, 0x19, 0x6d, 0x33, 0x3c, - 0xb2, 0xc9, 0x90, 0xcb, 0x16, 0x65, 0x96, 0x11, 0x1a, 0x3c, 0x14, 0x06, 0x6f, 0xec, 0x11, 0xa6, - 0xa4, 0x4d, 0x99, 0x3f, 0x33, 0xd8, 0xcc, 0x27, 0x66, 0xf8, 0x55, 0x99, 0x31, 0xcb, 0x6e, 0x4f, - 0x6c, 0xa3, 0xcd, 0xac, 0x29, 0xa1, 0x0c, 0x4f, 0xc3, 0xab, 0x56, 0xb7, 0xc7, 0xee, 0xd8, 0x15, - 0x62, 0x9b, 0x4b, 0x81, 0xb6, 0x79, 0x06, 0x79, 0xdd, 0xfd, 0xbc, 0xeb, 0xce, 0x1c, 0x86, 0x7e, - 0x0a, 0x05, 0x7e, 0x91, 0x21, 0xb5, 0xbe, 0x20, 0x8a, 0xd4, 0x90, 0x5a, 0x19, 0x3d, 0xcf, 0x15, - 0x03, 0xeb, 0x0b, 0x82, 0x10, 0xc8, 0xbe, 0xfb, 0x39, 0x55, 0xd2, 0x42, 0x2f, 0x64, 0xf4, 0x09, - 0x6c, 0x5a, 0x8e, 0x49, 0x2e, 0x86, 0xc4, 0x61, 0xbe, 0x45, 0xa8, 0x92, 0x11, 0x1f, 0x4b, 0x42, - 0xd9, 0x0f, 0x74, 0x07, 0x72, 0x5e, 0xae, 0x6c, 0x34, 0xbf, 0xbe, 0x07, 0xe5, 0x8e, 0x88, 0xce, - 0x31, 0x76, 0xac, 0x33, 0x42, 0x19, 0xea, 0x00, 0x50, 0x86, 0x7d, 0x36, 0xe4, 0x37, 0x15, 0xe7, - 0x15, 0x77, 0x7e, 0xa6, 0xc6, 0x01, 0xe5, 0x2f, 0x51, 0x27, 0xb6, 0xa1, 0x9e, 0x44, 0x2f, 0xe9, - 0xc8, 0x6f, 0x2f, 0xeb, 0x29, 0xbd, 0x20, 0xdc, 0xb8, 0x16, 0xfd, 0x1e, 0xf2, 0xc4, 0x31, 0x03, - 0x84, 0xf4, 0xfa, 0x08, 0x39, 0xe2, 0x98, 0xc2, 0xff, 0x31, 0x6c, 0x50, 0x0f, 0x3b, 0xfc, 0xe6, - 0x99, 0x56, 0x71, 0xe7, 0x41, 0xc2, 0x39, 0xcc, 0x8d, 0x3a, 0xf0, 0xb0, 0x13, 0xba, 0x05, 0xb6, - 0xe8, 0x39, 0x6c, 0x9c, 0x59, 0x36, 0xa1, 0x8a, 0x2c, 0x9c, 0x7e, 0xad, 0x5e, 0x53, 0x04, 0xea, - 0xfb, 0x0f, 0x56, 0xf7, 0x2d, 0x9b, 0x44, 0x48, 0x02, 0x00, 0x69, 0x50, 0x34, 0x09, 0x35, 0x7c, - 0xcb, 0x63, 0xae, 0x4f, 0x95, 0x0d, 0x81, 0xf7, 0x28, 0x81, 0x47, 0xdf, 0xd8, 0x6a, 0x98, 0x6b, - 0xb5, 0xb7, 0xb2, 0x0c, 0x41, 0x92, 0xbe, 0x68, 0x17, 0x32, 0xa6, 0xe5, 0x2b, 0x39, 0x11, 0x84, - 0xe6, 0x0f, 0xbc, 0xa3, 0x7f, 0xc1, 0x88, 0xef, 0x60, 0x7b, 0xc0, 0x5c, 0x1f, 0x8f, 0xa3, 0x8b, - 0x70, 0x27, 0xf4, 0x29, 0x94, 0xcf, 0x5c, 0x7f, 0x8a, 0xd9, 0x70, 0x4e, 0x7c, 0x6a, 0xb9, 0x8e, - 0x92, 0x6f, 0x48, 0xad, 0x4d, 0x7d, 0x33, 0xd0, 0x9e, 0x06, 0x4a, 0x34, 0x06, 0x30, 0xec, 0x19, - 0x65, 0xc4, 0x1f, 0x5a, 0xa6, 0x52, 0x68, 0x48, 0xad, 0x52, 0xe7, 0x39, 0x47, 0xf9, 0xf7, 0x65, - 0xfd, 0xf1, 0xd8, 0x62, 0x93, 0xd9, 0x48, 0x35, 0xdc, 0x69, 0x7b, 0x75, 0xb6, 0x39, 0x8a, 0xe5, - 0xb6, 0x77, 0x3e, 0x6e, 0x8b, 0xe2, 0x9c, 0xcd, 0x2c, 0x53, 0x7d, 0xf5, 0x4a, 0xeb, 0x2d, 0x2f, - 0xeb, 0x85, 0x6e, 0x00, 0xa8, 0xf5, 0xf4, 0x42, 0x88, 0xad, 0x99, 0xe8, 0x35, 0xe4, 0x1c, 0xd7, - 0x24, 0xfc, 0x14, 0x68, 0x48, 0xad, 0x8d, 0xce, 0xde, 0xf2, 0xb2, 0x9e, 0x7d, 0xe1, 0x9a, 0x44, - 0xeb, 0x7d, 0xb3, 0xee, 0x59, 0xd1, 0xbb, 0x03, 0x37, 0x3d, 0xcb, 0x11, 0x35, 0x13, 0xed, 0x02, - 0x88, 0xd6, 0x1c, 0xf2, 0xd6, 0x54, 0x8a, 0x22, 0x5c, 0xf7, 0x12, 0xe1, 0x12, 0x1f, 0x55, 0xcd, - 0x39, 0x73, 0xa3, 0x6a, 0x13, 0x1a, 0xae, 0x40, 0x07, 0x50, 0xe2, 0x95, 0xbe, 0x18, 0x1a, 0xbc, - 0x5f, 0xa8, 0x52, 0x12, 0xde, 0x8f, 0xae, 0xcd, 0x7f, 0xd4, 0x59, 0x51, 0xbe, 0x84, 0xb3, 0xd0, - 0x50, 0x74, 0x02, 0xc5, 0xe9, 0xdc, 0x30, 0x86, 0x67, 0x96, 0xcd, 0x88, 0xaf, 0x6c, 0x36, 0xa4, - 0x56, 0x79, 0xe7, 0x93, 0x6b, 0xa1, 0x8e, 0x4f, 0xbb, 0xdd, 0x7d, 0x61, 0xda, 0x29, 0x2f, 0x2f, - 0xeb, 0x10, 0xff, 0xd6, 0x81, 0xe3, 0x04, 0x32, 0x7a, 0x0d, 0x25, 0xc3, 0x9d, 0x7a, 0x36, 0x61, - 0x64, 0x68, 0x8e, 0xa8, 0x52, 0x6e, 0x64, 0x5a, 0x9b, 0x9d, 0xa7, 0x6b, 0x07, 0x2d, 0xc1, 0x2d, - 0xaa, 0xd6, 0xd3, 0x8b, 0x11, 0x58, 0x6f, 0xc4, 0xcb, 0xbe, 0x62, 0x39, 0xcc, 0x77, 0xcd, 0x99, - 0x41, 0xcc, 0x61, 0xd0, 0x36, 0x5b, 0xeb, 0xb4, 0xcd, 0x56, 0xec, 0x36, 0x10, 0x0d, 0xe4, 0x02, - 0x8a, 0x4b, 0x77, 0x68, 0x4c, 0xb0, 0x33, 0x26, 0x54, 0xa9, 0x08, 0xac, 0xdd, 0x75, 0xbb, 0x29, - 0x6e, 0x07, 0x9d, 0xcc, 0x2d, 0x5e, 0xa0, 0xe1, 0x71, 0x77, 0x62, 0xec, 0x6e, 0x00, 0x8d, 0x06, - 0x70, 0xd7, 0x0f, 0x8d, 0x86, 0x09, 0xce, 0xb9, 0xb3, 0x3e, 0x63, 0xdc, 0x89, 0xfc, 0x07, 0x2b, - 0xee, 0xf9, 0x23, 0xa4, 0x2d, 0x53, 0x41, 0xa2, 0x0d, 0xf6, 0x6e, 0xd7, 0x06, 0x69, 0xad, 0xa7, - 0xa7, 0x2d, 0x13, 0xf5, 0xa0, 0xe6, 0x61, 0x9f, 0x59, 0x8c, 0x5f, 0x34, 0x11, 0x22, 0xce, 0x16, - 0x0e, 0x9e, 0x12, 0xaa, 0xdc, 0x6d, 0x64, 0x5a, 0x05, 0xfd, 0xe1, 0xca, 0x2a, 0x8e, 0xc2, 0x7e, - 0x64, 0x83, 0x76, 0xa0, 0x64, 0xbb, 0x06, 0xb6, 0x2d, 0xb6, 0x18, 0x9e, 0xcf, 0xa9, 0xb2, 0xcd, - 0x7d, 0x3a, 0x5b, 0xcb, 0xcb, 0x7a, 0xf1, 0x28, 0xd4, 0x1f, 0x9e, 0x52, 0xbd, 0x18, 0x19, 0x1d, - 0xce, 0x29, 0xfa, 0x13, 0xdc, 0x33, 0x89, 0xe7, 0x13, 0x03, 0x33, 0x9e, 0xdc, 0x68, 0xe6, 0x50, - 0xe5, 0x9e, 0xc8, 0x4a, 0xeb, 0xfb, 0x9c, 0xc4, 0x07, 0x94, 0x7a, 0xc2, 0x07, 0xd4, 0x20, 0xb2, - 0x7d, 0xc9, 0x07, 0x8a, 0xbe, 0x1d, 0xc3, 0xac, 0xbe, 0x50, 0xb4, 0x80, 0xbb, 0xc9, 0x8c, 0xbb, - 0x73, 0xc2, 0x39, 0x48, 0xb9, 0x2f, 0xba, 0xfb, 0xf9, 0x37, 0x97, 0xf5, 0xde, 0xfa, 0xe5, 0x49, - 0xa6, 0x6d, 0xe6, 0x93, 0x24, 0x1f, 0x76, 0x43, 0x3c, 0x3d, 0x51, 0x56, 0x91, 0x0e, 0xfd, 0x43, - 0x82, 0xed, 0xf8, 0x3d, 0x89, 0x50, 0x3e, 0x10, 0x2f, 0xfb, 0xc3, 0xba, 0xf5, 0x16, 0xbf, 0x66, - 0x15, 0x69, 0x3e, 0xe2, 0x16, 0x9d, 0xa7, 0x7f, 0xfe, 0xcf, 0xc7, 0x75, 0xd7, 0x5d, 0x7a, 0x15, - 0x12, 0xbd, 0x80, 0x1c, 0x23, 0x0e, 0xe6, 0xf4, 0xa2, 0x88, 0x0b, 0xaa, 0xeb, 0x5e, 0xf0, 0x44, - 0xb8, 0x45, 0x13, 0x2e, 0x04, 0xa9, 0xfe, 0x3f, 0x0d, 0x32, 0x47, 0x47, 0x9f, 0x81, 0xcc, 0x7b, - 0x36, 0x1c, 0xb4, 0x37, 0xb4, 0xac, 0x30, 0xe5, 0x33, 0xdf, 0xc3, 0x6c, 0x22, 0x26, 0x6b, 0x41, - 0x17, 0x32, 0xba, 0x0f, 0x59, 0x3a, 0xc1, 0x4f, 0x3e, 0xdb, 0x51, 0x64, 0x5e, 0xf9, 0x7a, 0xf8, - 0xeb, 0x0a, 0x37, 0x66, 0x6f, 0xc1, 0x8d, 0xef, 0x6f, 0x06, 0xb9, 0x5b, 0x6f, 0x06, 0xf9, 0x8f, - 0xd8, 0x0c, 0xda, 0x50, 0x4c, 0x34, 0x91, 0x98, 0x76, 0x85, 0x80, 0x7a, 0xe3, 0x1e, 0xd2, 0x21, - 0x6e, 0xa1, 0x03, 0x39, 0x9f, 0xa9, 0xc8, 0x07, 0x72, 0x7e, 0xa3, 0x92, 0xad, 0xfe, 0x53, 0x02, - 0x74, 0x95, 0x9f, 0xd0, 0x53, 0x90, 0x3f, 0x74, 0xd7, 0x11, 0x0e, 0xe8, 0x19, 0xa4, 0xb5, 0x9e, - 0x48, 0xc3, 0x2d, 0xc8, 0x3c, 0xad, 0xf5, 0xd0, 0x13, 0x90, 0x79, 0x8b, 0x88, 0x45, 0x6d, 0x9d, - 0x4d, 0x43, 0x17, 0xe6, 0xd5, 0xbf, 0x49, 0x90, 0x7f, 0xe9, 0xbb, 0x63, 0x9f, 0xd0, 0xc4, 0xfa, - 0x23, 0xdd, 0x7e, 0xfd, 0x29, 0xfb, 0x64, 0x9e, 0x64, 0xe4, 0x0f, 0xd8, 0xe1, 0x4a, 0x3e, 0x99, - 0xaf, 0xc8, 0xb8, 0xfa, 0x1b, 0xc8, 0x06, 0xf5, 0x8f, 0xee, 0x0b, 0x5a, 0xe6, 0x21, 0x96, 0x3b, - 0xd9, 0x04, 0xb7, 0x22, 0x90, 0xc5, 0xc8, 0x4f, 0x8b, 0xb2, 0x15, 0x72, 0x75, 0x1f, 0x94, 0xeb, - 0xda, 0x1a, 0x55, 0x20, 0x73, 0x4e, 0x16, 0x02, 0x68, 0x53, 0xe7, 0x22, 0xda, 0x86, 0x8d, 0x39, - 0xb6, 0x67, 0x24, 0xec, 0x87, 0xe0, 0xc7, 0x6e, 0xfa, 0x77, 0xd2, 0x81, 0x9c, 0xcf, 0x56, 0x72, - 0xcd, 0x6f, 0x25, 0xf8, 0x49, 0xf0, 0xe6, 0x97, 0x57, 0xe9, 0xf9, 0xfb, 0x05, 0x25, 0xdd, 0x54, - 0x50, 0x71, 0x9c, 0xd3, 0xb7, 0x8d, 0xb3, 0x09, 0x85, 0xc0, 0x9a, 0x6f, 0x54, 0x19, 0x31, 0xb0, - 0x9e, 0xdd, 0x6e, 0x60, 0xe5, 0x83, 0x33, 0xb5, 0x9e, 0x9e, 0x0f, 0x90, 0x35, 0xb3, 0xf9, 0x6d, - 0x1a, 0xca, 0x7d, 0xc7, 0xf0, 0x17, 0x1e, 0x7f, 0xb9, 0x58, 0x98, 0xf6, 0x21, 0x4b, 0x8d, 0x09, - 0x09, 0x4b, 0xbe, 0xfc, 0x23, 0x5c, 0xf6, 0xbe, 0xa3, 0x3a, 0x10, 0x5e, 0x7a, 0xe8, 0xcd, 0x73, - 0x47, 0xb1, 0xcd, 0xa2, 0xdc, 0x71, 0x19, 0xfd, 0x55, 0x82, 0x06, 0x09, 0xbc, 0x88, 0xd9, 0xc3, - 0x0c, 0x1f, 0x92, 0x45, 0x67, 0x71, 0x78, 0x3c, 0x38, 0xc6, 0x7c, 0x8d, 0x3c, 0x24, 0x0b, 0xad, - 0x17, 0xae, 0xf5, 0xc7, 0xeb, 0x1e, 0xdb, 0xbf, 0x01, 0x4f, 0x54, 0x86, 0x7e, 0xe3, 0xb1, 0xd5, - 0x01, 0x7c, 0xba, 0x16, 0x54, 0xb2, 0xc8, 0x0a, 0x3f, 0x50, 0x64, 0xa5, 0x44, 0x91, 0x35, 0x1f, - 0x40, 0x36, 0x08, 0x0b, 0xda, 0x84, 0xc2, 0x5e, 0x7f, 0xb0, 0xf3, 0xe4, 0xb7, 0xcf, 0xba, 0xc7, - 0x95, 0xd4, 0xae, 0xfc, 0xbf, 0xbf, 0xd7, 0xa5, 0xe6, 0x29, 0x00, 0xaf, 0x65, 0x2a, 0x86, 0x32, - 0x7a, 0x2e, 0x28, 0x34, 0x1a, 0xe2, 0xd2, 0x07, 0x0e, 0xf1, 0x84, 0x6f, 0xf3, 0x5f, 0x12, 0x3c, - 0xe4, 0xe7, 0x9a, 0x33, 0x9b, 0x98, 0x41, 0xda, 0xfb, 0x17, 0xc4, 0x98, 0xf1, 0xa0, 0xed, 0xf9, - 0x63, 0x8a, 0x30, 0x14, 0xc3, 0xea, 0x62, 0x0b, 0x2f, 0xca, 0xf4, 0xf5, 0x63, 0xf5, 0xc7, 0xb0, - 0xc2, 0x52, 0x3e, 0x59, 0x78, 0x44, 0x87, 0xd1, 0x4a, 0x46, 0xbf, 0x80, 0x4a, 0x78, 0x04, 0xbf, - 0x18, 0x99, 0x12, 0x87, 0x85, 0x4d, 0xb8, 0x15, 0xe8, 0x07, 0x91, 0xba, 0xf9, 0x73, 0x80, 0x18, - 0x04, 0xe5, 0x41, 0xde, 0x7f, 0x75, 0x74, 0x54, 0x49, 0xa1, 0x2d, 0x28, 0x6a, 0x2f, 0xba, 0x7a, - 0xff, 0xb8, 0xff, 0xe2, 0x64, 0xef, 0xa8, 0x22, 0x35, 0xe7, 0xb0, 0xa5, 0x13, 0xca, 0x5c, 0x9f, - 0xac, 0x98, 0x6d, 0x0f, 0x72, 0x74, 0x36, 0x9d, 0x62, 0x7f, 0x11, 0x52, 0xf4, 0xda, 0xe3, 0x2b, - 0xf2, 0x43, 0x0d, 0x28, 0x7a, 0x21, 0x9c, 0x66, 0x5e, 0x84, 0xff, 0x96, 0x93, 0xaa, 0x5f, 0x3e, - 0x82, 0xc4, 0xf2, 0x8e, 0x00, 0xb2, 0x47, 0x98, 0x11, 0xca, 0x2a, 0x29, 0x94, 0x83, 0xcc, 0x9e, - 0x6d, 0x57, 0xa4, 0xce, 0xaf, 0xde, 0x7e, 0x5d, 0x4b, 0xbd, 0x5d, 0xd6, 0xa4, 0x2f, 0x97, 0x35, - 0xe9, 0xab, 0x65, 0x4d, 0xfa, 0xef, 0xb2, 0x26, 0xfd, 0xe5, 0x5d, 0x2d, 0xf5, 0xe5, 0xbb, 0x5a, - 0xea, 0xab, 0x77, 0xb5, 0xd4, 0xeb, 0xc2, 0xea, 0x26, 0xa3, 0xac, 0xf8, 0x23, 0xff, 0xf8, 0xbb, - 0x00, 0x00, 0x00, 0xff, 0xff, 0xf8, 0xa5, 0x3b, 0xd6, 0xa7, 0x10, 0x00, 0x00, + 0xc7, 0xfe, 0x05, 0xbd, 0xf6, 0xd4, 0xfe, 0x01, 0xbd, 0xf4, 0xbf, 0xc8, 0xa5, 0xc0, 0x5e, 0x0a, + 0x2c, 0x7a, 0x70, 0xbb, 0xca, 0xa5, 0xe7, 0x1e, 0x17, 0x7b, 0x58, 0xcc, 0x90, 0x14, 0x99, 0xf5, + 0x7a, 0xad, 0xac, 0x6f, 0x4f, 0x8f, 0xef, 0xfd, 0x66, 0xe6, 0x7d, 0xfc, 0xde, 0xb3, 0xa1, 0x6a, + 0x18, 0x76, 0x7b, 0x84, 0x8d, 0xf3, 0x99, 0x17, 0x4b, 0xaa, 0xe7, 0xbb, 0xcc, 0x45, 0x0f, 0x0c, + 0xd7, 0x38, 0xf7, 0x5d, 0x6c, 0x4c, 0x54, 0xc3, 0xb0, 0xd5, 0x95, 0x55, 0xb5, 0x32, 0x9a, 0x59, + 0xb6, 0xd9, 0xb6, 0x9c, 0x33, 0x37, 0x30, 0xad, 0xde, 0x11, 0x66, 0xde, 0xa8, 0x8d, 0x3d, 0x2b, + 0x54, 0xa1, 0x48, 0x65, 0x62, 0x86, 0x43, 0x5d, 0x9d, 0xbe, 0xb1, 0xdb, 0x94, 0x61, 0x46, 0xdb, + 0x0c, 0x8f, 0x6c, 0x32, 0xe4, 0xb2, 0x45, 0x99, 0x65, 0x84, 0x06, 0x0f, 0x85, 0xc1, 0x1b, 0x7b, + 0x84, 0x29, 0x69, 0x53, 0xe6, 0xcf, 0x0c, 0x36, 0xf3, 0x89, 0x19, 0x7e, 0x55, 0x66, 0xcc, 0xb2, + 0xdb, 0x13, 0xdb, 0x68, 0x33, 0x6b, 0x4a, 0x28, 0xc3, 0xd3, 0xf0, 0xaa, 0xd5, 0xed, 0xb1, 0x3b, + 0x76, 0x85, 0xd8, 0xe6, 0x52, 0xa0, 0x6d, 0x9e, 0x41, 0x5e, 0x77, 0x3f, 0xed, 0xba, 0x33, 0x87, + 0xa1, 0x1f, 0x43, 0x81, 0x5f, 0x64, 0x48, 0xad, 0xcf, 0x88, 0x22, 0x35, 0xa4, 0x56, 0x46, 0xcf, + 0x73, 0xc5, 0xc0, 0xfa, 0x8c, 0x20, 0x04, 0xb2, 0xef, 0x7e, 0x4a, 0x95, 0xb4, 0xd0, 0x0b, 0x19, + 0x7d, 0x04, 0x9b, 0x96, 0x63, 0x92, 0x8b, 0x21, 0x71, 0x98, 0x6f, 0x11, 0xaa, 0x64, 0xc4, 0xc7, + 0x92, 0x50, 0xf6, 0x03, 0xdd, 0x81, 0x9c, 0x97, 0x2b, 0x1b, 0xcd, 0x2f, 0xef, 0x41, 0xb9, 0x23, + 0xa2, 0x73, 0x8c, 0x1d, 0xeb, 0x8c, 0x50, 0x86, 0x3a, 0x00, 0x94, 0x61, 0x9f, 0x0d, 0xf9, 0x4d, + 0xc5, 0x79, 0xc5, 0x9d, 0x9f, 0xa8, 0x71, 0x40, 0xf9, 0x4b, 0xd4, 0x89, 0x6d, 0xa8, 0x27, 0xd1, + 0x4b, 0x3a, 0xf2, 0xdb, 0xcb, 0x7a, 0x4a, 0x2f, 0x08, 0x37, 0xae, 0x45, 0xbf, 0x83, 0x3c, 0x71, + 0xcc, 0x00, 0x21, 0xbd, 0x3e, 0x42, 0x8e, 0x38, 0xa6, 0xf0, 0x7f, 0x0c, 0x1b, 0xd4, 0xc3, 0x0e, + 0xbf, 0x79, 0xa6, 0x55, 0xdc, 0x79, 0x90, 0x70, 0x0e, 0x73, 0xa3, 0x0e, 0x3c, 0xec, 0x84, 0x6e, + 0x81, 0x2d, 0x7a, 0x0e, 0x1b, 0x67, 0x96, 0x4d, 0xa8, 0x22, 0x0b, 0xa7, 0x5f, 0xaa, 0xd7, 0x14, + 0x81, 0xfa, 0xfe, 0x83, 0xd5, 0x7d, 0xcb, 0x26, 0x11, 0x92, 0x00, 0x40, 0x1a, 0x14, 0x4d, 0x42, + 0x0d, 0xdf, 0xf2, 0x98, 0xeb, 0x53, 0x65, 0x43, 0xe0, 0x3d, 0x4a, 0xe0, 0xd1, 0x37, 0xb6, 0x1a, + 0xe6, 0x5a, 0xed, 0xad, 0x2c, 0x43, 0x90, 0xa4, 0x2f, 0xda, 0x85, 0x8c, 0x69, 0xf9, 0x4a, 0x4e, + 0x04, 0xa1, 0xf9, 0x1d, 0xef, 0xe8, 0x5f, 0x30, 0xe2, 0x3b, 0xd8, 0x1e, 0x30, 0xd7, 0xc7, 0xe3, + 0xe8, 0x22, 0xdc, 0x09, 0x7d, 0x0c, 0xe5, 0x33, 0xd7, 0x9f, 0x62, 0x36, 0x9c, 0x13, 0x9f, 0x5a, + 0xae, 0xa3, 0xe4, 0x1b, 0x52, 0x6b, 0x53, 0xdf, 0x0c, 0xb4, 0xa7, 0x81, 0x12, 0x8d, 0x01, 0x0c, + 0x7b, 0x46, 0x19, 0xf1, 0x87, 0x96, 0xa9, 0x14, 0x1a, 0x52, 0xab, 0xd4, 0x79, 0xce, 0x51, 0xfe, + 0x7d, 0x59, 0x7f, 0x3c, 0xb6, 0xd8, 0x64, 0x36, 0x52, 0x0d, 0x77, 0xda, 0x5e, 0x9d, 0x6d, 0x8e, + 0x62, 0xb9, 0xed, 0x9d, 0x8f, 0xdb, 0xa2, 0x38, 0x67, 0x33, 0xcb, 0x54, 0x5f, 0xbd, 0xd2, 0x7a, + 0xcb, 0xcb, 0x7a, 0xa1, 0x1b, 0x00, 0x6a, 0x3d, 0xbd, 0x10, 0x62, 0x6b, 0x26, 0x7a, 0x0d, 0x39, + 0xc7, 0x35, 0x09, 0x3f, 0x05, 0x1a, 0x52, 0x6b, 0xa3, 0xb3, 0xb7, 0xbc, 0xac, 0x67, 0x5f, 0xb8, + 0x26, 0xd1, 0x7a, 0x5f, 0xad, 0x7b, 0x56, 0xf4, 0xee, 0xc0, 0x4d, 0xcf, 0x72, 0x44, 0xcd, 0x44, + 0xbb, 0x00, 0xa2, 0x35, 0x87, 0xbc, 0x35, 0x95, 0xa2, 0x08, 0xd7, 0xbd, 0x44, 0xb8, 0xc4, 0x47, + 0x55, 0x73, 0xce, 0xdc, 0xa8, 0xda, 0x84, 0x86, 0x2b, 0xd0, 0x01, 0x94, 0x78, 0xa5, 0x2f, 0x86, + 0x06, 0xef, 0x17, 0xaa, 0x94, 0x84, 0xf7, 0xa3, 0x6b, 0xf3, 0x1f, 0x75, 0x56, 0x94, 0x2f, 0xe1, + 0x2c, 0x34, 0x14, 0x9d, 0x40, 0x71, 0x3a, 0x37, 0x8c, 0xe1, 0x99, 0x65, 0x33, 0xe2, 0x2b, 0x9b, + 0x0d, 0xa9, 0x55, 0xde, 0xf9, 0xe8, 0x5a, 0xa8, 0xe3, 0xd3, 0x6e, 0x77, 0x5f, 0x98, 0x76, 0xca, + 0xcb, 0xcb, 0x3a, 0xc4, 0xbf, 0x75, 0xe0, 0x38, 0x81, 0x8c, 0x5e, 0x43, 0xc9, 0x70, 0xa7, 0x9e, + 0x4d, 0x18, 0x19, 0x9a, 0x23, 0xaa, 0x94, 0x1b, 0x99, 0xd6, 0x66, 0xe7, 0xe9, 0xda, 0x41, 0x4b, + 0x70, 0x8b, 0xaa, 0xf5, 0xf4, 0x62, 0x04, 0xd6, 0x1b, 0xf1, 0xb2, 0xaf, 0x58, 0x0e, 0xf3, 0x5d, + 0x73, 0x66, 0x10, 0x73, 0x18, 0xb4, 0xcd, 0xd6, 0x3a, 0x6d, 0xb3, 0x15, 0xbb, 0x0d, 0x44, 0x03, + 0xb9, 0x80, 0xe2, 0xd2, 0x1d, 0x1a, 0x13, 0xec, 0x8c, 0x09, 0x55, 0x2a, 0x02, 0x6b, 0x77, 0xdd, + 0x6e, 0x8a, 0xdb, 0x41, 0x27, 0x73, 0x8b, 0x17, 0x68, 0x78, 0xdc, 0x9d, 0x18, 0xbb, 0x1b, 0x40, + 0xa3, 0x01, 0xdc, 0xf5, 0x43, 0xa3, 0x61, 0x82, 0x73, 0xee, 0xac, 0xcf, 0x18, 0x77, 0x22, 0xff, + 0xc1, 0x8a, 0x7b, 0xfe, 0x00, 0x69, 0xcb, 0x54, 0x90, 0x68, 0x83, 0xbd, 0xdb, 0xb5, 0x41, 0x5a, + 0xeb, 0xe9, 0x69, 0xcb, 0x44, 0x3d, 0xa8, 0x79, 0xd8, 0x67, 0x16, 0xe3, 0x17, 0x4d, 0x84, 0x88, + 0xb3, 0x85, 0x83, 0xa7, 0x84, 0x2a, 0x77, 0x1b, 0x99, 0x56, 0x41, 0x7f, 0xb8, 0xb2, 0x8a, 0xa3, + 0xb0, 0x1f, 0xd9, 0xa0, 0x1d, 0x28, 0xd9, 0xae, 0x81, 0x6d, 0x8b, 0x2d, 0x86, 0xe7, 0x73, 0xaa, + 0x6c, 0x73, 0x9f, 0xce, 0xd6, 0xf2, 0xb2, 0x5e, 0x3c, 0x0a, 0xf5, 0x87, 0xa7, 0x54, 0x2f, 0x46, + 0x46, 0x87, 0x73, 0x8a, 0xfe, 0x08, 0xf7, 0x4c, 0xe2, 0xf9, 0xc4, 0xc0, 0x8c, 0x27, 0x37, 0x9a, + 0x39, 0x54, 0xb9, 0x27, 0xb2, 0xd2, 0xfa, 0x36, 0x27, 0xf1, 0x01, 0xa5, 0x9e, 0xf0, 0x01, 0x35, + 0x88, 0x6c, 0x5f, 0xf2, 0x81, 0xa2, 0x6f, 0xc7, 0x30, 0xab, 0x2f, 0x14, 0x2d, 0xe0, 0x6e, 0x32, + 0xe3, 0xee, 0x9c, 0x70, 0x0e, 0x52, 0xee, 0x8b, 0xee, 0x7e, 0xfe, 0xd5, 0x65, 0xbd, 0xb7, 0x7e, + 0x79, 0x92, 0x69, 0x9b, 0xf9, 0x24, 0xc9, 0x87, 0xdd, 0x10, 0x4f, 0x4f, 0x94, 0x55, 0xa4, 0x43, + 0x7f, 0x97, 0x60, 0x3b, 0x7e, 0x4f, 0x22, 0x94, 0x0f, 0xc4, 0xcb, 0x7e, 0xbf, 0x6e, 0xbd, 0xc5, + 0xaf, 0x59, 0x45, 0x9a, 0x8f, 0xb8, 0x45, 0xe7, 0xe9, 0x9f, 0xfe, 0xf3, 0xc3, 0xba, 0xeb, 0x2e, + 0xbd, 0x0a, 0x89, 0x5e, 0x40, 0x8e, 0x11, 0x07, 0x73, 0x7a, 0x51, 0xc4, 0x05, 0xd5, 0x75, 0x2f, + 0x78, 0x22, 0xdc, 0xa2, 0x09, 0x17, 0x82, 0x54, 0xff, 0x9f, 0x06, 0x99, 0xa3, 0xa3, 0x4f, 0x40, + 0xe6, 0x3d, 0x1b, 0x0e, 0xda, 0x1b, 0x5a, 0x56, 0x98, 0xf2, 0x99, 0xef, 0x61, 0x36, 0x11, 0x93, + 0xb5, 0xa0, 0x0b, 0x19, 0xdd, 0x87, 0x2c, 0x9d, 0xe0, 0x27, 0x9f, 0xec, 0x28, 0x32, 0xaf, 0x7c, + 0x3d, 0xfc, 0x75, 0x85, 0x1b, 0xb3, 0xb7, 0xe0, 0xc6, 0xf7, 0x37, 0x83, 0xdc, 0xad, 0x37, 0x83, + 0xfc, 0x0f, 0xd8, 0x0c, 0xda, 0x50, 0x4c, 0x34, 0x91, 0x98, 0x76, 0x85, 0x80, 0x7a, 0xe3, 0x1e, + 0xd2, 0x21, 0x6e, 0xa1, 0x03, 0x39, 0x9f, 0xa9, 0xc8, 0x07, 0x72, 0x7e, 0xa3, 0x92, 0xad, 0xfe, + 0x53, 0x02, 0x74, 0x95, 0x9f, 0xd0, 0x53, 0x90, 0x3f, 0x74, 0xd7, 0x11, 0x0e, 0xe8, 0x19, 0xa4, + 0xb5, 0x9e, 0x48, 0xc3, 0x2d, 0xc8, 0x3c, 0xad, 0xf5, 0xd0, 0x13, 0x90, 0x79, 0x8b, 0x88, 0x45, + 0x6d, 0x9d, 0x4d, 0x43, 0x17, 0xe6, 0xd5, 0xbf, 0x4a, 0x90, 0x7f, 0xe9, 0xbb, 0x63, 0x9f, 0xd0, + 0xc4, 0xfa, 0x23, 0xdd, 0x7e, 0xfd, 0x29, 0xfb, 0x64, 0x9e, 0x64, 0xe4, 0x0f, 0xd8, 0xe1, 0x4a, + 0x3e, 0x99, 0xaf, 0xc8, 0xb8, 0xfa, 0x2b, 0xc8, 0x06, 0xf5, 0x8f, 0xee, 0x0b, 0x5a, 0xe6, 0x21, + 0x96, 0x3b, 0xd9, 0x04, 0xb7, 0x22, 0x90, 0xc5, 0xc8, 0x4f, 0x8b, 0xb2, 0x15, 0x72, 0x75, 0x1f, + 0x94, 0xeb, 0xda, 0x1a, 0x55, 0x20, 0x73, 0x4e, 0x16, 0x02, 0x68, 0x53, 0xe7, 0x22, 0xda, 0x86, + 0x8d, 0x39, 0xb6, 0x67, 0x24, 0xec, 0x87, 0xe0, 0xc7, 0x6e, 0xfa, 0x37, 0xd2, 0x81, 0x9c, 0xcf, + 0x56, 0x72, 0xcd, 0xaf, 0x25, 0xf8, 0x51, 0xf0, 0xe6, 0x97, 0x57, 0xe9, 0xf9, 0xdb, 0x05, 0x25, + 0xdd, 0x54, 0x50, 0x71, 0x9c, 0xd3, 0xb7, 0x8d, 0xb3, 0x09, 0x85, 0xc0, 0x9a, 0x6f, 0x54, 0x19, + 0x31, 0xb0, 0x9e, 0xdd, 0x6e, 0x60, 0xe5, 0x83, 0x33, 0xb5, 0x9e, 0x9e, 0x0f, 0x90, 0x35, 0xb3, + 0xf9, 0x75, 0x1a, 0xca, 0x7d, 0xc7, 0xf0, 0x17, 0x1e, 0x7f, 0xb9, 0x58, 0x98, 0xf6, 0x21, 0x4b, + 0x8d, 0x09, 0x09, 0x4b, 0xbe, 0xfc, 0x3d, 0x5c, 0xf6, 0xbe, 0xa3, 0x3a, 0x10, 0x5e, 0x7a, 0xe8, + 0xcd, 0x73, 0x47, 0xb1, 0xcd, 0xa2, 0xdc, 0x71, 0x19, 0xfd, 0x45, 0x82, 0x06, 0x09, 0xbc, 0x88, + 0xd9, 0xc3, 0x0c, 0x1f, 0x92, 0x45, 0x67, 0x71, 0x78, 0x3c, 0x38, 0xc6, 0x7c, 0x8d, 0x3c, 0x24, + 0x0b, 0xad, 0x17, 0xae, 0xf5, 0xc7, 0xeb, 0x1e, 0xdb, 0xbf, 0x01, 0x4f, 0x54, 0x86, 0x7e, 0xe3, + 0xb1, 0xd5, 0x01, 0x7c, 0xbc, 0x16, 0x54, 0xb2, 0xc8, 0x0a, 0xdf, 0x51, 0x64, 0xa5, 0x44, 0x91, + 0x35, 0x1f, 0x40, 0x36, 0x08, 0x0b, 0xda, 0x84, 0xc2, 0x5e, 0x7f, 0xb0, 0xf3, 0xe4, 0xd7, 0xcf, + 0xba, 0xc7, 0x95, 0xd4, 0xae, 0xfc, 0xbf, 0xbf, 0xd5, 0xa5, 0xe6, 0x29, 0x00, 0xaf, 0x65, 0x2a, + 0x86, 0x32, 0x7a, 0x2e, 0x28, 0x34, 0x1a, 0xe2, 0xd2, 0x07, 0x0e, 0xf1, 0x84, 0x6f, 0xf3, 0x5f, + 0x12, 0x3c, 0xe4, 0xe7, 0x9a, 0x33, 0x9b, 0x98, 0x41, 0xda, 0xfb, 0x17, 0xc4, 0x98, 0xf1, 0xa0, + 0xed, 0xf9, 0x63, 0x8a, 0x30, 0x14, 0xc3, 0xea, 0x62, 0x0b, 0x2f, 0xca, 0xf4, 0xf5, 0x63, 0xf5, + 0xfb, 0xb0, 0xc2, 0x52, 0x3e, 0x59, 0x78, 0x44, 0x87, 0xd1, 0x4a, 0x46, 0x3f, 0x83, 0x4a, 0x78, + 0x04, 0xbf, 0x18, 0x99, 0x12, 0x87, 0x85, 0x4d, 0xb8, 0x15, 0xe8, 0x07, 0x91, 0xba, 0xf9, 0x53, + 0x80, 0x18, 0x04, 0xe5, 0x41, 0xde, 0x7f, 0x75, 0x74, 0x54, 0x49, 0xa1, 0x2d, 0x28, 0x6a, 0x2f, + 0xba, 0x7a, 0xff, 0xb8, 0xff, 0xe2, 0x64, 0xef, 0xa8, 0x22, 0x35, 0xff, 0x21, 0xc1, 0x96, 0x4e, + 0x28, 0x73, 0x7d, 0xb2, 0xa2, 0xb6, 0x3d, 0xc8, 0xd1, 0xd9, 0x74, 0x8a, 0xfd, 0x45, 0xc8, 0xd1, + 0x6b, 0xcf, 0xaf, 0xc8, 0x0f, 0x35, 0xa0, 0xe8, 0x85, 0x70, 0x9a, 0x79, 0x11, 0xfe, 0xb9, 0x9c, + 0x54, 0xa1, 0xdf, 0x42, 0xf0, 0x57, 0x35, 0x1f, 0xc6, 0x99, 0x75, 0x86, 0xf1, 0xca, 0xfc, 0xe7, + 0x8f, 0x20, 0xb1, 0xf8, 0x23, 0x80, 0xec, 0x11, 0x66, 0x84, 0xb2, 0x4a, 0x0a, 0xe5, 0x20, 0xb3, + 0x67, 0xdb, 0x15, 0xa9, 0xf3, 0x8b, 0xb7, 0x5f, 0xd6, 0x52, 0x6f, 0x97, 0x35, 0xe9, 0xf3, 0x65, + 0x4d, 0xfa, 0x62, 0x59, 0x93, 0xfe, 0xbb, 0xac, 0x49, 0x7f, 0x7e, 0x57, 0x4b, 0x7d, 0xfe, 0xae, + 0x96, 0xfa, 0xe2, 0x5d, 0x2d, 0xf5, 0xba, 0xb0, 0x7a, 0xc4, 0x28, 0x2b, 0xfe, 0x09, 0xf0, 0xf8, + 0x9b, 0x00, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x24, 0xcd, 0x74, 0xe3, 0x10, 0x00, 0x00, } diff --git a/pkg/ccl/backupccl/backup.proto b/pkg/ccl/backupccl/backup.proto index 02022d2e5aa1..a3a5ccaf744e 100644 --- a/pkg/ccl/backupccl/backup.proto +++ b/pkg/ccl/backupccl/backup.proto @@ -180,4 +180,5 @@ message ScheduledBackupExecutionArgs { message RestoreProgress { RowCount summary = 1 [(gogoproto.nullable) = false]; int64 progressIdx = 2; + roachpb.Span dataSpan = 3 [(gogoproto.nullable) = false]; } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index df2b63d2164b..a6561480ca8a 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -159,6 +159,7 @@ func runRestoreData( progDetails := RestoreProgress{} progDetails.Summary = countRows(importRes.(*roachpb.ImportResponse).Imported, spec.PKIDs) progDetails.ProgressIdx = entry.ProgressIdx + progDetails.DataSpan = entry.Span details, err := gogotypes.MarshalAny(&progDetails) if err != nil { return err diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 10ddde23b4c4..71e4a8a4a39d 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -13,9 +13,6 @@ import ( "context" "fmt" "math" - "runtime" - "sync/atomic" - "time" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -28,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/covering" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -44,7 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" - "github.com/opentracing/opentracing-go" + "github.com/gogo/protobuf/types" ) type intervalSpan roachpb.Span @@ -66,7 +64,6 @@ const ( backupFile tableSpan completedSpan - request ) type importEntry struct { @@ -79,13 +76,6 @@ type importEntry struct { // Only set if entryType is backupFile dir roachpb.ExternalStorage file BackupManifest_File - - // Only set if entryType is request - files []roachpb.ImportRequest_File - - // for progress tracking we assign the spans numbers as they can be executed - // out-of-order based on splitAndScatter's scheduling. - progressIdx int } // makeImportSpans pivots the backups, which are grouped by time, into @@ -124,7 +114,7 @@ func makeImportSpans( lowWaterMark roachpb.Key, user string, onMissing func(span covering.Range, start, end hlc.Timestamp) error, -) ([]importEntry, hlc.Timestamp, error) { +) ([]execinfrapb.RestoreSpanEntry, hlc.Timestamp, error) { // Put the covering for the already-completed spans into the // OverlapCoveringMerge input first. Payloads are returned in the same order // that they appear in the input; putting the completedSpan first means we'll @@ -225,7 +215,7 @@ func makeImportSpans( importRanges := covering.OverlapCoveringMerge(backupCoverings) // Translate the output of OverlapCoveringMerge into requests. - var requestEntries []importEntry + var requestEntries []execinfrapb.RestoreSpanEntry rangeLoop: for _, importRange := range importRanges { needed := false @@ -265,165 +255,15 @@ rangeLoop: } // If needed is false, we have data backed up that is not necessary // for this restore. Skip it. - requestEntries = append(requestEntries, importEntry{ - Span: roachpb.Span{Key: importRange.Start, EndKey: importRange.End}, - entryType: request, - files: files, + requestEntries = append(requestEntries, execinfrapb.RestoreSpanEntry{ + Span: roachpb.Span{Key: importRange.Start, EndKey: importRange.End}, + Files: files, }) } } return requestEntries, maxEndTime, nil } -// splitAndScatter creates new ranges for importSpans and scatters replicas and -// leaseholders to be as evenly balanced as possible. It does this with some -// amount of parallelism but also staying as close to the order in importSpans -// as possible (the more out of order, the more work is done if a RESTORE job -// loses its lease and has to be restarted). -// -// At a high level, this is accomplished by splitting and scattering large -// "chunks" from the front of importEntries in one goroutine, each of which are -// in turn passed to one of many worker goroutines that split and scatter the -// individual entries. -// -// importEntries are sent to readyForImportCh as they are scattered, so letting -// that channel send block can be used for backpressure on the splits and -// scatters. -// -// TODO(dan): This logic is largely tuned by running BenchmarkRestore2TB. See if -// there's some way to test it without running an O(hour) long benchmark. -func splitAndScatter( - restoreCtx context.Context, - settings *cluster.Settings, - db *kv.DB, - kr *storageccl.KeyRewriter, - numClusterNodes int, - importSpans []importEntry, - readyForImportCh chan<- importEntry, -) error { - var span opentracing.Span - ctx, span := tracing.ChildSpan(restoreCtx, "presplit-scatter") - defer tracing.FinishSpan(span) - - g := ctxgroup.WithContext(ctx) - - // TODO(dan): This not super principled. I just wanted something that wasn't - // a constant and grew slower than linear with the length of importSpans. It - // seems to be working well for BenchmarkRestore2TB but worth revisiting. - chunkSize := int(math.Sqrt(float64(len(importSpans)))) - importSpanChunks := make([][]importEntry, 0, len(importSpans)/chunkSize) - for start := 0; start < len(importSpans); { - importSpanChunk := importSpans[start:] - end := start + chunkSize - if end < len(importSpans) { - importSpanChunk = importSpans[start:end] - } - importSpanChunks = append(importSpanChunks, importSpanChunk) - start = end - } - - importSpanChunksCh := make(chan []importEntry) - expirationTime := db.Clock().Now().Add(time.Hour.Nanoseconds(), 0) - g.GoCtx(func(ctx context.Context) error { - defer close(importSpanChunksCh) - for idx, importSpanChunk := range importSpanChunks { - // TODO(dan): The structure between this and the below are very - // similar. Dedup. - chunkKey, err := rewriteBackupSpanKey(kr, importSpanChunk[0].Key) - if err != nil { - return err - } - - // TODO(dan): Really, this should be splitting the Key of the first - // entry in the _next_ chunk. - log.VEventf(restoreCtx, 1, "presplitting chunk %d of %d", idx, len(importSpanChunks)) - if err := db.AdminSplit(ctx, chunkKey, expirationTime); err != nil { - return err - } - - log.VEventf(restoreCtx, 1, "scattering chunk %d of %d", idx, len(importSpanChunks)) - scatterReq := &roachpb.AdminScatterRequest{ - RequestHeader: roachpb.RequestHeaderFromSpan(roachpb.Span{ - Key: chunkKey, - EndKey: chunkKey.Next(), - }), - // TODO(dan): This is a bit of a hack, but it seems to be an effective - // one (see the PR that added it for graphs). As of the commit that - // added this, scatter is not very good at actually balancing leases. - // This is likely for two reasons: 1) there's almost certainly some - // regression in scatter's behavior, it used to work much better and 2) - // scatter has to operate by balancing leases for all ranges in a - // cluster, but in RESTORE, we really just want it to be balancing the - // span being restored into. - RandomizeLeases: true, - } - if _, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil { - // TODO(dan): Unfortunately, Scatter is still too unreliable to - // fail the RESTORE when Scatter fails. I'm uncomfortable that - // this could break entirely and not start failing the tests, - // but on the bright side, it doesn't affect correctness, only - // throughput. - log.Errorf(ctx, "failed to scatter chunk %d: %s", idx, pErr.GoError()) - } - - select { - case <-ctx.Done(): - return ctx.Err() - case importSpanChunksCh <- importSpanChunk: - } - } - return nil - }) - - // TODO(dan): This tries to cover for a bad scatter by having 2 * the number - // of nodes in the cluster. Is it necessary? - splitScatterWorkers := numClusterNodes * 2 - var splitScatterStarted uint64 // Only access using atomic. - for worker := 0; worker < splitScatterWorkers; worker++ { - g.GoCtx(func(ctx context.Context) error { - for importSpanChunk := range importSpanChunksCh { - for _, importSpan := range importSpanChunk { - idx := atomic.AddUint64(&splitScatterStarted, 1) - - newSpanKey, err := rewriteBackupSpanKey(kr, importSpan.Span.Key) - if err != nil { - return err - } - - // TODO(dan): Really, this should be splitting the Key of - // the _next_ entry. - log.VEventf(restoreCtx, 1, "presplitting %d of %d", idx, len(importSpans)) - if err := db.AdminSplit(ctx, newSpanKey, expirationTime); err != nil { - return err - } - - log.VEventf(restoreCtx, 1, "scattering %d of %d", idx, len(importSpans)) - scatterReq := &roachpb.AdminScatterRequest{ - RequestHeader: roachpb.RequestHeaderFromSpan(roachpb.Span{Key: newSpanKey, EndKey: newSpanKey.Next()}), - } - if _, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil { - // TODO(dan): Unfortunately, Scatter is still too unreliable to - // fail the RESTORE when Scatter fails. I'm uncomfortable that - // this could break entirely and not start failing the tests, - // but on the bright side, it doesn't affect correctness, only - // throughput. - log.Errorf(ctx, "failed to scatter %d: %s", idx, pErr.GoError()) - } - - select { - case <-ctx.Done(): - return ctx.Err() - case readyForImportCh <- importSpan: - } - } - } - return nil - }) - } - - return g.Wait() -} - // WriteDescriptors writes all the the new descriptors: First the ID -> // TableDescriptor for the new table, then flip (or initialize) the name -> ID // entry so any new queries will use the new one. The tables are assigned the @@ -593,9 +433,8 @@ func rewriteBackupSpanKey(kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb. // files. func restore( restoreCtx context.Context, - db *kv.DB, + phs sql.PlanHookState, numClusterNodes int, - settings *cluster.Settings, backupManifests []BackupManifest, backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo, endTime hlc.Timestamp, @@ -604,17 +443,17 @@ func restore( spans []roachpb.Span, job *jobs.Job, encryption *jobspb.BackupEncryptionOptions, - user string, ) (RowCount, error) { + user := phs.User() // A note about contexts and spans in this method: the top-level context // `restoreCtx` is used for orchestration logging. All operations that carry // out work get their individual contexts. mu := struct { syncutil.Mutex + highWaterMark int res RowCount requestsCompleted []bool - highWaterMark int }{ highWaterMark: -1, } @@ -633,10 +472,6 @@ func restore( NewDesc: newDescBytes, }) } - kr, err := storageccl.MakeKeyRewriterFromRekeys(rekeys) - if err != nil { - return mu.res, err - } // Pivot the backups, which are grouped by time, into requests for import, // which are grouped by keyrange. @@ -648,7 +483,7 @@ func restore( } for i := range importSpans { - importSpans[i].progressIdx = i + importSpans[i].ProgressIdx = int64(i) } mu.requestsCompleted = make([]bool, len(importSpans)) @@ -658,7 +493,7 @@ func restore( case *jobspb.Progress_Restore: mu.Lock() if mu.highWaterMark >= 0 { - d.Restore.HighWater = importSpans[mu.highWaterMark].Key + d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key } mu.Unlock() default: @@ -671,40 +506,22 @@ func restore( pkIDs[roachpb.BulkOpSummaryID(uint64(tbl.GetID()), uint64(tbl.TableDesc().PrimaryIndex.ID))] = true } - // We're already limiting these on the server-side, but sending all the - // Import requests at once would fill up distsender/grpc/something and cause - // all sorts of badness (node liveness timeouts leading to mass leaseholder - // transfers, poor performance on SQL workloads, etc) as well as log spam - // about slow distsender requests. Rate limit them here, too. - // - // Use the number of cpus across all nodes in the cluster as the number of - // outstanding Import requests for the rate limiting. Note that this assumes - // all nodes in the cluster have the same number of cpus, but it's okay if - // that's wrong. - // - // TODO(dan): Make this limiting per node. - maxConcurrentImports := numClusterNodes * runtime.NumCPU() - importsSem := make(chan struct{}, maxConcurrentImports) - g := ctxgroup.WithContext(restoreCtx) - // The Import (and resulting AddSSTable) requests made below run on - // leaseholders, so presplit and scatter the ranges to balance the work - // among many nodes. - // - // We're about to start off some goroutines that presplit & scatter each - // import span. Once split and scattered, the span is submitted to - // readyForImportCh to indicate it's ready for Import. Since import is so - // much slower, we buffer the channel to keep the split/scatter work from - // getting too far ahead. This both naturally rate limits the split/scatters - // and bounds the number of empty ranges created if the RESTORE fails (or is - // canceled). - const presplitLeadLimit = 10 - readyForImportCh := make(chan importEntry, presplitLeadLimit) - g.GoCtx(func(ctx context.Context) error { - defer close(readyForImportCh) - return splitAndScatter(ctx, settings, db, kr, numClusterNodes, importSpans, readyForImportCh) - }) + // TODO(dan): This not super principled. I just wanted something that wasn't + // a constant and grew slower than linear with the length of importSpans. It + // seems to be working well for BenchmarkRestore2TB but worth revisiting. + chunkSize := int(math.Sqrt(float64(len(importSpans)))) + importSpanChunks := make([][]execinfrapb.RestoreSpanEntry, 0, len(importSpans)/chunkSize) + for start := 0; start < len(importSpans); { + importSpanChunk := importSpans[start:] + end := start + chunkSize + if end < len(importSpans) { + importSpanChunk = importSpans[start:end] + } + importSpanChunks = append(importSpanChunks, importSpanChunk) + start = end + } requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block g.GoCtx(func(ctx context.Context) error { @@ -713,81 +530,54 @@ func restore( return progressLogger.Loop(ctx, requestFinishedCh) }) - // Wrap the relevant BackupEncryptionOptions to be used by the KV - // ImportRequest. - // TODO(adityamaru): Move this wrapping to when the Restore DataSpec is being - // created once we switch to using DistSQL for RESTORE. This is how BACKUP - // does it currently. - var fileEncryption *roachpb.FileEncryptionOptions - if encryption != nil { - fileEncryption = &roachpb.FileEncryptionOptions{Key: encryption.Key} - } + progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) g.GoCtx(func(ctx context.Context) error { - log.Eventf(restoreCtx, "commencing import of data with concurrency %d", maxConcurrentImports) - for readyForImportSpan := range readyForImportCh { - newSpanKey, err := rewriteBackupSpanKey(kr, readyForImportSpan.Span.Key) - if err != nil { - return err - } - idx := readyForImportSpan.progressIdx - - importRequest := &roachpb.ImportRequest{ - // Import is a point request because we don't want DistSender to split - // it. Assume (but don't require) the entire post-rewrite span is on the - // same range. - RequestHeader: roachpb.RequestHeader{Key: newSpanKey}, - DataSpan: readyForImportSpan.Span, - Files: readyForImportSpan.files, - EndTime: endTime, - Rekeys: rekeys, - Encryption: fileEncryption, - } - - log.VEventf(restoreCtx, 1, "importing %d of %d", idx, len(importSpans)) - - select { - case importsSem <- struct{}{}: - case <-ctx.Done(): - return ctx.Err() + // When a processor is done importing a span, it will send a progress update + // to progCh. + for progress := range progCh { + mu.Lock() + var progDetails RestoreProgress + if err := types.UnmarshalAny(&progress.ProgressDetails, &progDetails); err != nil { + log.Errorf(ctx, "unable to unmarshal restore progress details: %+v", err) } - g.GoCtx(func(ctx context.Context) error { - ctx, importSpan := tracing.ChildSpan(ctx, "import") - log.Event(ctx, "acquired semaphore") - defer tracing.FinishSpan(importSpan) - defer func() { <-importsSem }() - - importRes, pErr := kv.SendWrapped(ctx, db.NonTransactionalSender(), importRequest) - if pErr != nil { - return errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan) - - } + mu.res.add(progDetails.Summary) + idx := progDetails.ProgressIdx - mu.Lock() - mu.res.add(countRows(importRes.(*roachpb.ImportResponse).Imported, pkIDs)) - - // Assert that we're actually marking the correct span done. See #23977. - if !importSpans[idx].Key.Equal(importRequest.DataSpan.Key) { - mu.Unlock() - return errors.Newf("request %d for span %v (to %v) does not match import span for same idx: %v", - idx, importRequest.DataSpan, newSpanKey, importSpans[idx], - ) - } - mu.requestsCompleted[idx] = true - for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { - mu.highWaterMark = j - } + // Assert that we're actually marking the correct span done. See #23977. + if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) { mu.Unlock() + return errors.Newf("request %d for span %v does not match import span for same idx: %v", + idx, progDetails.DataSpan, importSpans[idx], + ) + } + mu.requestsCompleted[idx] = true + for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { + mu.highWaterMark = j + } + mu.Unlock() - requestFinishedCh <- struct{}{} - return nil - }) + // Signal that an ImportRequest finished to update job progress. + requestFinishedCh <- struct{}{} } - log.Event(restoreCtx, "wait for outstanding imports to finish") return nil }) + // TODO(pbardea): Improve logging in processors. + if err := distRestore( + restoreCtx, + phs, + importSpanChunks, + pkIDs, + encryption, + rekeys, + endTime, + progCh, + ); err != nil { + return mu.res, err + } + if err := g.Wait(); err != nil { // This leaves the data that did get imported in case the user wants to // retry. @@ -1186,9 +976,8 @@ func (r *restoreResumer) Resume( res, err := restore( ctx, - p.ExecCfg().DB, + p, numClusterNodes, - p.ExecCfg().Settings, backupManifests, details.BackupLocalityInfo, details.EndTime, @@ -1197,7 +986,6 @@ func (r *restoreResumer) Resume( spans, r.job, details.Encryption, - p.User(), ) if err != nil { return err diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go new file mode 100644 index 000000000000..304ee39858d3 --- /dev/null +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -0,0 +1,252 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "bytes" + "context" + "sort" + + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/logtags" +) + +// distRestore plans a 2 stage distSQL flow for a distributed restore. It +// streams back progress updates over the given progCh. The first stage is a +// splitAndScatter processor on every node that is running a compatible version. +// Those processors will then route the spans after they have split and +// scattered them to the restore data processors - the second stage. The spans +// should be routed to the node that is the leaseholder of that span. The +// restore data processor will finally download and insert the data, and this is +// reported back to the coordinator via the progCh. +// This method also closes the given progCh. +func distRestore( + ctx context.Context, + phs sql.PlanHookState, + chunks [][]execinfrapb.RestoreSpanEntry, + pkIDs map[uint64]bool, + encryption *jobspb.BackupEncryptionOptions, + rekeys []roachpb.ImportRequest_TableRekey, + restoreTime hlc.Timestamp, + progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, +) error { + ctx = logtags.AddTag(ctx, "restore-distsql", nil) + defer close(progCh) + var noTxn *kv.Txn + + dsp := phs.DistSQLPlanner() + evalCtx := phs.ExtendedEvalContext() + + // Wrap the relevant BackupEncryptionOptions to be used by the KV + // ImportRequest. + var fileEncryption *roachpb.FileEncryptionOptions + if encryption != nil { + fileEncryption = &roachpb.FileEncryptionOptions{Key: encryption.Key} + } + + planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, phs.ExecCfg()) + if err != nil { + return err + } + + nodes := getAllCompatibleNodes(planCtx) + + splitAndScatterSpecs, err := makeSplitAndScatterSpecs(nodes, chunks, rekeys) + if err != nil { + return err + } + + restoreDataSpec := execinfrapb.RestoreDataSpec{ + RestoreTime: restoreTime, + Encryption: fileEncryption, + Rekeys: rekeys, + PKIDs: pkIDs, + } + + if len(splitAndScatterSpecs) == 0 { + // We should return an error here as there are no nodes that are compatible, + // but we should have at least found ourselves. + return nil + } + + gatewayNodeID, err := evalCtx.ExecCfg.NodeID.OptionalNodeIDErr(47970) + if err != nil { + return err + } + p := sql.MakePhysicalPlan(gatewayNodeID) + + // Plan SplitAndScatter in a round-robin fashion. + splitAndScatterStageID := p.NewStageOnNodes(nodes) + splitAndScatterProcs := make(map[roachpb.NodeID]physicalplan.ProcessorIdx) + + defaultStream := int32(0) + rangeRouterSpec := execinfrapb.OutputRouterSpec_RangeRouterSpec{ + Spans: nil, + DefaultDest: &defaultStream, + Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{ + { + Column: 0, + Encoding: sqlbase.DatumEncoding_ASCENDING_KEY, + }, + }, + } + for stream, nodeID := range nodes { + startBytes, endBytes, err := routingSpanForNode(nodeID) + if err != nil { + return err + } + + span := execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{ + Start: startBytes, + End: endBytes, + Stream: int32(stream), + } + rangeRouterSpec.Spans = append(rangeRouterSpec.Spans, span) + } + // The router expects the spans to be sorted. + sort.Slice(rangeRouterSpec.Spans, func(i, j int) bool { + return bytes.Compare(rangeRouterSpec.Spans[i].Start, rangeRouterSpec.Spans[j].Start) == -1 + }) + + for _, n := range nodes { + spec := splitAndScatterSpecs[n] + if spec == nil { + // We may have fewer chunks than we have nodes for very small imports. In + // this case we only want to plan splitAndScatter nodes on a subset of + // nodes. Note that we still want to plan a RestoreData processor on every + // node since each entry could be scattered anywhere. + continue + } + proc := physicalplan.Processor{ + Node: n, + Spec: execinfrapb.ProcessorSpec{ + Core: execinfrapb.ProcessorCoreUnion{SplitAndScatter: splitAndScatterSpecs[n]}, + Post: execinfrapb.PostProcessSpec{}, + Output: []execinfrapb.OutputRouterSpec{ + { + Type: execinfrapb.OutputRouterSpec_BY_RANGE, + RangeRouterSpec: rangeRouterSpec, + }, + }, + StageID: splitAndScatterStageID, + }, + } + pIdx := p.AddProcessor(proc) + splitAndScatterProcs[n] = pIdx + } + + // Plan RestoreData. + restoreDataStageID := p.NewStageOnNodes(nodes) + restoreDataProcs := make(map[roachpb.NodeID]physicalplan.ProcessorIdx) + for _, n := range nodes { + proc := physicalplan.Processor{ + Node: n, + Spec: execinfrapb.ProcessorSpec{ + Input: []execinfrapb.InputSyncSpec{ + {ColumnTypes: splitAndScatterOutputTypes}, + }, + Core: execinfrapb.ProcessorCoreUnion{RestoreData: &restoreDataSpec}, + Post: execinfrapb.PostProcessSpec{}, + Output: []execinfrapb.OutputRouterSpec{{Type: execinfrapb.OutputRouterSpec_PASS_THROUGH}}, + StageID: restoreDataStageID, + }, + } + pIdx := p.AddProcessor(proc) + restoreDataProcs[n] = pIdx + p.ResultRouters = append(p.ResultRouters, pIdx) + } + + for _, srcProc := range splitAndScatterProcs { + slot := 0 + for _, destProc := range restoreDataProcs { + p.Streams = append(p.Streams, physicalplan.Stream{ + SourceProcessor: srcProc, + SourceRouterSlot: slot, + DestProcessor: destProc, + DestInput: 0, + }) + slot++ + } + } + + dsp.FinalizePlan(planCtx, &p) + + metaFn := func(_ context.Context, meta *execinfrapb.ProducerMetadata) error { + if meta.BulkProcessorProgress != nil { + // Send the progress up a level to be written to the manifest. + progCh <- meta.BulkProcessorProgress + } + return nil + } + + rowResultWriter := sql.NewRowResultWriter(nil) + + recv := sql.MakeDistSQLReceiver( + ctx, + sql.NewMetadataCallbackWriter(rowResultWriter, metaFn), + tree.Rows, + nil, /* rangeCache */ + noTxn, /* txn - the flow does not read or write the database */ + func(ts hlc.Timestamp) {}, + evalCtx.Tracing, + ) + defer recv.Release() + + // Copy the evalCtx, as dsp.Run() might change it. + evalCtxCopy := *evalCtx + dsp.Run(planCtx, noTxn, &p, recv, &evalCtxCopy, nil /* finishedSetupFn */)() + return rowResultWriter.Err() +} + +// getAllCompatibleNodes returns all nodes that are OK to use in the DistSQL +// plan. +func getAllCompatibleNodes(planCtx *sql.PlanningCtx) []roachpb.NodeID { + nodes := make([]roachpb.NodeID, 0, len(planCtx.NodeStatuses)) + for node, status := range planCtx.NodeStatuses { + if status == sql.NodeOK { + nodes = append(nodes, node) + } + } + return nodes +} + +// makeSplitAndScatterSpecs returns a map from nodeID to the SplitAndScatter +// spec that should be planned on that node. Given the chunks of ranges to +// import it round-robin distributes the chunks amongst the given nodes. +func makeSplitAndScatterSpecs( + nodes []roachpb.NodeID, + chunks [][]execinfrapb.RestoreSpanEntry, + rekeys []roachpb.ImportRequest_TableRekey, +) (map[roachpb.NodeID]*execinfrapb.SplitAndScatterSpec, error) { + specsByNodes := make(map[roachpb.NodeID]*execinfrapb.SplitAndScatterSpec) + for i, chunk := range chunks { + node := nodes[i%len(nodes)] + if spec, ok := specsByNodes[node]; ok { + spec.Chunks = append(spec.Chunks, execinfrapb.SplitAndScatterSpec_RestoreEntryChunk{ + Entries: chunk, + }) + } else { + specsByNodes[node] = &execinfrapb.SplitAndScatterSpec{ + Chunks: []execinfrapb.SplitAndScatterSpec_RestoreEntryChunk{{ + Entries: chunk, + }}, + Rekeys: rekeys, + } + } + } + return specsByNodes, nil +} diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index a317a47803bd..38d276c2a1fe 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -252,7 +252,7 @@ func runSplitAndScatter( for importSpanChunk := range importSpanChunksCh { log.Infof(ctx, "processing a chunk") for _, importSpan := range importSpanChunk { - log.Infof(ctx, "processing a span") + log.Infof(ctx, "processing a span [%s,%s)", importSpan.Span.Key, importSpan.Span.EndKey) destination, err := scatterer.splitAndScatterKey(ctx, db, kr, importSpan.Span.Key, false /* randomizeLeases */) if err != nil { return err diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index f1c52aa59f50..361308fe64af 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -253,7 +253,7 @@ func NewProcessor( return NewBackupDataProcessor(flowCtx, processorID, *core.BackupData, outputs[0]) } if core.SplitAndScatter != nil { - if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { + if err := checkNumInOut(inputs, outputs, 0, 1); err != nil { return nil, err } if NewSplitAndScatterProcessor == nil { From 418919888b445047212ba89a044a7de5960aaedf Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 4 Aug 2020 11:15:17 -0400 Subject: [PATCH 4/4] roachtest: reflake (skip) disk-stalled test for release-19.1 Fixes #52181. This test doesn't make use of all the `roachtest start` smartness to figure out what sub-command to use and instead constructs the raw start command by hand. Given our support policy, let's just bump the minimum version this test expects to run. This feels like another instance where https://github.com/cockroachdb/cockroach/issues/51897 would be nice to have. Release note: None --- pkg/cmd/roachtest/disk_stall.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/disk_stall.go b/pkg/cmd/roachtest/disk_stall.go index 38d5558dacf0..2e151b42a196 100644 --- a/pkg/cmd/roachtest/disk_stall.go +++ b/pkg/cmd/roachtest/disk_stall.go @@ -34,7 +34,7 @@ func registerDiskStalledDetection(r *testRegistry) { affectsLogDir, affectsDataDir, ), Owner: OwnerKV, - MinVersion: "v19.1.0", + MinVersion: "v19.2.0", Cluster: makeClusterSpec(1), Run: func(ctx context.Context, t *test, c *cluster) { runDiskStalledDetection(ctx, t, c, affectsLogDir, affectsDataDir)