Dynamic DataSource Extension Framework Design

louyuting edited this page Mar 10, 2020 · 2 revisions

DataSource Extension

The overall workflow is the same as the Java version. Please see the architecture below:

This article focuses on how a property is updated on the application side.

Abstract Model:

Property

// PropertyConverter converts the source message bytes to the specific property.
type PropertyConverter func(src []byte) interface{}

// PropertyUpdater pushes the specific property to the downstream component.
type PropertyUpdater func(data interface{}) error

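// PropertyHandler is the abstract interface that describes the processing logic for a dynamic property.
type PropertyHandler interface {
	// isPropertyConsistent checks whether src is consistent with the last updated property.
	isPropertyConsistent(src interface{}) bool
	// Handle processes the current property.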
	Handle(src []byte) error
}

// DefaultPropertyHandler encapsulates the converter and the updater of a property.
// One DefaultPropertyHandler instance handles one property type.
// Before updating, it checks whether the current property is consistent with the last updated property.
// converter converts the raw message to the specific property.
// updater pushes the specific property to the downstream component.
type DefaultPropertyHandler struct {
	lastUpdateProperty interface{}

	converter PropertyConverter
	updater   PropertyUpdater
}

func (h *DefaultPropertyHandler) isPropertyConsistent(src interface{}) bool {
	if reflect.DeepEqual(src, h.lastUpdateProperty) {
		return true
	}
	// The property changed: cache it so the next call compares against the new value.
	h.lastUpdateProperty = src
	return false
}

func (h *DefaultPropertyHandler) Handle(src []byte) error {
	defer func() {
		if err := recover(); err != nil && logger != nil {
			logger.Panicf("Unexpected panic: %+v", errors.Errorf("%+v", err))
		}
	}()
	// Convert the source bytes to the target property.
	realProperty := h.converter(src)
	isConsistent := h.isPropertyConsistent(realProperty)
	if isConsistent {
		return nil
	}
	return h.updater(realProperty)
}

func NewSinglePropertyHandler(converter PropertyConverter, updater PropertyUpdater) *DefaultPropertyHandler {
	return &DefaultPropertyHandler{
		converter: converter,
		updater:   updater,
	}
}

PropertyHandler is an abstract interface that describes the processing logic for a dynamic property, and DefaultPropertyHandler is its default implementation.

Processing a dynamic property is divided into two steps:

  1. Converter: read the source bytes of the property and convert them to the target property type;
  2. Updater: push the new property to the downstream component.

That is to say, each PropertyHandler must have a Converter function and an Updater function.

To avoid useless update operations, a DefaultPropertyHandler instance caches the last updated property and compares it with the current property value. The Updater is called only when the two differ.
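As a minimal sketch of how a Converter and an Updater plug into a handler, the example below re-declares the function types from above so that it compiles on its own. The `handler` struct is a simplified stand-in for DefaultPropertyHandler (without panic recovery), and `flowRule`, `rulesConverter`, and `ruleStore` are hypothetical names introduced only for illustration; they are not part of the framework.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Function types re-declared from the section above.
type PropertyConverter func(src []byte) interface{}
type PropertyUpdater func(data interface{}) error

// handler is a simplified stand-in for DefaultPropertyHandler.
type handler struct {
	lastUpdateProperty interface{}
	converter          PropertyConverter
	updater            PropertyUpdater
}

// Handle converts the raw bytes and calls the updater only when the value changed.
func (h *handler) Handle(src []byte) error {
	p := h.converter(src)
	if reflect.DeepEqual(p, h.lastUpdateProperty) {
		return nil // same as the cached value, nothing to do
	}
	h.lastUpdateProperty = p
	return h.updater(p)
}

// flowRule is a hypothetical property type.
type flowRule struct {
	Resource  string  `json:"resource"`
	Threshold float64 `json:"threshold"`
}

// rulesConverter parses JSON bytes into []flowRule and returns nil on malformed input.
func rulesConverter(src []byte) interface{} {
	var rules []flowRule
	if err := json.Unmarshal(src, &rules); err != nil {
		return nil
	}
	return rules
}

// ruleStore is a hypothetical downstream component that receives updates.
type ruleStore struct {
	rules   []flowRule
	updates int
}

// update has the PropertyUpdater signature and rejects unexpected types.
func (s *ruleStore) update(data interface{}) error {
	rules, ok := data.([]flowRule)
	if !ok {
		return fmt.Errorf("unexpected property type %T", data)
	}
	s.rules = rules
	s.updates++
	return nil
}

func main() {
	store := &ruleStore{}
	h := &handler{converter: rulesConverter, updater: store.update}

	msg := []byte(`[{"resource":"abc","threshold":10}]`)
	_ = h.Handle(msg)
	_ = h.Handle(msg) // identical payload: the updater is not called again

	fmt.Println(store.updates, store.rules[0].Resource) // prints: 1 abc
}
```

Because the comparison uses `reflect.DeepEqual`, the converter should return values of the same concrete type on every call; otherwise two semantically equal properties would be treated as different.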

DataSource

DataSource is the generic interface that describes a data source instance.

type DataSource interface {
	// AddPropertyHandler adds the specified property handler to the current data source.
	AddPropertyHandler(h PropertyHandler)
	// RemovePropertyHandler removes the specified property handler from the current data source.
	RemovePropertyHandler(h PropertyHandler)
	// ReadSource reads the original data from the data source.
	ReadSource() []byte
	// Initialize initializes the data source, loads the initial rules,
	// and starts the listener.
	Initialize()
	// Close the data source.
	io.Closer
}

Each DataSource instance listens on one property. A DataSource may hold one or more PropertyHandlers that handle the same property.

The update workflow is as follows:

  1. Read the message from the dynamic data source;
  2. Iterate over the PropertyHandler list to handle the message;
  3. Call the Converter function of each PropertyHandler to convert the raw message to the specific property;
  4. Call the Updater function of each PropertyHandler to push the specific property to the downstream component.
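The steps above can be sketched as follows. This is only an illustration under stated assumptions: `handlerList`, its `dispatch` method, `memorySource`, and `recordingHandler` are hypothetical names that do not appear in the framework, and the PropertyHandler interface is reduced to its exported `Handle` method so that the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// PropertyHandler mirrors only the exported part of the interface described above.
type PropertyHandler interface {
	Handle(src []byte) error
}

// handlerList is a hypothetical helper that a DataSource implementation could embed.
// Handlers are expected to be pointers so that they can be compared with ==.
type handlerList struct {
	mu       sync.RWMutex
	handlers []PropertyHandler
}

// AddPropertyHandler registers h unless it is nil or already present.
func (l *handlerList) AddPropertyHandler(h PropertyHandler) {
	if h == nil {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, e := range l.handlers {
		if e == h {
			return
		}
	}
	l.handlers = append(l.handlers, h)
}

// RemovePropertyHandler removes h if it is registered.
func (l *handlerList) RemovePropertyHandler(h PropertyHandler) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for i, e := range l.handlers {
		if e == h {
			l.handlers = append(l.handlers[:i], l.handlers[i+1:]...)
			return
		}
	}
}

// dispatch hands one raw message to every handler (steps 2 to 4).
// It keeps going after a failure and returns the first error seen.
func (l *handlerList) dispatch(src []byte) error {
	l.mu.RLock()
	defer l.mu.RUnlock()
	var first error
	for _, h := range l.handlers {
		if err := h.Handle(src); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// memorySource is a hypothetical in-memory data source used only for this sketch.
type memorySource struct {
	handlerList
	data []byte
}

// ReadSource returns the current raw data (step 1).
func (s *memorySource) ReadSource() []byte { return s.data }

// Initialize loads the initial value once; a real source would also start a listener here.
func (s *memorySource) Initialize() { _ = s.dispatch(s.ReadSource()) }

// Close has nothing to release in this sketch.
func (s *memorySource) Close() error { return nil }

// recordingHandler is a hypothetical handler that records every message it receives.
type recordingHandler struct{ seen []string }

func (r *recordingHandler) Handle(src []byte) error {
	r.seen = append(r.seen, string(src))
	return nil
}

func main() {
	ds := &memorySource{data: []byte("v1")}
	a, b := &recordingHandler{}, &recordingHandler{}
	ds.AddPropertyHandler(a)
	ds.AddPropertyHandler(b)
	ds.Initialize()

	ds.RemovePropertyHandler(b)
	ds.data = []byte("v2")
	_ = ds.dispatch(ds.ReadSource())

	fmt.Println(a.seen, b.seen) // prints: [v1 v2] [v1]
}
```

In this sketch a failing handler does not stop the others from receiving the message; whether the real framework behaves the same way is not stated in this page.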

Use case

Refreshable file datasource

TBD