Skip to content

Senders

Sun Jianbo edited this page Jan 23, 2018 · 12 revisions

Senders的主要作用是将Parser后的数据发送至Sender支持的各类服务,目前支持发送到的服务包括: Pandora、ElasticSearch、File、InfluxDB、MongoDB五种服务。

  1. Pandora Sender: 发送到Pandora(七牛大数据处理平台)。
  2. ElasticSearch Sender: 发送到ElasticSearch。
  3. File Sender: 发送到本地文件。
  4. InfluxDB Sender: 发送到InfluxDB。
  5. MongoDB Accumulate Sender: 聚合后发送到MongoDB。
  6. Kafka Sender: 发送到Kafka。

除了上述五种发送到服务之外,logkit还支持一种发送到本地磁盘进行数据发送预处理的模式(fault_tolerant设置为true,ft_strategy设置为always_save),进行了fault_tolerant模式设置后,数据的reader/parser就和发送异步,数据可以持续读取并解析进入磁盘队列,sender则可以多线程发送,可以有效提升发送效率,并发发送数据。

典型配置如下

"senders":[{
        "name":"pandora_sender",
        "sender_type":"pandora",
        "pandora_ak":"",
        "pandora_sk":"",
        "pandora_host":"https://pipeline.qiniu.com",
        "pandora_repo_name":"yourRepoName",
        "pandora_region":"nb",
        "pandora_schema":"field1 pandora_field1,field2,field3 pandora_field3",
        "fault_tolerant":"true",
        "ft_sync_every":"5",
        "ft_save_log_path":"./ft_log",
        "ft_write_limit":"1",
        "ft_strategy":"always_save",
        "ft_procs":"2"
}]
  1. name: 是sender的标识
  2. sender_type: sender类型,支持file, mongodb_acc, pandora, influxdb
  3. fault_tolerant: 是否用异步容错方式进行发送,默认为true。
  4. ft_save_log_path: 选填,当fault_tolerant为true时可以指定备份数据的存放路径。该路径必须为文件夹,该文件夹会作为本地磁盘队列,存放数据,进行异步容错发送。
  5. ft_sync_every:选填,当fault_tolerant为true时候,该参数可以确定多少次发送数据会记录一次本地磁盘队列的offset。
  6. ft_write_limit:选填,为了避免速率太快导致磁盘压力加大,可以根据系统情况自行限定写入本地磁盘的速率,单位MB/s。默认10MB/s
  7. ft_strategy: 选填,该选项设置为backup_only的时候,数据不经过本地队列直接发送到下游,设为always_save时则所有数据会先发送到本地队列,选concurrent的时候会直接并发发送,不经过队列。无论该选项设置什么,失败的数据都会加入到重试队列中异步循环重试。默认选项为backup_only
  8. ft_procs :该选项表示从本地队列获取数据点并向下游发送的并发数,如果ft_strategy设置为backup_only,则本项设置无效,只有本地队列有数据时,该项配置才有效,默认并发数为1.
  9. ft_memory_channel: 选填,默认为"false",不启用。开启该选项会使用memory channel作为 fault_tolerant 中disk queue的替代,相当于把fault_tolerant 作为一个队列使用,使得发送和数据读取解析变为异步,加速整个发送的过程。但是使用 memory channel 数据不落磁盘,会有数据丢失的风险。该功能适合与 ft_procs 连用,利用内存队列,异步后,在发送端多并发加速。
  10. ft_memory_channel_size: 选填,默认为"100",单位为batch,也就是100代表100个待发送的批次,当ft_memory_channel启用时有效,设置memory channel的大小。 注意:该选项设置的大小表达的是队列中可存储的元素个数,并不是占用的内存大小

