版本
7.0.0
描述
Skywalking架构分为三个部分:agent采集端,oap-server服务端,webapp前端展示。
 今天就来看一看,agent端是如何进行数据采集并将数据发送给oap-server服务端的。
采集数据流
以InstanceMethodsAroundInterceptor接口为例,其定义了三个接口方法。
beforeMethod方法,创建span,并收集IP,端口,调用路径等信息。
 
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        Invoker invoker = (Invoker) allArguments[0];
        Invocation invocation = (Invocation) allArguments[1];
        RpcContext rpcContext = RpcContext.getContext();
        boolean isConsumer = rpcContext.isConsumerSide();
        URL requestURL = invoker.getUrl();
        AbstractSpan span;
        final String host = requestURL.getHost();
        final int port = requestURL.getPort();
        if (isConsumer) {
            final ContextCarrier contextCarrier = new ContextCarrier();
            span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
                if (invocation.getAttachments().containsKey(next.getHeadKey())) {
                    invocation.getAttachments().remove(next.getHeadKey());
                }
            }
        } else {
            ContextCarrier contextCarrier = new ContextCarrier();
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
            }
            span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
        }
        Tags.URL.set(span, generateRequestURL(requestURL, invocation));
        span.setComponent(ComponentsDefine.DUBBO);
        span.tag(new StringTag(1017, "dubbo.parameter"), JSONObject.toJSONString(RpcContext.getContext().getArguments()));
        SpanLayer.asRPCFramework(span);
    }
 
handleMethodException方法,处理异常。
 
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
        dealException(t);
    }
    /**
     * Log the throwable, which occurs in Dubbo RPC service.
     */
    private void dealException(Throwable throwable) {
        AbstractSpan span = ContextManager.activeSpan();
        span.errorOccurred();
        span.log(throwable);
    }
 
afterMethod方法,处理异常,结束span,同事返回结果。
 
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
        Result result = (Result) ret;
        if (result != null && result.getException() != null) {
            dealException(result.getException());
        }
        ContextManager.stopSpan();
        return ret;
    }
 
ContextManager.stopSpan();
 
这个方法要重点关注。后续数据处理都是由这个方法开始的。
    public static void stopSpan() {
        final AbstractTracerContext context = get();
        stopSpan(context.activeSpan(), context);
    }
    private static void stopSpan(AbstractSpan span, final AbstractTracerContext context) {
        if (context.stopSpan(span)) {
            CONTEXT.remove();
            RUNTIME_CONTEXT.remove();
        }
    }
 
接着,找到链路上下文进行调用 TracingContext的stopSpan或者asyncStop方法
    public boolean stopSpan(AbstractSpan span) {
		
        finish();
        return activeSpanStack.isEmpty();
    }
 
上面方法类调用了 TracingContext#finish()进行处理
    private void finish() {
        	......
                TracingContext.ListenerManager.notifyFinish(finishedSegment);
			......
    }
 
到这里,链路追踪端才算真正结束,当追踪段链路结束之后,会通知监听器向TraceSegmentServiceClient客户端发出通知。
 调用 TracingContext.ListenerManager#notifyFinish(TraceSegment finishedSegment)
        static void notifyFinish(TraceSegment finishedSegment) {
            for (TracingContextListener listener : LISTENERS) {
                listener.afterFinished(finishedSegment);
            }
        }
 
在TraceSegmentServiceClient#afterFinished(TraceSegment traceSegment)中,会进行traceSegment结束的后续处理。
 在agent端,追踪端信息采用生产者消费者模式进行处理。
    public void afterFinished(TraceSegment traceSegment) {
	.......
        if (!carrier.produce(traceSegment)) {
	......
        }
    }
 
carrier#produce(TraceSegment traceSegment)生产消息。
 生产者-消费者模式,将本次产生的追踪链信息保存到buffer中。
 当driver存在且不在运行状态时,丢弃消息。
    public boolean produce(T data) {
        if (driver != null) {
            if (!driver.isRunning(channels)) {
                return false;
            }
        }
        return this.channels.save(data);
    }
 
channels#save(T data)
 如果有重试策略,当保存失败时会重试指定次数,如果保存成功则返回true,如果失败则重试直到最大次数。
 如果没有重试,则在保存数据成功时,返回true。
 如果都没有保存成功,则返回false。
    public boolean save(T data) {
        int index = dataPartitioner.partition(bufferChannels.length, data);
        int retryCountDown = 1;
        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
            int maxRetryCount = dataPartitioner.maxRetryCount();
            if (maxRetryCount > 1) {
                retryCountDown = maxRetryCount;
            }
        }
        for (; retryCountDown > 0; retryCountDown--) {
            if (bufferChannels[index].save(data)) {
                return true;
            }
        }
        return false;
    }
 
默认使用ArrayBlockingQueueBuffer,但是追踪链使用的是Buffer作为缓冲区,参考TraceSegmentServiceClient的boot()方法:
public void boot() {
        ......
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
        ......
    }
 
接着,继续看bufferChannels[index].save(data)。
 当当前缓冲数组里面有数据时,直接返回false。缓冲数组容量默认300。 容量处理策略可以降低agent对业务应用的影响,因为不限制的话,当生产的调用链过大,但是没有及时消费将会堆积在JVM内存中,有导致业务应用内存溢出的风险。
    public boolean save(T data) {
        int i = index.getAndIncrement();
        if (buffer[i] != null) {
            switch (strategy) {
                case BLOCKING:
                    while (buffer[i] != null) {
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                        }
                    }
                    break;
                case IF_POSSIBLE:
                    return false;
                default:
            }
        }
        buffer[i] = data;
        return true;
    }
 
当生产者开始工作后,消费者启动,并以每20秒消费一次周期性执行。
    /**
     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
     * millis consume cycle.
     *
     * @param consumer single instance of consumer, all consumer threads will all use this instance.
     * @param num      number of consumer threads
     */
    public DataCarrier consume(IConsumer<T> consumer, int num) {
        return this.consume(consumer, num, 20);
    }
    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
        if (driver != null) {
            driver.close(channels);
        }
        driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle);
        driver.begin(channels);
        return this;
    }
 
具体执行消费任务是从driver.begin(channels)里面触发的,此方法启动消费者线程。
    @Override
    public void begin(Channels channels) {
        if (running) {
            return;
        }
        try {
            lock.lock();
            this.allocateBuffer2Thread();
            for (ConsumerThread consumerThread : consumerThreads) {
                consumerThread.start();
            }
            running = true;
        } finally {
            lock.unlock();
        }
    }
 
再看消费者线程ConsumerThread实现。
    @Override
    public void run() {
        running = true;
        final List<T> consumeList = new ArrayList<T>(1500);
        while (running) {
            if (!consume(consumeList)) {
                try {
                    Thread.sleep(consumeCycle);
                } catch (InterruptedException e) {
                }
            }
        }
        // consumer thread is going to stop
        // consume the last time
        consume(consumeList);
        consumer.onExit();
    }
    private boolean consume(List<T> consumeList) {
        for (DataSource dataSource : dataSources) {
            dataSource.obtain(consumeList);
        }
        if (!consumeList.isEmpty()) {
            try {
                consumer.consume(consumeList);
            } catch (Throwable t) {
                consumer.onError(consumeList, t);
            } finally {
                consumeList.clear();
            }
            return true;
        }
        return false;
    }
 
到这里,整个数据的采集基本上就结束了。后续消费者会消费消息,并将其发送到oap-server服务端进行聚合,然后再保存到数据库。
 消费者流程后续有时间分析。
最后
本文时个人见解,如有错误的地方,欢迎指正。