当前位置: 首页 > news >正文

祥云网站建设网络营销推广是做什么的

祥云网站建设,网络营销推广是做什么的,在哪个网站可以做试卷,网站域名注册信息概念解析 异步编程是一种允许程序在等待 I/O 操作(如网络请求、文件读写)时不被阻塞,转而执行其他任务的编程范式。 在 Python 中,其核心实现概念如下: 协程(Coroutines) 定义:可暂停…

概念解析

异步编程是一种允许程序在等待 I/O 操作如网络请求、文件读写)时不被阻塞,转而执行其他任务的编程范式。

在 Python 中,其核心实现概念如下:

协程(Coroutines)

  • 定义:可暂停执行并在合适时机恢复的函数,通过async def关键字声明。
  • 特点:非抢占式调度,协程主动通过await让出执行权,适用于处理高并发 I/O 场景。

事件循环(Event Loop)

  • 定义:异步编程的 “调度器”,负责管理协程的执行顺序、监控 I/O 事件状态,并在事件就绪时恢复对应协程。
  • 核心逻辑:循环检查可执行的协程任务,按事件触发顺序调度执行。

任务(Tasks)

  • 定义:对协程的封装,代表一个独立的异步操作单元,通过asyncio.create_task()创建。
  • 功能:支持取消任务、等待任务完成(await task)或获取任务状态。

Future 对象

  • 定义:表示一个尚未完成的异步操作结果,本质是协程间传递状态的载体。
  • 作用:当协程需要等待某个异步操作的结果时,可通过await Future暂停执行,待操作完成后获取结果。

await 表达式

  • 定义:协程中用于暂停执行的关键字,用于等待另一个协程、Task 或 Future 对象的完成。
  • 示例:result = await async_function(),表示暂停当前协程,直到async_function执行完毕并返回结果。

应用场景

异步编程尤其适用于I/O 密集型任务如 HTTP 请求、数据库查询、文件操作等)。
通过减少线程切换开销和 CPU 闲置时间,显著提升程序的并发处理能力。
相比多线程编程,异步编程在高并发场景下更轻量、更高效。


asyncio库详解

Python 3.4 引入了 asyncio 库,作为异步编程的核心组件,事件循环是 asyncio 的核心。
Windows 上使用 ProactorEventLoop,在 Unix 上使用 SelectorEventLoop

Unix系统(SelectorEventLoop)

  • 基于selectors 模块对底层 I/O 多路复用机制(如select、poll、epoll、kqueue)的抽象。
  • 采用 “就绪通知” 机制:监视文件描述符(如套接字)状态,当 I/O 操作就绪时(如可读 / 可写)通知应用程序。
  • 优势场景1:擅长处理大量并发连接(如 Linuxepoll机制支持高效事件驱动)。
  • 优势场景2:适用于网络服务器、高并发 I/O 场景。

Windows系统(ProactorEventLoop)

  • 基于 Windows 专有 I/O 完成端口(IOCP)机制,属于系统级异步 I/O 框架。
  • 采用 “完成通知” 机制:异步启动 I/O 操作,操作完成后由系统主动通知程序。
  • 优势场景1:深度优化 Windows 平台特性,支持全类型异步 I/O 操作(含文件 I/O)。
  • 优势场景1:在 Windows 环境下性能通常优于 SelectorEventLoop

两种事件循环的关键差异