补充说明

  • 设置fault_tolerant为"true"时,会维持一个本地队列缓存起需要发送的数据。当数据发送失败的时候会在本地队列进行重试,此时如果发送错误,不会影响logkit继续收集日志。
  • 设置fault_tolerant为"true"时,可以保证每次服务重启的时候都从上次的发送offset继续进行发送。在parse过程中产生中间结果,需要高容错性发送的时候最适合采用此种模式。
  • 设置fault_tolerant为"true"时,一般希望日志收集程序对机器性能影响较小的时候,建议首先考虑将ft_strategy设置为backup_only,配置这个选项会首先尝试发送数据,发送失败的数据才放到备份队列等待下次重试。如果还希望更小的性能影响,并且数据敏感性不高,也可以不使用fault_tolerant模式。
  • 当日志发送的速度已经赶不上日志生产速度时,设置fault_tolerant为"true",且ft_strategy设置为concurrent,通过设置ft_procs加大并发,ft_procs设置越大并发度越高,发送越快,对机器性能带来的影响也越大。
  • 如果ft_procs增加已经不能再加大发送日志速度,那么就需要 加大ft_write_limit限制,为logkit 的队列提升磁盘的读写速度。
  • senders支持多个sender配置,但是我们不推荐在senders中加入多个sender,因为一旦某个sender发送缓慢,就会导致其他sender等待这个sender发送成功后再发。简单来说,配置多个sender会互相影响。

如何添加更多Sender(自定义Sender)?

Sender的主要作用是将队列中的数据发送至Sender支持的各类服务,实现logkit的sender仅需实现如下接口:

type Sender interface {
	Name() string
	Send([]Data) error
	Close() error
}

其中包括三个函数,Name()标识Sender名称,Send()发送数据,Close()关闭一些服务连接等常规操作。

实现一个Sender的注意事项

  1. 多线程发送:多线程发送可以充分利用CPU多核的能力,提升发送效率,这一点我们已经设计了ft sender作为框架解决了该问题。
  2. 错误处理与等待:服务端偶尔出现一些异常是很正常的事情,此时就要做好不同错误情况的处理,不会因为某个错误而导致程序出错,另外一方面,一旦发现出错应该让sender等待一定时间再发送,设定一个对后端友好的变长错误等待机制也非常重要。一般情况下,可以采用随着连续错误出现递增等待时间的方法,直到一个最顶峰(如10s),就不再增加,当服务端恢复后再取消等待。
  3. 数据压缩发送:带宽是非常珍贵的资源,通常服务端都会提供gzip压缩的数据接收接口,而sender利用这些接口,将数据压缩后发送,能节省大量带宽成本。
  4. 带宽限流:通常情况下数据收集工具只是机器上的一个附属程序,主要资源如带宽还是要预留给主服务,所以限制sender的带宽用量也是非常重要的功能,限流的方法就可以采用前面Channel一节提到的令牌桶算法。
  5. 字段填充(UUID/timestamp):通常情况下收集的数据信息可能不是完备的,需要填充一些信息进去,如全局唯一的UUID、代表收集时间的timestamp等字段,提供这些字段自动填充的功能,有利于用户对其数据做唯一性、时效性等方面的判断。
  6. 字段别名:解析后的字段名称中经常会出现一些特殊字符,如"$","@"等符号,如果发送的服务端不支持这些特殊字符,就需要提供一些重命名的功能,将这些字段映射到一个别的名称。
  7. 字段筛选:未必解析后的所有字段数据都需要发送,这时候提供一个字段筛选的功能,可以方便用户选择去掉一些无用的字段,也可以节省传输的成本。也可以在Transformer中也提供类似discard transformer的功能,将某个字段去掉。
  8. 类型转换:类型转换是一个说来简单但是做起来非常繁琐的事情,不只是纯粹的整型转换成浮点型,或者字符串转成整型这么简单,还涉及到你发送到的服务端支持的一些特殊类型,如date时间类型等,更多的类型转换实际上相当于最佳实践,能够做好这些类型转换,就会让用户体验得到极大提升。
  9. 简单、简单、简单:除了上述这些,剩下的就是尽可能的让用户使用简单。比如假设我们要写一个 mysql sender,mysql的数据库和表如果不存在,可能数据会发送失败,那就可以考虑提前创建;又比如数据如果有更新了,那么就需要将针对这些更新的字段去更新服务的Schema等等。

注册Sender

与Parser类似,实现完毕的Sender注意要注册进SenderRegistry中,如下所示:

