源码视角讲解Sentinel服务治理工作原理

目录
一、服务治理流程
1.服务治理流程图
2.重要概念
3.示例代码
二、定义流控规则
1.定义规则示例
2.将规则更新到缓存
三、定义受保护的资源
1.示例代码
2.资源上下文
3.构造资源插槽链
四、链条执行与规则判断
一、服务治理流程

通过定义规则、受保护的资源,统计调用链及运行时指标;通过比较运行指标与定义的规则,符合规则放行,不符合则阻塞。

1.服务治理流程图

2.重要概念
Resource

资源受保护的一段代码,ResourceWrapper实现类StringResourceWrapper和MethodResourceWrapper。

Context

Context存储当前调用链的元数据

String name: 上下文名称
DefaultNode entranceNode: 当前调用链的入口节点(根节点)
private Entry curEntry:当前调用Entry
private String origin:调用源可以是appId
private final boolean async:是否异步
Slot

每个插槽负责不同的职责,统计流量信息、流控规则校验等。
不同规则的Slot形成插槽链表,逐级向下执行。

Entry

Entry通行证token,允许通过的请求返回Entry对象,反之返回BlockException。

private long createTime: 创建时间用于统计RT
private Node curNode: 记录当前上下文中资源的统计信息
private Node originNode:调用源的统计信息,调用源可以是appId
protected ResourceWrapper resourceWrapper:资源封装类
Node

Node保存资源的实时统计信息

//每分钟的请求数(通过的+阻塞的)
long totalRequest()
//每分钟通过的请求数
long totalPass()
//每分钟请求成功的数量
long totalSuccess();
//每分钟请求阻塞的数量
long blockRequest()
//每分钟业务异常数量
long totalException()
//通过的QPS
double passQps()
//阻塞的QPS
double blockQps()
//总的QPS
double totalQps()
//成功的QPS
double successQps()
//成功QPS的最大值 到当前时间
double maxSuccessQps()
//异常的QPS
double exceptionQps()
//平均RT
double avgRt()
//最小的RT
double minRt()
//返回当前活动的线程数
int curThreadNum()
//前一秒的block QPS
double previousBlockQps()
//前一秒通过的 QPS
double previousPassQps()
//获取资源的有效统计信息
Map<Long, MetricNode> metrics()
//增加通过请求的数量
void addPassRequest(int count)
//增加rt时间和成功的请求数量
void addRtAndSuccess(long rt, int success)
//增加阻塞的QPS
void increaseBlockQps(int count)
//增加异常QPS
void increaseExceptionQps(int count)
//增加当前线程的数量
void increaseThreadNum()
//减少当前线程的数量
void decreaseThreadNum()
//重置计数器
void reset()
3.示例代码

FlowQpsDemo.java

public static void main(String[] args) throws Exception {
    //定义流控规则
    initFlowQpsRule();
    //打印流控信息
    tick();
    //模拟多线程触发流控
    simulateTraffic();

    System.out.println("===== begin to do flow control");
    System.out.println("only 20 requests per second can pass");
}

小结:通过流控示例代码分析流Sentinel服务治理的工作流程。

二、定义流控规则
1.定义规则示例
private static void initFlowQpsRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    // 设置资源名称
    rule1.setResource(KEY);
    // 阀值
    rule1.setCount(20);
    // 基于运行指标QPS阀值类型
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    // 对调用来源不做限制
    rule1.setLimitApp("default");
    rules.add(rule1);
    // 限流规则加入到缓存
    FlowRuleManager.loadRules(rules);
}
2.将规则更新到缓存

FlowRuleManager#成员变量

// 缓存定义的规则
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
// 回调的Listener实现
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
// 用于注册Listener
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();

static {
    //将FlowPropertyListener注册到currentProperty中
    currentProperty.addListener(LISTENER);
    SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
}

小结:FlowRuleManager通过静态方法将FlowPropertyListener注册到DynamicSentinelProperty中。

FlowRuleManager#loadRules

public static void loadRules(List<FlowRule> rules) {
  currentProperty.updateValue(rules);
}

DynamicSentinelProperty#updateValue

@Override
public boolean updateValue(T newValue) {
    //...
    value = newValue;
    for (PropertyListener<T> listener : listeners) {
        //回调listener
        listener.configUpdate(newValue);
    }
    return true;
}

FlowRuleManager#FlowPropertyListener#configUpdate

private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
//回调执行方法,更新缓存规则
@Override
public void configUpdate(List<FlowRule> value) {
    Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
    if (rules != null) {
        flowRules.clear();
        flowRules.putAll(rules);
    }
    RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
}

