Python
安装
Anaconda 是一个用于科学计算的 Python 发行版,安装好了 Anaconda 就相当于安装好了 Python,支持 Linux, Mac, Windows, 它里面集成了众多流行的科学计算、数据分析的 Python 包。可前往 管网 或 清华镜像 下载 Anaconda 安装包。
安装成功后,打开 Anaconda Powershell Prompt(适用于 Windows 系统;Linux 直接在命令行中输入即可) 查看 Python 版本,命令如下:
python --version # 或者 python -VConda 和 PyPI 都是 Python 生态系统中的软件包管理平台;PyPI 是 Python 的官方第三方软件包仓库,Conda 是一个开源的跨平台包管理工具,不仅可以管理 Python 包,还可以管理其他语言(如 R、C++ 等);推荐优先使用 Conda,若无法下载依赖,再使用 PyPI;
可以使用 conda 和 pip 工具从 Conda、PyPI 仓库中下载包,他们的用法相似,如下:
conda create -n myenv # 创建环境;参考下节
conda activate myenv # 激活环境
conda install numpy # 安装包
conda list # 查看已安装的包
conda search some-packege # 搜索包pip_search some-packege # 搜索包;需额外安装 pip install pip-search
pip install requests # 安装包
pip install --upgrade requests # 升级包
pip uninstall requests # 卸载包
pip list # 查看已安装的包提示
有些依赖在 Conda 中有,但在 PyPI 中没有,反之亦然;优先使用 Conda,下载不下来再使用 PyPI。
由于国内下载依赖十分缓慢,使用清华提供的镜像加速仓库,能有效提升下载速度,配置如下:
# 参考地址:https://mirrors.tuna.tsinghua.edu.cn/help/anaconda/
# 来到 Anaconda 安装目录,找到 .condarc 文件,替换全文,粘贴如下内容
# 粘贴完成后,运行 conda clean -i 指令清除索引,使配置生效
# 执行 conda info 查看配置是否生效
channels:
- defaults
show_channel_urls: true
default_channels:
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2
custom_channels:
conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud# 参考地址:https://mirrors.tuna.tsinghua.edu.cn/help/pypi/
# 升级 pip 到最新的版本
python -m pip install --upgrade pip
# 配置镜像加速(永久)
pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
# 查看配置是否生效
pip config listConda 环境
类似于 mvn 管理 Node 版本,Conda 环境是一个独立的 Python 运行环境,包含特定版本的 Python 和一组安装的包。通过创建不同的环境,可以隔离不同项目的依赖,避免包之间的冲突。
打开 Anaconda Powershell Prompt(适用于 Windows 系统),终端会提示当前环境名称,初始为 base。
# 创建新的 Conda 环境;python=xxx 可选,若不指定,默认和当前环境 Python 版本一致
conda create -n myenv python=3.11
# 激活环境
conda activate myenv
# 退出环境
conda deactivate
# 查看所有环境
conda info --envs
# 删除指定环境
conda env remove --name myenv
# 查看 Conda 版本
conda --version # 或者 conda -V提示
conda config --set auto_activate_base false离线导入依赖
conda
conda activate 环境名 # 可新建一个环境用于测试 conda install xxx # 安装依赖 # 导出所有依赖列表(可选,手动安装时会用到,这里只列举一下,不会使用) conda list --export > D:\test\conda_requirements.txt # 将当前环境的所有包打包成一个压缩文件;可能需要下载依赖:conda install conda-pack # 注意,压缩包内容,不同系统会有差异,建议源机器和目标机器操作系统、Python 版本保持一致! conda pack -n 环境名 -o D:\test\myenv-pkg.tar.gz # 离线导入 # 解压 D:\test\myenv-pkg.tar.gz ,在 Windows 系统中,双 # 击 Scripts 中的 activate.bat 即可将环境导入到目标机器 # 在 Linux 系统中,执行如下指令即可: source ./myenv-pkg/bin/activatepip
注意,为保证兼容性,建议源机器和目标机器使用的操作平台和 Python 版本一致;不一致也无所谓,因为 pip 下载能手动指定操作平台和 Python 版本。
离线导入单个依赖# 在能联网的机器时,随便进入一个目录,并执行下载包命令,如下: cd D:\test\py_package # 简单下载;这种方式下载的依赖,关联的操作系统和 Python 版本和本机一致 pip download pdf2docx # 指定平台、版本下载;这种方式,即使在 Mac 机器上,也能为 Windows 电脑下载依赖 # --only-binary=:all: 表示只下载二进制包(.whl 文件),而不下载源码包(.tar.gz 文件);若不指 # 定,pip 可能会尝试下载源码包并在本地编译,这在离线环境中可能不可行 pip download pdf2docx --platform win_amd64 --python-version 3.12.7 --only-binary=:all: # 下载完成后,看到一堆 .whl 或 .tar.gz 文件,将所有内容拷贝到离线的电脑中,执行如下指令 cd D:\test\py_package # --no-index 表示不要从 PyPI 下载;--find-links=. 表示当前目录查找包 pip install --no-index --find-links=. pdf2docx离线导入全部依赖# 这种方式直接将源机器的所有依赖导入至目标机器 # 使用 Anaconda Powershell Prompt(适用于 Windows 系统;Linux 直接在命令行中输入即可) 执行如下命令,导出已安装的库列表到一个文件中 pip freeze > D:\test\requirements.txt # 下载 requirements.txt 中列出的所有库及其依赖 cd D:\test\py_package_all pip download -r ../requirements.txt # 简单下载 pip download -r ../requirements.txt --platform win-amd64 --python-version 3.12.7 --only-binary=:all: # 离线导入 cd D:\test\py_package_all # 安装目标机器上没有的库,若目标存在更高或相同版本,则跳过;存在更低版本,则覆盖; pip install --no-index --find-links=. -r ../requirements.txt # 若目标存在相同版本,跳过;不管存在更高或更低版本,都依照 requirements.txt 指定的版本安装 pip install --no-index --find-links=. -r ../requirements.txt --upgrade使用如下 Python 代码可查看当前系统的操作平台:
import sysconfig print(sysconfig.get_platform())提示
不推荐直接复制 site-packages 目录,pip 或 conda 无法管理直接复制的库,也可能存在一些兼容性问题。若两台机器环境完全一致,可以尝试。site-packages 目录用于存放已安装的依赖,使用如下代码查看其所在目录:
import site print(site.getsitepackages())
基础
Python 的基础语法可参考 Python 中文网 学习。
数组
NumPy (Numerical Python)模块是 Python 语言的一个科学计算的第三方模块。NumPy 模块可以构建多维数据的容器,将各种类型的数据快速地整合在一起,完成多维数据的计算及大型矩阵的存储和处理。
import numpy as np
# 创建数组(一维数组)
a = np.array([1, 2, 3, 4, 5, 6, 7])
# 创建数组(二维数组)
c = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
# 起始值为 1、结束值为 20(结果不含该值)、步长(不传默认为 1)为 4 的等差数列
d = np.arange(1, 20, 4)
# 创建 [0,1) 区间内的随机数
e = np.random.rand(3)
f = np.random.rand(2, 3) # 创建 [0,1) 区间内的二维随机数(2 行 3 列)
# 创建符合标准正态分布(均值为 0,标准差为 1)的随机数
g = np.random.randn(3)
# 创建 [1,5) 区间内的随机整数,共十个元素
h = np.random.randint(1, 5, 10)
i = np.random.randint(1, 5, (4, 2)) # 创建 [1,5) 区间内的随机整数二维数组,共 4 行 2 列# 查看数组行数、列数、维度、元素个数、数据类型
print(f'行列:{c.shape} 维度:{c.ndim} 个数:{c.size} 数据类型:{c.dtype}')
# 获取数组元素
print(f'第二个元素{a[1]},倒数第一个元素{a[-1]},索引为 1 到 4(不包含 4)的元素{a[1:4]}')
print(f'索引为 1 到 6(不包含 6),步长为 2 间隔获取元素{a[1:6:2]}')
print(f'二维数组第二行:{c[1]},第二行第三列:{c[1, 2]},索引为 1 到 2(不包含 2):{c[1:2]}')
# 运算
print(f'原数组 a:{a},和:{a.sum()},平均值:{a.mean()},最大值:{a.max()}')# 添加数组元素
j = np.append(c, [10, 11, 12]) # 二维数组末尾追加一维数组,得到的结果是一维数组
k = np.append(c, [[10, 11, 12]], axis=0) # 二维数组末尾追加二维数组;axis 维度 0 表示行,1 表示列
print(f'j 追加后:{j},k 追加后:{k},原数组 c 不会改变:{c}')
l = np.insert(c, 1, [10, 11, 12]) # 二维数组追加一维数组,在指定索引 1 处插入
m = np.insert(c, 1, [10, 11, 12], axis=0) # 二维数组追加二维数组;axis 维度 0 表示行
print(f'l 追加后:{l},m 追加后:{m},原数组 c 不会改变:{c}')
# 删除数组元素
n = np.delete(c, 2)
o = np.delete(c, 2, axis=0)
print(f'n 删除后:{n},o 删除后:{o},原数组 c 不会改变:{c}')# 去重
p = np.unique(h)
print(f'p 去重后:{p},原数组 h 不会改变:{h}')
# 拼接
q = np.concatenate((a, d), axis=0) # axis 维度 0 表示行,一维数组可以不写
print(f'q 拼接后:{q},原数组 a:{a},原数组 d:{d}')爬虫
爬虫是指按照一定的规则自动地从网页上抓取数据的代码或脚本,它能模拟浏览器对存储指定网页的服务器发起请求,从而获得网页的源代码,再从源代码中提取出需要的数据。
Python 的第三方模块 requests,可以模拟浏览器发起 HTTP 或 HTTPS 协议的网络请求,从而获取网页源代码。
Python 的第三方模块 BeautifulSoup,能将 HTML 或 XML 文档加载为 BeautifulSoup 对象,使获取 DOM 十分简单,适用于静态页面快速抓取。
Python 的第三方模块 Selenium,能模拟浏览器操作(点击、输入、滚动等),适用于需要交互的页面,也能获取 DOM,但功能没有 BeautifulSoup 强大。
简单案例
通过爬取当当网的图书销售排行榜数据,了解爬虫的基本流程:
- 导入 requests 模块,使用 requests.get() 方法获取网页源代码;
- 使用第三方模块 BeautifulSoup 解析网页源代码,该解析器能将网页源代码加载为 BeautifulSoup 对象,使获取 DOM 十分简单;
- 将数据保存到本地文件中;
以下是代码实现:
import requests
# pandas 模块是基于 NumPy 模块开发的,它不仅能直观地展现数据的结构,还具备强大的数据处理和分析功能
import pandas as pd
from bs4 import BeautifulSoup
data_info = {
'图书排名': [],
'图书名称': [],
'图书作者': [],
'图书出版时间': [],
'图书出版社': [],
'图书价格': []
}
def safe_select_text(element, selector, default=""):
"""安全提取文本,避免 IndexError"""
selected = element.select(selector)
return selected[0].text.strip() if selected else default
def parse_html(soup):
li_list = soup.select('.bang_list li')
for li in li_list:
data_info['图书排名'].append(safe_select_text(li, '.list_num').replace('.', ''))
data_info['图书名称'].append(safe_select_text(li, '.name a'))
# 提取作者(防止 IndexError)
publisher_info = li.select('.publisher_info')
author = safe_select_text(publisher_info[0], 'a') if publisher_info else ""
data_info['图书作者'].append(author)
# 提取出版时间
data_info['图书出版时间'].append(safe_select_text(li, '.publisher_info span'))
# 提取出版社(防止 IndexError)
publisher = safe_select_text(publisher_info[1], 'a') if len(publisher_info) > 1 else ""
data_info['图书出版社'].append(publisher)
# 提取价格(防止 IndexError)
price_text = safe_select_text(li, '.price .price_n').replace('¥', '')
data_info['图书价格'].append(float(price_text) if price_text else 0.0)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
for i in range(1, 26):
url = f'http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent30-0-0-1-{i}'
response = requests.get(url=url, headers=headers, timeout=10)
html_content = response.text
soup = BeautifulSoup(html_content, 'lxml') # 用读取的 HTML 文档实例化一个 BeautifulSoup 对象,第 2 个参数 lxml 表示指定解析器为 lxml
parse_html(soup)
print(f'第{i}页爬取完毕')
# 将 Python 字典(或其他结构化数据)转换成 DataFrame(表格型数据结构)
book_info = pd.DataFrame(data_info)
# 检查缺失值和重复值
print(book_info.isnull().sum()) # 每列的缺失值数量
print(book_info.duplicated().sum()) # 重复行数量
# 改进:使用 .loc[] 避免链式赋值(Pandas 3.0 兼容)
book_info.loc[book_info['图书价格'] > 100, '图书价格'] = None # 正确写法
# 删除缺失值
book_info = book_info.dropna()
# 保存为 CSV
book_info.to_csv('当当网图书销售排行。csv', encoding='utf-8', index=False)
print("数据已保存至当前 py 文件所在目录: 当当网图书销售排行。csv")进阶案例
任务目标: 使用 Selenium 模块模拟登录淘宝
准备
在 Python 中 Selenium 模块之前,需要先在系统中下载和安装浏览器驱动程序(驱动会被 Selenium 加载为一个浏览器对象,模拟浏览器各种操作),不同的浏览器有不同的驱动程序(如谷歌叫 ChromeDriver、火狐叫 GeckoDriver 等)。
以谷歌为例,打开 谷歌驱动官方下载地址,找到和谷歌浏览器对应版本的驱动,下载解压后,会得到一个 chromedriver.exe 文件,将其放到任意位置即可。通过如下代码测试是否生效:
from selenium import webdriver from selenium.webdriver.chrome.service import Service # 注意驱动路径应修改为自己的路径 service = Service(r'D:\application_other\chromedriver-win64\chromedriver.exe') browser = webdriver.Chrome(service=service) browser.get('https://www.taobao.com') browser.implicitly_wait(5) # 设置隐式等待,超时时间为 5 秒,防止网页未加载完就进入后续操作执行后,会自动打开谷歌浏览器,访问淘宝网。
模拟登录淘宝
from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By import time # 导入 time 模块,用于时间处理 # 1. 打开淘宝网首页 service = Service(r'D:\application_other\chromedriver-win64\chromedriver.exe') browser = webdriver.Chrome(service=service) browser.get('https://www.taobao.com') browser.implicitly_wait(5) # 设置隐式等待,超时时间为 3 秒,防止网页未加载完就进入后续操 # 2. 点击去登录按钮 to_login_page_butt = browser.find_element(By.XPATH, '//*[@id="J_SiteNavLogin"]/div[1]/div[1]/a[1]') # 某个标签的 XPATH 可直接用浏览器的标签选择器复制 to_login_page_butt.click() # 跳转到登录页 browser.implicitly_wait(3) # 设置隐式等待,超时时间为 3 秒,防止网页未加载完就进入后续操 browser.maximize_window() # 将窗口最大化,以便截取未变形和未缩放的登录页面 # 3. 点击密码登录选项卡 login_pass_tab = browser.find_element(By.CLASS_NAME, 'password-login-tab-item') # 通过 class 属性值定位“密码登录”标签 login_pass_tab.click() time.sleep(3) # 模拟真实的人类用户 # 4. 输入账号密码登录 username_input = browser.find_element(By.NAME, 'fm-login-id') # 通过标签的 name 属性值定位账号输入框 password_input = browser.find_element(By.NAME, 'fm-login-password') # 通过标签的 name 属性值定位账号输入框 username_input.send_keys('18282998944') # 在账号输入框中输入账号 time.sleep(3) password_input.send_keys('xxx') # 在密码输入框中输入密码 time.sleep(3) login_butt = browser.find_element(By.XPATH, '//*[@id="login-form"]/div[6]/button') # 定位到登录按钮 login_butt.click()
并发编程
多进程示例
多进程任务适合 CPU 密集型任务,如计算密集型任务(如科学计算、数据处理等),不适合 IO 密集型任务(如网络请求、文件读写等)。
import multiprocessing
import time
import os
# 模拟第一个耗时任务(有返回值)
def worker_one(task_id):
time.sleep(3) # 模拟工作耗时
print(f"任务 {task_id} 由进程 (PID: {os.getpid()}) 完成")
return f"返回了{task_id}"
# 模拟第二个耗时任务(无返回值)
def worker_two(task_id, user_name):
time.sleep(3) # 模拟工作耗时
print(f"任务 {task_id} 由进程 (PID: {os.getpid()}) 完成 -- 用户名为 {user_name}")
if __name__ == '__main__': # 在 Windows 系统上需要加上此行代码,避免子进程无限递归创建的问题
print(f"主进程 (PID: {os.getpid()}) 启动")
# 创建进程池(3 个进程)
with multiprocessing.Pool(3) as pool:
# 提交 4 个任务到进程池
pool.map(worker_one, [1, 2, 3, 4]) # 同步 map(接收多个任务,阻塞主进程直到完成)
map_async_result = pool.map_async(worker_one, [1, 2, 3, 4]) # 异步 map(非阻塞)
apply_async_result = pool.apply_async(worker_one, (9,)) # 异步 apply(接收单个任务),注意这里的 (9,) 代表序列(任务的参数 args),如果只有一个参数,需要加逗号
pool.starmap(worker_two, [(1, 'zs'), (2, 'ls'), (3, 'ww'), (4, 'zl')]) # 传递多个参数,同步 starmap(阻塞主进程直到完成)
starmap_async_result = pool.starmap_async(worker_two, [(1, 'zs'), (2, 'ls'), (3, 'ww'), (4, 'zl')]) # 异步 starmap(非阻塞)
print("主进程:所有任务已提交,等待异步任务完成。..")
# 获取异步任务结果
map_async_output = map_async_result.get()
# with 语句块会在结束时自动关闭进程池并等待所有任务完成,因此无需主动调用 pool.close() 或 pool.join() 方法
print("主进程:所有任务完成!")
print(f"所有任务结果:{map_async_output}")多线程示例
由于 GIL 的存在,多线程更适合 I/O 密集型任务(网络请求、文件读写等),而不是 CPU 密集型任务(计算密集型)。当多个线程访问共享资源时,需要使用锁机制,以下为简单示例,未涉及共享资源,因此不需要锁。
import concurrent.futures
import time
import threading
# 模拟第一个耗时任务
def worker_one(task_id):
# 获取当前线程信息
thread_id = threading.get_ident()
thread_name = threading.current_thread().name
time.sleep(1) # 模拟工作耗时
result = f"任务 {task_id} 由线程 (ID: {thread_id}, 名称:{thread_name}) 完成"
return result
# 模拟第二个耗时任务
def worker_two(task_id, user_name):
# 获取当前线程信息
thread_id = threading.get_ident()
thread_name = threading.current_thread().name
time.sleep(1) # 模拟工作耗时
result = f"任务 {task_id} 由线程 (ID: {thread_id}, 名称:{thread_name}) 完成 -- 用户名为 {user_name}"
return result
if __name__ == '__main__':
print(f"主线程 (ID: {threading.get_ident()}, 名称:{threading.current_thread().name}) 启动")
# 创建线程池(3 个线程)
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 1. 同步 map(阻塞主线程直到完成)
print(">>> 开始同步 map 任务")
map_result = list(executor.map(worker_one, [1, 2, 3, 4]))
# 2. 异步 map(非阻塞)
print(">>> 开始异步 map 任务")
map_future = executor.map(worker_one, [5, 6, 7, 8])
# 3. 异步 submit(单个任务)
print(">>> 开始异步 submit 任务")
apply_future = executor.submit(worker_one, 9)
# 4. 使用 map 处理多参数函数(需要 zip 技巧)
print(">>> 开始多参数任务")
# 准备参数列表
task_ids = [1, 2, 3, 4]
user_names = ['zs', 'ls', 'ww', 'zl']
# 使用 lambda 和 zip 处理多参数
starmap_result = list(executor.map(lambda args: worker_two(*args), zip(task_ids, user_names)))
# 5. 异步 submit 处理多参数任务
print(">>> 开始异步多参数任务")
starmap_futures = [executor.submit(worker_two, i, name) for i, name in zip([5, 6, 7, 8], user_names)]
print("主线程:所有任务已提交,等待异步任务完成。..")
# 获取异步任务结果
map_async_output = list(map_future) # map 结果转换为列表
apply_async_output = apply_future.result() # 单个 submit 任务结果
starmap_async_output = [f.result() for f in concurrent.futures.as_completed(starmap_futures)]
print("\n===== 所有任务结果 =====")
print(f"同步 map 结果:{map_result}")
print(f"异步 map 结果:{map_async_output}")
print(f"异步 submit 结果:{apply_async_output}")
print(f"同步 starmap 结果:{starmap_result}")
print(f"异步 starmap 结果:{starmap_async_output}")
print("主线程:所有任务完成!")多进程和多线程对比
| 特性 | 多进程 (multiprocessing.Pool) | 多线程 (ThreadPoolExecutor) |
|---|---|---|
| 执行方式 | 真正的并行(多核 CPU) | 并发(受 GIL 限制,适合 I/O 密集型) |
| 内存隔离 | 完全隔离(独立内存空间) | 共享内存(需要同步机制) |
| 创建开销 | 较大(需要复制内存) | 较小(轻量级) |
| 通信方式 | 需要 IPC(队列、管道等) | 可直接共享变量(需线程安全) |
| 适用场景 | CPU 密集型任务 | I/O 密集型任务 |
| 错误处理 | 子进程崩溃不影响主进程 | 线程崩溃可能导致整个程序崩溃 |
| 资源限制 | 受 CPU 核心数限制 | 可创建大量线程(但有效并发受 GIL 限制) |

