This walkthrough uses cdh5-1.6.0_5.10.2 as the example. The Flume source can be downloaded from https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2; SpoolDirectorySource lives in the flume-ng-core module (https://github.com/cloudera/flume-ng/tree/cdh5-1.6.0_5.10.2/flume-ng-core). After downloading, locate org/apache/flume/source/SpoolDirectorySource.java and modify the source as follows:
@Override
public synchronized void start() {
  // New: resolve any date pattern in the configured directory path
  spoolDirectory = directory(spoolDirectory);
  logger.info("SpoolDirectorySource source starting with directory: {}",
      spoolDirectory);
  executor = Executors.newSingleThreadScheduledExecutor();
  File directory = new File(spoolDirectory);
  try {
    reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(directory)
        .completedSuffix(completedSuffix)
        .ignorePattern(ignorePattern)
        .trackerDirPath(trackerDirPath)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .annotateBaseName(basenameHeader)
        .baseNameHeader(basenameHeaderKey)
        .deserializerType(deserializerType)
        .deserializerContext(deserializerContext)
        .deletePolicy(deletePolicy)
        .inputCharset(inputCharset)
        .decodeErrorPolicy(decodeErrorPolicy)
        .consumeOrder(consumeOrder)
        .recursiveDirectorySearch(recursiveDirectorySearch)
        .build();
  } catch (IOException ioe) {
    throw new FlumeException("Error instantiating spooling event parser",
        ioe);
  }
  Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
  executor.scheduleWithFixedDelay(
      runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
  super.start();
  logger.debug("SpoolDirectorySource source started");
  sourceCounter.start();
}
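The only change relative to the stock start() is its first line, which rewrites spoolDirectory through the new directory() helper before the file reader is built. The helper methods added to the class are: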
// These helpers additionally require imports for java.text.SimpleDateFormat,
// java.util.Date and org.apache.commons.lang.StringUtils at the top of the file.
/**
 * Format the current time using the given pattern.
 *
 * @param pattern a SimpleDateFormat pattern, e.g. "yyyy-MM-dd"
 * @return the formatted current time, or "" if the pattern is invalid
 */
public static String getTime(String pattern) {
  SimpleDateFormat sdf;
  try {
    sdf = new SimpleDateFormat(pattern);
  } catch (Exception e) {
    // An invalid pattern throws IllegalArgumentException; returning an
    // empty string lets the caller treat the segment as a plain name.
    return "";
  }
  return sdf.format(new Date());
}
/**
 * Resolve the last path segment as a date pattern.
 *
 * @param spoolDirectory the configured spool directory
 * @return the formatted date if the last segment is a valid pattern,
 *         otherwise the segment unchanged
 */
public static String spoolTimeDirectory(String spoolDirectory) {
  String spool = spoolDirectory.substring(spoolDirectory.lastIndexOf("/") + 1);
  String time = getTime(spool);
  if (StringUtils.isNotBlank(time)) {
    return time;
  }
  return spool;
}
/**
 * Reassemble the directory path with the date segment resolved.
 *
 * @param spoolDirectory the configured spool directory
 * @return the path with the last segment replaced by the formatted date
 */
public static String directory(String spoolDirectory) {
  String spoolDir = spoolDirectory.substring(0, spoolDirectory.lastIndexOf("/") + 1);
  return spoolDir + spoolTimeDirectory(spoolDirectory);
}
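As a quick illustration of how the helpers behave, here is a minimal sketch (the main method is hypothetical and not part of the patch; the output assumes the JVM runs on 2017-08-01):
public static void main(String[] args) {
  // The trailing segment is a valid SimpleDateFormat pattern, so it is
  // replaced by the formatted current date:
  System.out.println(directory("/data/log/yyyy-MM-dd")); // /data/log/2017-08-01
  // "nginx" is not a valid pattern (new SimpleDateFormat("nginx") throws
  // IllegalArgumentException), so the segment is returned unchanged:
  System.out.println(directory("/data/log/nginx"));      // /data/log/nginx
  // Caveat: a plain directory name that happens to be a valid pattern
  // (e.g. "dd") would also be rewritten.
}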
After making these simple modifications, compile the flume-ng-core module and package it into a jar (e.g. with mvn package, assuming the standard Maven build), then replace the corresponding Flume jar in the CDH cluster. The configuration file is as follows:
app.sources=r1
app.sinks=s1
app.channels=c1
app.sources.r1.type=spooldir
app.sources.r1.spoolDir=/data/log/yyyy-MM-dd
app.sources.r1.channels=c1
app.sources.r1.fileHeader=false
#The deserializer's default maximum line length is 2048; raise the limit here (1048576 = 1 MB)
app.sources.r1.deserializer.maxLineLength = 1048576
#app.sources.r1.interceptors =i1
#app.sources.r1.interceptors.i1.type = timestamp
app.sinks.s1.type = hdfs
app.sinks.s1.hdfs.path = hdfs://hadoop1:8020/home/data/avatar-log/data-log/%Y-%m-%d
#File prefix and suffix for the files written to HDFS
app.sinks.s1.hdfs.filePrefix = gdapp_log
app.sinks.s1.hdfs.fileSuffix = .log
#With hdfs.inUsePrefix set to '.', HDFS treats files still being written as hidden, so MapReduce jobs do not pick up these temporary files and fail on them
app.sinks.s1.hdfs.inUsePrefix = .
#Maximum number of files open at the same time
app.sinks.s1.hdfs.maxOpenFiles = 5000
app.sinks.s1.hdfs.batchSize = 1000
app.sinks.s1.hdfs.fileType = DataStream
app.sinks.s1.hdfs.writeFormat = Text
#Roll to a new file once it reaches 128 MB (134217728 bytes)
app.sinks.s1.hdfs.rollSize = 134217728
app.sinks.s1.hdfs.rollCount = 0
app.sinks.s1.hdfs.rollInterval = 300
app.sinks.s1.hdfs.useLocalTimeStamp = true
app.sinks.s1.channel = c1
#app.channels.c1.type=file
#app.channels.c1.checkpointDir=./file_channel/checkpoint
#app.channels.c1.dataDirs=./file_channel/data
app.channels.c1.type = memory
app.channels.c1.capacity = 10000
app.channels.c1.transactionCapacity = 1000
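With this configuration, if the agent is started on 2017-08-01 (a hypothetical date), the source spools files from /data/log/2017-08-01 and the sink buckets them into hdfs://hadoop1:8020/home/data/avatar-log/data-log/2017-08-01/. While a file is being written it appears under a name along the lines of .gdapp_log.1501545600000.log.tmp (hidden thanks to the inUsePrefix) and is renamed to gdapp_log.1501545600000.log once it rolls on size (128 MB) or time (300 s). Note that the spool directory is resolved only once, in start(), so the agent has to be restarted each day for the source to move to the new date directory.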