用Python实现Hadoop实时作业状态监控

2017-01-07 19:12:19来源:CSDN作者:GitzLiu人点击



基于Python的Hadoop实时作业状态监控


前言:

  任务需要,要求完成这么一个程序,恰好博主以前在虚拟机上部署过hadoop,但是部署完后一直没用过,这次就来尝试下吧。

进入正题:

一、环境及工具:

ubuntu14.04 LTS
Hadoop
Python
PycURL


二、关于 API

  先把语言放在一边,要想监控hadoop的作业状态,那hadoop至少要提供相应的API 吧,上官网一通猛翻,果然找到了,Hadoop RESTful API,
   [ Hadoop RESTful API ]
  这个链接比官网要详细一些,里面提示了hadoop提供了curl工具来使用它的API。curl是什么呢,直接上官网去看WebHDFS REST API例子吧,传送门如下:
  [ WebHDFS REST API ]
  不过看完上述两个链接,我还是不知道如何开始写我的程序,继续在官网奋战,终于找到了这个简单粗暴实用的API:hadoop提供了ResourceManager REST API。
恩,链接放在下面,让各位少走弯路(说多都是泪)。
  [ ResourceManager REST API ]
  API在手,我们就要考虑下如何利用这个资源了,前面说了要利用curl这个工具,但我们是用Python来实现这个监控程序,Python怎么来调用curl呢?带着这个疑问在网上走弯路,神奇的工具出现了,pycurl库,恩,看名字我想大家也明白了,前人的智慧不服不行。
PycURL is a Python interface to libcurl
关于pycurl的使用方法可上网查查,这里也放个链接提供参考:
“自己的博客,还没写,等完成这篇我在来填坑,大家可以先上网查查”

在环境中安装pycurl,安装方法如下:
   [ PycURL库安装 ]
  
  

三、利用PycURL获取json数据

  好了万事具备,我们可以上代码了:

    b = StringIO.StringIO()     c = pycurl.Curl()     checkurl = "http://<rm http address:port>/ws/v1/cluster/apps" #需要监控的地址    c.setopt(pycurl.URL, checkurl)     c.setopt(pycurl.HTTPHEADER, ["Accept:"])     c.setopt(pycurl.WRITEFUNCTION, b.write) #回调    c.setopt(pycurl.FOLLOWLOCATION, 1)     c.setopt(pycurl.MAXREDIRS, 5) #重定向    c.setopt(pycurl.CONNECTTIMEOUT, 60) #链接超时    c.perform() #运行    status = c.getinfo(c.HTTP_CODE)     if status!=200  #HTTP状态码,200表示成功         return    html = b.getvalue()

  通过运行这段程序以及官网上的API描述,我们发现html里面存储的信息是标准的json数据,这就非常好了,因为python提供了字典这种数据结构,若是我们可以把json数据转换为字典结构,那岂不是会方便很多?
  上网简单一搜索,发现python提供了json.loads()转换函数,果然厉害。
  
  

四、转换成Python字典结构

dic_a=json.loads(html)dic_b=dic_a['apps']              #dic_b=second diclist_c=dic_b['app']              #list_c= thrid listfor dic_d in list_c:             #dic_d= fourth dic        print dic_d['state']

  看上述代码也能明白,json数据转换完后的字典,嵌套着字典和列表,需要一层层访问,才能获得想要的信息,而我想要的就是state这个作业状态 信息。
  至此,我们已经获取到了想要的内容,大家可以根据自己的需求来合理利用。
 
 

五、完整代码

  下面是按我的需求写的完整程序,带有计时功能,如果没有作业处于运行态,就启动倒计时,时间到了仍没有作业运行就发出提示(或者运行其他内容),倒计时期间有其他作业运行就关闭计时,直到又没有作业执行了,再次重新开始倒计时。

# -*- encoding: utf-8 -*-import timeimport threading import pycurlimport StringIOimport reimport jsonst=0def time_count():    global st    i=10    while i>0:        print "i:%d"%i        i-=1        time.sleep(1)        if st==1:            st=0            return    print "there is no running task"def check_state():    st_num=0    b = StringIO.StringIO()     c = pycurl.Curl()     checkurl = "http://<rm http address:port>/ws/v1/cluster/apps" #需要监控的地址    c.setopt(pycurl.URL, checkurl)     c.setopt(pycurl.HTTPHEADER, ["Accept:"])     c.setopt(pycurl.WRITEFUNCTION, b.write) #回调    c.setopt(pycurl.FOLLOWLOCATION, 1)     c.setopt(pycurl.MAXREDIRS, 5) #重定向    c.setopt(pycurl.CONNECTTIMEOUT, 60) #链接超时    c.perform() #运行    status = c.getinfo(c.HTTP_CODE)     html = b.getvalue()    dic_a=json.loads(html)    dic_b=dic_a['apps']                  #dic_b=second dic    if dic_b!=None:                  #没有作业时,列表为空,非空才可获取        list_c=dic_b['app']              #list_c= thrid liebiao        for dic_d in list_c:             #dic_d= fourth dic            if dic_d['state']=='FINISHED':                st_num+=1        if st_num==len(list_c):          # 所有作业finished            return 0        else:            return 1    else:        return 0    c.close()     b.close() if __name__ == '__main__':          t1=threading.Thread(target=time_count)    pnum=0;    while True:        pnum+=1        print 'check state %d/n'%pnum        re_state=check_state()        if re_state==0:                                        #no task is running :            if t1.is_alive() ==False:                t1=threading.Thread(target=time_count)                t1.start()        if re_state==1:                                        # task is running :            if t1.is_alive() ==True:                st=1                                   #shutdown thread        #other program        time.sleep(1)    #other program

一些说明
  本程序可以不使用线程,函数完全可以胜任,但由于后期还需改动,使用线程会更具模块化一些,方便扩展。
  

六、后记

  到这就已经结束了,但不知道有没有细心的朋友发现,所谓监控hadoop作业的状态,并且hadoop API还提供了json数据的返回形式,其实…

  来,我们看下面这段代码:

import urlliburl="http://<rm http address:port>/ws/v1/cluster/apps"page = urllib.urlopen(url)html = page.read()reg = r'“state”:[A-Z]*'imgre = re.compile(reg)imglist = re.findall(imgre,html)for content in imglist:    print content

  是不是有点懵,恩,博主程序写到一半的时候也发现了,既然监控只是要确认作业state信息,那么我们直接把html爬下来,利用正则分析下,不是一样么?

简单粗暴,高效快捷。

  不黑了,当然,对于我们这个程序,爬虫会更高效一些,但hadoop信息千千万,我们所需的数据远不止一个state 这么简单,而利用API获得的json,则会有利于我们进行一些更加复杂的操作。

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台