小结:FlowRuleManager的内部静态类FlowPropertyListener#configUpdate方法将规则更新到缓存,缓存在FlowRuleManager的成员变量flowRules中。

三、定义受保护的资源
1.示例代码
while (!stop) {
    Entry entry = null;
    try {
        // 定义受保护的资源
        entry = SphU.entry(KEY);
        // 获得通行证 允许通过
        pass.addAndGet(1);
    } catch (BlockException e1) {
        // 未获得通行证 被阻塞
        block.incrementAndGet();
    } catch (Exception e2) {
        // biz exception
    } finally {
        total.incrementAndGet();
        if (entry != null) {
            entry.exit();
        }
    }
}

总结:通过SphU.entry作为入口,如果符合规则放行返回Entry实例;否则抛出BlockException被阻塞。

2.资源上下文

CtSph#entryWithPriority

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
//从线程变量获取资源上下文
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
    return new CtEntry(resourceWrapper, null, context);
}
//创建默认资源上下文 名称为:sentinel_default_context
if (context == null) {
    context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}

小结:先从当前线程上下文获取资源上下文Context;如果null则使用默认上下文名称sentinel_default_context通过MyContextUtil.myEnter创建。

创建上下文Context

CtSph#MyContextUtil#myEnter->ContextUtil#trueEnter

//从当前线程变量中 获取上下文
Context context = contextHolder.get();
    if (context == null) {
    ]//创建用于统计进来流量的Node
    node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
    //Add entrance node.
    Constants.ROOT.addChild(node);
    //将该资源的Node加入缓存
    Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
    newMap.putAll(contextNameNodeMap);
    //name为上下文名称
    newMap.put(name, node);
    contextNameNodeMap = newMap;
}
//构建资源上下文 包含:统计信息的Node
context = new Context(node, name);
context.setOrigin(origin);
//设置到当前线程变量
contextHolder.set(context);

小结:创建负责入口统计信息的EntranceNode;构建资源上下文并设置到线程变量ThreadLocal中。

3.构造资源插槽链

CtSph#entryWithPriority

private Entry entryWithPriority()
        throws BlockException {
...
//构造资源的插槽链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
...
}

CtSph#lookProcessChain

 ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    //获取该资源关联的插槽
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
    //加锁
    synchronized (LOCK) {
        //再次获取该资源关联的插槽
        chain = chainMap.get(resourceWrapper);
        //插槽链为null创建新链条
        if (chain == null) {
            // Entry size limit.
            //插槽链条的数量最多为6000个
            if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                return null;
            }
            //构造插槽链条
            chain = SlotChainProvider.newSlotChain();
            Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                chainMap.size() + 1);
            newMap.putAll(chainMap);
            newMap.put(resourceWrapper, chain);
            chainMap = newMap;
        }
    }
}
return chain;
}

SlotChainProvider#newSlotChain

public static ProcessorSlotChain newSlotChain() {
    // 构造插槽链
    if (builder != null) {
        return builder.build();
    }
    // 加载自定义SlotChainBuilder
    // 默认选择DefaultProcessorSlotChain
    resolveSlotChainBuilder();
    // 再次判断为null使用默认的DefaultProcessorSlotChain
    if (builder == null) {
        ...
        builder = new DefaultSlotChainBuilder();
    }
    return builder.build();
}

DefaultSlotChainBuilder#build

ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());

小结:从创建插槽链流程可以看出,使用默认插槽构造器DefaultProcessorSlotChain创建8个插槽形成链表结构;分别为:NodeSelectorSlot、ClusterBuilderSlot、LogSlot、StatisticSlot、SystemSlot、AuthoritySlot、FlowSlot、DegradeSlot。插槽构造器也可以自定义。

四、链条执行与规则判断

CtSph#entryWithPriority

private Entry entryWithPriority(
   Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        //触发插槽链执行及规则校验
        chain.entry();
    } catch (BlockException e1) {
        //触发流控向上抛出BlockException
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
    }
    //返回通行证Entry
    return e;
}

小结:通过chain.entry()触发插槽链条执行,默认会经过上面8个插槽。每个插槽履行自己职责,判断是否符合流控规则,符合规则放行返回通行证Entry;不符合触发流控向上抛出BlockException。具体各个插槽的职责后续文章再做分析。

未经允许不得转载:大自然的搬运工 » 源码视角讲解Sentinel服务治理工作原理

赞 (0)

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址