189 8069 5689

python并发执行函数 python多线程执行函数

python实现多线程并发执行

由于停服维护的需求(服务越来越多的原因),此前编写的shell脚本执行速度缓慢(for循环,这就会很慢),为提高执行速度,参考很多资料,完成此脚本,实现并发执行机制.(当然这是测试脚本,有需要的同学,拿去改ba改ba,应该就可以用了)

创新互联主营锦屏网站建设的网络公司,主营网站建设方案,重庆App定制开发,锦屏h5小程序定制开发搭建,锦屏网站营销推广欢迎锦屏等地区企业咨询

此处脚本参考了

python并发编程-进程池

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数..

ps: 对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

参数介绍:

方法介绍:

主要方法:

其他方法(了解部分)

应用:

发现:并发开启多个客户端,服务端同一时间只有3个不同的pid,干掉一个客户端,另外一个客户端才会进来,被3个进程之一处理

回调函数:

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

python用例并发怎么解决

python-selenium并发执行测试用例(方法一 各模块每一条并发执行)

总执行代码:

# coding=utf-8

import unittest,os,time

import HTMLTestRunner

import threading

import sys

sys.path.append('C:/Users/Dell/Desktop/CARE/program')#使用编辑器,要指定当前目录,不然无法执行第20行代码

def creatsuite():

casedir = []

list = os.listdir(os.path.dirname(os.getcwd()))#获取当前路径的上一级目录的所有文件夹,这里可以改成绝对路径(要搜索的文件路径)

for xx in list:

if "baidu" in xx:

casedir.append(xx)

suite =[]

for n in casedir:

testunit = unittest.TestSuite()

unittest.defaultTestLoader._top_level_dir = None

#(unittest.defaultTestLoader(): defaultTestLoader()类,通过该类下面的discover()方法可自动更具测试目录start_dir匹配查找测试用例文件(test*.py),

并将查找到的测试用例组装到测试套件,因此可以直接通过run()方法执行discover)

discover = unittest.defaultTestLoader.discover(str(n),pattern='tet_*.py',top_level_dir=None)

for test_suite in discover:

for test_case in test_suite:

testunit.addTests(test_case)

suite.append(testunit)

return suite, casedir

def runcase(suite,casedir):

lastPath = os.path.dirname(os.getcwd())#获取当前路径的上一级

resultDir = lastPath+"\\run\\report\\" #报告存放路径

now = time.strftime("%Y-%m-%d %H.%M.%S",time.localtime())

filename = resultDir + now +" result.html"

fp = file(filename, 'wb')

proclist=[]

s=0

for i in suite:

runner = HTMLTestRunner.HTMLTestRunner(stream=fp,title=str(casedir[s])+u'测试报告',description=u'用例执行情况:')

proc = threading.Thread(target=runner.run,args=(i,))

proclist.append(proc)

s=s+1

for proc in proclist:

proc.start()

for proc in proclist:

proc.join()

fp.close()

if __name__ == "__main__":

runtmp=creatsuite()

runcase(runtmp[0],runtmp[1])

python 多进程是真的并发吗

Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。

借助这个包,可以轻松完成从单进程到并发执行的转换。

1、新建单一进程

如果我们新建少量进程,可以如下:

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

time.sleep(1)

if __name__ == "__main__":

p = multiprocessing.Process(target=func, args=("hello", ))

p.start()

p.join()

print "Sub-process done."12345678910111213

2、使用进程池

是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。

注意要用apply_async,如果落下async,就变成阻塞版本了。

processes=4是最多并发进程数量。

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

time.sleep(1)

if __name__ == "__main__":

pool = multiprocessing.Pool(processes=4)

for i in xrange(10):

msg = "hello %d" %(i)

pool.apply_async(func, (msg, ))

pool.close()

pool.join()

print "Sub-process(es) done."12345678910111213141516

3、使用Pool,并需要关注结果

更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

time.sleep(1)

return "done " + msg

if __name__ == "__main__":

pool = multiprocessing.Pool(processes=4)

result = []

for i in xrange(10):

msg = "hello %d" %(i)

result.append(pool.apply_async(func, (msg, )))

pool.close()

pool.join()

for res in result:

print res.get()

print "Sub-process(es) done."1234567891011121314151617181920

2014.12.25更新

根据网友评论中的反馈,在Windows下运行有可能崩溃(开启了一大堆新窗口、进程),可以通过如下调用来解决:

multiprocessing.freeze_support()1

附录(自己的脚本):

#!/usr/bin/python

import threading

import subprocess

import datetime

import multiprocessing

def dd_test(round, th):

test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)

command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg

print command

subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)

def mds_stat(round):

p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)

out = p.stdout.readlines()

if out[0].find('active') != -1:

command = "echo '0205pm %s round mds status OK, %s' /round_record" %(round, datetime.datetime.now())

command_2 = "time (ls /zbkc/test_mds_crash/) 2/round_record"

command_3 = "ls /zbkc/test_mds_crash | wc -l /round_record"

subprocess.call(command,shell=True)

subprocess.call(command_2,shell=True)

subprocess.call(command_3,shell=True)

return 1

else:

command = "echo '0205 %s round mds status abnormal, %s, %s' /round_record" %(round, out[0], datetime.datetime.now())

subprocess.call(command,shell=True)

return 0

#threads = []

for round in range(1, 1600):

pool = multiprocessing.Pool(processes = 10) #使用进程池

for th in range(10):

# th_name = "thread-" + str(th)

# threads.append(th_name) #添加线程到线程列表

# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #创建多线程任务

pool.apply_async(dd_test, (round, th))

pool.close()

pool.join()

#等待线程完成

# for t in threads:

# t.join()

if mds_stat(round) == 0:

subprocess.call("zbkc -s",shell=True)

break

python里并发执行协程时部分阻塞超时怎么办

在前面的例子里学习了并发地执行多个协程来下载图片,也许其中一个协程永远下载不了,一直阻塞,这时怎么办呢?

碰到这种需求时不要惊慌,可以使用wait()里的timeout参数来设置等待时间,也就是从这个函数开始运行算起,如果时间到达协程没有执行完成,就可以不再等它们了,直接从wait()函数里返回,返回之后就可以判断那些没有执行成功的,可以把这些协程取消掉。例子如下:

[python] view plain copy

import asyncio

async def phase(i):

print('in phase {}'.format(i))

try:

await asyncio.sleep(0.1 * i)

except asyncio.CancelledError:

print('phase {} canceled'.format(i))

raise

else:

print('done with phase {}'.format(i))

return 'phase {} result'.format(i)

async def main(num_phases):

print('starting main')

phases = [

phase(i)

for i in range(num_phases)

]

print('waiting 0.1 for phases to complete')

completed, pending = await asyncio.wait(phases, timeout=0.1)

print('{} completed and {} pending'.format(

len(completed), len(pending),

))

# Cancel remaining tasks so they do not generate errors

# as we exit without finishing them.

if pending:

print('canceling tasks')

for t in pending:

t.cancel()

print('exiting main')

event_loop = asyncio.get_event_loop()

try:

event_loop.run_until_complete(main(3))

finally:

event_loop.close()

结果输出如下:

starting main

waiting 0.1 for phases to complete

in phase 0

in phase 2

in phase 1

done with phase 0

1 completed and 2 pending

canceling tasks

exiting main

phase 1 canceled

phase 2 canceled


文章标题:python并发执行函数 python多线程执行函数
分享地址:http://cdxtjz.com/article/doohhps.html

其他资讯