Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Binary / gRPC server #7

Merged
merged 19 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .github/workflows/ci_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ jobs:
with:
GO_VERSION: "1.21"

proto:
uses: ./.github/workflows/proto.yml

# Make a release if this is a manually trigger job, i.e. workflow_dispatch
release:
needs: [lint, test, proto]
needs: [lint, test]
runs-on: ubuntu-latest
if: ${{ github.event_name == 'workflow_dispatch' }}
permissions: "write-all"
Expand Down
14 changes: 0 additions & 14 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ jobs:
github-token: ${{ secrets.github_token }}
if: env.GIT_DIFF

# hadolint lints the Dockerfile
hadolint:
uses: rollkit/.github/.github/workflows/reusable_dockerfile_lint.yml@v0.2.2 # yamllint disable-line rule:line-length
with:
dockerfile: docker/mockserv.Dockerfile

yamllint:
runs-on: ubuntu-latest
steps:
Expand All @@ -52,11 +46,3 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: rollkit/.github/.github/actions/markdown-lint@v0.2.2

protobuf-lint:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- uses: actions/checkout@v4
- run: make proto-gen
- run: make proto-lint
41 changes: 41 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
run:
timeout: 5m
modules-download-mode: readonly

linters:
enable:
- deadcode
- errcheck
- gofmt
- goimports
- gosec
- gosimple
- govet
- ineffassign
- misspell
- revive
- staticcheck
- structcheck
- typecheck
- unused
- varcheck

issues:
exclude-use-default: false
include:
- EXC0012 # EXC0012 revive: Annoying issue about not having a comment.
- EXC0014 # EXC0014 revive: Annoying issue about not having a comment.

linters-settings:
revive:
rules:
- name: package-comments
disabled: true
- name: duplicated-imports
severity: warning
- name: exported
arguments:
- disableStutteringCheck

goimports:
local-prefixes: github.com/rollkit
6 changes: 6 additions & 0 deletions .markdownlint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
default: true
MD010:
code_blocks: false
MD013: false
MD024:
allow_different_nesting: true
117 changes: 77 additions & 40 deletions celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,41 @@ package celestia

import (
"context"
"errors"
"fmt"
"encoding/binary"
"log"
"strings"

"github.com/celestiaorg/celestia-app/x/blob/types"
rpc "github.com/celestiaorg/celestia-node/api/rpc/client"
"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/nmt"

"github.com/rollkit/go-da"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)

// CelestiaDA implements the celestia backend for the DA interface
type CelestiaDA struct {
client *rpc.Client
height uint64
namespace share.Namespace
ctx context.Context
}

