当前位置:AIGC资讯 > 数据采集 > 正文

SkyWalking Agent 数据采集流程

版本

7.0.0

描述

Skywalking架构分为三个部分:agent采集端,oap-server服务端,webapp前端展示。
今天就来看一看,agent端是如何进行数据采集并将数据发送给oap-server服务端的。

采集数据流

InstanceMethodsAroundInterceptor接口为例,其定义了三个接口方法。

beforeMethod方法,创建span,并收集IP,端口,调用路径等信息。

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        Invoker invoker = (Invoker) allArguments[0];
        Invocation invocation = (Invocation) allArguments[1];
        RpcContext rpcContext = RpcContext.getContext();
        boolean isConsumer = rpcContext.isConsumerSide();
        URL requestURL = invoker.getUrl();
        AbstractSpan span;
        final String host = requestURL.getHost();
        final int port = requestURL.getPort();
        if (isConsumer) {
            final ContextCarrier contextCarrier = new ContextCarrier();
            span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
                if (invocation.getAttachments().containsKey(next.getHeadKey())) {
                    invocation.getAttachments().remove(next.getHeadKey());
                }
            }
        } else {
            ContextCarrier contextCarrier = new ContextCarrier();
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
            }

            span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
        }

        Tags.URL.set(span, generateRequestURL(requestURL, invocation));
        span.setComponent(ComponentsDefine.DUBBO);
        span.tag(new StringTag(1017, "dubbo.parameter"), JSONObject.toJSONString(RpcContext.getContext().getArguments()));
        SpanLayer.asRPCFramework(span);
    }

handleMethodException方法,处理异常。

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
        dealException(t);
    }

    /**
     * Log the throwable, which occurs in Dubbo RPC service.
     */
    private void dealException(Throwable throwable) {
        AbstractSpan span = ContextManager.activeSpan();
        span.errorOccurred();
        span.log(throwable);
    }

afterMethod方法,处理异常,结束span,同事返回结果。

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
        Result result = (Result) ret;
        if (result != null && result.getException() != null) {
            dealException(result.getException());
        }
        ContextManager.stopSpan();
        return ret;
    }

ContextManager.stopSpan();

这个方法要重点关注。后续数据处理都是由这个方法开始的。

    public static void stopSpan() {
        final AbstractTracerContext context = get();
        stopSpan(context.activeSpan(), context);
    }

    private static void stopSpan(AbstractSpan span, final AbstractTracerContext context) {
        if (context.stopSpan(span)) {
            CONTEXT.remove();
            RUNTIME_CONTEXT.remove();
        }
    }

接着,找到链路上下文进行调用 TracingContext的stopSpan或者asyncStop方法

    public boolean stopSpan(AbstractSpan span) {
		
        finish();
        return activeSpanStack.isEmpty();
    }

上面方法类调用了 TracingContext#finish()进行处理

    private void finish() {
        	......
                TracingContext.ListenerManager.notifyFinish(finishedSegment);
			......
    }

到这里,链路追踪端才算真正结束,当追踪段链路结束之后,会通知监听器向TraceSegmentServiceClient客户端发出通知。
调用 TracingContext.ListenerManager#notifyFinish(TraceSegment finishedSegment)

        static void notifyFinish(TraceSegment finishedSegment) {
            for (TracingContextListener listener : LISTENERS) {
                listener.afterFinished(finishedSegment);
            }
        }

TraceSegmentServiceClient#afterFinished(TraceSegment traceSegment)中,会进行traceSegment结束的后续处理。
agent端,追踪端信息采用生产者消费者模式进行处理。

    public void afterFinished(TraceSegment traceSegment) {
	.......
        if (!carrier.produce(traceSegment)) {
	......
        }
    }

carrier#produce(TraceSegment traceSegment)生产消息。
生产者-消费者模式,将本次产生的追踪链信息保存到buffer中。
driver存在且不在运行状态时,丢弃消息。

    public boolean produce(T data) {
        if (driver != null) {
            if (!driver.isRunning(channels)) {
                return false;
            }
        }
        return this.channels.save(data);
    }

channels#save(T data)
如果有重试策略,当保存失败时会重试指定次数,如果保存成功则返回true,如果失败则重试直到最大次数。
如果没有重试,则在保存数据成功时,返回true。
如果都没有保存成功,则返回false。

    public boolean save(T data) {
        int index = dataPartitioner.partition(bufferChannels.length, data);
        int retryCountDown = 1;
        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
            int maxRetryCount = dataPartitioner.maxRetryCount();
            if (maxRetryCount > 1) {
                retryCountDown = maxRetryCount;
            }
        }
        for (; retryCountDown > 0; retryCountDown--) {
            if (bufferChannels[index].save(data)) {
                return true;
            }
        }
        return false;
    }

默认使用ArrayBlockingQueueBuffer,但是追踪链使用的是Buffer作为缓冲区,参考TraceSegmentServiceClientboot()方法:

public void boot() {
        ......
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
        ......
    }

接着,继续看bufferChannels[index].save(data)
当当前缓冲数组里面有数据时,直接返回false。缓冲数组容量默认300。 容量处理策略可以降低agent对业务应用的影响,因为不限制的话,当生产的调用链过大,但是没有及时消费将会堆积在JVM内存中,有导致业务应用内存溢出的风险。

    public boolean save(T data) {
        int i = index.getAndIncrement();
        if (buffer[i] != null) {
            switch (strategy) {
                case BLOCKING:
                    while (buffer[i] != null) {
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                        }
                    }
                    break;
                case IF_POSSIBLE:
                    return false;
                default:
            }
        }
        buffer[i] = data;
        return true;
    }

当生产者开始工作后,消费者启动,并以每20秒消费一次周期性执行。

    /**
     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
     * millis consume cycle.
     *
     * @param consumer single instance of consumer, all consumer threads will all use this instance.
     * @param num      number of consumer threads
     */
    public DataCarrier consume(IConsumer<T> consumer, int num) {
        return this.consume(consumer, num, 20);
    }
    public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
        if (driver != null) {
            driver.close(channels);
        }
        driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle);
        driver.begin(channels);
        return this;
    }

具体执行消费任务是从driver.begin(channels)里面触发的,此方法启动消费者线程。

    @Override
    public void begin(Channels channels) {
        if (running) {
            return;
        }
        try {
            lock.lock();
            this.allocateBuffer2Thread();
            for (ConsumerThread consumerThread : consumerThreads) {
                consumerThread.start();
            }
            running = true;
        } finally {
            lock.unlock();
        }
    }

再看消费者线程ConsumerThread实现。

    @Override
    public void run() {
        running = true;

        final List<T> consumeList = new ArrayList<T>(1500);
        while (running) {
            if (!consume(consumeList)) {
                try {
                    Thread.sleep(consumeCycle);
                } catch (InterruptedException e) {
                }
            }
        }

        // consumer thread is going to stop
        // consume the last time
        consume(consumeList);

        consumer.onExit();
    }

    private boolean consume(List<T> consumeList) {
        for (DataSource dataSource : dataSources) {
            dataSource.obtain(consumeList);
        }

        if (!consumeList.isEmpty()) {
            try {
                consumer.consume(consumeList);
            } catch (Throwable t) {
                consumer.onError(consumeList, t);
            } finally {
                consumeList.clear();
            }
            return true;
        }
        return false;
    }

到这里,整个数据的采集基本上就结束了。后续消费者会消费消息,并将其发送到oap-server服务端进行聚合,然后再保存到数据库。
消费者流程后续有时间分析。

最后

本文时个人见解,如有错误的地方,欢迎指正。

更新时间 2023-11-08