源码解析 4层流量治理,tcp流量dump
 · 阅读需 5 分钟
作者简介: 龚中强,是开源社区的爱好者,致力于拥抱开源。
写作时间: 2022年4月26日
Overview
此文档的目的在于分析 tcp 流量 dump 的实现
前提:
文档内容所涉及代码版本如下
https://github.com/mosn/layotto
Layotto 0e97e970dc504e0298017bd956d2841c44c0810b(main分支)
源码分析
代码均在: tcpcopy代码
model.go分析
此类是 tcpcopy 的配置对象的核心类
type DumpConfig struct {
	Switch     string  `json:"switch"`       // dump 开关.配置值:'ON' 或 'OFF'
	Interval   int     `json:"interval"`     // dump 采样间隔, 单位: 秒
	Duration   int     `json:"duration"`     // 单个采样周期, 单位: 秒
	CpuMaxRate float64 `json:"cpu_max_rate"` // cpu 最大使用率。当超过此阈值,dump 功能将停止
	MemMaxRate float64 `json:"mem_max_rate"` // mem 最大使用率。当超过此阈值,dump 功能将停止
}
type DumpUploadDynamicConfig struct {
	Unique_sample_window string             // 指定采样窗口
	BusinessType         _type.BusinessType // 业务类型
	Port                 string             // 端口
	Binary_flow_data     []byte             // 二进制数据
	Portrait_data        string             // 用户上传的数据
}
persistence.go分析
此类是 tcpcopy 的 dump 持久化核心处理类
// 该方法在 tcpcopy.go 中 OnData 中调用
func IsPersistence() bool {
	// 判断 dump 开关是否开启
	if !strategy.DumpSwitch {
		if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
			log.DefaultLogger.Debugf("%s the dump switch is %t", model.LogDumpKey, strategy.DumpSwitch)
		}
		return false
	}
	// 判断是否在采样窗口中
	if atomic.LoadInt32(&strategy.DumpSampleFlag) == 0 {
		if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
			log.DefaultLogger.Debugf("%s the dump sample flag is %d", model.LogDumpKey, strategy.DumpSampleFlag)
		}
		return false
	}
	// 判断是否 dump 功能停止(获取系统负载判断处理器和内存是否超过 tcpcopy 的阈值,如果超过则停止)
	if !strategy.IsAvaliable() {
		if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
			log.DefaultLogger.Debugf("%s the system usages are beyond max rate.", model.LogDumpKey)
		}
		return false
	}
	return true
}
// 根据配置信息持久化数据
func persistence(config *model.DumpUploadDynamicConfig) {
	// 1.持久化二进制数据
	if config.Binary_flow_data != nil && config.Port != "" {
		if GetTcpcopyLogger().GetLogLevel() >= log.INFO {
			GetTcpcopyLogger().Infof("[%s][%s]% x", config.Unique_sample_window, config.Port, config.Binary_flow_data)
		}
	}
	if config.Portrait_data != "" && config.BusinessType != "" {
		// 2. 持久化用户定义的数据
		if GetPortraitDataLogger().GetLogLevel() >= log.INFO {
			GetPortraitDataLogger().Infof("[%s][%s][%s]%s", config.Unique_sample_window, config.BusinessType, config.Port, config.Portrait_data)
		}
		// 3. 增量持久化内存中的配置信息的变动内容
		buf, err := configmanager.DumpJSON()
		if err != nil {
			if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
				log.DefaultLogger.Debugf("[dump] Failed to load mosn config mem.")
			}
			return
		}
		// 3.1. 如果数据变化则 dump 
		tmpMd5ValueOfMemDump := common.CalculateMd5ForBytes(buf)
		memLogger := GetMemLogger()
		if tmpMd5ValueOfMemDump != md5ValueOfMemDump ||
			(tmpMd5ValueOfMemDump == md5ValueOfMemDump && common.GetFileSize(getMemConfDumpFilePath()) <= 0) {
			md5ValueOfMemDump = tmpMd5ValueOfMemDump
			if memLogger.GetLogLevel() >= log.INFO {
				memLogger.Infof("[%s]%s", config.Unique_sample_window, buf)
			}
		} else {
			if memLogger.GetLogLevel() >= log.INFO {
				memLogger.Infof("[%s]%+v", config.Unique_sample_window, incrementLog)
			}
		}
	}
}
tcpcopy.go分析
此类为是 tcpcopy 的核心类。
// 向 Mosn 注册 NetWork 
func init() {
	api.RegisterNetwork("tcpcopy", CreateTcpcopyFactory)
}
// 返回 tcpcopy 工厂
func CreateTcpcopyFactory(cfg map[string]interface{}) (api.NetworkFilterChainFactory, error) {
	tcpConfig := &config{}
	// dump 策略转静态配置
	if stg, ok := cfg["strategy"]; ok {
	...
	}
	// TODO extract some other fields
	return &tcpcopyFactory{
		cfg: tcpConfig,
	}, nil
}
// 供 pkg/configmanager/parser.go 调用添加或者更新Network filter factory
func (f *tcpcopyFactory) Init(param interface{}) error {
	// 设置监听的地址和端口配置
	...
	return nil
}
// 实现的是 ReadFilter 的 OnData 接口,每次从连接拿到数据都进方法进行处理
func (f *tcpcopyFactory) OnData(data types.IoBuffer) (res api.FilterStatus) {
	// 判断当前请求数据是否需要采样 dump 
	if !persistence.IsPersistence() {
		return api.Continue
	}
	// 异步的采样 dump
	config := model.NewDumpUploadDynamicConfig(strategy.DumpSampleUuid, "", f.cfg.port, data.Bytes(), "")
	persistence.GetDumpWorkPoolInstance().Schedule(config)
	return api.Continue
}
最后我们再来回顾一下整体流程走向:
- 
从 tcpcopy.go 的初始化函数init() 开始,程序向 CreateGRPCServerFilterFactory 传入 CreateTcpcopyFactory.
 - 
Mosn 创建出一个filter chain(代码位置factory.go) ,通过循环调用CreateFilterChain将所有的filter加入到链路结构包括本文的 tcpcopy.
 - 
当流量通过 mosn 将会进入到 tcpcopy.go 的 OnData 方法进行 tcpdump 的逻辑处理.