webtail 文件获取,文件监控,websocket
发布时间:2022-06-14 10:35 所属栏目:61 来源:互联网
导读:webtail 能够持续读取一个文件,并将文件内容通过websocket,实时推送到web端,webtail文件读取基于linux的inotify,所以没有可移植性. 文件读取 先介绍下背景,之前遇到过服务器上需要长时间tail一个日志,之前经常是通过一个终端连到服务器上,但是对于长时间观
webtail 能够持续读取一个文件,并将文件内容通过websocket,实时推送到web端,webtail文件读取基于linux的inotify,所以没有可移植性. 文件读取 先介绍下背景,之前遇到过服务器上需要长时间tail一个日志,之前经常是通过一个终端连到服务器上,但是对于长时间观察,就不太方便:老是要开着终端,没法用手机等移动设备查看;多人共享查看比较麻烦,都需要登录到服务器上。 webtail实现类似于tail,能够持续读取一个文件,并将文件内容通过websocket,实时推送到web端。webtail文件读取基于linux的inotify,所以没有可移植性,websocket使用基于asio的websocketpp,代码维护在这里。 webtail代码维护在https://gitorious.org/webtail/webtail,目前还是一个简单可用的小应用,还有很多可以提升的地方。 本文介绍下文件读取的部分. 文件读取分为两个部分,文件读取和文件监控. 文件监控: 文件监控直接通过了linux的inotify接口实现。这里没有考虑移植性,也就没像tailf那样,通过宏来判断是否支持inotify,如果不支持,降级使用循环轮寻的方式读取。 inotify的使用还是比较方便的基本上就是:inotify_init,inotify_add_watch,然后配合read系统调用,获取文件修改信息。因此实现也非常方便。 首先是在构造函数里面初始化inotify:inotifyFd = inotify_init(); 然后提供一个watch接口,通过传入前文描述的TFile对象和内容读取的回调函数,添加对应文件的监控和回调,代码如下: void FileWatcher::watch ( boost::shared_ptr< TFile > tFile, std::list< FileWatcher::ReadCallBack > callBackList ) { if(!tFile->hasError() && !callBackList.empty()) { int wd = inotify_add_watch(inotifyFd, tFile->name().c_str(), IN_MODIFY); if(wd > 0) { tFileMap.insert(std::make_pair<int, boost::shared_ptr<TFile> >(wd, tFile)); callBackMap.insert(std::make_pair<int, std::list<ReadCallBack> >(wd, callBackList)); //init read std::string initContent = tFile->read(); BOOST_FOREACH(ReadCallBack &callback, callBackList) { callback(initContent); } } } } 这里通过TFile的文件名,向内核注册添加该文件的modify事件,并且在注册成功之后,进行初始读取(这里有个小问题,由于后面websocket端没有做缓存,所以由于初始读取的时候还没有任何websocket客户端连接,所以通过web无法读取初始内容,也就是文件最后10行)。同时,这个类维护两个hashmap,分别是监听描述符wd->tFile和wd->callbacklist。 监听完成后,就是启动监听,也就是通过读取fd,感知被监听文件的变更,由于这里只监听了文件修改,那么读取到这个事件之后,就可以对该文件进行增量读取(前文已经描述了读取方法),代码如下: char * buffer = new char[inotifyBufferSize]; while(!_interrupted) { if(read(inotifyFd, buffer, inotifyBufferSize) < 0) { if(errno == EINTR) { // interrupt delete buffer; return; } } struct inotify_event *event = ( struct inotify_event * ) buffer; int wd = event->wd; BOOST_AUTO(tFileIter, tFileMap.find(wd)); if(tFileIter != tFileMap.end()) { boost::shared_ptr<TFile> tFile = tFileIter->second; std::string content = tFile->read(); BOOST_AUTO(iter, callBackMap.find(wd)); if(iter != callBackMap.end()) { std::list<ReadCallBack> callbacks = iter->second; BOOST_FOREACH(ReadCallBack &callback, callbacks) { callback(content); } } } } delete buffer; 这里参照inotify的文档,首先读取缓冲区大小设置为:static const int inotifyBufferSize = sizeof(struct inotify_event) + NAME_MAX + 1; 也就是inotify_event结构的长度,和名字最大值。由于inotify_event是变长字段(包含一个可选的文件名字段),所以这里采用了系统限制的文件名最大值NAME_MAX,这个宏在climits中定义,在linux中大小为255字节。 然后通过系统调用read,读取文件描述符inotifyFd,这里如果没有新的事件产生,read会进入阻塞状态,节省系统资源。如果有返回,则处理返回的inotify_event对象(注意在监听modify事件的时候,是没有文件名的)。通过结构中的wd,从之前保存的hashmap中获取对应的tFile对象进行增量读取,然后再读取wd对应的回调函数,将读取内容返回。 这里有个小问题需要处理,就是如何中断读取。之前为了在gtest中能够通过单元测试的方式进行测试,通过查看手册可以知道,如果read调用被系统信号中断,会标记错误码为EINTR。所以,当读取失败的时候,可以通过对ERRNO检查,判断是否是信号中断。 由于程序会一直运行,知道通过信号终止,所以析构变的不是很重要了。这里析构函数里面通过调用inotify_rm_watch将之前保存的wd全部去掉,然后通过close调用交inotify的文件描述符关闭即可: FileWatcher::~FileWatcher() { if(inotifyFd > 0) { boost::unordered::unordered_map<int, boost::shared_ptr<TFile> >::iterator iter; for(iter = tFileMap.begin(); iter != tFileMap.end(); ++iter) { inotify_rm_watch(inotifyFd, iter->first); } close(inotifyFd); } } 文件读取: 由于unix的文件多次打开独立维护file table,共享v-node table(参照APUE 3.10 file sharing)。因此只维护一个文件描述符和当前文件偏移,每次读取通过fstat获取文档当前大小,和维护的文件偏移进行对比,如果小于文件大小,则读取文件直到末尾,并输出。 这里有几点是模仿tailf的,首先是不修改文件访问时间,这是通过在open系统调用中,增加O_NOATIME,按照man open的解释:Do not update the file last access time (st_atime in the inode) when the file is read(2). This flag is intended for use by indexing or backup programs, where its use can significantly reduce the amount of disk activity. This flag may not be effective on all file systems. One example is NFS, where the server main?tains the access time. 启动了这个参数,通过read调用读取文件的时候,不会修改atime了。 其次是首次读取的时候,读取文件最后10行,由于websocket发送没有做缓存,所以从web端没法看见,这里代码也是参照了tailf,代码如下: char *buffer = new char[initLen * BUFSIZ]; char *p = buffer; char lineBuffer[BUFSIZ]; int readSize; int lineCount = 0; bool head = true; int i = 0, j = 0; while((readSize = ::read(fd, lineBuffer, BUFSIZ - 1)) > 0) { for(i = 0, j = 0; i < readSize; ++i) { // read line, save to buffer if(lineBuffer[i] == 'n') { std::memcpy(p, lineBuffer + j, i - j + 1); std::memset(p + i - j + 1, '', 1); j = i + 1; if(++lineCount >= initLen) { lineCount = 0; head = false; } p = buffer + (lineCount * BUFSIZ); } } // read break in the middle of line if(j < i) { // finished read all files if(readSize < BUFSIZ) { std::memcpy(p, lineBuffer + j, i - j + 1); std::memset(p + i - j + 1, '', 1); ++lineCount; } else if (j == 0){ // long line drop? continue; } else { // not finished, seek to line begin curPos = lseek(fd, j - i -1, SEEK_CUR); } } } std::string initReadResult; if(head) { for(i = 0; i < lineCount; ++i) { initReadResult += (buffer + i * BUFSIZ); } } else { for(i = lineCount; i < initLen; ++i) { initReadResult += (buffer + i * BUFSIZ); } for(i = 0; i < lineCount; ++i) { initReadResult += (buffer + i * BUFSIZ); } //phpfensi.com } curPos = lseek(fd, 0, SEEK_CUR); delete buffer; 首先声明一段用来保存最终n行的缓存,缓存最长行长度是BUFSIZ,这个在linux中的定义是8192字节,然后每次读取BUFSIZ-1个字节,最后一个用来放,类似fgets的实现,解析其中的换行符,由于只打算在linux中使用,所以只解析n,最后根据状态,从缓存中读取最后n行数据到string中. 如果不是初始读取,之前已经说过逻辑了,将fstat获取到的文件大小和保存的当前偏移做比较,如果有新的内容,则读取,没有就直接返回。 (编辑:ASP站长网) |
相关内容
网友评论
推荐文章
热点阅读