-
Notifications
You must be signed in to change notification settings - Fork 297
/
signer.go
652 lines (566 loc) · 18.7 KB
/
signer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
package user
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/celestiaorg/go-square/blob"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/cosmos/cosmos-sdk/types/tx/signing"
authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
abci "github.com/tendermint/tendermint/abci/types"
"google.golang.org/grpc"
"github.com/celestiaorg/celestia-app/v2/app/encoding"
apperrors "github.com/celestiaorg/celestia-app/v2/app/errors"
blobtypes "github.com/celestiaorg/celestia-app/v2/x/blob/types"
)
// Default settings applied to every Signer created by NewSigner unless an
// Option or setter overrides them.
const (
	// DefaultPollTime is the default interval between successive queries made
	// while waiting for a transaction to be confirmed.
	DefaultPollTime time.Duration = 3 * time.Second
	// DefaultGasMultiplier is the default factor applied to the simulated gas
	// usage when estimating a gas limit.
	DefaultGasMultiplier = float64(1.1)
)
// Option is a functional option that mutates a Signer. NewSigner applies each
// supplied Option after the signer's default fields have been populated.
type Option func(s *Signer)
// WithGasMultiplier returns an Option that replaces the signer's gas
// multiplier with the given value.
func WithGasMultiplier(multiplier float64) Option {
	apply := func(signer *Signer) {
		signer.gasMultiplier = multiplier
	}
	return apply
}
// Signer is an abstraction for building, signing, and broadcasting Celestia
// transactions on behalf of a single account. It tracks the account's sequence
// number locally so that several transactions can be signed before any of
// them is confirmed.
type Signer struct {
	// keys is the keyring used to sign on behalf of address
	keys keyring.Keyring
	// address is the account that signs every transaction
	address sdktypes.AccAddress
	// enc provides the tx builder, encoder, decoder and sign mode handler
	enc client.TxConfig
	// grpc is the connection used to broadcast, query and simulate transactions
	grpc *grpc.ClientConn
	// pk is the public key of address as stored in the keyring
	pk cryptotypes.PubKey
	// chainID and accountNumber are part of the data that gets signed
	chainID       string
	accountNumber uint64
	// FIXME: the signer is currently incapable of detecting an appversion
	// change and could produce incorrect PFBs if the network is at an
	// appVersion that the signer does not support
	appVersion uint64

	// mtx is taken by the exported methods that read or modify the poll
	// time, the sequence numbers and the pending transaction maps below
	mtx sync.RWMutex
	// how often to poll the network for confirmation of a transaction
	pollTime time.Duration
	// the signers local view of the sequence number
	localSequence uint64
	// the chains last known sequence number
	networkSequence uint64
	// gasMultiplier is used to increase gas limit as it is sometimes underestimated
	gasMultiplier float64
	// lookup map of all pending and yet to be confirmed outbound transactions
	outboundSequences map[uint64]struct{}
	// a reverse map for confirming which sequence numbers have been committed
	reverseTxHashSequenceMap map[string]uint64
}
// NewSigner builds a Signer for the given address from explicitly supplied
// chain and account data. The address must be present in the keyring: its
// record is looked up and its public key cached on the signer. Both the local
// and the network sequence start at sequence. The default poll time and gas
// multiplier are set first, then every Option is applied in order.
//
// An error is returned when the key cannot be found or its public key cannot
// be read.
func NewSigner(
	keys keyring.Keyring,
	conn *grpc.ClientConn,
	address sdktypes.AccAddress,
	enc client.TxConfig,
	chainID string,
	accountNumber, sequence,
	appVersion uint64,
	options ...Option,
) (*Signer, error) {
	// the address must have a matching record in the keyring
	keyRecord, err := keys.KeyByAddress(address)
	if err != nil {
		return nil, err
	}
	pubKey, err := keyRecord.GetPubKey()
	if err != nil {
		return nil, err
	}

	s := new(Signer)
	s.keys = keys
	s.address = address
	s.grpc = conn
	s.enc = enc
	s.pk = pubKey
	s.chainID = chainID
	s.accountNumber = accountNumber
	s.appVersion = appVersion
	s.localSequence = sequence
	s.networkSequence = sequence
	s.pollTime = DefaultPollTime
	s.gasMultiplier = DefaultGasMultiplier
	s.outboundSequences = make(map[uint64]struct{})
	s.reverseTxHashSequenceMap = make(map[string]uint64)

	// options run last so they can override the defaults set above
	for i := range options {
		options[i](s)
	}
	return s, nil
}
// SetupSingleSigner creates a signer for the only key held by the keyring.
// It fails unless the keyring lists exactly one key. The key's address is
// handed to SetupSigner, which queries the chainID, account number and
// sequence number over the grpc connection.
func SetupSingleSigner(
	ctx context.Context,
	keys keyring.Keyring,
	conn *grpc.ClientConn,
	encCfg encoding.Config,
	options ...Option,
) (*Signer, error) {
	keyRecords, listErr := keys.List()
	switch {
	case listErr != nil:
		return nil, listErr
	case len(keyRecords) != 1:
		return nil, errors.New("keyring must contain exactly one key")
	}

	addr, err := keyRecords[0].GetAddress()
	if err != nil {
		return nil, err
	}
	return SetupSigner(ctx, keys, conn, addr, encCfg, options...)
}
// SetupSigner creates a signer for address using data fetched over the grpc
// connection: the chainID and app version are read from the header of the
// latest block, and the account number and sequence number come from
// QueryAccount.
func SetupSigner(
	ctx context.Context,
	keys keyring.Keyring,
	conn *grpc.ClientConn,
	address sdktypes.AccAddress,
	encCfg encoding.Config,
	options ...Option,
) (*Signer, error) {
	blockClient := tmservice.NewServiceClient(conn)
	latest, err := blockClient.GetLatestBlock(ctx, &tmservice.GetLatestBlockRequest{})
	if err != nil {
		return nil, err
	}
	header := latest.SdkBlock.Header

	accountNumber, sequence, err := QueryAccount(ctx, conn, encCfg, address.String())
	if err != nil {
		return nil, err
	}

	return NewSigner(
		keys,
		conn,
		address,
		encCfg.TxConfig,
		header.ChainID,
		accountNumber,
		sequence,
		header.Version.App,
		options...,
	)
}
// SubmitTx builds and signs a transaction from msgs, broadcasts it, and then
// blocks in ConfirmTx until the transaction is found, an error occurs or ctx
// is cancelled. TxOptions may be provided to set the fee and gas limit. When
// the broadcast fails, the broadcast response (possibly nil) is returned
// together with the error.
func (s *Signer) SubmitTx(ctx context.Context, msgs []sdktypes.Msg, opts ...TxOption) (*sdktypes.TxResponse, error) {
	signedTx, err := s.CreateTx(msgs, opts...)
	if err != nil {
		return nil, err
	}

	broadcastResp, err := s.BroadcastTx(ctx, signedTx)
	if err == nil {
		return s.ConfirmTx(ctx, broadcastResp.TxHash)
	}
	return broadcastResp, err
}
// SubmitPayForBlob builds and signs a pay-for-blob transaction from blobs,
// broadcasts it, and then blocks in ConfirmTx until the transaction is found,
// an error occurs or ctx is cancelled. TxOptions may be provided to set the
// fee and gas limit. When the broadcast fails, the broadcast response
// (possibly nil) is returned together with the error.
func (s *Signer) SubmitPayForBlob(ctx context.Context, blobs []*blob.Blob, opts ...TxOption) (*sdktypes.TxResponse, error) {
	broadcastResp, err := s.BroadcastPayForBlob(ctx, blobs, opts...)
	if err == nil {
		return s.ConfirmTx(ctx, broadcastResp.TxHash)
	}
	return broadcastResp, err
}
// BroadcastPayForBlob builds and signs a pay-for-blob transaction from blobs
// and broadcasts it to the chain while holding the signer's lock. TxOptions
// may be provided to set the fee and gas limit. It returns as soon as the
// node has answered the broadcast; it does not wait for the transaction to be
// included in a block.
func (s *Signer) BroadcastPayForBlob(ctx context.Context, blobs []*blob.Blob, opts ...TxOption) (*sdktypes.TxResponse, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	blobTxBytes, sequence, createErr := s.createPayForBlobs(blobs, opts...)
	if createErr != nil {
		return nil, createErr
	}
	return s.broadcastTx(ctx, blobTxBytes, sequence)
}
// CreateTx builds a transaction from msgs and signs it with the signer's next
// local sequence number, holding the signer's lock while doing so. TxOptions
// may optionally be used to set the gas limit and fee.
func (s *Signer) CreateTx(msgs []sdktypes.Msg, opts ...TxOption) (authsigning.Tx, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	signedTx, err := s.createTx(msgs, opts...)
	return signedTx, err
}
// createTx builds a transaction from msgs, applies opts to the builder, and
// signs it with the next local sequence number. The local sequence is only
// consumed when signing succeeds.
//
// CONTRACT: assumes the caller has the lock
func (s *Signer) createTx(msgs []sdktypes.Msg, opts ...TxOption) (authsigning.Tx, error) {
	txBuilder := s.txBuilder(opts...)
	if err := txBuilder.SetMsgs(msgs...); err != nil {
		return nil, err
	}

	sequence := s.getAndIncrementSequence()
	if err := s.signTransaction(txBuilder, sequence); err != nil {
		// No transaction was produced for this sequence number, so hand it
		// back. Otherwise the next transaction would skip a sequence and be
		// rejected by the chain.
		s.localSequence = sequence
		return nil, err
	}

	return txBuilder.GetTx(), nil
}
// CreatePayForBlob builds and signs a pay-for-blob transaction for blobs and
// returns the marshalled blob transaction bytes without broadcasting them.
// The signer's lock is held for the duration of the call. TxOptions may be
// provided to set the fee and gas limit.
func (s *Signer) CreatePayForBlob(blobs []*blob.Blob, opts ...TxOption) ([]byte, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	blobTxBytes, _, createErr := s.createPayForBlobs(blobs, opts...)
	return blobTxBytes, createErr
}
// createPayForBlobs wraps blobs in a MsgPayForBlobs for the signer's address
// and app version, signs a transaction containing that message, and marshals
// the encoded transaction together with the blobs into a blob transaction.
// It returns the blob transaction bytes and the sequence number the
// transaction was signed with.
//
// CONTRACT: assumes the caller has the lock
func (s *Signer) createPayForBlobs(blobs []*blob.Blob, opts ...TxOption) ([]byte, uint64, error) {
	msg, err := blobtypes.NewMsgPayForBlobs(s.address.String(), s.appVersion, blobs...)
	if err != nil {
		return nil, 0, err
	}

	tx, err := s.createTx([]sdktypes.Msg{msg}, opts...)
	if err != nil {
		return nil, 0, err
	}

	// Report the failure to the caller instead of panicking: this is library
	// code and the function already has an error return.
	seqNum, err := getSequenceNumber(tx)
	if err != nil {
		return nil, 0, fmt.Errorf("reading sequence number of signed tx: %w", err)
	}

	txBytes, err := s.EncodeTx(tx)
	if err != nil {
		return nil, 0, err
	}

	blobTx, err := blob.MarshalBlobTx(txBytes, blobs...)
	if err != nil {
		return nil, 0, err
	}
	return blobTx, seqNum, nil
}
// EncodeTx serializes tx with the encoder of the signer's tx config.
func (s *Signer) EncodeTx(tx sdktypes.Tx) ([]byte, error) {
	encode := s.enc.TxEncoder()
	return encode(tx)
}
// DecodeTx deserializes txBytes with the decoder of the signer's tx config
// and returns the result as an authsigning.Tx. An error is returned when
// decoding fails or when the decoded transaction does not implement
// authsigning.Tx.
func (s *Signer) DecodeTx(txBytes []byte) (authsigning.Tx, error) {
	decode := s.enc.TxDecoder()
	decoded, err := decode(txBytes)
	if err != nil {
		return nil, err
	}

	if authTx, isAuthTx := decoded.(authsigning.Tx); isAuthTx {
		return authTx, nil
	}
	return nil, errors.New("not an authsigning transaction")
}
// BroadcastTx encodes the provided signed transaction, reads the sequence
// number from its signature, and submits it to the chain while holding the
// signer's lock. It returns the node's response to the broadcast.
func (s *Signer) BroadcastTx(ctx context.Context, tx authsigning.Tx) (*sdktypes.TxResponse, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	encoded, encodeErr := s.EncodeTx(tx)
	if encodeErr != nil {
		return nil, encodeErr
	}

	seq, seqErr := getSequenceNumber(tx)
	if seqErr != nil {
		return nil, seqErr
	}

	return s.broadcastTx(ctx, encoded, seq)
}
// broadcastTx submits already-encoded transaction bytes that were signed with
// the given sequence number, and records the transaction as pending when the
// node accepts it.
//
// Before contacting the node, two cases are redirected to
// retryBroadcastingTx (which re-signs and calls back into this function): a
// sequence that is already recorded as outbound is retried with sequence+1,
// and a sequence below the last known network sequence resets the local
// sequence to the network sequence and retries with that value.
//
// On a nonce-mismatch response the local and network sequences are set to
// the value parsed from the raw log and the response is returned together
// with an error; the transaction is not re-signed (see the FIXME below). Any
// other non-OK code is likewise returned with an error.
//
// CONTRACT: assumes the caller has the lock
func (s *Signer) broadcastTx(ctx context.Context, txBytes []byte, sequence uint64) (*sdktypes.TxResponse, error) {
	// a tx with this sequence is still pending, so move on to the next one
	if _, exists := s.outboundSequences[sequence]; exists {
		return s.retryBroadcastingTx(ctx, txBytes, sequence+1)
	}

	// the chain is known to be ahead of this sequence, so catch up first
	if sequence < s.networkSequence {
		s.localSequence = s.networkSequence
		return s.retryBroadcastingTx(ctx, txBytes, s.localSequence)
	}

	txClient := sdktx.NewServiceClient(s.grpc)
	resp, err := txClient.BroadcastTx(
		ctx,
		&sdktx.BroadcastTxRequest{
			Mode:    sdktx.BroadcastMode_BROADCAST_MODE_SYNC,
			TxBytes: txBytes,
		},
	)
	if err != nil {
		return nil, err
	}
	if apperrors.IsNonceMismatchCode(resp.TxResponse.Code) {
		// extract what the lastCommittedNonce on chain is
		nextSequence, err := apperrors.ParseExpectedSequence(resp.TxResponse.RawLog)
		if err != nil {
			return nil, fmt.Errorf("parsing nonce mismatch upon retry: %w", err)
		}
		s.networkSequence = nextSequence
		s.localSequence = nextSequence
		// FIXME: We can't actually resign the transaction. A malicious node
		// may manipulate us into signing the same transaction several times
		// and then executing them. We need some proof of what the last network
		// sequence is rather than relying on an error provided by the node
		// return s.retryBroadcastingTx(ctx, txBytes, nextSequence)
		// Ref: https://github.com/celestiaorg/celestia-app/issues/3256
	} else if resp.TxResponse.Code == abci.CodeTypeOK {
		// remember the pending tx so that ConfirmTx can later advance the
		// network sequence and release the outbound slot
		s.outboundSequences[sequence] = struct{}{}
		s.reverseTxHashSequenceMap[resp.TxResponse.TxHash] = sequence
		return resp.TxResponse, nil
	}
	// reached for a nonce mismatch as well as for any other non-OK code
	return resp.TxResponse, fmt.Errorf("tx failed with code %d: %s", resp.TxResponse.Code, resp.TxResponse.RawLog)
}
// retryBroadcastingTx creates a new transaction by copying over an existing
// transaction but creates a new signature with the new sequence number. The
// messages, fee granter, fee payer, memo, fee amount, gas limit and timeout
// height of the original are carried over. If txBytes is a blob transaction,
// the inner transaction is re-signed and then re-wrapped with the original
// blobs. It then calls `broadcastTx` and attempts to submit the transaction.
//
// CONTRACT: assumes the caller has the lock
func (s *Signer) retryBroadcastingTx(ctx context.Context, txBytes []byte, newSequenceNumber uint64) (*sdktypes.TxResponse, error) {
	// a blob tx wraps the sdk tx; unwrap it so the inner tx can be decoded
	blobTx, isBlobTx := blob.UnmarshalBlobTx(txBytes)
	if isBlobTx {
		txBytes = blobTx.Tx
	}
	tx, err := s.DecodeTx(txBytes)
	if err != nil {
		return nil, err
	}

	txBuilder := s.txBuilder()
	if err := txBuilder.SetMsgs(tx.GetMsgs()...); err != nil {
		return nil, err
	}
	if granter := tx.FeeGranter(); granter != nil {
		txBuilder.SetFeeGranter(granter)
	}
	if payer := tx.FeePayer(); payer != nil {
		txBuilder.SetFeePayer(payer)
	}
	if memo := tx.GetMemo(); memo != "" {
		txBuilder.SetMemo(memo)
	}
	if fee := tx.GetFee(); fee != nil {
		txBuilder.SetFeeAmount(fee)
	}
	if gas := tx.GetGas(); gas > 0 {
		txBuilder.SetGasLimit(gas)
	}
	// keep the original expiry: without this the re-signed transaction
	// would silently lose the timeout height it was created with
	if timeoutHeight := tx.GetTimeoutHeight(); timeoutHeight > 0 {
		txBuilder.SetTimeoutHeight(timeoutHeight)
	}

	if err := s.signTransaction(txBuilder, newSequenceNumber); err != nil {
		return nil, fmt.Errorf("resigning transaction: %w", err)
	}

	newTxBytes, err := s.EncodeTx(txBuilder.GetTx())
	if err != nil {
		return nil, err
	}

	// rewrap the blob tx if it was originally a blob tx
	if isBlobTx {
		newTxBytes, err = blob.MarshalBlobTx(newTxBytes, blobTx.Blobs...)
		if err != nil {
			return nil, err
		}
	}

	return s.broadcastTx(ctx, newTxBytes, newSequenceNumber)
}
// ConfirmTx repeatedly queries the node for the transaction with the given
// hash, waiting one poll interval between attempts. It returns when the
// transaction is found, when the query fails with an error other than "not
// found", or when ctx is cancelled.
//
// When the transaction is found, the signer's pending-transaction bookkeeping
// is updated. A transaction that was included with a non-zero code is
// returned together with an error. On a query error or cancellation an empty
// TxResponse is returned alongside the error.
func (s *Signer) ConfirmTx(ctx context.Context, txHash string) (*sdktypes.TxResponse, error) {
	queryClient := sdktx.NewServiceClient(s.grpc)
	ticker := time.NewTicker(s.getPollTime())
	defer ticker.Stop()

	for {
		result, queryErr := queryClient.GetTx(ctx, &sdktx.GetTxRequest{Hash: txHash})
		switch {
		case queryErr == nil:
			txResp := result.TxResponse
			succeeded := txResp.Code == 0
			s.updateNetworkSequence(txHash, succeeded)
			if !succeeded {
				return txResp, fmt.Errorf("tx was included but failed with code %d: %s", txResp.Code, txResp.RawLog)
			}
			return txResp, nil

		// FIXME: this is a relatively brittle way of working out whether to retry or not. The tx might be not found for other
		// reasons. It may have been removed from the mempool at a later point. We should build an endpoint that gives the
		// signer more information on the status of their transaction and then update the logic here
		case !strings.Contains(queryErr.Error(), "not found"):
			return &sdktypes.TxResponse{}, queryErr
		}

		// The tx is not known yet: wait for the next tick or for cancellation.
		select {
		case <-ctx.Done():
			return &sdktypes.TxResponse{}, ctx.Err()
		case <-ticker.C:
		}
	}
}
// EstimateGas simulates a transaction containing msgs and returns the gas it
// used, scaled by the signer's gas multiplier. The transaction is signed with
// the current local sequence number, which is read but not incremented. The
// sequence is read through LocalSequence, which takes the signer's read
// lock.
func (s *Signer) EstimateGas(ctx context.Context, msgs []sdktypes.Msg, opts ...TxOption) (uint64, error) {
	builder := s.txBuilder(opts...)
	if err := builder.SetMsgs(msgs...); err != nil {
		return 0, err
	}

	sequence := s.LocalSequence()
	if err := s.signTransaction(builder, sequence); err != nil {
		return 0, err
	}

	encode := s.enc.TxEncoder()
	encoded, err := encode(builder.GetTx())
	if err != nil {
		return 0, err
	}

	simClient := sdktx.NewServiceClient(s.grpc)
	simulation, err := simClient.Simulate(ctx, &sdktx.SimulateRequest{TxBytes: encoded})
	if err != nil {
		return 0, err
	}

	// pad the simulated usage, since the simulation can underestimate
	estimate := float64(simulation.GasInfo.GasUsed) * s.gasMultiplier
	return uint64(estimate), nil
}
// ChainID returns the chain ID the signer was constructed with. It is part of
// the signer data used when producing signatures.
func (s *Signer) ChainID() string {
	return s.chainID
}
// AccountNumber returns the account number the signer was constructed with.
// It is part of the signer data used when producing signatures.
func (s *Signer) AccountNumber() uint64 {
	return s.accountNumber
}
// Address returns the account address on whose behalf the signer signs
// transactions.
func (s *Signer) Address() sdktypes.AccAddress {
	return s.address
}
// SetPollTime changes the interval at which the signer polls for the
// confirmation of a transaction. The value is read when a confirmation loop
// starts.
func (s *Signer) SetPollTime(pollTime time.Duration) {
	s.mtx.Lock()
	s.pollTime = pollTime
	s.mtx.Unlock()
}
// getPollTime returns the interval at which the signer polls for the
// confirmation of a transaction. It only reads state, so it takes the read
// lock like the other read-only accessors and does not block concurrent
// readers.
func (s *Signer) getPollTime() time.Duration {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	return s.pollTime
}
// PubKey returns the public key of the signer, as read from the keyring
// record of the signer's address at construction time.
func (s *Signer) PubKey() cryptotypes.PubKey {
	return s.pk
}
// LocalSequence returns the signer's locally tracked sequence number, i.e.
// the sequence the next created transaction will be signed with. The value is
// read under the signer's read lock.
func (s *Signer) LocalSequence() uint64 {
	s.mtx.RLock()
	seq := s.localSequence
	s.mtx.RUnlock()
	return seq
}
// NetworkSequence returns the chain's last known sequence number for the
// signer's account. The value is read under the signer's read lock.
func (s *Signer) NetworkSequence() uint64 {
	s.mtx.RLock()
	seq := s.networkSequence
	s.mtx.RUnlock()
	return seq
}
// getAndIncrementSequence returns the current local sequence number and then
// advances the local sequence by one. It does no locking of its own.
func (s *Signer) getAndIncrementSequence() uint64 {
	current := s.localSequence
	s.localSequence = current + 1
	return current
}
// ForceSetSequence overwrites both the local and the network sequence number
// with seq. Use with care: if seq does not match the sequence held in the
// chain's state, subsequent transactions may be rejected.
func (s *Signer) ForceSetSequence(seq uint64) {
	s.mtx.Lock()
	s.localSequence, s.networkSequence = seq, seq
	s.mtx.Unlock()
}
// updateNetworkSequence is called once a transaction has been found on
// chain. Hashes that the signer did not record at broadcast time are ignored.
// For a recorded hash, the transaction is removed from the pending
// bookkeeping, and, if it succeeded and its sequence is not behind the last
// known network sequence, the network sequence advances to one past it.
func (s *Signer) updateNetworkSequence(txHash string, success bool) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	seq, tracked := s.reverseTxHashSequenceMap[txHash]
	if !tracked {
		return
	}

	// the tx is no longer pending, whatever its outcome
	delete(s.reverseTxHashSequenceMap, txHash)
	delete(s.outboundSequences, seq)

	if !success || seq < s.networkSequence {
		return
	}
	s.networkSequence = seq + 1
}
// Keyring exposes the signer's underlying keyring, i.e. the one the signer
// was constructed with and uses for signing.
func (s *Signer) Keyring() keyring.Keyring {
	return s.keys
}
// signTransaction signs the transaction held by builder with the given
// sequence number and stores the signature on the builder. The transaction
// must have exactly one signer and that signer must be the signer's own
// address; otherwise an error is returned and nothing is signed.
func (s *Signer) signTransaction(builder client.TxBuilder, sequence uint64) error {
	txSigners := builder.GetTx().GetSigners()
	switch {
	case len(txSigners) != 1:
		return fmt.Errorf("expected 1 signer, got %d", len(txSigners))
	case !s.address.Equals(txSigners[0]):
		return fmt.Errorf("expected signer %s, got %s", s.address.String(), txSigners[0].String())
	}

	// First attach a signature entry without signature bytes so that the
	// bytes to sign are computed over the final signer info.
	if err := builder.SetSignatures(s.getSignatureV2(sequence, nil)); err != nil {
		return fmt.Errorf("error setting draft signatures: %w", err)
	}

	// Then produce the real signature over that data ...
	sigBytes, err := s.createSignature(builder, sequence)
	if err != nil {
		return fmt.Errorf("error creating signature: %w", err)
	}

	// ... and replace the draft entry with it.
	if err := builder.SetSignatures(s.getSignatureV2(sequence, sigBytes)); err != nil {
		return fmt.Errorf("error setting signatures: %w", err)
	}
	return nil
}
// createSignature computes the SIGN_MODE_DIRECT sign bytes for the
// transaction held by builder, using the signer's address, chain ID, account
// number, public key and the given sequence, and signs them with the
// keyring key of the signer's address. It returns the raw signature bytes.
func (s *Signer) createSignature(builder client.TxBuilder, sequence uint64) ([]byte, error) {
	data := authsigning.SignerData{
		Address:       s.address.String(),
		ChainID:       s.chainID,
		AccountNumber: s.accountNumber,
		Sequence:      sequence,
		PubKey:        s.pk,
	}

	handler := s.enc.SignModeHandler()
	payload, err := handler.GetSignBytes(signing.SignMode_SIGN_MODE_DIRECT, data, builder.GetTx())
	if err != nil {
		return nil, fmt.Errorf("error getting sign bytes: %w", err)
	}

	sig, _, err := s.keys.SignByAddress(s.address, payload)
	if err != nil {
		return nil, fmt.Errorf("error signing bytes: %w", err)
	}
	return sig, nil
}
// txBuilder creates a new tx builder from the signer's tx config and runs it
// through opts in order; each option receives the current builder and
// returns the builder to continue with.
func (s *Signer) txBuilder(opts ...TxOption) client.TxBuilder {
	b := s.enc.NewTxBuilder()
	for i := range opts {
		b = opts[i](b)
	}
	return b
}
// QueryAccount asks the node behind conn for the account stored under
// address, unpacks the response with the interface registry of encCfg, and
// returns the account's number and current sequence. On any error both
// numbers are zero.
func QueryAccount(ctx context.Context, conn *grpc.ClientConn, encCfg encoding.Config, address string) (accNum uint64, seqNum uint64, err error) {
	accountResp, err := authtypes.NewQueryClient(conn).Account(
		ctx,
		&authtypes.QueryAccountRequest{Address: address},
	)
	if err != nil {
		return 0, 0, err
	}

	var account authtypes.AccountI
	if err = encCfg.InterfaceRegistry.UnpackAny(accountResp.Account, &account); err != nil {
		return 0, 0, err
	}

	return account.GetAccountNumber(), account.GetSequence(), nil
}
// getSignatureV2 assembles a SIGN_MODE_DIRECT SignatureV2 carrying the given
// sequence and signature bytes (which may be nil for a draft signature). The
// signer's public key is attached only when sequence is zero.
func (s *Signer) getSignatureV2(sequence uint64, signature []byte) signing.SignatureV2 {
	sig := signing.SignatureV2{Sequence: sequence}
	sig.Data = &signing.SingleSignatureData{
		SignMode:  signing.SignMode_SIGN_MODE_DIRECT,
		Signature: signature,
	}
	// NOTE(review): presumably the key only needs to accompany the account's
	// first transaction — confirm against the chain's ante handler.
	if sequence == 0 {
		sig.PubKey = s.pk
	}
	return sig
}
// getSequenceNumber returns the sequence number carried by the signature of
// tx. Exactly one signature is supported: a transaction with no signatures or
// with more than one yields an error instead of a sequence number.
func getSequenceNumber(tx authsigning.Tx) (uint64, error) {
	sigs, err := tx.GetSignaturesV2()
	if err != nil {
		return 0, err
	}

	switch len(sigs) {
	case 0:
		// an unsigned tx would otherwise cause an index-out-of-range panic
		return 0, errors.New("transaction has no signatures")
	case 1:
		return sigs[0].Sequence, nil
	default:
		return 0, fmt.Errorf("only a single signature is supported, got %d", len(sigs))
	}
}