189 8069 5689

怎么用python+Element实现主机Host操作

这篇文章主要讲解了“怎么用python+Element实现主机Host操作”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用python+Element实现主机Host操作”吧!

创新互联建站长期为1000多家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为广元企业提供专业的成都做网站、网站设计,广元网站改版等技术服务。拥有十多年丰富建站经验和众多成功案例,为您定制开发。

1.前端HTML和CSS

<%inherit file="/base.html"/>


    
        
            
                
                    
                        
                            选择业务:
                            
                                
                                    
                                
                            
                        
                                     
                                                                                           请输入主机IP:                                                                                                                                                                                                                                                                                       搜索                                                                                                                                                                                                                                                                         {{ scope.row.is_monitored ? "移除监控":"加入监控" }}                     查看性能                     查看状态                                                     

2.前端JS


    new Vue({
        el: '#app',
        data: {
            bkBizData: [],
            hostData: [],
            searchBiz:'',
            searchHostIp:''
        },
        mounted() {
            // 页面加载就获取所有模板
            this.init();
            this.getSearch();
        },
        methods: {
            init() {
                axios.get(site_url + "get_biz_list/").then(res => {
                    if (res.data.result){
                        this.bkBizData = res.data.data;
                    }else{
                        this.$message.error('获取业务失败');
                    }
                },'json');
                this.getSearch();
            },
            getSearch() {
                axios.get(site_url + "host_view/?search_biz_id=" + this.searchBiz + "&query_str=" + this.searchHostIp).then(res => {
                    if (res.data.result){
                        this.hostData = res.data.data;
                    }else{
                        this.$message.error('获取模板失败');
                    }
                },'json');
            },
            changeBiz(){
                this.getSearch();
            },
            editHost(row) {
                const tmpText = row.is_monitored ? "主机移除监控队列, 是否继续?" : "主机加入监控队列, 是否继续?";
                this.$confirm(tmpText, '提示', {
                  confirmButtonText: '确定',
                  cancelButtonText: '取消',
                  type: 'warning'
                }).then(() => {
                    axios.post(site_url + "host_view/", {"host_id": row.host_id,"is_monitored": row.is_monitored}).then(res => {
                        if (res.data.result) {
                            if(row.is_monitored){
                                this.$message.success('主机移除监控队列成功');
                            } else {
                                this.$message.warning('主机加入监控队列成功');
                            }
                            this.getSearch();
                        } else {
                            this.$message.error('更新主机监控状态失败');
                        }
                    }, 'json');
                }).catch(() => {
                    this.$message({type: 'info', message: '已取消删除'});
                });
            },
            getPerformData(row) {
                this.$confirm("是否查询系统资源使用情况?", '提示', {
                  confirmButtonText: '确定',
                  cancelButtonText: '取消',
                  type: 'warning'
                }).then(() => {
                    const params = {
                        "host_id": row.host_id,
                        "bk_biz_id": row.bk_biz_id,
                        "ip": row.ip,
                        "cloud_id": row.cloud_id,
                    };
                    axios.post(site_url + "get_perform_data/", params).then(res => {
                        if (res.data.result) {
                            row.mem_use = res.data.data.mem;
                            row.cpu_use = res.data.data.cpu;
                            row.disk_use = res.data.data.disk;
                        } else {
                            this.$message.error('查询主机系统资源使用情况失败');
                        }
                    }, 'json');
                }).catch(() => {
                  this.$message({type: 'info', message: '已取消查询'});
                });
            },
            getStatus(row) {
                location.href = site_url + 'status/'
            }
        }
    })

3.Django代码

urls.py文件内容

from django.conf.urls import patterns

from home_application.temp import views as temp_view
from home_application.job import views as job_view
from home_application.host import views as host_view
from home_application.exam import views as exam_view

urlpatterns = patterns(
    'home_application.views',
    (r'^$', job_view.job),
    (r'^dev-guide/$', 'dev_guide'),
    (r'^contactus/$', 'contactus'),
    (r'^api/test/$', 'test'),
    (r'^temp/$', 'temp'),
    (r'^host/$', host_view.host),
    (r'^status/$', host_view.status),
    (r'^host_view/$', host_view.HostView.as_view()),
    (r'^get_all_hosts/$', host_view.get_all_hosts),
    (r'^get_perform_data/$', host_view.get_perform_data),
    ...
)

host\views.py文件内容

import json

from django.views.generic import View
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.http import JsonResponse

from common.mymako import render_mako_context

from home_application.models import Host, LoadData
from home_application.utils.job_api import FastJobApi