维度SelectorEventLoop(Unix)ProactorEventLoop(Windows)
通知机制等待 I/O 就绪后执行操作(“询问式”:Can I read/write?异步启动 I/O 操作,完成后被动接收通知(“回调式”:Notify when done
API 覆盖范围部分平台可能不支持全类型文件 I/O 操作原生支持 Windows 所有异步 I/O 操作(如管道、套接字、文件)
性能特点Unix 系统下,依赖epoll/kqueue等机制实现高效并发Windows 下利用 IOCP 机制实现低延迟、高吞吐量

事件循环管理

import asyncio# 获取事件循环
loop = asyncio.get_event_loop()# 运行协程直到完成
loop.run_until_complete(my_coroutine())# 运行事件循环直到stop()被调用
loop.run_forever()# 关闭事件循环
loop.close()

协程定义与执行

async def fetch_data(url):print(f"开始获取数据: {url}")await asyncio.sleep(2)  # 模拟I/O操作print(f"数据获取完成: {url}")return f"来自 {url} 的数据"# Python 3.7+ 推荐方式
async def main():result = await fetch_data("example.com")print(result)asyncio.run(main())  # Python 3.7+引入,简化了事件循环管理

任务创建与管理

async def main():# 创建任务task1 = asyncio.create_task(fetch_data("site1.com"))task2 = asyncio.create_task(fetch_data("site2.com"))# 等待所有任务完成results = await asyncio.gather(task1, task2)print(results)# 并发运行多个协程results = await asyncio.gather(fetch_data("site3.com"),fetch_data("site4.com"))

超时管理

async def main():try:# 设置超时result = await asyncio.wait_for(fetch_data("example.com"), timeout=1.0)except asyncio.TimeoutError:print("操作超时")

同步与异步代码结合

import concurrent.futuresdef cpu_bound_task(x):# 计算密集型任务return x * xasync def main():# 使用线程池执行阻塞I/Oloop = asyncio.get_running_loop()with concurrent.futures.ThreadPoolExecutor() as pool:result = await loop.run_in_executor(pool, cpu_bound_task, 10)print(result)

高并发场景实战案例

案例1: 并发网络请求

import asyncio
import aiohttp
import timeasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def fetch_all(urls):async with aiohttp.ClientSession() as session:tasks = [fetch(session, url) for url in urls]results = await asyncio.gather(*tasks)return results# 测试URLs
urls = ["https://www.google.com","https://www.github.com","https://www.python.org",# 可添加更多URL
] * 5  # 重复请求以增加数量async def main():start = time.time()results = await fetch_all(urls)end = time.time()print(f"获取了 {len(results)} 个页面,耗时: {end - start:.2f} 秒")# 运行
asyncio.run(main())

案例2: 异步数据库操作

使用asyncpg进行PostgreSQL异步操作

import asyncio
import asyncpgasync def create_tables(conn):await conn.execute('''CREATE TABLE IF NOT EXISTS users(id SERIAL PRIMARY KEY,name TEXT,email TEXT)''')async def insert_users(conn, users):# 批量插入await conn.executemany('INSERT INTO users(name, email) VALUES($1, $2)',users)async def fetch_users(conn):return await conn.fetch('SELECT * FROM users')async def main():# 连接数据库conn = await asyncpg.connect(user='postgres',password='password',database='testdb',host='127.0.0.1')# 创建表await create_tables(conn)# 生成测试数据test_users = [('User1', 'user1@example.com'),('User2', 'user2@example.com'),('User3', 'user3@example.com'),]# 插入数据await insert_users(conn, test_users)# 查询数据users = await fetch_users(conn)for user in users:print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")# 关闭连接await conn.close()# 运行
asyncio.run(main())

案例3: 异步Web爬虫

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import timeasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def parse(html):# 使用BeautifulSoup解析HTMLsoup = BeautifulSoup(html, 'html.parser')# 获取所有链接links = [a.get('href') for a in soup.find_all('a') if a.get('href')]return linksasync def crawl(url, max_depth=2):visited = set()async def _crawl(current_url, depth):if depth > max_depth or current_url in visited:returnvisited.add(current_url)print(f"正在爬取: {current_url}")try:async with aiohttp.ClientSession() as session:html = await fetch(session, current_url)links = await parse(html)# 过滤出同域名链接base_url = '/'.join(current_url.split('/')[:3])same_domain_links = [link if link.startswith('http') else f"{base_url}{link}"for link in links if link and (link.startswith('http') or link.startswith('/'))]# 创建子任务继续爬取tasks = [_crawl(link, depth + 1) for link in same_domain_links[:5]  # 限制每页最多爬5个链接]await asyncio.gather(*tasks)except Exception as e:print(f"爬取 {current_url} 出错: {e}")await _crawl(url, 0)return visitedasync def main():start = time.time()visited = await crawl("https://python.org", max_depth=1)end = time.time()print(f"爬取了 {len(visited)} 个页面,耗时: {end - start:.2f} 秒")# 运行
asyncio.run(main())

案例4: 异步API服务器处理大量并发请求

使用FastAPI构建高并发API服务

from fastapi import FastAPI, BackgroundTasks
import asyncio
import uvicorn
import time
import randomapp = FastAPI()# 模拟数据库
db = {}# 模拟异步数据库操作
async def db_operation(key, delay=None):if delay is None:delay = random.uniform(0.1, 0.5)  # 随机延迟模拟真实场景await asyncio.sleep(delay)return db.get(key)# 模拟耗时任务
async def process_item(item_id):print(f"开始处理项目 {item_id}")await asyncio.sleep(5)  # 模拟耗时操作print(f"项目 {item_id} 处理完成")return {"item_id": item_id, "status": "processed"}# 常规端点
@app.get("/items/{item_id}")
async def read_item(item_id: str):result = await db_operation(item_id)return {"item_id": item_id, "value": result}# 批量操作端点
@app.get("/batch")
async def batch_operation(items: str):item_ids = items.split(",")tasks = [db_operation(item_id) for item_id in item_ids]results = await asyncio.gather(*tasks)return dict(zip(item_ids, results))# 后台任务
@app.post("/items/{item_id}/process")
async def process(item_id: str, background_tasks: BackgroundTasks):background_tasks.add_task(process_item, item_id)return {"message": f"Processing item {item_id} in the background"}# 负载测试端点
@app.get("/load-test/{count}")
async def load_test(count: int):start = time.time()# 创建多个并发任务tasks = []for i in range(count):# 随机延迟delay = random.uniform(0.1, 0.5)tasks.append(asyncio.sleep(delay))# 并发执行所有任务await asyncio.gather(*tasks)end = time.time()return {"tasks_completed": count,"time_taken": f"{end - start:.2f} 秒","tasks_per_second": f"{count / (end - start):.2f}"}# 初始化一些测试数据
@app.on_event("startup")
async def startup_event():for i in range(1000):db[str(i)] = f"value-{i}"if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)
http://www.ds6.com.cn/news/4064.html

相关文章:

  • 做网站最好产品营销策略
  • 网站导航条专门做页面跳转首页排名seo
  • 30人的网站建设公司年利润是多少关键词优化排名详细步骤
  • wordpress the 7幻灯片老铁seo外链工具
  • 做服装外单的网站有哪些电脑培训学校排名
  • 新乡专业做淘宝网站去除痘痘怎么有效果
  • 网站备案怎么做抖音指数查询
  • 城市绿化建设英文网站seo网络推广是什么意思
  • 优秀的电商设计网站合肥网站推广公司
  • 学校门户网站购买模板建站
  • 石家庄信息门户网站定制会计培训班初级费用
  • 北京微信网站建设电话抖音广告怎么投放
  • wordpress广告插件网站点击排名优化
  • 网站的首页怎么做如何在互联网推广自己的产品
  • 网站被攻击空间关了怎么办微信营销的方法
  • 哈尔滨公司做网站住房和城乡建设部
  • 淘宝上做网站的信得过吗产品软文是什么
  • 手机网页怎么做出来的重庆seo网络优化师
  • 商业网站建设规划范文网络推广学校
  • 想学做网站从哪里入手如何写营销软文
  • 网站开发需求表百度怎么发广告
  • 宝鸡做网站电话营销心得体会感悟300字
  • 小型企业网站开发公司网站排名优化培训课程
  • 淘宝客网站开发制作网站的软件有哪些
  • 广州荔湾网站建设星力游戏源码
  • 厦门网站排名中国新闻发布
  • 做整体衣柜宣传海报的网站最近热点新闻事件2023
  • 企业网站管理系统设置网络广告策划书模板范文
  • 网站平台系统设计公司营销策划方案ppt范文
  • 潍坊网站制作发指数是什么