Skip to content

Commit

Permalink
Implement background service with Go (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuanvumaihuynh authored Oct 30, 2024
1 parent b60e0c4 commit 00fc1c8
Show file tree
Hide file tree
Showing 32 changed files with 1,746 additions and 21 deletions.
16 changes: 0 additions & 16 deletions .github/workflows/viot-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,6 @@ jobs:
working-directory: ./viot
run: bash ./scripts/test.sh --cov-report=xml

sonar:
needs:
- test
runs-on: ubuntu-latest
steps:
- name: Check out Git repository
uses: actions/checkout@v4

- name: SonarCloud Scan
uses: sonarsource/sonarcloud-github-action@master
with:
projectBaseDir: viot
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}

codecov:
needs:
- test
Expand Down
24 changes: 24 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,27 @@ services:
viot:
aliases:
- viot-celery

viot-background:
build:
context: .
dockerfile: ./docker/viot-background/Dockerfile
container_name: viot-background
restart: unless-stopped
environment:
- ENVIRONMENT=development
- POSTGRES_DSN=postgres://postgres:postgres@timescaledb:5432/postgres
- MQTT_BROKER_URL=tcp://emqx:1883
- MQTT_CLIENT_ID=b348a2b5-5e1f-47bb-95d0-452002a78611
- MQTT_USERNAME=ABCD123
- MQTT_PASSWORD=ABCD123
- DEVICE_DATA_PROCESSOR_MAX_BATCH_SIZE=5000
- DEVICE_DATA_PROCESSOR_MAX_BATCH_INTERVAL_MS=2000
- DEVICE_ATTRIBUTE_PROCESSOR_MAX_BATCH_SIZE=5000
- DEVICE_ATTRIBUTE_PROCESSOR_MAX_BATCH_INTERVAL_MS=2000
depends_on:
- emqx
networks:
viot:
aliases:
- viot-background
57 changes: 57 additions & 0 deletions docker/emqx/etc/cluster.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,62 @@ rule_engine {
FROM
"$events/client_disconnected"~"""
}
device_data {
actions = [
{
args {
direct_dispatch = true
mqtt_properties {}
payload = """{"device_id": "${clientid}", "ts": "${ts}", "data": ${p}}"""
qos = 2
retain = true
topic = "v2/private/device_data"
user_properties = ""
}
function = republish
}
]
description = ""
enable = true
metadata {created_at = 1730256192783}
sql = """~
SELECT
clientid,
CASE
WHEN is_null(payload.ts) THEN now_rfc3339('millisecond')
ELSE format_date('millisecond', 'Z', '%Y-%m-%dT%H:%M:%S.%3N%:z', payload.ts)
END as ts,
CASE
WHEN is_null(payload.ts) THEN json_decode(payload)
ELSE payload.values
END as p
FROM
"v2/devices/me/data"~"""
}
device_attribute {
actions = [
{
args {
direct_dispatch = true
mqtt_properties {}
payload = """{"device_id": "${clientid}", "attributes": ${p}}"""
qos = 2
retain = true
topic = "v2/private/device_attribute"
user_properties = ""
}
function = republish
}
]
description = ""
enable = true
metadata {created_at = 1730259649238}
sql = """~
SELECT
clientid,
json_decode(payload) as p
FROM
"v2/devices/me/attributes"~"""
}
}
}
23 changes: 23 additions & 0 deletions docker/viot-background/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
FROM golang:1.23.2-alpine AS builder

WORKDIR /app

COPY ./viot-background .

RUN go mod download

RUN go build -o main ./cmd/main.go


FROM alpine:3.20

RUN addgroup -S viot && adduser -S viot -G viot

WORKDIR /app

COPY --from=builder /app/main /app/main

RUN chown -R viot:viot /app
USER viot

CMD ["/app/main"]
14 changes: 14 additions & 0 deletions viot-background/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
ENVIRONMENT=development

POSTGRES_DSN=postgres://postgres:postgres@localhost:5432/postgres

MQTT_BROKER_URL=tcp://localhost:1883
MQTT_CLIENT_ID=b348a2b5-5e1f-47bb-95d0-452002a78611
MQTT_USERNAME=ABCD123
MQTT_PASSWORD=ABCD123

DEVICE_DATA_PROCESSOR_MAX_BATCH_SIZE=1000
DEVICE_DATA_PROCESSOR_MAX_BATCH_INTERVAL_MS=2000

DEVICE_ATTRIBUTE_PROCESSOR_MAX_BATCH_SIZE=1000
DEVICE_ATTRIBUTE_PROCESSOR_MAX_BATCH_INTERVAL_MS=2000
20 changes: 20 additions & 0 deletions viot-background/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so

# Folders
_obj
_test
vendor

*.exe
*.test
*.prof
*.pprof
*.out
*.log

/bin
coverage.out
coverage.html
13 changes: 13 additions & 0 deletions viot-background/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
sqlc:
sqlc generate

test:
go test -v -cover -short ./...

server:
go run cmd/main.go

mock:
mockery --name=Store --dir=./db/repository --output=./db/mock --outpkg=mockdb --case=underscore

.PHONY: sqlc test server mock
97 changes: 97 additions & 0 deletions viot-background/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package main

import (
"context"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/jackc/pgx/v5/pgxpool"
"go.uber.org/zap"

"github.com/vuxmai/viot/viot-background/db/repository"
"github.com/vuxmai/viot/viot-background/internal/processor"
"github.com/vuxmai/viot/viot-background/pkg/config"
"github.com/vuxmai/viot/viot-background/pkg/logger"
)

const (
DeviceDataMqttTopic = "v2/private/device_data"
DeviceAttributeMqttTopic = "v2/private/device_attribute"
)

func NewMqttClient(cfg *config.MqttConfig, logger *zap.Logger) mqtt.Client {
opts := mqtt.NewClientOptions()
opts.AddBroker(cfg.BrokerUrl)
opts.SetClientID(cfg.ClientId)
opts.SetUsername(cfg.Username)
opts.SetPassword(cfg.Password)
opts.SetKeepAlive(time.Second * 60)

opts.OnConnect = func(client mqtt.Client) {
logger.Info("Connected to MQTT Broker")
}
opts.OnConnectionLost = func(client mqtt.Client, err error) {
logger.Error("Connection lost: %v", zap.Error(err))
}

client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Fatal("Failed to connect to MQTT Broker", zap.Error(token.Error()))
}

return client
}

func main() {
// Load the configuration
cfg := config.LoadConfig()

logger := logger.NewZapLogger(cfg.Environment)

// Context
ctx := context.Background()

// Database
db, err := pgxpool.New(ctx, cfg.PostgresDsn)
if err != nil {
logger.Fatal(err.Error())
}

// Repository
store := repository.NewStore(db)

// Channel
deviceDataCh := make(chan []byte, 10000)
deviceAttributeCh := make(chan []byte, 10000)

// Worker
deviceDataProcessor := processor.NewDeviceDataProcessor(deviceDataCh, logger)
deviceAttributeProcessor := processor.NewDeviceAttributeProcessor(deviceAttributeCh, logger)

deviceDataProcessor.Start(ctx, &cfg.DeviceDataProcessorConfig, store)
deviceAttributeProcessor.Start(ctx, &cfg.DeviceAttributeProcessorConfig, store)

// MQTT Client
client := NewMqttClient(&cfg.MqttConfig, logger)

// Subscribe to topic
token := client.Subscribe(DeviceDataMqttTopic, 2, func(_ mqtt.Client, msg mqtt.Message) {
deviceDataCh <- msg.Payload()
})
if token.Wait() && token.Error() != nil {
logger.Info("Subscribe error",
zap.Error(token.Error()),
zap.String("topic", DeviceDataMqttTopic))
}

token = client.Subscribe(DeviceAttributeMqttTopic, 2, func(_ mqtt.Client, msg mqtt.Message) {
deviceAttributeCh <- msg.Payload()
})
if token.Wait() && token.Error() != nil {
logger.Info("Subscribe error",
zap.Error(token.Error()),
zap.String("topic", DeviceAttributeMqttTopic))
}

select {}
}
20 changes: 20 additions & 0 deletions viot-background/codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
coverage:
status:
project:
default:
target: auto
threshold: 5%
if_not_found: success
patch:
default:
target: auto
threshold: 5%
if_not_found: success

ignore:
- "**/mocks/"
- "cmd/"
- "docs/"
- "pkg/"
- "vendor/"
- "proto/"
66 changes: 66 additions & 0 deletions viot-background/db/mock/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 00fc1c8

Please sign in to comment.