Version
7.0.0
Description
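The SkyWalking architecture consists of three parts: the `agent` (data collection), the `oap-server` (backend server), and the `webapp` (front-end UI).
This post looks at how the `agent` collects data and sends it to the `oap-server`.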
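Data collection flow
Take the `InstanceMethodsAroundInterceptor` interface as an example. It defines three methods, and the code in this section is the Dubbo plugin's implementation of them.
`beforeMethod` creates the span and collects information such as the IP, port, and call path. On the consumer side it creates an exit span and writes the trace context into the RPC attachments. On the provider side it reads the trace context from the attachments and creates an entry span.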
```java
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                         MethodInterceptResult result) throws Throwable {
    Invoker invoker = (Invoker) allArguments[0];
    Invocation invocation = (Invocation) allArguments[1];
    RpcContext rpcContext = RpcContext.getContext();
    boolean isConsumer = rpcContext.isConsumerSide();
    URL requestURL = invoker.getUrl();
    AbstractSpan span;
    final String host = requestURL.getHost();
    final int port = requestURL.getPort();
    if (isConsumer) {
        final ContextCarrier contextCarrier = new ContextCarrier();
        span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
            if (invocation.getAttachments().containsKey(next.getHeadKey())) {
                invocation.getAttachments().remove(next.getHeadKey());
            }
        }
    } else {
        ContextCarrier contextCarrier = new ContextCarrier();
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
        }
        span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
    }
    Tags.URL.set(span, generateRequestURL(requestURL, invocation));
    span.setComponent(ComponentsDefine.DUBBO);
    span.tag(new StringTag(1017, "dubbo.parameter"), JSONObject.toJSONString(RpcContext.getContext().getArguments()));
    SpanLayer.asRPCFramework(span);
}
```
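The consumer and provider branches in `beforeMethod` propagate the trace context through Dubbo's RPC attachments. The consumer writes the carrier's header into the outgoing attachments, and the provider reads it back before creating its entry span.

The sketch below models only that round trip. It uses a hypothetical `MiniCarrier` class and a plain `Map` standing in for the attachments. The header key `sw6` and the single string value are simplifying assumptions. The real `ContextCarrier` serializes several fields, such as trace ID, segment ID, and span ID.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for ContextCarrier: it carries one header value.
class MiniCarrier {
    // Assumed header key; the value format used here is made up for illustration.
    static final String HEAD_KEY = "sw6";
    private String headValue;

    String getHeadValue() { return headValue; }

    void setHeadValue(String value) { headValue = value; }

    // Consumer side: copy the carrier's header into the outgoing attachments.
    void inject(Map<String, String> attachments) {
        attachments.put(HEAD_KEY, headValue);
    }

    // Provider side: rebuild a carrier from the incoming attachments.
    static MiniCarrier extract(Map<String, String> attachments) {
        MiniCarrier carrier = new MiniCarrier();
        carrier.setHeadValue(attachments.get(HEAD_KEY));
        return carrier;
    }
}
```

Because both sides agree only on the header key, the transport (Dubbo attachments here) never needs to understand the trace format.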
`handleMethodException` handles exceptions. It marks the active span as failed and logs the throwable on it.
```java
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                  Class<?>[] argumentsTypes, Throwable t) {
    dealException(t);
}

/**
 * Log the throwable, which occurs in Dubbo RPC service.
 */
private void dealException(Throwable throwable) {
    AbstractSpan span = ContextManager.activeSpan();
    span.errorOccurred();
    span.log(throwable);
}
```
`afterMethod` records any exception carried in the RPC result, then stops the span and returns the result.
```java
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                          Object ret) throws Throwable {
    Result result = (Result) ret;
    if (result != null && result.getException() != null) {
        dealException(result.getException());
    }
    ContextManager.stopSpan();
    return ret;
}
```
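The three methods are called around the enhanced method in a fixed order. The sketch below assumes the enhanced method calls them the way SkyWalking's generic instance-method interceptor does:

- `beforeMethod` runs first.
- `handleMethodException` runs only if the original call throws.
- `afterMethod` always runs in `finally`.

That last point is why stopping the span in `afterMethod` is safe on both the success and failure paths. `AroundInterceptor` and `InterceptDriver` are hypothetical names, not SkyWalking classes.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified version of the three-method interceptor contract.
interface AroundInterceptor {
    void beforeMethod(Object[] args);

    void handleMethodException(Throwable t);

    Object afterMethod(Object ret);
}

class InterceptDriver {
    // Assumed call order: before -> original call -> (on error) handleMethodException -> after (always).
    static Object intercept(AroundInterceptor interceptor, Object[] args, Callable<Object> original) throws Exception {
        interceptor.beforeMethod(args);
        Object ret = null;
        try {
            ret = original.call();
        } catch (Exception t) {
            interceptor.handleMethodException(t);
            throw t; // the caller still sees the original exception
        } finally {
            // Runs on both paths, so a span opened in beforeMethod is always closed.
            ret = interceptor.afterMethod(ret);
        }
        return ret;
    }
}
```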
`ContextManager.stopSpan()` deserves special attention, because all subsequent data processing starts from this call:
```java
public static void stopSpan() {
    final AbstractTracerContext context = get();
    stopSpan(context.activeSpan(), context);
}

private static void stopSpan(AbstractSpan span, final AbstractTracerContext context) {
    if (context.stopSpan(span)) {
        CONTEXT.remove();
        RUNTIME_CONTEXT.remove();
    }
}
```
Next, it takes the current tracing context and calls `TracingContext`'s `stopSpan` (or `asyncStop`) method:
```java
public boolean stopSpan(AbstractSpan span) {
    // ...... (finishes and pops the active span; elided)
    finish();
    return activeSpanStack.isEmpty();
}
```
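Together, `ContextManager.stopSpan` and `TracingContext#stopSpan` behave like a per-thread span stack. Each stop pops one span. When the stack is empty, the whole segment is done and the thread-local context is removed.

The minimal sketch below uses hypothetical `MiniTracingContext` and `MiniContextManager` classes. They track span names only and skip all segment bookkeeping.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified tracing context: a stack of span names for one thread.
class MiniTracingContext {
    private final Deque<String> activeSpanStack = new ArrayDeque<>();

    void createSpan(String name) {
        activeSpanStack.push(name);
    }

    // Pop the active span and report whether the segment is finished (stack empty).
    boolean stopSpan() {
        activeSpanStack.pop();
        return activeSpanStack.isEmpty();
    }
}

class MiniContextManager {
    private static final ThreadLocal<MiniTracingContext> CONTEXT = new ThreadLocal<>();

    static MiniTracingContext get() {
        MiniTracingContext ctx = CONTEXT.get();
        if (ctx == null) {
            ctx = new MiniTracingContext();
            CONTEXT.set(ctx);
        }
        return ctx;
    }

    // Once the last span stops, drop the thread-local so the next request on
    // this (possibly pooled) thread starts with a fresh context.
    static void stopSpan() {
        if (get().stopSpan()) {
            CONTEXT.remove();
        }
    }

    static boolean hasContext() {
        return CONTEXT.get() != null;
    }
}
```

Removing the thread-local matters in thread pools: otherwise a finished context would leak into the next, unrelated request handled by the same thread.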
This method in turn calls `TracingContext#finish()`:
```java
private void finish() {
    ......
    TracingContext.ListenerManager.notifyFinish(finishedSegment);
    ......
}
```
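Only at this point is the trace segment actually finished. Once the segment ends, the registered listeners are notified, and `TraceSegmentServiceClient` is one of them.
This is done by `TracingContext.ListenerManager#notifyFinish(TraceSegment finishedSegment)`: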
```java
static void notifyFinish(TraceSegment finishedSegment) {
    for (TracingContextListener listener : LISTENERS) {
        listener.afterFinished(finishedSegment);
    }
}
```
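`notifyFinish` is a plain observer pattern. The sketch below uses a hypothetical `SegmentListener` interface and a `String` in place of `TraceSegment`.

The listener list is a `CopyOnWriteArrayList`. That is a choice made for this sketch, not necessarily what SkyWalking uses. It fits a list that is written rarely (at startup) and iterated on every finished segment.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for TracingContextListener.
interface SegmentListener {
    void afterFinished(String finishedSegment);
}

class MiniListenerManager {
    // Iteration works on a snapshot, so a concurrent add never breaks notifyFinish.
    private static final List<SegmentListener> LISTENERS = new CopyOnWriteArrayList<>();

    static void add(SegmentListener listener) {
        LISTENERS.add(listener);
    }

    // Fan a finished segment out to every registered listener.
    static void notifyFinish(String finishedSegment) {
        for (SegmentListener listener : LISTENERS) {
            listener.afterFinished(finishedSegment);
        }
    }
}
```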
`TraceSegmentServiceClient#afterFinished(TraceSegment traceSegment)` handles what happens after the `traceSegment` finishes.
On the `agent` side, trace segments are processed with a producer-consumer pattern:
```java
public void afterFinished(TraceSegment traceSegment) {
    .......
    if (!carrier.produce(traceSegment)) {
        ......
    }
}
```
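`carrier#produce(TraceSegment traceSegment)` produces a message: following the producer-consumer pattern, it saves the trace segment generated by this call into a buffer.
If a consumer `driver` exists but is not running, the message is dropped and the method returns `false`.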
```java
public boolean produce(T data) {
    if (driver != null) {
        if (!driver.isRunning(channels)) {
            return false;
        }
    }
    return this.channels.save(data);
}
```
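`channels#save(T data)` picks a channel with the data partitioner and tries to save into it:

- With the `IF_POSSIBLE` strategy and a `maxRetryCount` greater than 1, a failed save is retried up to `maxRetryCount` times. The method returns `true` as soon as one attempt succeeds.
- Otherwise it makes a single attempt and returns `true` if that attempt succeeds.
- If no attempt succeeds, it returns `false`.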
```java
public boolean save(T data) {
    int index = dataPartitioner.partition(bufferChannels.length, data);
    int retryCountDown = 1;
    if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
        int maxRetryCount = dataPartitioner.maxRetryCount();
        if (maxRetryCount > 1) {
            retryCountDown = maxRetryCount;
        }
    }
    for (; retryCountDown > 0; retryCountDown--) {
        if (bufferChannels[index].save(data)) {
            return true;
        }
    }
    return false;
}
```
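The partition-then-retry logic of `Channels#save` can be modeled with standard queues. The hypothetical `MiniChannels` sketch below makes three simplifications:

- Each channel is a bounded `ArrayBlockingQueue`.
- The partitioner is a simple hash-based one chosen for illustration.
- Every attempt uses the non-blocking `offer()`, so the sketch models the `IF_POSSIBLE` path only.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch of Channels#save under the IF_POSSIBLE strategy.
class MiniChannels<T> {
    private final ArrayBlockingQueue<T>[] bufferChannels;
    private final int maxRetryCount;

    @SuppressWarnings("unchecked")
    MiniChannels(int channelSize, int bufferSize, int maxRetryCount) {
        bufferChannels = new ArrayBlockingQueue[channelSize];
        for (int i = 0; i < channelSize; i++) {
            bufferChannels[i] = new ArrayBlockingQueue<>(bufferSize);
        }
        this.maxRetryCount = maxRetryCount;
    }

    // Assumed partitioner: hash-based, made non-negative with floorMod.
    private int partition(T data) {
        return Math.floorMod(data.hashCode(), bufferChannels.length);
    }

    boolean save(T data) {
        int index = partition(data);
        int retryCountDown = maxRetryCount > 1 ? maxRetryCount : 1;
        for (; retryCountDown > 0; retryCountDown--) {
            // offer() never blocks: it returns false while the channel is full.
            if (bufferChannels[index].offer(data)) {
                return true;
            }
        }
        return false;
    }

    int size(int channel) {
        return bufferChannels[channel].size();
    }
}
```

Retrying the same channel only helps if a consumer frees space between attempts. Otherwise the segment is dropped, which is the intended trade-off of `IF_POSSIBLE`.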
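By default, `ArrayBlockingQueueBuffer` is used, but trace segments are buffered with `Buffer`. See the `boot()` method of `TraceSegmentServiceClient`, which sets the buffer strategy to `IF_POSSIBLE`: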
```java
public void boot() {
    ......
    carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
    ......
}
```
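Next, look at `bufferChannels[index].save(data)`.
If the target slot in the buffer array still holds data, the `IF_POSSIBLE` strategy returns `false` immediately, while `BLOCKING` waits until the slot is freed. The buffer capacity defaults to 300.
This bounded capacity limits the `agent`'s impact on the business application. Without a limit, traces produced faster than they are consumed would pile up in `JVM` memory and could cause the application to run out of memory.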
```java
public boolean save(T data) {
    int i = index.getAndIncrement();
    if (buffer[i] != null) {
        switch (strategy) {
            case BLOCKING:
                while (buffer[i] != null) {
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                    }
                }
                break;
            case IF_POSSIBLE:
                return false;
            default:
        }
    }
    buffer[i] = data;
    return true;
}
```
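`Buffer#save` treats the array as a ring: a shared counter picks the next slot, and an occupied slot means the consumer has not caught up yet.

The hypothetical `MiniRingBuffer` below keeps that shape with two changes:

- It uses `AtomicReferenceArray.compareAndSet`, so concurrent producers cannot overwrite each other's slot.
- It wraps a plain `AtomicInteger` with `Math.floorMod` instead of a range-limited atomic counter.

`obtain` plays the consumer's role of draining and freeing slots.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical ring buffer modeled on the shape of Buffer#save.
class MiniRingBuffer<T> {
    enum Strategy { BLOCKING, IF_POSSIBLE }

    private final AtomicReferenceArray<T> buffer;
    private final Strategy strategy;
    // Ever-growing counter; floorMod maps it onto a slot index.
    private final AtomicInteger index = new AtomicInteger();

    MiniRingBuffer(int size, Strategy strategy) {
        this.buffer = new AtomicReferenceArray<>(size);
        this.strategy = strategy;
    }

    boolean save(T data) {
        int i = Math.floorMod(index.getAndIncrement(), buffer.length());
        // compareAndSet only succeeds if the slot is currently empty (null).
        while (!buffer.compareAndSet(i, null, data)) {
            if (strategy == Strategy.IF_POSSIBLE) {
                return false; // slot still occupied: drop instead of waiting
            }
            try {
                Thread.sleep(1L); // BLOCKING: wait for the consumer to free the slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Consumer side: move every stored item into out and free its slot.
    void obtain(List<T> out) {
        for (int i = 0; i < buffer.length(); i++) {
            T item = buffer.getAndSet(i, null);
            if (item != null) {
                out.add(item);
            }
        }
    }
}
```

With `IF_POSSIBLE`, a full ring costs the application nothing but a dropped segment. With `BLOCKING`, the business thread itself would wait on the tracing pipeline.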
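Once the producer starts working, the consumer starts as well and runs periodically with a default consume cycle of 20 milliseconds: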
```java
/**
 * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
 * millis consume cycle.
 *
 * @param consumer single instance of consumer, all consumer threads will all use this instance.
 * @param num number of consumer threads
 */
public DataCarrier consume(IConsumer<T> consumer, int num) {
    return this.consume(consumer, num, 20);
}

// The overload actually invoked above: it takes a consumer instance, not a consumer class.
public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
    if (driver != null) {
        driver.close(channels);
    }
    driver = new ConsumeDriver<T>(this.name, this.channels, consumer, num, consumeCycle);
    driver.begin(channels);
    return this;
}
```
Consumption is actually triggered inside `driver.begin(channels)`, which starts the consumer threads:
```java
@Override
public void begin(Channels channels) {
    if (running) {
        return;
    }
    // Acquire the lock before entering try, so unlock() only runs if lock() succeeded.
    lock.lock();
    try {
        this.allocateBuffer2Thread();
        for (ConsumerThread consumerThread : consumerThreads) {
            consumerThread.start();
        }
        running = true;
    } finally {
        lock.unlock();
    }
}
```
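The `allocateBuffer2Thread()` call in `begin` assigns the channels to the consumer threads before they start. The sketch below assumes a round-robin assignment, where channel `i` goes to thread `i % num`. `ChannelAllocator` is a hypothetical name, and the sketch works on channel indexes only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: distribute channel indexes over consumer threads round-robin.
class ChannelAllocator {
    static List<List<Integer>> allocate(int channelSize, int threadNum) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < threadNum; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int channel = 0; channel < channelSize; channel++) {
            // Each channel gets exactly one owner thread, so no two threads drain the same buffer.
            perThread.get(channel % threadNum).add(channel);
        }
        return perThread;
    }
}
```

For example, 5 channels over 2 threads gives thread 0 channels 0, 2, 4 and thread 1 channels 1, 3.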
Next, the implementation of the consumer thread, `ConsumerThread`:
```java
@Override
public void run() {
    running = true;
    final List<T> consumeList = new ArrayList<T>(1500);
    while (running) {
        if (!consume(consumeList)) {
            try {
                Thread.sleep(consumeCycle);
            } catch (InterruptedException e) {
            }
        }
    }
    // consumer thread is going to stop
    // consume the last time
    consume(consumeList);
    consumer.onExit();
}

private boolean consume(List<T> consumeList) {
    for (DataSource dataSource : dataSources) {
        dataSource.obtain(consumeList);
    }
    if (!consumeList.isEmpty()) {
        try {
            consumer.consume(consumeList);
        } catch (Throwable t) {
            consumer.onError(consumeList, t);
        } finally {
            consumeList.clear();
        }
        return true;
    }
    return false;
}
```
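The loop in `run()` drains every data source and hands the batch to the consumer. It sleeps for `consumeCycle` only when a round found nothing, so under load it consumes back-to-back.

The hypothetical `MiniConsumerThread` below reproduces that loop over a single `ConcurrentLinkedQueue`, including the final drain on shutdown. `shutdown()` is an assumed helper that stands in for the driver's close logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the ConsumerThread loop.
class MiniConsumerThread<T> extends Thread {
    private final ConcurrentLinkedQueue<T> source;
    private final Consumer<List<T>> consumer;
    private final long consumeCycleMillis;
    private volatile boolean running = true;

    MiniConsumerThread(ConcurrentLinkedQueue<T> source, Consumer<List<T>> consumer, long consumeCycleMillis) {
        this.source = source;
        this.consumer = consumer;
        this.consumeCycleMillis = consumeCycleMillis;
    }

    @Override
    public void run() {
        List<T> consumeList = new ArrayList<>();
        while (running) {
            if (!consume(consumeList)) {
                try {
                    Thread.sleep(consumeCycleMillis); // idle round: back off for one cycle
                } catch (InterruptedException e) {
                    // woken up by shutdown(); the loop re-checks running
                }
            }
        }
        consume(consumeList); // final drain so nothing buffered is lost on stop
    }

    // Returns true if a non-empty batch was handed to the consumer.
    private boolean consume(List<T> consumeList) {
        T item;
        while ((item = source.poll()) != null) {
            consumeList.add(item);
        }
        if (consumeList.isEmpty()) {
            return false;
        }
        try {
            consumer.accept(consumeList);
        } catch (RuntimeException e) {
            // the real loop reports this via consumer.onError(...); the sketch just drops the batch
        } finally {
            consumeList.clear(); // the list is reused, so consumers must copy what they keep
        }
        return true;
    }

    void shutdown() {
        running = false;
        interrupt();
    }
}
```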
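At this point, data collection is essentially complete. The consumer then takes the buffered messages and sends them to the `oap-server`, which aggregates them and stores them in the database.
I will analyze the consumer flow in a later post.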
Final words
This article reflects my personal understanding. If you find any mistakes, corrections are welcome.