// NewCelestiaDA returns an instance of CelestiaDA
func NewCelestiaDA(client *rpc.Client, height uint64, namespace share.Namespace, ctx context.Context) *CelestiaDA {
func NewCelestiaDA(client *rpc.Client, namespace share.Namespace, ctx context.Context) *CelestiaDA {
return &CelestiaDA{
client: client,
height: height,
namespace: namespace,
ctx: ctx,
}
}

// Get returns Blob for each given ID, or an error.
func (c *CelestiaDA) Get(ids []da.ID) ([]da.Blob, error) {
var blobs []da.Blob
for _, id := range ids {
// TODO: extract commitment from ID
blob, err := c.client.Blob.Get(c.ctx, c.height, c.namespace, blob.Commitment(id))
height, commitment := splitID(id)
blob, err := c.client.Blob.Get(c.ctx, height, c.namespace, commitment)
if err != nil {
return nil, err
}
Expand All @@ -45,59 +45,77 @@ func (c *CelestiaDA) Get(ids []da.ID) ([]da.Blob, error) {
return blobs, nil
}

// GetIDs returns IDs of all Blobs located in DA at given height.
func (c *CelestiaDA) GetIDs(height uint64) ([]da.ID, error) {
var ids []da.ID
blobs, err := c.client.Blob.GetAll(c.ctx, c.height, []share.Namespace{c.namespace})
if errors.Is(err, blob.ErrBlobNotFound) {
return nil, nil
}
blobs, err := c.client.Blob.GetAll(c.ctx, height, []share.Namespace{c.namespace})
if err != nil {
if strings.Contains(err.Error(), blob.ErrBlobNotFound.Error()) {
return nil, nil
}
Manav-Aggarwal marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
for _, blob := range blobs {
// TODO: commitment -> id
ids = append(ids, blob.Commitment)
for _, b := range blobs {
ids = append(ids, makeID(height, b.Commitment))
}
return ids, nil
}

// Commit creates a Commitment for each given Blob.
func (c *CelestiaDA) Commit(daBlobs []da.Blob) ([]da.Commitment, error) {
var blobs []*tmproto.Blob
for _, daBlob := range daBlobs {
b, err := blob.NewBlobV0(c.namespace, daBlob)
if err != nil {
return nil, err
}
blobs = append(blobs, &b.Blob)
_, commitments, err := c.blobsAndCommitments(daBlobs)
return commitments, err
}

// Submit submits the Blobs to Data Availability layer.
func (c *CelestiaDA) Submit(daBlobs []da.Blob) ([]da.ID, []da.Proof, error) {
blobs, commitments, err := c.blobsAndCommitments(daBlobs)
if err != nil {
return nil, nil, err
}
commitments, err := types.CreateCommitments(blobs)
height, err := c.client.Blob.Submit(c.ctx, blobs, blob.DefaultSubmitOptions())
if err != nil {
return nil, err
return nil, nil, err
}
var daCommitments []da.Commitment
for _, commitment := range commitments {
daCommitments = append(daCommitments, da.Commitment(commitment))
log.Println("successfully submitted blobs", "height", height)
ids := make([]da.ID, len(daBlobs))
proofs := make([]da.Proof, len(daBlobs))
for i, commitment := range commitments {
ids[i] = makeID(height, commitment)
proof, err := c.client.Blob.GetProof(c.ctx, height, c.namespace, commitment)
if err != nil {
return nil, nil, err
}
tzdybal marked this conversation as resolved.
Show resolved Hide resolved
// TODO(tzdybal): does always len(*proof) == 1?
proofs[i], err = (*proof)[0].MarshalJSON()
tzdybal marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, err
}
}
return daCommitments, nil
return ids, proofs, nil
}

func (c *CelestiaDA) Submit(daBlobs []da.Blob) ([]da.ID, []da.Proof, error) {
// blobsAndCommitments converts []da.Blob to []*blob.Blob and generates corresponding []da.Commitment
func (c *CelestiaDA) blobsAndCommitments(daBlobs []da.Blob) ([]*blob.Blob, []da.Commitment, error) {
var blobs []*blob.Blob
var commitments []da.Commitment
for _, daBlob := range daBlobs {
b, err := blob.NewBlobV0(c.namespace, daBlob)
if err != nil {
return nil, nil, err
}
blobs = append(blobs, b)

commitment, err := types.CreateCommitment(&b.Blob)
if err != nil {
return nil, nil, err
}
commitments = append(commitments, commitment)
}
height, err := c.client.Blob.Submit(c.ctx, blobs, blob.DefaultSubmitOptions())
if err != nil {
return nil, nil, err
}
fmt.Println("succesfully submitted blobs", "height", height)
return nil, nil, nil
return blobs, commitments, nil
}

// Validate validates Commitments against the corresponding Proofs. This should be possible without retrieving the Blobs.
func (c *CelestiaDA) Validate(ids []da.ID, daProofs []da.Proof) ([]bool, error) {
var included []bool
var proofs []*blob.Proof
Expand All @@ -110,14 +128,33 @@ func (c *CelestiaDA) Validate(ids []da.ID, daProofs []da.Proof) ([]bool, error)
proofs = append(proofs, proof)
}
for i, id := range ids {
// TODO: extract commitment from ID
isIncluded, err := c.client.Blob.Included(c.ctx, c.height, share.Namespace(c.namespace), proofs[i], blob.Commitment(id))
if err != nil {
return nil, err
}
height, commitment := splitID(id)
// TODO(tzdybal): for some reason, if proof doesn't match commitment, API returns (false, "blob: invalid proof")
// but analysis of the code in celestia-node implies this should never happen - maybe it's caused by openrpc?
// there is no way of gently handling errors here, but returned value is fine for us
tzdybal marked this conversation as resolved.
Show resolved Hide resolved
isIncluded, _ := c.client.Blob.Included(c.ctx, height, c.namespace, proofs[i], commitment)
included = append(included, isIncluded)
}
return included, nil
}

// heightLen is a length (in bytes) of serialized height.
//
// This is 8 as uint64 consist of 8 bytes.
const heightLen = 8

func makeID(height uint64, commitment da.Commitment) da.ID {
id := make([]byte, heightLen+len(commitment))
binary.LittleEndian.PutUint64(id, height)
copy(id[heightLen:], commitment)
return id
}

func splitID(id da.ID) (uint64, da.Commitment) {
if len(id) <= heightLen {
return 0, nil
}
return binary.LittleEndian.Uint64(id[:heightLen]), id[heightLen:]
}

var _ da.DA = &CelestiaDA{}
105 changes: 102 additions & 3 deletions celestia_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,109 @@
package celestia_test

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"strings"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/suite"

rpc "github.com/celestiaorg/celestia-node/api/rpc/client"
"github.com/celestiaorg/celestia-node/share"

"github.com/rollkit/celestia-da"
"github.com/rollkit/go-da/test"
)

func TestCelestiaDA(t *testing.T) {
t.Skip()
// TODO
type TestSuite struct {
suite.Suite

pool *dockertest.Pool
resource *dockertest.Resource

token string
}

func (t *TestSuite) SetupSuite() {
pool, err := dockertest.NewPool("")
if err != nil {
t.Failf("Could not construct docker pool", "error: %v\n", err)
}
t.pool = pool

// uses pool to try to connect to Docker
err = pool.Client.Ping()
if err != nil {
t.Failf("Could not connect to Docker", "error: %v\n", err)
}

// pulls an image, creates a container based on it and runs it
resource, err := pool.Run("ghcr.io/rollkit/local-celestia-devnet", "4ecd750", []string{})
if err != nil {
t.Failf("Could not start resource", "error: %v\n", err)
}
t.resource = resource

// exponential backoff-retry, because the application in the container might not be ready to accept connections yet
pool.MaxWait = 60 * time.Second
if err := pool.Retry(func() error {
resp, err := http.Get(fmt.Sprintf("http://localhost:%s/balance", resource.GetPort("26659/tcp")))
if err != nil {
return err
}
bz, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return err
}
if strings.Contains(string(bz), "error") {
return errors.New(string(bz))
}
return nil
}); err != nil {
log.Fatalf("Could not start local-celestia-devnet: %s", err)
}

opts := dockertest.ExecOptions{}
buf := new(bytes.Buffer)
opts.StdOut = buf
opts.StdErr = buf
_, err = resource.Exec([]string{"/bin/celestia", "bridge", "auth", "admin", "--node.store", "/home/celestia/bridge"}, opts)
if err != nil {
t.Failf("Could not execute command", "error: %v\n", err)
}

t.token = buf.String()
}
Comment on lines +34 to +84
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SetupSuite function is setting up the Docker pool and resource correctly. It also handles errors properly. However, it's recommended to use t.Fatalf instead of t.Failf followed by log.Fatalf to fail the test immediately when an error occurs.


func (t *TestSuite) TearDownSuite() {
if err := t.pool.Purge(t.resource); err != nil {
t.Failf("failed to purge docker resource", "error: %v\n", err)
}
}

func TestIntegrationTestSuite(t *testing.T) {
suite.Run(t, new(TestSuite))
}

func (t *TestSuite) TestCelestiaDA() {
client, err := rpc.NewClient(context.Background(), t.getRPCAddress(), t.token)
t.Require().NoError(err)
ns, err := share.NewBlobNamespaceV0([]byte("test"))
t.Require().NoError(err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
da := celestia.NewCelestiaDA(client, ns, ctx)
test.RunDATestSuite(t.T(), da)
}

func (t *TestSuite) getRPCAddress() string {
return fmt.Sprintf("http://localhost:%s", t.resource.GetPort("26658/tcp"))
}
Loading