Video link: https://www.bilibili.com/video/BV1SV4y1i7bW
Table of contents
- [Monte Carlo Methods] 02 Importance sampling and a Python implementation
  - Basics
  - Implementing importance sampling
- [Monte Carlo Methods] 03 Accept/reject sampling: preliminary case analyses
  - Basics
  - Examples
  - Sampling efficiency
- [Monte Carlo Methods] 04 More on importance sampling: mathematical properties, On-policy vs. Off-policy
  - $p(x)$ vs. $q(x)$
  - RL: On-policy vs. Off-policy
- [Monte Carlo Methods] 05 Accept/reject sampling: framework, derivation, and analysis (MCMC basics)
- 06 Markov chains, stationary distributions, and detailed balance
  - A 2-state Markov chain (satisfying detailed balance)
  - A 3-state Markov chain (not satisfying detailed balance)
  - Random walk
[Monte Carlo Methods] 02 Importance sampling and a Python implementation
Basics
- Goal: compute the expectation of $f(x)$ w.r.t. a distribution $p(x)$:

$$\langle f \rangle_{p} =\int f(x)p(x)dx$$

- In many settings a closed-form (analytic) solution is hard to obtain, so we turn to numerical estimation.
- If we can sample correctly from $p(x)$, draw $N$ samples $x_1,x_2,\cdots,x_N$:

$$\langle f \rangle_{p} \approx \frac{1}{N}\sum_i f(x_i), \qquad x_i\sim p(x)$$
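As a quick sanity check of this estimator, here is a minimal sketch; the test function $f(x)=x^2$ and the standard normal $p$ are our own choices, for which the true value is $E_p[x^2]=1$:

import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(size=100_000)  # x_i ~ p(x) = N(0, 1)
print(np.mean(x**2))          # plain Monte Carlo estimate, ~= E_p[x^2] = 1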
- The problem: $p(x)$ itself is hard to sample from. We therefore introduce $q(x)$ (another probability distribution, one that is easy to sample from) to compute the expectation indirectly:
  - $q(x)$ is called the importance distribution; "importance sampling" means sampling from this simple importance distribution instead of $p(x)$.
- Draw $N$ samples from $q(x)$: $x_1,x_2,\cdots,x_N$.
- $w(x)=\frac{p(x)}{q(x)}$ is called the importance weight.

$$\begin{aligned} \langle f \rangle_{p}&=\int f(x)\frac{p(x)}{q(x)}q(x)dx\\ &=\int f(x)w(x)q(x)dx, \qquad w(x)=\frac{p(x)}{q(x)}\\ &=\langle f\cdot w\rangle_q\\ &\approx\frac{1}{N}\sum_i^N f(x_i)w(x_i), \qquad x_i\sim q(x) \end{aligned}$$
Implementing importance sampling
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import logsumexp
- Normal vs. uniform log densities:
  - Normal: $f(x) = \frac{1}{\sigma \sqrt{2\pi} } e^{-\frac{1}{2}\left(\frac{x-\mu}{\sigma}\right)^2}$
    - $\log f(x)=-\frac12\left(\frac{x-\mu}{\sigma}\right)^2$ (the constant term is dropped; it cancels once the weights are normalized)
  - Uniform: $f(x)=\frac{1}{b-a}$
    - $\log f(x)=-\log (b-a)$
- Weights, normalized in log space for numerical stability (a tiny demonstration follows this list):
  - $w_i=\frac{p(x_i)}{q(x_i)}, \qquad \log w_i=\log p(x_i)-\log q(x_i)$
  - $\tilde w_i=\frac{\exp(\log w_i)}{\sum_j\exp(\log w_j)}$
  - $\log \tilde w_i=\log w_i-\log\sum_j\exp(\log w_j)$
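Why work in log space: if the raw log-weights are very negative, exponentiating them directly underflows to zero, while subtracting the log-sum-exp first keeps everything finite. A tiny demonstration (the log-weight values here are made up to force underflow):

import numpy as np
from scipy.special import logsumexp

logw = np.array([-1000.0, -1001.0, -1002.0])  # np.exp(logw) would underflow to 0
norm_logw = logw - logsumexp(logw)            # normalize in log space first
print(np.exp(norm_logw), np.exp(norm_logw).sum())  # finite weights summing to 1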
class Pdf:
    def __call__(self, x):
        '''log prob'''
        pass

    def sample(self, n):
        pass

# the target distribution to be estimated
class Norm(Pdf):
    def __init__(self, mu=0, sigma=1):
        self.mu = mu
        self.sigma = sigma

    def __call__(self, x):
        # log density, up to an additive constant
        return -0.5*(x - self.mu)**2 / self.sigma**2

    # never called during importance sampling
    def sample(self, n):
        return np.random.normal(self.mu, self.sigma, n)

# the importance (proposal) distribution
class Uniform(Pdf):
    def __init__(self, low, high):
        self.low = low
        self.high = high

    def __call__(self, x):
        return np.repeat(-np.log(self.high - self.low), len(x))

    def sample(self, n):
        return np.random.uniform(self.low, self.high, n)

class ImportanceSampler:
    def __init__(self, p_dist, q_dist):
        self.p_dist = p_dist
        self.q_dist = q_dist

    def sample(self, n):
        samples = self.q_dist.sample(n)
        weights = self.calc_weights(samples)
        norm_weights = weights - logsumexp(weights)
        return samples, norm_weights

    def calc_weights(self, samples):
        # log(p(x)/q(x)) = log p(x) - log q(x)
        return self.p_dist(samples) - self.q_dist(samples)
N = 200000
target_p = Norm()
imp_q = Uniform(-20, 30)
sampler = ImportanceSampler(target_p, imp_q)
biased_samples, logws = sampler.sample(N)
samples = np.random.choice(biased_samples, N, p=np.exp(logws))
_ = plt.hist(samples, bins=40)
The histogram above is the result of importance sampling; it is essentially a weighted resampling procedure.
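To connect this back to $\langle f\rangle_p\approx\sum_i \tilde w_i f(x_i)$, a quick check reusing the variables from the run above (the test function $f(x)=x^2$ is our own choice; its true expectation under the standard normal target is 1):

# self-normalized IS estimate of E_p[f] with f(x) = x^2;
# logws are normalized log-weights, so the exponentiated weights sum to 1
f_test = lambda x: x**2
est = np.sum(np.exp(logws) * f_test(biased_samples))
print(est)  # close to 1 for large N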
- For high-dimensional $\mathbf x=(x_1,\cdots,x_d)$:
  - target distribution $p$
  - importance distribution $q$
  - a trivial importance distribution $q$ is hard to find in high dimensions
- Construct $q$ sequentially:
  - $q(\mathbf x)=q_1(x_1)q_2(x_2|x_1)\cdots q_d(x_d|x_1,x_2,\cdots,x_{d-1})$
- Factor $p$ sequentially in the same way:
  - $p(\mathbf x)=p(x_1)p(x_2|x_1)\cdots p(x_d|x_1,x_2,\cdots,x_{d-1})$
- Therefore the weight can be built up recursively:

$$\begin{aligned} w(\mathbf x)=\frac{p(\mathbf x)}{q(\mathbf x)}&=\frac{p(x_1)p(x_2|x_1)\cdots p(x_d|x_1,x_2,\cdots,x_{d-1})}{q_1(x_1)q_2(x_2|x_1)\cdots q_d(x_d|x_1,x_2,\cdots,x_{d-1})}\\ &=\frac{p(x_1)p(x_2|x_1)\cdots}{q_1(x_1)q_2(x_2|x_1)\cdots}\cdot\frac{p(x_d|x_1,x_2,\cdots,x_{d-1})}{q_d(x_d|x_1,x_2,\cdots,x_{d-1})}\\ &=w(\mathbf x_{1:d-1})\frac{p(x_d|\mathbf x_{1:d-1})}{q_d(x_d|\mathbf x_{1:d-1})} \end{aligned}$$
- Over $n$ time steps we obtain $n$ observations $y_1,\cdots,y_n\equiv y_{1:n}$, and we want to infer the hidden states $x_1,\cdots,x_n\equiv x_{1:n}$ behind them:
  - $x_{1:n}\rightarrow y_{1:n}$
- Prior and posterior distributions:
  - prior distribution: $P(x_{1:n})$
  - posterior distribution: $P(x_{1:n}|y_{1:n})$
- Assume $x_{1:n}$ is Markov, i.e. $P(x_{n+1}|x_{1:n})=P(x_{n+1}|x_n)$.
- From IS to SIS (sequential importance sampling):
  - take the prior $P(x_{1:n})$ as the importance distribution $q(\cdot)$;
  - then the importance weight of the $i$-th sample satisfies the recursion (a minimal sketch follows this list)

$$w(x_n^i)\propto w(x_{n-1}^i)p(y_n|x_n^i)$$
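The sketch below illustrates this recursion on a toy linear-Gaussian state-space model; the model $x_n=0.9x_{n-1}+\varepsilon$, $y_n=x_n+\eta$ and all of its parameters are illustrative assumptions, not part of the original material:

import numpy as np
from scipy.stats import norm
from scipy.special import logsumexp

rng = np.random.default_rng(0)
T, N = 50, 1000

# simulate the assumed model: x_n = 0.9 x_{n-1} + N(0,1), y_n = x_n + N(0, 0.5^2)
x_true = np.zeros(T)
for t in range(1, T):
    x_true[t] = 0.9 * x_true[t-1] + rng.normal()
ys = x_true + rng.normal(scale=0.5, size=T)

# SIS with the prior as importance distribution:
# log w_n^i = log w_{n-1}^i + log p(y_n | x_n^i)
particles = rng.normal(size=N)                        # x_0^i ~ prior
logw = norm.logpdf(ys[0], loc=particles, scale=0.5)   # weight on y_0
for t in range(1, T):
    particles = 0.9 * particles + rng.normal(size=N)  # propagate x_t^i ~ p(x_t | x_{t-1}^i)
    logw += norm.logpdf(ys[t], loc=particles, scale=0.5)
logw -= logsumexp(logw)                               # normalize the log-weights

print("weighted estimate of x_T:", np.sum(np.exp(logw) * particles), "true:", x_true[-1])

Without resampling the weights degenerate as $T$ grows, which is why SIS is usually extended to sequential importance resampling (particle filters).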
- references
- http://www.stat.ucla.edu/~zhou/courses/Stats102C-IS.pdf
- https://gist.github.com/simeoncarstens/3d3cc9bb340edeecf21bb58f419da860
[Monte Carlo Methods] 03 Accept/reject sampling: preliminary case analyses
- references
- https://www.youtube.com/watch?v=kYWHfgkRc9s
- https://rh8liuqy.github.io/Accept_reject.html
import matplotlib
matplotlib.rcParams['figure.dpi'] = 150
Basics
- Also called the accept/reject algorithm.
- It is a sampling method.
- Sample efficiency is governed by the acceptance rate.
- Two distributions are involved: the target $p(x)$ and the proposal $q(x)$.
- Monte Carlo (computer-based random sampling) estimates a distribution whose analytic form is unknown.
Examples
Take $f(x)=1.2-x^4$ as the example:
import numpy as np
import matplotlib.pyplot as plt

def f(x):
    return 1.2 - x**4
xs = np.linspace(0, 1, 1000)
ys = f(xs)
plt.plot(xs, ys, label="$f(x)=1.2-x^4$")
plt.fill_between(xs, ys, 0, alpha=0.2)
plt.xlim(0, 1)
plt.ylim(0, 1.25)
plt.xlabel("x")
plt.ylabel("y")
plt.legend()
First sampling method
Sample uniformly over $x\in[0,1]$, $y\in[0,1.2]$; accept $x$ if the point falls below $f(x)$, reject it otherwise. The raw $(x,y)$ samples fill the rectangular region.
def sample(f, xmin=0, xmax=1, ymax=1.2):
    while True:
        x = np.random.uniform(low=xmin, high=xmax)
        y = np.random.uniform(low=0, high=ymax)
        if y < f(x):
            return x

samples = [sample(f) for _ in range(100000)]
plt.plot(xs, ys, label='$f(x)$')
plt.hist(samples, density=True, alpha=.2, label='sample dist', color='green', bins=30)
plt.xlim(0, 1)
plt.ylim(0, 1.3)
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
Second sampling method (batch sampling)
def batch_sample(f, num_samples, xmin=0, xmax=1., ymax=1.2, batch_size=1000):
    samples = []  # fixed: the original misspelled this as `sampels`, so the loop read an outer variable
    while len(samples) < num_samples:
        xs = np.random.uniform(low=xmin, high=xmax, size=batch_size)
        ys = np.random.uniform(low=0, high=ymax, size=batch_size)
        samples.extend(xs[ys < f(xs)].tolist())
    return samples[:num_samples]
samples = batch_sample(f, num_samples=10000)
plt.plot(xs, ys, label='$f(x)$')
plt.hist(samples, density=True, alpha=.2, label='sample dist', color='green', bins=30)
plt.xlim(0, 1)
plt.ylim(0, 1.3)
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
The batching is coarse (each batch has size 1000). Use `%timeit` to measure the runtime:
%timeit [sample(f) for i in range(10000)]
# 48.9 ms ± 7.41 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit batch_sample(f, 10000)
# 18.4 µs ± 78.1 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
As the timings show, point-by-point sampling is far less efficient than batch sampling, though slightly more precise.
Sampling efficiency
Take $f(x)=e^{-\pi x^2}$ as the example:
gauss = lambda x: np.exp(-np.pi*x**2)
xs = np.linspace(-10, 10, 10000)
ys = gauss(xs)
plt.plot(xs, ys)
plt.fill_between(xs, ys, 0, alpha=0.2)
plt.xlabel("x"), plt.ylabel("f(x)")
Using batch sampling:
def batch_sample(f, num_samples, xmin=-10, xmax=10, ymax=1):
    x = np.random.uniform(low=xmin, high=xmax, size=num_samples)
    y = np.random.uniform(low=0, high=ymax, size=num_samples)
    passed = (y < f(x)).astype(int)
    return x, y, passed
sample_xs, sample_ys, passed = batch_sample(gauss, 10000)
sum(passed)/10000 # 0.0511
The empirical acceptance rate is about 5.1%, which matches theory: $\int e^{-\pi x^2}dx=1$ while the bounding rectangle has area $20\times1=20$, so the acceptance probability is exactly $1/20=5\%$. Plotting the samples:
plt.plot(xs, ys)
plt.fill_between(xs, ys, alpha=0.2)
plt.scatter(sample_xs, sample_ys, c=passed, cmap='RdYlGn', vmin=-0.1, vmax=1.1, lw=0, s=2)
plt.xlim(-10, 10)
plt.ylim(0, 1)
plt.title(f'rej sampling efficiency: {sum(passed)/10000:.2%}')
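Since the acceptance rate is just 1 divided by the bounding-box area here, a tighter box helps. A quick check with an assumed range of $[-3,3]$ (the Gaussian mass outside it is negligible):

# shrinking the box from area 20*1 to 6*1 raises acceptance from ~1/20 to ~1/6
_, _, passed_tight = batch_sample(gauss, 10000, xmin=-3, xmax=3)
print(sum(passed_tight) / 10000)  # expected to be near 1/6 ~= 0.167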
[Monte Carlo Methods] 04 More on importance sampling: mathematical properties, On-policy vs. Off-policy
- references
- https://speech.ee.ntu.edu.tw/~tlkagk/courses/MLDS_2018/Lecture/PPO%20(v3).pdf
- https://medium.com/@amir_masoud/a-simple-tutorial-on-sampling-importance-and-monte-carlo-with-python-codes-8ce809b91465
$p(x)$ vs. $q(x)$
By introducing $q(x)$, the hard-to-sample $p(x)$ can be sampled indirectly; this same trick turns on-policy policy-gradient algorithms into the now-popular off-policy algorithms such as PPO/DPO.
- Evaluate & sample:
  - $p(x)$ is difficult to sample from.
  - We can evaluate $p(x)$.
  - $q(x)$ is easy to evaluate and sample from.

$$E_{x\sim p}f(x)\approx\frac1N\sum_{i=1}^Nf(x^i), \qquad x^i\sim p(x)$$

- If we cannot sample directly from $p(x)$ and can only sample from $q(x)$, i.e. $x^i\sim q(x)$:

$$\begin{aligned} E_{x\sim p}[f(x)]&=\int f(x)p(x)dx=\int f(x)\underbrace{\frac{p(x)}{q(x)}}_{\text{importance weight}}q(x)dx\\ &=E_{x\sim q}\left[f(x)\frac{p(x)}{q(x)}\right] \end{aligned}$$
- This change of measure preserves the expectation, but not the variance:

$$E_{x\sim p}[f(x)]=E_{x\sim q}\left[f(x)\frac{p(x)}{q(x)}\right]$$

$$\begin{aligned} \text{Var}_{x\sim q}\left[f(x)\frac{p(x)}{q(x)}\right]&=E_{x\sim q}\left[\left(f(x)\frac{p(x)}{q(x)}\right)^2\right]-\left(E_{x\sim q}\left[f(x)\frac{p(x)}{q(x)}\right]\right)^2\\ &=E_{x\sim p}\left[f^2(x)\frac{p(x)}{q(x)}\right] - \left(E_{x\sim p}[f(x)]\right)^2 \end{aligned}$$
- Numerical stability and sampling-coverage problems are what motivate TRPO/PPO. The example from the referenced PPO slides is an extreme case:
  - $p(x)$ and $q(x)$ are nearly mirror images of each other (the distributions differ too much). Sampling from $q(x)$, most samples land where $f(x)$ is negative, which lies in the long tail of $p(x)$ and therefore carries low importance weight, while the mode region of $p(x)$ is rarely visited.
  - The true expectation of $f(x)$ under $p$ is positive (positive values are the more probable ones).
  - With importance sampling, the estimated expectation may well come out negative.
  - With enough samples the problem goes away, but with too few samples the sign can flip; fundamentally the error is too large, i.e. the estimator is numerically unstable (a small simulation of this failure mode follows below).
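The simulation below is our own toy analogue of the slide's figure, with $f(x)=\mathrm{sign}(x)$, $p=N(2,1)$, $q=N(-2,1)$, so that $E_p[f]\approx 0.95>0$:

import numpy as np
from scipy.stats import norm

rng = np.random.default_rng(0)
f = lambda x: np.where(x > 0, 1.0, -1.0)  # sign function
p, q = norm(2, 1), norm(-2, 1)            # nearly mirror-image distributions

for n in [100, 10_000, 1_000_000]:
    x = q.rvs(size=n, random_state=rng)   # sample from q only
    est = np.mean(f(x) * p.pdf(x) / q.pdf(x))
    print(n, est)  # small n tends to give the wrong (negative) sign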
RL: On-policy vs. Off-policy
- On-policy: the agent that learns and the agent that interacts with the environment are the same;
  - it learns while interacting with the environment:

$$\nabla \bar R(\theta)=\mathbb E_{\tau\sim p_\theta(\tau)}[R(\tau)\nabla \log p_\theta(\tau)]$$

  - use $\pi_\theta$ to collect data; when $\theta$ is updated, we have to sample training data all over again;
  - this is a data-efficiency problem (on-policy data utilization is too low).
- Off-policy: the agent that learns and the agent that interacts with the environment are different;
  - it watches another policy interact with the environment (generate demonstrations), then learns from that data.
In the two diagrams from the referenced slides, $\bar \pi$ corresponds to the $\pi_{\theta'}$ (or $\pi_{\theta_{\rm old}}$) below:

$$\mathcal{L}^{IS}_{\theta_{\rm old}}(\theta)=\hat{\mathbb E}_t\left[\frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{\rm old}}(a_t|s_t)}\hat A_t\right]$$

$$\begin{aligned} \nabla \bar R_\theta&=\mathbb E_{\tau\sim p_\theta(\tau)}[R(\tau)\nabla \log p_\theta(\tau)]\\ \nabla \bar R_\theta&=\mathbb E_{\tau\sim p_{\theta'}(\tau)}\left[\frac{p_\theta(\tau)}{p_{\theta'}(\tau)}R(\tau)\nabla \log p_\theta(\tau)\right] \end{aligned}$$
- Using samples from $\pi_{\theta'}$ to train $\theta$: $\theta'$ is fixed, so the samples can be re-used.
- Use $\pi_{\theta'}$ to interact with the environment and produce demonstrations.
- Sample data from $\theta'$ (i.e. $\pi_{\theta'}$), then use that data to train $\theta$ many times.
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
import seaborn as sns
def f_x(x):
    return 1/(1 + np.exp(-x))
def distribution(mu=0, sigma=1):
    # return a frozen scipy distribution object
    distribution = stats.norm(mu, sigma)
    return distribution
n = 1000
p_mu = 3.5
p_sigma = 1
q_mu = 3
q_sigma = 1
p_x = distribution(p_mu, p_sigma)
q_x = distribution(q_mu, q_sigma)
plt.figure(figsize=[10, 4])
# note: sns.distplot is deprecated in recent seaborn; sns.histplot/displot (used below) is the replacement
sns.distplot([np.random.normal(p_mu, p_sigma) for _ in range(3000)], label="distribution $p(x)$")
sns.distplot([np.random.normal(q_mu, q_sigma) for _ in range(3000)], label="distribution $q(x)$")
# direct Monte Carlo: sample from p(x) itself and average f(x)
s_list = []
for i in range(n):
    # draw a sample from p(x)
    x_i = np.random.normal(p_mu, p_sigma)
    s_list.append(f_x(x_i))
print(np.mean(s_list), np.var(s_list))
# importance sampling: sample from q(x) and reweight by p(x)/q(x)
value_list = []
for i in range(n):
    # sample from the proposal distribution q(x)
    x_i = np.random.normal(q_mu, q_sigma)
    value = f_x(x_i) * (p_x.pdf(x_i) / q_x.pdf(x_i))
    value_list.append(value)
print("average {} variance {}".format(np.mean(value_list), np.var(value_list)))  # average 0.9502806461397122 variance 0.36348771820129505
# pre-setting different q(x)
n = 5000
mu_target = 3.5
sigma_target = 1
mu_appro = 1
sigma_appro = 1
p_x = distribution(mu_target, sigma_target)
q_x = distribution(mu_appro, sigma_appro)
plt.figure(figsize=[10, 4])
sns.distplot([np.random.normal(mu_target, sigma_target) for _ in range(3000)], label="distribution $p(x)$")
sns.distplot([np.random.normal(mu_appro, sigma_appro) for _ in range(3000)], label="distribution $q(x)$")
plt.title("Distributions", size=16)
plt.legend()
# importance sampling again, now with a q(x) far from p(x)
value_list = []
# a poorly matched q(x) needs far more samples
for i in range(n):
    # sample from the mismatched proposal distribution
    x_i = np.random.normal(mu_appro, sigma_appro)
    value = f_x(x_i) * (p_x.pdf(x_i) / q_x.pdf(x_i))
    value_list.append(value)
print("average {} variance {}".format(np.mean(value_list), np.var(value_list)))
data_target = np.random.normal(mu_target, sigma_target, 3000)
data_appro = np.random.normal(mu_appro, sigma_appro, 3000)
# create a FacetGrid showing the two distributions
sns.displot([data_target, data_appro],
kind="hist", kde=True, stat="density", common_norm=False,
palette="pastel", aspect=1.5
)
plt.legend(labels=["distribution $p(x)$", "distribution $q(x)$"])
[Monte Carlo Methods] 05 Accept/reject sampling: framework, derivation, and analysis (MCMC basics)
Premise: we want approximate samples from a target distribution of which we only know the PDF, and this PDF cannot be integrated to obtain a CDF. The goal is to generate observations from that distribution.
- Target distribution vs. candidate distribution (proposal distribution):

$$p(s)=\frac{f(s)}{NC}$$

- NC: the normalizing constant, i.e. the integral of the numerator, $NC=\int f(s)ds$.
  - Dividing by NC reduces any nonnegative probability function to a probability density function (PDF) with total probability one.
- We do not know the target distribution $p(s)$; we only know the numerator $f(s)$.
- We cannot sample directly from $f(s)$.
- Rejection sampling:
  - choose $g(s)$ close to $p(s)$ and easy to sample from;
  - scale $g(s)$ by $M$ so that $M\cdot g(s)\geq f(s)$ everywhere.
- Algorithm:
  - sample $s$ from $g(s)$;
  - accept it with probability $\frac{f(s)}{Mg(s)}$ (guaranteed $\leq 1$).
- A short derivation shows that $D(s|A)$, the density of a sample $s$ given that it was accepted, is exactly $p(s)$:

$$D(s|A)=\frac{P(A|s)D(s)}{P(A)}=\frac{\frac{f(s)}{Mg(s)}g(s)}{P(A)}$$

  - $P(A)=\int g(s)\frac{f(s)}{Mg(s)}ds=\frac1M\int f(s)ds=\frac{NC}M$
  - $D(s|A)=\frac{f(s)/M}{NC/M}=f(s)/NC=p(s)$
- The larger $M$ is, the lower the acceptance rate $P(A)=\frac{NC}M$, and the less efficient the sampling.

We construct an envelope $M\cdot g(s)$ and sample inside it; keeping the envelope tight prevents the acceptance rate of rejection sampling from dropping so low that too many draws are needed.
An example:

$$\begin{aligned} f(s)&=\exp\left(\frac{-s^4}4 + \frac{s^2}2\right)\\ g(s)&=\frac{1}{\sqrt{2\pi}}\exp\left(-\frac{s^2}2\right) \end{aligned}$$

Here $f(s)$ is hard to integrate, so we do not know what to divide by to make it integrate to 1; we use $g(s)$ as the proposal for rejection sampling.
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
# fix the random seed for reproducibility
np.random.seed(42)

# target (unnormalized) function f(s)
def f(s):
    return np.exp(-s**4 / 4 + s**2 / 2)

# proposal distribution g(s): the standard normal PDF
def g(s):
    return norm.pdf(s, loc=0, scale=1)

# determine the constant C (the M of the derivation above)
# by taking the maximum of f(s)/g(s) over a grid
s_vals = np.linspace(-3, 3, 1000)
ratio = f(s_vals) / g(s_vals)
C = np.max(ratio) * 1.1  # add a 10% safety margin
print(f"estimated constant C: {C:.4f}")
# estimated constant C: 7.4951
# acceptance-rejection sampling
def acceptance_rejection_sampling(f, g, C, size=10000):
    samples = []
    num_attempts = 0
    while len(samples) < size:
        s = np.random.normal(0, 1)  # sample from g(s)
        # variant 1:
        # u = np.random.uniform(0, C * g(s))
        # if u < f(s):
        #     samples.append(s)
        # variant 2:
        r = np.random.random()
        if r < f(s) / (C*g(s)):
            samples.append(s)
        num_attempts += 1
    acceptance_rate = size / num_attempts
    return np.array(samples), acceptance_rate

# draw the samples
sample_size = 10000
samples, acc_rate = acceptance_rejection_sampling(f, g, C, size=sample_size)
print(f"acceptance rate: {acc_rate*100:.2f}%")
# acceptance rate: 52.63%
So under the scaling constant $C=7.4951$ (which defines the envelope), the acceptance rate is about 52.63%.
Some visualizations follow:
from scipy.integrate import quad
quad(f, -np.inf, np.inf)[0]      # the normalizing constant NC
quad(f, -np.inf, np.inf)[0] / C  # the theoretical acceptance rate P(A) = NC / C
# normalize f(s) for comparison (used for visualization only, not for sampling)
def f_normalized(s):
    # numerically integrate f(s) to get the normalizing constant
    integral, _ = quad(f, -np.inf, np.inf)
    return f(s) / integral
# prepare the plot
s_plot = np.linspace(-3, 3, 1000)
f_plot = f(s_plot)
g_plot = g(s_plot)
Cg_plot = C * g_plot
# normalized f(s) for comparison
f_norm_plot = f_normalized(s_plot)
plt.figure(figsize=(12, 8))
# target function f(s)
plt.plot(s_plot, f_plot, label='f(s) unnormalized', color='blue')
# scaled proposal C * g(s)
plt.plot(s_plot, Cg_plot, label='C * g(s)', color='red', linestyle='--')
# normalized f(s)
plt.plot(s_plot, f_norm_plot, label='f(s) normalized', color='green')
plt.title('f(s) vs. Cg(s)')
plt.xlabel('s')
plt.ylabel('density')
plt.legend()
plt.grid(True)
plt.show()
Here the blue curve is the original $f(s)$, the red dashed line is the constructed envelope $C\cdot g(s)$, and the green curve is $f(s)$ after normalization.
# histogram of the samples vs. the true (normalized) density
plt.figure(figsize=(12, 6))
plt.hist(samples, bins=50, density=True, alpha=0.6, color='gray', label='samples')
# normalized f(s)
plt.plot(s_plot, f_norm_plot, label='f(s) normalized', color='blue')
# proposal g(s) for reference
plt.plot(s_plot, g_plot, label='g(s)', color='red', linestyle='--')
plt.title('sample vs. true')
plt.xlabel('s')
plt.ylabel('density')
plt.legend()
plt.grid(True)
plt.show()
# further evaluation: sample mean and variance
sample_mean = np.mean(samples)
sample_var = np.var(samples)
print(f"sample mean: {sample_mean:.4f}")
print(f"sample variance: {sample_var:.4f}")

# theoretical mean and variance, via numerical integration
def compute_theoretical_moments():
    # normalizing constant
    integral, _ = quad(f, -np.inf, np.inf)
    # expectation E[s]
    integrand_mean = lambda s: s * f(s)
    mean, _ = quad(integrand_mean, -np.inf, np.inf)
    mean /= integral
    # E[s^2]
    integrand_var = lambda s: s**2 * f(s)
    mean_sq, _ = quad(integrand_var, -np.inf, np.inf)
    mean_sq /= integral
    var = mean_sq - mean**2
    return mean, var

theoretical_mean, theoretical_var = compute_theoretical_moments()
print(f"theoretical mean: {theoretical_mean:.4f}")
print(f"theoretical variance: {theoretical_var:.4f}")
sample mean: 0.0125
sample variance: 1.0375
theoretical mean: 0.0000
theoretical variance: 1.0418
06 Markov chains, stationary distributions, and detailed balance
- Markov property: given the current state, the next state depends only on the current state, not on the earlier history:
  - $P(X_{t+1}=j|X_t=i,X_{t-1}=i_{t-1},\cdots,X_0=i_0)=P(X_{t+1}=j|X_t=i)$
- When the state space is finite, a Markov chain is usually described by an $N\times N$ transition probability matrix $P$:
  - $P_{ij}=P(X_{t+1}=j|X_t=i)$
  - each row sums to 1: from any state $i$, the probabilities of transitioning to all possible states add up to 1;
- Stationary distribution:
  - if there is a probability vector $\mathbf{\pi}=(\pi_1,\pi_2,\cdots,\pi_N)$ such that, regardless of the initial distribution, the distribution of the chain approaches $\mathbf\pi$ as $t$ grows large, then $\mathbf\pi$ is called the stationary distribution;
  - $\mathbf\pi P=\mathbf\pi$
  - not every Markov chain has a stationary distribution; existence and uniqueness depend on properties of the chain such as irreducibility, periodicity, and ergodicity of its states.
- Detailed balance:
  - the detailed balance condition (also called microscopic reversibility) says that under the stationary distribution $\pi$, the probability flow from state $i$ to state $j$ equals the flow from $j$ to $i$:
  - $\pi_iP_{ij}=\pi_jP_{ji},\quad\forall i,j$
  - a Markov chain whose stationary distribution $\pi$ satisfies detailed balance is called reversible;
  - any probability distribution $\pi$ satisfying detailed balance is necessarily a stationary distribution of the chain:
  - $\sum_i \pi_iP_{ij}=\sum_i\pi_jP_{ji}=\pi_j\sum_iP_{ji}=\pi_j$, i.e. $\pi P=\pi$
A 2-state Markov chain (satisfying detailed balance)

$$P=\begin{pmatrix} 0.6 & 0.4 \\ 0.2 & 0.8 \end{pmatrix}$$

- $P_{11}=0.6$: from state 1, stay in state 1 with probability 0.6 and move to state 2 with probability 0.4;
- $P_{22}=0.8$: from state 2, stay in state 2 with probability 0.8 and move to state 1 with probability 0.2;
- solving $\pi P=\pi$ gives the stationary distribution $\pi=(\pi_1,\pi_2)=(\frac 13,\frac 23)$;
- verifying detailed balance, $\pi_1P_{12}=\pi_2P_{21}$:
  - $1/3 \times 0.4 = 2/3 \times 0.2$ ✓
import numpy as np
import matplotlib.pyplot as plt

# transition matrix
P = np.array([
    [0.6, 0.4],
    [0.2, 0.8]
])

# ---------- (1) stationary distribution by iteration ----------
def iterate_stationary(P, max_iter=1000, tol=1e-12):
    """
    Numerically find the stationary distribution by iterating mu_{t+1} = mu_t @ P.
    """
    n = P.shape[0]
    # start from the uniform distribution, e.g. [0.5, 0.5]
    mu = np.ones(n) / n
    for i in range(max_iter):
        new_mu = mu @ P
        if np.linalg.norm(new_mu - mu) < tol:
            print(f'convergence at iter: {i}')
            break
        mu = new_mu
    return new_mu

mu_stationary = iterate_stationary(P)
print("stationary distribution by iteration:", mu_stationary)
# convergence at iter: 29
# stationary distribution by iteration: [0.33333333 0.66666667]
# ---------- (2) simulate the chain and estimate the distribution ----------
def simulate_markov_chain(P, T=10000, start_state=0):
    """
    Simulate T steps under transition matrix P;
    returns the state sequence (states numbered from 0).
    """
    states = [start_state]
    for t in range(T):
        current = states[-1]
        next_state = np.random.choice(len(P), p=P[current])
        states.append(next_state)
    return states

T = 10_000
start_state = 0  # start from state 0
states = simulate_markov_chain(P, T=T, start_state=start_state)
states_arr = np.array(states)

# estimate the late-stage frequency of each state
burn_in = 1000  # discard the first 1000 steps as burn-in toward stationarity
freq_state_0 = np.mean(states_arr[burn_in:] == 0)
freq_state_1 = np.mean(states_arr[burn_in:] == 1)
print("simulated state frequencies: (state0, state1) =", (freq_state_0, freq_state_1))
# simulated state frequencies: (state0, state1) = (0.34562826352627485, 0.6543717364737252)
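As a cross-check (this alternative computation is our addition), the stationary distribution is also the left eigenvector of $P$ with eigenvalue 1:

# stationary distribution as the left eigenvector of P for eigenvalue 1
eigvals, eigvecs = np.linalg.eig(P.T)
pi = np.real(eigvecs[:, np.argmin(np.abs(eigvals - 1))])
pi = pi / pi.sum()
print("stationary distribution via eigenvector:", pi)  # ~ [1/3, 2/3]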
Plotting:
# plt.figure(figsize=(8, 3))
plt.plot(states_arr[:200], '-o', markersize=3)
plt.yticks([0, 1])
plt.title("state transition process for the first 200 steps")
plt.xlabel("step")
plt.ylabel("state")
A 3-state Markov chain (not satisfying detailed balance)
- In some applications a Markov chain has a stationary distribution but does not satisfy detailed balance (i.e. it is not reversible). Here is a 3-state example.
- Let the state space be $\{0,1,2\}$ and define the transition matrix

$$P=\begin{pmatrix} 0.5 & 0.2 & 0.3\\ 0.3 & 0.5 & 0.2\\ 0.2 & 0.3 & 0.5 \end{pmatrix}$$

- The stationary distribution is $(1/3,1/3,1/3)$ (the matrix is doubly stochastic), yet detailed balance fails; see the numeric check after the code below.
P = np.array([
    [0.5, 0.2, 0.3],
    [0.3, 0.5, 0.2],
    [0.2, 0.3, 0.5]
])

# ---------- (1) stationary distribution by iteration ----------
mu_stationary = iterate_stationary(P)
print("numerical stationary distribution of the 3-state chain:", mu_stationary)
# convergence at iter: 0  (the uniform initial distribution is already stationary)
# numerical stationary distribution of the 3-state chain: [0.33333333 0.33333333 0.33333333]

# ---------- (2) simulation ----------
states_3 = simulate_markov_chain(P, T=10000, start_state=0)
states_3_arr = np.array(states_3)
burn_in = 1000
freq = []
for i in range(3):
    freq.append(np.mean(states_3_arr[burn_in:] == i))
print("simulated state frequencies of the 3-state chain:", freq)
# simulated state frequencies of the 3-state chain: [0.3420731029885568, 0.32663037440284415, 0.33129652260859904]
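A quick numeric check (our addition) that this chain indeed violates detailed balance, comparing the probability flows $\pi_iP_{ij}$ and $\pi_jP_{ji}$:

# probability flow matrix: flow[i, j] = pi_i * P_ij
pi = np.ones(3) / 3
flow = pi[:, None] * P
print(flow - flow.T)  # nonzero off-diagonal entries => detailed balance violated
# e.g. flow[0, 1] = 0.2/3 while flow[1, 0] = 0.3/3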
Random walk
After $t$ steps, the particle's position is approximately distributed as $N(0, t)$:
- the mean is 0 (left and right steps are equally likely);
- the variance is $t$ (each step has variance 1, and variances add over $t$ steps), so the standard deviation is $\sqrt t$.
from tqdm.notebook import tqdm

def simulate_multiple_walks(n_trials=1000, T=100000, start_state=0):
    # array holding the final state of every trial
    final_states = np.zeros(n_trials)
    for i in tqdm(range(n_trials)):
        # generate one random walk
        steps = np.random.choice([-1, 1], size=T)
        final_states[i] = start_state + np.sum(steps)
    return final_states

# parameters
T = 100000       # number of steps per walk
n_trials = 1000  # number of trials

# run the walks
final_states = simulate_multiple_walks(n_trials=n_trials, T=T)
# plot
plt.figure(figsize=(12, 6))
plt.hist(final_states, bins=50, density=True, alpha=0.7, color='blue', label='Empirical')
# overlay the theoretical density curve
theoretical_std = np.sqrt(T)
x = np.linspace(min(final_states), max(final_states), 1000)
theoretical_pdf = 1/(theoretical_std * np.sqrt(2*np.pi)) * np.exp(-x**2/(2*T))
plt.plot(x, theoretical_pdf, 'r--', label='Theoretical N(0,t)')
plt.title(f"Distribution of {n_trials} Random Walks Final States after {T} steps")
plt.xlabel("Final State")
plt.ylabel("Density")
plt.legend()
# print summary statistics
print(f"Empirical mean: {np.mean(final_states):.2f}")
print(f"Empirical std: {np.std(final_states):.2f}")
print(f"Theoretical std: {theoretical_std:.2f}")
Empirical mean: 4.38
Empirical std: 323.70
Theoretical std: 316.23