当前位置:AIGC资讯 > 数据采集 > 正文

flume spooldir 定期采集日期目录

这里以cdh5-1.6.0_5.10.2为例。

flume源码下载地址:https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2,SpoolDirectorySource在https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2/flume-ng-core项目下,下载之后找到org/apache/flume/source/SpoolDirectorySource修改源码如下:

@Override
public synchronized void start() {
  //添加解析日期目录方法
  spoolDirectory = directory(spoolDirectory);
  logger.info("SpoolDirectorySource source starting with directory: {}",
      spoolDirectory);

  executor = Executors.newSingleThreadScheduledExecutor();

  File directory = new File(spoolDirectory);
  try {
    reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(directory)
        .completedSuffix(completedSuffix)
        .ignorePattern(ignorePattern)
        .trackerDirPath(trackerDirPath)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .annotateBaseName(basenameHeader)
        .baseNameHeader(basenameHeaderKey)
        .deserializerType(deserializerType)
        .deserializerContext(deserializerContext)
        .deletePolicy(deletePolicy)
        .inputCharset(inputCharset)
        .decodeErrorPolicy(decodeErrorPolicy)
        .consumeOrder(consumeOrder)
        .recursiveDirectorySearch(recursiveDirectorySearch)
        .build();
  } catch (IOException ioe) {
    throw new FlumeException("Error instantiating spooling event parser",
        ioe);
  }

  Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
  executor.scheduleWithFixedDelay(
      runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

  super.start();
  logger.debug("SpoolDirectorySource source started");
  sourceCounter.start();
}

/**
 * 解析时间
 * @param pattern
 * @return
 */
public static String getTime(String pattern) {
    SimpleDateFormat sdf = null;
    try{
        sdf = new SimpleDateFormat(pattern);
    }catch (Exception e){
        return "";
    }
    return sdf.format(new Date(System.currentTimeMillis()));
}

/**
 * 解析时间
 *
 * @param spoolDirectory
 * @return
 */
public static String spoolTimeDirectory(String spoolDirectory) {
    String spool = spoolDirectory.substring(spoolDirectory.lastIndexOf("/") + 1, spoolDirectory.length());
    String time = getTime(spool);
    if (StringUtils.isNotBlank(time)) {
        return time;
    }
    return spool;
}

/**
 * 拼装目录
 *
 * @param spoolDirectory
 * @return
 */
public static String directory(String spoolDirectory) {
    String spoolDir = spoolDirectory.substring(0, spoolDirectory.lastIndexOf("/") + 1);
    return spoolDir + spoolTimeDirectory(spoolDirectory);
}

按照如上简单修改之后,编译之后倒入到jar包,替换cdh集群中的flume即可。配置文件如下:

app.sources=r1
app.sinks=s1
app.channels=c1

app.sources.r1.type=spooldir
app.sources.r1.spoolDir=/data/log/yyyy-MM-dd
app.sources.r1.channels=c1
app.sources.r1.fileHeader=false
#一行读取默认最大限制为2048,这里重新设置最大限制
app.sources.r1.deserializer.maxLineLength =1048576

#app.sources.r1.interceptors =i1
#app.sources.r1.interceptors.i1.type = timestamp

app.sinks.s1.type = hdfs
app.sinks.s1.hdfs.path = hdfs://hadoop1:8020/home/data/avatar-log/data-log/%Y-%m-%d
#文件前缀和后缀
app.sinks.s1.hdfs.filePrefix = gdapp_log
app.sinks.s1.hdfs.fileSuffix = .log
#通过设置 hdfs.inUsePrefix,例如设置为 .时,hdfs 会把该文件当做隐藏文件,以避免在 mr 过程中读到这些临时文件,引起一些错误
app.sinks.s1.hdfs.inUsePrefix = .
#同时打开的最大文件数目
app.sinks.s1.hdfs.maxOpenFiles = 5000
app.sinks.s1.hdfs.batchSize= 1000
app.sinks.s1.hdfs.fileType = DataStream
app.sinks.s1.hdfs.writeFormat =Text
#128M为一个采集后的存储文件大小
app.sinks.s1.hdfs.rollSize = 134217728
app.sinks.s1.hdfs.rollCount = 0
app.sinks.s1.hdfs.rollInterval = 300
app.sinks.s1.hdfs.useLocalTimeStamp = true
app.sinks.s1.channel = c1

#app.channels.c1.type=file
#app.channels.c1.checkpointDir=./file_channel/checkpoint
#app.channels.c1.dataDirs=./file_channel/data
app.channels.c1.type = memory
app.channels.c1.capacity = 10000
app.channels.c1.transactionCapacity = 1000


更新时间 2023-11-08