func NewSenderRegistry() *SenderRegistry {
	ret := &SenderRegistry{
		senderTypeMap: map[string]func(conf.MapConf) (Sender, error){},
	}
	ret.RegisterSender(TypeFile, NewFileSender)
	ret.RegisterSender(TypePandora, NewPandoraSender)
	ret.RegisterSender(TypeMongodbAccumulate, NewMongodbAccSender)
	ret.RegisterSender(TypeInfluxdb, NewInfluxdbSender)
	ret.RegisterSender(TypeElastic, NewElasticSender)
	ret.RegisterSender(TypeDiscard, NewDiscardSender)

       // ret.RegisterSender(TypeMyNewSender, NewMyNewSender)

	return ret
}

一个示例的自定义Sender

package samples

import (
	"fmt"

	"github.com/qiniu/log"
	"github.com/qiniu/logkit/conf"
	"github.com/qiniu/logkit/sender"
)

// CustomSender 仅作为示例,什么都不做,只是把数据打印出来而已
type CustomSender struct {
	name   string
	prefix string
}

func NewMySender(c conf.MapConf) (sender.Sender, error) {
	name, _ := c.GetStringOr("name", "my_sender_name")
	prefix, _ := c.GetStringOr("prefix", "")
	return &CustomSender{
		name:   name,
		prefix: prefix,
	}, nil
}

func (c *CustomSender) Name() string {
	return c.name
}

func (c *CustomSender) Send(datas []sender.Data) error {
	for _, d := range datas {
		var line string
		for k, v := range d {
			line += fmt.Sprintf("%v=%v ", k, v)
		}
		log.Info(c.prefix, line)
	}
	return nil
}
func (c *CustomSender) Close() error {
	return nil
}

关于 Sender 发送信息统计的约定

由于 sender 模块具有失败重发功能,信息统计比较复杂,为了减少分歧,现对 sender 模块的发送信息统计做以下约定:

  1. 所有 sender 的发送信息统计由其调用方负责:
    • 由于 sender 本身无法知道自己发送的 data 是首次发送还是失败重发的,所以每个 sender 只做好自己本次数据发送的状态统计,并将本次发送的状态反馈给其调用方;
    • sender 的调用方目前有两种:
      • 第一种为 runner(目前具体为 runner 和 metric runner 两个), 在现有两个 runner 的实现中,均有一个 trySend() 的函数,此函数中对于数据为首次发送还是失败重发做了区分,并分别做了处理,具体为:
        • 若为首次发送,则 总成功条数 = 之前成功条数 + 本次成功条数,总失败条数 = 之前失败条数 + 本次失败条数;
        • 若为失败重发,则 总成功条数 = 之前成功条数 + 本次成功条数,总失败条数 = 之前失败条数 - 本次成功条数;
      • 第二种为 fault_tolerant sender,该 sender 是对普通 sender 的高级封装,它可以根据用户的设定将发送失败的数据进行本地持久化,并不断的调用普通 sender 重新发送,直到发送成功为止。所以该 sender 为 StatsSender sender,它本身记录着所调用的 sender 的发送信息;
    • 除非需要,请不要将 sender 定义为 StatsSender, 如果定义为 StatsSender, 那么需要对发送信息的统计负责。包括首次发送和失败重发,都要能够准确记录;
  2. 虽然在开启 ft 模式后, runner 也调用了 fault_tolerant sender, 但是 fault_tolerant sender 不必将统计信息返回给 runner (其实,对于 fault_tolerant sender 来说,它失败重发的那部分状态是无法直接反馈传递给 runner 的)。因为当我们请求 runner 目前的状态的时候,StatsSender 的状态是从 StatsSender 那里直接获取的,而不是由 runner 提供. 具体可以看 runner 中 Status() 函数的实现;
  3. sender 的返回值目前有两种:
    • 第一种为返回普通的 error, 若返回 nil 则调用方认为所有数据发送成功,否则认为所有数据发送失败;
    • 第二种为返回 utils.StatsError, 返回值为这种类型的可以在中具体写明发送成功了多少条,失败了多少条,具体的错误信息是什么等,我们推荐使用这种返回值形式;
Clone this wiki locally