Go语言学习13-常见软件架构的实现
架构模式
An architectural pattern is a general, reusable solution to a commonly occurring problem in software architectural within a given context. ——wikipedia
Pipe-Filter 架构

Pipe-Filter 模式
-  非常适合于数据处理及数据分析系统 
-  Filter 封装数据处理的功能 
-  松耦合: Filter只跟数据(格式) 耦合 
-  Pipe用于连接 Filter 传递数据或者在异步处理过程中缓冲数据流 进程内同步调用时, pipe 演变为数据在方法调用间传递 
Filter和组合模式


示例
 
 
// filter.go
// Package pipefilter is to define the interfaces
// and the structures for pipe-filter style implementation
package pipefilter
// Request is the input of the filter
type Request interface{}
// Response is the output of the filter
type Response interface{}
// Filter interface is the definition of the data processing components
// Pipe-Filter structure
type Filter interface {
	Process(data Request) (Response, error)
}
// split_filter.go
package pipefilter
import (
	"errors"
	"strings"
)
var SplitFilterWrongFormatError = errors.New("input data should be string")
type SplitFilter struct {
	delimiter string
}
func NewSplitFilter(delimiter string) *SplitFilter {
	return &SplitFilter{delimiter}
}
func (sf *SplitFilter) Process(data Request) (Response, error) {
	str, ok := data.(string) // 检查数据格式/类型, 是否可以处理
	if !ok {
		return nil, SplitFilterWrongFormatError
	}
	parts := strings.Split(str, sf.delimiter)
	return parts, nil
}
// to_int_filter.go
package pipefilter
import (
	"errors"
	"strconv"
)
var ToIntFilterWrongFormatError = errors.New("input data should be []string")
type ToIntFilter struct {
}
func NewToIntFilter() *ToIntFilter {
	return &ToIntFilter{}
}
func (tif *ToIntFilter) Process(data Request) (Response, error) {
	parts, ok := data.([]string)
	if !ok {
		return nil, ToIntFilterWrongFormatError
	}
	ret := []int{}
	for _, part := range parts {
		s, err := strconv.Atoi(part)
		if err != nil {
			return nil, err
		}
		ret = append(ret, s)
	}
	return ret, nil
}
// sum_filter.go
package pipefilter
import "errors"
var SumFilterWrongFormatError = errors.New("input data should be []int")
type SumFilter struct {
}
func NewSumFilter() *SumFilter {
	return &SumFilter{}
}
func (sf *SumFilter) Process(data Request) (Response, error) {
	elems, ok := data.([]int)
	if !ok {
		return nil, SumFilterWrongFormatError
	}
	ret := 0
	for _, elem := range elems {
		ret += elem
	}
	return ret, nil
}
// straight_pipeline.go
package pipefilter
// StraightPipeline is composed of the filters, and the filters are piled as a straight line.
type StraightPipeline struct {
	Name    string
	Filters *[]Filter
}
// NewStraightPipeline create a new StraightPipelineWithWallTime
func NewStraightPipeline(name string, filters ...Filter) *StraightPipeline {
	return &StraightPipeline{
		Name:    name,
		Filters: &filters,
	}
}
// Process is to process the coming data by the pipeline
func (f *StraightPipeline) Process(data Request) (Response, error) {
	var ret interface{}
	var err error
	for _, filter := range *f.Filters {
		ret, err = filter.Process(data)
		if err != nil {
			return ret, err
		}
		data = ret
	}
	return ret, err
}
Micro Kernel架构

- 特点 
  - 易于扩展
- 错误隔离
- 保持架构一致性
 
- 要点 
  - 内核包含公共流程或通用逻辑
- 将可变或可扩展部分规划为扩展点
- 抽象扩展点行为, 定义接口
- 利用插件进行扩展
 
示例

package microkernel
import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
)
const (
	Waiting = iota
	Running
)
var WrongStateError = errors.New("can not take the operation in the current state")
type CollectorsError struct {
	CollectorsErrors []error
}
func (ce CollectorsError) Error() string {
	var strs []string
	for _, err := range ce.CollectorsErrors {
		strs = append(strs, err.Error())
	}
	return strings.Join(strs, ";")
}
type Event struct {
	Source  string
	Content string
}
type EventReceiver interface {
	OnEvent(evt Event)
}
type Collector interface {
	Init(evtReceiver EventReceiver) error
	Start(agtCtx context.Context) error
	Stop() error
	Destroy() error
}
type Agent struct {
	collectors map[string]Collector
	evtBuf     chan Event
	cancel     context.CancelFunc
	ctx        context.Context
	state      int
}
func (agt *Agent) EventProcessGroutine() {
	var evtSeg [10]Event
	for {
		for i := 0; i < 10; i++ {
			select {
			case evtSeg[i] = <-agt.evtBuf:
			case <-agt.ctx.Done():
				return
			}
		}
		fmt.Println(evtSeg)
	}
}
func NewAgent(sizeEvtBuf int) *Agent {
	agt := Agent{
		collectors: map[string]Collector{},
		evtBuf:     make(chan Event, sizeEvtBuf),
		state:      Waiting,
	}
	return &agt
}
func (agt *Agent) RegisterCollector(name string, collector Collector) error {
	if agt.state != Waiting {
		return WrongStateError
	}
	agt.collectors[name] = collector
	return collector.Init(agt)
}
func (agt *Agent) startCollectors() error {
	var err error
	var errs CollectorsError
	var mutex sync.Mutex
	for name, collector := range agt.collectors {
		go func(name string, collector Collector, ctx context.Context) {
			defer func() {
				mutex.Unlock()
			}()
			err = collector.Start(ctx)
			mutex.Lock()
			if err != nil {
				errs.CollectorsErrors = append(errs.CollectorsErrors,
					errors.New(name+":"+err.Error()))
			}
		}(name, collector, agt.ctx)
	}
	if len(errs.CollectorsErrors) == 0 {
		return nil
	}
	return errs
}
func (agt *Agent) stopCollectors() error {
	var err error
	var errs CollectorsError
	for name, collector := range agt.collectors {
		if err = collector.Stop(); err != nil {
			errs.CollectorsErrors = append(errs.CollectorsErrors,
				errors.New(name+":"+err.Error()))
		}
	}
	if len(errs.CollectorsErrors) == 0 {
		return nil
	}
	return errs
}
func (agt *Agent) destroyCollectors() error {
	var err error
	var errs CollectorsError
	for name, collector := range agt.collectors {
		if err = collector.Destroy(); err != nil {
			errs.CollectorsErrors = append(errs.CollectorsErrors,
				errors.New(name+":"+err.Error()))
		}
	}
	if len(errs.CollectorsErrors) == 0 {
		return nil
	}
	return errs
}
func (agt *Agent) Start() error {
	if agt.state != Waiting {
		return WrongStateError
	}
	agt.state = Running
	agt.ctx, agt.cancel = context.WithCancel(context.Background())
	go agt.EventProcessGroutine()
	return agt.startCollectors()
}
func (agt *Agent) Stop() error {
	if agt.state != Running {
		return WrongStateError
	}
	agt.state = Waiting
	agt.cancel()
	return agt.stopCollectors()
}
func (agt *Agent) Destroy() error {
	if agt.state != Waiting {
		return WrongStateError
	}
	return agt.destroyCollectors()
}
func (agt *Agent) OnEvent(evt Event) {
	agt.evtBuf <- evt
}


![[蓝桥杯 2023 省 B] 飞机降落(暴搜DFS+贪心)](https://img-blog.csdnimg.cn/direct/adfa4a489ba94996a38144a6665619f2.png)
















