Box is a high-performance, transactional message processing library for Go. It provides reliable message queuing and processing, with PostgreSQL as the backing store.
- 📦 PostgreSQL-backed message storage
- 🔄 FIFO (First-In-First-Out) message processing
- 🔒 Transactional guarantees
- 🎯 Message type-based routing
- 📊 Consumer offset tracking
- 🚀 Batch processing support
- 🔌 Pluggable message handlers
- 📈 Processing status monitoring
go get github.com/thefabric-io/box
import (
	"context"
	"time"

	"github.com/thefabric-io/box"
	"github.com/thefabric-io/box/pgbox"
	// Also import the package that provides transactional.Transaction.
)
// Create a new PostgreSQL box
pgBox := pgbox.NewPostgresBox("myschema", "mytopic", "mytype")
type MyHandler struct{}
func (h *MyHandler) HandleEvent(ctx context.Context, tx transactional.Transaction, msg box.Message) error {
	// Process your message here
	return nil
}
// Create a FIFO subscriber
subscriber := box.NewFIFOSubscriber(
	txProvider,    // Your transactional interface implementation
	"myconsumer",  // Consumer name
	pgBox,         // The box instance
	100,           // Batch size
	5*time.Second, // Wait time between empty batches
)
// Register message handler
subscriber.RegisterHandler("message_type", &MyHandler{})
// Create a manager
manager, err := box.NewManager()
if err != nil {
	panic(err) // Replace with your own error handling
}
// Add subscriber
manager.AddProcessors(subscriber)
// Start processing
ctx := context.Background()
manager.Run(ctx)
Box implements a message processing system with the following key components:
- `Box`: Interface for message storage and retrieval
- `Message`: Core message structure with metadata
- `Handler`: Message processing logic
- `Subscriber`: Message consumption and routing
- `Manager`: Orchestrates multiple subscribers
A PostgreSQL box and the subscriber that reads from it can be configured with:
- Schema name
- Topic name
- Message type
- Batch size
- Wait time between empty batches
- Transaction Management
  - Always use the provided transaction context
  - Handle transaction rollbacks appropriately
- Error Handling
  - Implement proper error handling in handlers
  - Use context for timeouts and cancellation
- Message Types
  - Use consistent message type naming
  - Register handlers for all expected message types
- Monitoring
  - Track processing status using the `Status()` method
  - Implement proper logging in handlers
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Built with sqlx
- Uses lib/pq for PostgreSQL support
For support, please open an issue in the GitHub repository.