perform_script = """
#!/bin/bash

MEMORY=$(free -m | awk 'NR==2{printf "%.2f%%",$3*100/$2}')
DISK=$(df -h| awk '$NF=="/"{printf "%s",$5}')
CPU=$(top -bn1 | grep load | awk '{printf "%.2f%%",$(NF-2)}')
DATE=$(date "+%Y-%m-%d %H:%M:%S")
echo -e "$DATE|$MEMORY|$DISK|$CPU"
"""

def host(request):
    return render_mako_context(request, '/home_application/host.html')

def status(request):
    return render_mako_context(request, "/home_application/status.html")

class CsrfExemptView(View):
    @method_decorator(csrf_exempt)
    def dispatch(self, request, *args, **kwargs):
        return super(CsrfExemptView, self).dispatch(request, *args, **kwargs)

def get_all_hosts(request):
    from home_application.utils.cc_by_request import cc_search_host
    res_data = cc_search_host().get("info")
    for host in res_data:
        bk_host_innerip = host.get("host", {}).get("bk_host_innerip")
        bk_host_id = host.get("host", {}).get("bk_host_id")
        bk_host_name = host.get("host", {}).get("bk_host_name")
        bk_os_name = host.get("host", {}).get("bk_os_name")
        bk_biz_id = host.get("biz", [])[0].get("bk_biz_id")
        bk_biz_name = host.get("biz", [])[0].get("bk_biz_name")
        cloud_name = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_name")
        cloud_id = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_id")
        host_obj = {
            "ip": bk_host_innerip,
            "bk_biz_id": bk_biz_id,
            "os_name": bk_os_name,
            "host_name": bk_host_name,
            "bk_biz_name": bk_biz_name,
            "cloud_id": cloud_id,
            "cloud_name": cloud_name,
        }
        Host.objects.update_or_create(host_id=bk_host_id, defaults=host_obj)
    return JsonResponse({"result": res_data})

@csrf_exempt
def get_perform_data(request):
    data = json.loads(request.body)
    host_id = data.get("host_id")
    try:
        obj = LoadData.objects.filter(host_id=host_id).last()
        if obj:
            res_data = {
                "cpu": obj.cpu,
                "mem": obj.mem,
                "disk": obj.disk,
            }
            return JsonResponse({"result": True, "data": res_data})
        bk_biz_id = int(data.get("bk_biz_id"))
        ip = data.get("ip")
        cloud_id = data.get("cloud_id")
        # res_data = execute_script_and_return(request,ip_list, bk_biz_id, perform_script)
        obj = FastJobApi(bk_biz_id, cloud_id, ip, perform_script)
        job_res = obj.execute_script_and_return()
        if job_res:
            log_content = job_res[0].get("log_content")
            res_data = {
                "result": True,
                "cpu": log_content.split("|")[3],
                "mem": log_content.split("|")[1],
                "disk": log_content.split("|")[2],
            }
            return JsonResponse({"result": True, "data": res_data})
        return JsonResponse({"result": False})
    except Exception:
        return JsonResponse({"result": False})

class HostView(CsrfExemptView):
    def get(self, request, *args, **kwargs):
        try:
            host_query = Host.objects.all()
        except Exception:
            return JsonResponse({"result": False})
        search_biz_id = request.GET.get("search_biz_id")
        query_str = request.GET.get("query_str")
        if search_biz_id:
            host_query = host_query.filter(bk_biz_id=search_biz_id)
        if query_str:
            host_query = host_query.filter(ip__in=query_str.split(","))
        host_query = host_query[:30] if host_query.count() > 10 else host_query
        res_data = [i.to_dict() for i in host_query]
        return JsonResponse({"result": True, "data": res_data})

    def post(self, request, *args, **kwargs):
        try:
            data = json.loads(request.body)
            host_id = data.get("host_id")
            is_monitored = data.get("is_monitored")
            if is_monitored:
                Host.objects.filter(host_id=host_id).update(is_monitored=False)
            else:
                Host.objects.filter(host_id=host_id).update(is_monitored=True)
            return JsonResponse({"result": True})
        except Exception:
            return JsonResponse({"result": False})

models.py文件内容

from django.db import models

from home_application.utils.parse_time import parse_datetime_to_timestr

class Host(models.Model):
    host_id = models.IntegerField(u"主机ID", primary_key=True, unique=True)
    ip = models.CharField(u"IP地址", max_length=32, blank=True, null=True)
    bk_biz_id = models.CharField(u"业务ID", max_length=16, blank=True, null=True)
    bk_biz_name = models.CharField(u"业务名称", max_length=512, blank=True, null=True)
    os_name = models.CharField(u"系统名", max_length=128, blank=True, null=True)
    host_name = models.CharField(u"主机名", max_length=128, blank=True, null=True)
    cloud_id = models.IntegerField(u"云区域ID", blank=True, null=True)
    cloud_name = models.CharField(u"云区域名称", max_length=32, blank=True, null=True)
    is_monitored = models.BooleanField(u"是否已监控", default=False)

    def to_dict(self):
        return {
            "host_id": self.host_id,
            "ip": self.ip,
            "pk": self.pk,
            "bk_biz_id": self.bk_biz_id,
            "bk_biz_name": self.bk_biz_name,
            "os_name": self.os_name,
            "host_name": self.host_name,
            "cloud_name": self.cloud_name,
            "cloud_id": self.cloud_id,
            "is_monitored": self.is_monitored,
            "mem_use": "-",
            "cpu_use": "-",
            "disk_use": "-",
        }

    def get_load_data(self):
        load_query = LoadData.objects.filter(host_id=self.pk).order_by("create_time")
        return [i.to_dict() for i in load_query]

class LoadData(models.Model):
    host_id = models.IntegerField(u"主机ID", default=0)
    cpu = models.IntegerField(u"CPU使用率", default=0)
    mem = models.IntegerField(u"内存使用率", default=0)
    disk = models.IntegerField(u"硬盘使用率", default=0)
    create_time = models.DateTimeField(u"创建时间", auto_now_add=True)

    def to_dict(self):
        return {
            "host_id": self.host_id,
            "cpu": self.cpu,
            "mem": self.mem,
            "disk": self.disk,
            "create_time": parse_datetime_to_timestr(self.create_time)
        }

FastJobApi部分代码

import base64
import time

from conf.default import APP_ID, APP_TOKEN

from blueking.component.shortcuts import get_client_by_user, get_client_by_request

count = 0

class FastJobApi(object):
    def __init__(self, bk_biz_id, bk_cloud_id, ip_list, script_content):
        self.client = get_client_by_user('admin')
        self.username = "admin"
        self.biz_id = bk_biz_id
        self.script_content = script_content
        self.ip_list = [{"bk_cloud_id": bk_cloud_id, "ip": i} for i in ip_list.split(",")]

    def _fast_execute_script(self, execute_account="root", param_content='', script_timeout=1000):
        """
        快速执行脚本
        :param execute_account: 执行脚本的账户名
        :param param_content: 执行脚本的参数
        :param script_timeout: 脚本执行超时时间
        :return: job_instance_id
        """
        kwargs = {
            "bk_app_code": APP_ID,
            "bk_app_secret": APP_TOKEN,
            "bk_biz_id": self.biz_id,
            "bk_username": self.username,
            "script_content": base64.b64encode(self.script_content),
            "ip_list": self.ip_list,
            "script_type": 1,
            "account": execute_account,
            "script_param": base64.b64encode(param_content),
            "script_timeout": script_timeout
        }
        result = self.client.job.fast_execute_script(kwargs)
        if result["result"]:
            return result.get("data").get("job_instance_id")
        return False

    def _get_job_instance_status(self, task_id):
        """
        获取脚本执行状态
        :param task_id: 执行脚本的 job_instance_id
        :return:
        """
        global count
        count += 1
        # 查询执行状态
        resp = self.client.job.get_job_instance_status(
            bk_username=self.username,
            bk_biz_id=self.biz_id,
            job_instance_id=task_id
        )
        if resp.get('data').get('is_finished'):
            count = 0
            return True
        elif not resp.get('data').get('is_finished') and count <= 5:
            time.sleep(2)
            return self._get_job_instance_status(task_id)
        else:
            count = 0
            return False

    def _get_job_instance_log(self, task_id):
        """
        查询作业日志
        :param task_id: 执行脚本的 job_instance_id
        :return:
        """
        if self._get_job_instance_status(task_id):
            resp = self.client.job.get_job_instance_log(
                job_instance_id=task_id,
                bk_biz_id=self.biz_id,
                bk_username='admin'
            )
            return resp['data'][0]['step_results'][0]['ip_logs']

    def execute_script_and_return(self):
        """
        执行脚本并获取脚本执行结果
        :return: 脚本执行结果
        """
        job_instance_id = self._fast_execute_script()
        if job_instance_id:
            ip_logs = self._get_job_instance_log(job_instance_id)
            return ip_logs

实现效果

怎么用python+Element实现主机Host操作

怎么用python+Element实现主机Host操作

怎么用python+Element实现主机Host操作

怎么用python+Element实现主机Host操作

怎么用python+Element实现主机Host操作

感谢各位的阅读,以上就是“怎么用python+Element实现主机Host操作”的内容了,经过本文的学习后,相信大家对怎么用python+Element实现主机Host操作这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


标题名称:怎么用python+Element实现主机Host操作
文章来源:http://cdxtjz.com/article/goegpg.html